对mmb来说,incr上限65535不够大,调整至16,777,215

This commit is contained in:
mutouyun 2021-01-03 14:03:16 +08:00
parent d3ec4714bd
commit 23d2007c5e

View File

@ -285,16 +285,17 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
}; };
template <> template <>
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> { struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
using rc_t = std::uint64_t; using rc_t = std::uint64_t;
using flag_t = std::uint64_t; using flag_t = std::uint64_t;
enum : rc_t { enum : rc_t {
rc_mask = 0x00000000ffffffffull, rc_mask = 0x00000000ffffffffull,
ep_mask = 0x0000ffffffffffffull, ep_mask = 0x00ffffffffffffffull,
ep_incr = 0x0001000000000000ull, ep_incr = 0x0100000000000000ull,
ic_mask = 0xffff0000ffffffffull ic_mask = 0xff000000ffffffffull,
ic_incr = 0x0000000100000000ull
}; };
template <std::size_t DataSize, std::size_t AlignSize> template <std::size_t DataSize, std::size_t AlignSize>
@ -312,8 +313,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
} }
constexpr static rc_t inc_rc(rc_t rc) noexcept { constexpr static rc_t inc_rc(rc_t rc) noexcept {
auto ic = static_cast<std::uint16_t>((rc & ~ic_mask) >> 32); return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask);
return (rc & ic_mask) | (static_cast<rc_t>(ic + 1) << 32);
} }
constexpr static rc_t inc_mask(rc_t rc) noexcept { constexpr static rc_t inc_mask(rc_t rc) noexcept {
@ -324,6 +324,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
bool push(W* wrapper, F&& f, E* elems) { bool push(W* wrapper, F&& f, E* elems) {
E* el; E* el;
circ::u2_t cur_ct; circ::u2_t cur_ct;
rc_t epoch = epoch_.load(std::memory_order_acquire);
for (unsigned k = 0;;) { for (unsigned k = 0;;) {
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader if (cc == 0) return false; // no reader
@ -331,7 +332,6 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
// check all consumers have finished reading this element // check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_relaxed); auto cur_rc = el->rc_.load(std::memory_order_relaxed);
circ::cc_t rem_cc = cur_rc & rc_mask; circ::cc_t rem_cc = cur_rc & rc_mask;
rc_t epoch = epoch_.load(std::memory_order_acquire);
if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) { if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) {
return false; // has not finished yet return false; // has not finished yet
} }
@ -343,7 +343,8 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
} }
// consider rem_cc to be 0 here // consider rem_cc to be 0 here
if (el->rc_.compare_exchange_weak( if (el->rc_.compare_exchange_weak(
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_release)) { cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed) &&
epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) {
break; break;
} }
ipc::yield(k); ipc::yield(k);
@ -375,7 +376,8 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
} }
// just compare & exchange // just compare & exchange
if (el->rc_.compare_exchange_weak( if (el->rc_.compare_exchange_weak(
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_release)) { cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed) &&
epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) {
break; break;
} }
ipc::yield(k); ipc::yield(k);