From 6730fa578d19a594e949b50aa79d2fc4a4e2933e Mon Sep 17 00:00:00 2001 From: zhangyi Date: Tue, 26 Mar 2019 11:10:22 +0800 Subject: [PATCH] simplify --- src/ipc.cpp | 35 ++++++++++++++------------ src/platform/waiter_win.h | 46 ++++++++++++----------------------- src/platform/waiter_wrapper.h | 13 +++++++--- 3 files changed, 44 insertions(+), 50 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index cdc7cc6..9bd6b67 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -75,6 +75,19 @@ struct cache_t { } }; +template +void wait_for(W& waiter, F&& pred) { + for (unsigned k = 0; pred();) { + bool ret = true; + ipc::sleep(k, [&ret, &waiter, &pred] { + return waiter.wait_if([&ret, &pred] { + return ret = pred(); + }); + }); + if (!ret) break; + } +} + template struct detail_impl { @@ -148,13 +161,9 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) { if (que == nullptr) { return false; } - for (unsigned k = 0; que->conn_count() < r_count;) { - ipc::sleep(k, [h, que, r_count] { - return info_of(h)->cc_waiter_.wait_if([que, r_count] { - return que->conn_count() < r_count; - }); - }); - } + wait_for(info_of(h)->cc_waiter_, [que, r_count] { + return que->conn_count() < r_count; + }); return true; } @@ -202,15 +211,9 @@ static buff_t recv(ipc::handle_t h) { while (1) { // pop a new message typename queue_t::value_t msg; - for (unsigned k = 0; !que->pop(msg);) { - bool succ = false; - ipc::sleep(k, [h, que, &msg, &succ] { - return info_of(h)->rd_waiter_.wait_if([que, &msg, &succ] { - return !(succ = que->pop(msg)); - }); - }); - if (succ) break; - } + wait_for(info_of(h)->rd_waiter_, [que, &msg] { + return !que->pop(msg); + }); if (msg.head_.que_ == nullptr) return {}; if (msg.head_.que_ == que) continue; // pop next // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index d5190e7..fbe08de 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -20,10 +20,6 @@ class semaphore { HANDLE h_ = NULL; public: - friend bool operator==(semaphore const & s1, semaphore const & s2) { - return s1.h_ == s2.h_; - } - bool open(std::string && name, long count = 0, long limit = LONG_MAX) { h_ = ::CreateSemaphore(NULL, count, limit, ipc::detail::to_tchar(std::move(name)).c_str()); if (h_ == NULL) { @@ -72,27 +68,23 @@ class condition { mutex lock_; semaphore sema_, handshake_; - ipc::shm::handle waiting_; // std::atomic - long * counter_ = nullptr; - - auto waiting_cnt() { - return static_cast*>(waiting_.get()); - } + std::atomic * waiting_ = nullptr; + long * counter_ = nullptr; public: friend bool operator==(condition const & c1, condition const & c2) { - return c1.counter_ == c2.counter_; + return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_); } friend bool operator!=(condition const & c1, condition const & c2) { return !(c1 == c2); } - bool open(std::string const & name, long * counter) { - if (lock_ .open ("__COND_MTX__" + name) && - sema_ .open ("__COND_SEM__" + name) && - handshake_.open ("__COND_HAN__" + name) && - waiting_ .acquire(("__COND_WAITING_CNT__" + name).c_str(), sizeof(std::atomic))) { + bool open(std::string const & name, std::atomic * waiting, long * counter) { + if (lock_ .open("__COND_MTX__" + name) && + sema_ .open("__COND_SEM__" + name) && + handshake_.open("__COND_HAN__" + name)) { + waiting_ = waiting; counter_ = counter; return true; } @@ -100,7 +92,6 @@ public: } void close() { - waiting_ .release(); handshake_.close(); sema_ .close(); lock_ .close(); @@ -108,10 +99,7 @@ public: template bool wait_if(Mutex& mtx, F&& pred) { - auto cnt = waiting_cnt(); - if (cnt != nullptr) { - cnt->fetch_add(1, std::memory_order_release); - } + waiting_->fetch_add(1, std::memory_order_release); { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (!std::forward(pred)()) return true; @@ -119,9 +107,7 @@ public: } mtx.unlock(); bool ret = sema_.wait(); - if (cnt != nullptr) { - cnt->fetch_sub(1, std::memory_order_release); - } + waiting_->fetch_sub(1, std::memory_order_release); ret = handshake_.post() && ret; mtx.lock(); return ret; @@ -129,8 +115,7 @@ public: bool notify() { std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_cnt() != nullptr && - waiting_cnt()->load(std::memory_order_relaxed) == 0) { + if (waiting_->load(std::memory_order_relaxed) == 0) { return true; } bool ret = true; @@ -145,8 +130,7 @@ public: bool broadcast() { std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_cnt() != nullptr && - waiting_cnt()->load(std::memory_order_relaxed) == 0) { + if (waiting_->load(std::memory_order_relaxed) == 0) { return true; } bool ret = true; @@ -163,7 +147,9 @@ public: }; class waiter { - long counter_ = 0; + + std::atomic waiting_ { 0 }; + long counter_ = 0; public: using handle_t = condition; @@ -177,7 +163,7 @@ public: return invalid(); } condition cond; - if (cond.open(name, &counter_)) { + if (cond.open(name, &waiting_, &counter_)) { return cond; } return invalid(); diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 5bb16ec..0941676 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -20,19 +20,24 @@ using mutex_impl = ipc::detail::mutex; using semaphore_impl = ipc::detail::semaphore; class condition_impl : public ipc::detail::condition { - ipc::shm::handle h_; + + ipc::shm::handle wait_h_, cnt_h_; public: bool open(std::string const & name) { - if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long))) { - return ipc::detail::condition::open(name, static_cast(h_.get())); + if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic)) && + cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) { + return ipc::detail::condition::open(name, + static_cast *>(wait_h_.get()), + static_cast(cnt_h_.get())); } return false; } void close() { ipc::detail::condition::close(); - h_.release(); + wait_h_.release(); + cnt_h_ .release(); } bool wait(mutex_impl& mtx) {