fix bugs of force_push; set PTHREAD_MUTEX_ROBUST to mutex in linux

This commit is contained in:
zhangyi 2020-03-17 20:58:54 +08:00
parent b9cc885568
commit 3eeeec88a0
8 changed files with 123 additions and 99 deletions

View File

@ -23,10 +23,10 @@ int main() {
std::string buf, id = id__ + std::to_string(calc_unique_id());
std::regex reg { "(c\\d+)> (.*)" };
ipc::channel cc { name__ };
ipc::channel cc { name__, ipc::sender };
std::thread r {[&id, &reg] {
ipc::channel cc { name__ };
ipc::channel cc { name__, ipc::receiver };
std::cout << id << " is ready." << std::endl;
while (1) {
auto buf = cc.recv();
@ -46,8 +46,9 @@ int main() {
}
}};
while (1) {
for (/*int i = 1*/;; /*++i*/) {
std::cin >> buf;
// std::cout << "[" << i << "]" << std::endl;
cc.send(id + "> " + buf);
if (buf == quit__) break;
}

View File

@ -39,20 +39,20 @@ public:
return head_.cursor();
}
template <typename F>
bool push(F&& f) {
return head_.push(this, std::forward<F>(f), block_);
template <typename Q, typename F>
bool push(Q* que, F&& f) {
return head_.push(que, std::forward<F>(f), block_);
}
template <typename F>
bool force_push(F&& f) {
return head_.force_push(this, std::forward<F>(f), block_);
template <typename Q, typename F>
bool force_push(Q* que, F&& f) {
return head_.force_push(que, std::forward<F>(f), block_);
}
template <typename F>
bool pop(cursor_t* cur, F&& f) {
template <typename Q, typename F>
bool pop(Q* que, cursor_t* cur, F&& f) {
if (cur == nullptr) return false;
return head_.pop(this, *cur, std::forward<F>(f), block_);
return head_.pop(que, *cur, std::forward<F>(f), block_);
}
};

View File

@ -15,18 +15,18 @@ namespace circ {
using u1_t = ipc::uint_t<8>;
using u2_t = ipc::uint_t<32>;
/** only supports max 32 connections */
using cc_t = u2_t;
constexpr u1_t index_of(u2_t c) noexcept {
return static_cast<u1_t>(c);
}
class conn_head {
std::atomic<u2_t> cc_ { 0 }; // connection counter
std::atomic<cc_t> cc_; // connections
ipc::spin_lock lc_;
std::atomic<bool> constructed_;
std::atomic<bool> dis_flag_;
public:
void init() {
/* DCLP */
@ -43,31 +43,31 @@ public:
conn_head(const conn_head&) = delete;
conn_head& operator=(const conn_head&) = delete;
std::size_t connect() noexcept {
return cc_.fetch_add(1, std::memory_order_acq_rel);
cc_t connect() noexcept {
for (unsigned k = 0;;) {
cc_t cur = cc_.load(std::memory_order_acquire);
cc_t next = cur | (cur + 1); // find the first 0, and set it to 1.
if (next == 0) return 0;
if (cc_.compare_exchange_weak(cur, next, std::memory_order_release)) {
return next ^ cur; // return connected id
}
std::size_t disconnect() noexcept {
return cc_.fetch_sub(1, std::memory_order_acq_rel);
}
void try_disconnect() noexcept {
if (!dis_flag_.load(std::memory_order_acquire)) {
cc_.fetch_sub(1, std::memory_order_relaxed);
dis_flag_.store(true, std::memory_order_release);
ipc::yield(k);
}
}
void clear_dis_flag(std::memory_order order = std::memory_order_release) noexcept {
dis_flag_.store(false, order);
cc_t disconnect(cc_t cc_id) noexcept {
return cc_.fetch_and(~cc_id) & ~cc_id;
}
bool dis_flag(std::memory_order order = std::memory_order_acquire) const noexcept {
return dis_flag_.load(order);
cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
return cc_.load(order);
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return cc_.load(order);
cc_t cur = cc_.load(order);
cc_t cnt; // accumulates the total bits set in cc
for (cnt = 0; cur; ++cnt) cur &= cur - 1;
return cnt;
}
};

View File

@ -437,7 +437,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
if (!wait_for(info->wt_waiter_, [&] {
return !que->push(info->cc_id_, msg_id, remain, data, size);
}, que->dis_flag() ? 0 : static_cast<std::size_t>(default_timeout))) {
}, default_timeout)) {
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
if (!que->force_push([](void* p) {
auto tmp_msg = static_cast<typename queue_t::value_t*>(p);

View File

@ -57,6 +57,10 @@ public:
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
return false;
}
if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) {
ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno);
return false;
}
if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) {
ipc::error("fail pthread_mutex_init[%d]\n", eno);
return false;
@ -69,7 +73,27 @@ public:
}
bool lock() {
IPC_PTHREAD_FUNC_(pthread_mutex_lock, &mutex_);
for (;;) {
int eno = ::pthread_mutex_lock(&mutex_);
switch (eno) {
case 0:
return true;
case EOWNERDEAD:
if (::pthread_mutex_consistent(&mutex_) == 0) {
::pthread_mutex_unlock(&mutex_);
break;
}
IPC_FALLTHROUGH_;
case ENOTRECOVERABLE:
if (close() && open()) {
break;
}
IPC_FALLTHROUGH_;
default:
ipc::error("fail pthread_mutex_lock[%d]\n", eno);
return false;
}
}
}
bool unlock() {

View File

@ -192,18 +192,17 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
bool push(W* wrapper, F&& f, E* elems) {
E* el;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc) {
return false; // full
return false; // not reading finished yet
}
// cur_rc should be 0 here
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc), std::memory_order_release)) {
wrapper->clear_dis_flag(std::memory_order_relaxed);
break;
}
ipc::yield(k);
@ -217,14 +216,14 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
bool force_push(W* wrapper, F&& f, E* elems) {
E* el;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc) {
wrapper->try_disconnect(); // try disconnect a reader
cc = wrapper->conn_count(std::memory_order_relaxed);
ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, cur_rc);
cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers
if (cc == 0) return false; // no reader
}
// just compare & exchange
@ -240,7 +239,7 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
}
template <typename W, typename F, typename E>
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) {
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) {
if (cur == cursor()) return false; // acquire
auto* el = elems + circ::index_of(cur++);
std::forward<F>(f)(&(el->data_));
@ -249,8 +248,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
if (cur_rc == 0) {
return true;
}
if (el->rc_.compare_exchange_weak(
cur_rc, cur_rc - 1, std::memory_order_release)) {
if (el->rc_.compare_exchange_weak(cur_rc,
cur_rc & ~static_cast<rc_t>(wrapper->connected_id()),
std::memory_order_release)) {
return true;
}
ipc::yield(k);
@ -287,13 +287,13 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
E* el;
circ::u2_t cur_ct;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc & rc_mask) {
return false; // full
return false; // not reading finished yet
}
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
if ((cur_fl != cur_ct) && cur_fl) {
@ -301,8 +301,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
}
// (cur_rc & rc_mask) should be 0 here
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) {
wrapper->clear_dis_flag(std::memory_order_relaxed);
cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
@ -320,20 +319,20 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
E* el;
circ::u2_t cur_ct;
for (unsigned k = 0;;) {
auto cc = wrapper->conn_count(std::memory_order_relaxed);
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
ipc::log("force_push: k = %d, cc = %zd, rc = %zd\n", k, cc, (cur_rc & rc_mask));
if (cur_rc & rc_mask) {
wrapper->try_disconnect(); // try disconnect a reader
cc = wrapper->conn_count(std::memory_order_relaxed);
circ::cc_t rem_cc = cur_rc & rc_mask;
if (rem_cc) {
ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, rem_cc);
cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers
if (cc == 0) return false; // no reader
}
// just compare & exchange
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) {
cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
@ -347,7 +346,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
}
template <typename W, typename F, typename E, std::size_t N>
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E(& elems)[N]) {
bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) {
auto* el = elems + circ::index_of(cur);
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
if (cur_fl != ~static_cast<flag_t>(cur)) {
@ -357,19 +356,18 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
std::forward<F>(f)(&(el->data_));
for (unsigned k = 0;;) {
auto cur_rc = el->rc_.load(std::memory_order_acquire);
switch (cur_rc & rc_mask) {
case 0:
circ::cc_t rem_cc = cur_rc & rc_mask;
if (rem_cc == 0) {
el->f_ct_.store(cur + N - 1, std::memory_order_release);
return true;
case 1:
el->f_ct_.store(cur + N - 1, std::memory_order_release);
IPC_FALLTHROUGH_;
default:
if (el->rc_.compare_exchange_weak(
cur_rc, cur_rc + rc_incr - 1, std::memory_order_release)) {
return true;
}
break;
if ((rem_cc & ~wrapper->connected_id()) == 0) {
el->f_ct_.store(cur + N - 1, std::memory_order_release);
}
if (el->rc_.compare_exchange_weak(cur_rc,
(cur_rc + rc_incr) & ~static_cast<rc_t>(wrapper->connected_id()),
std::memory_order_release)) {
return true;
}
ipc::yield(k);
}

View File

@ -16,13 +16,14 @@
#include "rw_lock.h"
#include "platform/detail.h"
#include "circ/elem_def.h"
namespace ipc {
namespace detail {
class queue_conn {
protected:
bool connected_ = false;
circ::cc_t connected_ = 0;
shm::handle elems_h_;
template <typename Elems>
@ -53,6 +54,10 @@ public:
queue_conn& operator=(const queue_conn&) = delete;
bool connected() const noexcept {
return connected_ != 0;
}
circ::cc_t connected_id() const noexcept {
return connected_;
}
@ -60,24 +65,18 @@ public:
auto connect(Elems* elems)
-> std::tuple<bool, decltype(std::declval<Elems>().cursor())> {
if (elems == nullptr) return {};
if (connected_) {
// if it's already connected, just return false
return {};
}
connected_ = true;
elems->connect();
return std::make_tuple(true, elems->cursor());
if (connected()) return {};
connected_ = elems->connect();
return std::make_tuple(connected(), elems->cursor());
}
template <typename Elems>
bool disconnect(Elems* elems) {
if (elems == nullptr) return false;
if (!connected_) {
// if it's already disconnected, just return false
return false;
}
connected_ = false;
elems->disconnect();
if (!connected()) return false;
elems->disconnect(connected_);
return true;
}
};
@ -108,9 +107,8 @@ public:
base_t::close();
}
constexpr elems_t * elems() const noexcept {
return elems_;
}
constexpr elems_t * elems() noexcept { return elems_; }
constexpr elems_t const * elems() const noexcept { return elems_; }
bool connect() {
auto tp = base_t::connect(elems_);
@ -125,10 +123,6 @@ public:
return base_t::disconnect(elems_);
}
bool dis_flag() {
return elems_->dis_flag();
}
std::size_t conn_count() const noexcept {
return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
}
@ -138,13 +132,13 @@ public:
}
bool empty() const noexcept {
return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor());
return !valid() || (cursor_ == elems_->cursor());
}
template <typename T, typename... P>
auto push(P&&... params) {
if (elems_ == nullptr) return false;
return elems_->push([&](void* p) {
return elems_->push(this, [&](void* p) {
::new (p) T(std::forward<P>(params)...);
});
}
@ -152,7 +146,7 @@ public:
template <typename T, typename F, typename... P>
auto force_push(F&& prep, P&&... params) {
if (elems_ == nullptr) return false;
return elems_->force_push([&](void* p) {
return elems_->force_push(this, [&](void* p) {
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
});
}
@ -162,7 +156,7 @@ public:
if (elems_ == nullptr) {
return false;
}
return elems_->pop(&(this->cursor_), [&item](void* p) {
return elems_->pop(this, &(this->cursor_), [&item](void* p) {
::new (&item) T(std::move(*static_cast<T*>(p)));
});
}

View File

@ -116,26 +116,33 @@ struct quit_mode<pc_t<Rp, Rc, ipc::trans::broadcast>> {
template <std::size_t D, typename P>
struct test_cq<ea_t<D, P>> {
using ca_t = ea_t<D, P>;
using cn_t = decltype(std::declval<ca_t>().cursor());
using cn_t = decltype(std::declval<ca_t>().connect());
typename quit_mode<P>::type quit_ = false;
ca_t* ca_;
cn_t cc_id_;
test_cq(ca_t* ca) : ca_(ca) {}
cn_t connect() {
auto cur = ca_->cursor();
ca_->connect();
return cur;
return cc_id_ = ca_->connect();
}
void disconnect(cn_t) {
ca_->disconnect();
void disconnect(cn_t cc_id) {
ca_->disconnect(cc_id);
}
void disconnect(ca_t*) {
void disconnect(ca_t* ca) {
ca->disconnect(cc_id_);
}
cn_t connected_id() const noexcept {
return cc_id_;
}
constexpr ca_t * elems() noexcept { return ca_; }
constexpr ca_t const * elems() const noexcept { return ca_; }
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
@ -146,7 +153,7 @@ struct test_cq<ea_t<D, P>> {
void recv(cn_t cur, F&& proc) {
while (1) {
msg_t msg;
while (ca_->pop(&cur, [&msg](void* p) {
while (ca_->pop(this, &cur, [&msg](void* p) {
msg = *static_cast<msg_t*>(p);
})) {
if (msg.pid_ < 0) {
@ -165,7 +172,7 @@ struct test_cq<ea_t<D, P>> {
}
void send(ca_t* ca, msg_t const & msg) {
while (!ca->push([&msg](void* p) {
while (!ca->push(this, [&msg](void* p) {
(*static_cast<msg_t*>(p)) = msg;
})) {
std::this_thread::yield();