From bce3894707a15066bf2c4a01d1421ce3ff67f174 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 19 Sep 2020 17:37:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=AD=A3=E5=B8=B8=E9=80=80?= =?UTF-8?q?=E5=87=BA=E7=9A=84=E6=9C=BA=E5=88=B6=EF=BC=88win=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 1 + demo/chat/main.cpp | 37 +++++++++----- demo/msg_que/CMakeLists.txt | 8 +++ demo/msg_que/main.cpp | 19 +++++++ include/libipc/ipc.h | 34 ++++++++----- include/libipc/rw_lock.h | 9 ++-- src/CMakeLists.txt | 1 + src/ipc.cpp | 55 +++++++++++++++------ src/libipc/platform/waiter_linux.h | 13 +++-- src/libipc/platform/waiter_win.h | 74 ++++++++++++++++++++++------ src/libipc/platform/waiter_wrapper.h | 16 +++++- src/libipc/utility/log.h | 8 +-- src/libipc/utility/scope_guard.h | 64 ++++++++++++++++++++++++ 13 files changed, 267 insertions(+), 72 deletions(-) create mode 100644 demo/msg_que/CMakeLists.txt create mode 100644 demo/msg_que/main.cpp create mode 100644 src/libipc/utility/scope_guard.h diff --git a/CMakeLists.txt b/CMakeLists.txt index cc4660f..b6df272 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,3 +16,4 @@ add_subdirectory(src) add_subdirectory(3rdparty/gtest) add_subdirectory(test) add_subdirectory(demo/chat) +add_subdirectory(demo/msg_que) diff --git a/demo/chat/main.cpp b/demo/chat/main.cpp index 53e1cbc..7f88e6f 100755 --- a/demo/chat/main.cpp +++ b/demo/chat/main.cpp @@ -1,3 +1,6 @@ + +#include + #include #include #include @@ -8,36 +11,39 @@ namespace { -char name__[] = "ipc-chat"; -char quit__[] = "q"; -char id__ [] = "c"; +constexpr char const name__[] = "ipc-chat"; +constexpr char const quit__[] = "q"; +constexpr char const id__ [] = "c"; -std::size_t calc_unique_id() { +inline std::size_t calc_unique_id() { static ipc::shm::handle g_shm { "__CHAT_ACC_STORAGE__", sizeof(std::atomic) }; return static_cast*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed); } +ipc::channel sender__ { name__, ipc::sender }; +ipc::channel receiver__ { name__, ipc::receiver }; + } // namespace int main() { + ::signal(SIGINT, [](int) { + receiver__.disconnect(); + }); + std::string buf, id = id__ + std::to_string(calc_unique_id()); std::regex reg { "(c\\d+)> (.*)" }; - ipc::channel cc { name__, ipc::sender }; - std::thread r {[&id, ®] { - ipc::channel cc { name__, ipc::receiver }; std::cout << id << " is ready." << std::endl; while (1) { - auto buf = cc.recv(); - if (buf.empty()) continue; + ipc::buff_t buf = receiver__.recv(); + if (buf.empty()) break; // quit std::string dat { buf.get(), buf.size() - 1 }; std::smatch mid; if (std::regex_match(dat, mid, reg)) { if (mid.str(1) == id) { if (mid.str(2) == quit__) { - std::cout << "receiver quit..." << std::endl; - return; + break; } continue; } @@ -48,11 +54,16 @@ int main() { for (/*int i = 1*/;; /*++i*/) { std::cin >> buf; + if (buf.empty()) break; // std::cout << "[" << i << "]" << std::endl; - cc.send(id + "> " + buf); - if (buf == quit__) break; + sender__.send(id + "> " + buf); + if (buf == quit__) { + receiver__.disconnect(); + break; + } } r.join(); + std::cout << id << " is quit..." << std::endl; return 0; } diff --git a/demo/msg_que/CMakeLists.txt b/demo/msg_que/CMakeLists.txt new file mode 100644 index 0000000..13b63bf --- /dev/null +++ b/demo/msg_que/CMakeLists.txt @@ -0,0 +1,8 @@ +project(msg_que) + +file(GLOB SRC_FILES ./*.cpp) +file(GLOB HEAD_FILES ./*.h) + +add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) + +target_link_libraries(${PROJECT_NAME} ipc) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp new file mode 100644 index 0000000..7c4a8c5 --- /dev/null +++ b/demo/msg_que/main.cpp @@ -0,0 +1,19 @@ +#include +#include +#include +#include +#include + +#include "libipc/ipc.h" + +namespace { + +constexpr char const name__[] = "ipc-msg-que"; +constexpr char const quit__[] = "q"; +constexpr char const id__ [] = "c"; + +} // namespace + +int main() { + return 0; +} diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index 44a7135..4f7dcd3 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -19,8 +19,9 @@ enum : unsigned { template struct IPC_EXPORT chan_impl { - static handle_t connect (char const * name, unsigned mode); - static void disconnect(handle_t h); + static bool connect (handle_t * ph, char const * name, unsigned mode); + static void disconnect(handle_t h); + static void destroy (handle_t h); static char const * name(handle_t h); @@ -54,7 +55,7 @@ public: } ~chan_wrapper() { - disconnect(); + detail_t::destroy(h_); } void swap(chan_wrapper& rhs) noexcept { @@ -89,14 +90,13 @@ public: bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) { if (name == nullptr || name[0] == '\0') return false; this->disconnect(); - h_ = detail_t::connect(name, mode_ = mode); + detail_t::connect(&h_, name, mode_ = mode); return valid(); } void disconnect() { if (!valid()) return; detail_t::disconnect(h_); - h_ = nullptr; } std::size_t recv_count() const { @@ -111,13 +111,25 @@ public: return chan_wrapper(name).wait_for_recv(r_count, tm); } - bool send (void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::send(h_, data, size, tm) ; } - bool send (buff_t const & buff , std::size_t tm = default_timeout) { return this->send(buff.data(), buff.size(), tm) ; } - bool send (std::string const & str , std::size_t tm = default_timeout) { return this->send(str.c_str(), str.size() + 1, tm); } + bool send(void const * data, std::size_t size, std::size_t tm = default_timeout) { + return detail_t::send(h_, data, size, tm); + } + bool send(buff_t const & buff, std::size_t tm = default_timeout) { + return this->send(buff.data(), buff.size(), tm); + } + bool send(std::string const & str, std::size_t tm = default_timeout) { + return this->send(str.c_str(), str.size() + 1, tm); + } - bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm) ; } - bool try_send(buff_t const & buff , std::size_t tm = default_timeout) { return this->try_send(buff.data(), buff.size(), tm) ; } - bool try_send(std::string const & str , std::size_t tm = default_timeout) { return this->try_send(str.c_str(), str.size() + 1, tm); } + bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { + return detail_t::try_send(h_, data, size, tm); + } + bool try_send(buff_t const & buff, std::size_t tm = default_timeout) { + return this->try_send(buff.data(), buff.size(), tm); + } + bool try_send(std::string const & str, std::size_t tm = default_timeout) { + return this->try_send(str.c_str(), str.size() + 1, tm); + } buff_t recv(std::size_t tm = invalid_value) { return detail_t::recv(h_, tm); diff --git a/include/libipc/rw_lock.h b/include/libipc/rw_lock.h index 2a898b1..5757b23 100755 --- a/include/libipc/rw_lock.h +++ b/include/libipc/rw_lock.h @@ -77,11 +77,8 @@ inline void sleep(K& k, F&& f) { if (k < static_cast(N)) { std::this_thread::yield(); } - else if (std::forward(f)()) { - return; - } else { - std::this_thread::sleep_for(std::chrono::milliseconds(1)); + static_cast(std::forward(f)()); return; } ++k; @@ -89,7 +86,9 @@ inline void sleep(K& k, F&& f) { template inline void sleep(K& k) { - sleep(k, [] { return false; }); + sleep(k, [] { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + }); } } // namespace ipc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2dccff1..4dfae8e 100755 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -19,6 +19,7 @@ aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_FILES) file(GLOB HEAD_FILES ${CMAKE_SOURCE_DIR}/include/libipc/*.h + ${CMAKE_SOURCE_DIR}/src/libipc/*.h ${CMAKE_SOURCE_DIR}/src/libipc/*.inc ${CMAKE_SOURCE_DIR}/src/libipc/circ/*.h ${CMAKE_SOURCE_DIR}/src/libipc/memory/*.h diff --git a/src/ipc.cpp b/src/ipc.cpp index e24cf6a..71c0a1c 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include "libipc/ipc.h" #include "libipc/def.h" @@ -279,6 +280,12 @@ struct conn_info_head { , acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) { } + void enable(bool e) { + cc_waiter_.set_enabled(e); + wt_waiter_.set_enabled(e); + rd_waiter_.set_enabled(e); + } + auto acc() { return static_cast(acc_h_.get()); } @@ -295,10 +302,9 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) { bool loop = true, ret = true; ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] { ret = waiter.wait_if([&loop, &pred] { - return loop = pred(); - }, tm); + return loop = pred(); + }, tm); k = 0; - return true; }); if (!ret ) return false; // timeout or fail if (!loop) break; @@ -341,18 +347,22 @@ constexpr static queue_t* queue_of(ipc::handle_t h) { /* API implementations */ -static ipc::handle_t connect(char const * name, bool start) { - auto h = mem::alloc(name); - auto que = queue_of(h); - if (que == nullptr) { - return nullptr; +static bool connect(handle_t * ph, char const * name, bool start) { + assert(ph != nullptr); + if (*ph == nullptr) { + *ph = mem::alloc(name); } + auto que = queue_of(*ph); + if (que == nullptr) { + return false; + } + info_of(*ph)->enable(true); if (start) { if (que->connect()) { // wouldn't connect twice - info_of(h)->cc_waiter_.broadcast(); + info_of(*ph)->cc_waiter_.broadcast(); } } - return h; + return true; } static void disconnect(ipc::handle_t h) { @@ -360,9 +370,15 @@ static void disconnect(ipc::handle_t h) { if (que == nullptr) { return; } - if (que->disconnect()) { - info_of(h)->cc_waiter_.broadcast(); + bool dis = que->disconnect(); + info_of(h)->enable(false); + if (dis) { + info_of(h)->recv_cache().clear(); } +} + +static void destroy(ipc::handle_t h) { + disconnect(h); mem::free(info_of(h)); } @@ -485,14 +501,16 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { ipc::error("fail: recv, queue_of(h) == nullptr\n"); return {}; } - if (que->connect()) { // wouldn't connect twice - info_of(h)->cc_waiter_.broadcast(); + if (!que->connected()) { + // hasn't connected yet, just return. + return {}; } auto& rc = info_of(h)->recv_cache(); while (1) { // pop a new message typename queue_t::value_t msg; if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); @@ -562,8 +580,8 @@ using policy_t = policy::choose; namespace ipc { template -ipc::handle_t chan_impl::connect(char const * name, unsigned mode) { - return detail_impl>::connect(name, mode & receiver); +bool chan_impl::connect(handle_t * ph, char const * name, unsigned mode) { + return detail_impl>::connect(ph, name, mode & receiver); } template @@ -571,6 +589,11 @@ void chan_impl::disconnect(ipc::handle_t h) { detail_impl>::disconnect(h); } +template +void chan_impl::destroy(handle_t h) { + detail_impl>::destroy(h); +} + template char const * chan_impl::name(ipc::handle_t h) { auto info = detail_impl>::info_of(h); diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index f118478..b84644b 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -9,10 +9,12 @@ #include #include +#include #include "libipc/def.h" #include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" #include "libipc/platform/detail.h" #include "libipc/memory/resource.h" @@ -280,20 +282,22 @@ public: template bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) { waiting_.fetch_add(1, std::memory_order_release); + auto finally = ipc::guard([this] { + waiting_->fetch_sub(1, std::memory_order_release); + }); { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (!std::forward(pred)()) return true; ++ counter_; } bool ret = sem_helper::wait(std::get<1>(h), tm); - waiting_.fetch_sub(1, std::memory_order_release); + finally.do_exit(); ret = sem_helper::post(std::get<2>(h)) && ret; return ret; } bool notify(handle_t const & h) { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_.load(std::memory_order_relaxed) == 0) { + if (waiting_.load(std::memory_order_acquire) == 0) { return true; } bool ret = true; @@ -307,8 +311,7 @@ public: } bool broadcast(handle_t const & h) { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_.load(std::memory_order_relaxed) == 0) { + if (waiting_.load(std::memory_order_acquire) == 0) { return true; } bool ret = true; diff --git a/src/libipc/platform/waiter_win.h b/src/libipc/platform/waiter_win.h index ca1f86e..0782510 100755 --- a/src/libipc/platform/waiter_win.h +++ b/src/libipc/platform/waiter_win.h @@ -3,13 +3,15 @@ #include #include -#include +#include +#include #include "libipc/rw_lock.h" #include "libipc/pool_alloc.h" #include "libipc/shm.h" #include "libipc/utility/log.h" +#include "libipc/utility/scope_guard.h" #include "libipc/platform/to_tchar.h" #include "libipc/platform/get_sa.h" #include "libipc/platform/detail.h" @@ -79,6 +81,11 @@ class condition { std::atomic * waiting_ = nullptr; long * counter_ = nullptr; + enum : unsigned { + destruct_mask = (std::numeric_limits::max)() >> 1, + destruct_flag = ~destruct_mask + }; + public: friend bool operator==(condition const & c1, condition const & c2) { return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_); @@ -112,24 +119,35 @@ public: } template - bool wait_if(Mutex& mtx, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(Mutex & mtx, std::atomic const & enabled, F && pred, std::size_t tm = invalid_value) { + if (!enabled.load(std::memory_order_acquire)) { + return false; + } waiting_->fetch_add(1, std::memory_order_release); + auto finally = ipc::guard([this] { + waiting_->fetch_sub(1, std::memory_order_release); + }); { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (!std::forward(pred)()) return true; ++ *counter_; } mtx.unlock(); - bool ret = sema_.wait(tm); - waiting_->fetch_sub(1, std::memory_order_release); + bool ret = false; + do { + if (!enabled.load(std::memory_order_acquire)) { + break; + } + ret = sema_.wait(tm); + } while (waiting_->load(std::memory_order_acquire) & destruct_flag); + finally.do_exit(); ret = handshake_.post() && ret; mtx.lock(); return ret; } bool notify() { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_->load(std::memory_order_relaxed) == 0) { + if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { return true; } bool ret = true; @@ -143,8 +161,7 @@ public: } bool broadcast() { - std::atomic_thread_fence(std::memory_order_acq_rel); - if (waiting_->load(std::memory_order_relaxed) == 0) { + if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { return true; } bool ret = true; @@ -158,6 +175,26 @@ public: } return ret; } + + bool emit_destruction() { + if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) { + return true; + } + bool ret = true; + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); + waiting_->fetch_or(destruct_flag, std::memory_order_relaxed); + IPC_UNUSED_ auto finally = ipc::guard([this] { + waiting_->fetch_and(destruct_mask, std::memory_order_relaxed); + }); + if (*counter_ > 0) { + ret = sema_.post(*counter_); + do { + -- *counter_; + ret = ret && handshake_.wait(default_timeout); + } while (*counter_ > 0); + } + return ret; + } }; class waiter { @@ -189,7 +226,7 @@ public: } template - bool wait_if(handle_t& h, F&& pred, std::size_t tm = invalid_value) { + bool wait_if(handle_t& h, std::atomic const & enabled, F&& pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; class non_mutex { @@ -198,17 +235,22 @@ public: void unlock() noexcept {} } nm; - return h.wait_if(nm, std::forward(pred), tm); + return h.wait_if(nm, enabled, std::forward(pred), tm); } - void notify(handle_t& h) { - if (h == invalid()) return; - h.notify(); + bool notify(handle_t& h) { + if (h == invalid()) return false; + return h.notify(); } - void broadcast(handle_t& h) { - if (h == invalid()) return; - h.broadcast(); + bool broadcast(handle_t& h) { + if (h == invalid()) return false; + return h.broadcast(); + } + + bool emit_destruction(handle_t& h) { + if (h == invalid()) return false; + return h.emit_destruction(); } }; diff --git a/src/libipc/platform/waiter_wrapper.h b/src/libipc/platform/waiter_wrapper.h index 4638c84..8242d31 100755 --- a/src/libipc/platform/waiter_wrapper.h +++ b/src/libipc/platform/waiter_wrapper.h @@ -23,6 +23,7 @@ using semaphore_impl = ipc::detail::semaphore; class condition_impl : public ipc::detail::condition { ipc::shm::handle wait_h_, cnt_h_; + std::atomic enabled_ { false }; public: static void remove(char const * name) { @@ -35,6 +36,7 @@ public: bool open(ipc::string const & name) { if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic)) && cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) { + enabled_.store(true, std::memory_order_release); return ipc::detail::condition::open(name, static_cast *>(wait_h_.get()), static_cast(cnt_h_.get())); @@ -43,13 +45,15 @@ public: } void close() { + enabled_.store(false, std::memory_order_release); + ipc::detail::condition::emit_destruction(); ipc::detail::condition::close(); cnt_h_ .release(); wait_h_.release(); } bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { - return ipc::detail::condition::wait_if(mtx, [] { return true; }, tm); + return ipc::detail::condition::wait_if(mtx, enabled_, [] { return true; }, tm); } }; @@ -194,6 +198,7 @@ public: private: waiter_t* w_ = nullptr; waiter_t::handle_t h_ = waiter_t::invalid(); + std::atomic enabled_ { true }; public: waiter_wrapper() = default; @@ -228,10 +233,17 @@ public: h_ = waiter_t::invalid(); } + void set_enabled(bool e) { + if (enabled_.exchange(e, std::memory_order_acq_rel) == e) { + return; + } + if (!e) w_->emit_destruction(h_); + } + template bool wait_if(F&& pred, std::size_t tm = invalid_value) { if (!valid()) return false; - return w_->wait_if(h_, std::forward(pred), tm); + return w_->wait_if(h_, enabled_, std::forward(pred), tm); } bool notify() { diff --git a/src/libipc/utility/log.h b/src/libipc/utility/log.h index f09b385..bb1dcea 100755 --- a/src/libipc/utility/log.h +++ b/src/libipc/utility/log.h @@ -7,8 +7,8 @@ namespace ipc { namespace detail { template -void print(O out, char const * fmt) { - std::fprintf(out, "%s", fmt); +void print(O out, char const * str) { + std::fprintf(out, "%s", str); } template @@ -27,8 +27,8 @@ void log(char const * fmt, P1&& p1, P&&... params) { ipc::detail::print(stdout, fmt, std::forward(p1), std::forward

