diff --git a/include/libipc/waiter.h b/include/libipc/waiter.h deleted file mode 100755 index 37197e0..0000000 --- a/include/libipc/waiter.h +++ /dev/null @@ -1,93 +0,0 @@ -#pragma once - -#include "libipc/export.h" -#include "libipc/def.h" - -namespace ipc { - -class condition; -class IPC_EXPORT mutex { -public: - mutex(); - explicit mutex(char const * name); - mutex(mutex&& rhs); - - ~mutex(); - - static void remove(char const * name); - - void swap(mutex& rhs); - mutex& operator=(mutex rhs); - - bool valid() const; - char const * name () const; - - bool open (char const * name); - void close(); - - bool lock (); - bool unlock(); - -private: - class mutex_; - mutex_* p_; - - friend class condition; -}; - -class IPC_EXPORT semaphore { -public: - semaphore(); - explicit semaphore(char const * name); - semaphore(semaphore&& rhs); - - ~semaphore(); - - static void remove(char const * name); - - void swap(semaphore& rhs); - semaphore& operator=(semaphore rhs); - - bool valid() const; - char const * name () const; - - bool open (char const * name, long count = 0); - void close(); - - bool wait(std::uint64_t tm = invalid_value); - bool post(long count = 1); - -private: - class semaphore_; - semaphore_* p_; -}; - -class IPC_EXPORT condition { -public: - condition(); - explicit condition(char const * name); - condition(condition&& rhs); - - ~condition(); - - static void remove(char const * name); - - void swap(condition& rhs); - condition& operator=(condition rhs); - - bool valid() const; - char const * name () const; - - bool open (char const * name); - void close(); - - bool wait(mutex&, std::uint64_t tm = invalid_value); - bool notify(); - bool broadcast(); - -private: - class condition_; - condition_* p_; -}; - -} // namespace ipc diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 418f61d..7bda379 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -17,6 +17,7 @@ #include "libipc/queue.h" #include "libipc/policy.h" #include "libipc/rw_lock.h" +#include "libipc/waiter.h" #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" @@ -24,10 +25,7 @@ #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" - #include "libipc/platform/detail.h" -#include "libipc/platform/waiter_wrapper.h" - #include "libipc/circ/elem_array.h" namespace { @@ -271,7 +269,7 @@ struct conn_info_head { ipc::string name_; msg_id_t cc_id_; // connection-info id - ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; + ipc::detail::waiter cc_waiter_, wt_waiter_, rd_waiter_; ipc::shm::handle acc_h_; conn_info_head(char const * name) @@ -284,9 +282,9 @@ struct conn_info_head { } void quit_waiting() { - cc_waiter_.quit_waiting(); - wt_waiter_.quit_waiting(); - rd_waiter_.quit_waiting(); + // cc_waiter_.quit_waiting(); + // wt_waiter_.quit_waiting(); + // rd_waiter_.quit_waiting(); } auto acc() { diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h deleted file mode 100755 index fffe8d1..0000000 --- a/src/libipc/platform/waiter_linux.h +++ /dev/null @@ -1,422 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "libipc/def.h" -#include "libipc/waiter_helper.h" - -#include "libipc/utility/log.h" -#include "libipc/platform/detail.h" -#include "libipc/memory/resource.h" - -namespace ipc { -namespace detail { - -inline static bool calc_wait_time(timespec& ts, std::uint64_t tm /*ms*/) { - timeval now; - int eno = ::gettimeofday(&now, NULL); - if (eno != 0) { - ipc::error("fail gettimeofday [%d]\n", eno); - return false; - } - ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000; - ts.tv_sec = now.tv_sec + (tm / 1000) + (ts.tv_nsec / 1000000000); - ts.tv_nsec %= 1000000000; - return true; -} - -#pragma push_macro("IPC_PTHREAD_FUNC_") -#undef IPC_PTHREAD_FUNC_ -#define IPC_PTHREAD_FUNC_(CALL, ...) \ - int eno; \ - if ((eno = ::CALL(__VA_ARGS__)) != 0) { \ - ipc::error("fail " #CALL " [%d]\n", eno); \ - return false; \ - } \ - return true - -class mutex { - pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; - -public: - pthread_mutex_t& native() { - return mutex_; - } - - bool open() { - int eno; - // init mutex - pthread_mutexattr_t mutex_attr; - if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { - ipc::error("fail pthread_mutexattr_init[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); - if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); - return false; - } - if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) { - ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno); - return false; - } - if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) { - ipc::error("fail pthread_mutex_init[%d]\n", eno); - return false; - } - return true; - } - - bool close() { - IPC_PTHREAD_FUNC_(pthread_mutex_destroy, &mutex_); - } - - bool lock() { - for (;;) { - int eno = ::pthread_mutex_lock(&mutex_); - switch (eno) { - case 0: - return true; - case EOWNERDEAD: - if (::pthread_mutex_consistent(&mutex_) == 0) { - return true; - } - IPC_FALLTHROUGH_; - case ENOTRECOVERABLE: - if (close() && open()) { - break; - } - IPC_FALLTHROUGH_; - default: - ipc::error("fail pthread_mutex_lock[%d]\n", eno); - return false; - } - } - } - - bool unlock() { - IPC_PTHREAD_FUNC_(pthread_mutex_unlock, &mutex_); - } -}; - -class condition { - pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; - -public: - bool open() { - int eno; - // init condition - pthread_condattr_t cond_attr; - if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { - ipc::error("fail pthread_condattr_init[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); - if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); - return false; - } - if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) { - ipc::error("fail pthread_cond_init[%d]\n", eno); - return false; - } - return true; - } - - bool close() { - IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_); - } - - bool wait(mutex& mtx, std::uint64_t tm = invalid_value) { - switch (tm) { - case 0: - return true; - case invalid_value: - IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); - default: { - timespec ts; - if (!calc_wait_time(ts, tm)) { - ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - tm, ts.tv_sec, ts.tv_nsec); - return false; - } - int eno; - if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { - if (eno != ETIMEDOUT) { - ipc::error("fail pthread_cond_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - eno, tm, ts.tv_sec, ts.tv_nsec); - } - return false; - } - } - return true; - } - } - - bool notify() { - IPC_PTHREAD_FUNC_(pthread_cond_signal, &cond_); - } - - bool broadcast() { - IPC_PTHREAD_FUNC_(pthread_cond_broadcast, &cond_); - } -}; - -#pragma pop_macro("IPC_PTHREAD_FUNC_") - -class sem_helper { -public: - using handle_t = sem_t*; - - constexpr static handle_t invalid() noexcept { - return SEM_FAILED; - } - - static handle_t open(char const * name, long count) { - handle_t sem = ::sem_open(name, O_CREAT, 0666, count); - if (sem == SEM_FAILED) { - ipc::error("fail sem_open[%d]: %s\n", errno, name); - return invalid(); - } - return sem; - } - -#pragma push_macro("IPC_SEMAPHORE_FUNC_") -#undef IPC_SEMAPHORE_FUNC_ -#define IPC_SEMAPHORE_FUNC_(CALL, ...) \ - if (::CALL(__VA_ARGS__) != 0) { \ - ipc::error("fail " #CALL "[%d]\n", errno); \ - return false; \ - } \ - return true - - static bool close(handle_t h) { - if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_close, h); - } - - static bool destroy(char const * name) { - IPC_SEMAPHORE_FUNC_(sem_unlink, name); - } - - static bool post(handle_t h, long count) { - if (h == invalid()) return false; - auto spost = [](handle_t h) { - IPC_SEMAPHORE_FUNC_(sem_post, h); - }; - for (long i = 0; i < count; ++i) { - if (!spost(h)) return false; - } - return true; - } - - static bool wait(handle_t h, std::uint64_t tm = invalid_value) { - if (h == invalid()) return false; - switch (tm) { - case invalid_value: - IPC_SEMAPHORE_FUNC_(sem_wait, h); - default: { - timespec ts; - if (!calc_wait_time(ts, tm)) { - ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - tm, ts.tv_sec, ts.tv_nsec); - return false; - } - if (::sem_timedwait(h, &ts) != 0) { - if (errno != ETIMEDOUT) { - ipc::error("fail sem_timedwait [%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", - errno, tm, ts.tv_sec, ts.tv_nsec); - } - return false; - } - } - return true; - } - } - -#pragma pop_macro("IPC_SEMAPHORE_FUNC_") -}; - -class waiter_holder { -public: - using handle_t = std::tuple< - ipc::string, - sem_helper::handle_t /* sema */, - sem_helper::handle_t /* handshake */>; - - static handle_t invalid() noexcept { - return std::make_tuple( - ipc::string{}, - sem_helper::invalid(), - sem_helper::invalid()); - } - -private: - using wait_flags = waiter_helper::wait_flags; - using wait_counter = waiter_helper::wait_counter; - - mutex lock_; - wait_counter cnt_; - - struct contrl { - waiter_holder * me_; - wait_flags * flags_; - handle_t const & h_; - - wait_flags & flags() noexcept { - assert(flags_ != nullptr); - return *flags_; - } - - wait_counter & counter() noexcept { - return me_->cnt_; - } - - auto get_lock() { - return ipc::detail::unique_lock(me_->lock_); - } - - bool sema_wait(std::uint64_t tm) { - return sem_helper::wait(std::get<1>(h_), tm); - } - - bool sema_post(long count) { - return sem_helper::post(std::get<1>(h_), count); - } - - bool handshake_wait(std::uint64_t tm) { - return sem_helper::wait(std::get<2>(h_), tm); - } - - bool handshake_post(long count) { - return sem_helper::post(std::get<2>(h_), count); - } - }; - -public: - handle_t open_h(ipc::string && name) { - auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0); - if (sem == sem_helper::invalid()) { - return invalid(); - } - auto han = sem_helper::open(("__WAITER_HELPER_HAN__" + name).c_str(), 0); - if (han == sem_helper::invalid()) { - return invalid(); - } - return std::make_tuple(std::move(name), sem, han); - } - - void release_h(handle_t const & h) { - sem_helper::close(std::get<2>(h)); - sem_helper::close(std::get<1>(h)); - } - - void close_h(handle_t const & h) { - auto const & name = std::get<0>(h); - sem_helper::destroy(("__WAITER_HELPER_HAN__" + name).c_str()); - sem_helper::destroy(("__WAITER_HELPER_SEM__" + name).c_str()); - } - - bool open() { - return lock_.open(); - } - - void close() { - lock_.close(); - } - - template - bool wait_if(handle_t const & h, wait_flags * flags, F&& pred, std::uint64_t tm = invalid_value) { - assert(flags != nullptr); - contrl ctrl { this, flags, h }; - - class non_mutex { - public: - void lock () noexcept {} - void unlock() noexcept {} - } nm; - - return waiter_helper::wait_if(ctrl, nm, std::forward(pred), tm); - } - - bool notify(handle_t const & h) { - contrl ctrl { this, nullptr, h }; - return waiter_helper::notify(ctrl); - } - - bool broadcast(handle_t const & h) { - contrl ctrl { this, nullptr, h }; - return waiter_helper::broadcast(ctrl); - } - - bool quit_waiting(handle_t const & h, wait_flags * flags) { - assert(flags != nullptr); - contrl ctrl { this, flags, h }; - return waiter_helper::quit_waiting(ctrl); - } -}; - -class waiter { - waiter_holder helper_; - std::atomic opened_ { 0 }; - -public: - using handle_t = waiter_holder::handle_t; - - static handle_t invalid() noexcept { - return waiter_holder::invalid(); - } - - handle_t open(char const * name) { - if (name == nullptr || name[0] == '\0') { - return invalid(); - } - if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !helper_.open()) { - return invalid(); - } - return helper_.open_h(name); - } - - void close(handle_t h) { - if (h == invalid()) return; - helper_.release_h(h); - if (opened_.fetch_sub(1, std::memory_order_release) == 1) { - helper_.close_h(h); - helper_.close(); - } - } - - template - bool wait_if(handle_t h, waiter_helper::wait_flags * flags, F && pred, std::uint64_t tm = invalid_value) { - if (h == invalid()) return false; - return helper_.wait_if(h, flags, std::forward(pred), tm); - } - - bool notify(handle_t h) { - if (h == invalid()) return false; - return helper_.notify(h); - } - - bool broadcast(handle_t h) { - if (h == invalid()) return false; - return helper_.broadcast(h); - } - - bool quit_waiting(handle_t h, waiter_helper::wait_flags * flags) { - if (h == invalid()) return false; - return helper_.quit_waiting(h, flags); - } -}; - -} // namespace detail -} // namespace ipc diff --git a/src/libipc/platform/waiter_win.h b/src/libipc/platform/waiter_win.h deleted file mode 100755 index 9f5f888..0000000 --- a/src/libipc/platform/waiter_win.h +++ /dev/null @@ -1,233 +0,0 @@ -#pragma once - -#include - -#include -#include -#include -#include - -#include "libipc/rw_lock.h" -#include "libipc/pool_alloc.h" -#include "libipc/shm.h" -#include "libipc/waiter_helper.h" - -#include "libipc/utility/log.h" -#include "libipc/platform/to_tchar.h" -#include "libipc/platform/get_sa.h" -#include "libipc/platform/detail.h" -#include "libipc/memory/resource.h" - -namespace ipc { -namespace detail { - -class semaphore { - HANDLE h_ = NULL; - -public: - static void remove(char const * /*name*/) {} - - bool open(ipc::string && name, long count = 0, long limit = LONG_MAX) { - h_ = ::CreateSemaphore(detail::get_sa(), count, limit, ipc::detail::to_tchar(std::move(name)).c_str()); - if (h_ == NULL) { - ipc::error("fail CreateSemaphore[%lu]: %s\n", ::GetLastError(), name.c_str()); - return false; - } - return true; - } - - void close() { - ::CloseHandle(h_); - } - - bool wait(std::uint64_t tm = invalid_value) { - DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); - switch ((ret = ::WaitForSingleObject(h_, ms))) { - case WAIT_OBJECT_0: - return true; - case WAIT_TIMEOUT: - return false; - case WAIT_ABANDONED: - default: - ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); - return false; - } - } - - bool post(long count = 1) { - if (::ReleaseSemaphore(h_, count, NULL)) { - return true; - } - ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError()); - return false; - } -}; - -class mutex : public semaphore { - using semaphore::wait; - using semaphore::post; - -public: - bool open(ipc::string && name) { - return semaphore::open(std::move(name), 1, 1); - } - - bool lock () { return semaphore::wait(); } - bool unlock() { return semaphore::post(); } -}; - -class condition { - using wait_flags = waiter_helper::wait_flags; - using wait_counter = waiter_helper::wait_counter; - - mutex lock_; - semaphore sema_, handshake_; - wait_counter * cnt_ = nullptr; - - struct contrl { - condition * me_; - wait_flags * flags_; - - wait_flags & flags() noexcept { - assert(flags_ != nullptr); - return *flags_; - } - - wait_counter & counter() noexcept { - assert(me_->cnt_ != nullptr); - return *(me_->cnt_); - } - - auto get_lock() { - return ipc::detail::unique_lock(me_->lock_); - } - - bool sema_wait(std::uint64_t tm) { - return me_->sema_.wait(tm); - } - - bool sema_post(long count) { - return me_->sema_.post(count); - } - - bool handshake_wait(std::uint64_t tm) { - return me_->handshake_.wait(tm); - } - - bool handshake_post(long count) { - return me_->handshake_.post(count); - } - }; - -public: - friend bool operator==(condition const & c1, condition const & c2) { - return c1.cnt_ == c2.cnt_; - } - - friend bool operator!=(condition const & c1, condition const & c2) { - return !(c1 == c2); - } - - static void remove(char const * name) { - semaphore::remove((ipc::string{ "__COND_HAN__" } + name).c_str()); - semaphore::remove((ipc::string{ "__COND_SEM__" } + name).c_str()); - mutex ::remove((ipc::string{ "__COND_MTX__" } + name).c_str()); - } - - bool open(ipc::string const & name, wait_counter * cnt) { - if (lock_ .open("__COND_MTX__" + name) && - sema_ .open("__COND_SEM__" + name) && - handshake_.open("__COND_HAN__" + name)) { - cnt_ = cnt; - return true; - } - return false; - } - - void close() { - handshake_.close(); - sema_ .close(); - lock_ .close(); - } - - template - bool wait_if(Mutex & mtx, wait_flags * flags, F && pred, std::uint64_t tm = invalid_value) { - assert(flags != nullptr); - contrl ctrl { this, flags }; - return waiter_helper::wait_if(ctrl, mtx, std::forward(pred), tm); - } - - bool notify() { - contrl ctrl { this, nullptr }; - return waiter_helper::notify(ctrl); - } - - bool broadcast() { - contrl ctrl { this, nullptr }; - return waiter_helper::broadcast(ctrl); - } - - bool quit_waiting(wait_flags * flags) { - assert(flags != nullptr); - contrl ctrl { this, flags }; - return waiter_helper::quit_waiting(ctrl); - } -}; - -class waiter { - waiter_helper::wait_counter cnt_; - -public: - using handle_t = condition; - - static handle_t invalid() { - return condition {}; - } - - handle_t open(char const * name) { - if (name == nullptr || name[0] == '\0') { - return invalid(); - } - condition cond; - if (cond.open(name, &cnt_)) { - return cond; - } - return invalid(); - } - - void close(handle_t& h) { - if (h == invalid()) return; - h.close(); - } - - template - bool wait_if(handle_t& h, waiter_helper::wait_flags * flags, F&& pred, std::uint64_t tm = invalid_value) { - if (h == invalid()) return false; - - class non_mutex { - public: - void lock () noexcept {} - void unlock() noexcept {} - } nm; - - return h.wait_if(nm, flags, std::forward(pred), tm); - } - - bool notify(handle_t& h) { - if (h == invalid()) return false; - return h.notify(); - } - - bool broadcast(handle_t& h) { - if (h == invalid()) return false; - return h.broadcast(); - } - - bool quit_waiting(handle_t& h, waiter_helper::wait_flags * flags) { - if (h == invalid()) return false; - return h.quit_waiting(flags); - } -}; - -} // namespace detail -} // namespace ipc diff --git a/src/libipc/platform/waiter_wrapper.h b/src/libipc/platform/waiter_wrapper.h deleted file mode 100755 index 18c06e0..0000000 --- a/src/libipc/platform/waiter_wrapper.h +++ /dev/null @@ -1,291 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "libipc/shm.h" - -#include "libipc/memory/resource.h" -#include "libipc/platform/detail.h" -#if defined(IPC_OS_WINDOWS_) - -#include "libipc/platform/waiter_win.h" - -namespace ipc { -namespace detail { - -using mutex_impl = ipc::detail::mutex; -using semaphore_impl = ipc::detail::semaphore; - -class condition_impl : public ipc::detail::condition { - using base_t = ipc::detail::condition; - - ipc::shm::handle cnt_h_; - waiter_helper::wait_flags flags_; - -public: - static void remove(char const * name) { - base_t::remove(name); - ipc::string n = name; - ipc::shm::remove((n + "__COND_CNT__" ).c_str()); - ipc::shm::remove((n + "__COND_WAIT__").c_str()); - } - - bool open(char const * name) { - if (cnt_h_ .acquire( - (ipc::string { name } + "__COND_CNT__" ).c_str(), - sizeof(waiter_helper::wait_counter))) { - flags_.is_closed_.store(false, std::memory_order_release); - return base_t::open(name, - static_cast(cnt_h_.get())); - } - return false; - } - - void close() { - flags_.is_closed_.store(true, std::memory_order_release); - base_t::quit_waiting(&flags_); - base_t::close(); - cnt_h_.release(); - } - - bool wait(mutex_impl& mtx, std::uint64_t tm = invalid_value) { - return base_t::wait_if(mtx, &flags_, [] { return true; }, tm); - } -}; - -} // namespace detail -} // namespace ipc - -#elif defined(IPC_OS_LINUX_) - -#include "libipc/platform/waiter_linux.h" - -namespace ipc { -namespace detail { - -template -class object_impl { - ipc::shm::handle h_; - - struct info_t { - T object_; - std::atomic opened_; - }; - -public: - static void remove(char const * name) { - { - ipc::shm::handle h { name, sizeof(info_t) }; - if (h.valid()) { - auto info = static_cast(h.get()); - info->object_.close(); - } - } - ipc::shm::remove(name); - } - - T& object() { - return static_cast(h_.get())->object_; - } - - template - bool open(char const * name, P&&... params) { - if (!h_.acquire(name, sizeof(info_t))) { - return false; - } - auto info = static_cast(h_.get()); - if ((info->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && - !info->object_.open(std::forward

