diff --git a/README.md b/README.md index 50a0437..56a12eb 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,8 @@ Performance data: [performence.xlsx](performence.xlsx) ## Reference - * [http://www.drdobbs.com/lock-free-data-structures/184401865](http://www.drdobbs.com/lock-free-data-structures/184401865) - * [https://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular](https://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular) - * [http://www.cnblogs.com/gaochundong/p/lock_free_programming.html](http://www.cnblogs.com/gaochundong/p/lock_free_programming.html) - * [https://coolshell.cn/articles/8239.html](https://coolshell.cn/articles/8239.html) + * [Lock-Free Data Structures | Dr Dobb's](http://www.drdobbs.com/lock-free-data-structures/184401865) + * [Yet another implementation of a lock-free circular array queue | CodeProject](https://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular) + * [Lock-Free 编程 | 匠心十年 - 博客园](http://www.cnblogs.com/gaochundong/p/lock_free_programming.html) + * [无锁队列的实现 | 酷 壳 - CoolShell](https://coolshell.cn/articles/8239.html) + * [Implementing Condition Variables with Semaphores](https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf) diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 8715ace..7db2d15 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -3,6 +3,7 @@ #include #include +#include #include "rw_lock.h" #include "pool_alloc.h" @@ -13,59 +14,112 @@ namespace ipc { namespace detail { -class waiter { - long volatile counter_ = 0; - spin_lock lock_; +class semaphore { + HANDLE h_ = NULL; public: - using handle_t = HANDLE; - - constexpr static handle_t invalid() { - return NULL; + friend bool operator==(semaphore const & s1, semaphore const & s2) { + return s1.h_ == s2.h_; } + template + semaphore& open(Str const & name, long count = 0, long limit = LONG_MAX) { + h_ = ::CreateSemaphore(NULL, count, limit, name.c_str()); + return *this; + } + + void close() { + ::CloseHandle(h_); + } + + bool wait() { + return ::WaitForSingleObject(h_, INFINITE) == WAIT_OBJECT_0; + } + + bool post(long count = 1) { + if (count <= 0) return true; + return !!::ReleaseSemaphore(h_, count, NULL); + } +}; + +class mutex : public semaphore { + using semaphore::wait; + using semaphore::post; + +public: + template + mutex& open(Str const & name) { + semaphore::open(name, 1, 1); + return *this; + } + + bool lock () { return semaphore::wait(); } + bool unlock() { return semaphore::post(); } +}; + +class waiter { + long volatile counter_ = 0; + +public: + using handle_t = std::tuple; + + static handle_t invalid() { + return handle_t {}; + } + +private: + semaphore& s(handle_t& h) { return std::get<0>(h); } + semaphore& w(handle_t& h) { return std::get<1>(h); } + mutex & x(handle_t& h) { return std::get<2>(h); } + +public: handle_t open(char const * name) { if (name == nullptr || name[0] == '\0') return invalid(); - return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str()); + std::string n = name; + return handle_t { + semaphore {}.open(ipc::detail::to_tchar(n + "__S__")), + semaphore {}.open(ipc::detail::to_tchar(n + "__W__")), + mutex {}.open(ipc::detail::to_tchar(n + "__X__")) + }; } - void close(handle_t h) { + void close(handle_t& h) { if (h == invalid()) return; - ::CloseHandle(h); + x(h).close(); + w(h).close(); + s(h).close(); } template - bool wait_if(handle_t h, F&& pred) { + bool wait_if(handle_t& h, F&& pred) { if (h == invalid()) return false; { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(x(h)); if (!std::forward(pred)()) return true; ++ counter_; } - return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0; + if (!s(h).wait()) return false; + return w(h).post(); } - void notify(handle_t h) { + void notify(handle_t& h) { if (h == invalid()) return; - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (counter_ == 0) return; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(x(h)); + if (counter_ > 0) { + s(h).post(); -- counter_; - ::ReleaseSemaphore(h, 1, NULL); + w(h).wait(); } - ::Sleep(1); } - void broadcast(handle_t h) { + void broadcast(handle_t& h) { if (h == invalid()) return; - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - if (counter_ == 0) return; - long all_count = counter_; - counter_ = 0; - ::ReleaseSemaphore(h, all_count, NULL); + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(x(h)); + s(h).post(counter_); + while (counter_ > 0) { + -- counter_; + w(h).wait(); } - ::Sleep(1); } }; diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index f4c20e2..72dc0ff 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -323,6 +323,7 @@ void test_prod_cons() { } void Unit::test_route() { + //return; std::vector const datas = { "hello!", "foo", @@ -360,6 +361,7 @@ void Unit::test_route() { } void Unit::test_route_rtt() { + //return; test_stopwatch sw; std::thread t1 {[&] { @@ -399,6 +401,7 @@ void Unit::test_route_rtt() { } void Unit::test_route_performance() { + //return; ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) { test_prod_cons(); }); @@ -406,6 +409,7 @@ void Unit::test_route_performance() { } void Unit::test_channel() { + //return; std::thread t1 {[&] { ipc::channel cc { "my-ipc-channel" }; for (std::size_t i = 0;; ++i) {