fix bugs in force-push

This commit is contained in:
zhangyi 2019-04-30 15:20:33 +08:00
parent fe3fc07829
commit 518550070d
5 changed files with 104 additions and 47 deletions

View File

@ -29,6 +29,8 @@ class conn_head {
ipc::spin_lock lc_;
std::atomic<bool> constructed_;
std::atomic<bool> dis_flag_;
public:
void init() {
/* DCLP */
@ -53,6 +55,21 @@ public:
return cc_.fetch_sub(1, std::memory_order_acq_rel);
}
void try_disconnect() noexcept {
if (!dis_flag_.load(std::memory_order_acquire)) {
cc_.fetch_sub(1, std::memory_order_relaxed);
dis_flag_.store(true, std::memory_order_release);
}
}
void clear_dis_flag(std::memory_order order = std::memory_order_release) noexcept {
dis_flag_.store(false, order);
}
bool dis_flag(std::memory_order order = std::memory_order_acquire) const noexcept {
return dis_flag_.load(order);
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return cc_.load(order);
}

View File

@ -95,6 +95,7 @@ struct conn_info_head {
template <typename W, typename F>
bool wait_for(W& waiter, F&& pred, std::size_t tm) {
if (tm == 0) return !pred();
for (unsigned k = 0; pred();) {
bool loop = true, ret = true;
ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] {
@ -245,7 +246,8 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
return [info, que, msg_id](int remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(que, msg_id, remain, data, size);
}, default_timeut)) {
}, que->dis_flag() ? 0 : static_cast<std::size_t>(default_timeut))) {
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
if (!que->force_push(que, msg_id, remain, data, size)) {
return false;
}

View File

@ -105,19 +105,22 @@ public:
}
bool wait(mutex& mtx, std::size_t tm = invalid_value) {
if (tm == invalid_value) {
switch (tm) {
case 0:
return true;
case invalid_value:
IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native());
}
else {
timespec ts;
calc_wait_time(ts, tm);
int eno;
if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
default: {
timespec ts;
calc_wait_time(ts, tm);
int eno;
if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
return false;
}
return true;
}
@ -176,18 +179,21 @@ public:
static bool wait(handle_t h, std::size_t tm = invalid_value) {
if (h == invalid()) return false;
if (tm == invalid_value) {
switch (tm) {
case 0:
return true;
case invalid_value:
IPC_SEMAPHORE_FUNC_(sem_wait, h);
}
else {
timespec ts;
calc_wait_time(ts, tm);
if (::sem_timedwait(h, &ts) != 0) {
if (errno != ETIMEDOUT) {
ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
errno, tm, ts.tv_sec, ts.tv_nsec);
default: {
timespec ts;
calc_wait_time(ts, tm);
if (::sem_timedwait(h, &ts) != 0) {
if (errno != ETIMEDOUT) {
ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
errno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
return false;
}
return true;
}

View File

@ -190,14 +190,23 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
template <typename W, typename F, typename E>
bool push(W* wrapper, F&& f, E* elems) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element
rc_t expected = 0;
if (!el->rc_.compare_exchange_strong(
expected, static_cast<rc_t>(cc), std::memory_order_acq_rel)) {
return false; // full
E* el;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc) {
return false; // full
}
// cur_rc should be 0 here
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc), std::memory_order_release)) {
wrapper->clear_dis_flag(std::memory_order_relaxed);
break;
}
ipc::yield(k);
}
std::forward<F>(f)(&(el->data_));
wt_.fetch_add(1, std::memory_order_release);
@ -206,13 +215,25 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
cc = wrapper->disconnect() - 1; // disconnect a reader
if (cc == 0) return false; // no reader
auto* el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// reset reading flag
el->rc_.store(static_cast<rc_t>(cc), std::memory_order_relaxed);
E* el;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc) {
wrapper->try_disconnect(); // try disconnect a reader
cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
}
// just compare & exchange
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
}
std::forward<F>(f)(&(el->data_));
wt_.fetch_add(1, std::memory_order_release);
return true;
@ -267,10 +288,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
circ::u2_t cur_ct;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) {
return false; // no reader
}
if (cc == 0) return false; // no reader
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc & rc_mask) {
return false; // full
@ -279,9 +299,10 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
if ((cur_fl != cur_ct) && cur_fl) {
return false; // full
}
// (cur_rc & rc_mask) should == 0 here
// (cur_rc & rc_mask) should be 0 here
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) {
wrapper->clear_dis_flag(std::memory_order_relaxed);
break;
}
ipc::yield(k);
@ -298,20 +319,27 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
bool force_push(W* wrapper, F&& f, E* elems) {
E* el;
circ::u2_t cur_ct;
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
wrapper->disconnect(); // disconnect a reader
for (unsigned k = 0;;) {
cc = wrapper->conn_count(std::memory_order_relaxed);
auto cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
el->rc_.store(static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_relaxed);
if (ct_.compare_exchange_weak(cur_ct, cur_ct + 1, std::memory_order_release)) {
ipc::log("force_push: k = %d, cc = %zd, rc = %zd\n", k, cc, (cur_rc & rc_mask));
if (cur_rc & rc_mask) {
wrapper->try_disconnect(); // try disconnect a reader
cc = wrapper->conn_count(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
}
// just compare & exchange
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) {
break;
}
ipc::yield(k);
}
// only one thread/process would touch here at one time
ct_.store(cur_ct + 1, std::memory_order_release);
std::forward<F>(f)(&(el->data_));
// set flag & try update wt
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);

View File

@ -125,6 +125,10 @@ public:
return base_t::disconnect(elems_);
}
bool dis_flag() {
return elems_->dis_flag();
}
std::size_t conn_count() const noexcept {
return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
}