use sync to refactor waiter

This commit is contained in:
mutouyun 2021-09-20 15:59:44 +08:00
parent c1ceaa657a
commit 04fda1cc3d
10 changed files with 21 additions and 1352 deletions

View File

@ -1,93 +0,0 @@
#pragma once
#include "libipc/export.h"
#include "libipc/def.h"
namespace ipc {
class condition;
class IPC_EXPORT mutex {
public:
mutex();
explicit mutex(char const * name);
mutex(mutex&& rhs);
~mutex();
static void remove(char const * name);
void swap(mutex& rhs);
mutex& operator=(mutex rhs);
bool valid() const;
char const * name () const;
bool open (char const * name);
void close();
bool lock ();
bool unlock();
private:
class mutex_;
mutex_* p_;
friend class condition;
};
class IPC_EXPORT semaphore {
public:
semaphore();
explicit semaphore(char const * name);
semaphore(semaphore&& rhs);
~semaphore();
static void remove(char const * name);
void swap(semaphore& rhs);
semaphore& operator=(semaphore rhs);
bool valid() const;
char const * name () const;
bool open (char const * name, long count = 0);
void close();
bool wait(std::uint64_t tm = invalid_value);
bool post(long count = 1);
private:
class semaphore_;
semaphore_* p_;
};
class IPC_EXPORT condition {
public:
condition();
explicit condition(char const * name);
condition(condition&& rhs);
~condition();
static void remove(char const * name);
void swap(condition& rhs);
condition& operator=(condition rhs);
bool valid() const;
char const * name () const;
bool open (char const * name);
void close();
bool wait(mutex&, std::uint64_t tm = invalid_value);
bool notify();
bool broadcast();
private:
class condition_;
condition_* p_;
};
} // namespace ipc

View File

