diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index c152fcf..9503596 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -1,6 +1,13 @@ #pragma once #include +#include +#include +#include +#include + +#include +#include #include "def.h" #include "log.h" @@ -104,98 +111,191 @@ public: #pragma pop_macro("IPC_PTHREAD_FUNC_") -class semaphore { - mutex lock_; - condition cond_; - long counter_; +class sem_helper { +public: + using handle_t = sem_t*; + + constexpr static handle_t invalid() noexcept { + return SEM_FAILED; + } + + static handle_t open(char const* name, long count) { + handle_t sem = ::sem_open(name, O_CREAT, 0666, count); + if (sem == SEM_FAILED) { + ipc::error("fail sem_open[%d]: %s\n", errno, name); + return invalid(); + } + return sem; + } + +#pragma push_macro("IPC_SEMAPHORE_FUNC_") +#undef IPC_SEMAPHORE_FUNC_ +#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \ + if (::CALL(PAR) != 0) { \ + ipc::error("fail " #CALL "[%d]\n", errno); \ + return false; \ + } \ + return true + + static bool close(handle_t h) { + if (h == invalid()) return false; + IPC_SEMAPHORE_FUNC_(sem_close, h); + } + + static bool destroy(char const* name) { + IPC_SEMAPHORE_FUNC_(sem_unlink, name); + } + + static bool post(handle_t h) { + if (h == invalid()) return false; + IPC_SEMAPHORE_FUNC_(sem_post, h); + } + + static bool wait(handle_t h) { + if (h == invalid()) return false; + IPC_SEMAPHORE_FUNC_(sem_wait, h); + } + +#pragma pop_macro("IPC_SEMAPHORE_FUNC_") +}; + +class waiter_helper { + mutex lock_; + + std::atomic waiting_ { 0 }; + long counter_ = 0; public: - bool open(long count = 0) { - if (lock_.open() && cond_.open()) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - counter_ = count; - return true; + using handle_t = std::tuple; + + static handle_t invalid() noexcept { + return std::make_tuple(std::string{}, sem_helper::invalid(), sem_helper::invalid()); + } + + handle_t open_h(std::string && name) { + auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0); + if (sem == sem_helper::invalid()) { + return invalid(); } - return false; + auto han = sem_helper::open(("__WAITER_HELPER_HAN__" + name).c_str(), 0); + if (han == sem_helper::invalid()) { + return invalid(); + } + return std::make_tuple(std::move(name), sem, han); + } + + void release_h(handle_t const & h) { + sem_helper::close(std::get<2>(h)); + sem_helper::close(std::get<1>(h)); + } + + void close_h(handle_t const & h) { + auto const & name = std::get<0>(h); + sem_helper::destroy(("__WAITER_HELPER_HAN__" + name).c_str()); + sem_helper::destroy(("__WAITER_HELPER_SEM__" + name).c_str()); + } + + bool open() { + return lock_.open(); } void close() { - cond_.close(); lock_.close(); } template - bool wait_if(F&& check) { - bool ret = true; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - while ((counter_ <= 0) && - std::forward(check)() && - (ret = cond_.wait(lock_))) ; - -- counter_; + bool wait_if(handle_t const & h, F&& pred) { + waiting_.fetch_add(1, std::memory_order_release); + { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (!std::forward(pred)()) return true; + ++ counter_; + } + bool ret = sem_helper::wait(std::get<1>(h)); + waiting_.fetch_sub(1, std::memory_order_release); + ret = sem_helper::post(std::get<2>(h)) && ret; return ret; } - template - bool post(F&& count) { + bool notify(handle_t const & h) { + std::atomic_thread_fence(std::memory_order_acq_rel); + if (waiting_.load(std::memory_order_relaxed) == 0) { + return true; + } + bool ret = true; IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - auto c = std::forward(count)(); - if (c <= 0) return false; - counter_ += c; - return cond_.broadcast(); + if (counter_ > 0) { + ret = sem_helper::post(std::get<1>(h)); + -- counter_; + ret = ret && sem_helper::wait(std::get<2>(h)); + } + return ret; + } + + bool broadcast(handle_t const & h) { + std::atomic_thread_fence(std::memory_order_acq_rel); + if (waiting_.load(std::memory_order_relaxed) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (counter_ > 0) { + for (long i = 0; i < counter_; ++i) { + ret = ret && sem_helper::post(std::get<1>(h)); + } + do { + -- counter_; + ret = ret && sem_helper::wait(std::get<2>(h)); + } while (counter_ > 0); + } + return ret; } }; class waiter { - semaphore sem_; - std::atomic counter_ { 0 }; - std::atomic opened_ { 0 }; + waiter_helper helper_; + std::atomic opened_ { 0 }; public: - using handle_t = waiter * ; + using handle_t = waiter_helper::handle_t; - constexpr static handle_t invalid() { - return nullptr; + static handle_t invalid() noexcept { + return waiter_helper::invalid(); } handle_t open(char const * name) { if (name == nullptr || name[0] == '\0') { return invalid(); } - if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !sem_.open()) { + if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !helper_.open()) { return invalid(); } - return this; + return helper_.open_h(name); } void close(handle_t h) { + if (h == invalid()) return; + helper_.release_h(h); if (opened_.fetch_sub(1, std::memory_order_release) == 1) { - if (h == invalid()) return; - sem_.close(); + helper_.close_h(h); + helper_.close(); } } template bool wait_if(handle_t h, F&& pred) { if (h == invalid()) return false; - counter_.fetch_add(1, std::memory_order_release); - IPC_UNUSED_ auto guard = unique_ptr(&counter_, [](decltype(counter_)* c) { - c->fetch_sub(1, std::memory_order_release); - }); - return sem_.wait_if(std::forward(pred)); + return helper_.wait_if(h, std::forward(pred)); } void notify(handle_t h) { if (h == invalid()) return; - sem_.post([this] { - return (0 < counter_.load(std::memory_order_relaxed)) ? 1 : 0; - }); + helper_.notify(h); } void broadcast(handle_t h) { if (h == invalid()) return; - sem_.post([this] { - return counter_.load(std::memory_order_relaxed); - }); + helper_.broadcast(h); } }; diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index eb17156..9b1bb24 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -8,6 +8,7 @@ #include "rw_lock.h" #include "pool_alloc.h" #include "log.h" +#include "shm.h" #include "platform/to_tchar.h" #include "platform/detail.h" @@ -37,11 +38,20 @@ public: } bool wait() { - return ::WaitForSingleObject(h_, INFINITE) == WAIT_OBJECT_0; + DWORD ret; + if ((ret = ::WaitForSingleObject(h_, INFINITE)) == WAIT_OBJECT_0) { + return true; + } + ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); + return false; } bool post(long count = 1) { - return !!::ReleaseSemaphore(h_, count, NULL); + if (::ReleaseSemaphore(h_, count, NULL)) { + return true; + } + ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError()); + return false; } }; @@ -62,8 +72,13 @@ class condition { mutex lock_; semaphore sema_, handshake_; + ipc::shm::handle waiting_; // std::atomic long * counter_ = nullptr; + auto waiting_cnt() { + return static_cast*>(waiting_.get()); + } + public: friend bool operator==(condition const & c1, condition const & c2) { return c1.counter_ == c2.counter_; @@ -74,9 +89,10 @@ public: } bool open(std::string const & name, long * counter) { - if (lock_ .open(name + "__COND_MTX__") && - sema_ .open(name + "__COND_SEM__") && - handshake_.open(name + "__COND_HAN__")) { + if (lock_ .open ("__COND_MTX__" + name) && + sema_ .open ("__COND_SEM__" + name) && + handshake_.open ("__COND_HAN__" + name) && + waiting_ .acquire(("__COND_WAITING_CNT__" + name).c_str(), sizeof(std::atomic))) { counter_ = counter; return true; } @@ -84,6 +100,7 @@ public: } void close() { + waiting_ .release(); handshake_.close(); sema_ .close(); lock_ .close(); @@ -91,41 +108,57 @@ public: template bool wait_if(Mutex& mtx, F&& pred) { + auto cnt = waiting_cnt(); + if (cnt != nullptr) { + cnt->fetch_add(1, std::memory_order_release); + } { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (!std::forward(pred)()) return true; ++ *counter_; } mtx.unlock(); - bool ret_s = sema_.wait(); - bool ret_h = handshake_.post(); + bool ret = sema_.wait(); + if (cnt != nullptr) { + cnt->fetch_sub(1, std::memory_order_release); + } + ret = handshake_.post() && ret; mtx.lock(); - return ret_s && ret_h; + return ret; } bool notify() { - bool ret_s = true, ret_h = true; + std::atomic_thread_fence(std::memory_order_acq_rel); + if (waiting_cnt() != nullptr && + waiting_cnt()->load(std::memory_order_relaxed) == 0) { + return true; + } + bool ret = true; IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (*counter_ > 0) { - ret_s = sema_.post(); + ret = sema_.post(); -- *counter_; - ret_h = handshake_.wait(); + ret = ret && handshake_.wait(); } - return ret_s && ret_h; + return ret; } bool broadcast() { - bool ret_s = true, ret_h = true; + std::atomic_thread_fence(std::memory_order_acq_rel); + if (waiting_cnt() != nullptr && + waiting_cnt()->load(std::memory_order_relaxed) == 0) { + return true; + } + bool ret = true; IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (*counter_ > 0) { - ret_s = sema_.post(*counter_); + ret = sema_.post(*counter_); do { -- *counter_; - bool rh = handshake_.wait(); - ret_h = ret_h && rh; + ret = ret && handshake_.wait(); } while (*counter_ > 0); } - return ret_s && ret_h; + return ret; } }; diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 1b0a558..8f9b55e 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -24,7 +24,7 @@ class condition_impl : public ipc::detail::condition { public: bool open(std::string const & name) { - if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long volatile))) { + if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long))) { return ipc::detail::condition::open(name, static_cast(h_.get())); } return false; @@ -69,8 +69,9 @@ public: if (!h_.acquire(name, sizeof(info_t))) { return false; } - if ((static_cast(h_.get())->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && - !static_cast(h_.get())->object_.open(std::forward

