From f7f06ab052f8fac05c5b18d0b19e0d3262160ea7 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Mon, 4 Feb 2019 10:41:39 +0800 Subject: [PATCH] give up multi-wait temporarily --- src/platform/waiter_linux.h | 88 +++++++++++------------------------ src/platform/waiter_win.h | 8 ++-- src/platform/waiter_wrapper.h | 22 +-------- test/test_ipc.cpp | 2 +- test/test_waiter.cpp | 37 --------------- 5 files changed, 35 insertions(+), 122 deletions(-) diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index a2b85c6..7140006 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -1,9 +1,6 @@ #pragma once -#include -#include -#include -#include +#include #include #include @@ -23,6 +20,16 @@ namespace ipc { namespace detail { +#pragma push_macro("IPC_PTHREAD_FUNC_") +#undef IPC_PTHREAD_FUNC_ +#define IPC_PTHREAD_FUNC_(CALL, ...) \ + int eno; \ + if ((eno = ::CALL(__VA_ARGS__)) != 0) { \ + ipc::error("fail " #CALL "[%d]\n", eno); \ + return false; \ + } \ + return true + class mutex { pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; @@ -52,30 +59,15 @@ public: } bool close() { - int eno; - if ((eno = ::pthread_mutex_destroy(&mutex_)) != 0) { - ipc::error("fail pthread_mutex_destroy[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_mutex_destroy, &mutex_); } bool lock() { - int eno; - if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) { - ipc::error("fail pthread_mutex_lock[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_mutex_lock, &mutex_); } bool unlock() { - int eno; - if ((eno = ::pthread_mutex_unlock(&mutex_)) != 0) { - ipc::error("fail pthread_mutex_unlock[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_mutex_unlock, &mutex_); } }; @@ -104,42 +96,24 @@ public: } bool close() { - int eno; - if ((eno = ::pthread_cond_destroy(&cond_)) != 0) { - ipc::error("fail pthread_cond_destroy[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_); } bool wait(mutex& mtx) { - int eno; - if ((eno = ::pthread_cond_wait(&cond_, &mtx.native())) != 0) { - ipc::error("fail pthread_cond_wait[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); } bool notify() { - int eno; - if ((eno = ::pthread_cond_signal(&cond_)) != 0) { - ipc::error("fail pthread_cond_signal[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_cond_signal, &cond_); } bool broadcast() { - int eno; - if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) { - ipc::error("fail pthread_cond_broadcast[%d]\n", eno); - return false; - } - return true; + IPC_PTHREAD_FUNC_(pthread_cond_broadcast, &cond_); } }; +#pragma pop_macro("IPC_PTHREAD_FUNC_") + class semaphore { mutex lock_; condition cond_; @@ -156,12 +130,14 @@ public: } template - void wait_if(F&& check) { + bool wait_if(F&& check) { + bool ret = true; IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); - while ((counter_ <= 0) && std::forward(check)()) { - cond_.wait(lock_); - } + while ((counter_ <= 0) && + std::forward(check)() && + (ret = cond_.wait(lock_))) ; -- counter_; + return ret; } template @@ -204,21 +180,13 @@ public: } template - static bool multi_wait_if(std::tuple const * all, std::size_t size, F&& /*check*/) { - if (all == nullptr || size == 0) { - return false; - } - return true; - } - - template - bool wait_if(handle_t h, F&& check) { + bool wait_if(handle_t h, F&& pred) { if (h == invalid()) return false; counter_.fetch_add(1, std::memory_order_release); IPC_UNUSED_ auto guard = unique_ptr(&counter_, [](decltype(counter_)* c) { c->fetch_sub(1, std::memory_order_release); }); - sem_.wait_if(std::forward(check)); + sem_.wait_if(std::forward(pred)); return true; } diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 05942a1..469631a 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -38,11 +38,11 @@ public: } template - static bool multi_wait_if(std::tuple const * all, std::size_t size, F&& check) { + static bool multi_wait_if(std::tuple const * all, std::size_t size, F&& pred) { if (all == nullptr || size == 0) { return false; } - if (!std::forward(check)()) return true; + if (!std::forward(pred)()) return true; auto hs = static_cast(mem::alloc(sizeof(handle_t) * size)); IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t) * size); }); std::size_t i = 0; @@ -58,9 +58,9 @@ public: } template - bool wait_if(handle_t h, F&& check) { + bool wait_if(handle_t h, F&& pred) { if (h == invalid()) return false; - if (!std::forward(check)()) return true; + if (!std::forward(pred)()) return true; counter_.fetch_add(1, std::memory_order_relaxed); std::atomic_thread_fence(std::memory_order_release); return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0; diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 1cd97ea..9af1cde 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -63,28 +63,10 @@ public: h_ = waiter_t::invalid(); } - template - bool multi_wait_if(waiter_wrapper * all, std::size_t size, F&& check) { - if (all == nullptr || size == 0) { - return false; - } - using tp_t = decltype(std::declval().to_w_info()); - auto hs = static_cast(mem::alloc(sizeof(tp_t) * size)); - IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(tp_t) * size); }); - std::size_t i = 0; - for (; i < size; ++i) { - auto& w = all[i]; - if (!w.valid()) continue; - hs[i] = w.to_w_info(); - } - return waiter_t::multi_wait_if(hs, i, std::forward(check)); - } - - template - bool wait_if(F&& check) { + bool wait_if(F&& pred) { if (!valid()) return false; - return w_->wait_if(h_, std::forward(check)); + return w_->wait_if(h_, std::forward(pred)); } bool notify() { diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 16b0c64..7295bab 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -30,7 +30,7 @@ std::vector datas__; constexpr int DataMin = 2; constexpr int DataMax = 256; constexpr int LoopCount = 100000; -//constexpr int LoopCount = 10000; +//constexpr int LoopCount = 1000; } // internal-linkage diff --git a/test/test_waiter.cpp b/test/test_waiter.cpp index 22560a0..5077add 100644 --- a/test/test_waiter.cpp +++ b/test/test_waiter.cpp @@ -15,7 +15,6 @@ class Unit : public TestSuite { private slots: void test_broadcast(); - void test_multiwait(); } unit__; #include "test_waiter.moc" @@ -42,40 +41,4 @@ void Unit::test_broadcast() { for (auto& t : ts) t.join(); } -void Unit::test_multiwait() { - ipc::detail::waiter w, mw; - std::thread ts[10]; - ipc::detail::waiter_wrapper ws[10]; - - std::size_t i = 0; - for (auto& t : ts) { - t = std::thread([&w, &mw, &ws, i] { - ipc::detail::waiter_wrapper wp { &w }; - ws[i].attach(&mw); - - QVERIFY(wp.open("test-ipc-waiter")); - QVERIFY(ws[i].open("test-ipc-multiwait")); - - QVERIFY(wp.wait_if([] { return true; })); - if (i == 3) { - std::cout << "waiting for notify...\n"; - std::this_thread::sleep_for(std::chrono::seconds(1)); - QVERIFY(ws[i].broadcast()); - } - }); - ++i; - } - - ipc::detail::waiter_wrapper wp { &w }, wm { &mw }; - QVERIFY(wp.open("test-ipc-waiter")); - QVERIFY(wm.open("test-ipc-multiwait")); - - std::cout << "waiting for broadcast...\n"; - std::this_thread::sleep_for(std::chrono::seconds(1)); - QVERIFY(wp.broadcast()); - QVERIFY(wm.multi_wait_if(ws, 10, [] { return true; })); - - for (auto& t : ts) t.join(); -} - } // internal-linkage