auto disconnect when timeout

This commit is contained in:
mutouyun 2019-03-26 22:41:27 +08:00
parent 6cc2913f6b
commit 5374eaa128
2 changed files with 15 additions and 12 deletions

View File

@ -46,11 +46,11 @@ public:
conn_head& operator=(const conn_head&) = delete; conn_head& operator=(const conn_head&) = delete;
std::size_t connect() noexcept { std::size_t connect() noexcept {
return cc_.fetch_add(1, std::memory_order_release); return cc_.fetch_add(1, std::memory_order_acq_rel);
} }
std::size_t disconnect() noexcept { std::size_t disconnect() noexcept {
return cc_.fetch_sub(1, std::memory_order_release); return cc_.fetch_sub(1, std::memory_order_acq_rel);
} }
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {

View File

@ -190,13 +190,13 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
template <typename W, typename F, typename E> template <typename W, typename F, typename E>
bool push(W* wrapper, F&& f, E* elems) { bool push(W* wrapper, F&& f, E* elems) {
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (conn_cnt == 0) return false; if (cc == 0) return false; // no reader
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element // check all consumers have finished reading this element
rc_t expected = 0; rc_t expected = 0;
if (!el->rc_.compare_exchange_strong( if (!el->rc_.compare_exchange_strong(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_acq_rel)) { expected, static_cast<rc_t>(cc), std::memory_order_acq_rel)) {
return false; // full return false; // full
} }
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
@ -206,11 +206,13 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
template <typename W, typename F, typename E> template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) { bool force_push(W* wrapper, F&& f, E* elems) {
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed); auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (conn_cnt == 0) return false; if (cc == 0) return false; // no reader
cc = wrapper->disconnect() - 1; // disconnect a reader
if (cc == 0) return false; // no reader
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// reset reading flag // reset reading flag
el->rc_.store(static_cast<rc_t>(conn_cnt), std::memory_order_relaxed); el->rc_.store(static_cast<rc_t>(cc), std::memory_order_relaxed);
std::forward<F>(f)(&(el->data_)); std::forward<F>(f)(&(el->data_));
wt_.fetch_add(1, std::memory_order_release); wt_.fetch_add(1, std::memory_order_release);
return true; return true;
@ -296,11 +298,12 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
bool force_push(W* wrapper, F&& f, E* elems) { bool force_push(W* wrapper, F&& f, E* elems) {
E* el; E* el;
circ::u2_t cur_ct; circ::u2_t cur_ct;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed); auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) { if (cc == 0) return false; // no reader
return false; // no reader wrapper->disconnect(); // disconnect a reader
} for (unsigned k = 0;;) {
cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
auto cur_rc = el->rc_.load(std::memory_order_acquire); auto cur_rc = el->rc_.load(std::memory_order_acquire);
el->rc_.store(static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed); el->rc_.store(static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed);