add semaphore for win

This commit is contained in:
mutouyun 2021-09-12 15:59:44 +08:00
parent 415be36477
commit d0e2a4d80c
8 changed files with 382 additions and 1 deletions

View File

@ -25,6 +25,7 @@ public:
bool open(char const *name) noexcept;
void close() noexcept;
bool lock(std::uint64_t tm = ipc::invalid_value) noexcept;
bool try_lock() noexcept(false); // std::system_error
bool unlock() noexcept;

View File

@ -0,0 +1,37 @@
#pragma once
#include <cstdint> // std::uint64_t
#include "libipc/export.h"
#include "libipc/def.h"
namespace ipc {
namespace sync {
class IPC_EXPORT semaphore {
semaphore(semaphore const &) = delete;
semaphore &operator=(semaphore const &) = delete;
public:
semaphore();
explicit semaphore(char const *name, std::uint32_t count = 0, std::uint32_t limit = ipc::invalid_value);
~semaphore();
void const *native() const noexcept;
void *native() noexcept;
bool valid() const noexcept;
bool open(char const *name, std::uint32_t count = 0, std::uint32_t limit = ipc::invalid_value) noexcept;
void close() noexcept;
bool wait(std::uint64_t tm = ipc::invalid_value) noexcept;
bool post(std::uint32_t count = 1) noexcept;
private:
class semaphore_;
semaphore_* p_;
};
} // namespace sync
} // namespace ipc

View File