(params)...); } -inline void error(char const * fmt) { - ipc::detail::print(stderr, fmt); +inline void error(char const * str) { + ipc::detail::print(stderr, str); } template diff --git a/src/libipc/utility/scope_guard.h b/src/libipc/utility/scope_guard.h new file mode 100644 index 0000000..9f42149 --- /dev/null +++ b/src/libipc/utility/scope_guard.h @@ -0,0 +1,64 @@ +#pragma once + +#include // std::forward, std::move +#include // std::swap +#include // std::decay + +namespace ipc { + +//////////////////////////////////////////////////////////////// +/// Execute guard function when the enclosing scope exits +//////////////////////////////////////////////////////////////// + +template +class scope_guard { + F destructor_; + mutable bool dismiss_; + +public: + template + scope_guard(D && destructor) + : destructor_(std::forward(destructor)) + , dismiss_(false) { + } + + scope_guard(scope_guard&& rhs) + : destructor_(std::move(rhs.destructor_)) + , dismiss_(true) /* dismiss rhs */ { + std::swap(dismiss_, rhs.dismiss_); + } + + ~scope_guard() { + try { do_exit(); } + /** + * In the realm of exceptions, it is fundamental that you can do nothing + * if your "undo/recover" action fails. + */ + catch (...) { /* Do nothing */ } + } + + void swap(scope_guard & rhs) { + std::swap(destructor_, rhs.destructor_); + std::swap(dismiss_ , rhs.dismiss_); + } + + void dismiss() const noexcept { + dismiss_ = true; + } + + void do_exit() { + if (!dismiss_) { + dismiss_ = true; + destructor_(); + } + } +}; + +template +constexpr auto guard(D && destructor) noexcept { + return scope_guard> { + std::forward(destructor) + }; +} + +} // namespace ipc \ No newline at end of file