From d6afba1d7ab7477b6fcd39a3e7536e54808dc497 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 15 Dec 2018 22:51:40 +0800 Subject: [PATCH] add channel ut --- include/ipc.h | 5 +++- src/ipc.cpp | 18 ++++++++++---- test/test_ipc.cpp | 62 ++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/include/ipc.h b/include/ipc.h index 2383d7c..64ca2c7 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -13,7 +13,7 @@ using shm::handle_t; IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT void disconnect(handle_t h); -IPC_EXPORT bool send(handle_t h, void* data, int size); +IPC_EXPORT bool send(handle_t h, void* data, std::size_t size); IPC_EXPORT std::vector recv(handle_t h); class IPC_EXPORT channel { @@ -35,6 +35,9 @@ public: bool connect(char const * name); void disconnect(void); + bool send(void* data, std::size_t size); + std::vector recv(); + private: class channel_; channel_* p_; diff --git a/src/ipc.cpp b/src/ipc.cpp index 2f57ce3..81a33e9 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -94,11 +94,11 @@ void disconnect(handle_t h) { shm::release(h, sizeof(queue_t)); } -bool send(handle_t h, void* data, int size) { +bool send(handle_t h, void* data, std::size_t size) { if (data == nullptr) { return false; } - if (size <= 0) { + if (size == 0) { return false; } auto queue = queue_of(h); @@ -108,15 +108,15 @@ bool send(handle_t h, void* data, int size) { static unsigned msg_id = 0; ++msg_id; // calc a new message id, atomic is unnecessary int offset = 0; - for (int i = 0; i < (size / static_cast(data_length)); ++i, offset += data_length) { + for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { msg_t msg { - size - offset - static_cast(data_length), + static_cast(size) - offset - static_cast(data_length), msg_id, { 0 } }; std::memcpy(msg.data_, static_cast(data) + offset, data_length); queue->push(msg); } - int remain = size - offset; + int remain = static_cast(size) - offset; if (remain > 0) { msg_t msg { remain - static_cast(data_length), @@ -225,4 +225,12 @@ void channel::disconnect(void) { ipc::disconnect(impl(p_)->h_); } +bool channel::send(void* data, std::size_t size) { + return ipc::send(impl(p_)->h_, data, size); +} + +std::vector channel::recv() { + return ipc::recv(impl(p_)->h_); +} + } // namespace ipc diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index c0eb0f7..acb192c 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #if defined(__GNUC__) # include // abi::__cxa_demangle @@ -46,7 +47,7 @@ struct lc_wrapper : Mutex { }; template -void benchmark() { +void benchmark_lc() { std::thread w_trd[W]; std::thread r_trd[R]; std::atomic_int fini { 0 }; @@ -128,10 +129,10 @@ void test_performance() { << "test_performance: [" << W << ":" << R << "]" << std::endl; - benchmark(); - benchmark, W, R>(); - benchmark , W, R>(); - benchmark(); + benchmark_lc(); + benchmark_lc, W, R>(); + benchmark_lc , W, R>(); + benchmark_lc(); } void Unit::test_rw_lock() { @@ -152,7 +153,56 @@ void Unit::test_send_recv() { } void Unit::test_channel() { - ipc::channel cc; + auto conn = [](int id) { + std::string ack = "copy:" + std::to_string(id); + ipc::channel cc { "my-ipc-channel" }; + std::atomic_bool unmatched { true }; + std::thread re {[&] { + do { + auto dd = cc.recv(); + std::cout << id << "-recv: " + << std::string { reinterpret_cast(dd.data()), dd.size() } + << "[" << dd.size() << "]" << std::endl; + if ((unmatched = (ack.size() != dd.size()) || + (ack != reinterpret_cast(dd.data())))) { + bool need_cp = true; + const char cp[] = "copy:"; + for (std::size_t i = 0; i < dd.size() && i < sizeof(cp); ++i) { + if (dd[i] != cp[i]) { + need_cp = false; + break; + } + } + if (need_cp) { + cc.send(dd.data(), dd.size()); + } + } + else std::cout << id << " matched!" << std::endl; + } while (unmatched.load(std::memory_order_relaxed)); + }}; + while (unmatched.load(std::memory_order_relaxed)) { + if (!cc.send(const_cast(ack.c_str()), ack.size())) { + std::cout << "send failed!" << std::endl; + unmatched = false; + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + re.join(); + std::cout << "fini conn " << id << std::endl; + return cc; + }; + + std::thread t1 {[&] { + auto cc = conn(1); + }}; + + std::thread t2 {[&] { + auto cc = conn(2); + }}; + + t1.join(); + t2.join(); } } // internal-linkage