@ -0,0 +1,139 @@
#pragma once
#include <cstdint>
#include <pthread.h>
#include "libipc/utility/log.h"
#include "libipc/platform/get_wait_time.h"
#include "libipc/mutex.h"
namespace ipc {
namespace detail {
namespace sync {
class condition {
ipc::shm::handle shm_;
pthread_cond_t *cond_ = nullptr;
pthread_cond_t *acquire_cond(char const *name) {
if (!shm_.acquire(name, sizeof(pthread_cond_t))) {
ipc::error("[acquire_cond] fail shm.acquire: %s\n", name);
return nullptr;
}
return static_cast<pthread_cond_t *>(shm_.get());
}
public:
condition() = default;
explicit condition(char const *name) noexcept {
open(name);
}
~condition() = default;
pthread_cond_t const *native() const noexcept {
return cond_;
}
pthread_cond_t *native() noexcept {
return cond_;
}
bool valid() const noexcept {
static const char tmp[sizeof(pthread_cond_t)] {};
return (cond_ != nullptr)
&& (std::memcmp(tmp, cond_, sizeof(pthread_cond_t)) != 0);
}
bool open(char const *name) noexcept {
close();
if ((cond_ = acquire_cond(name)) == nullptr) {
return false;
}
if (shm_.ref() == 1) {
::pthread_cond_destroy(cond_);
auto finally = ipc::guard([this] { close(); }); // close when failed
// init condition
int eno;
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::error("fail pthread_condattr_init[%d]\n", eno);
return false;
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
return false;
}
*cond_ = PTHREAD_COND_INITIALIZER;
if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) {
ipc::error("fail pthread_cond_init[%d]\n", eno);
return false;
}
finally.dismiss();
}
return valid();
}
void close() noexcept {
if (shm_.ref() == 1) {
int eno;
if ((eno = ::pthread_cond_destroy(cond_)) != 0) {
ipc::error("fail pthread_cond_destroy[%d]\n", eno);
}
}
shm_.release();
cond_ = nullptr;
}
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
switch (tm) {
case 0:
return true;
case invalid_value: {
int eno;
if ((eno = ::pthread_cond_wait(cond_, mtx.native())) != 0) {
ipc::error("fail pthread_cond_wait[%d]\n", eno);
return false;
}
}
break;
default: {
auto ts = detail::make_timespec(tm);
int eno;
if ((eno = ::pthread_cond_timedwait(cond_, mtx.native(), &ts)) != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
}
break;
}
return true;
}
bool notify() noexcept {
int eno;
if ((eno = ::pthread_cond_signal(cond_)) != 0) {
ipc::error("fail pthread_cond_signal[%d]\n", eno);
return false;
}
return true;
}
bool broadcast() noexcept {
int eno;
if ((eno = ::pthread_cond_broadcast(cond_)) != 0) {
ipc::error("fail pthread_cond_broadcast[%d]\n", eno);
return false;
}
return true;
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

View File

@ -0,0 +1,59 @@
#pragma once
#include <cstdint>
#include <Windows.h>
#include "libipc/utility/log.h"
#include "libipc/mutex.h"
namespace ipc {
namespace detail {
namespace sync {
class condition {
HANDLE h_ = NULL;
public:
condition() noexcept = default;
explicit condition(char const *name) noexcept {
open(name);
}
~condition() noexcept = default;
HANDLE native() const noexcept {
return h_;
}
bool valid() const noexcept {
return h_ != NULL;
}
bool open(char const *name) noexcept {
close();
return valid();
}
void close() noexcept {
if (!valid()) return;
::CloseHandle(h_);
h_ = NULL;
}
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
return true;
}
bool notify() noexcept {
return true;
}
bool broadcast() noexcept {
return true;
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

View File

@ -24,7 +24,7 @@ class mutex {
pthread_mutex_t *acquire_mutex(char const *name) {
if (!shm_.acquire(name, sizeof(pthread_mutex_t))) {
ipc::error("fail shm.acquire: %s\n", name);
ipc::error("[acquire_mutex] fail shm.acquire: %s\n", name);
return nullptr;
}
return static_cast<pthread_mutex_t *>(shm_.get());

View File

View File

@ -0,0 +1,79 @@
#pragma once
#include <cstdint>
#include <Windows.h>
#include "libipc/utility/log.h"
#include "libipc/platform/to_tchar.h"
#include "libipc/platform/get_sa.h"
namespace ipc {
namespace detail {
namespace sync {
class semaphore {
HANDLE h_ = NULL;
public:
semaphore() noexcept = default;
explicit semaphore(char const *name, std::uint32_t count, std::uint32_t limit) noexcept {
open(name, count, limit);
}
~semaphore() noexcept = default;
HANDLE native() const noexcept {
return h_;
}
bool valid() const noexcept {
return h_ != NULL;
}
bool open(char const *name, std::uint32_t count, std::uint32_t limit) noexcept {
close();
h_ = ::CreateSemaphore(detail::get_sa(),
static_cast<LONG>(count),
(limit == invalid_value) ? LONG_MAX : static_cast<LONG>(limit),
ipc::detail::to_tchar(name).c_str());
if (h_ == NULL) {
ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name);
return false;
}
return true;
}
void close() noexcept {
if (!valid()) return;
::CloseHandle(h_);
h_ = NULL;
}
bool wait(std::uint64_t tm) noexcept {
DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast<DWORD>(tm);
switch ((ret = ::WaitForSingleObject(h_, ms))) {
case WAIT_OBJECT_0:
return true;
case WAIT_TIMEOUT:
return false;
case WAIT_ABANDONED:
default:
ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret);
return false;
}
}
bool post(std::uint32_t count) noexcept {
if (!::ReleaseSemaphore(h_, static_cast<LONG>(count), NULL)) {
ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError());
return false;
}
return true;
}
};
} // namespace sync
} // namespace detail
} // namespace ipc

66
src/semaphore.cpp Normal file
View File

@ -0,0 +1,66 @@
#include "libipc/semaphore.h"
#include "libipc/utility/pimpl.h"
#include "libipc/memory/resource.h"
#include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/semaphore_win.h"
#elif defined(IPC_OS_LINUX_)
#include "libipc/platform/semaphore_linux.h"
#else/*linux*/
# error "Unsupported platform."
#endif
namespace ipc {
namespace sync {
class semaphore::semaphore_ : public ipc::pimpl<semaphore_> {
public:
ipc::detail::sync::semaphore sem_;
};
semaphore::semaphore()
: p_(p_->make()) {
}
semaphore::semaphore(char const * name, std::uint32_t count, std::uint32_t limit)
: semaphore() {
open(name, count, limit);
}
semaphore::~semaphore() {
close();
p_->clear();
}
void const *semaphore::native() const noexcept {
return impl(p_)->sem_.native();
}
void *semaphore::native() noexcept {
return impl(p_)->sem_.native();
}
bool semaphore::valid() const noexcept {
return impl(p_)->sem_.valid();
}
bool semaphore::open(char const *name, std::uint32_t count, std::uint32_t limit) noexcept {
return impl(p_)->sem_.open(name, count, limit);
}
void semaphore::close() noexcept {
impl(p_)->sem_.close();
}
bool semaphore::wait(std::uint64_t tm) noexcept {
return impl(p_)->sem_.wait(tm);
}
bool semaphore::post(std::uint32_t count) noexcept {
return impl(p_)->sem_.post(count);
}
} // namespace sync
} // namespace ipc