diff --git a/include/circ_elem_array.h b/include/circ_elem_array.h index 54e658d..26af8c4 100644 --- a/include/circ_elem_array.h +++ b/include/circ_elem_array.h @@ -100,16 +100,18 @@ public: using base_t::cursor; void* acquire() noexcept { + uint_t<32> conn_cnt = static_cast>(conn_count()); // acquire + if (conn_cnt == 0) return nullptr; elem_t* el = elem(base_t::acquire()); // check all consumers have finished reading while(1) { - uint_t<32> expected = 0, - conn_cnt = static_cast>(conn_count()); // acquire + uint_t<32> expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, conn_cnt, std::memory_order_relaxed)) { break; } std::this_thread::yield(); + conn_cnt = static_cast>(conn_count()); // acquire } return el->data_; } diff --git a/include/circ_queue.h b/include/circ_queue.h index eaaa88a..ccbd1f0 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -43,7 +43,7 @@ public: std::size_t connect() noexcept { if (elems_ == nullptr) return invalid_value; - if (connected_.exchange(true, std::memory_order_relaxed)) { + if (connected_.exchange(true, std::memory_order_acq_rel)) { // if it's already connected, just return an error count return invalid_value; } @@ -52,7 +52,7 @@ public: std::size_t disconnect() noexcept { if (elems_ == nullptr) return invalid_value; - if (!connected_.exchange(false, std::memory_order_relaxed)) { + if (!connected_.exchange(false, std::memory_order_acq_rel)) { // if it's already disconnected, just return an error count return invalid_value; } @@ -63,8 +63,12 @@ public: return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); } + bool empty() const noexcept { + return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor()); + } + bool connected() const noexcept { - return connected_.load(std::memory_order_relaxed); + return connected_.load(std::memory_order_acquire); } array_t* attach(array_t* arr) noexcept { @@ -82,30 +86,12 @@ public: return old; } - bool push(T const & item) noexcept { - if (elems_ == nullptr) return false; - auto ptr = elems_->acquire(); - ::new (ptr) T(item); - elems_->commit(ptr); - return true; - } - - template - auto push(P&& param) noexcept // disable this if P is as same as T - -> Requires, T>::value, bool> { - if (elems_ == nullptr) return false; - auto ptr = elems_->acquire(); - ::new (ptr) T { std::forward

(param) }; - elems_->commit(ptr); - return true; - } - template - auto push(P&&... params) noexcept // some compilers are not support this well - -> Requires<(sizeof...(P) != 1), bool> { + auto push(P&&... params) noexcept { if (elems_ == nullptr) return false; auto ptr = elems_->acquire(); - ::new (ptr) T { std::forward

