add channel ut

This commit is contained in:
mutouyun 2018-12-15 22:51:40 +08:00
parent ee2a4e1106
commit d6afba1d7a
3 changed files with 73 additions and 12 deletions

View File

@ -13,7 +13,7 @@ using shm::handle_t;
IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT handle_t connect (char const * name);
IPC_EXPORT void disconnect(handle_t h); IPC_EXPORT void disconnect(handle_t h);
IPC_EXPORT bool send(handle_t h, void* data, int size); IPC_EXPORT bool send(handle_t h, void* data, std::size_t size);
IPC_EXPORT std::vector<byte_t> recv(handle_t h); IPC_EXPORT std::vector<byte_t> recv(handle_t h);
class IPC_EXPORT channel { class IPC_EXPORT channel {
@ -35,6 +35,9 @@ public:
bool connect(char const * name); bool connect(char const * name);
void disconnect(void); void disconnect(void);
bool send(void* data, std::size_t size);
std::vector<byte_t> recv();
private: private:
class channel_; class channel_;
channel_* p_; channel_* p_;

View File

@ -94,11 +94,11 @@ void disconnect(handle_t h) {
shm::release(h, sizeof(queue_t)); shm::release(h, sizeof(queue_t));
} }
bool send(handle_t h, void* data, int size) { bool send(handle_t h, void* data, std::size_t size) {
if (data == nullptr) { if (data == nullptr) {
return false; return false;
} }
if (size <= 0) { if (size == 0) {
return false; return false;
} }
auto queue = queue_of(h); auto queue = queue_of(h);
@ -108,15 +108,15 @@ bool send(handle_t h, void* data, int size) {
static unsigned msg_id = 0; static unsigned msg_id = 0;
++msg_id; // calc a new message id, atomic is unnecessary ++msg_id; // calc a new message id, atomic is unnecessary
int offset = 0; int offset = 0;
for (int i = 0; i < (size / static_cast<int>(data_length)); ++i, offset += data_length) { for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
msg_t msg { msg_t msg {
size - offset - static_cast<int>(data_length), static_cast<int>(size) - offset - static_cast<int>(data_length),
msg_id, { 0 } msg_id, { 0 }
}; };
std::memcpy(msg.data_, static_cast<byte_t*>(data) + offset, data_length); std::memcpy(msg.data_, static_cast<byte_t*>(data) + offset, data_length);
queue->push(msg); queue->push(msg);
} }
int remain = size - offset; int remain = static_cast<int>(size) - offset;
if (remain > 0) { if (remain > 0) {
msg_t msg { msg_t msg {
remain - static_cast<int>(data_length), remain - static_cast<int>(data_length),
@ -225,4 +225,12 @@ void channel::disconnect(void) {
ipc::disconnect(impl(p_)->h_); ipc::disconnect(impl(p_)->h_);
} }
bool channel::send(void* data, std::size_t size) {
return ipc::send(impl(p_)->h_, data, size);
}
std::vector<byte_t> channel::recv() {
return ipc::recv(impl(p_)->h_);
}
} // namespace ipc } // namespace ipc

View File

@ -6,6 +6,7 @@
#include <mutex> #include <mutex>
#include <typeinfo> #include <typeinfo>
#include <memory> #include <memory>
#include <string>
#if defined(__GNUC__) #if defined(__GNUC__)
# include <cxxabi.h> // abi::__cxa_demangle # include <cxxabi.h> // abi::__cxa_demangle
@ -46,7 +47,7 @@ struct lc_wrapper : Mutex {
}; };
template <typename Lc, int W, int R, int Loops = 100000> template <typename Lc, int W, int R, int Loops = 100000>
void benchmark() { void benchmark_lc() {
std::thread w_trd[W]; std::thread w_trd[W];
std::thread r_trd[R]; std::thread r_trd[R];
std::atomic_int fini { 0 }; std::atomic_int fini { 0 };
@ -128,10 +129,10 @@ void test_performance() {
<< "test_performance: [" << W << ":" << R << "]" << "test_performance: [" << W << ":" << R << "]"
<< std::endl; << std::endl;
benchmark<ipc::rw_lock , W, R>(); benchmark_lc<ipc::rw_lock , W, R>();
benchmark<lc_wrapper<capo::spin_lock>, W, R>(); benchmark_lc<lc_wrapper<capo::spin_lock>, W, R>();
benchmark<lc_wrapper<std::mutex> , W, R>(); benchmark_lc<lc_wrapper<std::mutex> , W, R>();
benchmark<std::shared_timed_mutex , W, R>(); benchmark_lc<std::shared_timed_mutex , W, R>();
} }
void Unit::test_rw_lock() { void Unit::test_rw_lock() {
@ -152,7 +153,56 @@ void Unit::test_send_recv() {
} }
void Unit::test_channel() { void Unit::test_channel() {
ipc::channel cc; auto conn = [](int id) {
std::string ack = "copy:" + std::to_string(id);
ipc::channel cc { "my-ipc-channel" };
std::atomic_bool unmatched { true };
std::thread re {[&] {
do {
auto dd = cc.recv();
std::cout << id << "-recv: "
<< std::string { reinterpret_cast<char*>(dd.data()), dd.size() }
<< "[" << dd.size() << "]" << std::endl;
if ((unmatched = (ack.size() != dd.size()) ||
(ack != reinterpret_cast<char*>(dd.data())))) {
bool need_cp = true;
const char cp[] = "copy:";
for (std::size_t i = 0; i < dd.size() && i < sizeof(cp); ++i) {
if (dd[i] != cp[i]) {
need_cp = false;
break;
}
}
if (need_cp) {
cc.send(dd.data(), dd.size());
}
}
else std::cout << id << " matched!" << std::endl;
} while (unmatched.load(std::memory_order_relaxed));
}};
while (unmatched.load(std::memory_order_relaxed)) {
if (!cc.send(const_cast<char*>(ack.c_str()), ack.size())) {
std::cout << "send failed!" << std::endl;
unmatched = false;
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
re.join();
std::cout << "fini conn " << id << std::endl;
return cc;
};
std::thread t1 {[&] {
auto cc = conn(1);
}};
std::thread t2 {[&] {
auto cc = conn(2);
}};
t1.join();
t2.join();
} }
} // internal-linkage } // internal-linkage