diff --git a/demo/chat/main.cpp b/demo/chat/main.cpp index 7f88e6f..715dd0d 100755 --- a/demo/chat/main.cpp +++ b/demo/chat/main.cpp @@ -50,20 +50,18 @@ int main() { } std::cout << dat << std::endl; } + std::cout << id << " receiver is quit..." << std::endl; }}; for (/*int i = 1*/;; /*++i*/) { std::cin >> buf; - if (buf.empty()) break; + if (buf.empty() || (buf == quit__)) break; // std::cout << "[" << i << "]" << std::endl; sender__.send(id + "> " + buf); - if (buf == quit__) { - receiver__.disconnect(); - break; - } } + receiver__.disconnect(); r.join(); - std::cout << id << " is quit..." << std::endl; + std::cout << id << " sender is quit..." << std::endl; return 0; } diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index b84644b..0fe0ed2 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -241,6 +241,11 @@ class waiter_helper { std::atomic waiting_ { 0 }; long counter_ = 0; + enum : unsigned { + destruct_mask = (std::numeric_limits::max)() >> 1, + destruct_flag = ~destruct_mask + }; + public: using handle_t = std::tuple; @@ -280,24 +285,33 @@ public: } template - bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t const & h, std::atomic const & enabled, F&& pred, std::size_t tm = invalid_value) { + if (!enabled.load(std::memory_order_acquire)) { + return false; + } waiting_.fetch_add(1, std::memory_order_release); auto finally = ipc::guard([this] { - waiting_->fetch_sub(1, std::memory_order_release); + waiting_.fetch_sub(1, std::memory_order_release); }); { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (!std::forward(pred)()) return true; ++ counter_; } - bool ret = sem_helper::wait(std::get<1>(h), tm); + bool ret = false; + do { + if (!enabled.load(std::memory_order_acquire)) { + break; + } + ret = sem_helper::wait(std::get<1>(h), tm); + } while (waiting_.load(std::memory_order_acquire) & destruct_flag); finally.do_exit(); ret = sem_helper::post(std::get<2>(h)) && ret; return ret; } bool notify(handle_t const & h) { - if (waiting_.load(std::memory_order_acquire) == 0) { + if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { return true; } bool ret = true; @@ -311,7 +325,7 @@ public: } bool broadcast(handle_t const & h) { - if (waiting_.load(std::memory_order_acquire) == 0) { + if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { return true; } bool ret = true; @@ -327,6 +341,28 @@ public: } return ret; } + + bool emit_destruction(handle_t const & h) { + if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + waiting_.fetch_or(destruct_flag, std::memory_order_relaxed); + IPC_UNUSED_ auto finally = ipc::guard([this] { + waiting_.fetch_and(destruct_mask, std::memory_order_relaxed); + }); + if (counter_ > 0) { + for (long i = 0; i < counter_; ++i) { + ret = ret && sem_helper::post(std::get<1>(h)); + } + do { + -- counter_; + ret = ret && sem_helper::wait(std::get<2>(h), default_timeout); + } while (counter_ > 0); + } + return ret; + } }; class waiter { @@ -360,19 +396,24 @@ public: } template - bool wait_if(handle_t h, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t h, std::atomic const & enabled, F && pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; - return helper_.wait_if(h, std::forward(pred), tm); + return helper_.wait_if(h, enabled, std::forward(pred), tm); } - void notify(handle_t h) { - if (h == invalid()) return; - helper_.notify(h); + bool notify(handle_t h) { + if (h == invalid()) return false; + return helper_.notify(h); } - void broadcast(handle_t h) { - if (h == invalid()) return; - helper_.broadcast(h); + bool broadcast(handle_t h) { + if (h == invalid()) return false; + return helper_.broadcast(h); + } + + bool emit_destruction(handle_t h) { + if (h == invalid()) return false; + return helper_.emit_destruction(h); } }; diff --git a/src/libipc/platform/waiter_wrapper.h b/src/libipc/platform/waiter_wrapper.h index 8242d31..4c30094 100755 --- a/src/libipc/platform/waiter_wrapper.h +++ b/src/libipc/platform/waiter_wrapper.h @@ -241,7 +241,7 @@ public: } template - bool wait_if(F&& pred, std::size_t tm = invalid_value) { + bool wait_if(F && pred, std::size_t tm = invalid_value) { if (!valid()) return false; return w_->wait_if(h_, enabled_, std::forward(pred), tm); }