From 46051733bb712bc7dcf4e8c8b1d0fb1bcfbc5c31 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Tue, 12 Mar 2019 11:16:33 +0800 Subject: [PATCH] optimize --- src/circ/elem_array.h | 7 +-- src/circ/elem_def.h | 22 +------ src/prod_cons.h | 143 +++++++++++++++++++++++------------------- test/test_circ.cpp | 4 +- 4 files changed, 85 insertions(+), 91 deletions(-) diff --git a/src/circ/elem_array.h b/src/circ/elem_array.h index dd97281..33c8fab 100644 --- a/src/circ/elem_array.h +++ b/src/circ/elem_array.h @@ -18,12 +18,7 @@ class elem_array { public: using policy_t = Policy; using cursor_t = decltype(std::declval().cursor()); - -#if __cplusplus >= 201703L - using elem_t = ipc::circ::elem_t>; -#else /*__cplusplus < 201703L*/ - using elem_t = ipc::circ::elem_t::value>; -#endif/*__cplusplus < 201703L*/ + using elem_t = typename policy_t::template elem_t; enum : std::size_t { data_size = DataSize, diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h index 90ca6b1..7f77714 100644 --- a/src/circ/elem_def.h +++ b/src/circ/elem_def.h @@ -13,28 +13,12 @@ namespace ipc { namespace circ { -struct elem_head { - std::atomic rc_ { 0 }; // read-counter +enum { + cache_line_size = 64 }; -template -struct elem_t { - elem_head head_; - byte_t data_[DataSize] {}; -}; - -template <> -struct elem_t<0> { - elem_head head_; -}; - -template -elem_t* elem_of(void* ptr) noexcept { - return reinterpret_cast*>(static_cast(ptr) - sizeof(elem_head)); -} - using u1_t = ipc::uint_t<8>; -using u2_t = ipc::uint_t<16>; +using u2_t = ipc::uint_t<32>; constexpr u1_t index_of(u2_t c) noexcept { return static_cast(c); diff --git a/src/prod_cons.h b/src/prod_cons.h index e5a2edb..f8207b3 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -20,43 +20,37 @@ struct prod_cons_impl; template <> struct prod_cons_impl> { - std::atomic rd_; // read index - std::atomic wt_; // write index -#if __cplusplus >= 201703L template - constexpr static std::size_t elem_param = DataSize - sizeof(circ::elem_head); -#else /*__cplusplus < 201703L*/ - template - struct elem_param { - enum : std::size_t { - value = DataSize - sizeof(circ::elem_head) - }; + struct elem_t { + byte_t data_[DataSize] {}; }; -#endif/*__cplusplus < 201703L*/ + + alignas(circ::cache_line_size) std::atomic rd_; // read index + alignas(circ::cache_line_size) std::atomic wt_; // write index constexpr circ::u2_t cursor() const noexcept { return 0; } - template - bool push(E* /*elems*/, F&& f, EB* elem_start) { + template class E, std::size_t DataSize> + bool push(W* /*wrapper*/, F&& f, E* elems) { auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed)); if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) { return false; // full } - std::forward(f)(elem_start + cur_wt); + std::forward(f)(&(elems[cur_wt].data_)); wt_.fetch_add(1, std::memory_order_release); return true; } - template - bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) { + template class E, std::size_t DataSize> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } - std::forward(f)(elem_start + cur_rd); + std::forward(f)(&(elems[cur_rd].data_)); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -66,16 +60,16 @@ template <> struct prod_cons_impl> : prod_cons_impl> { - template - bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) { - byte_t buff[sizeof(E)]; + template class E, std::size_t DataSize> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + byte_t buff[DataSize]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); if (circ::index_of(cur_rd) == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } - std::memcpy(buff, elem_start + circ::index_of(cur_rd), sizeof(buff)); + std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); return true; @@ -89,12 +83,23 @@ template <> struct prod_cons_impl> : prod_cons_impl> { - std::atomic ct_; // commit index + enum : std::uint64_t { + invalid_index = (std::numeric_limits::max)() + }; - template - bool push(E* /*elems*/, F&& f, EB* elem_start) { + template + struct elem_t { + byte_t data_[DataSize] {}; + alignas(circ::cache_line_size) std::atomic f_ct_ { invalid_index }; // commit flag + }; + + alignas(circ::cache_line_size) std::atomic ct_; // commit index + alignas(circ::cache_line_size) std::atomic barrier_; + + template class E, std::size_t DataSize> + bool push(W* /*wrapper*/, F&& f, E* elems) { circ::u2_t cur_ct, nxt_ct; - while (1) { + for (unsigned k = 0;;) { cur_ct = ct_.load(std::memory_order_relaxed); if (circ::index_of(nxt_ct = cur_ct + 1) == circ::index_of(rd_.load(std::memory_order_acquire))) { @@ -103,15 +108,28 @@ struct prod_cons_impl> if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) { break; } - std::this_thread::yield(); + ipc::yield(k); } - std::forward(f)(elem_start + circ::index_of(cur_ct)); + auto* el = elems + circ::index_of(cur_ct); + std::forward(f)(&(el->data_)); + // set flag & try update wt + el->f_ct_.store(cur_ct, std::memory_order_release); while (1) { - auto exp_wt = cur_ct; - if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { - break; + barrier_.exchange(0, std::memory_order_acq_rel); + auto cac_ct = el->f_ct_.load(std::memory_order_acquire); + if (cur_ct != wt_.load(std::memory_order_acquire)) { + return true; } - std::this_thread::yield(); + if (cac_ct != cur_ct) { + return true; + } + if (!el->f_ct_.compare_exchange_strong(cac_ct, invalid_index, std::memory_order_relaxed)) { + return true; + } + wt_.store(nxt_ct, std::memory_order_release); + cur_ct = nxt_ct; + nxt_ct = cur_ct + 1; + el = elems + circ::index_of(cur_ct); } return true; } @@ -119,58 +137,53 @@ struct prod_cons_impl> template <> struct prod_cons_impl> { - std::atomic wt_; // write index -#if __cplusplus >= 201703L - template - constexpr static std::size_t elem_param = DataSize; -#else /*__cplusplus < 201703L*/ - template - struct elem_param { enum : std::size_t { value = DataSize }; }; -#endif/*__cplusplus < 201703L*/ + using rc_t = std::size_t; - /* - std::atomic may not have value_type. - See: https://stackoverflow.com/questions/53648614/what-happened-to-stdatomicxvalue-type - */ - using rc_t = decltype(circ::elem_head::rc_.load()); + template + struct elem_t { + byte_t data_[DataSize] {}; + alignas(circ::cache_line_size) std::atomic rc_ { 0 }; // read-counter + }; + + alignas(circ::cache_line_size) std::atomic wt_; // write index circ::u2_t cursor() const noexcept { return wt_.load(std::memory_order_acquire); } - template - bool push(E* elems, F&& f, EB* elem_start) { - auto conn_cnt = elems->conn_count(std::memory_order_relaxed); + template class E, std::size_t DataSize> + bool push(W* wrapper, F&& f, E* elems) { + auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); if (conn_cnt == 0) return false; - auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire)); + auto el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element while (1) { rc_t expected = 0; - if (el->head_.rc_.compare_exchange_weak( + if (el->rc_.compare_exchange_weak( expected, static_cast(conn_cnt), std::memory_order_release)) { break; } std::this_thread::yield(); - conn_cnt = elems->conn_count(); // acquire + conn_cnt = wrapper->conn_count(); // acquire if (conn_cnt == 0) return false; } - std::forward(f)(el->data_); + std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); return true; } - template - bool pop(E* /*elems*/, circ::u2_t& cur, F&& f, EB* elem_start) { + template class E, std::size_t DataSize> + bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) { if (cur == cursor()) return false; // acquire - auto el = elem_start + circ::index_of(cur++); - std::forward(f)(el->data_); + auto el = elems + circ::index_of(cur++); + std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { - rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire); + rc_t cur_rc = el->rc_.load(std::memory_order_acquire); if (cur_rc == 0) { return true; } - if (el->head_.rc_.compare_exchange_weak( + if (el->rc_.compare_exchange_weak( cur_rc, cur_rc - 1, std::memory_order_release)) { return true; } @@ -183,27 +196,27 @@ template <> struct prod_cons_impl> : prod_cons_impl> { - std::atomic ct_; // commit index + alignas(circ::cache_line_size) std::atomic ct_; // commit index - template - bool push(E* elems, F&& f, EB* elem_start) { - auto conn_cnt = elems->conn_count(std::memory_order_relaxed); + template class E, std::size_t DataSize> + bool push(W* wrapper, F&& f, E* elems) { + auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); if (conn_cnt == 0) return false; circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire), nxt_ct = cur_ct + 1; - auto el = elem_start + circ::index_of(cur_ct); + auto el = elems + circ::index_of(cur_ct); // check all consumers have finished reading this element while (1) { rc_t expected = 0; - if (el->head_.rc_.compare_exchange_weak( + if (el->rc_.compare_exchange_weak( expected, static_cast(conn_cnt), std::memory_order_release)) { break; } std::this_thread::yield(); - conn_cnt = elems->conn_count(); // acquire + conn_cnt = wrapper->conn_count(); // acquire if (conn_cnt == 0) return false; } - std::forward(f)(el->data_); + std::forward(f)(&(el->data_)); while (1) { auto exp_wt = cur_ct; if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { diff --git a/test/test_circ.cpp b/test/test_circ.cpp index c831dc7..9f60ee5 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -213,7 +213,9 @@ struct test_cq> { } void send(cn_t* /*cn*/, msg_t const & msg) { - cn_t{ ca_ }.push(msg); + while (!cn_t{ ca_ }.push(msg)) { + std::this_thread::yield(); + } } };