update src/prod_cons.h

This commit is contained in:
zhangyi 2020-03-28 15:00:20 +08:00
parent 91cc1b7767
commit 9b2bd3787a

View File

@ -197,8 +197,8 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element // check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc) { if (cc & cur_rc) {
return false; // not reading finished yet return false; // has not finished yet
} }
// cur_rc should be 0 here // cur_rc should be 0 here
if (el->rc_.compare_exchange_weak( if (el->rc_.compare_exchange_weak(
@ -221,7 +221,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element // check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc) { if (cc & cur_rc) {
ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, cur_rc); ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, cur_rc);
cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers
if (cc == 0) return false; // no reader if (cc == 0) return false; // no reader
@ -292,8 +292,8 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element // check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc & rc_mask) { if (cc & (cur_rc & rc_mask)) {
return false; // not reading finished yet return false; // has not finished yet
} }
auto cur_fl = el->f_ct_.load(std::memory_order_acquire); auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
if ((cur_fl != cur_ct) && cur_fl) { if ((cur_fl != cur_ct) && cur_fl) {
@ -325,7 +325,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
// check all consumers have finished reading this element // check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
circ::cc_t rem_cc = cur_rc & rc_mask; circ::cc_t rem_cc = cur_rc & rc_mask;
if (rem_cc) { if (cc & rem_cc) {
ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, rem_cc); ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, rem_cc);
cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers
if (cc == 0) return false; // no reader if (cc == 0) return false; // no reader