diff --git a/src/prod_cons.h b/src/prod_cons.h index f8207b3..bdd8966 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -83,18 +83,15 @@ template <> struct prod_cons_impl> : prod_cons_impl> { - enum : std::uint64_t { - invalid_index = (std::numeric_limits::max)() - }; + using flag_t = std::uint64_t; template struct elem_t { byte_t data_[DataSize] {}; - alignas(circ::cache_line_size) std::atomic f_ct_ { invalid_index }; // commit flag + std::atomic f_ct_ { 0 }; // commit flag }; alignas(circ::cache_line_size) std::atomic ct_; // commit index - alignas(circ::cache_line_size) std::atomic barrier_; template class E, std::size_t DataSize> bool push(W* /*wrapper*/, F&& f, E* elems) { @@ -113,17 +110,16 @@ struct prod_cons_impl> auto* el = elems + circ::index_of(cur_ct); std::forward(f)(&(el->data_)); // set flag & try update wt - el->f_ct_.store(cur_ct, std::memory_order_release); + el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); while (1) { - barrier_.exchange(0, std::memory_order_acq_rel); auto cac_ct = el->f_ct_.load(std::memory_order_acquire); if (cur_ct != wt_.load(std::memory_order_acquire)) { return true; } - if (cac_ct != cur_ct) { + if ((~cac_ct) != cur_ct) { return true; } - if (!el->f_ct_.compare_exchange_strong(cac_ct, invalid_index, std::memory_order_relaxed)) { + if (!el->f_ct_.compare_exchange_strong(cac_ct, 0, std::memory_order_relaxed)) { return true; } wt_.store(nxt_ct, std::memory_order_release); @@ -133,6 +129,36 @@ struct prod_cons_impl> } return true; } + + template class E, std::size_t DataSize> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + byte_t buff[DataSize]; + for (unsigned k = 0;;) { + auto cur_rd = rd_.load(std::memory_order_relaxed); + auto cur_wt = wt_.load(std::memory_order_acquire); + auto id_rd = circ::index_of(cur_rd); + auto id_wt = circ::index_of(cur_wt); + if (id_rd == id_wt) { + auto* el = elems + id_wt; + auto cac_ct = el->f_ct_.load(std::memory_order_acquire); + if ((~cac_ct) != cur_wt) { + return false; // empty + } + if (el->f_ct_.compare_exchange_weak(cac_ct, 0, std::memory_order_relaxed)) { + wt_.store(cur_wt + 1, std::memory_order_release); + } + k = 0; + } + else { + std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); + if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { + std::forward(f)(buff); + return true; + } + ipc::yield(k); + } + } + } }; template <> @@ -143,7 +169,7 @@ struct prod_cons_impl> { template struct elem_t { byte_t data_[DataSize] {}; - alignas(circ::cache_line_size) std::atomic rc_ { 0 }; // read-counter + std::atomic rc_ { 0 }; // read-counter }; alignas(circ::cache_line_size) std::atomic wt_; // write index