From 018bea223e840c699a81d8e1fea20fbb8c21f6a2 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jan 2019 20:43:23 +0800 Subject: [PATCH] try semaphore --- src/platform/waiter_linux.h | 94 +++++++++++++++++++++++++++---------- 1 file changed, 69 insertions(+), 25 deletions(-) diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index ac8b1c2..e7987ab 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -1,6 +1,10 @@ #pragma once #include +#include +#include +#include +#include #include #include @@ -20,6 +24,54 @@ namespace ipc { namespace detail { +class semaphore { +public: + using handle_t = sem_t*; + + constexpr static handle_t invalid() { + return SEM_FAILED; + } + + static handle_t open(char const* name) { + handle_t sem = ::sem_open(name, O_CREAT, 0666, 0); + if (sem == SEM_FAILED) { + ipc::error("fail sem_open[%d]: %s\n", errno, name); + return invalid(); + } + return sem; + } + +#pragma push_macro("IPC_SEMAPHORE_FUNC_") +#undef IPC_SEMAPHORE_FUNC_ +#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \ + if (::CALL(PAR) != 0) { \ + ipc::error("fail " #CALL "[%d]\n", errno); \ + return false; \ + } \ + return true + + static bool close(handle_t h) { + if (h == invalid()) return false; + IPC_SEMAPHORE_FUNC_(sem_close, h); + } + + static bool destroy(char const* name) { + IPC_SEMAPHORE_FUNC_(sem_unlink, name); + } + + static bool post(handle_t h) { + if (h == invalid()) return false; + IPC_SEMAPHORE_FUNC_(sem_post, h); + } + + static bool wait(handle_t h) { + if (h == invalid()) return false; + IPC_SEMAPHORE_FUNC_(sem_wait, h); + } + +#pragma pop_macro("IPC_SEMAPHORE_FUNC_") +}; + class mutex { pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; @@ -138,14 +190,8 @@ public: }; class event { - using cnt_t = std::atomic; - - struct info_t { - cnt_t cnt_; - mutex mutex_; - condition cond_; - } * info_ = nullptr; - + std::atomic* cnt_ = nullptr; + semaphore::handle_t sem_ = semaphore::invalid(); uint_t<16> wait_id_; std::string name() const { @@ -154,25 +200,23 @@ class event { void open() { auto n = name(); - info_ = static_cast(shm::acquire(n.c_str(), sizeof(info_t))); - if (info_ == nullptr) { + cnt_ = static_cast*>( + shm::acquire(n.c_str(), sizeof(std::atomic))); + if (cnt_ == nullptr) { ipc::error("fail shm::acquire: %s\n", n.c_str()); return; } - if (info_->cnt_.fetch_add(1, std::memory_order_acq_rel) == 0) { - if (!info_->mutex_.open()) return; - if (!info_->cond_.open()) return; + if (cnt_->fetch_add(1, std::memory_order_acq_rel) == 0) { + sem_ = semaphore::open(n.c_str()); } - info_->mutex_.lock(); } void close() { - info_->mutex_.unlock(); - if (info_->cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1) { - info_->cond_.close(); - info_->mutex_.close(); + semaphore::close(sem_); + if (cnt_->fetch_sub(1, std::memory_order_acq_rel) == 1) { + semaphore::destroy(name().c_str()); } - shm::release(info_, sizeof(info_t)); + shm::release(cnt_, sizeof(std::atomic)); } public: @@ -190,13 +234,13 @@ public: } bool wait() { - if (info_ == nullptr) return false; - return info_->cond_.wait(info_->mutex_); + if (sem_ == semaphore::invalid()) return false; + return semaphore::wait(sem_); } bool notify() { - if (info_ == nullptr) return false; - return info_->cond_.notify(); + if (sem_ == semaphore::invalid()) return false; + return semaphore::post(sem_); } }; @@ -278,7 +322,7 @@ public: evt_ids_.for_acquired([this](auto id) { event evt { *static_cast(evt_ids_.at(id)) }; evt.notify(); - return false; + return false; // return first }); } @@ -288,7 +332,7 @@ public: evt_ids_.for_acquired([this](auto id) { event evt { *static_cast(evt_ids_.at(id)) }; evt.notify(); - return true; + return true; // return after all }); } };