@ -17,6 +17,7 @@
#include "libipc/queue.h" #include "libipc/queue.h"
#include "libipc/policy.h" #include "libipc/policy.h"
#include "libipc/rw_lock.h" #include "libipc/rw_lock.h"
#include "libipc/waiter.h"
#include "libipc/utility/log.h" #include "libipc/utility/log.h"
#include "libipc/utility/id_pool.h" #include "libipc/utility/id_pool.h"
@ -24,10 +25,7 @@
#include "libipc/utility/utility.h" #include "libipc/utility/utility.h"
#include "libipc/memory/resource.h" #include "libipc/memory/resource.h"
#include "libipc/platform/detail.h" #include "libipc/platform/detail.h"
#include "libipc/platform/waiter_wrapper.h"
#include "libipc/circ/elem_array.h" #include "libipc/circ/elem_array.h"
namespace { namespace {
@ -271,7 +269,7 @@ struct conn_info_head {
ipc::string name_; ipc::string name_;
msg_id_t cc_id_; // connection-info id msg_id_t cc_id_; // connection-info id
ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_;
ipc::shm::handle acc_h_; ipc::shm::handle acc_h_;
conn_info_head(char const * name) conn_info_head(char const * name)
@ -284,9 +282,9 @@ struct conn_info_head {
} }
void quit_waiting() { void quit_waiting() {
cc_waiter_.quit_waiting(); // cc_waiter_.quit_waiting();
wt_waiter_.quit_waiting(); // wt_waiter_.quit_waiting();
rd_waiter_.quit_waiting(); // rd_waiter_.quit_waiting();
} }
auto acc() { auto acc() {

View File

@ -1,422 +0,0 @@
#pragma once
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <semaphore.h>
#include <errno.h>
#include <atomic>
#include <tuple>
#include <utility>
#include <cassert>
#include "libipc/def.h"
#include "libipc/waiter_helper.h"
#include "libipc/utility/log.h"
#include "libipc/platform/detail.h"
#include "libipc/memory/resource.h"
namespace ipc {
namespace detail {
inline static bool calc_wait_time(timespec& ts, std::uint64_t tm /*ms*/) {
timeval now;
int eno = ::gettimeofday(&now, NULL);
if (eno != 0) {
ipc::error("fail gettimeofday [%d]\n", eno);
return false;
}
ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000;
ts.tv_sec = now.tv_sec + (tm / 1000) + (ts.tv_nsec / 1000000000);
ts.tv_nsec %= 1000000000;
return true;
}
#pragma push_macro("IPC_PTHREAD_FUNC_")
#undef IPC_PTHREAD_FUNC_
#define IPC_PTHREAD_FUNC_(CALL, ...) \
int eno; \
if ((eno = ::CALL(__VA_ARGS__)) != 0) { \
ipc::error("fail " #CALL " [%d]\n", eno); \
return false; \
} \
return true
class mutex {
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
public:
pthread_mutex_t& native() {
return mutex_;
}
bool open() {
int eno;
// init mutex
pthread_mutexattr_t mutex_attr;
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
ipc::error("fail pthread_mutexattr_init[%d]\n", eno);
return false;
}
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
return false;
}
if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) {
ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno);
return false;
}
if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) {
ipc::error("fail pthread_mutex_init[%d]\n", eno);
return false;
}
return true;
}
bool close() {
IPC_PTHREAD_FUNC_(pthread_mutex_destroy, &mutex_);
}
bool lock() {
for (;;) {
int eno = ::pthread_mutex_lock(&mutex_);
switch (eno) {
case 0:
return true;
case EOWNERDEAD:
if (::pthread_mutex_consistent(&mutex_) == 0) {
return true;
}
IPC_FALLTHROUGH_;
case ENOTRECOVERABLE:
if (close() && open()) {
break;
}
IPC_FALLTHROUGH_;
default:
ipc::error("fail pthread_mutex_lock[%d]\n", eno);
return false;
}
}
}
bool unlock() {
IPC_PTHREAD_FUNC_(pthread_mutex_unlock, &mutex_);
}
};
class condition {
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
public:
bool open() {
int eno;
// init condition
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::error("fail pthread_condattr_init[%d]\n", eno);
return false;
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
return false;
}
if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) {
ipc::error("fail pthread_cond_init[%d]\n", eno);
return false;
}
return true;
}
bool close() {
IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_);
}
bool wait(mutex& mtx, std::uint64_t tm = invalid_value) {
switch (tm) {
case 0:
return true;
case invalid_value:
IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native());
default: {
timespec ts;
if (!calc_wait_time(ts, tm)) {
ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
tm, ts.tv_sec, ts.tv_nsec);
return false;
}
int eno;
if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) {
if (eno != ETIMEDOUT) {
ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
eno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
}
return true;
}
}
bool notify() {
IPC_PTHREAD_FUNC_(pthread_cond_signal, &cond_);
}
bool broadcast() {
IPC_PTHREAD_FUNC_(pthread_cond_broadcast, &cond_);
}
};
#pragma pop_macro("IPC_PTHREAD_FUNC_")
class sem_helper {
public:
using handle_t = sem_t*;
constexpr static handle_t invalid() noexcept {
return SEM_FAILED;
}
static handle_t open(char const * name, long count) {
handle_t sem = ::sem_open(name, O_CREAT, 0666, count);
if (sem == SEM_FAILED) {
ipc::error("fail sem_open[%d]: %s\n", errno, name);
return invalid();
}
return sem;
}
#pragma push_macro("IPC_SEMAPHORE_FUNC_")
#undef IPC_SEMAPHORE_FUNC_
#define IPC_SEMAPHORE_FUNC_(CALL, ...) \
if (::CALL(__VA_ARGS__) != 0) { \
ipc::error("fail " #CALL "[%d]\n", errno); \
return false; \
} \
return true
static bool close(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_close, h);
}
static bool destroy(char const * name) {
IPC_SEMAPHORE_FUNC_(sem_unlink, name);
}
static bool post(handle_t h, long count) {
if (h == invalid()) return false;
auto spost = [](handle_t h) {
IPC_SEMAPHORE_FUNC_(sem_post, h);
};
for (long i = 0; i < count; ++i) {
if (!spost(h)) return false;
}
return true;
}
static bool wait(handle_t h, std::uint64_t tm = invalid_value) {
if (h == invalid()) return false;
switch (tm) {
case invalid_value:
IPC_SEMAPHORE_FUNC_(sem_wait, h);
default: {
timespec ts;
if (!calc_wait_time(ts, tm)) {
ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
tm, ts.tv_sec, ts.tv_nsec);
return false;
}
if (::sem_timedwait(h, &ts) != 0) {
if (errno != ETIMEDOUT) {
ipc::error("fail sem_timedwait [%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n",
errno, tm, ts.tv_sec, ts.tv_nsec);
}
return false;
}
}
return true;
}
}
#pragma pop_macro("IPC_SEMAPHORE_FUNC_")
};
class waiter_holder {
public:
using handle_t = std::tuple<
ipc::string,
sem_helper::handle_t /* sema */,
sem_helper::handle_t /* handshake */>;
static handle_t invalid() noexcept {
return std::make_tuple(
ipc::string{},
sem_helper::invalid(),
sem_helper::invalid());
}
private:
using wait_flags = waiter_helper::wait_flags;
using wait_counter = waiter_helper::wait_counter;
mutex lock_;
wait_counter cnt_;
struct contrl {
waiter_holder * me_;
wait_flags * flags_;
handle_t const & h_;
wait_flags & flags() noexcept {
assert(flags_ != nullptr);
return *flags_;
}
wait_counter & counter() noexcept {
return me_->cnt_;
}
auto get_lock() {
return ipc::detail::unique_lock(me_->lock_);
}
bool sema_wait(std::uint64_t tm) {
return sem_helper::wait(std::get<1>(h_), tm);
}
bool sema_post(long count) {
return sem_helper::post(std::get<1>(h_), count);
}
bool handshake_wait(std::uint64_t tm) {
return sem_helper::wait(std::get<2>(h_), tm);
}
bool handshake_post(long count) {
return sem_helper::post(std::get<2>(h_), count);
}
};
public:
handle_t open_h(ipc::string && name) {
auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0);
if (sem == sem_helper::invalid()) {
return invalid();
}
auto han = sem_helper::open(("__WAITER_HELPER_HAN__" + name).c_str(), 0);
if (han == sem_helper::invalid()) {
return invalid();
}
return std::make_tuple(std::move(name), sem, han);
}
void release_h(handle_t const & h) {
sem_helper::close(std::get<2>(h));
sem_helper::close(std::get<1>(h));
}
void close_h(handle_t const & h) {
auto const & name = std::get<0>(h);
sem_helper::destroy(("__WAITER_HELPER_HAN__" + name).c_str());
sem_helper::destroy(("__WAITER_HELPER_SEM__" + name).c_str());
}
bool open() {
return lock_.open();
}
void close() {
lock_.close();
}
template <typename F>
bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::uint64_t tm = invalid_value) {
assert(flags != nullptr);
contrl ctrl { this, flags, h };
class non_mutex {
public:
void lock () noexcept {}
void unlock() noexcept {}
} nm;
return waiter_helper::wait_if(ctrl, nm, std::forward<F>(pred), tm);
}
bool notify(handle_t const & h) {
contrl ctrl { this, nullptr, h };
return waiter_helper::notify(ctrl);
}
bool broadcast(handle_t const & h) {
contrl ctrl { this, nullptr, h };
return waiter_helper::broadcast(ctrl);
}
bool quit_waiting(handle_t const & h, wait_flags * flags) {
assert(flags != nullptr);
contrl ctrl { this, flags, h };
return waiter_helper::quit_waiting(ctrl);
}
};
class waiter {
waiter_holder helper_;
std::atomic<unsigned> opened_ { 0 };
public:
using handle_t = waiter_holder::handle_t;
static handle_t invalid() noexcept {
return waiter_holder::invalid();
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') {
return invalid();
}
if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !helper_.open()) {
return invalid();
}
return helper_.open_h(name);
}
void close(handle_t h) {
if (h == invalid()) return;
helper_.release_h(h);
if (opened_.fetch_sub(1, std::memory_order_release) == 1) {
helper_.close_h(h);
helper_.close();
}
}
template <typename F>
bool wait_if(handle_t h, waiter_helper::wait_flags * flags, F && pred, std::uint64_t tm = invalid_value) {
if (h == invalid()) return false;
return helper_.wait_if(h, flags, std::forward<F>(pred), tm);
}
bool notify(handle_t h) {
if (h == invalid()) return false;
return helper_.notify(h);
}
bool broadcast(handle_t h) {
if (h == invalid()) return false;
return helper_.broadcast(h);
}
bool quit_waiting(handle_t h, waiter_helper::wait_flags * flags) {
if (h == invalid()) return false;
return helper_.quit_waiting(h, flags);
}
};
} // namespace detail
} // namespace ipc

