diff --git a/demo/chat/main.cpp b/demo/chat/main.cpp index 715dd0d..d3fc0ac 100755 --- a/demo/chat/main.cpp +++ b/demo/chat/main.cpp @@ -26,10 +26,6 @@ ipc::channel receiver__ { name__, ipc::receiver }; } // namespace int main() { - ::signal(SIGINT, [](int) { - receiver__.disconnect(); - }); - std::string buf, id = id__ + std::to_string(calc_unique_id()); std::regex reg { "(c\\d+)> (.*)" }; @@ -58,6 +54,7 @@ int main() { if (buf.empty() || (buf == quit__)) break; // std::cout << "[" << i << "]" << std::endl; sender__.send(id + "> " + buf); + buf.clear(); } receiver__.disconnect(); diff --git a/src/ipc.cpp b/src/ipc.cpp index 71c0a1c..379799a 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -280,10 +280,10 @@ struct conn_info_head { , acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) { } - void enable(bool e) { - cc_waiter_.set_enabled(e); - wt_waiter_.set_enabled(e); - rd_waiter_.set_enabled(e); + void quit_waiting() { + cc_waiter_.quit_waiting(); + wt_waiter_.quit_waiting(); + rd_waiter_.quit_waiting(); } auto acc() { @@ -356,7 +356,6 @@ static bool connect(handle_t * ph, char const * name, bool start) { if (que == nullptr) { return false; } - info_of(*ph)->enable(true); if (start) { if (que->connect()) { // wouldn't connect twice info_of(*ph)->cc_waiter_.broadcast(); @@ -371,7 +370,7 @@ static void disconnect(ipc::handle_t h) { return; } bool dis = que->disconnect(); - info_of(h)->enable(false); + info_of(h)->quit_waiting(); if (dis) { info_of(h)->recv_cache().clear(); } diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index 0fe0ed2..d8ff4ef 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -10,8 +10,10 @@ #include #include #include +#include #include "libipc/def.h" +#include "libipc/waiter_helper.h" #include "libipc/utility/log.h" #include "libipc/utility/scope_guard.h" @@ -178,7 +180,7 @@ public: return SEM_FAILED; } - static handle_t open(char const* name, long count) { + static handle_t open(char const * name, long count) { handle_t sem = ::sem_open(name, O_CREAT, 0666, count); if (sem == SEM_FAILED) { ipc::error("fail sem_open[%d]: %s\n", errno, name); @@ -201,13 +203,19 @@ public: IPC_SEMAPHORE_FUNC_(sem_close, h); } - static bool destroy(char const* name) { + static bool destroy(char const * name) { IPC_SEMAPHORE_FUNC_(sem_unlink, name); } - static bool post(handle_t h) { + static bool post(handle_t h, long count) { if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_post, h); + auto spost = [](handle_t h) { + IPC_SEMAPHORE_FUNC_(sem_post, h); + }; + for (long i = 0; i < count; ++i) { + if (!spost(h)) return false; + } + return true; } static bool wait(handle_t h, std::size_t tm = invalid_value) { @@ -235,23 +243,61 @@ public: #pragma pop_macro("IPC_SEMAPHORE_FUNC_") }; -class waiter_helper { +class waiter_holder { +public: + using handle_t = std::tuple< + ipc::string, + sem_helper::handle_t /* sema */, + sem_helper::handle_t /* handshake */>; + + static handle_t invalid() noexcept { + return std::make_tuple( + ipc::string{}, + sem_helper::invalid(), + sem_helper::invalid()); + } + +private: mutex lock_; + waiter_helper::wait_counter cnt_; - std::atomic waiting_ { 0 }; - long counter_ = 0; + struct contrl { + waiter_holder * me_; + waiter_helper::wait_flags * flags_; + handle_t const & h_; - enum : unsigned { - destruct_mask = (std::numeric_limits::max)() >> 1, - destruct_flag = ~destruct_mask + waiter_helper::wait_flags & flags() noexcept { + assert(flags_ != nullptr); + return *flags_; + } + + waiter_helper::wait_counter & counter() noexcept { + return me_->cnt_; + } + + auto get_lock() { + return ipc::detail::unique_lock(me_->lock_); + } + + bool sema_wait(std::size_t tm) { + return sem_helper::wait(std::get<1>(h_), tm); + } + + bool sema_post(long count) { + return sem_helper::post(std::get<1>(h_), count); + } + + bool handshake_wait(std::size_t tm) { + return sem_helper::wait(std::get<2>(h_), tm); + } + + bool handshake_post(long count) { + return sem_helper::post(std::get<2>(h_), count); + } }; public: - using handle_t = std::tuple; - - static handle_t invalid() noexcept { - return std::make_tuple(ipc::string{}, sem_helper::invalid(), sem_helper::invalid()); - } + using wait_flags = waiter_helper::wait_flags; handle_t open_h(ipc::string && name) { auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0); @@ -285,95 +331,38 @@ public: } template - bool wait_if(handle_t const & h, std::atomic const & enabled, F&& pred, std::size_t tm = invalid_value) { - if (!enabled.load(std::memory_order_acquire)) { - return false; - } - waiting_.fetch_add(1, std::memory_order_release); - auto finally = ipc::guard([this] { - waiting_.fetch_sub(1, std::memory_order_release); - }); - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (!std::forward(pred)()) return true; - ++ counter_; - } - bool ret = false; - do { - if (!enabled.load(std::memory_order_acquire)) { - break; - } - ret = sem_helper::wait(std::get<1>(h), tm); - } while (waiting_.load(std::memory_order_acquire) & destruct_flag); - finally.do_exit(); - ret = sem_helper::post(std::get<2>(h)) && ret; - return ret; + bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::size_t tm = invalid_value) { + assert(flags != nullptr); + contrl ctrl { this, flags, h }; + return waiter_helper::wait_if(ctrl, mtx, std::forward(pred), tm); } bool notify(handle_t const & h) { - if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (counter_ > 0) { - ret = sem_helper::post(std::get<1>(h)); - -- counter_; - ret = ret && sem_helper::wait(std::get<2>(h), default_timeout); - } - return ret; + contrl ctrl { this, nullptr, h }; + return waiter_helper::notify(ctrl); } bool broadcast(handle_t const & h) { - if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (counter_ > 0) { - for (long i = 0; i < counter_; ++i) { - ret = ret && sem_helper::post(std::get<1>(h)); - } - do { - -- counter_; - ret = ret && sem_helper::wait(std::get<2>(h), default_timeout); - } while (counter_ > 0); - } - return ret; + contrl ctrl { this, nullptr, h }; + return waiter_helper::broadcast(ctrl); } - bool emit_destruction(handle_t const & h) { - if ((waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - waiting_.fetch_or(destruct_flag, std::memory_order_relaxed); - IPC_UNUSED_ auto finally = ipc::guard([this] { - waiting_.fetch_and(destruct_mask, std::memory_order_relaxed); - }); - if (counter_ > 0) { - for (long i = 0; i < counter_; ++i) { - ret = ret && sem_helper::post(std::get<1>(h)); - } - do { - -- counter_; - ret = ret && sem_helper::wait(std::get<2>(h), default_timeout); - } while (counter_ > 0); - } - return ret; + bool quit_waiting(handle_t const & h, wait_flags * flags) { + assert(flags != nullptr); + contrl ctrl { this, flags, h }; + return waiter_helper::quit_waiting(ctrl); } }; class waiter { - waiter_helper helper_; + waiter_holder helper_; std::atomic opened_ { 0 }; public: - using handle_t = waiter_helper::handle_t; + using handle_t = waiter_holder::handle_t; static handle_t invalid() noexcept { - return waiter_helper::invalid(); + return waiter_holder::invalid(); } handle_t open(char const * name) { @@ -396,9 +385,9 @@ public: } template - bool wait_if(handle_t h, std::atomic const & enabled, F && pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t h, waiter_holder::wait_flags * flags, F && pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; - return helper_.wait_if(h, enabled, std::forward(pred), tm); + return helper_.wait_if(h, flags, std::forward(pred), tm); } bool notify(handle_t h) { @@ -411,9 +400,9 @@ public: return helper_.broadcast(h); } - bool emit_destruction(handle_t h) { + bool quit_waiting(handle_t h, waiter_holder::wait_flags * flags) { if (h == invalid()) return false; - return helper_.emit_destruction(h); + return helper_.quit_waiting(h, flags); } }; diff --git a/src/libipc/platform/waiter_win.h b/src/libipc/platform/waiter_win.h index 0782510..155cadb 100755 --- a/src/libipc/platform/waiter_win.h +++ b/src/libipc/platform/waiter_win.h @@ -5,10 +5,12 @@ #include #include #include +#include #include "libipc/rw_lock.h" #include "libipc/pool_alloc.h" #include "libipc/shm.h" +#include "libipc/waiter_helper.h" #include "libipc/utility/log.h" #include "libipc/utility/scope_guard.h" @@ -44,8 +46,9 @@ public: switch ((ret = ::WaitForSingleObject(h_, ms))) { case WAIT_OBJECT_0: return true; - case WAIT_ABANDONED: case WAIT_TIMEOUT: + return false; + case WAIT_ABANDONED: default: ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); return false; @@ -77,18 +80,48 @@ public: class condition { mutex lock_; semaphore sema_, handshake_; + waiter_helper::wait_counter * cnt_ = nullptr; - std::atomic * waiting_ = nullptr; - long * counter_ = nullptr; + struct contrl { + condition * me_; + waiter_helper::wait_flags * flags_; - enum : unsigned { - destruct_mask = (std::numeric_limits::max)() >> 1, - destruct_flag = ~destruct_mask + waiter_helper::wait_flags & flags() noexcept { + assert(flags_ != nullptr); + return *flags_; + } + + waiter_helper::wait_counter & counter() noexcept { + assert(me_->cnt_ != nullptr); + return *(me_->cnt_); + } + + auto get_lock() { + return ipc::detail::unique_lock(me_->lock_); + } + + bool sema_wait(std::size_t tm) { + return me_->sema_.wait(tm); + } + + bool sema_post(long count) { + return me_->sema_.post(count); + } + + bool handshake_wait(std::size_t tm) { + return me_->handshake_.wait(tm); + } + + bool handshake_post(long count) { + return me_->handshake_.post(count); + } }; public: + using wait_flags = waiter_helper::wait_flags; + friend bool operator==(condition const & c1, condition const & c2) { - return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_); + return c1.cnt_ == c2.cnt_; } friend bool operator!=(condition const & c1, condition const & c2) { @@ -101,12 +134,11 @@ public: mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str()); } - bool open(ipc::string const & name, std::atomic * waiting, long * counter) { + bool open(ipc::string const & name, waiter_helper::wait_counter * cnt) { if (lock_ .open("__COND_MTX__" + name) && sema_ .open("__COND_SEM__" + name) && handshake_.open("__COND_HAN__" + name)) { - waiting_ = waiting; - counter_ = counter; + cnt_ = cnt; return true; } return false; @@ -119,88 +151,31 @@ public: } template - bool wait_if(Mutex & mtx, std::atomic const & enabled, F && pred, std::size_t tm = invalid_value) { - if (!enabled.load(std::memory_order_acquire)) { - return false; - } - waiting_->fetch_add(1, std::memory_order_release); - auto finally = ipc::guard([this] { - waiting_->fetch_sub(1, std::memory_order_release); - }); - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (!std::forward(pred)()) return true; - ++ *counter_; - } - mtx.unlock(); - bool ret = false; - do { - if (!enabled.load(std::memory_order_acquire)) { - break; - } - ret = sema_.wait(tm); - } while (waiting_->load(std::memory_order_acquire) & destruct_flag); - finally.do_exit(); - ret = handshake_.post() && ret; - mtx.lock(); - return ret; + bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::size_t tm = invalid_value) { + assert(flags != nullptr); + contrl ctrl { this, flags }; + return waiter_helper::wait_if(ctrl, mtx, std::forward(pred), tm); } bool notify() { - if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (*counter_ > 0) { - ret = sema_.post(); - -- *counter_; - ret = ret && handshake_.wait(default_timeout); - } - return ret; + contrl ctrl { this, nullptr }; + return waiter_helper::notify(ctrl); } bool broadcast() { - if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (*counter_ > 0) { - ret = sema_.post(*counter_); - do { - -- *counter_; - ret = ret && handshake_.wait(default_timeout); - } while (*counter_ > 0); - } - return ret; + contrl ctrl { this, nullptr }; + return waiter_helper::broadcast(ctrl); } - bool emit_destruction() { - if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - waiting_->fetch_or(destruct_flag, std::memory_order_relaxed); - IPC_UNUSED_ auto finally = ipc::guard([this] { - waiting_->fetch_and(destruct_mask, std::memory_order_relaxed); - }); - if (*counter_ > 0) { - ret = sema_.post(*counter_); - do { - -- *counter_; - ret = ret && handshake_.wait(default_timeout); - } while (*counter_ > 0); - } - return ret; + bool quit_waiting(wait_flags * flags) { + assert(flags != nullptr); + contrl ctrl { this, flags }; + return waiter_helper::quit_waiting(ctrl); } }; class waiter { - - std::atomic waiting_ { 0 }; - long counter_ = 0; + waiter_helper::wait_counter cnt_; public: using handle_t = condition; @@ -214,7 +189,7 @@ public: return invalid(); } condition cond; - if (cond.open(name, &waiting_, &counter_)) { + if (cond.open(name, &cnt_)) { return cond; } return invalid(); @@ -226,7 +201,7 @@ public: } template - bool wait_if(handle_t& h, std::atomic const & enabled, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t& h, handle_t::wait_flags * flags, F&& pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; class non_mutex { @@ -235,7 +210,7 @@ public: void unlock() noexcept {} } nm; - return h.wait_if(nm, enabled, std::forward(pred), tm); + return h.wait_if(nm, flags, std::forward(pred), tm); } bool notify(handle_t& h) { @@ -248,9 +223,9 @@ public: return h.broadcast(); } - bool emit_destruction(handle_t& h) { + bool quit_waiting(handle_t& h, handle_t::wait_flags * flags) { if (h == invalid()) return false; - return h.emit_destruction(); + return h.quit_waiting(flags); } }; diff --git a/src/libipc/platform/waiter_wrapper.h b/src/libipc/platform/waiter_wrapper.h index 4c30094..bbb9902 100755 --- a/src/libipc/platform/waiter_wrapper.h +++ b/src/libipc/platform/waiter_wrapper.h @@ -21,39 +21,39 @@ using mutex_impl = ipc::detail::mutex; using semaphore_impl = ipc::detail::semaphore; class condition_impl : public ipc::detail::condition { + using base_t = ipc::detail::condition; - ipc::shm::handle wait_h_, cnt_h_; - std::atomic enabled_ { false }; + ipc::shm::handle cnt_h_; + base_t::wait_flags flags_; public: static void remove(char const * name) { - ipc::detail::condition::remove(name); + base_t::remove(name); ipc::string n = name; ipc::shm::remove((n + "__COND_CNT__" ).c_str()); ipc::shm::remove((n + "__COND_WAIT__").c_str()); } - bool open(ipc::string const & name) { - if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic)) && - cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) { - enabled_.store(true, std::memory_order_release); - return ipc::detail::condition::open(name, - static_cast *>(wait_h_.get()), - static_cast(cnt_h_.get())); + bool open(char const * name) { + if (cnt_h_ .acquire( + (ipc::string { name } + "__COND_CNT__" ).c_str(), + sizeof(waiter_helper::wait_counter))) { + flags_.is_closed_.store(false, std::memory_order_release); + return base_t::open(name, + static_cast(cnt_h_.get())); } return false; } void close() { - enabled_.store(false, std::memory_order_release); - ipc::detail::condition::emit_destruction(); - ipc::detail::condition::close(); - cnt_h_ .release(); - wait_h_.release(); + flags_.is_closed_.store(true, std::memory_order_release); + base_t::quit_waiting(&flags_); + base_t::close(); + cnt_h_.release(); } bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { - return ipc::detail::condition::wait_if(mtx, enabled_, [] { return true; }, tm); + return base_t::wait_if(mtx, &flags_, [] { return true; }, tm); } }; @@ -169,17 +169,11 @@ public: } bool wait(std::size_t tm = invalid_value) { - if (h_ == sem_helper::invalid()) return false; return sem_helper::wait(h_, tm); } bool post(long count) { - if (h_ == sem_helper::invalid()) return false; - bool ret = true; - for (long i = 0; i < count; ++i) { - ret = ret && sem_helper::post(h_); - } - return ret; + return sem_helper::post(h_, count); } }; @@ -198,7 +192,7 @@ public: private: waiter_t* w_ = nullptr; waiter_t::handle_t h_ = waiter_t::invalid(); - std::atomic enabled_ { true }; + waiter_t::handle_t::wait_flags flags_; public: waiter_wrapper() = default; @@ -223,27 +217,27 @@ public: bool open(char const * name) { if (w_ == nullptr) return false; close(); + flags_.is_closed_.store(false, std::memory_order_release); h_ = w_->open(name); return valid(); } void close() { if (!valid()) return; + flags_.is_closed_.store(true, std::memory_order_release); + quit_waiting(); w_->close(h_); h_ = waiter_t::invalid(); } - void set_enabled(bool e) { - if (enabled_.exchange(e, std::memory_order_acq_rel) == e) { - return; - } - if (!e) w_->emit_destruction(h_); + void quit_waiting() { + w_->quit_waiting(h_, &flags_); } template bool wait_if(F && pred, std::size_t tm = invalid_value) { if (!valid()) return false; - return w_->wait_if(h_, enabled_, std::forward(pred), tm); + return w_->wait_if(h_, &flags_, std::forward(pred), tm); } bool notify() { diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h new file mode 100644 index 0000000..46b03a3 --- /dev/null +++ b/src/libipc/waiter_helper.h @@ -0,0 +1,129 @@ +#pragma once + +#include +#include +#include + +#include "libipc/def.h" +#include "libipc/utility/scope_guard.h" + +namespace ipc { +namespace detail { + +class waiter_helper { + + enum : unsigned { + destruct_mask = (std::numeric_limits::max)() >> 1, + destruct_flag = ~destruct_mask + }; + +public: + struct wait_counter { + std::atomic waiting_ { 0 }; + long counter_ = 0; + }; + + struct wait_flags { + std::atomic is_waiting_ { false }; + std::atomic is_closed_ { true }; + }; + + template + static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::size_t tm) { + auto & flags = ctrl.flags(); + if (flags.is_closed_.load(std::memory_order_acquire)) { + return false; + } + + auto & counter = ctrl.counter(); + counter.waiting_.fetch_add(1, std::memory_order_release); + flags.is_waiting_.store(true, std::memory_order_relaxed); + auto finally = ipc::guard([&counter, &flags] { + counter.waiting_.fetch_sub(1, std::memory_order_release); + flags.is_waiting_.store(false, std::memory_order_relaxed); + }); + { + IPC_UNUSED_ auto guard = ctrl.get_lock(); + if (!std::forward(pred)()) return true; + counter.counter_ += 1; + } + mtx.unlock(); + + bool ret = false; + do { + bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed); + bool is_closed = flags.is_closed_ .load(std::memory_order_acquire); + if (!is_waiting || is_closed) { + ret = false; + break; + } + ret = ctrl.sema_wait(tm); + } while (counter.waiting_.load(std::memory_order_acquire) & destruct_flag); + finally.do_exit(); + ret = ctrl.handshake_post(1) && ret; + + mtx.lock(); + return ret; + } + + template + static bool notify(Ctrl & ctrl) { + auto & counter = ctrl.counter(); + if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ctrl.get_lock(); + if (counter.counter_ > 0) { + ret = ctrl.sema_post(1); + counter.counter_ -= 1; + ret = ret && ctrl.handshake_wait(default_timeout); + } + return ret; + } + + template + static bool broadcast(Ctrl & ctrl) { + auto & counter = ctrl.counter(); + if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ctrl.get_lock(); + if (counter.counter_ > 0) { + ret = ctrl.sema_post(counter.counter_); + do { + counter.counter_ -= 1; + ret = ret && ctrl.handshake_wait(default_timeout); + } while (counter.counter_ > 0); + } + return ret; + } + + template + static bool quit_waiting(Ctrl & ctrl) { + auto & flags = ctrl.flags(); + if (!flags.is_waiting_.exchange(false, std::memory_order_release)) { + return true; + } + auto & counter = ctrl.counter(); + if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ctrl.get_lock(); + counter.waiting_.fetch_or(destruct_flag, std::memory_order_relaxed); + IPC_UNUSED_ auto finally = ipc::guard([&counter] { + counter.waiting_.fetch_and(destruct_mask, std::memory_order_relaxed); + }); + if (counter.counter_ > 0) { + ret = ctrl.sema_post(counter.counter_); + counter.counter_ -= 1; + ret = ret && ctrl.handshake_wait(default_timeout); + } + return ret; + } +}; + +} // namespace detail +} // namespace ipc