From dd80a79c3c4ea2bac4119fddd4303a7e6f364f1d Mon Sep 17 00:00:00 2001 From: mutouyun Date: Thu, 14 Feb 2019 14:11:03 +0800 Subject: [PATCH] impl condition for windows --- src/platform/waiter_linux.h | 4 +- src/platform/waiter_win.h | 150 ++++++++++++++++++++++++------------ 2 files changed, 104 insertions(+), 50 deletions(-) diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 3cf3eb2..e967a87 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -62,7 +62,7 @@ public: }; class condition { - pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; + pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; public: bool open() { @@ -163,8 +163,8 @@ public: } void close(handle_t h) { - if (h == invalid()) return; if (opened_.fetch_sub(1, std::memory_order_release) == 1) { + if (h == invalid()) return; sem_.close(); } } diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 978dd3d..d780e2a 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -7,6 +7,7 @@ #include "rw_lock.h" #include "pool_alloc.h" +#include "log.h" #include "platform/to_tchar.h" #include "platform/detail.h" @@ -22,10 +23,13 @@ public: return s1.h_ == s2.h_; } - template - semaphore& open(Str const & name, long count = 0, long limit = LONG_MAX) { - h_ = ::CreateSemaphore(NULL, count, limit, name.c_str()); - return *this; + bool open(std::string && name, long count = 0, long limit = LONG_MAX) { + h_ = ::CreateSemaphore(NULL, count, limit, ipc::detail::to_tchar(std::move(name)).c_str()); + if (h_ == NULL) { + ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name.c_str()); + return false; + } + return true; } void close() { @@ -46,82 +50,132 @@ class mutex : public semaphore { using semaphore::post; public: - template - mutex& open(Str const & name) { - semaphore::open(name, 1, 1); - return *this; + bool open(std::string && name) { + return semaphore::open(std::move(name), 1, 1); } bool lock () { return semaphore::wait(); } bool unlock() { return semaphore::post(); } }; +class condition { + mutex lock_; + semaphore sema_, handshake_; + + long volatile * counter_ = nullptr; + +public: + friend bool operator==(condition const & c1, condition const & c2) { + return c1.counter_ == c2.counter_; + } + + friend bool operator!=(condition const & c1, condition const & c2) { + return !(c1 == c2); + } + + bool open(std::string const & name, long volatile * counter) { + if (lock_ .open(name + "__COND_MTX__") && + sema_ .open(name + "__COND_SEM__") && + handshake_.open(name + "__COND_HAN__")) { + counter_ = counter; + return true; + } + return false; + } + + void close() { + handshake_.close(); + sema_ .close(); + lock_ .close(); + } + + template + bool wait_if(Mutex& mtx, F&& pred) { + { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (!std::forward(pred)()) return true; + ++ *counter_; + } + mtx.unlock(); + bool ret_s = sema_.wait(); + bool ret_h = handshake_.post(); + mtx.lock(); + return ret_s && ret_h; + } + + bool notify() { + bool ret_s = true, ret_h = true; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (*counter_ > 0) { + ret_s = sema_.post(); + -- *counter_; + ret_h = handshake_.wait(); + } + return ret_s && ret_h; + } + + bool broadcast() { + bool ret_s = true, ret_h = true; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (*counter_ > 0) { + ret_s = sema_.post(*counter_); + do { + -- *counter_; + bool rh = handshake_.wait(); + ret_h = ret_h && rh; + } while (*counter_ > 0); + } + return ret_s && ret_h; + } +}; + class waiter { long volatile counter_ = 0; public: - using handle_t = std::tuple; + using handle_t = condition; static handle_t invalid() { - return handle_t {}; + return condition {}; } -private: - semaphore& sem(handle_t& h) { return std::get<0>(h); } - semaphore& han(handle_t& h) { return std::get<1>(h); } - mutex & mtx(handle_t& h) { return std::get<2>(h); } - -public: handle_t open(char const * name) { - if (name == nullptr || name[0] == '\0') return invalid(); - std::string n = name; - return handle_t { - semaphore {}.open(ipc::detail::to_tchar(n + "__SEM__")), - semaphore {}.open(ipc::detail::to_tchar(n + "__HAN__")), - mutex {}.open(ipc::detail::to_tchar(n + "__MTX__")) - }; + if (name == nullptr || name[0] == '\0') { + return invalid(); + } + condition cond; + if (cond.open(name, &counter_)) { + return cond; + } + return invalid(); } void close(handle_t& h) { if (h == invalid()) return; - mtx(h).close(); - han(h).close(); - sem(h).close(); + h.close(); } template bool wait_if(handle_t& h, F&& pred) { if (h == invalid()) return false; - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(mtx(h)); - if (!std::forward(pred)()) return true; - ++ counter_; - } - bool ret_s = sem(h).wait(); - bool ret_h = han(h).post(); - return ret_s && ret_h; + + class non_mutex { + public: + void lock () {} + void unlock() {} + } nm; + + return h.wait_if(nm, std::forward(pred)); } void notify(handle_t& h) { if (h == invalid()) return; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(mtx(h)); - if (counter_ > 0) { - sem(h).post(); - -- counter_; - han(h).wait(); - } + h.notify(); } void broadcast(handle_t& h) { if (h == invalid()) return; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(mtx(h)); - if (counter_ > 0) { - sem(h).post(counter_); - do { - -- counter_; - han(h).wait(); - } while (counter_ > 0); - } + h.broadcast(); } };