View File

@ -1,233 +0,0 @@
#pragma once
#include <Windows.h>
#include <atomic>
#include <utility>
#include <limits>
#include <cassert>
#include "libipc/rw_lock.h"
#include "libipc/pool_alloc.h"
#include "libipc/shm.h"
#include "libipc/waiter_helper.h"
#include "libipc/utility/log.h"
#include "libipc/platform/to_tchar.h"
#include "libipc/platform/get_sa.h"
#include "libipc/platform/detail.h"
#include "libipc/memory/resource.h"
namespace ipc {
namespace detail {
class semaphore {
HANDLE h_ = NULL;
public:
static void remove(char const * /*name*/) {}
bool open(ipc::string && name, long count = 0, long limit = LONG_MAX) {
h_ = ::CreateSemaphore(detail::get_sa(), count, limit, ipc::detail::to_tchar(std::move(name)).c_str());
if (h_ == NULL) {
ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name.c_str());
return false;
}
return true;
}
void close() {
::CloseHandle(h_);
}
bool wait(std::uint64_t tm = invalid_value) {
DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast<DWORD>(tm);
switch ((ret = ::WaitForSingleObject(h_, ms))) {
case WAIT_OBJECT_0:
return true;
case WAIT_TIMEOUT:
return false;
case WAIT_ABANDONED:
default:
ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret);
return false;
}
}
bool post(long count = 1) {
if (::ReleaseSemaphore(h_, count, NULL)) {
return true;
}
ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError());
return false;
}
};
class mutex : public semaphore {
using semaphore::wait;
using semaphore::post;
public:
bool open(ipc::string && name) {
return semaphore::open(std::move(name), 1, 1);
}
bool lock () { return semaphore::wait(); }
bool unlock() { return semaphore::post(); }
};
class condition {
using wait_flags = waiter_helper::wait_flags;
using wait_counter = waiter_helper::wait_counter;
mutex lock_;
semaphore sema_, handshake_;
wait_counter * cnt_ = nullptr;
struct contrl {
condition * me_;
wait_flags * flags_;
wait_flags & flags() noexcept {
assert(flags_ != nullptr);
return *flags_;
}
wait_counter & counter() noexcept {
assert(me_->cnt_ != nullptr);
return *(me_->cnt_);
}
auto get_lock() {
return ipc::detail::unique_lock(me_->lock_);
}
bool sema_wait(std::uint64_t tm) {
return me_->sema_.wait(tm);
}
bool sema_post(long count) {
return me_->sema_.post(count);
}
bool handshake_wait(std::uint64_t tm) {
return me_->handshake_.wait(tm);
}
bool handshake_post(long count) {
return me_->handshake_.post(count);
}
};
public:
friend bool operator==(condition const & c1, condition const & c2) {
return c1.cnt_ == c2.cnt_;
}
friend bool operator!=(condition const & c1, condition const & c2) {
return !(c1 == c2);
}
static void remove(char const * name) {
semaphore::remove((ipc::string{ "__COND_HAN__" } + name).c_str());
semaphore::remove((ipc::string{ "__COND_SEM__" } + name).c_str());
mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str());
}
bool open(ipc::string const & name, wait_counter * cnt) {
if (lock_ .open("__COND_MTX__" + name) &&
sema_ .open("__COND_SEM__" + name) &&
handshake_.open("__COND_HAN__" + name)) {
cnt_ = cnt;
return true;
}
return false;
}
void close() {
handshake_.close();
sema_ .close();
lock_ .close();
}
template <typename Mutex, typename F>
bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::uint64_t tm = invalid_value) {
assert(flags != nullptr);
contrl ctrl { this, flags };
return waiter_helper::wait_if(ctrl, mtx, std::forward<F>(pred), tm);
}
bool notify() {
contrl ctrl { this, nullptr };
return waiter_helper::notify(ctrl);
}
bool broadcast() {
contrl ctrl { this, nullptr };
return waiter_helper::broadcast(ctrl);
}
bool quit_waiting(wait_flags * flags) {
assert(flags != nullptr);
contrl ctrl { this, flags };
return waiter_helper::quit_waiting(ctrl);
}
};
class waiter {
waiter_helper::wait_counter cnt_;
public:
using handle_t = condition;
static handle_t invalid() {
return condition {};
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') {
return invalid();
}
condition cond;
if (cond.open(name, &cnt_)) {
return cond;
}
return invalid();
}
void close(handle_t& h) {
if (h == invalid()) return;
h.close();
}
template <typename F>
bool wait_if(handle_t& h, waiter_helper::wait_flags * flags, F&& pred, std::uint64_t tm = invalid_value) {
if (h == invalid()) return false;
class non_mutex {
public:
void lock () noexcept {}
void unlock() noexcept {}
} nm;
return h.wait_if(nm, flags, std::forward<F>(pred), tm);
}
bool notify(handle_t& h) {
if (h == invalid()) return false;
return h.notify();
}
bool broadcast(handle_t& h) {
if (h == invalid()) return false;
return h.broadcast();
}
bool quit_waiting(handle_t& h, waiter_helper::wait_flags * flags) {
if (h == invalid()) return false;
return h.quit_waiting(flags);
}
};
} // namespace detail
} // namespace ipc

