From 3eeeec88a0b80e8867203e386a707c121babb19f Mon Sep 17 00:00:00 2001 From: zhangyi Date: Tue, 17 Mar 2020 20:58:54 +0800 Subject: [PATCH] fix bugs of force_push; set PTHREAD_MUTEX_ROBUST to mutex in linux --- demo/chat/main.cpp | 7 +++-- src/circ/elem_array.h | 18 ++++++------ src/circ/elem_def.h | 42 +++++++++++++-------------- src/ipc.cpp | 2 +- src/platform/waiter_linux.h | 26 ++++++++++++++++- src/prod_cons.h | 58 ++++++++++++++++++------------------- src/queue.h | 44 ++++++++++++---------------- test/test_circ.cpp | 25 ++++++++++------ 8 files changed, 123 insertions(+), 99 deletions(-) diff --git a/demo/chat/main.cpp b/demo/chat/main.cpp index a39507c..89ac956 100644 --- a/demo/chat/main.cpp +++ b/demo/chat/main.cpp @@ -23,10 +23,10 @@ int main() { std::string buf, id = id__ + std::to_string(calc_unique_id()); std::regex reg { "(c\\d+)> (.*)" }; - ipc::channel cc { name__ }; + ipc::channel cc { name__, ipc::sender }; std::thread r {[&id, ®] { - ipc::channel cc { name__ }; + ipc::channel cc { name__, ipc::receiver }; std::cout << id << " is ready." << std::endl; while (1) { auto buf = cc.recv(); @@ -46,8 +46,9 @@ int main() { } }}; - while (1) { + for (/*int i = 1*/;; /*++i*/) { std::cin >> buf; +// std::cout << "[" << i << "]" << std::endl; cc.send(id + "> " + buf); if (buf == quit__) break; } diff --git a/src/circ/elem_array.h b/src/circ/elem_array.h index 94f3bc5..cbbf4da 100644 --- a/src/circ/elem_array.h +++ b/src/circ/elem_array.h @@ -39,20 +39,20 @@ public: return head_.cursor(); } - template - bool push(F&& f) { - return head_.push(this, std::forward(f), block_); + template + bool push(Q* que, F&& f) { + return head_.push(que, std::forward(f), block_); } - template - bool force_push(F&& f) { - return head_.force_push(this, std::forward(f), block_); + template + bool force_push(Q* que, F&& f) { + return head_.force_push(que, std::forward(f), block_); } - template - bool pop(cursor_t* cur, F&& f) { + template + bool pop(Q* que, cursor_t* cur, F&& f) { if (cur == nullptr) return false; - return head_.pop(this, *cur, std::forward(f), block_); + return head_.pop(que, *cur, std::forward(f), block_); } }; diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h index d19e5d9..da31945 100644 --- a/src/circ/elem_def.h +++ b/src/circ/elem_def.h @@ -15,18 +15,18 @@ namespace circ { using u1_t = ipc::uint_t<8>; using u2_t = ipc::uint_t<32>; +/** only supports max 32 connections */ +using cc_t = u2_t; + constexpr u1_t index_of(u2_t c) noexcept { return static_cast(c); } class conn_head { - std::atomic cc_ { 0 }; // connection counter - + std::atomic cc_; // connections ipc::spin_lock lc_; std::atomic constructed_; - std::atomic dis_flag_; - public: void init() { /* DCLP */ @@ -43,31 +43,31 @@ public: conn_head(const conn_head&) = delete; conn_head& operator=(const conn_head&) = delete; - std::size_t connect() noexcept { - return cc_.fetch_add(1, std::memory_order_acq_rel); - } - - std::size_t disconnect() noexcept { - return cc_.fetch_sub(1, std::memory_order_acq_rel); - } - - void try_disconnect() noexcept { - if (!dis_flag_.load(std::memory_order_acquire)) { - cc_.fetch_sub(1, std::memory_order_relaxed); - dis_flag_.store(true, std::memory_order_release); + cc_t connect() noexcept { + for (unsigned k = 0;;) { + cc_t cur = cc_.load(std::memory_order_acquire); + cc_t next = cur | (cur + 1); // find the first 0, and set it to 1. + if (next == 0) return 0; + if (cc_.compare_exchange_weak(cur, next, std::memory_order_release)) { + return next ^ cur; // return connected id + } + ipc::yield(k); } } - void clear_dis_flag(std::memory_order order = std::memory_order_release) noexcept { - dis_flag_.store(false, order); + cc_t disconnect(cc_t cc_id) noexcept { + return cc_.fetch_and(~cc_id) & ~cc_id; } - bool dis_flag(std::memory_order order = std::memory_order_acquire) const noexcept { - return dis_flag_.load(order); + cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept { + return cc_.load(order); } std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { - return cc_.load(order); + cc_t cur = cc_.load(order); + cc_t cnt; // accumulates the total bits set in cc + for (cnt = 0; cur; ++cnt) cur &= cur - 1; + return cnt; } }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 3828581..6e94f54 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -437,7 +437,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push(info->cc_id_, msg_id, remain, data, size); - }, que->dis_flag() ? 0 : static_cast(default_timeout))) { + }, default_timeout)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); if (!que->force_push([](void* p) { auto tmp_msg = static_cast(p); diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 8a1a78a..071e6c8 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -57,6 +57,10 @@ public: ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); return false; } + if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) { + ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno); + return false; + } if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) { ipc::error("fail pthread_mutex_init[%d]\n", eno); return false; @@ -69,7 +73,27 @@ public: } bool lock() { - IPC_PTHREAD_FUNC_(pthread_mutex_lock, &mutex_); + for (;;) { + int eno = ::pthread_mutex_lock(&mutex_); + switch (eno) { + case 0: + return true; + case EOWNERDEAD: + if (::pthread_mutex_consistent(&mutex_) == 0) { + ::pthread_mutex_unlock(&mutex_); + break; + } + IPC_FALLTHROUGH_; + case ENOTRECOVERABLE: + if (close() && open()) { + break; + } + IPC_FALLTHROUGH_; + default: + ipc::error("fail pthread_mutex_lock[%d]\n", eno); + return false; + } + } } bool unlock() { diff --git a/src/prod_cons.h b/src/prod_cons.h index b8c86d4..56a3992 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -192,18 +192,17 @@ struct prod_cons_impl> { bool push(W* wrapper, F&& f, E* elems) { E* el; for (unsigned k = 0;;) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); + circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); if (cc == 0) return false; // no reader el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); if (cur_rc) { - return false; // full + return false; // not reading finished yet } // cur_rc should be 0 here if (el->rc_.compare_exchange_weak( cur_rc, static_cast(cc), std::memory_order_release)) { - wrapper->clear_dis_flag(std::memory_order_relaxed); break; } ipc::yield(k); @@ -217,14 +216,14 @@ struct prod_cons_impl> { bool force_push(W* wrapper, F&& f, E* elems) { E* el; for (unsigned k = 0;;) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); + circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); if (cc == 0) return false; // no reader el = elems + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); if (cur_rc) { - wrapper->try_disconnect(); // try disconnect a reader - cc = wrapper->conn_count(std::memory_order_relaxed); + ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, cur_rc); + cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers if (cc == 0) return false; // no reader } // just compare & exchange @@ -240,7 +239,7 @@ struct prod_cons_impl> { } template - bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E* elems) { + bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { if (cur == cursor()) return false; // acquire auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); @@ -249,8 +248,9 @@ struct prod_cons_impl> { if (cur_rc == 0) { return true; } - if (el->rc_.compare_exchange_weak( - cur_rc, cur_rc - 1, std::memory_order_release)) { + if (el->rc_.compare_exchange_weak(cur_rc, + cur_rc & ~static_cast(wrapper->connected_id()), + std::memory_order_release)) { return true; } ipc::yield(k); @@ -287,13 +287,13 @@ struct prod_cons_impl> { E* el; circ::u2_t cur_ct; for (unsigned k = 0;;) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); + circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); if (cc == 0) return false; // no reader el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); if (cur_rc & rc_mask) { - return false; // full + return false; // not reading finished yet } auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if ((cur_fl != cur_ct) && cur_fl) { @@ -301,8 +301,7 @@ struct prod_cons_impl> { } // (cur_rc & rc_mask) should be 0 here if (el->rc_.compare_exchange_weak( - cur_rc, static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) { - wrapper->clear_dis_flag(std::memory_order_relaxed); + cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast(cc), std::memory_order_release)) { break; } ipc::yield(k); @@ -320,20 +319,20 @@ struct prod_cons_impl> { E* el; circ::u2_t cur_ct; for (unsigned k = 0;;) { - auto cc = wrapper->conn_count(std::memory_order_relaxed); + circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed); if (cc == 0) return false; // no reader el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed)); // check all consumers have finished reading this element auto cur_rc = el->rc_.load(std::memory_order_acquire); - ipc::log("force_push: k = %d, cc = %zd, rc = %zd\n", k, cc, (cur_rc & rc_mask)); - if (cur_rc & rc_mask) { - wrapper->try_disconnect(); // try disconnect a reader - cc = wrapper->conn_count(std::memory_order_relaxed); + circ::cc_t rem_cc = cur_rc & rc_mask; + if (rem_cc) { + ipc::log("force_push: k = %d, cc = %d, rem_cc = %d\n", k, cc, rem_cc); + cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers if (cc == 0) return false; // no reader } // just compare & exchange if (el->rc_.compare_exchange_weak( - cur_rc, static_cast(cc) | ((cur_rc & ~rc_mask) + rc_incr), std::memory_order_release)) { + cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast(cc), std::memory_order_release)) { break; } ipc::yield(k); @@ -347,7 +346,7 @@ struct prod_cons_impl> { } template - bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E(& elems)[N]) { + bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { auto* el = elems + circ::index_of(cur); auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if (cur_fl != ~static_cast(cur)) { @@ -357,19 +356,18 @@ struct prod_cons_impl> { std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - switch (cur_rc & rc_mask) { - case 0: + circ::cc_t rem_cc = cur_rc & rc_mask; + if (rem_cc == 0) { el->f_ct_.store(cur + N - 1, std::memory_order_release); return true; - case 1: + } + if ((rem_cc & ~wrapper->connected_id()) == 0) { el->f_ct_.store(cur + N - 1, std::memory_order_release); - IPC_FALLTHROUGH_; - default: - if (el->rc_.compare_exchange_weak( - cur_rc, cur_rc + rc_incr - 1, std::memory_order_release)) { - return true; - } - break; + } + if (el->rc_.compare_exchange_weak(cur_rc, + (cur_rc + rc_incr) & ~static_cast(wrapper->connected_id()), + std::memory_order_release)) { + return true; } ipc::yield(k); } diff --git a/src/queue.h b/src/queue.h index 857ce67..679831a 100644 --- a/src/queue.h +++ b/src/queue.h @@ -16,13 +16,14 @@ #include "rw_lock.h" #include "platform/detail.h" +#include "circ/elem_def.h" namespace ipc { namespace detail { class queue_conn { protected: - bool connected_ = false; + circ::cc_t connected_ = 0; shm::handle elems_h_; template @@ -53,6 +54,10 @@ public: queue_conn& operator=(const queue_conn&) = delete; bool connected() const noexcept { + return connected_ != 0; + } + + circ::cc_t connected_id() const noexcept { return connected_; } @@ -60,24 +65,18 @@ public: auto connect(Elems* elems) -> std::tuple().cursor())> { if (elems == nullptr) return {}; - if (connected_) { - // if it's already connected, just return false - return {}; - } - connected_ = true; - elems->connect(); - return std::make_tuple(true, elems->cursor()); + // if it's already connected, just return false + if (connected()) return {}; + connected_ = elems->connect(); + return std::make_tuple(connected(), elems->cursor()); } template bool disconnect(Elems* elems) { if (elems == nullptr) return false; - if (!connected_) { - // if it's already disconnected, just return false - return false; - } - connected_ = false; - elems->disconnect(); + // if it's already disconnected, just return false + if (!connected()) return false; + elems->disconnect(connected_); return true; } }; @@ -108,9 +107,8 @@ public: base_t::close(); } - constexpr elems_t * elems() const noexcept { - return elems_; - } + constexpr elems_t * elems() noexcept { return elems_; } + constexpr elems_t const * elems() const noexcept { return elems_; } bool connect() { auto tp = base_t::connect(elems_); @@ -125,10 +123,6 @@ public: return base_t::disconnect(elems_); } - bool dis_flag() { - return elems_->dis_flag(); - } - std::size_t conn_count() const noexcept { return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); } @@ -138,13 +132,13 @@ public: } bool empty() const noexcept { - return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor()); + return !valid() || (cursor_ == elems_->cursor()); } template auto push(P&&... params) { if (elems_ == nullptr) return false; - return elems_->push([&](void* p) { + return elems_->push(this, [&](void* p) { ::new (p) T(std::forward

(params)...); }); } @@ -152,7 +146,7 @@ public: template auto force_push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; - return elems_->force_push([&](void* p) { + return elems_->force_push(this, [&](void* p) { if (prep(p)) ::new (p) T(std::forward

(params)...); }); } @@ -162,7 +156,7 @@ public: if (elems_ == nullptr) { return false; } - return elems_->pop(&(this->cursor_), [&item](void* p) { + return elems_->pop(this, &(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); }); } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index f9e8d77..78e8c62 100755 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -116,26 +116,33 @@ struct quit_mode> { template struct test_cq> { using ca_t = ea_t; - using cn_t = decltype(std::declval().cursor()); + using cn_t = decltype(std::declval().connect()); typename quit_mode

::type quit_ = false; ca_t* ca_; + cn_t cc_id_; test_cq(ca_t* ca) : ca_(ca) {} cn_t connect() { - auto cur = ca_->cursor(); - ca_->connect(); - return cur; + return cc_id_ = ca_->connect(); } - void disconnect(cn_t) { - ca_->disconnect(); + void disconnect(cn_t cc_id) { + ca_->disconnect(cc_id); } - void disconnect(ca_t*) { + void disconnect(ca_t* ca) { + ca->disconnect(cc_id_); } + cn_t connected_id() const noexcept { + return cc_id_; + } + + constexpr ca_t * elems() noexcept { return ca_; } + constexpr ca_t const * elems() const noexcept { return ca_; } + void wait_start(int M) { while (ca_->conn_count() != static_cast(M)) { std::this_thread::yield(); @@ -146,7 +153,7 @@ struct test_cq> { void recv(cn_t cur, F&& proc) { while (1) { msg_t msg; - while (ca_->pop(&cur, [&msg](void* p) { + while (ca_->pop(this, &cur, [&msg](void* p) { msg = *static_cast(p); })) { if (msg.pid_ < 0) { @@ -165,7 +172,7 @@ struct test_cq> { } void send(ca_t* ca, msg_t const & msg) { - while (!ca->push([&msg](void* p) { + while (!ca->push(this, [&msg](void* p) { (*static_cast(p)) = msg; })) { std::this_thread::yield();