diff --git a/src/libipc/platform/condition_win.h b/src/libipc/platform/condition_win.h index 02a68ea..5d82d47 100644 --- a/src/libipc/platform/condition_win.h +++ b/src/libipc/platform/condition_win.h @@ -1,52 +1,116 @@ #pragma once #include +#include +#include #include #include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" +#include "libipc/platform/detail.h" #include "libipc/mutex.h" +#include "libipc/semaphore.h" +#include "libipc/shm.h" namespace ipc { namespace detail { namespace sync { class condition { - HANDLE h_ = NULL; + ipc::sync::semaphore sem_; + ipc::sync::mutex lock_; + ipc::shm::handle shm_; + + std::int32_t &counter() { + return *static_cast(shm_.get()); + } public: - condition() noexcept = default; + condition() = default; ~condition() noexcept = default; - HANDLE native() const noexcept { - return h_; + auto native() noexcept { + return sem_.native(); + } + + auto native() const noexcept { + return sem_.native(); } bool valid() const noexcept { - return h_ != NULL; + return sem_.valid() && lock_.valid() && shm_.valid(); } bool open(char const *name) noexcept { close(); + if (!sem_.open((std::string{"_cond_sem_"} + name).c_str())) { + return false; + } + auto finally_sem = ipc::guard([this] { sem_.close(); }); // close when failed + if (!lock_.open((std::string{"_cond_lock_"} + name).c_str())) { + return false; + } + auto finally_lock = ipc::guard([this] { lock_.close(); }); // close when failed + if (!shm_.acquire((std::string{"_cond_shm_"} + name).c_str(), sizeof(std::int32_t))) { + return false; + } + finally_lock.dismiss(); + finally_sem.dismiss(); return valid(); } void close() noexcept { if (!valid()) return; - ::CloseHandle(h_); - h_ = NULL; + sem_.close(); + lock_.close(); + shm_.release(); } bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { - return true; + if (!valid()) return false; + auto &cnt = counter(); + { + IPC_UNUSED_ std::lock_guard guard {lock_}; + cnt = (cnt < 0) ? 1 : cnt + 1; + } + DWORD ms = (tm == invalid_value) ? INFINITE : static_cast(tm); + /** + * @see + * - https://www.microsoft.com/en-us/research/wp-content/uploads/2004/12/ImplementingCVs.pdf + * - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-signalobjectandwait + */ + bool rs = ::SignalObjectAndWait(mtx.native(), sem_.native(), ms, FALSE) == WAIT_OBJECT_0; + bool rl = mtx.lock(); // INFINITE + if (!rs) { + IPC_UNUSED_ std::lock_guard guard {lock_}; + cnt -= 1; + } + return rs && rl; } bool notify() noexcept { - return true; + if (!valid()) return false; + auto &cnt = counter(); + if (!lock_.lock()) return false; + bool ret = false; + if (cnt > 0) { + ret = sem_.post(1); + cnt -= 1; + } + return lock_.unlock() && ret; } bool broadcast() noexcept { - return true; + if (!valid()) return false; + auto &cnt = counter(); + if (!lock_.lock()) return false; + bool ret = false; + if (cnt > 0) { + ret = sem_.post(cnt); + cnt = 0; + } + return lock_.unlock() && ret; } }; diff --git a/src/libipc/platform/to_tchar.h b/src/libipc/platform/to_tchar.h index 3cc2840..61def06 100755 --- a/src/libipc/platform/to_tchar.h +++ b/src/libipc/platform/to_tchar.h @@ -41,7 +41,7 @@ constexpr auto to_tchar(ipc::string &&str) -> IsSameChar #include #include +#include #include #include "test.h" @@ -129,8 +130,10 @@ TEST(Sync, Condition) { std::printf("test-cond-%d: %d\n", num, val); } }; - std::thread test_cond1 {job, 1}; - std::thread test_cond2 {job, 2}; + std::array test_conds; + for (int i = 0; i < (int)test_conds.size(); ++i) { + test_conds[i] = std::thread{job, i}; + } for (int i = 1; i < 100; ++i) { { @@ -150,11 +153,11 @@ TEST(Sync, Condition) { } { std::lock_guard guard {lock}; - que.push_back(0); - que.push_back(0); + for (int i = 0; i < (int)test_conds.size(); ++i) { + que.push_back(0); + } } cond.broadcast(); - test_cond1.join(); - test_cond2.join(); + for (auto &t : test_conds) t.join(); } \ No newline at end of file