View File

@ -1,291 +0,0 @@
#pragma once
#include <type_traits>
#include <atomic>
#include <utility>
#include "libipc/shm.h"
#include "libipc/memory/resource.h"
#include "libipc/platform/detail.h"
#if defined(IPC_OS_WINDOWS_)
#include "libipc/platform/waiter_win.h"
namespace ipc {
namespace detail {
using mutex_impl = ipc::detail::mutex;
using semaphore_impl = ipc::detail::semaphore;
class condition_impl : public ipc::detail::condition {
using base_t = ipc::detail::condition;
ipc::shm::handle cnt_h_;
waiter_helper::wait_flags flags_;
public:
static void remove(char const * name) {
base_t::remove(name);
ipc::string n = name;
ipc::shm::remove((n + "__COND_CNT__" ).c_str());
ipc::shm::remove((n + "__COND_WAIT__").c_str());
}
bool open(char const * name) {
if (cnt_h_ .acquire(
(ipc::string { name } + "__COND_CNT__" ).c_str(),
sizeof(waiter_helper::wait_counter))) {
flags_.is_closed_.store(false, std::memory_order_release);
return base_t::open(name,
static_cast<waiter_helper::wait_counter *>(cnt_h_.get()));
}
return false;
}
void close() {
flags_.is_closed_.store(true, std::memory_order_release);
base_t::quit_waiting(&flags_);
base_t::close();
cnt_h_.release();
}
bool wait(mutex_impl& mtx, std::uint64_t tm = invalid_value) {
return base_t::wait_if(mtx, &flags_, [] { return true; }, tm);
}
};
} // namespace detail
} // namespace ipc
#elif defined(IPC_OS_LINUX_)
#include "libipc/platform/waiter_linux.h"
namespace ipc {
namespace detail {
template <typename T>
class object_impl {
ipc::shm::handle h_;
struct info_t {
T object_;
std::atomic<unsigned> opened_;
};
public:
static void remove(char const * name) {
{
ipc::shm::handle h { name, sizeof(info_t) };
if (h.valid()) {
auto info = static_cast<info_t*>(h.get());
info->object_.close();
}
}
ipc::shm::remove(name);
}
T& object() {
return static_cast<info_t*>(h_.get())->object_;
}
template <typename... P>
bool open(char const * name, P&&... params) {
if (!h_.acquire(name, sizeof(info_t))) {
return false;
}
auto info = static_cast<info_t*>(h_.get());
if ((info->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) &&
!info->object_.open(std::forward<P>(params)...)) {
return false;
}
return true;
}
void close() {
if (!h_.valid()) return;
auto info = static_cast<info_t*>(h_.get());
if (info->opened_.fetch_sub(1, std::memory_order_release) == 1) {
info->object_.close();
}
h_.release();
}
};
class mutex_impl : public object_impl<ipc::detail::mutex> {
public:
bool lock () { return object().lock (); }
bool unlock() { return object().unlock(); }
};
class condition_impl : public object_impl<ipc::detail::condition> {
public:
bool wait(mutex_impl& mtx, std::uint64_t tm = invalid_value) {
return object().wait(mtx.object(), tm);
}
bool notify () { return object().notify (); }
bool broadcast() { return object().broadcast(); }
};
class semaphore_impl {
sem_helper::handle_t h_;
ipc::shm::handle opened_; // std::atomic<unsigned>
ipc::string name_;
auto cnt() {
return static_cast<std::atomic<unsigned>*>(opened_.get());
}
public:
static void remove(char const * name) {
sem_helper::destroy((ipc::string{ "__SEMAPHORE_IMPL_SEM__" } + name).c_str());
ipc::shm::remove ((ipc::string{ "__SEMAPHORE_IMPL_CNT__" } + name).c_str());
}
bool open(char const * name, long count) {
name_ = name;
if (!opened_.acquire(("__SEMAPHORE_IMPL_CNT__" + name_).c_str(), sizeof(std::atomic<unsigned>))) {
return false;
}
if ((h_ = sem_helper::open(("__SEMAPHORE_IMPL_SEM__" + name_).c_str(), count)) == sem_helper::invalid()) {
return false;
}
cnt()->fetch_add(1, std::memory_order_acq_rel);
return true;
}
void close() {
if (h_ == sem_helper::invalid()) return;
sem_helper::close(h_);
if (cnt() == nullptr) return;
if (cnt()->fetch_sub(1, std::memory_order_release) == 1) {
sem_helper::destroy(("__SEMAPHORE_IMPL_SEM__" + name_).c_str());
}
opened_.release();
}
bool wait(std::uint64_t tm = invalid_value) {
return sem_helper::wait(h_, tm);
}
bool post(long count) {
return sem_helper::post(h_, count);
}
};
} // namespace detail
} // namespace ipc
#else/*linux*/
# error "Unsupported platform."
#endif
namespace ipc {
namespace detail {
class waiter_wrapper {
public:
using waiter_t = detail::waiter;
private:
waiter_t* w_ = nullptr;
waiter_t::handle_t h_ = waiter_t::invalid();
waiter_helper::wait_flags flags_;
public:
waiter_wrapper() = default;
explicit waiter_wrapper(waiter_t* w) {
attach(w);
}
waiter_wrapper(const waiter_wrapper&) = delete;
waiter_wrapper& operator=(const waiter_wrapper&) = delete;
waiter_t * waiter() { return w_; }
waiter_t const * waiter() const { return w_; }
void attach(waiter_t* w) {
close();
w_ = w;
}
bool valid() const {
return (w_ != nullptr) && (h_ != waiter_t::invalid());
}
bool open(char const * name) {
if (w_ == nullptr) return false;
close();
flags_.is_closed_.store(false, std::memory_order_release);
h_ = w_->open(name);
return valid();
}
void close() {
if (!valid()) return;
flags_.is_closed_.store(true, std::memory_order_release);
quit_waiting();
w_->close(h_);
h_ = waiter_t::invalid();
}
void quit_waiting() {
w_->quit_waiting(h_, &flags_);
}
template <typename F>
bool wait_if(F && pred, std::uint64_t tm = invalid_value) {
if (!valid()) return false;
return w_->wait_if(h_, &flags_, std::forward<F>(pred), tm);
}
bool notify() {
if (!valid()) return false;
w_->notify(h_);
return true;
}
bool broadcast() {
if (!valid()) return false;
w_->broadcast(h_);
return true;
}
};
} // namespace detail
class waiter : public detail::waiter_wrapper {
shm::handle shm_;
using detail::waiter_wrapper::attach;
public:
waiter() = default;
waiter(char const * name) {
open(name);
}
~waiter() {
close();
}
bool open(char const * name) {
if (name == nullptr || name[0] == '\0') {
return false;
}
close();
if (!shm_.acquire((ipc::string{ "__SHM_WAITER__" } + name).c_str(), sizeof(waiter_t))) {
return false;
}
attach(static_cast<waiter_t*>(shm_.get()));
return detail::waiter_wrapper::open((ipc::string{ "__IMP_WAITER__" } + name).c_str());
}
void close() {
detail::waiter_wrapper::close();
shm_.release();
}
};
} // namespace ipc

