diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index c573aca..735abbc 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -285,16 +285,17 @@ struct prod_cons_impl> { }; template <> -struct prod_cons_impl> { +struct prod_cons_impl> { using rc_t = std::uint64_t; using flag_t = std::uint64_t; enum : rc_t { rc_mask = 0x00000000ffffffffull, - ep_mask = 0x0000ffffffffffffull, - ep_incr = 0x0001000000000000ull, - ic_mask = 0xffff0000ffffffffull + ep_mask = 0x00ffffffffffffffull, + ep_incr = 0x0100000000000000ull, + ic_mask = 0xff000000ffffffffull, + ic_incr = 0x0000000100000000ull }; template @@ -312,8 +313,7 @@ struct prod_cons_impl> { } constexpr static rc_t inc_rc(rc_t rc) noexcept { - auto ic = static_cast((rc & ~ic_mask) >> 32); - return (rc & ic_mask) | (static_cast(ic + 1) << 32); + return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask); } constexpr static rc_t inc_mask(rc_t rc) noexcept { @@ -324,6 +324,7 @@ struct prod_cons_impl> { bool push(W* wrapper, F&& f, E* elems) { E* el; circ::u2_t cur_ct; + rc_t epoch = epoch_.load(std::memory_order_acquire); for (unsigned k = 0;;) { circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); if (cc == 0) return false; // no reader @@ -331,7 +332,6 @@ struct prod_cons_impl> { // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_relaxed); circ::cc_t rem_cc = cur_rc & rc_mask; - rc_t epoch = epoch_.load(std::memory_order_acquire); if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) { return false; // has not finished yet } @@ -343,7 +343,8 @@ struct prod_cons_impl> { } // consider rem_cc to be 0 here if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_release)) { + cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed) && + epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) { break; } ipc::yield(k); @@ -375,7 +376,8 @@ struct prod_cons_impl> { } // just compare & exchange if (el->rc_.compare_exchange_weak( - cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_release)) { + cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast(cc), std::memory_order_relaxed) && + epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) { break; } ipc::yield(k);