(params)...)) { - return false; - } - return true; - } - - void close() { - if (!h_.valid()) return; - auto info = static_cast(h_.get()); - if (info->opened_.fetch_sub(1, std::memory_order_release) == 1) { - info->object_.close(); - } - h_.release(); - } -}; - -class mutex_impl : public object_impl { -public: - bool lock () { return object().lock (); } - bool unlock() { return object().unlock(); } -}; - -class condition_impl : public object_impl { -public: - bool wait(mutex_impl& mtx, std::uint64_t tm = invalid_value) { - return object().wait(mtx.object(), tm); - } - - bool notify () { return object().notify (); } - bool broadcast() { return object().broadcast(); } -}; - -class semaphore_impl { - sem_helper::handle_t h_; - ipc::shm::handle opened_; // std::atomic - ipc::string name_; - - auto cnt() { - return static_cast*>(opened_.get()); - } - -public: - static void remove(char const * name) { - sem_helper::destroy((ipc::string{ "__SEMAPHORE_IMPL_SEM__" } + name).c_str()); - ipc::shm::remove ((ipc::string{ "__SEMAPHORE_IMPL_CNT__" } + name).c_str()); - } - - bool open(char const * name, long count) { - name_ = name; - if (!opened_.acquire(("__SEMAPHORE_IMPL_CNT__" + name_).c_str(), sizeof(std::atomic))) { - return false; - } - if ((h_ = sem_helper::open(("__SEMAPHORE_IMPL_SEM__" + name_).c_str(), count)) == sem_helper::invalid()) { - return false; - } - cnt()->fetch_add(1, std::memory_order_acq_rel); - return true; - } - - void close() { - if (h_ == sem_helper::invalid()) return; - sem_helper::close(h_); - if (cnt() == nullptr) return; - if (cnt()->fetch_sub(1, std::memory_order_release) == 1) { - sem_helper::destroy(("__SEMAPHORE_IMPL_SEM__" + name_).c_str()); - } - opened_.release(); - } - - bool wait(std::uint64_t tm = invalid_value) { - return sem_helper::wait(h_, tm); - } - - bool post(long count) { - return sem_helper::post(h_, count); - } -}; - -} // namespace detail -} // namespace ipc -#else/*linux*/ -# error "Unsupported platform." -#endif - -namespace ipc { -namespace detail { - -class waiter_wrapper { -public: - using waiter_t = detail::waiter; - -private: - waiter_t* w_ = nullptr; - waiter_t::handle_t h_ = waiter_t::invalid(); - waiter_helper::wait_flags flags_; - -public: - waiter_wrapper() = default; - explicit waiter_wrapper(waiter_t* w) { - attach(w); - } - waiter_wrapper(const waiter_wrapper&) = delete; - waiter_wrapper& operator=(const waiter_wrapper&) = delete; - - waiter_t * waiter() { return w_; } - waiter_t const * waiter() const { return w_; } - - void attach(waiter_t* w) { - close(); - w_ = w; - } - - bool valid() const { - return (w_ != nullptr) && (h_ != waiter_t::invalid()); - } - - bool open(char const * name) { - if (w_ == nullptr) return false; - close(); - flags_.is_closed_.store(false, std::memory_order_release); - h_ = w_->open(name); - return valid(); - } - - void close() { - if (!valid()) return; - flags_.is_closed_.store(true, std::memory_order_release); - quit_waiting(); - w_->close(h_); - h_ = waiter_t::invalid(); - } - - void quit_waiting() { - w_->quit_waiting(h_, &flags_); - } - - template - bool wait_if(F && pred, std::uint64_t tm = invalid_value) { - if (!valid()) return false; - return w_->wait_if(h_, &flags_, std::forward(pred), tm); - } - - bool notify() { - if (!valid()) return false; - w_->notify(h_); - return true; - } - - bool broadcast() { - if (!valid()) return false; - w_->broadcast(h_); - return true; - } -}; - -} // namespace detail - -class waiter : public detail::waiter_wrapper { - - shm::handle shm_; - - using detail::waiter_wrapper::attach; - -public: - waiter() = default; - waiter(char const * name) { - open(name); - } - - ~waiter() { - close(); - } - - bool open(char const * name) { - if (name == nullptr || name[0] == '\0') { - return false; - } - close(); - if (!shm_.acquire((ipc::string{ "__SHM_WAITER__" } + name).c_str(), sizeof(waiter_t))) { - return false; - } - attach(static_cast(shm_.get())); - return detail::waiter_wrapper::open((ipc::string{ "__IMP_WAITER__" } + name).c_str()); - } - - void close() { - detail::waiter_wrapper::close(); - shm_.release(); - } -}; - -} // namespace ipc diff --git a/src/libipc/waiter.cpp b/src/libipc/waiter.cpp deleted file mode 100755 index 2ac32c0..0000000 --- a/src/libipc/waiter.cpp +++ /dev/null @@ -1,77 +0,0 @@ - -#include - -#include "libipc/waiter.h" - -#include "libipc/utility/pimpl.h" -#include "libipc/platform/waiter_wrapper.h" - -#undef IPC_PP_CAT_ -#undef IPC_PP_JOIN_T__ -#undef IPC_PP_JOIN_ - -#define IPC_PP_CAT_(X, ...) X##__VA_ARGS__ -#define IPC_PP_JOIN_T__(X, ...) IPC_PP_CAT_(X, __VA_ARGS__) -#define IPC_PP_JOIN_(X, ...) IPC_PP_JOIN_T__(X, __VA_ARGS__) - -namespace ipc { - -#undef IPC_OBJECT_TYPE_ -#undef IPC_OBJECT_TYPE_OPEN_PARS_ -#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - -#define IPC_OBJECT_TYPE_ mutex -#define IPC_OBJECT_TYPE_OPEN_PARS_ -#define IPC_OBJECT_TYPE_OPEN_ARGS_ - -#include "libipc/waiter_template.inc" - -bool mutex::lock() { - return impl(p_)->h_.lock(); -} - -bool mutex::unlock() { - return impl(p_)->h_.unlock(); -} - -#undef IPC_OBJECT_TYPE_ -#undef IPC_OBJECT_TYPE_OPEN_PARS_ -#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - -#define IPC_OBJECT_TYPE_ semaphore -#define IPC_OBJECT_TYPE_OPEN_PARS_ , long count -#define IPC_OBJECT_TYPE_OPEN_ARGS_ , count - -#include "libipc/waiter_template.inc" - -bool semaphore::wait(std::uint64_t tm) { - return impl(p_)->h_.wait(tm); -} - -bool semaphore::post(long count) { - return impl(p_)->h_.post(count); -} - -#undef IPC_OBJECT_TYPE_ -#undef IPC_OBJECT_TYPE_OPEN_PARS_ -#undef IPC_OBJECT_TYPE_OPEN_ARGS_ - -#define IPC_OBJECT_TYPE_ condition -#define IPC_OBJECT_TYPE_OPEN_PARS_ -#define IPC_OBJECT_TYPE_OPEN_ARGS_ - -#include "libipc/waiter_template.inc" - -bool condition::wait(mutex& mtx, std::uint64_t tm) { - return impl(p_)->h_.wait(impl(mtx.p_)->h_, tm); -} - -bool condition::notify() { - return impl(p_)->h_.notify(); -} - -bool condition::broadcast() { - return impl(p_)->h_.broadcast(); -} - -} // namespace ipc diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h deleted file mode 100644 index 39031f1..0000000 --- a/src/libipc/waiter_helper.h +++ /dev/null @@ -1,143 +0,0 @@ -#pragma once - -#include -#include -#include - -#include "libipc/def.h" -#include "libipc/utility/scope_guard.h" - -namespace ipc { -namespace detail { - -struct waiter_helper { - - struct wait_counter { - std::atomic waiting_ { 0 }; - long counter_ = 0; - }; - - struct wait_flags { - std::atomic is_waiting_ { false }; - std::atomic is_closed_ { true }; - std::atomic need_dest_ { false }; - }; - - template - static bool wait_if(Ctrl & ctrl, Mutex & mtx, F && pred, std::uint64_t tm) { - auto & flags = ctrl.flags(); - if (flags.is_closed_.load(std::memory_order_acquire)) { - return false; - } - - auto & counter = ctrl.counter(); - counter.waiting_.fetch_add(1, std::memory_order_release); - flags.is_waiting_.store(true, std::memory_order_relaxed); - auto finally = ipc::guard([&ctrl, &counter, &flags] { - for (auto curr_wait = counter.waiting_.load(std::memory_order_relaxed); curr_wait > 0;) { - if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_acq_rel)) { - break; - } - } - flags.is_waiting_.store(false, std::memory_order_relaxed); - }); - { - IPC_UNUSED_ auto guard = ctrl.get_lock(); - if (!std::forward(pred)()) return true; - counter.counter_ = counter.waiting_.load(std::memory_order_relaxed); - } - mtx.unlock(); - - bool ret = false; - do { - bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed); - bool is_closed = flags.is_closed_ .load(std::memory_order_acquire); - if (!is_waiting || is_closed) { - flags.need_dest_.store(false, std::memory_order_release); - ret = false; - break; - } - else if (flags.need_dest_.exchange(false, std::memory_order_release)) { - ret = false; - ctrl.sema_wait(default_timeout); - break; - } - else { - ret = ctrl.sema_wait(tm); - } - } while (flags.need_dest_.load(std::memory_order_acquire)); - finally.do_exit(); - ret = ctrl.handshake_post(1) && ret; - - mtx.lock(); - return ret; - } - - template - static void clear_handshake(Ctrl & ctrl) { - while (ctrl.handshake_wait(0)) ; - } - - template - static bool notify(Ctrl & ctrl) { - auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ctrl.get_lock(); - clear_handshake(ctrl); - if (counter.counter_ > 0) { - ret = ctrl.sema_post(1); - counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); - } - return ret; - } - - template - static bool broadcast(Ctrl & ctrl) { - auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ctrl.get_lock(); - clear_handshake(ctrl); - if (counter.counter_ > 0) { - ret = ctrl.sema_post(counter.counter_); - auto tm = default_timeout / counter.counter_; - do { - counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(tm); - } while (counter.counter_ > 0); - counter.waiting_.store(0, std::memory_order_release); - } - return ret; - } - - template - static bool quit_waiting(Ctrl & ctrl) { - auto & flags = ctrl.flags(); - flags.need_dest_.store(true, std::memory_order_relaxed); - if (!flags.is_waiting_.exchange(false, std::memory_order_release)) { - return true; - } - auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { - return true; - } - bool ret = true; - IPC_UNUSED_ auto guard = ctrl.get_lock(); - clear_handshake(ctrl); - if (counter.counter_ > 0) { - ret = ctrl.sema_post(counter.counter_); - counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); - } - return ret; - } -}; - -} // namespace detail -} // namespace ipc diff --git a/src/libipc/waiter_template.inc b/src/libipc/waiter_template.inc deleted file mode 100755 index 976b3d4..0000000 --- a/src/libipc/waiter_template.inc +++ /dev/null @@ -1,71 +0,0 @@ - -#undef IPC_OBJECT_TYPE_P_ -#undef IPC_OBJECT_TYPE_I_ - -#define IPC_OBJECT_TYPE_P_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _) -#define IPC_OBJECT_TYPE_I_ IPC_PP_JOIN_(IPC_OBJECT_TYPE_, _impl) - -class IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_P_ : public pimpl { -public: - std::string n_; - ipc::detail::IPC_OBJECT_TYPE_I_ h_; -}; - -void IPC_OBJECT_TYPE_::remove(char const * name) { - detail::IPC_OBJECT_TYPE_I_::remove(name); -} - -IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_() - : p_(p_->make()) { -} - -IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(char const * name) - : IPC_OBJECT_TYPE_() { - open(name); -} - -IPC_OBJECT_TYPE_::IPC_OBJECT_TYPE_(IPC_OBJECT_TYPE_&& rhs) - : IPC_OBJECT_TYPE_() { - swap(rhs); -} - -IPC_OBJECT_TYPE_::~IPC_OBJECT_TYPE_() { - close(); - p_->clear(); -} - -void IPC_OBJECT_TYPE_::swap(IPC_OBJECT_TYPE_& rhs) { - std::swap(p_, rhs.p_); -} - -IPC_OBJECT_TYPE_& IPC_OBJECT_TYPE_::operator=(IPC_OBJECT_TYPE_ rhs) { - swap(rhs); - return *this; -} - -bool IPC_OBJECT_TYPE_::valid() const { - return (p_ != nullptr) && !impl(p_)->n_.empty(); -} - -char const * IPC_OBJECT_TYPE_::name() const { - return impl(p_)->n_.c_str(); -} - -bool IPC_OBJECT_TYPE_::open(char const * name IPC_OBJECT_TYPE_OPEN_PARS_) { - if (name == nullptr || name[0] == '\0') { - return false; - } - if (impl(p_)->n_ == name) return true; - close(); - if (impl(p_)->h_.open(name IPC_OBJECT_TYPE_OPEN_ARGS_)) { - impl(p_)->n_ = name; - return true; - } - return false; -} - -void IPC_OBJECT_TYPE_::close() { - if (!valid()) return; - impl(p_)->h_.close(); - impl(p_)->n_.clear(); -} diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 715fa16..c731198 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -29,7 +29,7 @@ struct msg_head { class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{sizeof(msg_head), TestBuffMax}(); + int size = capo::random<>{(int)sizeof(msg_head), TestBuffMax}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); diff --git a/test/test_waiter.cpp b/test/test_waiter.cpp index 8230ed5..0802109 100755 --- a/test/test_waiter.cpp +++ b/test/test_waiter.cpp @@ -1,33 +1,34 @@ #include #include -#include "libipc/platform/waiter_wrapper.h" +#include "libipc/waiter.h" #include "test.h" namespace { TEST(Waiter, broadcast) { - ipc::detail::waiter w; + ipc::detail::waiter waiter; std::thread ts[10]; + int k = 0; for (auto& t : ts) { - t = std::thread([&w] { - ipc::detail::waiter_wrapper wp { &w }; - EXPECT_TRUE(wp.open("test-ipc-waiter")); - EXPECT_TRUE(wp.wait_if([] { return true; })); - wp.close(); + t = std::thread([&k] { + ipc::detail::waiter waiter {"test-ipc-waiter"}; + EXPECT_TRUE(waiter.valid()); + for (int i = 0; i < 99; ++i) { + ASSERT_TRUE(waiter.wait_if([&k, &i] { return k == i; })); + } }); } - ipc::detail::waiter_wrapper wp { &w }; - EXPECT_TRUE(wp.open("test-ipc-waiter")); - + EXPECT_TRUE(waiter.open("test-ipc-waiter")); std::cout << "waiting for broadcast...\n"; - std::this_thread::sleep_for(std::chrono::seconds(1)); - EXPECT_TRUE(wp.broadcast()); - + for (k = 1; k < 100; ++k) { + std::cout << "broadcast: " << k << "\n"; + ASSERT_TRUE(waiter.broadcast()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } for (auto& t : ts) t.join(); - wp.close(); } } // internal-linkage