View File

@ -1,77 +0,0 @@
#include <string>
#include "libipc/waiter.h"
#include "libipc/utility/pimpl.h"
#include "libipc/platform/waiter_wrapper.h"
#undef IPC_PP_CAT_
#undef IPC_PP_JOIN_T__
#undef IPC_PP_JOIN_
#define IPC_PP_CAT_(X, ...) X##__VA_ARGS__
#define IPC_PP_JOIN_T__(X, ...) IPC_PP_CAT_(X, __VA_ARGS__)
#define IPC_PP_JOIN_(X, ...) IPC_PP_JOIN_T__(X, __VA_ARGS__)
namespace ipc {
#undef IPC_OBJECT_TYPE_
#undef IPC_OBJECT_TYPE_OPEN_PARS_
#undef IPC_OBJECT_TYPE_OPEN_ARGS_
#define IPC_OBJECT_TYPE_ mutex
#define IPC_OBJECT_TYPE_OPEN_PARS_
#define IPC_OBJECT_TYPE_OPEN_ARGS_
#include "libipc/waiter_template.inc"
bool mutex::lock() {
return impl(p_)->h_.lock();
}
bool mutex::unlock() {
return impl(p_)->h_.unlock();
}
#undef IPC_OBJECT_TYPE_
#undef IPC_OBJECT_TYPE_OPEN_PARS_
#undef IPC_OBJECT_TYPE_OPEN_ARGS_
#define IPC_OBJECT_TYPE_ semaphore
#define IPC_OBJECT_TYPE_OPEN_PARS_ , long count
#define IPC_OBJECT_TYPE_OPEN_ARGS_ , count
#include "libipc/waiter_template.inc"
bool semaphore::wait(std::uint64_t tm) {
return impl(p_)->h_.wait(tm);
}
bool semaphore::post(long count) {
return impl(p_)->h_.post(count);
}
#undef IPC_OBJECT_TYPE_
#undef IPC_OBJECT_TYPE_OPEN_PARS_
#undef IPC_OBJECT_TYPE_OPEN_ARGS_
#define IPC_OBJECT_TYPE_ condition
#define IPC_OBJECT_TYPE_OPEN_PARS_
#define IPC_OBJECT_TYPE_OPEN_ARGS_
#include "libipc/waiter_template.inc"
bool condition::wait(mutex& mtx, std::uint64_t tm) {
return impl(p_)->h_.wait(impl(mtx.p_)->h_, tm);
}
bool condition::notify() {
return impl(p_)->h_.notify();
}
bool condition::broadcast() {
return impl(p_)->h_.broadcast();
}
} // namespace ipc

