fix bugs of windows-waiter

This commit is contained in:
mutouyun 2019-02-13 06:45:41 +08:00
parent a6d88a1208
commit cee63e5f81
3 changed files with 90 additions and 31 deletions

View File

@ -30,7 +30,8 @@ Performance data: [performence.xlsx](performence.xlsx)
## Reference
* [http://www.drdobbs.com/lock-free-data-structures/184401865](http://www.drdobbs.com/lock-free-data-structures/184401865)
* [https://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular](https://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular)
* [http://www.cnblogs.com/gaochundong/p/lock_free_programming.html](http://www.cnblogs.com/gaochundong/p/lock_free_programming.html)
* [https://coolshell.cn/articles/8239.html](https://coolshell.cn/articles/8239.html)
* [Lock-Free Data Structures | Dr Dobb's](http://www.drdobbs.com/lock-free-data-structures/184401865)
* [Yet another implementation of a lock-free circular array queue | CodeProject](https://www.codeproject.com/Articles/153898/Yet-another-implementation-of-a-lock-free-circular)
* [Lock-Free 编程 | 匠心十年 - 博客园](http://www.cnblogs.com/gaochundong/p/lock_free_programming.html)
* [无锁队列的实现 | 酷 壳 - CoolShell](https://coolshell.cn/articles/8239.html)
* [Implementing Condition Variables with Semaphores](https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf)

View File

@ -3,6 +3,7 @@
#include <Windows.h>
#include <atomic>
#include <tuple>
#include "rw_lock.h"
#include "pool_alloc.h"
@ -13,59 +14,112 @@
namespace ipc {
namespace detail {
class waiter {
long volatile counter_ = 0;
spin_lock lock_;
class semaphore {
HANDLE h_ = NULL;
public:
using handle_t = HANDLE;
constexpr static handle_t invalid() {
return NULL;
friend bool operator==(semaphore const & s1, semaphore const & s2) {
return s1.h_ == s2.h_;
}
template <typename Str>
semaphore& open(Str const & name, long count = 0, long limit = LONG_MAX) {
h_ = ::CreateSemaphore(NULL, count, limit, name.c_str());
return *this;
}
void close() {
::CloseHandle(h_);
}
bool wait() {
return ::WaitForSingleObject(h_, INFINITE) == WAIT_OBJECT_0;
}
bool post(long count = 1) {
if (count <= 0) return true;
return !!::ReleaseSemaphore(h_, count, NULL);
}
};
class mutex : public semaphore {
using semaphore::wait;
using semaphore::post;
public:
template <typename Str>
mutex& open(Str const & name) {
semaphore::open(name, 1, 1);
return *this;
}
bool lock () { return semaphore::wait(); }
bool unlock() { return semaphore::post(); }
};
class waiter {
long volatile counter_ = 0;
public:
using handle_t = std::tuple<semaphore, semaphore, mutex>;
static handle_t invalid() {
return handle_t {};
}
private:
semaphore& s(handle_t& h) { return std::get<0>(h); }
semaphore& w(handle_t& h) { return std::get<1>(h); }
mutex & x(handle_t& h) { return std::get<2>(h); }
public:
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid();
return ::CreateSemaphore(NULL, 0, LONG_MAX, ipc::detail::to_tchar(name).c_str());
std::string n = name;
return handle_t {
semaphore {}.open(ipc::detail::to_tchar(n + "__S__")),
semaphore {}.open(ipc::detail::to_tchar(n + "__W__")),
mutex {}.open(ipc::detail::to_tchar(n + "__X__"))
};
}
void close(handle_t h) {
void close(handle_t& h) {
if (h == invalid()) return;
::CloseHandle(h);
x(h).close();
w(h).close();
s(h).close();
}
template <typename F>
bool wait_if(handle_t h, F&& pred) {
bool wait_if(handle_t& h, F&& pred) {
if (h == invalid()) return false;
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(x(h));
if (!std::forward<F>(pred)()) return true;
++ counter_;
}
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;
if (!s(h).wait()) return false;
return w(h).post();
}
void notify(handle_t h) {
void notify(handle_t& h) {
if (h == invalid()) return;
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (counter_ == 0) return;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(x(h));
if (counter_ > 0) {
s(h).post();
-- counter_;
::ReleaseSemaphore(h, 1, NULL);
w(h).wait();
}
::Sleep(1);
}
void broadcast(handle_t h) {
void broadcast(handle_t& h) {
if (h == invalid()) return;
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (counter_ == 0) return;
long all_count = counter_;
counter_ = 0;
::ReleaseSemaphore(h, all_count, NULL);
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(x(h));
s(h).post(counter_);
while (counter_ > 0) {
-- counter_;
w(h).wait();
}
::Sleep(1);
}
};

View File

@ -323,6 +323,7 @@ void test_prod_cons() {
}
void Unit::test_route() {
//return;
std::vector<char const *> const datas = {
"hello!",
"foo",
@ -360,6 +361,7 @@ void Unit::test_route() {
}
void Unit::test_route_rtt() {
//return;
test_stopwatch sw;
std::thread t1 {[&] {
@ -399,6 +401,7 @@ void Unit::test_route_rtt() {
}
void Unit::test_route_performance() {
//return;
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) {
test_prod_cons<ipc::route, 1, decltype(index)::value + 1, false>();
});
@ -406,6 +409,7 @@ void Unit::test_route_performance() {
}
void Unit::test_channel() {
//return;
std::thread t1 {[&] {
ipc::channel cc { "my-ipc-channel" };
for (std::size_t i = 0;; ++i) {