From c667f1513f653fbf5ac5b59aa17f32b78cfb666a Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jan 2019 15:24:30 +0800 Subject: [PATCH] add multi-wait (not ready, TBD) --- src/channel.cpp | 7 +- src/id_pool.h | 44 +++++- src/ipc.cpp | 27 ++-- src/log.h | 32 ++++- src/memory/alloc.h | 6 +- src/platform/detail.h | 16 +++ src/platform/shm_linux.cpp | 6 +- src/platform/shm_win.cpp | 4 +- src/platform/waiter_linux.h | 257 +++++++++++++++++++++++++++------- src/platform/waiter_win.h | 23 +++ src/platform/waiter_wrapper.h | 25 ++++ 11 files changed, 366 insertions(+), 81 deletions(-) diff --git a/src/channel.cpp b/src/channel.cpp index 19e75bd..3287143 100644 --- a/src/channel.cpp +++ b/src/channel.cpp @@ -1,5 +1,6 @@ #include #include +#include #include "ipc.h" #include "shm.h" @@ -13,8 +14,8 @@ namespace { using namespace ipc; struct ch_info_t { - rw_lock lc_; - id_pool ch_acc_; // only support 255 channels with one name + rw_lock lc_; + id_pool<> ch_acc_; // only support 255 channels with one name }; struct ch_multi_routes { @@ -24,7 +25,7 @@ struct ch_multi_routes { std::size_t id_; bool marked_ = false; - std::array rts_; + std::array rts_; ch_info_t& info() { return *static_cast(h_.get()); diff --git a/src/id_pool.h b/src/id_pool.h index 39fb94b..87b2cda 100644 --- a/src/id_pool.h +++ b/src/id_pool.h @@ -3,9 +3,41 @@ #include #include "def.h" +#include "platform/detail.h" namespace ipc { +template +struct id_type { + uint_t<8> id_; + alignas(AlignSize) byte_t data_[DataSize] {}; + + id_type& operator=(uint_t<8> val) { + id_ = val; + return (*this); + } + + operator uint_t<8>() const { + return id_; + } +}; + +template +struct id_type<0, AlignSize> { + uint_t<8> id_; + + id_type& operator=(uint_t<8> val) { + id_ = val; + return (*this); + } + + operator uint_t<8>() const { + return id_; + } +}; + +template class id_pool { public: enum : std::size_t { @@ -13,9 +45,10 @@ public: }; private: + id_type next_[max_count]; + uint_t<8> acquir_ = 0; uint_t<8> cursor_ = 0; - uint_t<8> next_[max_count] {}; public: void init() { @@ -49,12 +82,13 @@ public: } bool release(std::size_t id) { + if (id == invalid_value) return false; if (acquir_ == max_count) return false; if (acquir_ == id) { acquir_ = next_[id]; // point to next } else { - auto a = next_[acquir_], l = acquir_; + uint_t<8> a = next_[acquir_], l = acquir_; while (1) { if (a == max_count) { return false; // found nothing @@ -76,10 +110,14 @@ public: void for_acquired(F&& fr) { auto a = acquir_; while (a != max_count) { - fr(a); + if (!fr(a)) return; a = next_[a]; } } + + void* at(std::size_t id) { + return next_[id].data_; + } }; } // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 5293e96..fa76797 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -14,6 +14,7 @@ #include "policy.h" #include "memory/resource.h" +#include "platform/detail.h" namespace { @@ -21,17 +22,8 @@ using namespace ipc; using msg_id_t = std::size_t; -inline auto acc_of_msg() { - static shm::handle g_shm { "GLOBAL_ACC_STORAGE__", sizeof(std::atomic) }; - return static_cast*>(g_shm.get()); -} - template = 201703L - std::size_t AlignSize = (std::min)(DataSize, alignof(std::size_t))> -# else /*__cplusplus < 201703L*/ - std::size_t AlignSize = (DataSize < alignof(std::size_t)) ? DataSize : alignof(std::size_t)> -# endif/*__cplusplus < 201703L*/ + std::size_t AlignSize = (ipc::detail::min)(DataSize, alignof(std::size_t))> struct msg_t; template @@ -141,8 +133,8 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { if (que == nullptr) { return false; } - // calc a new message id, start with 1 - auto msg_id = acc_of_msg()->fetch_add(1, std::memory_order_relaxed) + 1; + // calc a new message id + auto msg_id = ipc::detail::calc_unique_id(); // push message fragment int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { @@ -173,7 +165,7 @@ static buff_t recv(ipc::handle_t h) { while (1) { // pop a new message auto msg = que->pop(); - if (msg.head_.id_ == 0) return {}; + if (msg.head_.que_ == nullptr) return {}; if (msg.head_.que_ == que) continue; // pop next // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length std::size_t remain = static_cast( @@ -213,6 +205,15 @@ using policy_t = policy::choose; namespace ipc { +namespace detail { + +std::size_t calc_unique_id() { + static shm::handle g_shm { "__GLOBAL_ACC_STORAGE__", sizeof(std::atomic) }; + return static_cast*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed); +} + +} // namespace detail + template ipc::handle_t chan_impl::connect(char const * name) { return detail_impl>::connect(name); diff --git a/src/log.h b/src/log.h index 90ee676..a26d8bc 100644 --- a/src/log.h +++ b/src/log.h @@ -4,10 +4,36 @@ #include namespace ipc { +namespace detail { -template -void log(char const * fmt, P&&... params) { - std::fprintf(stderr, fmt, std::forward

