use waiter for wait_for_recv

This commit is contained in:
mutouyun 2019-01-16 18:06:21 +08:00
parent e09ea90949
commit d25f070627
5 changed files with 46 additions and 19 deletions

View File

@ -8,6 +8,8 @@
#include "rw_lock.h"
#include "elem_def.h"
#include "platform/waiter.h"
namespace ipc {
namespace circ {
@ -257,6 +259,7 @@ public:
private:
head_t head_;
ipc::detail::waiter waiter_;
elem_t block_[elem_max];
public:
@ -264,8 +267,11 @@ public:
elem_array(const elem_array&) = delete;
elem_array& operator=(const elem_array&) = delete;
auto & waiter() { return head_.waiter_; }
auto const & waiter() const { return head_.waiter_; }
auto & waiter() { return this->waiter_; }
auto const & waiter() const { return this->waiter_; }
auto & conn_waiter() { return head_.conn_waiter(); }
auto const & conn_waiter() const { return head_.conn_waiter(); }
std::size_t connect () noexcept { return head_.connect (); }
std::size_t disconnect() noexcept { return head_.disconnect(); }

View File

@ -10,9 +10,12 @@ namespace ipc {
template <typename U2>
struct conn_head {
ipc::detail::waiter waiter_;
ipc::detail::waiter cc_waiter_;
std::atomic<U2> cc_ { 0 }; // connection counter
auto & conn_waiter() { return this->cc_waiter_; }
auto const & conn_waiter() const { return this->cc_waiter_; }
std::size_t connect() noexcept {
return cc_.fetch_add(1, std::memory_order_release);
}

View File

@ -20,7 +20,7 @@ struct IPC_EXPORT channel_detail {
static void disconnect(handle_t h);
static std::size_t recv_count(handle_t h);
static void wait_for_recv(handle_t h, std::size_t r_count);
static bool wait_for_recv(handle_t h, std::size_t r_count);
static void clear_recv(handle_t h);
static void clear_recv(char const * name);
@ -94,11 +94,11 @@ public:
return Detail::recv_count(h_);
}
void wait_for_recv(std::size_t r_count) const {
bool wait_for_recv(std::size_t r_count) const {
return Detail::wait_for_recv(h_, r_count);
}
static void wait_for_recv(char const * name, std::size_t r_count) {
static bool wait_for_recv(char const * name, std::size_t r_count) {
return channel_impl(name).wait_for_recv(r_count);
}

View File

@ -27,7 +27,7 @@ public:
private:
elems_t* elems_ = nullptr;
ipc::detail::waiter_impl wi_;
ipc::detail::waiter_impl waiter_, cc_waiter_;
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
std::atomic_bool connected_ { false };
@ -53,7 +53,9 @@ public:
// if it's already connected, just return an error count
return invalid_value;
}
return elems_->connect();
auto ret = elems_->connect();
cc_waiter_.broadcast();
return ret;
}
std::size_t disconnect() noexcept {
@ -62,13 +64,23 @@ public:
// if it's already disconnected, just return an error count
return invalid_value;
}
return elems_->disconnect();
auto ret = elems_->disconnect();
cc_waiter_.broadcast();
return ret;
}
std::size_t conn_count() const noexcept {
return (elems_ == nullptr) ? invalid_value : elems_->conn_count();
}
bool wait_for_connect(std::size_t count) {
if (elems_ == nullptr) return false;
for (unsigned k = 0; elems_->conn_count() < count;) {
ipc::sleep(k, [this] { return cc_waiter_.wait(); });
}
return true;
}
bool empty() const noexcept {
return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor());
}
@ -82,12 +94,16 @@ public:
auto old = elems_;
elems_ = els;
if (name == nullptr) {
wi_.close();
wi_.attach(nullptr);
waiter_.close();
waiter_.attach(nullptr);
cc_waiter_.close();
cc_waiter_.attach(nullptr);
}
else {
wi_.attach(&(elems_->waiter()));
wi_.open((std::string{ "__IPC_WAITER__" } +name).c_str());
waiter_.attach(&(elems_->waiter()));
waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str());
cc_waiter_.attach(&(elems_->conn_waiter()));
cc_waiter_.open((std::string{ "__IPC_CC_WAITER__" } + name).c_str());
}
cursor_ = elems_->cursor();
return old;
@ -106,7 +122,7 @@ public:
if (elems_->push([&](void* p) {
::new (p) T(std::forward<P>(params)...);
})) {
wi_.notify();
waiter_.broadcast();
return true;
}
return false;
@ -123,7 +139,7 @@ public:
})) {
return item;
}
ipc::sleep(k, [this] { return wi_.wait(); });
ipc::sleep(k, [this] { return waiter_.wait(); });
}
}
};

View File

@ -119,10 +119,12 @@ static std::size_t recv_count(handle_t h) {
return que->conn_count();
}
static void wait_for_recv(handle_t h, std::size_t r_count) {
for (unsigned k = 0; recv_count(h) < r_count;) {
ipc::sleep(k);
static bool wait_for_recv(handle_t h, std::size_t r_count) {
auto que = queue_of(h);
if (que == nullptr) {
return false;
}
return que->wait_for_connect(r_count);
}
static void clear_recv(handle_t h) {
@ -237,7 +239,7 @@ std::size_t channel_detail<Queue, Policy>::recv_count(handle_t h) {
}
template <template <typename...> class Queue, typename Policy>
void channel_detail<Queue, Policy>::wait_for_recv(handle_t h, std::size_t r_count) {
bool channel_detail<Queue, Policy>::wait_for_recv(handle_t h, std::size_t r_count) {
return detail_impl<Queue, Policy>::wait_for_recv(h, r_count);
}