View File

@ -1,143 +0,0 @@
#pragma once
#include <atomic>
#include <limits>
#include <utility>
#include "libipc/def.h"
#include "libipc/utility/scope_guard.h"
namespace ipc {
namespace detail {
struct waiter_helper {
struct wait_counter {
std::atomic<long> waiting_ { 0 };
long counter_ = 0;
};
struct wait_flags {
std::atomic<bool> is_waiting_ { false };
std::atomic<bool> is_closed_ { true };
std::atomic<bool> need_dest_ { false };
};
template <typename Mutex, typename Ctrl, typename F>
static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::uint64_t tm) {
auto & flags = ctrl.flags();
if (flags.is_closed_.load(std::memory_order_acquire)) {
return false;
}
auto & counter = ctrl.counter();
counter.waiting_.fetch_add(1, std::memory_order_release);
flags.is_waiting_.store(true, std::memory_order_relaxed);
auto finally = ipc::guard([&ctrl, &counter, &flags] {
for (auto curr_wait = counter.waiting_.load(std::memory_order_relaxed); curr_wait > 0;) {
if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_acq_rel)) {
break;
}
}
flags.is_waiting_.store(false, std::memory_order_relaxed);
});
{
IPC_UNUSED_ auto guard = ctrl.get_lock();
if (!std::forward<F>(pred)()) return true;
counter.counter_ = counter.waiting_.load(std::memory_order_relaxed);
}
mtx.unlock();
bool ret = false;
do {
bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed);
bool is_closed = flags.is_closed_ .load(std::memory_order_acquire);
if (!is_waiting || is_closed) {
flags.need_dest_.store(false, std::memory_order_release);
ret = false;
break;
}
else if (flags.need_dest_.exchange(false, std::memory_order_release)) {
ret = false;
ctrl.sema_wait(default_timeout);
break;
}
else {
ret = ctrl.sema_wait(tm);
}
} while (flags.need_dest_.load(std::memory_order_acquire));
finally.do_exit();
ret = ctrl.handshake_post(1) && ret;
mtx.lock();
return ret;
}
template <typename Ctrl>
static void clear_handshake(Ctrl & ctrl) {
while (ctrl.handshake_wait(0)) ;
}
template <typename Ctrl>
static bool notify(Ctrl & ctrl) {
auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire)) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock();
clear_handshake(ctrl);
if (counter.counter_ > 0) {
ret = ctrl.sema_post(1);
counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout);
}
return ret;
}
template <typename Ctrl>
static bool broadcast(Ctrl & ctrl) {
auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire)) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock();
clear_handshake(ctrl);
if (counter.counter_ > 0) {
ret = ctrl.sema_post(counter.counter_);
auto tm = default_timeout / counter.counter_;
do {
counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(tm);
} while (counter.counter_ > 0);
counter.waiting_.store(0, std::memory_order_release);
}
return ret;
}
template <typename Ctrl>
static bool quit_waiting(Ctrl & ctrl) {
auto & flags = ctrl.flags();
flags.need_dest_.store(true, std::memory_order_relaxed);
if (!flags.is_waiting_.exchange(false, std::memory_order_release)) {
return true;
}
auto & counter = ctrl.counter();
if ((counter.waiting_.load(std::memory_order_acquire)) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock();
clear_handshake(ctrl);
if (counter.counter_ > 0) {
ret = ctrl.sema_post(counter.counter_);
counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout);
}
return ret;
}
};
} // namespace detail
} // namespace ipc