(params)...); +template +void print(O out, char const * fmt) { + std::fprintf(out, "%s", fmt); +} + +template +void print(O out, char const * fmt, P1&& p1, P&&... params) { + std::fprintf(out, fmt, std::forward(p1), std::forward

(params)...); +} + +} // namespace detail + +inline void log(char const * fmt) { + ipc::detail::print(stdout, fmt); +} + +template +void log(char const * fmt, P1&& p1, P&&... params) { + ipc::detail::print(stdout, fmt, std::forward(p1), std::forward

(params)...); +} + +inline void error(char const * fmt) { + ipc::detail::print(stderr, fmt); +} + +template +void error(char const * fmt, P1&& p1, P&&... params) { + ipc::detail::print(stderr, fmt, std::forward(p1), std::forward

(params)...); } } // namespace ipc diff --git a/src/memory/alloc.h b/src/memory/alloc.h index 875bbc9..ace13a5 100644 --- a/src/memory/alloc.h +++ b/src/memory/alloc.h @@ -150,11 +150,7 @@ public: using alloc_policy = AllocP; enum : std::size_t { -# if __cplusplus >= 201703L - block_size = (std::max)(BlockSize, sizeof(void*)) -# else /*__cplusplus < 201703L*/ - block_size = (BlockSize < sizeof(void*)) ? sizeof(void*) : BlockSize -# endif/*__cplusplus < 201703L*/ + block_size = (ipc::detail::max)(BlockSize, sizeof(void*)) }; private: diff --git a/src/platform/detail.h b/src/platform/detail.h index 8ad3e00..c0f0c6e 100644 --- a/src/platform/detail.h +++ b/src/platform/detail.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include "def.h" @@ -55,6 +57,8 @@ namespace detail { using std::unique_ptr; using std::unique_lock; using std::shared_lock; +using std::max; +using std::min; #else /*__cplusplus < 201703L*/ @@ -79,8 +83,20 @@ constexpr auto shared_lock(T&& lc) { return std::shared_lock> { std::forward(lc) }; } +template +constexpr T const & max(const T& a, const T& b) { + return a < b ? b : a; +} + +template +constexpr T const & min(const T& a, const T& b) { + return b < a ? b : a; +} + #endif/*__cplusplus < 201703L*/ +std::size_t calc_unique_id(); + template constexpr decltype(auto) static_switch(std::size_t /*i*/, std::index_sequence<>, F&& /*f*/, D&& def) { return def(); diff --git a/src/platform/shm_linux.cpp b/src/platform/shm_linux.cpp index eb3d913..8f9f25d 100644 --- a/src/platform/shm_linux.cpp +++ b/src/platform/shm_linux.cpp @@ -51,12 +51,12 @@ void* acquire(char const * name, std::size_t size) { S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); if (fd == -1) { - ipc::log("fail shm_open[%d]: %s\n", errno, name); + ipc::error("fail shm_open[%d]: %s\n", errno, name); return nullptr; } size += sizeof(acc_t); if (::ftruncate(fd, static_cast(size)) != 0) { - ipc::log("fail ftruncate[%d]: %s\n", errno, name); + ipc::error("fail ftruncate[%d]: %s\n", errno, name); ::close(fd); ::shm_unlink(op_name.c_str()); return nullptr; @@ -64,7 +64,7 @@ void* acquire(char const * name, std::size_t size) { void* mem = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); ::close(fd); if (mem == MAP_FAILED) { - ipc::log("fail mmap[%d]: %s\n", errno, name); + ipc::error("fail mmap[%d]: %s\n", errno, name); ::shm_unlink(op_name.c_str()); return nullptr; } diff --git a/src/platform/shm_win.cpp b/src/platform/shm_win.cpp index 6c2edf4..b2d6815 100644 --- a/src/platform/shm_win.cpp +++ b/src/platform/shm_win.cpp @@ -35,12 +35,12 @@ void* acquire(char const * name, std::size_t size) { 0, static_cast(size), ipc::detail::to_tchar(std::string{"__IPC_SHM__"} + name).c_str()); if (h == NULL) { - ipc::log("fail CreateFileMapping[%d]: %s\n", static_cast(::GetLastError()), name); + ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast(::GetLastError()), name); return nullptr; } LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0); if (mem == NULL) { - ipc::log("fail MapViewOfFile[%d]: %s\n", static_cast(::GetLastError()), name); + ipc::error("fail MapViewOfFile[%d]: %s\n", static_cast(::GetLastError()), name); ::CloseHandle(h); return nullptr; } diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 3b90984..3bee40d 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -4,133 +4,292 @@ #include #include +#include +#include +#include #include "def.h" #include "log.h" +#include "shm.h" +#include "rw_lock.h" +#include "id_pool.h" +#include "pool_alloc.h" + #include "platform/detail.h" namespace ipc { namespace detail { -class condition { +class mutex { pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; - pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; public: - bool open(char const * name) { - if (name == nullptr || name[0] == '\0') return false; + pthread_mutex_t& native() { + return mutex_; + } + + bool open() { int eno; // init mutex pthread_mutexattr_t mutex_attr; if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { - ipc::log("fail pthread_mutexattr_init[%d]: %s\n", eno, name); + ipc::error("fail pthread_mutexattr_init[%d]\n", eno); return false; } IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::log("fail pthread_mutexattr_setpshared[%d]: %s\n", eno, name); + ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); return false; } if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) { - ipc::log("fail pthread_mutex_init[%d]: %s\n", eno, name); + ipc::error("fail pthread_mutex_init[%d]\n", eno); return false; } - auto guard_mutex = unique_ptr(&mutex_, ::pthread_mutex_destroy); + return true; + } + + bool close() { + int eno; + if ((eno = ::pthread_mutex_destroy(&mutex_)) != 0) { + ipc::error("fail pthread_mutex_destroy[%d]\n", eno); + return false; + } + return true; + } + + bool lock() { + int eno; + if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) { + ipc::error("fail pthread_mutex_lock[%d]\n", eno); + return false; + } + return true; + } + + bool unlock() { + int eno; + if ((eno = ::pthread_mutex_unlock(&mutex_)) != 0) { + ipc::error("fail pthread_mutex_unlock[%d]\n", eno); + return false; + } + return true; + } +}; + +class condition { + pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; + +public: + bool open() { + int eno; // init condition pthread_condattr_t cond_attr; if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { - ipc::log("fail pthread_condattr_init[%d]: %s\n", eno, name); + ipc::error("fail pthread_condattr_init[%d]\n", eno); return false; } IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::log("fail pthread_condattr_setpshared[%d]: %s\n", eno, name); + ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); return false; } if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) { - ipc::log("fail pthread_cond_init[%d]: %s\n", eno, name); + ipc::error("fail pthread_cond_init[%d]\n", eno); return false; } - // no need to guard condition - // release guards - guard_mutex.release(); return true; } + bool close() { + int eno; + if ((eno = ::pthread_cond_destroy(&cond_)) != 0) { + ipc::error("fail pthread_cond_destroy[%d]\n", eno); + return false; + } + return true; + } + + bool wait(mutex& mtx) { + int eno; + if ((eno = ::pthread_cond_wait(&cond_, &mtx.native())) != 0) { + ipc::error("fail pthread_cond_wait[%d]\n", eno); + return false; + } + return true; + } + + bool notify() { + int eno; + if ((eno = ::pthread_cond_signal(&cond_)) != 0) { + ipc::error("fail pthread_cond_signal[%d]\n", eno); + return false; + } + return true; + } + + bool broadcast() { + int eno; + if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) { + ipc::error("fail pthread_cond_broadcast[%d]\n", eno); + return false; + } + return true; + } +}; + +class event { + using cnt_t = std::atomic; + + struct info_t { + cnt_t cnt_; + mutex mutex_; + condition cond_; + } * info_ = nullptr; + + uint_t<16> wait_id_; + + std::string name() const { + return "__IPC_WAIT__" + std::to_string(wait_id_); + } + + void open() { + auto n = name(); + info_ = static_cast(shm::acquire(n.c_str(), sizeof(info_t))); + if (info_ == nullptr) { + ipc::error("fail shm::acquire: %s\n", n.c_str()); + return; + } + if (info_->cnt_.fetch_add(1, std::memory_order_acq_rel) == 0) { + if (!info_->mutex_.open()) return; + if (!info_->cond_.open()) return; + } + info_->mutex_.lock(); + } + void close() { - ::pthread_cond_destroy(&cond_); - ::pthread_mutex_destroy(&mutex_); + info_->mutex_.unlock(); + if (info_->cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1) { + info_->cond_.close(); + info_->mutex_.close(); + } + shm::release(info_, sizeof(info_t)); + } + +public: + event(std::size_t id) + : wait_id_(static_cast>(id)) { + open(); + } + + ~event() { + close(); + } + + auto get_id() const noexcept { + return wait_id_; } bool wait() { - int eno; - if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) { - ipc::log("fail pthread_mutex_lock[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard = unique_ptr(&mutex_, ::pthread_mutex_unlock); - if ((eno = ::pthread_cond_wait(&cond_, &mutex_)) != 0) { - ipc::log("fail pthread_cond_wait[%d]\n", eno); - return false; - } - return true; + if (info_ == nullptr) return false; + return info_->cond_.wait(info_->mutex_); } - void notify() { - int eno; - if ((eno = ::pthread_cond_signal(&cond_)) != 0) { - ipc::log("fail pthread_cond_signal[%d]\n", eno); - } - } - - void broadcast() { - int eno; - if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) { - ipc::log("fail pthread_cond_broadcast[%d]\n", eno); - } + bool notify() { + if (info_ == nullptr) return false; + return info_->cond_.notify(); } }; class waiter { - ipc::detail::condition cond_; - std::atomic counter_ { 0 }; + using evt_id_t = decltype(std::declval().get_id()); + + std::atomic counter_ { 0 }; + spin_lock evt_lc_; + id_pool evt_ids_; + + std::size_t push_event(event const & evt) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); + std::size_t id = evt_ids_.acquire(); + if (id == invalid_value) { + ipc::error("fail push_event[has too many waiters]\n"); + return id; + } + (*static_cast(evt_ids_.at(id))) = evt.get_id(); + evt_ids_.mark_acquired(id); + return id; + } + + void pop_event(std::size_t id) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); + evt_ids_.release(id); + } public: using handle_t = waiter*; -public: constexpr static handle_t invalid() { return nullptr; } handle_t open(char const * name) { - if (name == nullptr || name[0] == '\0') return invalid(); - if ((counter_.fetch_add(1, std::memory_order_acq_rel) == 0) && !cond_.open(name)) { + if (name == nullptr || name[0] == '\0') { return invalid(); } + if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) { + evt_ids_.init(); + } return this; } void close(handle_t h) { if (h == invalid()) return; - if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) { - cond_.close(); + counter_.fetch_sub(1, std::memory_order_acq_rel); + } + + static bool wait_all(std::tuple const * all, std::size_t size) { + if (all == nullptr || size == 0) { + return false; } + // calc a new wait-id & construct event object + event evt { ipc::detail::calc_unique_id() }; + auto ids = static_cast(mem::alloc(sizeof(std::size_t[size]))); + for (std::size_t i = 0; i < size; ++i) { + ids[i] = std::get<0>(all[i])->push_event(evt); + } + IPC_UNUSED_ auto guard = unique_ptr(ids, [all, size](std::size_t* ids) { + for (std::size_t i = 0; i < size; ++i) { + std::get<0>(all[i])->pop_event(ids[i]); + } + mem::free(ids, sizeof(std::size_t[size])); + }); + // wait for event signal + return evt.wait(); } bool wait(handle_t h) { if (h == invalid()) return false; - return cond_.wait(); + auto info = std::make_tuple(this, h); + return wait_all(&info, 1); } void notify(handle_t h) { if (h == invalid()) return; - cond_.notify(); + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); + evt_ids_.for_acquired([this](auto id) { + event evt { *static_cast(evt_ids_.at(id)) }; + evt.notify(); + return false; + }); } void broadcast(handle_t h) { if (h == invalid()) return; - cond_.broadcast(); + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_); + evt_ids_.for_acquired([this](auto id) { + event evt { *static_cast(evt_ids_.at(id)) }; + evt.notify(); + return true; + }); } }; diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 6b3565d..8e2378f 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -5,9 +5,14 @@ #include #include #include +#include +#include #include "rw_lock.h" +#include "pool_alloc.h" + #include "platform/to_tchar.h" +#include "platform/detail.h" namespace ipc { namespace detail { @@ -32,6 +37,24 @@ public: ::CloseHandle(h); } + static bool wait_all(std::tuple const * all, std::size_t size) { + if (all == nullptr || size == 0) { + return false; + } + auto hs = static_cast(mem::alloc(sizeof(handle_t[size]))); + IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t[size])); }); + std::size_t i = 0; + for (; i < size; ++i) { + auto& info = all[i]; + if ((std::get<0>(all[i]) == nullptr) || + (std::get<1>(all[i]) == invalid())) continue; + std::get<0>(info)->counter_.fetch_add(1, std::memory_order_relaxed); + hs[i] = std::get<1>(all[i]); + } + std::atomic_thread_fence(std::memory_order_release); + return ::WaitForMultipleObjects(hs, i, FALSE, INFINITE) != WAIT_FAILED; + } + bool wait(handle_t h) { if (h == invalid()) return false; counter_.fetch_add(1, std::memory_order_relaxed); diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 9cc6a65..6397282 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -1,6 +1,10 @@ #pragma once #include +#include +#include + +#include "pool_alloc.h" #if defined(WIN64) || defined(_WIN64) || defined(__WIN64__) || \ defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) || \ @@ -9,6 +13,7 @@ #else #include "platform/waiter_linux.h" #endif +#include "platform/detail.h" namespace ipc { namespace detail { @@ -21,6 +26,10 @@ private: waiter_t* w_ = nullptr; waiter_t::handle_t h_ = waiter_t::invalid(); + auto to_w_info() { + return std::make_tuple(w_, h_); + } + public: waiter_wrapper() = default; explicit waiter_wrapper(waiter_t* w) { @@ -53,6 +62,22 @@ public: h_ = waiter_t::invalid(); } + bool wait_all(waiter_wrapper * all, std::size_t size) { + if (all == nullptr || size == 0) { + return false; + } + using tp_t = decltype(std::declval().to_w_info()); + auto hs = static_cast(mem::alloc(sizeof(tp_t[size]))); + IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(tp_t[size])); }); + std::size_t i = 0; + for (; i < size; ++i) { + auto& w = all[i]; + if (!w.valid()) continue; + hs[i] = w.to_w_info(); + } + return waiter_t::wait_all(hs, i); + } + bool wait() { if (!valid()) return false; return w_->wait(h_);