调整代码结构

This commit is contained in:
mutouyun 2020-09-20 11:43:27 +08:00
parent aaf67858c2
commit e51855f1df
6 changed files with 299 additions and 216 deletions

View File

@ -26,10 +26,6 @@ ipc::channel receiver__ { name__, ipc::receiver };
} // namespace } // namespace
int main() { int main() {
::signal(SIGINT, [](int) {
receiver__.disconnect();
});
std::string buf, id = id__ + std::to_string(calc_unique_id()); std::string buf, id = id__ + std::to_string(calc_unique_id());
std::regex reg { "(c\\d+)> (.*)" }; std::regex reg { "(c\\d+)> (.*)" };
@ -58,6 +54,7 @@ int main() {
if (buf.empty() || (buf == quit__)) break; if (buf.empty() || (buf == quit__)) break;
// std::cout << "[" << i << "]" << std::endl; // std::cout << "[" << i << "]" << std::endl;
sender__.send(id + "> " + buf); sender__.send(id + "> " + buf);
buf.clear();
} }
receiver__.disconnect(); receiver__.disconnect();

View File

@ -280,10 +280,10 @@ struct conn_info_head {
, acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) { , acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) {
} }
void enable(bool e) { void quit_waiting() {
cc_waiter_.set_enabled(e); cc_waiter_.quit_waiting();
wt_waiter_.set_enabled(e); wt_waiter_.quit_waiting();
rd_waiter_.set_enabled(e); rd_waiter_.quit_waiting();
} }
auto acc() { auto acc() {
@ -356,7 +356,6 @@ static bool connect(handle_t * ph, char const * name, bool start) {
if (que == nullptr) { if (que == nullptr) {
return false; return false;
} }
info_of(*ph)->enable(true);
if (start) { if (start) {
if (que->connect()) { // wouldn't connect twice if (que->connect()) { // wouldn't connect twice
info_of(*ph)->cc_waiter_.broadcast(); info_of(*ph)->cc_waiter_.broadcast();
@ -371,7 +370,7 @@ static void disconnect(ipc::handle_t h) {
return; return;
} }
bool dis = que->disconnect(); bool dis = que->disconnect();
info_of(h)->enable(false); info_of(h)->quit_waiting();
if (dis) { if (dis) {
info_of(h)->recv_cache().clear(); info_of(h)->recv_cache().clear();
} }

View File

@ -10,8 +10,10 @@
#include <atomic> #include <atomic>
#include <tuple> #include <tuple>
#include <utility> #include <utility>
#include <cassert>
#include "libipc/def.h" #include "libipc/def.h"
#include "libipc/waiter_helper.h"
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h" #include "libipc/utility/scope_guard.h"
@ -205,9 +207,15 @@ public:
IPC_SEMAPHORE_FUNC_(sem_unlink, name); IPC_SEMAPHORE_FUNC_(sem_unlink, name);
} }
static bool post(handle_t h) { static bool post(handle_t h, long count) {
if (h == invalid()) return false; if (h == invalid()) return false;
auto spost = [](handle_t h) {
IPC_SEMAPHORE_FUNC_(sem_post, h); IPC_SEMAPHORE_FUNC_(sem_post, h);
};
for (long i = 0; i < count; ++i) {
if (!spost(h)) return false;
}
return true;
} }
static bool wait(handle_t h, std::size_t tm = invalid_value) { static bool wait(handle_t h, std::size_t tm = invalid_value) {
@ -235,23 +243,61 @@ public:
#pragma pop_macro("IPC_SEMAPHORE_FUNC_") #pragma pop_macro("IPC_SEMAPHORE_FUNC_")
}; };
class waiter_helper { class waiter_holder {
public:
using handle_t = std::tuple<
ipc::string,
sem_helper::handle_t /* sema */,
sem_helper::handle_t /* handshake */>;
static handle_t invalid() noexcept {
return std::make_tuple(
ipc::string{},
sem_helper::invalid(),
sem_helper::invalid());
}
private:
mutex lock_; mutex lock_;
waiter_helper::wait_counter cnt_;
std::atomic<unsigned> waiting_ { 0 }; struct contrl {
long counter_ = 0; waiter_holder * me_;
waiter_helper::wait_flags * flags_;
handle_t const & h_;
enum : unsigned { waiter_helper::wait_flags & flags() noexcept {
destruct_mask = (std::numeric_limits<unsigned>::max)() >> 1, assert(flags_ != nullptr);
destruct_flag = ~destruct_mask return *flags_;
}
waiter_helper::wait_counter & counter() noexcept {
return me_->cnt_;
}
auto get_lock() {
return ipc::detail::unique_lock(me_->lock_);
}
bool sema_wait(std::size_t tm) {
return sem_helper::wait(std::get<1>(h_), tm);
}
bool sema_post(long count) {
return sem_helper::post(std::get<1>(h_), count);
}
bool handshake_wait(std::size_t tm) {
return sem_helper::wait(std::get<2>(h_), tm);
}
bool handshake_post(long count) {
return sem_helper::post(std::get<2>(h_), count);
}
}; };
public: public:
using handle_t = std::tuple<ipc::string, sem_helper::handle_t, sem_helper::handle_t>; using wait_flags = waiter_helper::wait_flags;
static handle_t invalid() noexcept {
return std::make_tuple(ipc::string{}, sem_helper::invalid(), sem_helper::invalid());
}
handle_t open_h(ipc::string && name) { handle_t open_h(ipc::string && name) {
auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0); auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0);
@ -285,95 +331,38 @@ public:
} }
template <typename F> template <typename F>
bool wait_if(handle_t const & h, std::atomic<bool> const & enabled, F&& pred, std::size_t tm = invalid_value) { bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::size_t tm = invalid_value) {
if (!enabled.load(std::memory_order_acquire)) { assert(flags != nullptr);
return false; contrl ctrl { this, flags, h };
} return waiter_helper::wait_if(ctrl, mtx, std::forward<F>(pred), tm);
waiting_.fetch_add(1, std::memory_order_release);
auto finally = ipc::guard([this] {
waiting_.fetch_sub(1, std::memory_order_release);
});
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
++ counter_;
}
bool ret = false;
do {
if (!enabled.load(std::memory_order_acquire)) {
break;
}
ret = sem_helper::wait(std::get<1>(h), tm);
} while (waiting_.load(std::memory_order_acquire) & destruct_flag);
finally.do_exit();
ret = sem_helper::post(std::get<2>(h)) && ret;
return ret;
} }
bool notify(handle_t const & h) { bool notify(handle_t const & h) {
if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { contrl ctrl { this, nullptr, h };
return true; return waiter_helper::notify(ctrl);
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (counter_ > 0) {
ret = sem_helper::post(std::get<1>(h));
-- counter_;
ret = ret && sem_helper::wait(std::get<2>(h), default_timeout);
}
return ret;
} }
bool broadcast(handle_t const & h) { bool broadcast(handle_t const & h) {
if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { contrl ctrl { this, nullptr, h };
return true; return waiter_helper::broadcast(ctrl);
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (counter_ > 0) {
for (long i = 0; i < counter_; ++i) {
ret = ret && sem_helper::post(std::get<1>(h));
}
do {
-- counter_;
ret = ret && sem_helper::wait(std::get<2>(h), default_timeout);
} while (counter_ > 0);
}
return ret;
} }
bool emit_destruction(handle_t const & h) { bool quit_waiting(handle_t const & h, wait_flags * flags) {
if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { assert(flags != nullptr);
return true; contrl ctrl { this, flags, h };
} return waiter_helper::quit_waiting(ctrl);
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
waiting_.fetch_or(destruct_flag, std::memory_order_relaxed);
IPC_UNUSED_ auto finally = ipc::guard([this] {
waiting_.fetch_and(destruct_mask, std::memory_order_relaxed);
});
if (counter_ > 0) {
for (long i = 0; i < counter_; ++i) {
ret = ret && sem_helper::post(std::get<1>(h));
}
do {
-- counter_;
ret = ret && sem_helper::wait(std::get<2>(h), default_timeout);
} while (counter_ > 0);
}
return ret;
} }
}; };
class waiter { class waiter {
waiter_helper helper_; waiter_holder helper_;
std::atomic<unsigned> opened_ { 0 }; std::atomic<unsigned> opened_ { 0 };
public: public:
using handle_t = waiter_helper::handle_t; using handle_t = waiter_holder::handle_t;
static handle_t invalid() noexcept { static handle_t invalid() noexcept {
return waiter_helper::invalid(); return waiter_holder::invalid();
} }
handle_t open(char const * name) { handle_t open(char const * name) {
@ -396,9 +385,9 @@ public:
} }
template <typename F> template <typename F>
bool wait_if(handle_t h, std::atomic<bool> const & enabled, F && pred, std::size_t tm = invalid_value) { bool wait_if(handle_t h, waiter_holder::wait_flags * flags, F && pred, std::size_t tm = invalid_value) {
if (h == invalid()) return false; if (h == invalid()) return false;
return helper_.wait_if(h, enabled, std::forward<F>(pred), tm); return helper_.wait_if(h, flags, std::forward<F>(pred), tm);
} }
bool notify(handle_t h) { bool notify(handle_t h) {
@ -411,9 +400,9 @@ public:
return helper_.broadcast(h); return helper_.broadcast(h);
} }
bool emit_destruction(handle_t h) { bool quit_waiting(handle_t h, waiter_holder::wait_flags * flags) {
if (h == invalid()) return false; if (h == invalid()) return false;
return helper_.emit_destruction(h); return helper_.quit_waiting(h, flags);
} }
}; };

View File

@ -5,10 +5,12 @@
#include <atomic> #include <atomic>
#include <utility> #include <utility>
#include <limits> #include <limits>
#include <cassert>
#include "libipc/rw_lock.h" #include "libipc/rw_lock.h"
#include "libipc/pool_alloc.h" #include "libipc/pool_alloc.h"
#include "libipc/shm.h" #include "libipc/shm.h"
#include "libipc/waiter_helper.h"
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/scope_guard.h" #include "libipc/utility/scope_guard.h"
@ -44,8 +46,9 @@ public:
switch ((ret = ::WaitForSingleObject(h_, ms))) { switch ((ret = ::WaitForSingleObject(h_, ms))) {
case WAIT_OBJECT_0: case WAIT_OBJECT_0:
return true; return true;
case WAIT_ABANDONED:
case WAIT_TIMEOUT: case WAIT_TIMEOUT:
return false;
case WAIT_ABANDONED:
default: default:
ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret);
return false; return false;
@ -77,18 +80,48 @@ public:
class condition { class condition {
mutex lock_; mutex lock_;
semaphore sema_, handshake_; semaphore sema_, handshake_;
waiter_helper::wait_counter * cnt_ = nullptr;
std::atomic<unsigned> * waiting_ = nullptr; struct contrl {
long * counter_ = nullptr; condition * me_;
waiter_helper::wait_flags * flags_;
enum : unsigned { waiter_helper::wait_flags & flags() noexcept {
destruct_mask = (std::numeric_limits<unsigned>::max)() >> 1, assert(flags_ != nullptr);
destruct_flag = ~destruct_mask return *flags_;
}
waiter_helper::wait_counter & counter() noexcept {
assert(me_->cnt_ != nullptr);
return *(me_->cnt_);
}
auto get_lock() {
return ipc::detail::unique_lock(me_->lock_);
}
bool sema_wait(std::size_t tm) {
return me_->sema_.wait(tm);
}
bool sema_post(long count) {
return me_->sema_.post(count);
}
bool handshake_wait(std::size_t tm) {
return me_->handshake_.wait(tm);
}
bool handshake_post(long count) {
return me_->handshake_.post(count);
}
}; };
public: public:
using wait_flags = waiter_helper::wait_flags;
friend bool operator==(condition const & c1, condition const & c2) { friend bool operator==(condition const & c1, condition const & c2) {
return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_); return c1.cnt_ == c2.cnt_;
} }
friend bool operator!=(condition const & c1, condition const & c2) { friend bool operator!=(condition const & c1, condition const & c2) {
@ -101,12 +134,11 @@ public:
mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str()); mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str());
} }
bool open(ipc::string const & name, std::atomic<unsigned> * waiting, long * counter) { bool open(ipc::string const & name, waiter_helper::wait_counter * cnt) {
if (lock_ .open("__COND_MTX__" + name) && if (lock_ .open("__COND_MTX__" + name) &&
sema_ .open("__COND_SEM__" + name) && sema_ .open("__COND_SEM__" + name) &&
handshake_.open("__COND_HAN__" + name)) { handshake_.open("__COND_HAN__" + name)) {
waiting_ = waiting; cnt_ = cnt;
counter_ = counter;
return true; return true;
} }
return false; return false;
@ -119,88 +151,31 @@ public:
} }
template <typename Mutex, typename F> template <typename Mutex, typename F>
bool wait_if(Mutex & mtx, std::atomic<bool> const & enabled, F && pred, std::size_t tm = invalid_value) { bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::size_t tm = invalid_value) {
if (!enabled.load(std::memory_order_acquire)) { assert(flags != nullptr);
return false; contrl ctrl { this, flags };
} return waiter_helper::wait_if(ctrl, mtx, std::forward<F>(pred), tm);
waiting_->fetch_add(1, std::memory_order_release);
auto finally = ipc::guard([this] {
waiting_->fetch_sub(1, std::memory_order_release);
});
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
++ *counter_;
}
mtx.unlock();
bool ret = false;
do {
if (!enabled.load(std::memory_order_acquire)) {
break;
}
ret = sema_.wait(tm);
} while (waiting_->load(std::memory_order_acquire) & destruct_flag);
finally.do_exit();
ret = handshake_.post() && ret;
mtx.lock();
return ret;
} }
bool notify() { bool notify() {
if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { contrl ctrl { this, nullptr };
return true; return waiter_helper::notify(ctrl);
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (*counter_ > 0) {
ret = sema_.post();
-- *counter_;
ret = ret && handshake_.wait(default_timeout);
}
return ret;
} }
bool broadcast() { bool broadcast() {
if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { contrl ctrl { this, nullptr };
return true; return waiter_helper::broadcast(ctrl);
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (*counter_ > 0) {
ret = sema_.post(*counter_);
do {
-- *counter_;
ret = ret && handshake_.wait(default_timeout);
} while (*counter_ > 0);
}
return ret;
} }
bool emit_destruction() { bool quit_waiting(wait_flags * flags) {
if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { assert(flags != nullptr);
return true; contrl ctrl { this, flags };
} return waiter_helper::quit_waiting(ctrl);
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
waiting_->fetch_or(destruct_flag, std::memory_order_relaxed);
IPC_UNUSED_ auto finally = ipc::guard([this] {
waiting_->fetch_and(destruct_mask, std::memory_order_relaxed);
});
if (*counter_ > 0) {
ret = sema_.post(*counter_);
do {
-- *counter_;
ret = ret && handshake_.wait(default_timeout);
} while (*counter_ > 0);
}
return ret;
} }
}; };
class waiter { class waiter {
waiter_helper::wait_counter cnt_;
std::atomic<unsigned> waiting_ { 0 };
long counter_ = 0;
public: public:
using handle_t = condition; using handle_t = condition;
@ -214,7 +189,7 @@ public:
return invalid(); return invalid();
} }
condition cond; condition cond;
if (cond.open(name, &waiting_, &counter_)) { if (cond.open(name, &cnt_)) {
return cond; return cond;
} }
return invalid(); return invalid();
@ -226,7 +201,7 @@ public:
} }
template <typename F> template <typename F>
bool wait_if(handle_t& h, std::atomic<bool> const & enabled, F&& pred, std::size_t tm = invalid_value) { bool wait_if(handle_t& h, handle_t::wait_flags * flags, F&& pred, std::size_t tm = invalid_value) {
if (h == invalid()) return false; if (h == invalid()) return false;
class non_mutex { class non_mutex {
@ -235,7 +210,7 @@ public:
void unlock() noexcept {} void unlock() noexcept {}
} nm; } nm;
return h.wait_if(nm, enabled, std::forward<F>(pred), tm); return h.wait_if(nm, flags, std::forward<F>(pred), tm);
} }
bool notify(handle_t& h) { bool notify(handle_t& h) {
@ -248,9 +223,9 @@ public:
return h.broadcast(); return h.broadcast();
} }
bool emit_destruction(handle_t& h) { bool quit_waiting(handle_t& h, handle_t::wait_flags * flags) {
if (h == invalid()) return false; if (h == invalid()) return false;
return h.emit_destruction(); return h.quit_waiting(flags);
} }
}; };

