From 27eb5ee99b29c106b43ab5128dc2aaf0381eec74 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 6 Jan 2019 23:20:19 +0800 Subject: [PATCH] use ipc::yield --- include/circ_elem_array.h | 21 +++++++++++---------- include/circ_queue.h | 9 +++------ include/ipc.h | 4 +--- include/rw_lock.h | 12 ++++++++++-- src/channel.inc | 18 ++++-------------- 5 files changed, 29 insertions(+), 35 deletions(-) diff --git a/include/circ_elem_array.h b/include/circ_elem_array.h index ef78bf1..d3dfd15 100644 --- a/include/circ_elem_array.h +++ b/include/circ_elem_array.h @@ -5,6 +5,7 @@ #include #include "def.h" +#include "rw_lock.h" namespace ipc { namespace circ { @@ -99,7 +100,7 @@ struct prod_cons template bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t* elem_start) noexcept { byte_t buff[sizeof(detail::elem_t)]; - while (1) { + for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_acquire); if (detail::index_of(cur_rd) == detail::index_of(wt_.load(std::memory_order_relaxed))) { @@ -110,7 +111,7 @@ struct prod_cons std::forward(f)(buff); return true; } - std::this_thread::yield(); + ipc::yield(k); } } }; @@ -124,7 +125,7 @@ struct prod_cons template bool push(E* /*elems*/, F&& f, detail::elem_t* elem_start) { detail::u2_t cur_ct, nxt_ct; - while (1) { + for (unsigned k = 0;;) { cur_ct = ct_.load(std::memory_order_acquire); if (detail::index_of(nxt_ct = cur_ct + 1) == detail::index_of(rd_.load(std::memory_order_relaxed))) { @@ -133,15 +134,15 @@ struct prod_cons if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) { break; } - std::this_thread::yield(); + ipc::yield(k); } std::forward(f)(elem_start + detail::index_of(cur_ct)); - while (1) { + for (unsigned k = 0;;) { auto exp_wt = cur_ct; if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { break; } - std::this_thread::yield(); + ipc::sleep(k); } return true; } @@ -170,13 +171,13 @@ struct prod_cons { if (conn_cnt == 0) return false; auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed)); // check all consumers have finished reading this element - while (1) { + for (unsigned k = 0;;) { rc_t expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, static_cast(conn_cnt), std::memory_order_relaxed)) { break; } - std::this_thread::yield(); + ipc::sleep(k); conn_cnt = elems->conn_count(); // acquire if (conn_cnt == 0) return false; } @@ -190,7 +191,7 @@ struct prod_cons { if (cur == cursor()) return false; // acquire auto el = elem_start + detail::index_of(cur++); std::forward(f)(el->data_); - while (1) { + for (unsigned k = 0;;) { rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire); if (cur_rc == 0) { return true; @@ -199,7 +200,7 @@ struct prod_cons { cur_rc, cur_rc - 1, std::memory_order_release)) { return true; } - std::this_thread::yield(); + ipc::yield(k); } } }; diff --git a/include/circ_queue.h b/include/circ_queue.h index 5fb0e8a..6913067 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -10,6 +10,7 @@ #include #include "def.h" +#include "rw_lock.h" #include "circ_elem_array.h" namespace ipc { @@ -97,7 +98,7 @@ public: template static queue* multi_wait_for(F&& upd) noexcept { - for (uint_t<32> k = 0;;) { + for (unsigned k = 0;;) { auto [ques, size] = upd(); for (std::size_t i = 0; i < static_cast(size); ++i) { queue* que = ques[i]; @@ -107,11 +108,7 @@ public: return que; } } - if (k < 4096) { - std::this_thread::yield(); - ++k; - } - else std::this_thread::sleep_for(std::chrono::milliseconds(1)); + ipc::sleep(k); } } diff --git a/include/ipc.h b/include/ipc.h index 6947cfd..e59196a 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -102,9 +102,7 @@ public: void disconnect(); std::size_t recv_count() const; - - bool wait_for_recv(std::size_t r_count, std::size_t until) const; - bool wait_for_recv(std::size_t r_count) const; + void wait_for_recv(std::size_t r_count) const; bool send(void const * data, std::size_t size); bool send(buff_t const & buff); diff --git a/include/rw_lock.h b/include/rw_lock.h index 895e11a..362e2bf 100644 --- a/include/rw_lock.h +++ b/include/rw_lock.h @@ -57,12 +57,15 @@ namespace ipc { -inline void yield(unsigned& k) noexcept { +template +inline void yield(K& k) noexcept { if (k < 4) { /* Do nothing */ } else if (k < 16) { IPC_LOCK_PAUSE_(); } else - if (k < 32) { std::this_thread::yield(); } + if (k < static_cast(N)) { + std::this_thread::yield(); + } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); return; @@ -70,6 +73,11 @@ inline void yield(unsigned& k) noexcept { ++k; } +template +inline void sleep(K& k) noexcept { + ipc::yield(k); +} + } // namespace ipc #pragma pop_macro("IPC_LOCK_PAUSE_") diff --git a/src/channel.inc b/src/channel.inc index 785d982..a77920c 100644 --- a/src/channel.inc +++ b/src/channel.inc @@ -27,14 +27,6 @@ struct ch_info_t { }; #pragma pack() -inline bool wait_for_recv(route const & rt, std::size_t r_count, std::size_t until) { - for (unsigned k = 0; rt.recv_count() < r_count; ++k) { - if (k > until) return false; - std::this_thread::yield(); - } - return true; -} - } // internal-linkage //////////////////////////////////////////////////////////////// @@ -154,12 +146,10 @@ std::size_t channel::recv_count() const { return impl(p_)->r_.recv_count(); } -bool channel::wait_for_recv(std::size_t r_count, std::size_t until) const { - return ::wait_for_recv(impl(p_)->sender(), r_count, until); -} - -bool channel::wait_for_recv(std::size_t r_count) const { - return wait_for_recv(r_count, (std::numeric_limits::max)()); +void channel::wait_for_recv(std::size_t r_count) const { + for (unsigned k = 0; impl(p_)->sender().recv_count() < r_count;) { + ipc::sleep(k); + } } bool channel::send(void const * data, std::size_t size) {