diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 4ea504e..a2b85c6 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -23,126 +23,164 @@ namespace ipc { namespace detail { -class semaphore { +class mutex { + pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; + public: - using handle_t = sem_t*; - - constexpr static handle_t invalid() { - return SEM_FAILED; + pthread_mutex_t& native() { + return mutex_; } - static handle_t open(char const* name) { - handle_t sem = ::sem_open(name, O_CREAT, 0666, 0); - if (sem == SEM_FAILED) { - ipc::error("fail sem_open[%d]: %s\n", errno, name); - return invalid(); + bool open() { + int eno; + // init mutex + pthread_mutexattr_t mutex_attr; + if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { + ipc::error("fail pthread_mutexattr_init[%d]\n", eno); + return false; } - return sem; + IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); + if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); + return false; + } + if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) { + ipc::error("fail pthread_mutex_init[%d]\n", eno); + return false; + } + return true; } -#pragma push_macro("IPC_SEMAPHORE_FUNC_") -#undef IPC_SEMAPHORE_FUNC_ -#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \ - if (::CALL(PAR) != 0) { \ - ipc::error("fail " #CALL "[%d]\n", errno); \ - return false; \ - } \ - return true - - static bool close(handle_t h) { - if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_close, h); + bool close() { + int eno; + if ((eno = ::pthread_mutex_destroy(&mutex_)) != 0) { + ipc::error("fail pthread_mutex_destroy[%d]\n", eno); + return false; + } + return true; } - static bool destroy(char const* name) { - IPC_SEMAPHORE_FUNC_(sem_unlink, name); + bool lock() { + int eno; + if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) { + ipc::error("fail pthread_mutex_lock[%d]\n", eno); + return false; + } + return true; } - static bool post(handle_t h) { - if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_post, h); + bool unlock() { + int eno; + if ((eno = ::pthread_mutex_unlock(&mutex_)) != 0) { + ipc::error("fail pthread_mutex_unlock[%d]\n", eno); + return false; + } + return true; } - - static bool wait(handle_t h) { - if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_wait, h); - } - -#pragma pop_macro("IPC_SEMAPHORE_FUNC_") }; -class event { - std::atomic* cnt_ = nullptr; - semaphore::handle_t sem_ = semaphore::invalid(); - std::size_t wait_id_; - - std::string name() const { - return "__IPC_WAIT__" + std::to_string(wait_id_); - } +class condition { + pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; public: - event(std::size_t id) - : wait_id_(static_cast>(id)) { - auto n = name(); - cnt_ = static_cast*>( - shm::acquire(n.c_str(), sizeof(std::atomic))); - if (cnt_ == nullptr) { - ipc::error("fail shm::acquire: %s\n", n.c_str()); - return; + bool open() { + int eno; + // init condition + pthread_condattr_t cond_attr; + if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { + ipc::error("fail pthread_condattr_init[%d]\n", eno); + return false; } - cnt_->fetch_add(1, std::memory_order_acquire); - sem_ = semaphore::open(n.c_str()); - } - - ~event() { - semaphore::close(sem_); - if (cnt_->fetch_sub(1, std::memory_order_release) == 1) { - semaphore::destroy(name().c_str()); + IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); + if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); + return false; } - shm::release(cnt_, sizeof(std::atomic)); + if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) { + ipc::error("fail pthread_cond_init[%d]\n", eno); + return false; + } + return true; } - auto get_id() const noexcept { - return wait_id_; + bool close() { + int eno; + if ((eno = ::pthread_cond_destroy(&cond_)) != 0) { + ipc::error("fail pthread_cond_destroy[%d]\n", eno); + return false; + } + return true; } - bool wait() { - if (sem_ == semaphore::invalid()) return false; - return semaphore::wait(sem_); + bool wait(mutex& mtx) { + int eno; + if ((eno = ::pthread_cond_wait(&cond_, &mtx.native())) != 0) { + ipc::error("fail pthread_cond_wait[%d]\n", eno); + return false; + } + return true; } bool notify() { - if (sem_ == semaphore::invalid()) return false; - return semaphore::post(sem_); + int eno; + if ((eno = ::pthread_cond_signal(&cond_)) != 0) { + ipc::error("fail pthread_cond_signal[%d]\n", eno); + return false; + } + return true; + } + + bool broadcast() { + int eno; + if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) { + ipc::error("fail pthread_cond_broadcast[%d]\n", eno); + return false; + } + return true; + } +}; + +class semaphore { + mutex lock_; + condition cond_; + long counter_ = 0; + +public: + bool open() { + return lock_.open() && cond_.open(); + } + + void close() { + cond_.close(); + lock_.close(); + } + + template + void wait_if(F&& check) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + while ((counter_ <= 0) && std::forward(check)()) { + cond_.wait(lock_); + } + -- counter_; + } + + template + bool post(F&& count) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + auto c = std::forward(count)(); + if (c <= 0) return false; + counter_ += c; + return cond_.broadcast(); } }; class waiter { - using evt_id_t = decltype(std::declval().get_id()); - - std::atomic counter_ { 0 }; - spin_lock evt_lc_; - id_pool evt_ids_; - - std::size_t push_event(event const & evt) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); - std::size_t id = evt_ids_.acquire(); - if (id == invalid_value) { - ipc::error("fail push_event[has too many waiters]\n"); - return id; - } - (*static_cast(evt_ids_.at(id))) = evt.get_id(); - evt_ids_.mark_acquired(id); - return id; - } - - void pop_event(std::size_t id) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); - evt_ids_.release(id); - } + semaphore sem_; + std::atomic counter_ { 0 }; + std::atomic opened_ { 0 }; public: - using handle_t = waiter*; + using handle_t = waiter * ; constexpr static handle_t invalid() { return nullptr; @@ -152,60 +190,48 @@ public: if (name == nullptr || name[0] == '\0') { return invalid(); } - if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) { - evt_ids_.init(); + if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !sem_.open()) { + return invalid(); } return this; } void close(handle_t h) { if (h == invalid()) return; - counter_.fetch_sub(1, std::memory_order_acq_rel); + if (opened_.fetch_sub(1, std::memory_order_release) == 1) { + sem_.close(); + } } - static bool wait_all(std::tuple const * all, std::size_t size) { + template + static bool multi_wait_if(std::tuple const * all, std::size_t size, F&& /*check*/) { if (all == nullptr || size == 0) { return false; } - // calc a new wait-id & construct event object - event evt { ipc::detail::calc_unique_id() }; - auto ids = static_cast(mem::alloc(sizeof(std::size_t) * size)); - for (std::size_t i = 0; i < size; ++i) { - ids[i] = std::get<0>(all[i])->push_event(evt); - } - IPC_UNUSED_ auto guard = unique_ptr(ids, [all, size](std::size_t* ids) { - for (std::size_t i = 0; i < size; ++i) { - std::get<0>(all[i])->pop_event(ids[i]); - } - mem::free(ids, sizeof(std::size_t) * size); - }); - // wait for event signal - return evt.wait(); + return true; } - bool wait(handle_t h) { + template + bool wait_if(handle_t h, F&& check) { if (h == invalid()) return false; - auto info = std::make_tuple(this, h); - return wait_all(&info, 1); + counter_.fetch_add(1, std::memory_order_release); + IPC_UNUSED_ auto guard = unique_ptr(&counter_, [](decltype(counter_)* c) { + c->fetch_sub(1, std::memory_order_release); + }); + sem_.wait_if(std::forward(check)); + return true; } void notify(handle_t h) { if (h == invalid()) return; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); - evt_ids_.for_acquired([this](auto id) { - event evt { *static_cast(evt_ids_.at(id)) }; - return !evt.notify(); // return if succ + sem_.post([this] { + return (0 < counter_.load(std::memory_order_relaxed)) ? 1 : 0; }); } void broadcast(handle_t h) { if (h == invalid()) return; - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); - evt_ids_.for_acquired([this](auto id) { - event evt { *static_cast(evt_ids_.at(id)) }; - evt.notify(); - return true; // return after all - }); + sem_.post([this] { return counter_.load(std::memory_order_relaxed); }); } }; diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index aa69d91..05942a1 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -37,10 +37,12 @@ public: ::CloseHandle(h); } - static bool wait_all(std::tuple const * all, std::size_t size) { + template + static bool multi_wait_if(std::tuple const * all, std::size_t size, F&& check) { if (all == nullptr || size == 0) { return false; } + if (!std::forward(check)()) return true; auto hs = static_cast(mem::alloc(sizeof(handle_t) * size)); IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t) * size); }); std::size_t i = 0; @@ -55,8 +57,10 @@ public: return ::WaitForMultipleObjects(static_cast(i), hs, FALSE, INFINITE) != WAIT_FAILED; } - bool wait(handle_t h) { + template + bool wait_if(handle_t h, F&& check) { if (h == invalid()) return false; + if (!std::forward(check)()) return true; counter_.fetch_add(1, std::memory_order_relaxed); std::atomic_thread_fence(std::memory_order_release); return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0; diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 050b897..1cd97ea 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -42,6 +42,7 @@ public: waiter_t const * waiter() const { return w_; } void attach(waiter_t* w) { + close(); w_ = w; } @@ -62,7 +63,9 @@ public: h_ = waiter_t::invalid(); } - bool wait_all(waiter_wrapper * all, std::size_t size) { + + template + bool multi_wait_if(waiter_wrapper * all, std::size_t size, F&& check) { if (all == nullptr || size == 0) { return false; } @@ -75,12 +78,13 @@ public: if (!w.valid()) continue; hs[i] = w.to_w_info(); } - return waiter_t::wait_all(hs, i); + return waiter_t::multi_wait_if(hs, i, std::forward(check)); } - bool wait() { + template + bool wait_if(F&& check) { if (!valid()) return false; - return w_->wait(h_); + return w_->wait_if(h_, std::forward(check)); } bool notify() { diff --git a/src/queue.h b/src/queue.h index 2ad17a8..c9c2eb4 100644 --- a/src/queue.h +++ b/src/queue.h @@ -110,7 +110,11 @@ public: bool wait_for_connect(Elems* elems, std::size_t count) { if (elems == nullptr) return false; for (unsigned k = 0; elems->conn_count() < count;) { - ipc::sleep(k, [this] { return cc_waiter_.wait(); }); + ipc::sleep(k, [this, elems, count] { + return cc_waiter_.wait_if([elems, count] { + return elems->conn_count() < count; + }); + }); } return true; } @@ -214,13 +218,20 @@ public: return {}; } T item; - for (unsigned k = 0;;) { - if (elems_->pop(&this->cursor_, [&item](void* p) { + auto pop_item = [this, &item] { + return elems_->pop(&this->cursor_, [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - })) { - return item; - } - ipc::sleep(k, [this] { return this->waiter_.wait(); }); + }); + }; + for (unsigned k = 0;;) { + if (pop_item()) return item; + bool succ = false; + ipc::sleep(k, [this, &succ, &pop_item] { + return this->waiter_.wait_if([&succ, &pop_item] { + return !(succ = pop_item()); + }); + }); + if (succ) return item; } } }; diff --git a/src/waiter.cpp b/src/waiter.cpp index 266001f..411a128 100644 --- a/src/waiter.cpp +++ b/src/waiter.cpp @@ -64,7 +64,7 @@ void waiter::close() { } bool waiter::wait() { - return impl(p_)->w_.wait(); + return impl(p_)->w_.wait_if([] { return true; }); } bool waiter::notify() { diff --git a/test/test_waiter.cpp b/test/test_waiter.cpp index 5376c02..22560a0 100644 --- a/test/test_waiter.cpp +++ b/test/test_waiter.cpp @@ -1,4 +1,5 @@ #include +#include #include "platform/waiter_wrapper.h" #include "test.h" @@ -13,34 +14,68 @@ class Unit : public TestSuite { } private slots: - void test_wakeup(); + void test_broadcast(); + void test_multiwait(); } unit__; #include "test_waiter.moc" -void Unit::test_wakeup() { +void Unit::test_broadcast() { ipc::detail::waiter w; + std::thread ts[10]; - std::thread t1 {[&w] { - ipc::detail::waiter_wrapper wp { &w }; - QVERIFY(wp.open("test-ipc-waiter")); - QVERIFY(wp.wait()); - }}; - - std::thread t2 {[&w] { - ipc::detail::waiter_wrapper wp { &w }; - QVERIFY(wp.open("test-ipc-waiter")); - QVERIFY(wp.wait()); - }}; + for (auto& t : ts) { + t = std::thread([&w] { + ipc::detail::waiter_wrapper wp { &w }; + QVERIFY(wp.open("test-ipc-waiter")); + QVERIFY(wp.wait_if([] { return true; })); + }); + } ipc::detail::waiter_wrapper wp { &w }; QVERIFY(wp.open("test-ipc-waiter")); + std::cout << "waiting for broadcast...\n"; std::this_thread::sleep_for(std::chrono::seconds(1)); QVERIFY(wp.broadcast()); - t1.join(); - t2.join(); + for (auto& t : ts) t.join(); +} + +void Unit::test_multiwait() { + ipc::detail::waiter w, mw; + std::thread ts[10]; + ipc::detail::waiter_wrapper ws[10]; + + std::size_t i = 0; + for (auto& t : ts) { + t = std::thread([&w, &mw, &ws, i] { + ipc::detail::waiter_wrapper wp { &w }; + ws[i].attach(&mw); + + QVERIFY(wp.open("test-ipc-waiter")); + QVERIFY(ws[i].open("test-ipc-multiwait")); + + QVERIFY(wp.wait_if([] { return true; })); + if (i == 3) { + std::cout << "waiting for notify...\n"; + std::this_thread::sleep_for(std::chrono::seconds(1)); + QVERIFY(ws[i].broadcast()); + } + }); + ++i; + } + + ipc::detail::waiter_wrapper wp { &w }, wm { &mw }; + QVERIFY(wp.open("test-ipc-waiter")); + QVERIFY(wm.open("test-ipc-multiwait")); + + std::cout << "waiting for broadcast...\n"; + std::this_thread::sleep_for(std::chrono::seconds(1)); + QVERIFY(wp.broadcast()); + QVERIFY(wm.multi_wait_if(ws, 10, [] { return true; })); + + for (auto& t : ts) t.join(); } } // internal-linkage