From 32f88c83537c1b69322a953cee75019c38b5f698 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 20 Sep 2020 15:30:19 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E5=BE=AE=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/msg_que/main.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index 79b5433..1f40058 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -21,7 +21,7 @@ constexpr std::size_t const min_sz = 128; constexpr std::size_t const max_sz = 1024 * 16; std::atomic is_quit__{ false }; -std::atomic size_per_1s__{ 0 }; +std::atomic size_counter__{ 0 }; using msg_que_t = ipc::chan; @@ -49,9 +49,8 @@ void do_counting() { if (i % 10) continue; i = 0; std::cout - << speed_of(size_per_1s__.load(std::memory_order_acquire)) + << speed_of(size_counter__.exchange(0, std::memory_order_relaxed)) << std::endl; - size_per_1s__.store(0, std::memory_order_release); } } @@ -76,7 +75,7 @@ void do_send() { break; } } - size_per_1s__.fetch_add(sz, std::memory_order_release); + size_counter__.fetch_add(sz, std::memory_order_relaxed); std::this_thread::yield(); } counting.join(); @@ -97,7 +96,7 @@ void do_recv() { while (!is_quit__.load(std::memory_order_acquire)) { auto msg = que__.recv(); if (msg.empty()) break; - size_per_1s__.fetch_add(msg.size(), std::memory_order_release); + size_counter__.fetch_add(msg.size(), std::memory_order_relaxed); } counting.join(); } From 10cc2d336028fec5244f6af883dac1b3fb38fc11 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 26 Sep 2020 19:17:25 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=EF=BC=9Adisconnect?= =?UTF-8?q?=E4=B8=8D=E4=BC=9A=E8=87=AA=E5=8A=A8=E9=80=80=E5=87=BAwait=5Ffo?= =?UTF-8?q?r=5Frecv?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- demo/msg_que/main.cpp | 8 ++++---- src/libipc/platform/waiter_linux.h | 18 ++++++++++++----- src/libipc/waiter_helper.h | 32 +++++++++++++++--------------- 3 files changed, 33 insertions(+), 25 deletions(-) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index 1f40058..95d94aa 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -30,13 +30,13 @@ ipc::byte_t buff__[max_sz]; capo::random<> rand__{ min_sz, max_sz }; inline std::string str_of_size(std::size_t sz) noexcept { - if (sz <= 1024) { - return std::to_string(sz) + " bytes"; + if (sz > 1024 * 1024) { + return std::to_string(sz / (1024 * 1024)) + " MB"; } - if (sz <= 1024 * 1024) { + if (sz > 1024) { return std::to_string(sz / 1024) + " KB"; } - return std::to_string(sz / (1024 * 1024)) + " MB"; + return std::to_string(sz) + " bytes"; } inline std::string speed_of(std::size_t sz) noexcept { diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index 25bed4f..eb9a3b1 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -26,7 +26,7 @@ inline static bool calc_wait_time(timespec& ts, std::size_t tm /*ms*/) { timeval now; int eno = ::gettimeofday(&now, NULL); if (eno != 0) { - ipc::error("fail gettimeofday[%d]\n", eno); + ipc::error("fail gettimeofday [%d]\n", eno); return false; } ts.tv_nsec = (now.tv_usec + (tm % 1000) * 1000) * 1000; @@ -40,7 +40,7 @@ inline static bool calc_wait_time(timespec& ts, std::size_t tm /*ms*/) { #define IPC_PTHREAD_FUNC_(CALL, ...) \ int eno; \ if ((eno = ::CALL(__VA_ARGS__)) != 0) { \ - ipc::error("fail " #CALL "[%d]\n", eno); \ + ipc::error("fail " #CALL " [%d]\n", eno); \ return false; \ } \ return true @@ -146,7 +146,11 @@ public: IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); default: { timespec ts; - calc_wait_time(ts, tm); + if (!calc_wait_time(ts, tm)) { + ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + tm, ts.tv_sec, ts.tv_nsec); + return false; + } int eno; if ((eno = ::pthread_cond_timedwait(&cond_, &mtx.native(), &ts)) != 0) { if (eno != ETIMEDOUT) { @@ -226,10 +230,14 @@ public: IPC_SEMAPHORE_FUNC_(sem_wait, h); default: { timespec ts; - calc_wait_time(ts, tm); + if (!calc_wait_time(ts, tm)) { + ipc::error("fail calc_wait_time: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + tm, ts.tv_sec, ts.tv_nsec); + return false; + } if (::sem_timedwait(h, &ts) != 0) { if (errno != ETIMEDOUT) { - ipc::error("fail sem_timedwait[%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", + ipc::error("fail sem_timedwait [%d]: tm = %zd, tv_sec = %ld, tv_nsec = %ld\n", errno, tm, ts.tv_sec, ts.tv_nsec); } return false; diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index 46b03a3..ed267b6 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -10,14 +10,8 @@ namespace ipc { namespace detail { -class waiter_helper { +struct waiter_helper { - enum : unsigned { - destruct_mask = (std::numeric_limits::max)() >> 1, - destruct_flag = ~destruct_mask - }; - -public: struct wait_counter { std::atomic waiting_ { 0 }; long counter_ = 0; @@ -26,6 +20,7 @@ public: struct wait_flags { std::atomic is_waiting_ { false }; std::atomic is_closed_ { true }; + std::atomic need_dest_ { false }; }; template @@ -54,11 +49,19 @@ public: bool is_waiting = flags.is_waiting_.load(std::memory_order_relaxed); bool is_closed = flags.is_closed_ .load(std::memory_order_acquire); if (!is_waiting || is_closed) { + flags.need_dest_.store(false, std::memory_order_release); ret = false; break; } - ret = ctrl.sema_wait(tm); - } while (counter.waiting_.load(std::memory_order_acquire) & destruct_flag); + else if (flags.need_dest_.exchange(false, std::memory_order_release)) { + ret = false; + ctrl.sema_wait(tm); + break; + } + else { + ret = ctrl.sema_wait(tm); + } + } while (flags.need_dest_.load(std::memory_order_acquire)); finally.do_exit(); ret = ctrl.handshake_post(1) && ret; @@ -69,7 +72,7 @@ public: template static bool notify(Ctrl & ctrl) { auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { return true; } bool ret = true; @@ -85,7 +88,7 @@ public: template static bool broadcast(Ctrl & ctrl) { auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { return true; } bool ret = true; @@ -107,16 +110,13 @@ public: return true; } auto & counter = ctrl.counter(); - if ((counter.waiting_.load(std::memory_order_acquire) & destruct_mask) == 0) { + if ((counter.waiting_.load(std::memory_order_acquire)) == 0) { return true; } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); - counter.waiting_.fetch_or(destruct_flag, std::memory_order_relaxed); - IPC_UNUSED_ auto finally = ipc::guard([&counter] { - counter.waiting_.fetch_and(destruct_mask, std::memory_order_relaxed); - }); if (counter.counter_ > 0) { + flags.need_dest_.store(true, std::memory_order_relaxed); ret = ctrl.sema_post(counter.counter_); counter.counter_ -= 1; ret = ret && ctrl.handshake_wait(default_timeout);