(params)...)) { + auto info = static_cast(h_.get()); + if ((info->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && + !info->object_.open(std::forward

(params)...)) { return false; } return true; @@ -78,8 +79,9 @@ public: void close() { if (!h_.valid()) return; - if (static_cast(h_.get())->opened_.fetch_sub(1, std::memory_order_release) == 1) { - static_cast(h_.get())->object_.close(); + auto info = static_cast(h_.get()); + if (info->opened_.fetch_sub(1, std::memory_order_release) == 1) { + info->object_.close(); } h_.release(); } @@ -91,17 +93,6 @@ public: bool unlock() { return object().unlock(); } }; -class semaphore_impl : public object_impl { -public: - bool wait() { - return object().wait_if([] { return true; }); - } - - bool post(long count) { - return object().post([count] { return count; }); - } -}; - class condition_impl : public object_impl { public: bool wait(mutex_impl& mtx) { @@ -112,6 +103,53 @@ public: bool broadcast() { return object().broadcast(); } }; +class semaphore_impl { + sem_helper::handle_t h_; + ipc::shm::handle opened_; // std::atomic + std::string name_; + + auto cnt() { + return static_cast*>(opened_.get()); + } + +public: + bool open(char const * name, long count) { + name_ = name; + if (!opened_.acquire(("__SEMAPHORE_IMPL_CNT__" + name_).c_str(), sizeof(std::atomic))) { + return false; + } + if ((h_ = sem_helper::open(("__SEMAPHORE_IMPL_SEM__" + name_).c_str(), count)) == sem_helper::invalid()) { + return false; + } + cnt()->fetch_add(1, std::memory_order_acq_rel); + return true; + } + + void close() { + if (h_ == sem_helper::invalid()) return; + sem_helper::close(h_); + if (cnt() == nullptr) return; + if (cnt()->fetch_sub(1, std::memory_order_release) == 1) { + sem_helper::destroy(("__SEMAPHORE_IMPL_SEM__" + name_).c_str()); + } + opened_.release(); + } + + bool wait() { + if (h_ == sem_helper::invalid()) return false; + return sem_helper::wait(h_); + } + + bool post(long count) { + if (h_ == sem_helper::invalid()) return false; + bool ret = true; + for (long i = 0; i < count; ++i) { + ret = ret && sem_helper::post(h_); + } + return ret; + } +}; + } // namespace detail } // namespace ipc diff --git a/src/prod_cons.h b/src/prod_cons.h index 4acd4da..21e6ecf 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -266,7 +266,7 @@ struct prod_cons_impl> { ct_.store(nxt_ct = cur_ct + 1, std::memory_order_release); std::forward(f)(&(el->data_)); // set flag & try update wt - el->f_ct_.store(~static_cast(cur_ct)); + el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); return true; }