From d0e2a4d80c8fb0a350c71e83e1270939bbf1038a Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 12 Sep 2021 15:59:44 +0800 Subject: [PATCH] add semaphore for win --- include/libipc/mutex.h | 1 + include/libipc/semaphore.h | 37 +++++++ src/libipc/platform/condition_linux.h | 139 ++++++++++++++++++++++++++ src/libipc/platform/condition_win.h | 59 +++++++++++ src/libipc/platform/mutex_linux.h | 2 +- src/libipc/platform/semaphore_linux.h | 0 src/libipc/platform/semaphore_win.h | 79 +++++++++++++++ src/semaphore.cpp | 66 ++++++++++++ 8 files changed, 382 insertions(+), 1 deletion(-) create mode 100644 include/libipc/semaphore.h create mode 100644 src/libipc/platform/condition_linux.h create mode 100644 src/libipc/platform/condition_win.h create mode 100644 src/libipc/platform/semaphore_linux.h create mode 100644 src/libipc/platform/semaphore_win.h create mode 100644 src/semaphore.cpp diff --git a/include/libipc/mutex.h b/include/libipc/mutex.h index 23f7b53..2d4781f 100644 --- a/include/libipc/mutex.h +++ b/include/libipc/mutex.h @@ -25,6 +25,7 @@ public: bool open(char const *name) noexcept; void close() noexcept; + bool lock(std::uint64_t tm = ipc::invalid_value) noexcept; bool try_lock() noexcept(false); // std::system_error bool unlock() noexcept; diff --git a/include/libipc/semaphore.h b/include/libipc/semaphore.h new file mode 100644 index 0000000..ea8ca3d --- /dev/null +++ b/include/libipc/semaphore.h @@ -0,0 +1,37 @@ +#pragma once + +#include // std::uint64_t + +#include "libipc/export.h" +#include "libipc/def.h" + +namespace ipc { +namespace sync { + +class IPC_EXPORT semaphore { + semaphore(semaphore const &) = delete; + semaphore &operator=(semaphore const &) = delete; + +public: + semaphore(); + explicit semaphore(char const *name, std::uint32_t count = 0, std::uint32_t limit = ipc::invalid_value); + ~semaphore(); + + void const *native() const noexcept; + void *native() noexcept; + + bool valid() const noexcept; + + bool open(char const *name, std::uint32_t count = 0, std::uint32_t limit = ipc::invalid_value) noexcept; + void close() noexcept; + + bool wait(std::uint64_t tm = ipc::invalid_value) noexcept; + bool post(std::uint32_t count = 1) noexcept; + +private: + class semaphore_; + semaphore_* p_; +}; + +} // namespace sync +} // namespace ipc diff --git a/src/libipc/platform/condition_linux.h b/src/libipc/platform/condition_linux.h new file mode 100644 index 0000000..745d29c --- /dev/null +++ b/src/libipc/platform/condition_linux.h @@ -0,0 +1,139 @@ +#pragma once + +#include + +#include + +#include "libipc/utility/log.h" +#include "libipc/platform/get_wait_time.h" +#include "libipc/mutex.h" + +namespace ipc { +namespace detail { +namespace sync { + +class condition { + ipc::shm::handle shm_; + pthread_cond_t *cond_ = nullptr; + + pthread_cond_t *acquire_cond(char const *name) { + if (!shm_.acquire(name, sizeof(pthread_cond_t))) { + ipc::error("[acquire_cond] fail shm.acquire: %s\n", name); + return nullptr; + } + return static_cast(shm_.get()); + } + +public: + condition() = default; + explicit condition(char const *name) noexcept { + open(name); + } + + ~condition() = default; + + pthread_cond_t const *native() const noexcept { + return cond_; + } + + pthread_cond_t *native() noexcept { + return cond_; + } + + bool valid() const noexcept { + static const char tmp[sizeof(pthread_cond_t)] {}; + return (cond_ != nullptr) + && (std::memcmp(tmp, cond_, sizeof(pthread_cond_t)) != 0); + } + + bool open(char const *name) noexcept { + close(); + if ((cond_ = acquire_cond(name)) == nullptr) { + return false; + } + if (shm_.ref() == 1) { + ::pthread_cond_destroy(cond_); + auto finally = ipc::guard([this] { close(); }); // close when failed + // init condition + int eno; + pthread_condattr_t cond_attr; + if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { + ipc::error("fail pthread_condattr_init[%d]\n", eno); + return false; + } + IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); + if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); + return false; + } + *cond_ = PTHREAD_COND_INITIALIZER; + if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) { + ipc::error("fail pthread_cond_init[%d]\n", eno); + return false; + } + finally.dismiss(); + } + return valid(); + } + + void close() noexcept { + if (shm_.ref() == 1) { + int eno; + if ((eno = ::pthread_cond_destroy(cond_)) != 0) { + ipc::error("fail pthread_cond_destroy[%d]\n", eno); + } + } + shm_.release(); + cond_ = nullptr; + } + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + switch (tm) { + case 0: + return true; + case invalid_value: { + int eno; + if ((eno = ::pthread_cond_wait(cond_, mtx.native())) != 0) { + ipc::error("fail pthread_cond_wait[%d]\n", eno); + return false; + } + } + break; + default: { + auto ts = detail::make_timespec(tm); + int eno; + if ((eno = ::pthread_cond_timedwait(cond_, mtx.native(), &ts)) != 0) { + if (eno != ETIMEDOUT) { + ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + eno, tm, ts.tv_sec, ts.tv_nsec); + } + return false; + } + } + break; + } + return true; + } + + bool notify() noexcept { + int eno; + if ((eno = ::pthread_cond_signal(cond_)) != 0) { + ipc::error("fail pthread_cond_signal[%d]\n", eno); + return false; + } + return true; + } + + bool broadcast() noexcept { + int eno; + if ((eno = ::pthread_cond_broadcast(cond_)) != 0) { + ipc::error("fail pthread_cond_broadcast[%d]\n", eno); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/condition_win.h b/src/libipc/platform/condition_win.h new file mode 100644 index 0000000..19f3d0c --- /dev/null +++ b/src/libipc/platform/condition_win.h @@ -0,0 +1,59 @@ +#pragma once + +#include + +#include + +#include "libipc/utility/log.h" +#include "libipc/mutex.h" + +namespace ipc { +namespace detail { +namespace sync { + +class condition { + HANDLE h_ = NULL; + +public: + condition() noexcept = default; + explicit condition(char const *name) noexcept { + open(name); + } + + ~condition() noexcept = default; + + HANDLE native() const noexcept { + return h_; + } + + bool valid() const noexcept { + return h_ != NULL; + } + + bool open(char const *name) noexcept { + close(); + return valid(); + } + + void close() noexcept { + if (!valid()) return; + ::CloseHandle(h_); + h_ = NULL; + } + + bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + return true; + } + + bool notify() noexcept { + return true; + } + + bool broadcast() noexcept { + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/libipc/platform/mutex_linux.h b/src/libipc/platform/mutex_linux.h index 50d6b1a..e3b44f1 100644 --- a/src/libipc/platform/mutex_linux.h +++ b/src/libipc/platform/mutex_linux.h @@ -24,7 +24,7 @@ class mutex { pthread_mutex_t *acquire_mutex(char const *name) { if (!shm_.acquire(name, sizeof(pthread_mutex_t))) { - ipc::error("fail shm.acquire: %s\n", name); + ipc::error("[acquire_mutex] fail shm.acquire: %s\n", name); return nullptr; } return static_cast(shm_.get()); diff --git a/src/libipc/platform/semaphore_linux.h b/src/libipc/platform/semaphore_linux.h new file mode 100644 index 0000000..e69de29 diff --git a/src/libipc/platform/semaphore_win.h b/src/libipc/platform/semaphore_win.h new file mode 100644 index 0000000..ea95ef3 --- /dev/null +++ b/src/libipc/platform/semaphore_win.h @@ -0,0 +1,79 @@ +#pragma once + +#include + +#include + +#include "libipc/utility/log.h" + +#include "libipc/platform/to_tchar.h" +#include "libipc/platform/get_sa.h" + +namespace ipc { +namespace detail { +namespace sync { + +class semaphore { + HANDLE h_ = NULL; + +public: + semaphore() noexcept = default; + explicit semaphore(char const *name, std::uint32_t count, std::uint32_t limit) noexcept { + open(name, count, limit); + } + + ~semaphore() noexcept = default; + + HANDLE native() const noexcept { + return h_; + } + + bool valid() const noexcept { + return h_ != NULL; + } + + bool open(char const *name, std::uint32_t count, std::uint32_t limit) noexcept { + close(); + h_ = ::CreateSemaphore(detail::get_sa(), + static_cast(count), + (limit == invalid_value) ? LONG_MAX : static_cast(limit), + ipc::detail::to_tchar(name).c_str()); + if (h_ == NULL) { + ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name); + return false; + } + return true; + } + + void close() noexcept { + if (!valid()) return; + ::CloseHandle(h_); + h_ = NULL; + } + + bool wait(std::uint64_t tm) noexcept { + DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); + switch ((ret = ::WaitForSingleObject(h_, ms))) { + case WAIT_OBJECT_0: + return true; + case WAIT_TIMEOUT: + return false; + case WAIT_ABANDONED: + default: + ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); + return false; + } + } + + bool post(std::uint32_t count) noexcept { + if (!::ReleaseSemaphore(h_, static_cast(count), NULL)) { + ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError()); + return false; + } + return true; + } +}; + +} // namespace sync +} // namespace detail +} // namespace ipc diff --git a/src/semaphore.cpp b/src/semaphore.cpp new file mode 100644 index 0000000..274aead --- /dev/null +++ b/src/semaphore.cpp @@ -0,0 +1,66 @@ + +#include "libipc/semaphore.h" + +#include "libipc/utility/pimpl.h" +#include "libipc/memory/resource.h" +#include "libipc/platform/detail.h" +#if defined(IPC_OS_WINDOWS_) +#include "libipc/platform/semaphore_win.h" +#elif defined(IPC_OS_LINUX_) +#include "libipc/platform/semaphore_linux.h" +#else/*linux*/ +# error "Unsupported platform." +#endif + +namespace ipc { +namespace sync { + +class semaphore::semaphore_ : public ipc::pimpl { +public: + ipc::detail::sync::semaphore sem_; +}; + +semaphore::semaphore() + : p_(p_->make()) { +} + +semaphore::semaphore(char const * name, std::uint32_t count, std::uint32_t limit) + : semaphore() { + open(name, count, limit); +} + +semaphore::~semaphore() { + close(); + p_->clear(); +} + +void const *semaphore::native() const noexcept { + return impl(p_)->sem_.native(); +} + +void *semaphore::native() noexcept { + return impl(p_)->sem_.native(); +} + +bool semaphore::valid() const noexcept { + return impl(p_)->sem_.valid(); +} + +bool semaphore::open(char const *name, std::uint32_t count, std::uint32_t limit) noexcept { + return impl(p_)->sem_.open(name, count, limit); +} + +void semaphore::close() noexcept { + impl(p_)->sem_.close(); +} + +bool semaphore::wait(std::uint64_t tm) noexcept { + return impl(p_)->sem_.wait(tm); +} + +bool semaphore::post(std::uint32_t count) noexcept { + return impl(p_)->sem_.post(count); +} + +} // namespace sync +} // namespace ipc