diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 7140006..b97b73a 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -2,18 +2,8 @@ #include -#include -#include -#include -#include -#include - #include "def.h" #include "log.h" -#include "shm.h" -#include "rw_lock.h" -#include "id_pool.h" -#include "pool_alloc.h" #include "platform/detail.h" @@ -115,9 +105,9 @@ public: #pragma pop_macro("IPC_PTHREAD_FUNC_") class semaphore { - mutex lock_; - condition cond_; - long counter_ = 0; + mutex lock_; + condition cond_; + long volatile counter_ = 0; public: bool open() { diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 469631a..1e9ed57 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -18,7 +18,8 @@ namespace ipc { namespace detail { class waiter { - std::atomic counter_ { 0 }; + long volatile counter_ = 0; + spin_lock lock_; public: using handle_t = HANDLE; @@ -37,51 +38,32 @@ public: ::CloseHandle(h); } - template - static bool multi_wait_if(std::tuple const * all, std::size_t size, F&& pred) { - if (all == nullptr || size == 0) { - return false; - } - if (!std::forward(pred)()) return true; - auto hs = static_cast(mem::alloc(sizeof(handle_t) * size)); - IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t) * size); }); - std::size_t i = 0; - for (; i < size; ++i) { - auto& info = all[i]; - if ((std::get<0>(all[i]) == nullptr) || - (std::get<1>(all[i]) == invalid())) continue; - std::get<0>(info)->counter_.fetch_add(1, std::memory_order_relaxed); - hs[i] = std::get<1>(all[i]); - } - std::atomic_thread_fence(std::memory_order_release); - return ::WaitForMultipleObjects(static_cast(i), hs, FALSE, INFINITE) != WAIT_FAILED; - } - template bool wait_if(handle_t h, F&& pred) { if (h == invalid()) return false; - if (!std::forward(pred)()) return true; - counter_.fetch_add(1, std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_release); + { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (!std::forward(pred)()) return true; + ++ counter_; + } return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0; } void notify(handle_t h) { if (h == invalid()) return; - for (unsigned k = 0;;) { - auto c = counter_.load(std::memory_order_acquire); - if (c == 0) return; - if (counter_.compare_exchange_weak(c, c - 1, std::memory_order_release)) { - break; - } - ipc::yield(k); - } + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (counter_ == 0) return; + -- counter_; ::ReleaseSemaphore(h, 1, NULL); } void broadcast(handle_t h) { if (h == invalid()) return; - ::ReleaseSemaphore(h, counter_.exchange(0, std::memory_order_acquire), NULL); + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + if (counter_ == 0) return; + long all_count = counter_; + counter_ = 0; + ::ReleaseSemaphore(h, all_count, NULL); } }; diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 9af1cde..80b7288 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -1,10 +1,7 @@ #pragma once -#include -#include #include - -#include "pool_alloc.h" +#include #if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ @@ -26,10 +23,6 @@ private: waiter_t* w_ = nullptr; waiter_t::handle_t h_ = waiter_t::invalid(); - auto to_w_info() { - return std::make_tuple(w_, h_); - } - public: waiter_wrapper() = default; explicit waiter_wrapper(waiter_t* w) { diff --git a/src/prod_cons.h b/src/prod_cons.h index ac1d17c..e5a2edb 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -94,7 +94,7 @@ struct prod_cons_impl> template bool push(E* /*elems*/, F&& f, EB* elem_start) { circ::u2_t cur_ct, nxt_ct; - while(1) { + while (1) { cur_ct = ct_.load(std::memory_order_relaxed); if (circ::index_of(nxt_ct = cur_ct + 1) == circ::index_of(rd_.load(std::memory_order_acquire))) { @@ -106,7 +106,7 @@ struct prod_cons_impl> std::this_thread::yield(); } std::forward(f)(elem_start + circ::index_of(cur_ct)); - while(1) { + while (1) { auto exp_wt = cur_ct; if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { break; @@ -145,7 +145,7 @@ struct prod_cons_impl> { if (conn_cnt == 0) return false; auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire)); // check all consumers have finished reading this element - while(1) { + while (1) { rc_t expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, static_cast(conn_cnt), std::memory_order_release)) { @@ -193,7 +193,7 @@ struct prod_cons_impl> nxt_ct = cur_ct + 1; auto el = elem_start + circ::index_of(cur_ct); // check all consumers have finished reading this element - while(1) { + while (1) { rc_t expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, static_cast(conn_cnt), std::memory_order_release)) { @@ -204,7 +204,7 @@ struct prod_cons_impl> if (conn_cnt == 0) return false; } std::forward(f)(el->data_); - while(1) { + while (1) { auto exp_wt = cur_ct; if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) { break;