impl condition for windows

This commit is contained in:
mutouyun 2019-02-14 14:11:03 +08:00
parent 75f5090d6a
commit dd80a79c3c
2 changed files with 104 additions and 50 deletions

View File

@ -62,7 +62,7 @@ public:
};
class condition {
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
public:
bool open() {
@ -163,8 +163,8 @@ public:
}
void close(handle_t h) {
if (h == invalid()) return;
if (opened_.fetch_sub(1, std::memory_order_release) == 1) {
if (h == invalid()) return;
sem_.close();
}
}

View File

@ -7,6 +7,7 @@
#include "rw_lock.h"
#include "pool_alloc.h"
#include "log.h"
#include "platform/to_tchar.h"
#include "platform/detail.h"
@ -22,10 +23,13 @@ public:
return s1.h_ == s2.h_;
}
template <typename Str>
semaphore& open(Str const & name, long count = 0, long limit = LONG_MAX) {
h_ = ::CreateSemaphore(NULL, count, limit, name.c_str());
return *this;
bool open(std::string && name, long count = 0, long limit = LONG_MAX) {
h_ = ::CreateSemaphore(NULL, count, limit, ipc::detail::to_tchar(std::move(name)).c_str());
if (h_ == NULL) {
ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name.c_str());
return false;
}
return true;
}
void close() {
@ -46,82 +50,132 @@ class mutex : public semaphore {
using semaphore::post;
public:
template <typename Str>
mutex& open(Str const & name) {
semaphore::open(name, 1, 1);
return *this;
bool open(std::string && name) {
return semaphore::open(std::move(name), 1, 1);
}
bool lock () { return semaphore::wait(); }
bool unlock() { return semaphore::post(); }
};
class condition {
mutex lock_;
semaphore sema_, handshake_;
long volatile * counter_ = nullptr;
public:
friend bool operator==(condition const & c1, condition const & c2) {
return c1.counter_ == c2.counter_;
}
friend bool operator!=(condition const & c1, condition const & c2) {
return !(c1 == c2);
}
bool open(std::string const & name, long volatile * counter) {
if (lock_ .open(name + "__COND_MTX__") &&
sema_ .open(name + "__COND_SEM__") &&
handshake_.open(name + "__COND_HAN__")) {
counter_ = counter;
return true;
}
return false;
}
void close() {
handshake_.close();
sema_ .close();
lock_ .close();
}
template <typename Mutex, typename F>
bool wait_if(Mutex& mtx, F&& pred) {
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
++ *counter_;
}
mtx.unlock();
bool ret_s = sema_.wait();
bool ret_h = handshake_.post();
mtx.lock();
return ret_s && ret_h;
}
bool notify() {
bool ret_s = true, ret_h = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (*counter_ > 0) {
ret_s = sema_.post();
-- *counter_;
ret_h = handshake_.wait();
}
return ret_s && ret_h;
}
bool broadcast() {
bool ret_s = true, ret_h = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (*counter_ > 0) {
ret_s = sema_.post(*counter_);
do {
-- *counter_;
bool rh = handshake_.wait();
ret_h = ret_h && rh;
} while (*counter_ > 0);
}
return ret_s && ret_h;
}
};
class waiter {
long volatile counter_ = 0;
public:
using handle_t = std::tuple<semaphore, semaphore, mutex>;
using handle_t = condition;
static handle_t invalid() {
return handle_t {};
return condition {};
}
private:
semaphore& sem(handle_t& h) { return std::get<0>(h); }
semaphore& han(handle_t& h) { return std::get<1>(h); }
mutex & mtx(handle_t& h) { return std::get<2>(h); }
public:
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid();
std::string n = name;
return handle_t {
semaphore {}.open(ipc::detail::to_tchar(n + "__SEM__")),
semaphore {}.open(ipc::detail::to_tchar(n + "__HAN__")),
mutex {}.open(ipc::detail::to_tchar(n + "__MTX__"))
};
if (name == nullptr || name[0] == '\0') {
return invalid();
}
condition cond;
if (cond.open(name, &counter_)) {
return cond;
}
return invalid();
}
void close(handle_t& h) {
if (h == invalid()) return;
mtx(h).close();
han(h).close();
sem(h).close();
h.close();
}
template <typename F>
bool wait_if(handle_t& h, F&& pred) {
if (h == invalid()) return false;
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(mtx(h));
if (!std::forward<F>(pred)()) return true;
++ counter_;
}
bool ret_s = sem(h).wait();
bool ret_h = han(h).post();
return ret_s && ret_h;
class non_mutex {
public:
void lock () {}
void unlock() {}
} nm;
return h.wait_if(nm, std::forward<F>(pred));
}
void notify(handle_t& h) {
if (h == invalid()) return;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(mtx(h));
if (counter_ > 0) {
sem(h).post();
-- counter_;
han(h).wait();
}
h.notify();
}
void broadcast(handle_t& h) {
if (h == invalid()) return;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(mtx(h));
if (counter_ > 0) {
sem(h).post(counter_);
do {
-- counter_;
han(h).wait();
} while (counter_ > 0);
}
h.broadcast();
}
};