From 7a536b6e9c653b206a58b49ce0e7a2a3f024a67f Mon Sep 17 00:00:00 2001 From: mutouyun Date: Mon, 20 Sep 2021 22:18:27 +0800 Subject: [PATCH] impl quit_waiting --- src/libipc/ipc.cpp | 6 +++--- src/libipc/waiter.h | 12 ++++++++++-- test/test_waiter.cpp | 28 ++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/src/libipc/ipc.cpp b/src/libipc/ipc.cpp index 7bda379..d23c556 100755 --- a/src/libipc/ipc.cpp +++ b/src/libipc/ipc.cpp @@ -282,9 +282,9 @@ struct conn_info_head { } void quit_waiting() { - // cc_waiter_.quit_waiting(); - // wt_waiter_.quit_waiting(); - // rd_waiter_.quit_waiting(); + cc_waiter_.quit_waiting(); + wt_waiter_.quit_waiting(); + rd_waiter_.quit_waiting(); } auto acc() { diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h index 48f8ce9..7983702 100644 --- a/src/libipc/waiter.h +++ b/src/libipc/waiter.h @@ -3,6 +3,7 @@ #include #include #include +#include #include "libipc/def.h" #include "libipc/mutex.h" @@ -15,6 +16,7 @@ namespace detail { class waiter { ipc::sync::condition cond_; ipc::sync::mutex lock_; + std::atomic quit_ {false}; public: waiter() = default; @@ -31,6 +33,7 @@ public: } bool open(char const *name) noexcept { + quit_.store(false, std::memory_order_relaxed); if (!cond_.open((std::string{"_waiter_cond_"} + name).c_str())) { return false; } @@ -49,7 +52,10 @@ public: template bool wait_if(F &&pred, std::uint64_t tm = ipc::invalid_value) noexcept { IPC_UNUSED_ std::lock_guard guard {lock_}; - while (std::forward(pred)()) { + while ([this, &pred] { + return !quit_.load(std::memory_order_relaxed) + && std::forward(pred)(); + }()) { if (!cond_.wait(lock_, tm)) return false; } return true; @@ -65,7 +71,9 @@ public: return cond_.broadcast(); } - void quit_waiting() { + bool quit_waiting() { + quit_.store(true, std::memory_order_release); + return broadcast(); } }; diff --git a/test/test_waiter.cpp b/test/test_waiter.cpp index 99ec7ee..727cca0 100755 --- a/test/test_waiter.cpp +++ b/test/test_waiter.cpp @@ -35,6 +35,34 @@ TEST(Waiter, broadcast) { } TEST(Waiter, quit_waiting) { + ipc::detail::waiter waiter; + EXPECT_TRUE(waiter.open("test-ipc-waiter")); + + std::thread t1 { + [&waiter] { + EXPECT_TRUE(waiter.wait_if([] { return true; })); + } + }; + + bool quit = false; + std::thread t2 { + [&quit] { + ipc::detail::waiter waiter {"test-ipc-waiter"}; + EXPECT_TRUE(waiter.wait_if([&quit] { return !quit; })); + } + }; + + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_TRUE(waiter.quit_waiting()); + t1.join(); + ASSERT_TRUE(t2.joinable()); + + EXPECT_TRUE(waiter.open("test-ipc-waiter")); + std::cout << "nofify quit...\n"; + quit = true; + EXPECT_TRUE(waiter.notify()); + t2.join(); + std::cout << "quit... \n"; } } // internal-linkage