From d9e24236af20181915f3912617c17d7506aa32fe Mon Sep 17 00:00:00 2001 From: mutouyun Date: Tue, 15 Jan 2019 21:22:09 +0800 Subject: [PATCH] add waiter for long-time wait. (TBD) --- build/ipc.pro | 28 +++++++++----- include/elem_circ.h | 3 ++ include/elem_def.h | 5 ++- include/ipc.h | 24 ++++++------ include/queue.h | 28 +++++++++++--- include/rw_lock.h | 13 ++++++- include/waiter.h | 33 ++++++++++++++++ src/ipc.cpp | 24 ++++++------ src/platform/shm_linux.cpp | 9 +++-- src/platform/shm_win.cpp | 19 +-------- src/platform/to_tchar.h | 55 ++++++++++++++++++++++++++ src/platform/waiter.h | 77 +++++++++++++++++++++++++++++++++++++ src/platform/waiter_linux.h | 32 +++++++++++++++ src/platform/waiter_win.h | 55 ++++++++++++++++++++++++++ src/waiter.cpp | 71 ++++++++++++++++++++++++++++++++++ 15 files changed, 413 insertions(+), 63 deletions(-) create mode 100644 include/waiter.h create mode 100644 src/platform/to_tchar.h create mode 100644 src/platform/waiter.h create mode 100644 src/platform/waiter_linux.h create mode 100644 src/platform/waiter_win.h create mode 100644 src/waiter.cpp diff --git a/build/ipc.pro b/build/ipc.pro index 4d14f8e..3240709 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -12,33 +12,39 @@ DESTDIR = ../output INCLUDEPATH += \ ../include \ - ../src \ - ../src/platform + ../src HEADERS += \ ../include/export.h \ - ../include/shm.h \ - ../include/circ_elems_array.h \ - ../include/circ_elem_array.h \ - ../include/circ_queue.h \ - ../include/ipc.h \ ../include/def.h \ + ../include/shm.h \ + ../include/elem_def.h \ + ../include/elem_circ.h \ + ../include/elem_link.h \ + ../include/waiter.h \ + ../include/queue.h \ + ../include/ipc.h \ ../include/rw_lock.h \ ../include/tls_pointer.h \ ../include/pool_alloc.h \ ../include/buffer.h \ ../src/memory/alloc.hpp \ ../src/memory/wrapper.hpp \ - ../src/memory/resource.hpp + ../src/memory/resource.hpp \ + ../src/platform/waiter.h SOURCES += \ ../src/shm.cpp \ ../src/ipc.cpp \ ../src/pool_alloc.cpp \ - ../src/buffer.cpp + ../src/buffer.cpp \ + ../src/waiter.cpp unix { +HEADERS += \ + ../src/platform/waiter_linux.h + SOURCES += \ ../src/platform/shm_linux.cpp \ ../src/platform/tls_pointer_linux.cpp @@ -52,6 +58,10 @@ INSTALLS += target else:win32 { +HEADERS += \ + ../src/platform/to_tchar.h \ + ../src/platform/waiter_win.h + SOURCES += \ ../src/platform/shm_win.cpp \ ../src/platform/tls_pointer_win.cpp diff --git a/include/elem_circ.h b/include/elem_circ.h index 9ed694d..55cb73d 100644 --- a/include/elem_circ.h +++ b/include/elem_circ.h @@ -264,6 +264,9 @@ public: elem_array(const elem_array&) = delete; elem_array& operator=(const elem_array&) = delete; + auto & waiter() { return head_.waiter_; } + auto const & waiter() const { return head_.waiter_; } + std::size_t connect () noexcept { return head_.connect (); } std::size_t disconnect() noexcept { return head_.disconnect(); } std::size_t conn_count() const noexcept { return head_.conn_count(); } diff --git a/include/elem_def.h b/include/elem_def.h index 3ca8368..4b8b05a 100644 --- a/include/elem_def.h +++ b/include/elem_def.h @@ -4,11 +4,14 @@ #include #include +#include "platform/waiter.h" + namespace ipc { template struct conn_head { - std::atomic cc_ { 0 }; // connection counter + ipc::detail::waiter waiter_; + std::atomic cc_ { 0 }; // connection counter std::size_t connect() noexcept { return cc_.fetch_add(1, std::memory_order_release); diff --git a/include/ipc.h b/include/ipc.h index c9ede7f..18eca8f 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -30,32 +30,32 @@ struct IPC_EXPORT channel_detail { }; template -class channel_ipml { +class channel_impl { private: handle_t h_ = nullptr; std::string n_; public: - channel_ipml() = default; + channel_impl() = default; - explicit channel_ipml(char const * name) { + explicit channel_impl(char const * name) { this->connect(name); } - channel_ipml(channel_ipml&& rhs) { + channel_impl(channel_impl&& rhs) { swap(rhs); } - ~channel_ipml() { + ~channel_impl() { disconnect(); } - void swap(channel_ipml& rhs) { + void swap(channel_impl& rhs) { std::swap(h_, rhs.h_); n_.swap(rhs.n_); } - channel_ipml& operator=(channel_ipml rhs) { + channel_impl& operator=(channel_impl rhs) { swap(rhs); return *this; } @@ -72,8 +72,8 @@ public: return (handle() != nullptr); } - channel_ipml clone() const { - return channel_ipml { name() }; + channel_impl clone() const { + return channel_impl { name() }; } bool connect(char const * name) { @@ -99,7 +99,7 @@ public: } static void wait_for_recv(char const * name, std::size_t r_count) { - return channel_ipml(name).wait_for_recv(r_count); + return channel_impl(name).wait_for_recv(r_count); } void clear_recv() { @@ -137,7 +137,7 @@ public: * A route could only be used in 1 to N * (one producer/server/sender to multi consumers/clients/receivers) */ -using route = channel_ipml >>; @@ -149,7 +149,7 @@ using route = channel_ipml >>; diff --git a/include/queue.h b/include/queue.h index b50754e..984b35b 100644 --- a/include/queue.h +++ b/include/queue.h @@ -8,11 +8,14 @@ #include #include #include +#include #include "def.h" #include "rw_lock.h" #include "elem_circ.h" +#include "platform/waiter.h" + namespace ipc { template ().cursor()) cursor_ = 0; std::atomic_bool connected_ { false }; public: queue() = default; - explicit queue(elems_t* els) : queue() { - attach(els); + explicit queue(elems_t* els, char const * name = nullptr) : queue() { + attach(els, name); } queue(const queue&) = delete; @@ -73,10 +77,18 @@ public: return connected_.load(std::memory_order_acquire); } - elems_t* attach(elems_t* els) noexcept { + elems_t* attach(elems_t* els, char const * name = nullptr) noexcept { if (els == nullptr) return nullptr; auto old = elems_; elems_ = els; + if (name == nullptr) { + wi_.close(); + wi_.attach(nullptr); + } + else { + wi_.attach(&(elems_->waiter())); + wi_.open((std::string{ "__IPC_WAITER__" } +name).c_str()); + } cursor_ = elems_->cursor(); return old; } @@ -91,9 +103,13 @@ public: template auto push(P&&... params) noexcept { if (elems_ == nullptr) return false; - return elems_->push([&](void* p) { + if (elems_->push([&](void* p) { ::new (p) T(std::forward

(params)...); - }); + })) { + wi_.notify(); + return true; + } + return false; } T pop() noexcept { @@ -107,7 +123,7 @@ public: })) { return item; } - ipc::sleep(k); + ipc::sleep(k, [this] { return wi_.wait(); }); } } }; diff --git a/include/rw_lock.h b/include/rw_lock.h index 8e7184a..3ab483c 100644 --- a/include/rw_lock.h +++ b/include/rw_lock.h @@ -5,6 +5,7 @@ #include #include #include +#include //////////////////////////////////////////////////////////////// /// Gives hint to processor that improves performance of spin-wait loops. @@ -69,11 +70,14 @@ inline void yield(K& k) noexcept { ++k; } -template -inline void sleep(K& k) noexcept { +template +inline void sleep(K& k, F&& f) noexcept { if (k < static_cast(N)) { std::this_thread::yield(); } + else if (std::forward(f)()) { + return; + } else { std::this_thread::sleep_for(std::chrono::milliseconds(1)); return; @@ -81,6 +85,11 @@ inline void sleep(K& k) noexcept { ++k; } +template +inline void sleep(K& k) noexcept { + sleep(k, []() constexpr { return false; }); +} + } // namespace ipc #pragma pop_macro("IPC_LOCK_PAUSE_") diff --git a/include/waiter.h b/include/waiter.h new file mode 100644 index 0000000..0c3e074 --- /dev/null +++ b/include/waiter.h @@ -0,0 +1,33 @@ +#pragma once + +#include "export.h" + +namespace ipc { + +class IPC_EXPORT waiter { +public: + waiter(); + explicit waiter(char const * name); + waiter(waiter&& rhs); + + ~waiter(); + + void swap(waiter& rhs); + waiter& operator=(waiter rhs); + + bool valid() const; + char const * name () const; + + bool open (char const * name); + void close(); + + bool wait(); + bool notify(); + bool broadcast(); + +private: + class waiter_; + waiter_* p_; +}; + +} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index b65d6b0..b379c33 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -24,10 +24,10 @@ inline auto acc_of_msg() { } template