View File

@ -21,39 +21,39 @@ using mutex_impl = ipc::detail::mutex;
using semaphore_impl = ipc::detail::semaphore; using semaphore_impl = ipc::detail::semaphore;
class condition_impl : public ipc::detail::condition { class condition_impl : public ipc::detail::condition {
using base_t = ipc::detail::condition;
ipc::shm::handle wait_h_, cnt_h_; ipc::shm::handle cnt_h_;
std::atomic<bool> enabled_ { false }; base_t::wait_flags flags_;
public: public:
static void remove(char const * name) { static void remove(char const * name) {
ipc::detail::condition::remove(name); base_t::remove(name);
ipc::string n = name; ipc::string n = name;
ipc::shm::remove((n + "__COND_CNT__" ).c_str()); ipc::shm::remove((n + "__COND_CNT__" ).c_str());
ipc::shm::remove((n + "__COND_WAIT__").c_str()); ipc::shm::remove((n + "__COND_WAIT__").c_str());
} }
bool open(ipc::string const & name) { bool open(char const * name) {
if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic<unsigned>)) && if (cnt_h_ .acquire(
cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) { (ipc::string { name } + "__COND_CNT__" ).c_str(),
enabled_.store(true, std::memory_order_release); sizeof(waiter_helper::wait_counter))) {
return ipc::detail::condition::open(name, flags_.is_closed_.store(false, std::memory_order_release);
static_cast<std::atomic<unsigned> *>(wait_h_.get()), return base_t::open(name,
static_cast<long *>(cnt_h_.get())); static_cast<waiter_helper::wait_counter *>(cnt_h_.get()));
} }
return false; return false;
} }
void close() { void close() {
enabled_.store(false, std::memory_order_release); flags_.is_closed_.store(true, std::memory_order_release);
ipc::detail::condition::emit_destruction(); base_t::quit_waiting(&flags_);
ipc::detail::condition::close(); base_t::close();
cnt_h_.release(); cnt_h_.release();
wait_h_.release();
} }
bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) {
return ipc::detail::condition::wait_if(mtx, enabled_, [] { return true; }, tm); return base_t::wait_if(mtx, &flags_, [] { return true; }, tm);
} }
}; };
@ -169,17 +169,11 @@ public:
} }
bool wait(std::size_t tm = invalid_value) { bool wait(std::size_t tm = invalid_value) {
if (h_ == sem_helper::invalid()) return false;
return sem_helper::wait(h_, tm); return sem_helper::wait(h_, tm);
} }
bool post(long count) { bool post(long count) {
if (h_ == sem_helper::invalid()) return false; return sem_helper::post(h_, count);
bool ret = true;
for (long i = 0; i < count; ++i) {
ret = ret && sem_helper::post(h_);
}
return ret;
} }
}; };
@ -198,7 +192,7 @@ public:
private: private:
waiter_t* w_ = nullptr; waiter_t* w_ = nullptr;
waiter_t::handle_t h_ = waiter_t::invalid(); waiter_t::handle_t h_ = waiter_t::invalid();
std::atomic<bool> enabled_ { true }; waiter_t::handle_t::wait_flags flags_;
public: public:
waiter_wrapper() = default; waiter_wrapper() = default;
@ -223,27 +217,27 @@ public:
bool open(char const * name) { bool open(char const * name) {
if (w_ == nullptr) return false; if (w_ == nullptr) return false;
close(); close();
flags_.is_closed_.store(false, std::memory_order_release);
h_ = w_->open(name); h_ = w_->open(name);
return valid(); return valid();
} }
void close() { void close() {
if (!valid()) return; if (!valid()) return;
flags_.is_closed_.store(true, std::memory_order_release);
quit_waiting();
w_->close(h_); w_->close(h_);
h_ = waiter_t::invalid(); h_ = waiter_t::invalid();
} }
void set_enabled(bool e) { void quit_waiting() {
if (enabled_.exchange(e, std::memory_order_acq_rel) == e) { w_->quit_waiting(h_, &flags_);
return;
}
if (!e) w_->emit_destruction(h_);
} }
template <typename F> template <typename F>
bool wait_if(F && pred, std::size_t tm = invalid_value) { bool wait_if(F && pred, std::size_t tm = invalid_value) {
if (!valid()) return false; if (!valid()) return false;
return w_->wait_if(h_, enabled_, std::forward<F>(pred), tm); return w_->wait_if(h_, &flags_, std::forward<F>(pred), tm);
} }
bool notify() { bool notify() {

129
src/libipc/waiter_helper.h Normal file
View File

@ -0,0 +1,129 @@
#pragma once
#include <atomic>
#include <limits>
#include <utility>
#include "libipc/def.h"
#include "libipc/utility/scope_guard.h"
namespace ipc {
namespace detail {
class waiter_helper {
enum : unsigned {
destruct_mask = (std::numeric_limits<unsigned>::max)() >> 1,
destruct_flag = ~destruct_mask
};
public:
struct wait_counter {
std::atomic<unsigned> waiting_ { 0 };
long counter_ = 0;
};
struct wait_flags {
std::atomic<bool> is_waiting_ { false };
std::atomic<bool> is_closed_ { true };
};
template <typename Mutex, typename Ctrl, typename F>
static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::size_t tm) {
auto & flags = ctrl.flags();
if (flags.is_closed_.load(std::memory_order_acquire)) {
return false;
}
auto & counter = ctrl.counter();
counter.waiting_.fetch_add(1, std::memory_order_release);
flags.is_waiting_.store(true, std::memory_order_relaxed);
auto finally = ipc::guard([&counter, &flags] {
counter.waiting_.fetch_sub(1, std::memory_order_release);
flags.is_waiting_.store(false, std::memory_order_relaxed);
});
{
IPC_UNUSED_ auto guard = ctrl.get_lock();
if (!std::forward<F>(pred)()) return true;
counter.counter_ += 1;
}
mtx.unlock();
bool ret = false;
do {
bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed);
bool is_closed = flags.is_closed_ .load(std::memory_order_acquire);
if (!is_waiting || is_closed) {
ret = false;
break;
}
ret = ctrl.sema_wait(tm);
} while (counter.waiting_.load(std::memory_order_acquire) & destruct_flag);
finally.do_exit();
ret = ctrl.handshake_post(1) && ret;
mtx.lock();
return ret;
}
template <typename Ctrl>
static bool notify(Ctrl & ctrl) {
auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock();
if (counter.counter_ > 0) {
ret = ctrl.sema_post(1);
counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout);
}
return ret;
}
template <typename Ctrl>
static bool broadcast(Ctrl & ctrl) {
auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock();
if (counter.counter_ > 0) {
ret = ctrl.sema_post(counter.counter_);
do {
counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout);
} while (counter.counter_ > 0);
}
return ret;
}
template <typename Ctrl>
static bool quit_waiting(Ctrl & ctrl) {
auto & flags = ctrl.flags();
if (!flags.is_waiting_.exchange(false, std::memory_order_release)) {
return true;
}
auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock();
counter.waiting_.fetch_or(destruct_flag, std::memory_order_relaxed);
IPC_UNUSED_ auto finally = ipc::guard([&counter] {
counter.waiting_.fetch_and(destruct_mask, std::memory_order_relaxed);
});
if (counter.counter_ > 0) {
ret = ctrl.sema_post(counter.counter_);
counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout);
}
return ret;
}
};
} // namespace detail
} // namespace ipc