mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
push shouldn't return false after force_push has been called
This commit is contained in:
parent
9b2bd3787a
commit
6850b47e3a
@ -45,11 +45,11 @@ public:
|
|||||||
|
|
||||||
cc_t connect() noexcept {
|
cc_t connect() noexcept {
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
cc_t cur = cc_.load(std::memory_order_acquire);
|
cc_t curr = cc_.load(std::memory_order_acquire);
|
||||||
cc_t next = cur | (cur + 1); // find the first 0, and set it to 1.
|
cc_t next = curr | (curr + 1); // find the first 0, and set it to 1.
|
||||||
if (next == 0) return 0;
|
if (next == 0) return 0;
|
||||||
if (cc_.compare_exchange_weak(cur, next, std::memory_order_release)) {
|
if (cc_.compare_exchange_weak(curr, next, std::memory_order_release)) {
|
||||||
return next ^ cur; // return connected id
|
return next ^ curr; // return connected id
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -222,7 +222,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
|||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
||||||
if (cc & cur_rc) {
|
if (cc & cur_rc) {
|
||||||
ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, cur_rc);
|
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, cur_rc);
|
||||||
cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers
|
cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers
|
||||||
if (cc == 0) return false; // no reader
|
if (cc == 0) return false; // no reader
|
||||||
}
|
}
|
||||||
@ -292,13 +292,16 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
|
|||||||
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
|
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
|
||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
||||||
if (cc & (cur_rc & rc_mask)) {
|
circ::cc_t rem_cc = cur_rc & rc_mask;
|
||||||
|
if (cc & rem_cc) {
|
||||||
return false; // has not finished yet
|
return false; // has not finished yet
|
||||||
}
|
}
|
||||||
|
else if (!rem_cc) {
|
||||||
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
||||||
if ((cur_fl != cur_ct) && cur_fl) {
|
if ((cur_fl != cur_ct) && cur_fl) {
|
||||||
return false; // full
|
return false; // full
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// (cur_rc & rc_mask) should be 0 here
|
// (cur_rc & rc_mask) should be 0 here
|
||||||
if (el->rc_.compare_exchange_weak(
|
if (el->rc_.compare_exchange_weak(
|
||||||
cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast<rc_t>(cc), std::memory_order_release)) {
|
cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast<rc_t>(cc), std::memory_order_release)) {
|
||||||
@ -326,7 +329,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
|
|||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
||||||
circ::cc_t rem_cc = cur_rc & rc_mask;
|
circ::cc_t rem_cc = cur_rc & rc_mask;
|
||||||
if (cc & rem_cc) {
|
if (cc & rem_cc) {
|
||||||
ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, rem_cc);
|
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
|
||||||
cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers
|
cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers
|
||||||
if (cc == 0) return false; // no reader
|
if (cc == 0) return false; // no reader
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user