diff --git a/include/elem_circ.h b/include/elem_circ.h index 55cb73d..c492617 100644 --- a/include/elem_circ.h +++ b/include/elem_circ.h @@ -8,6 +8,8 @@ #include "rw_lock.h" #include "elem_def.h" +#include "platform/waiter.h" + namespace ipc { namespace circ { @@ -257,6 +259,7 @@ public: private: head_t head_; + ipc::detail::waiter waiter_; elem_t block_[elem_max]; public: @@ -264,8 +267,11 @@ public: elem_array(const elem_array&) = delete; elem_array& operator=(const elem_array&) = delete; - auto & waiter() { return head_.waiter_; } - auto const & waiter() const { return head_.waiter_; } + auto & waiter() { return this->waiter_; } + auto const & waiter() const { return this->waiter_; } + + auto & conn_waiter() { return head_.conn_waiter(); } + auto const & conn_waiter() const { return head_.conn_waiter(); } std::size_t connect () noexcept { return head_.connect (); } std::size_t disconnect() noexcept { return head_.disconnect(); } diff --git a/include/elem_def.h b/include/elem_def.h index 4b8b05a..fc51f5a 100644 --- a/include/elem_def.h +++ b/include/elem_def.h @@ -10,9 +10,12 @@ namespace ipc { template struct conn_head { - ipc::detail::waiter waiter_; + ipc::detail::waiter cc_waiter_; std::atomic cc_ { 0 }; // connection counter + auto & conn_waiter() { return this->cc_waiter_; } + auto const & conn_waiter() const { return this->cc_waiter_; } + std::size_t connect() noexcept { return cc_.fetch_add(1, std::memory_order_release); } diff --git a/include/ipc.h b/include/ipc.h index 18eca8f..0f273fa 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -20,7 +20,7 @@ struct IPC_EXPORT channel_detail { static void disconnect(handle_t h); static std::size_t recv_count(handle_t h); - static void wait_for_recv(handle_t h, std::size_t r_count); + static bool wait_for_recv(handle_t h, std::size_t r_count); static void clear_recv(handle_t h); static void clear_recv(char const * name); @@ -94,11 +94,11 @@ public: return Detail::recv_count(h_); } - void wait_for_recv(std::size_t r_count) const { + bool wait_for_recv(std::size_t r_count) const { return Detail::wait_for_recv(h_, r_count); } - static void wait_for_recv(char const * name, std::size_t r_count) { + static bool wait_for_recv(char const * name, std::size_t r_count) { return channel_impl(name).wait_for_recv(r_count); } diff --git a/include/queue.h b/include/queue.h index 984b35b..1b4b5a4 100644 --- a/include/queue.h +++ b/include/queue.h @@ -27,7 +27,7 @@ public: private: elems_t* elems_ = nullptr; - ipc::detail::waiter_impl wi_; + ipc::detail::waiter_impl waiter_, cc_waiter_; decltype(std::declval().cursor()) cursor_ = 0; std::atomic_bool connected_ { false }; @@ -53,7 +53,9 @@ public: // if it's already connected, just return an error count return invalid_value; } - return elems_->connect(); + auto ret = elems_->connect(); + cc_waiter_.broadcast(); + return ret; } std::size_t disconnect() noexcept { @@ -62,13 +64,23 @@ public: // if it's already disconnected, just return an error count return invalid_value; } - return elems_->disconnect(); + auto ret = elems_->disconnect(); + cc_waiter_.broadcast(); + return ret; } std::size_t conn_count() const noexcept { return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); } + bool wait_for_connect(std::size_t count) { + if (elems_ == nullptr) return false; + for (unsigned k = 0; elems_->conn_count() < count;) { + ipc::sleep(k, [this] { return cc_waiter_.wait(); }); + } + return true; + } + bool empty() const noexcept { return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor()); } @@ -82,12 +94,16 @@ public: auto old = elems_; elems_ = els; if (name == nullptr) { - wi_.close(); - wi_.attach(nullptr); + waiter_.close(); + waiter_.attach(nullptr); + cc_waiter_.close(); + cc_waiter_.attach(nullptr); } else { - wi_.attach(&(elems_->waiter())); - wi_.open((std::string{ "__IPC_WAITER__" } +name).c_str()); + waiter_.attach(&(elems_->waiter())); + waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str()); + cc_waiter_.attach(&(elems_->conn_waiter())); + cc_waiter_.open((std::string{ "__IPC_CC_WAITER__" } + name).c_str()); } cursor_ = elems_->cursor(); return old; @@ -106,7 +122,7 @@ public: if (elems_->push([&](void* p) { ::new (p) T(std::forward

(params)...); })) { - wi_.notify(); + waiter_.broadcast(); return true; } return false; @@ -123,7 +139,7 @@ public: })) { return item; } - ipc::sleep(k, [this] { return wi_.wait(); }); + ipc::sleep(k, [this] { return waiter_.wait(); }); } } }; diff --git a/src/ipc.cpp b/src/ipc.cpp index b379c33..55ea328 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -119,10 +119,12 @@ static std::size_t recv_count(handle_t h) { return que->conn_count(); } -static void wait_for_recv(handle_t h, std::size_t r_count) { - for (unsigned k = 0; recv_count(h) < r_count;) { - ipc::sleep(k); +static bool wait_for_recv(handle_t h, std::size_t r_count) { + auto que = queue_of(h); + if (que == nullptr) { + return false; } + return que->wait_for_connect(r_count); } static void clear_recv(handle_t h) { @@ -237,7 +239,7 @@ std::size_t channel_detail::recv_count(handle_t h) { } template