View File

@ -1,71 +0,0 @@
#undef IPC_OBJECT_TYPE_P_
#undef IPC_OBJECT_TYPE_I_
#define IPC_OBJECT_TYPE_P_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _)
#define IPC_OBJECT_TYPE_I_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _impl)
class IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_P_ : public pimpl<IPC_OBJECT_TYPE_P_> {
public:
std::string n_;
ipc::detail::IPC_OBJECT_TYPE_I_ h_;
};
void IPC_OBJECT_TYPE_::remove(char const * name) {
detail::IPC_OBJECT_TYPE_I_::remove(name);
}
IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_()
: p_(p_->make()) {
}
IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(char const * name)
: IPC_OBJECT_TYPE_() {
open(name);
}
IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(IPC_OBJECT_TYPE_&& rhs)
: IPC_OBJECT_TYPE_() {
swap(rhs);
}
IPC_OBJECT_TYPE_::~IPC_OBJECT_TYPE_() {
close();
p_->clear();
}
void IPC_OBJECT_TYPE_::swap(IPC_OBJECT_TYPE_& rhs) {
std::swap(p_, rhs.p_);
}
IPC_OBJECT_TYPE_& IPC_OBJECT_TYPE_::operator=(IPC_OBJECT_TYPE_ rhs) {
swap(rhs);
return *this;
}
bool IPC_OBJECT_TYPE_::valid() const {
return (p_ != nullptr) && !impl(p_)->n_.empty();
}
char const * IPC_OBJECT_TYPE_::name() const {
return impl(p_)->n_.c_str();
}
bool IPC_OBJECT_TYPE_::open(char const * name IPC_OBJECT_TYPE_OPEN_PARS_) {
if (name == nullptr || name[0] == '\0') {
return false;
}
if (impl(p_)->n_ == name) return true;
close();
if (impl(p_)->h_.open(name IPC_OBJECT_TYPE_OPEN_ARGS_)) {
impl(p_)->n_ = name;
return true;
}
return false;
}
void IPC_OBJECT_TYPE_::close() {
if (!valid()) return;
impl(p_)->h_.close();
impl(p_)->n_.clear();
}