(params)... }; + if (ptr == nullptr) return false; + ::new (ptr) T(std::forward

(params)...); elems_->commit(ptr); return true; } @@ -116,9 +102,9 @@ public: auto [ques, size] = upd(); for (std::size_t i = 0; i < static_cast(size); ++i) { queue* que = ques[i]; - if (que == nullptr) continue; - if (que->elems_ == nullptr) return nullptr; - if (que->cursor_ != que->elems_->cursor()) { + if ((que != nullptr) && (que->elems_ == nullptr || + que->elems_->cursor() != que->cursor_)) { + // may que->elems_ == nullptr return que; } } @@ -129,8 +115,12 @@ public: } static T pop(queue* que) noexcept { - if (que == nullptr) return {}; - if (que->elems_ == nullptr) return {}; + if (que == nullptr) { + return {}; + } + if (que->elems_ == nullptr) { + return {}; + } auto item_ptr = static_cast(que->elems_->take(que->cursor_++)); T item = std::move(*item_ptr); que->elems_->put(item_ptr); diff --git a/include/ipc.h b/include/ipc.h index c5e37af..2036e7d 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -49,7 +49,7 @@ buff_t recv(handle_t const (& hs)[N]) { return recv(hs, N); } class IPC_EXPORT route { public: route(); - route(char const * name); + explicit route(char const * name); route(route&& rhs); ~route(); @@ -89,7 +89,7 @@ private: class IPC_EXPORT channel { public: channel(); - channel(char const * name); + explicit channel(char const * name); channel(channel&& rhs); ~channel(); @@ -111,6 +111,13 @@ public: bool send(buff_t const & buff); bool send(std::string const & str); + bool wait_for_recv(std::size_t r_count, std::size_t until) const; + bool wait_for_recv(std::size_t r_count) const; + + bool send_for(std::size_t r_count, void const * data, std::size_t size); + bool send_for(std::size_t r_count, buff_t const & buff); + bool send_for(std::size_t r_count, std::string const & str); + buff_t recv(); private: diff --git a/src/channel.inc b/src/channel.inc index be905b7..ced92e4 100644 --- a/src/channel.inc +++ b/src/channel.inc @@ -6,8 +6,9 @@ #include #include #include -#include #include +#include +#include #include "def.h" #include "shm.h" @@ -26,6 +27,20 @@ struct ch_info_t { }; #pragma pack() +inline bool wait_for_recv(route const & rt, std::size_t r_count, std::size_t until) { + for (unsigned k = 0; rt.recv_count() < r_count; ++k) { + if (k > until) return false; + std::this_thread::yield(); + } + return true; +} + +template +inline bool channel_send(route& rt, std::size_t r_count, P&&... params) { + if (!wait_for_recv(rt, r_count, 1024)) return false; + return rt.send(params...); // no need std::forward +} + } // internal-linkage //////////////////////////////////////////////////////////////// @@ -42,9 +57,7 @@ public: std::string n_; std::size_t id_; - std::unordered_map rts_; - - ~channel_(void) { rts_.clear(); } + std::array rts_; ch_info_t& info() { return *static_cast(h_.get()); @@ -92,7 +105,7 @@ char const * channel::name() const { } channel channel::clone() const { - return { name() }; + return channel { name() }; } bool channel::connect(char const * name) { @@ -123,7 +136,9 @@ void channel::disconnect() { std::unique_lock guard { impl(p_)->info().lc_ }; impl(p_)->acc().release(impl(p_)->id_); } - impl(p_)->rts_.clear(); + for (auto& rt : impl(p_)->rts_) { + rt.disconnect(); + } impl(p_)->r_.disconnect(); impl(p_)->h_.release(); } @@ -132,25 +147,36 @@ std::size_t channel::recv_count() const { return impl(p_)->r_.recv_count(); } -template -inline bool channel_send(route& rt, P&&... params) { - for (unsigned k = 0; rt.recv_count() == 0; ++k) { - if (k >= 1024) return false; - std::this_thread::yield(); - } - return rt.send(params...); // no need std::forward +bool channel::wait_for_recv(std::size_t r_count, std::size_t until) const { + return ::wait_for_recv(impl(p_)->r_, r_count, until); +} + +bool channel::wait_for_recv(std::size_t r_count) const { + return wait_for_recv(r_count, (std::numeric_limits::max)()); } bool channel::send(void const * data, std::size_t size) { - return channel_send(impl(p_)->r_, data, size); + return impl(p_)->r_.send(data, size); } bool channel::send(buff_t const & buff) { - return channel_send(impl(p_)->r_, buff); + return impl(p_)->r_.send(buff); } bool channel::send(std::string const & str) { - return channel_send(impl(p_)->r_, str); + return impl(p_)->r_.send(str); +} + +bool channel::send_for(std::size_t r_count, void const * data, std::size_t size) { + return ::channel_send(impl(p_)->r_, r_count, data, size); +} + +bool channel::send_for(std::size_t r_count, buff_t const & buff) { + return ::channel_send(impl(p_)->r_, r_count, buff); +} + +bool channel::send_for(std::size_t r_count, std::string const & str) { + return ::channel_send(impl(p_)->r_, r_count, str); } buff_t channel::recv() { @@ -159,33 +185,26 @@ buff_t channel::recv() { return ipc::multi_recv([&] { std::array acqs; std::size_t counter = 0; - std::unordered_map cache; // get all acquired ids { std::shared_lock guard { impl(p_)->info().lc_ }; - impl(p_)->acc().for_each([&](std::size_t id, bool acquired) { + impl(p_)->acc().for_acquired([this, &acqs, &counter](std::size_t id) { if (id == impl(p_)->id_) return; - if (acquired) { - acqs[counter++] = id; - } + acqs[counter++] = id; }); } // populate route cache & ques - for (std::size_t i = 0; i < counter; ++i) { - auto id = acqs[i]; - auto it = impl(p_)->rts_.find(id); + for (std::size_t k = 0; k < counter; ++k) { + std::size_t id = acqs[k]; + auto& it = impl(p_)->rts_[id]; // it's a new id - if (it == impl(p_)->rts_.end()) { - it = cache.emplace(id, (impl(p_)->n_ + std::to_string(id)).c_str()).first; - queue_of(it->second.handle())->connect(); + if (!it.valid()) { + it.connect((impl(p_)->n_ + std::to_string(id)).c_str()); + queue_of(it.handle())->connect(); } - // it's an existing id - else it = cache.insert(impl(p_)->rts_.extract(it)).position; // get queue of this route - ques[i] = queue_of(it->second.handle()); + ques[k] = queue_of(it.handle()); } - // update rts mapping - impl(p_)->rts_.swap(cache); return std::make_tuple(ques.data(), counter); }); } diff --git a/src/id_pool.inc b/src/id_pool.inc index a3d3400..cf854d3 100644 --- a/src/id_pool.inc +++ b/src/id_pool.inc @@ -11,13 +11,15 @@ public: }; private: + uint_t<8> acquir_ = 0; uint_t<8> cursor_ = 0; - uint_t<8> block_[max_count] {}; + uint_t<8> next_[max_count] {}; public: void init() { + acquir_ = max_count; for (std::size_t i = 0; i < max_count;) { - i = block_[i] = static_cast>(i + 1); + i = next_[i] = static_cast>(i + 1); } } @@ -35,20 +37,42 @@ public: return invalid_value; } std::size_t id = cursor_; - cursor_ = block_[id]; // point to next - block_[id] = 0; // clear flag + cursor_ = next_[id]; // point to next + next_[id] = acquir_; + acquir_ = static_cast>(id); // put it in acquired list return id; } - void release(std::size_t id) { - block_[id] = cursor_; + bool release(std::size_t id) { + if (acquir_ == max_count) return false; + if (acquir_ == id) { + acquir_ = next_[id]; // point to next + } + else { + auto a = next_[acquir_], l = acquir_; + while (1) { + if (a == max_count) { + return false; // found nothing + } + if (a == id) { + next_[l] = next_[a]; + break; + } + l = a; + a = next_[a]; + } + } + next_[id] = cursor_; cursor_ = static_cast>(id); // put it back + return true; } template - void for_each(F&& fr) { - for (std::size_t i = 0; i < max_count; ++i) { - fr(i, block_[i] == 0); + void for_acquired(F&& fr) { + auto a = acquir_; + while (a != max_count) { + fr(a); + a = next_[a]; } } }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 42b6abf..45f0a03 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -17,25 +17,26 @@ using data_t = byte_t[data_length]; #pragma pack(1) struct msg_t { - int remain_; std::size_t id_; + int remain_; data_t data_; }; #pragma pack() -using queue_t = circ::queue; +using queue_t = circ::queue; +using msg_id_t = decltype(msg_t::id_); struct shm_info_t { - std::atomic_size_t id_acc_; // message id accumulator - queue_t::array_t elems_; // the circ_elem_array in shm + std::atomic id_acc_; // message id accumulator + queue_t::array_t elems_; // the circ_elem_array in shm }; constexpr queue_t* queue_of(handle_t h) { return static_cast(h); } -inline std::atomic_size_t* acc_of(queue_t* que) { - return reinterpret_cast(que->elems()) - 1; +inline std::atomic* acc_of(queue_t* que) { + return reinterpret_cast*>(que->elems()) - 1; } inline auto& recv_cache() { @@ -43,7 +44,7 @@ inline auto& recv_cache() { * the performance of tls::pointer is not good enough * so regardless of the mingw-crash-problem for the moment */ - thread_local std::unordered_map rc; + thread_local std::unordered_map rc; return rc; } @@ -109,28 +110,26 @@ bool send(handle_t h, void const * data, std::size_t size) { if (que == nullptr) { return false; } - // calc a new message id - auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed); + // calc a new message id, start with 1 + auto msg_id = acc_of(que)->fetch_add(1, std::memory_order_relaxed) + 1; // push message fragment, one fragment size is data_length int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { msg_t msg { - static_cast(size) - offset - static_cast(data_length), - msg_id, { 0 } + msg_id, static_cast(size) - offset - static_cast(data_length), {} }; std::memcpy(msg.data_, static_cast(data) + offset, data_length); - que->push(msg); + if (!que->push(msg)) return false; } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; if (remain > 0) { msg_t msg { - remain - static_cast(data_length), - msg_id, { 0 } + msg_id, remain - static_cast(data_length), {} }; std::memcpy(msg.data_, static_cast(data) + offset, static_cast(remain)); - que->push(msg); + if (!que->push(msg)) return false; } return true; } @@ -141,6 +140,7 @@ buff_t multi_recv(F&& upd) { while(1) { // pop a new message auto msg = queue_t::pop(queue_t::multi_wait_for(upd)); + if (msg.id_ == 0) return {}; // msg.remain_ may minus & abs(msg.remain_) < data_length std::size_t remain = static_cast( static_cast(data_length) + msg.remain_); diff --git a/src/route.inc b/src/route.inc index f851302..b098e93 100644 --- a/src/route.inc +++ b/src/route.inc @@ -54,7 +54,7 @@ handle_t route::handle() const { } route route::clone() const { - return { name() }; + return route { name() }; } bool route::connect(char const * name) { diff --git a/test/test.h b/test/test.h index 696a219..25e5186 100644 --- a/test/test.h +++ b/test/test.h @@ -5,6 +5,12 @@ #include #include #include +#include +#include + +#if defined(__GNUC__) +# include // abi::__cxa_demangle +#endif/*__GNUC__*/ #include "stopwatch.hpp" @@ -56,13 +62,27 @@ struct test_verify { template struct test_cq; +template +std::string type_name() { +#if defined(__GNUC__) + const char* typeid_name = typeid(T).name(); + const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr); + std::unique_ptr guard { (void*)real_name, ::free }; + if (real_name == nullptr) real_name = typeid_name; + return real_name; +#else + return typeid(T).name(); +#endif/*__GNUC__*/ +} + template void benchmark_prod_cons(T* cq) { + std::cout << "benchmark_prod_cons " << type_name() << " [" << N << ":" << M << ", " << Loops << "]" << std::endl; test_cq tcq { cq }; std::thread producers[N]; std::thread consumers[M]; - std::atomic_int fini { 0 }; + std::atomic_int fini_p { 0 }, fini_c { 0 }; test_stopwatch sw; test_verify vf { M }; @@ -72,9 +92,20 @@ void benchmark_prod_cons(T* cq) { t = std::thread{[&, cid] { vf.prepare(&t); auto cn = tcq.connect(); - tcq.recv(cn, [&](auto&& msg) { vf.push_data(cid, msg); }); + int i = 0; + tcq.recv(cn, [&](auto&& msg) { + if (i % ((Loops * N) / 10) == 0) { + std::printf("%d-recving: %d%%\n", cid, (i * 100) / (Loops * N)); + } + vf.push_data(cid, msg); + ++i; + }); + std::printf("%d-consumer-disconnect\n", cid); tcq.disconnect(cn); - if (++fini != std::extent::value) return; + if (++fini_c != std::extent::value) { + std::printf("%d-consumer-end\n", cid); + return; + } sw.print_elapsed(N, M, Loops); vf.verify(N, Loops); }}; @@ -87,16 +118,20 @@ void benchmark_prod_cons(T* cq) { int pid = 0; for (auto& t : producers) { t = std::thread{[&, pid] { + auto cn = tcq.connect_send(); sw.start(); for (int i = 0; i < Loops; ++i) { - tcq.send({ pid, i }); + tcq.send(cn, { pid, i }); + if (i % (Loops / 10) == 0) { + std::printf("%d-sending: %d%%\n", pid, i * 100 / Loops); + } } + if (++fini_p != std::extent::value) return; + // quit + tcq.send(cn, { -1, -1 }); }}; ++pid; } for (auto& t : producers) t.join(); - // quit - tcq.send({ -1, -1 }); - for (auto& t : consumers) t.join(); } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 1e4e85a..6b0eb0a 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -108,10 +108,14 @@ struct test_cq> { } while(1); } - void send(msg_t const & msg) { - msg_t* pmsg = static_cast(ca_->acquire()); + ca_t* connect_send() { + return ca_; + } + + void send(ca_t* ca, msg_t const & msg) { + msg_t* pmsg = static_cast(ca->acquire()); (*pmsg) = msg; - ca_->commit(pmsg); + ca->commit(pmsg); } }; @@ -153,8 +157,12 @@ struct test_cq> { } while(1); } - void send(msg_t const & msg) { - cn_t{ ca_ }.push(msg); + cn_t connect_send() { + return cn_t{ ca_ }; + } + + void send(cn_t& cn, msg_t const & msg) { + cn.push(msg); } }; @@ -260,7 +268,7 @@ void Unit::test_prod_cons_performance() { void Unit::test_queue() { ipc::circ::queue queue; - queue.push(1, 2); + queue.push(msg_t { 1, 2 }); QCOMPARE(queue.pop(), msg_t{}); QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__)); diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index f4a69e3..838354e 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -14,10 +14,6 @@ #include #include -#if defined(__GNUC__) -# include // abi::__cxa_demangle -#endif/*__GNUC__*/ - #include "stopwatch.hpp" #include "spin_lock.hpp" #include "random.hpp" @@ -37,53 +33,8 @@ constexpr int LoopCount = 100000; } // internal-linkage -template <> -struct test_cq { - using cn_t = ipc::route; - - std::string conn_name_; - - test_cq(void*) - : conn_name_("test-ipc-route") { - ipc::clear_recv(conn_name_.c_str()); - } - - cn_t connect() { - return { conn_name_.c_str() }; - } - - void disconnect(cn_t& cn) { - cn.disconnect(); - } - - void wait_start(int M) { - auto watcher = ipc::connect(conn_name_.c_str()); - while (ipc::recv_count(watcher) != static_cast(M)) { - std::this_thread::yield(); - } - } - - template - void recv(cn_t& cn, F&& proc) { - do { - auto msg = cn.recv(); - if (msg.size() < 2) return; - proc(msg); - } while(1); - } - - void send(const std::array& info) { - thread_local auto cn = connect(); - int n = info[1]; - if (n < 0) { - cn.send(ipc::buff_t { '\0' }); - } - else cn.send(datas__[static_cast(n)]); - } -}; - -template <> -struct test_verify { +template +struct test_verify { std::unordered_map> list_; int lcount_; @@ -105,6 +56,120 @@ struct test_verify { } }; +template <> +struct test_cq { + using cn_t = ipc::route; + + std::string conn_name_; + + test_cq(void*) + : conn_name_("test-ipc-route") { + ipc::clear_recv(conn_name_.c_str()); + } + + cn_t connect() { + return cn_t { conn_name_.c_str() }; + } + + void disconnect(cn_t& cn) { + cn.disconnect(); + } + + void wait_start(int M) { + auto watcher = ipc::connect(conn_name_.c_str()); + while (ipc::recv_count(watcher) != static_cast(M)) { + std::this_thread::yield(); + } + } + + template + void recv(cn_t& cn, F&& proc) { + do { + auto msg = cn.recv(); + if (msg.size() < 2) { + QCOMPARE(msg, ipc::buff_t { '\0' }); + return; + } + proc(msg); + } while(1); + } + + cn_t connect_send() { + return connect(); + } + + void send(cn_t& cn, const std::array& info) { + int n = info[1]; + if (n < 0) { + cn.send(ipc::buff_t { '\0' }); + } + else cn.send(datas__[static_cast(n)]); + } +}; + +template <> +struct test_cq { + using cn_t = ipc::channel; + + std::string conn_name_; + int m_ = 0; + std::vector s_cns_; + ipc::rw_lock lc_; + + test_cq(void*) + : conn_name_("test-ipc-channel") { + } + + ~test_cq() { + for (auto p : s_cns_) delete p; + } + + cn_t connect() { + return cn_t { conn_name_.c_str() }; + } + + void disconnect(cn_t& cn) { + cn.disconnect(); + } + + void wait_start(int M) { m_ = M; } + + template + void recv(cn_t& cn, F&& proc) { + do { + auto msg = cn.recv(); + if (msg.size() < 2) { + QCOMPARE(msg, ipc::buff_t { '\0' }); + return; + } + proc(msg); + } while(1); + } + + cn_t* connect_send() { + auto p = new cn_t { conn_name_.c_str() }; + { + std::unique_lock guard { lc_ }; + s_cns_.push_back(p); + } + return p; + } + + void send(cn_t* cn, const std::array& info) { + thread_local struct s_dummy { + s_dummy(cn_t* cn, int m) { + cn->wait_for_recv(m); + std::printf("start to send: %d.\n", m); + } + } _(cn, m_); + int n = info[1]; + if (n < 0) { + cn->send(ipc::buff_t { '\0' }); + } + else cn->send(datas__[static_cast(n)]); + } +}; + namespace { class Unit : public TestSuite { @@ -121,9 +186,13 @@ private slots: void test_rw_lock(); void test_send_recv(); void test_route(); + void test_route_rtt(); void test_route_performance(); void test_channel(); void test_channel_rtt(); + void test_channel_performance_1vN(); + void test_channel_performance_Nv1(); + void test_channel_performance_NvN(); } unit__; #include "test_ipc.moc" @@ -170,17 +239,7 @@ void benchmark_lc() { Lc lc; test_stopwatch sw; -#if defined(__GNUC__) - { - const char* typeid_name = typeid(Lc).name(); - const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr); - std::unique_ptr guard { (void*)real_name, ::free }; - if (real_name == nullptr) real_name = typeid_name; - std::cout << std::endl << real_name << std::endl; - } -#else - std::cout << std::endl << typeid(Lc).name() << std::endl; -#endif/*__GNUC__*/ + std::cout << std::endl << type_name() << std::endl; for (auto& t : r_trd) { t = std::thread([&] { @@ -260,56 +319,25 @@ void Unit::test_send_recv() { QVERIFY(h != nullptr); ipc::clear_recv(h); char data[] = "hello ipc!"; - QVERIFY(ipc::send(h, data, sizeof(data))); - auto got = ipc::recv(h); - QCOMPARE((char*)got.data(), data); + std::thread xx([h, data] { + auto got = ipc::recv(h); + QCOMPARE((char*)got.data(), data); + }); + while (!ipc::send(h, data, sizeof(data))) { + std::this_thread::yield(); + } + xx.join(); ipc::disconnect(h); } +template +void test_prod_cons() { + benchmark_prod_cons>((T*)nullptr); +} + void Unit::test_route() { ipc::clear_recv("my-ipc-route"); - auto wait_for_handshake = [](int id) { - ipc::route cc { "my-ipc-route" }; - std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm; - std::atomic_bool unmatched { true }; - std::thread re {[&] { - bool has_re = false; - do { - auto dd = cc.recv(); - QVERIFY(!dd.empty()); - std::string got { reinterpret_cast(dd.data()), dd.size() - 1 }; - if (cfm == got) continue; - std::cout << id << "-recv: " << got << "[" << dd.size() << "]" << std::endl; - if (ack != got) { - char const cp[] = "copy:"; - // check header - if (std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) { - std::cout << id << "-re: " << got << std::endl; - QVERIFY(has_re = cc.send( - std::string{ "re-" }.append( - reinterpret_cast(dd.data()), dd.size() - 1))); - } - } - else if (unmatched.load(std::memory_order_relaxed)) { - unmatched.store(false, std::memory_order_release); - std::cout << id << "-matched!" << std::endl; - } - } while (!has_re || unmatched.load(std::memory_order_relaxed)); - }}; - while (unmatched.load(std::memory_order_acquire)) { - if (!cc.send(cfm)) { - std::cout << id << "-send failed!" << std::endl; - unmatched = false; - break; - } - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } - re.join(); - std::cout << id << "-fini handshake!" << std::endl; - return cc; - }; - std::vector const datas = { "hello!", "foo", @@ -322,71 +350,106 @@ void Unit::test_route() { }; std::thread t1 {[&] { - auto cc = wait_for_handshake(1); - const char cp[] = "copy:", re[] = "re-copy:"; - bool unchecked = true; - for (std::size_t i = 0; i < datas.size(); ++i, unchecked = false) { - ipc::buff_t dd; - do { - dd = cc.recv(); - } while (unchecked && - (((dd.size() > sizeof(cp)) && std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) || - ((dd.size() > sizeof(re)) && std::memcmp(dd.data(), re, sizeof(re) - 1) == 0))); + ipc::route cc { "my-ipc-route" }; + for (std::size_t i = 0; i < datas.size(); ++i) { + ipc::buff_t dd = cc.recv(); QCOMPARE(dd.size(), std::strlen(datas[i]) + 1); QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0); } }}; std::thread t2 {[&] { - auto cc = wait_for_handshake(2); + ipc::route cc { "my-ipc-route" }; + while (cc.recv_count() == 0) { + std::this_thread::yield(); + } for (std::size_t i = 0; i < datas.size(); ++i) { std::cout << "sending: " << datas[i] << std::endl; - cc.send(datas[i]); + QVERIFY(cc.send(datas[i])); } }}; t1.join(); t2.join(); + + test_prod_cons(); } -template -void test_prod_cons() { - benchmark_prod_cons>((ipc::route*)nullptr); +void Unit::test_route_rtt() { + test_stopwatch sw; + + std::thread t1 {[&] { + ipc::route cc { "my-ipc-route-1" }; + ipc::route cr { "my-ipc-route-2" }; + for (std::size_t i = 0;; ++i) { + auto dd = cc.recv(); + if (dd.size() < 2) return; +// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; + while (!cr.send(ipc::buff_t { 'a' })) { + std::this_thread::yield(); + } + } + }}; + + std::thread t2 {[&] { + ipc::route cc { "my-ipc-route-1" }; + ipc::route cr { "my-ipc-route-2" }; + while (cc.recv_count() == 0) { + std::this_thread::yield(); + } + sw.start(); + for (std::size_t i = 0; i < LoopCount; ++i) { +// std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; + cc.send(datas__[i]); + /*auto dd = */cr.recv(); +// if (dd.size() != 1 || dd[0] != 'a') { +// QVERIFY(false); +// } + } + cc.send(ipc::buff_t { '\0' }); + t1.join(); + sw.print_elapsed(1, 1, LoopCount); + }}; + + t2.join(); } -template +template struct test_performance { + template static void start() { - test_performance

::start(); - test_prod_cons(); + test_performance

::template start(); + test_prod_cons(); } }; -template -struct test_performance<1, C> { +template +struct test_performance<1, C, V> { + template static void start() { - test_performance<1, C - 1>::start(); - test_prod_cons<1, C, false>(); + test_performance<1, C - 1, V>::template start(); + test_prod_cons(); } }; -template -struct test_performance { +template +struct test_performance { + template static void start() { - test_performance

::start(); - test_prod_cons(); + test_performance

::template start(); + test_prod_cons(); } }; -template <> -struct test_performance<1, 1> { +template +struct test_performance<1, 1, V> { + template static void start() { - test_prod_cons<1, 1, false>(); + test_prod_cons(); } }; void Unit::test_route_performance() { - test_prod_cons<1, 1>(); test_performance<1, 10>::start(); } @@ -402,9 +465,7 @@ void Unit::test_channel() { std::thread t2 {[&] { ipc::channel cc { "my-ipc-channel" }; - while (cc.recv_count() == 0) { - std::this_thread::yield(); - } + cc.wait_for_recv(1); for (std::size_t i = 0; i < (std::min)(100, LoopCount); ++i) { std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; cc.send(datas__[i]); @@ -425,16 +486,19 @@ void Unit::test_channel_rtt() { auto dd = cc.recv(); if (dd.size() < 2) return; // std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; - while (!cc.send(ipc::buff_t { 'a' })) {} + while (!cc.send(ipc::buff_t { 'a' })) { + cc.wait_for_recv(1); + } } }}; std::thread t2 {[&] { ipc::channel cc { "my-ipc-channel" }; + cc.wait_for_recv(1); sw.start(); for (std::size_t i = 0; i < LoopCount; ++i) { // std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; - while (!cc.send(datas__[i])) {} + cc.send(datas__[i]); /*auto dd = */cc.recv(); // if (dd.size() != 1 || dd[0] != 'a') { // QVERIFY(false); @@ -442,10 +506,27 @@ void Unit::test_channel_rtt() { } cc.send(ipc::buff_t { '\0' }); t1.join(); - sw.print_elapsed(DataMin, DataMax, LoopCount); + sw.print_elapsed(1, 1, LoopCount); }}; t2.join(); } +void Unit::test_channel_performance_1vN() { + test_performance<1, 10, true>::start(); +// test_prod_cons(); +} + +void Unit::test_channel_performance_Nv1() { + test_performance<10, 1>::start(); +// test_prod_cons(); +// test_prod_cons(); +// test_prod_cons(); +// test_prod_cons(); +} + +void Unit::test_channel_performance_NvN() { + test_performance<10, 10>::start(); +} + } // internal-linkage