diff --git a/src/prod_cons.h b/src/prod_cons.h index 56a3992..5e075a9 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -197,8 +197,8 @@ struct prod_cons_impl> { el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); - if (cur_rc) { - return false; // not reading finished yet + if (cc & cur_rc) { + return false; // has not finished yet } // cur_rc should be 0 here if (el->rc_.compare_exchange_weak( @@ -221,7 +221,7 @@ struct prod_cons_impl> { el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); - if (cur_rc) { + if (cc & cur_rc) { ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, cur_rc); cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers if (cc == 0) return false; // no reader @@ -292,8 +292,8 @@ struct prod_cons_impl> { el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); - if (cur_rc & rc_mask) { - return false; // not reading finished yet + if (cc & (cur_rc & rc_mask)) { + return false; // has not finished yet } auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if ((cur_fl != cur_ct) && cur_fl) { @@ -325,7 +325,7 @@ struct prod_cons_impl> { // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); circ::cc_t rem_cc = cur_rc & rc_mask; - if (rem_cc) { + if (cc & rem_cc) { ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, rem_cc); cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers if (cc == 0) return false; // no reader