View File

@ -29,7 +29,7 @@ struct msg_head {
class rand_buf : public buffer { class rand_buf : public buffer {
public: public:
rand_buf() { rand_buf() {
int size = capo::random<>{sizeof(msg_head), TestBuffMax}(); int size = capo::random<>{(int)sizeof(msg_head), TestBuffMax}();
*this = buffer(new char[size], size, [](void * p, std::size_t) { *this = buffer(new char[size], size, [](void * p, std::size_t) {
delete [] static_cast<char *>(p); delete [] static_cast<char *>(p);
}); });

View File

@ -1,33 +1,34 @@
#include <thread> #include <thread>
#include <iostream> #include <iostream>
#include "libipc/platform/waiter_wrapper.h" #include "libipc/waiter.h"
#include "test.h" #include "test.h"
namespace { namespace {
TEST(Waiter, broadcast) { TEST(Waiter, broadcast) {
ipc::detail::waiter w; ipc::detail::waiter waiter;
std::thread ts[10]; std::thread ts[10];
int k = 0;
for (auto& t : ts) { for (auto& t : ts) {
t = std::thread([&w] { t = std::thread([&k] {
ipc::detail::waiter_wrapper wp { &w }; ipc::detail::waiter waiter {"test-ipc-waiter"};
EXPECT_TRUE(wp.open("test-ipc-waiter")); EXPECT_TRUE(waiter.valid());
EXPECT_TRUE(wp.wait_if([] { return true; })); for (int i = 0; i < 99; ++i) {
wp.close(); ASSERT_TRUE(waiter.wait_if([&k, &i] { return k == i; }));
}
}); });
} }
ipc::detail::waiter_wrapper wp { &w }; EXPECT_TRUE(waiter.open("test-ipc-waiter"));
EXPECT_TRUE(wp.open("test-ipc-waiter"));
std::cout << "waiting for broadcast...\n"; std::cout << "waiting for broadcast...\n";
std::this_thread::sleep_for(std::chrono::seconds(1)); for (k = 1; k < 100; ++k) {
EXPECT_TRUE(wp.broadcast()); std::cout << "broadcast: " << k << "\n";
ASSERT_TRUE(waiter.broadcast());
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
for (auto& t : ts) t.join(); for (auto& t : ts) t.join();
wp.close();
} }
} // internal-linkage } // internal-linkage