From 5374eaa128a46df3db2958c8e80a4078c0328a16 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Tue, 26 Mar 2019 22:41:27 +0800 Subject: [PATCH] auto disconnect when timeout --- src/circ/elem_def.h | 4 ++-- src/prod_cons.h | 23 +++++++++++++---------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h index 8787f95..eccde14 100644 --- a/src/circ/elem_def.h +++ b/src/circ/elem_def.h @@ -46,11 +46,11 @@ public: conn_head& operator=(const conn_head&) = delete; std::size_t connect() noexcept { - return cc_.fetch_add(1, std::memory_order_release); + return cc_.fetch_add(1, std::memory_order_acq_rel); } std::size_t disconnect() noexcept { - return cc_.fetch_sub(1, std::memory_order_release); + return cc_.fetch_sub(1, std::memory_order_acq_rel); } std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { diff --git a/src/prod_cons.h b/src/prod_cons.h index 3bda83f..42f2680 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -190,13 +190,13 @@ struct prod_cons_impl> { template bool push(W* wrapper, F&& f, E* elems) { - auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); - if (conn_cnt == 0) return false; + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element rc_t expected = 0; if (!el->rc_.compare_exchange_strong( - expected, static_cast(conn_cnt), std::memory_order_acq_rel)) { + expected, static_cast(cc), std::memory_order_acq_rel)) { return false; // full } std::forward(f)(&(el->data_)); @@ -206,11 +206,13 @@ struct prod_cons_impl> { template bool force_push(W* wrapper, F&& f, E* elems) { - auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); - if (conn_cnt == 0) return false; + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader + cc = wrapper->disconnect() - 1; // disconnect a reader + if (cc == 0) return false; // no reader auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // reset reading flag - el->rc_.store(static_cast(conn_cnt), std::memory_order_relaxed); + el->rc_.store(static_cast(cc), std::memory_order_relaxed); std::forward(f)(&(el->data_)); wt_.fetch_add(1, std::memory_order_release); return true; @@ -296,11 +298,12 @@ struct prod_cons_impl> { bool force_push(W* wrapper, F&& f, E* elems) { E* el; circ::u2_t cur_ct; + auto cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader + wrapper->disconnect(); // disconnect a reader for (unsigned k = 0;;) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); - if (cc == 0) { - return false; // no reader - } + cc = wrapper->conn_count(std::memory_order_relaxed); + if (cc == 0) return false; // no reader el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); auto cur_rc = el->rc_.load(std::memory_order_acquire); el->rc_.store(static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed);