diff --git a/include/circ_elem_array.h b/include/circ_elem_array.h index 9984fba..4923ad4 100644 --- a/include/circ_elem_array.h +++ b/include/circ_elem_array.h @@ -125,7 +125,7 @@ struct prod_cons template bool push(E* /*elems*/, F&& f, detail::elem_t* elem_start) { detail::u2_t cur_ct, nxt_ct; - for (unsigned k = 0;;) { + while(1) { cur_ct = ct_.load(std::memory_order_acquire); if (detail::index_of(nxt_ct = cur_ct + 1) == detail::index_of(rd_.load(std::memory_order_relaxed))) { @@ -134,15 +134,15 @@ struct prod_cons if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) { break; } - ipc::sleep(k); + std::this_thread::yield(); } std::forward(f)(elem_start + detail::index_of(cur_ct)); - for (unsigned k = 0;;) { + while(1) { auto exp_wt = cur_ct; if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { break; } - ipc::sleep(k); + std::this_thread::yield(); } return true; } @@ -171,13 +171,13 @@ struct prod_cons { if (conn_cnt == 0) return false; auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed)); // check all consumers have finished reading this element - for (unsigned k = 0;;) { + while(1) { rc_t expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, static_cast(conn_cnt), std::memory_order_relaxed)) { break; } - ipc::sleep(k); + std::this_thread::yield(); conn_cnt = elems->conn_count(); // acquire if (conn_cnt == 0) return false; } @@ -205,6 +205,42 @@ struct prod_cons { } }; +template <> +struct prod_cons + : prod_cons { + + std::atomic ct_ { 0 }; // commit index + + template + bool push(E* elems, F&& f, detail::elem_t* elem_start) { + auto conn_cnt = elems->conn_count(); // acquire + if (conn_cnt == 0) return false; + detail::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed), + nxt_ct = cur_ct + 1; + auto el = elem_start + detail::index_of(cur_ct); + // check all consumers have finished reading this element + while(1) { + rc_t expected = 0; + if (el->head_.rc_.compare_exchange_weak( + expected, static_cast(conn_cnt), std::memory_order_relaxed)) { + break; + } + std::this_thread::yield(); + conn_cnt = elems->conn_count(); // acquire + if (conn_cnt == 0) return false; + } + std::forward(f)(el->data_); + while(1) { + auto exp_wt = cur_ct; + if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { + break; + } + std::this_thread::yield(); + } + return true; + } +}; + //////////////////////////////////////////////////////////////// /// element-array implementation //////////////////////////////////////////////////////////////// diff --git a/include/rw_lock.h b/include/rw_lock.h index 92c6fbc..8e7184a 100644 --- a/include/rw_lock.h +++ b/include/rw_lock.h @@ -57,7 +57,19 @@ namespace ipc { -template +template +inline void yield(K& k) noexcept { + if (k < 4) { /* Do nothing */ } + else + if (k < 16) { IPC_LOCK_PAUSE_(); } + else { + std::this_thread::yield(); + return; + } + ++k; +} + +template inline void sleep(K& k) noexcept { if (k < static_cast(N)) { std::this_thread::yield(); @@ -69,18 +81,6 @@ inline void sleep(K& k) noexcept { ++k; } -template -inline void yield(K& k) noexcept { - if (k < 4) { /* Do nothing */ } - else - if (k < 16) { IPC_LOCK_PAUSE_(); } - else { - ipc::sleep(k); - return; - } - ++k; -} - } // namespace ipc #pragma pop_macro("IPC_LOCK_PAUSE_") diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 1cf1995..0fe66b6 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -266,29 +266,39 @@ void Unit::test_prod_cons_1v1() { ipc::circ::prod_cons - > el_arr_ss; - benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ss); - benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ss); + > el_arr_ssu; + benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ssu); + benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ssu); ipc::circ::elem_array< sizeof(msg_t), ipc::circ::prod_cons - > el_arr_smn; - benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn); - benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smn); + > el_arr_smu; + benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu); + benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smu); ipc::circ::elem_array< sizeof(msg_t), ipc::circ::prod_cons - > el_arr_mmn; - benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmn)::policy_t>(&el_arr_mmn); - benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmn); + > el_arr_mmu; + benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu); + benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu); test_prod_cons<1, 1>(); + test_prod_cons<1, 1, false>(); + + ipc::circ::elem_array< + sizeof(msg_t), + ipc::circ::prod_cons + > el_arr_mmb; + benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb); + benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb); } void Unit::test_prod_cons_1v3() { @@ -297,20 +307,30 @@ void Unit::test_prod_cons_1v3() { ipc::circ::prod_cons - > el_arr_smn; - benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn); - benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smn); + > el_arr_smu; + benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu); + benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smu); ipc::circ::elem_array< sizeof(msg_t), ipc::circ::prod_cons - > el_arr_mmn; - benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmn)::policy_t>(&el_arr_mmn); - benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmn); + > el_arr_mmu; + benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu); + benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu); test_prod_cons<1, 3>(); + test_prod_cons<1, 3, false>(); + + ipc::circ::elem_array< + sizeof(msg_t), + ipc::circ::prod_cons + > el_arr_mmb; + benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_mmb); + benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmb); } void Unit::test_prod_cons_performance() { @@ -319,31 +339,47 @@ void Unit::test_prod_cons_performance() { ipc::circ::prod_cons - > el_arr_smn; - ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_smn](auto index) { - benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smn); - }); - - ipc::circ::elem_array< - sizeof(msg_t), - ipc::circ::prod_cons - > el_arr_mmn; - ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmn](auto index) { - benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmn); - }); - ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmn](auto index) { - benchmark_prod_cons(&el_arr_mmn); - }); - ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmn](auto index) { - benchmark_prod_cons(&el_arr_mmn); + > el_arr_smu; + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_smu](auto index) { + benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu); }); ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [](auto index) { test_prod_cons<1, decltype(index)::value + 1, false>(); }); test_prod_cons<1, 10>(); // test & verify + + ipc::circ::elem_array< + sizeof(msg_t), + ipc::circ::prod_cons + > el_arr_mmu; + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) { + benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu); + }); + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) { + benchmark_prod_cons(&el_arr_mmu); + }); + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) { + benchmark_prod_cons(&el_arr_mmu); + }); + + ipc::circ::elem_array< + sizeof(msg_t), + ipc::circ::prod_cons + > el_arr_mmb; + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) { + benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb); + }); + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) { + benchmark_prod_cons(&el_arr_mmb); + }); + ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) { + benchmark_prod_cons(&el_arr_mmb); + }); } void Unit::test_queue() {