From 518550070deb364c634ad677a89ec468546a1c87 Mon Sep 17 00:00:00 2001 From: zhangyi Date: Tue, 30 Apr 2019 15:20:33 +0800 Subject: [PATCH] fix bugs in force-push --- src/circ/elem_def.h | 17 ++++++++ src/ipc.cpp | 4 +- src/platform/waiter_linux.h | 48 +++++++++++++---------- src/prod_cons.h | 78 +++++++++++++++++++++++++------------ src/queue.h | 4 ++ 5 files changed, 104 insertions(+), 47 deletions(-) diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h index eccde14..ac432c8 100644 --- a/src/circ/elem_def.h +++ b/src/circ/elem_def.h @@ -29,6 +29,8 @@ class conn_head { ipc::spin_lock lc_; std::atomic constructed_; + std::atomic dis_flag_; + public: void init() { /* DCLP */ @@ -53,6 +55,21 @@ public: return cc_.fetch_sub(1, std::memory_order_acq_rel); } + void try_disconnect() noexcept { + if (!dis_flag_.load(std::memory_order_acquire)) { + cc_.fetch_sub(1, std::memory_order_relaxed); + dis_flag_.store(true, std::memory_order_release); + } + } + + void clear_dis_flag(std::memory_order order = std::memory_order_release) noexcept { + dis_flag_.store(false, order); + } + + bool dis_flag(std::memory_order order = std::memory_order_acquire) const noexcept { + return dis_flag_.load(order); + } + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { return cc_.load(order); } diff --git a/src/ipc.cpp b/src/ipc.cpp index 0920f11..6536cff 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -95,6 +95,7 @@ struct conn_info_head { template bool wait_for(W& waiter, F&& pred, std::size_t tm) { + if (tm == 0) return !pred(); for (unsigned k = 0; pred();) { bool loop = true, ret = true; ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] { @@ -245,7 +246,8 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { return [info, que, msg_id](int remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push(que, msg_id, remain, data, size); - }, default_timeut)) { + }, que->dis_flag() ? 0 : static_cast(default_timeut))) { + ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); if (!que->force_push(que, msg_id, remain, data, size)) { return false; } diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 8d01163..07b13dd 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -105,19 +105,22 @@ public: } bool wait(mutex& mtx, std::size_t tm = invalid_value) { - if (tm == invalid_value) { + switch (tm) { + case 0: + return true; + case invalid_value: IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); - } - else { - timespec ts; - calc_wait_time(ts, tm); - int eno; - if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { - if (eno != ETIMEDOUT) { - ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - eno, tm, ts.tv_sec, ts.tv_nsec); + default: { + timespec ts; + calc_wait_time(ts, tm); + int eno; + if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { + if (eno != ETIMEDOUT) { + ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + eno, tm, ts.tv_sec, ts.tv_nsec); + } + return false; } - return false; } return true; } @@ -176,18 +179,21 @@ public: static bool wait(handle_t h, std::size_t tm = invalid_value) { if (h == invalid()) return false; - if (tm == invalid_value) { + switch (tm) { + case 0: + return true; + case invalid_value: IPC_SEMAPHORE_FUNC_(sem_wait, h); - } - else { - timespec ts; - calc_wait_time(ts, tm); - if (::sem_timedwait(h, &ts) != 0) { - if (errno != ETIMEDOUT) { - ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - errno, tm, ts.tv_sec, ts.tv_nsec); + default: { + timespec ts; + calc_wait_time(ts, tm); + if (::sem_timedwait(h, &ts) != 0) { + if (errno != ETIMEDOUT) { + ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + errno, tm, ts.tv_sec, ts.tv_nsec); + } + return false; } - return false; } return true; } diff --git a/src/prod_cons.h b/src/prod_cons.h index 42f2680..2f44e98 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -190,14 +190,23 @@ struct prod_cons_impl> { template bool push(W* wrapper, F&& f, E* elems) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); - // check all consumers have finished reading this element - rc_t expected = 0; - if (!el->rc_.compare_exchange_strong( - expected, static_cast(cc), std::memory_order_acq_rel)) { - return false; // full + E* el; + for (unsigned k = 0;;) { + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader + el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); + // check all consumers have finished reading this element + auto cur_rc = el->rc_.load(std::memory_order_acquire); + if (cur_rc) { + return false; // full + } + // cur_rc should be 0 here + if (el->rc_.compare_exchange_weak( + cur_rc, static_cast(cc), std::memory_order_release)) { + wrapper->clear_dis_flag(std::memory_order_relaxed); + break; + } + ipc::yield(k); } std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); @@ -206,13 +215,25 @@ struct prod_cons_impl> { template bool force_push(W* wrapper, F&& f, E* elems) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - cc = wrapper->disconnect() - 1; // disconnect a reader - if (cc == 0) return false; // no reader - auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); - // reset reading flag - el->rc_.store(static_cast(cc), std::memory_order_relaxed); + E* el; + for (unsigned k = 0;;) { + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader + el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); + // check all consumers have finished reading this element + auto cur_rc = el->rc_.load(std::memory_order_acquire); + if (cur_rc) { + wrapper->try_disconnect(); // try disconnect a reader + cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader + } + // just compare & exchange + if (el->rc_.compare_exchange_weak( + cur_rc, static_cast(cc), std::memory_order_release)) { + break; + } + ipc::yield(k); + } std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); return true; @@ -267,10 +288,9 @@ struct prod_cons_impl> { circ::u2_t cur_ct; for (unsigned k = 0;;) { auto cc = wrapper->conn_count(std::memory_order_relaxed); - if (cc == 0) { - return false; // no reader - } + if (cc == 0) return false; // no reader el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); + // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); if (cur_rc & rc_mask) { return false; // full @@ -279,9 +299,10 @@ struct prod_cons_impl> { if ((cur_fl != cur_ct) && cur_fl) { return false; // full } - // (cur_rc & rc_mask) should == 0 here + // (cur_rc & rc_mask) should be 0 here if (el->rc_.compare_exchange_weak( cur_rc, static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) { + wrapper->clear_dis_flag(std::memory_order_relaxed); break; } ipc::yield(k); @@ -298,20 +319,27 @@ struct prod_cons_impl> { bool force_push(W* wrapper, F&& f, E* elems) { E* el; circ::u2_t cur_ct; - auto cc = wrapper->conn_count(std::memory_order_relaxed); - if (cc == 0) return false; // no reader - wrapper->disconnect(); // disconnect a reader for (unsigned k = 0;;) { - cc = wrapper->conn_count(std::memory_order_relaxed); + auto cc = wrapper->conn_count(std::memory_order_relaxed); if (cc == 0) return false; // no reader el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); + // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); - el->rc_.store(static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed); - if (ct_.compare_exchange_weak(cur_ct, cur_ct + 1, std::memory_order_release)) { + ipc::log("force_push: k = %d, cc = %zd, rc = %zd\n", k, cc, (cur_rc & rc_mask)); + if (cur_rc & rc_mask) { + wrapper->try_disconnect(); // try disconnect a reader + cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader + } + // just compare & exchange + if (el->rc_.compare_exchange_weak( + cur_rc, static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) { break; } ipc::yield(k); } + // only one thread/process would touch here at one time + ct_.store(cur_ct + 1, std::memory_order_release); std::forward(f)(&(el->data_)); // set flag & try update wt el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); diff --git a/src/queue.h b/src/queue.h index aa35fd2..e424958 100644 --- a/src/queue.h +++ b/src/queue.h @@ -125,6 +125,10 @@ public: return base_t::disconnect(elems_); } + bool dis_flag() { + return elems_->dis_flag(); + } + std::size_t conn_count() const noexcept { return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); }