diff --git a/performance.xlsx b/performance.xlsx index f5564d2..82f29d5 100644 Binary files a/performance.xlsx and b/performance.xlsx differ diff --git a/src/ipc.cpp b/src/ipc.cpp index c23dbcd..79d20d7 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -146,7 +146,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { { que, msg_id, static_cast(size) - offset - static_cast(data_length) }, {} }; std::memcpy(msg.data_, static_cast(data) + offset, data_length); - if (!que->push(msg)) return false; + while (!que->push(msg)) std::this_thread::yield(); } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; @@ -156,7 +156,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { }; std::memcpy(msg.data_, static_cast(data) + offset, static_cast(remain)); - if (!que->push(msg)) return false; + while (!que->push(msg)) std::this_thread::yield(); } return true; } diff --git a/src/prod_cons.h b/src/prod_cons.h index bdd8966..4acd4da 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -182,17 +182,12 @@ struct prod_cons_impl> { bool push(W* wrapper, F&& f, E* elems) { auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); if (conn_cnt == 0) return false; - auto el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); + auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element - while (1) { - rc_t expected = 0; - if (el->rc_.compare_exchange_weak( - expected, static_cast(conn_cnt), std::memory_order_release)) { - break; - } - std::this_thread::yield(); - conn_cnt = wrapper->conn_count(); // acquire - if (conn_cnt == 0) return false; + rc_t expected = 0; + if (!el->rc_.compare_exchange_strong( + expected, static_cast(conn_cnt), std::memory_order_release)) { + return false; // full } std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); @@ -202,7 +197,7 @@ struct prod_cons_impl> { template class E, std::size_t DataSize> bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) { if (cur == cursor()) return false; // acquire - auto el = elems + circ::index_of(cur++); + auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { rc_t cur_rc = el->rc_.load(std::memory_order_acquire); @@ -219,38 +214,89 @@ struct prod_cons_impl> { }; template <> -struct prod_cons_impl> - : prod_cons_impl> { +struct prod_cons_impl> { + + using rc_t = std::uint64_t; + using flag_t = std::uint64_t; + + enum : rc_t { + rc_mask = 0x00000000ffffffffull, + rc_incr = 0x0000000100000000ull + }; + + template + struct elem_t { + byte_t data_[DataSize] {}; + std::atomic rc_ { 0 }; // read-counter + std::atomic f_ct_ { 0 }; // commit flag + }; alignas(circ::cache_line_size) std::atomic ct_; // commit index + circ::u2_t cursor() const noexcept { + return ct_.load(std::memory_order_acquire); + } + template class E, std::size_t DataSize> bool push(W* wrapper, F&& f, E* elems) { - auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); - if (conn_cnt == 0) return false; - circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire), - nxt_ct = cur_ct + 1; - auto el = elems + circ::index_of(cur_ct); - // check all consumers have finished reading this element - while (1) { - rc_t expected = 0; + E* el; + circ::u2_t cur_ct, nxt_ct; + for (unsigned k = 0;;) { + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) { + return false; // no reader + } + el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); + auto cur_rc = el->rc_.load(std::memory_order_acquire); + if (cur_rc & rc_mask) { + return false; // full + } + auto cur_fl = el->f_ct_.load(std::memory_order_acquire); + if ((cur_fl != cur_ct) && cur_fl) { + return false; // full + } + // (cur_rc & rc_mask) should == 0 here if (el->rc_.compare_exchange_weak( - expected, static_cast(conn_cnt), std::memory_order_release)) { + cur_rc, static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) { break; } - std::this_thread::yield(); - conn_cnt = wrapper->conn_count(); // acquire - if (conn_cnt == 0) return false; + ipc::yield(k); } + // only one thread/process would touch here at one time + ct_.store(nxt_ct = cur_ct + 1, std::memory_order_release); std::forward(f)(&(el->data_)); - while (1) { - auto exp_wt = cur_ct; - if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { + // set flag & try update wt + el->f_ct_.store(~static_cast(cur_ct)); + return true; + } + + template class E, std::size_t DataSize, std::size_t N> + bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E(& elems)[N]) { + auto* el = elems + circ::index_of(cur); + auto cur_fl = el->f_ct_.load(std::memory_order_acquire); + if (cur_fl != ~static_cast(cur)) { + return false; // empty + } + ++cur; + std::forward(f)(&(el->data_)); + for (unsigned k = 0;;) { + auto cur_rc = el->rc_.load(std::memory_order_acquire); + switch (cur_rc & rc_mask) { + case 0: + el->f_ct_.store(cur + N - 1, std::memory_order_release); + return true; + case 1: + el->f_ct_.store(cur + N - 1, std::memory_order_release); + [[fallthrough]]; + default: + if (el->rc_.compare_exchange_weak( + cur_rc, cur_rc + rc_incr - 1, std::memory_order_release)) { + return true; + } break; } - std::this_thread::yield(); + ipc::yield(k); } - return true; } }; diff --git a/src/queue.h b/src/queue.h index c9c2eb4..82cb02c 100644 --- a/src/queue.h +++ b/src/queue.h @@ -81,29 +81,31 @@ public: } template - std::size_t connect(Elems* elems) { - if (elems == nullptr) return invalid_value; + auto connect(Elems* elems) + -> std::tuple().cursor())> { + if (elems == nullptr) return {}; if (connected_) { - // if it's already connected, just return an error count - return invalid_value; + // if it's already connected, just return false + return {}; } connected_ = true; - auto ret = elems->connect(); + elems->connect(); + auto ret = std::make_tuple(true, elems->cursor()); cc_waiter_.broadcast(); return ret; } template - std::size_t disconnect(Elems* elems) { - if (elems == nullptr) return invalid_value; + bool disconnect(Elems* elems) { + if (elems == nullptr) return false; if (!connected_) { - // if it's already disconnected, just return an error count - return invalid_value; + // if it's already disconnected, just return false + return false; } connected_ = false; - auto ret = elems->disconnect(); + elems->disconnect(); cc_waiter_.broadcast(); - return ret; + return true; } template @@ -155,11 +157,16 @@ public: return elems_; } - std::size_t connect() { - return base_t::connect(elems_); + bool connect() { + auto tp = base_t::connect(elems_); + if (std::get<0>(tp)) { + cursor_ = std::get<1>(tp); + return true; + } + return false; } - std::size_t disconnect() { + bool disconnect() { return base_t::disconnect(elems_); } @@ -186,9 +193,6 @@ public: base_t::close(old); } else base_t::open(elems_, name); - if (elems_ != nullptr) { - cursor_ = elems_->cursor(); - } return old; } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 9f60ee5..63e87d3 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -183,12 +183,12 @@ struct test_cq> { cn_t* connect() { cn_t* queue = new cn_t { ca_ }; - [&] { QVERIFY(queue->connect() != ipc::invalid_value); } (); + [&] { QVERIFY(queue->connect()); } (); return queue; } void disconnect(cn_t* queue) { - QVERIFY(queue->disconnect() != ipc::invalid_value); + QVERIFY(queue->disconnect()); QVERIFY(queue->detach() != nullptr); delete queue; } @@ -241,7 +241,7 @@ private slots: #include "test_circ.moc" -constexpr int LoopCount = 1000000; +constexpr int LoopCount = 10000000; //constexpr int LoopCount = 1000/*0000*/; void Unit::initTestCase() { @@ -270,6 +270,13 @@ void test_prod_cons() { } void Unit::test_prod_cons_1v1() { +// ea_t< +// sizeof(msg_t), +// pc_t +// > el_arr_mmb; +// benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb); +// benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb); + ea_t< sizeof(msg_t), pc_t @@ -387,7 +394,7 @@ void Unit::test_queue() { queue.attach(cq); QVERIFY(queue.detach() != nullptr); - ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { + ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) { benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr); }); } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 72dc0ff..6d0c544 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -402,7 +402,7 @@ void Unit::test_route_rtt() { void Unit::test_route_performance() { //return; - ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { + ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) { test_prod_cons(); }); test_prod_cons(); // test & verify @@ -476,13 +476,13 @@ void Unit::test_channel_rtt() { } void Unit::test_channel_performance() { - ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { + ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) { test_prod_cons(); }); - ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { + ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) { test_prod_cons(); }); - ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { + ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) { test_prod_cons(); });