From baf645eea1456a0b17dd9bcc03c4c898336e230d Mon Sep 17 00:00:00 2001 From: mutouyun Date: Thu, 16 Sep 2021 23:49:01 +0800 Subject: [PATCH 1/6] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=20recv=20timeout=20?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3cpu=E5=8D=A0=E7=94=A8=E8=BF=87=E9=AB=98?= =?UTF-8?q?=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libipc/platform/waiter_linux.h | 2 -- src/libipc/waiter_helper.h | 15 ++++++++++++--- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index eb9a3b1..0efc20e 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -224,8 +224,6 @@ public: static bool wait(handle_t h, std::size_t tm = invalid_value) { if (h == invalid()) return false; switch (tm) { - case 0: - return true; case invalid_value: IPC_SEMAPHORE_FUNC_(sem_wait, h); default: { diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index a32035b..f866cf1 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -13,7 +13,7 @@ namespace detail { struct waiter_helper { struct wait_counter { - std::atomic waiting_ { 0 }; + std::atomic waiting_ { 0 }; long counter_ = 0; }; @@ -40,7 +40,7 @@ struct waiter_helper { { IPC_UNUSED_ auto guard = ctrl.get_lock(); if (!std::forward(pred)()) return true; - counter.counter_ += 1; + counter.counter_ = counter.waiting_.load(std::memory_order_relaxed); } mtx.unlock(); @@ -69,6 +69,11 @@ struct waiter_helper { return ret; } + template + static void clear_handshake(Ctrl & ctrl) { + while (ctrl.handshake_wait(0)) ; + } + template static bool notify(Ctrl & ctrl) { auto & counter = ctrl.counter(); @@ -77,6 +82,7 @@ struct waiter_helper { } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); + clear_handshake(ctrl); if (counter.counter_ > 0) { ret = ctrl.sema_post(1); counter.counter_ -= 1; @@ -93,11 +99,13 @@ struct waiter_helper { } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); + clear_handshake(ctrl); if (counter.counter_ > 0) { ret = ctrl.sema_post(counter.counter_); + auto tm = default_timeout / counter.counter_; do { counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); + ret = ret && ctrl.handshake_wait(tm); } while (counter.counter_ > 0); } return ret; @@ -116,6 +124,7 @@ struct waiter_helper { } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); + clear_handshake(ctrl); if (counter.counter_ > 0) { ret = ctrl.sema_post(counter.counter_); counter.counter_ -= 1; From 91385d727a6ed82631ed60d3988a768d3e91cd8a Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 17 Sep 2021 22:01:34 +0800 Subject: [PATCH 2/6] =?UTF-8?q?=E4=BF=AE=E6=AD=A3recv=E4=B8=AD=E6=96=AD?= =?UTF-8?q?=E5=90=8Ecounter=E6=97=A0=E6=B3=95=E4=B8=8B=E9=99=8D=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=9B=E6=B7=BB=E5=8A=A0=E6=96=B0=E7=9A=84?= =?UTF-8?q?=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 17 +++++++++++++---- demo/msg_que/main.cpp | 6 +++--- src/libipc/waiter_helper.h | 7 ++++++- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b4845b0..6d9b2a5 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ if(NOT MSVC) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") endif() -if (MSVC AND LIBIPC_USE_STATIC_CRT) +if (MSVC) set(CompilerFlags CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG @@ -22,9 +22,17 @@ if (MSVC AND LIBIPC_USE_STATIC_CRT) CMAKE_C_FLAGS_DEBUG CMAKE_C_FLAGS_RELEASE ) - foreach(CompilerFlag ${CompilerFlags}) - string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") - endforeach() + if (LIBIPC_USE_STATIC_CRT) + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MDd" "/MTd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + else() + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MT" "/MD" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MTd" "/MDd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + endif() endif() set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) @@ -50,6 +58,7 @@ endif() if (LIBIPC_BUILD_DEMOS) add_subdirectory(demo/chat) add_subdirectory(demo/msg_que) + add_subdirectory(demo/send_recv) endif() install( diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index 5635101..27c5ce6 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -127,10 +127,10 @@ int main(int argc, char ** argv) { ::signal(SIGHUP , exit); #endif - if (std::string{ argv[1] } == mode_s__) { + std::string mode {argv[1]}; + if (mode == mode_s__) { do_send(); - } - else if (std::string{ argv[1] } == mode_r__) { + } else if (mode == mode_r__) { do_recv(); } return 0; diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index f866cf1..a84f59c 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -34,7 +34,11 @@ struct waiter_helper { counter.waiting_.fetch_add(1, std::memory_order_release); flags.is_waiting_.store(true, std::memory_order_relaxed); auto finally = ipc::guard([&counter, &flags] { - counter.waiting_.fetch_sub(1, std::memory_order_release); + for (auto curr_wait = counter.waiting_.load(std::memory_order_acquire); curr_wait > 0;) { + if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_release)) { + break; + } + } flags.is_waiting_.store(false, std::memory_order_relaxed); }); { @@ -107,6 +111,7 @@ struct waiter_helper { counter.counter_ -= 1; ret = ret && ctrl.handshake_wait(tm); } while (counter.counter_ > 0); + counter.waiting_.store(0, std::memory_order_release); } return ret; } From 843770442c343b76ad5145c132e19ad6b7574d2d Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 17 Sep 2021 22:25:53 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E9=81=BF=E5=85=8Dwait=5Fif=E7=9A=84counter?= =?UTF-8?q?=E5=9B=A0=E4=B8=BAABA=E9=97=AE=E9=A2=98=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E8=AE=A1=E6=95=B0=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/libipc/waiter_helper.h | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index a84f59c..a366461 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -33,9 +33,10 @@ struct waiter_helper { auto & counter = ctrl.counter(); counter.waiting_.fetch_add(1, std::memory_order_release); flags.is_waiting_.store(true, std::memory_order_relaxed); - auto finally = ipc::guard([&counter, &flags] { - for (auto curr_wait = counter.waiting_.load(std::memory_order_acquire); curr_wait > 0;) { - if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_release)) { + auto finally = ipc::guard([&ctrl, &counter, &flags] { + ctrl.get_lock(); // barrier + for (auto curr_wait = counter.waiting_.load(std::memory_order_relaxed); curr_wait > 0;) { + if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_acq_rel)) { break; } } From 94ad05ce3540a2f93e7f6bca19affce2bae5250a Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 18 Sep 2021 00:11:11 +0800 Subject: [PATCH 4/6] =?UTF-8?q?=E8=B0=83=E6=95=B4ut?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test/test_ipc.cpp | 8 ++++---- test/test_queue.cpp | 44 ++++++++++++++++++++++---------------------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 16c076c..428cbd1 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -109,10 +109,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { for (int k = 0; k < s_cnt; ++k) { ipc_ut::sender() << [name, &sw, r_cnt, k] { Que que { name, ipc::sender }; - EXPECT_TRUE(que.wait_for_recv(r_cnt)); + ASSERT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); for (int i = 0; i < (int)data_set__.get().size(); ++i) { - EXPECT_TRUE(que.send(data_set__.get()[i])); + ASSERT_TRUE(que.send(data_set__.get()[i])); } }; } @@ -132,7 +132,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { if (data_set != got) { printf("data_set__.get()[%d] != got, size = %zd/%zd\n", i, data_set.size(), got.size()); - EXPECT_TRUE(false); + ASSERT_TRUE(false); } } }; @@ -140,7 +140,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().wait_for_done(); Que que { name }; - EXPECT_TRUE(que.wait_for_recv(r_cnt)); + ASSERT_TRUE(que.wait_for_recv(r_cnt)); for (int k = 0; k < r_cnt; ++k) { que.send(rand_buf{msg_head{-1}}); } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index a59b3a7..e85d39f 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -103,7 +103,7 @@ void test_sr(elems_t && elems, int s_cnt, int r_cnt, char const * me queue_t que { &elems }; ASSERT_TRUE(que.connect()); while (pop(que).pid_ >= 0) ; - EXPECT_TRUE(que.disconnect()); + ASSERT_TRUE(que.disconnect()); }; } @@ -133,7 +133,7 @@ TEST(Queue, el_connection) { elems_t el; EXPECT_TRUE(el.connect_sender()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(el.connect_sender()); + ASSERT_FALSE(el.connect_sender()); } el.disconnect_sender(); EXPECT_TRUE(el.connect_sender()); @@ -141,7 +141,7 @@ TEST(Queue, el_connection) { { elems_t el; for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(el.connect_sender()); + ASSERT_TRUE(el.connect_sender()); } } { @@ -149,7 +149,7 @@ TEST(Queue, el_connection) { auto cc = el.connect_receiver(); EXPECT_NE(cc, 0); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_EQ(el.connect_receiver(), 0); + ASSERT_EQ(el.connect_receiver(), 0); } EXPECT_EQ(el.disconnect_receiver(cc), 0); EXPECT_EQ(el.connect_receiver(), cc); @@ -157,10 +157,10 @@ TEST(Queue, el_connection) { { elems_t el; for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - EXPECT_NE(el.connect_receiver(), 0); + ASSERT_NE(el.connect_receiver(), 0); } for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_EQ(el.connect_receiver(), 0); + ASSERT_EQ(el.connect_receiver(), 0); } } } @@ -171,11 +171,11 @@ TEST(Queue, connection) { queue_t que{&el}; // sending for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.ready_sending()); + ASSERT_FALSE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { que.shut_sending(); @@ -186,15 +186,15 @@ TEST(Queue, connection) { } // receiving for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } EXPECT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(que.disconnect()); + ASSERT_FALSE(que.disconnect()); } { queue_t que{&el}; @@ -202,7 +202,7 @@ TEST(Queue, connection) { } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } } { @@ -210,42 +210,42 @@ TEST(Queue, connection) { queue_t que{&el}; // sending for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { que.shut_sending(); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } // receiving for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { queue_t que{&el}; - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } - EXPECT_TRUE(que.disconnect()); + ASSERT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(que.disconnect()); + ASSERT_FALSE(que.disconnect()); } { queue_t que{&el}; - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } } } From 68590dd2f3e7f1a0a1278daee7719eb6614bb771 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 19 Sep 2021 16:14:48 +0800 Subject: [PATCH 5/6] commit new demo --- demo/send_recv/CMakeLists.txt | 11 +++++++++ demo/send_recv/main.cpp | 46 +++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 demo/send_recv/CMakeLists.txt create mode 100644 demo/send_recv/main.cpp diff --git a/demo/send_recv/CMakeLists.txt b/demo/send_recv/CMakeLists.txt new file mode 100644 index 0000000..9d3f0fb --- /dev/null +++ b/demo/send_recv/CMakeLists.txt @@ -0,0 +1,11 @@ +project(send_recv) + +include_directories( + ${LIBIPC_PROJECT_DIR}/3rdparty) + +file(GLOB SRC_FILES ./*.cpp) +file(GLOB HEAD_FILES ./*.h) + +add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) + +target_link_libraries(${PROJECT_NAME} ipc) diff --git a/demo/send_recv/main.cpp b/demo/send_recv/main.cpp new file mode 100644 index 0000000..3acc52a --- /dev/null +++ b/demo/send_recv/main.cpp @@ -0,0 +1,46 @@ + +#include +#include +#include +#include + +#include "libipc/ipc.h" + +namespace { + +void do_send(int size, int interval) { + ipc::channel ipc {"ipc", ipc::sender}; + std::string buffer(size, 'A'); + while (true) { + std::cout << "send size: " << buffer.size() + 1 << "\n"; + ipc.send(buffer, 0/*tm*/); + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } +} + +void do_recv(int interval) { + ipc::channel ipc {"ipc", ipc::receiver}; + while (true) { + ipc::buff_t recv; + for (int k = 1; recv.empty(); ++k) { + std::cout << "recv waiting... " << k << "\n"; + recv = ipc.recv(interval); + } + std::cout << "recv size: " << recv.size() << "\n"; + } +} + +} // namespace + +int main(int argc, char ** argv) { + if (argc < 3) return -1; + std::string mode {argv[1]}; + if (mode == "send") { + if (argc < 4) return -1; + do_send(std::stoi(argv[2]) /*size*/, + std::stoi(argv[3]) /*interval*/); + } else if (mode == "recv") { + do_recv(std::stoi(argv[2]) /*interval*/); + } + return 0; +} From be6f16f87f1d9ec1a7a1855b9d525cf447b7a1bf Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 19 Sep 2021 16:29:06 +0800 Subject: [PATCH 6/6] revert some changes --- src/libipc/waiter_helper.h | 1 - test/test_ipc.cpp | 35 ++++++++++++++++++++++++++--------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index a366461..4493306 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -34,7 +34,6 @@ struct waiter_helper { counter.waiting_.fetch_add(1, std::memory_order_release); flags.is_waiting_.store(true, std::memory_order_relaxed); auto finally = ipc::guard([&ctrl, &counter, &flags] { - ctrl.get_lock(); // barrier for (auto curr_wait = counter.waiting_.load(std::memory_order_relaxed); curr_wait > 0;) { if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_acq_rel)) { break; diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 428cbd1..79f3b26 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -18,8 +18,9 @@ using namespace ipc; namespace { -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; +constexpr int TestBuffMax = 65536; struct msg_head { int id_; @@ -28,7 +29,7 @@ struct msg_head { class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{sizeof(msg_head), 65536}(); + int size = capo::random<>{sizeof(msg_head), TestBuffMax}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); @@ -98,6 +99,22 @@ public: } } const data_set__; +#define IPC_ASSERT_TRUE(condition) \ + do { \ + bool check = !!(condition); \ + GTEST_TEST_BOOLEAN_(check, #condition, false, true, \ + GTEST_NONFATAL_FAILURE_); \ + if (!check) return; \ + } while (0) + +#define IPC_ASSERT_FALSE(condition) \ + do { \ + bool check = !!(condition); \ + GTEST_TEST_BOOLEAN_(!check, #condition, true, false, \ + GTEST_NONFATAL_FAILURE_); \ + if (check) return; \ + } while (0) + template > void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); @@ -109,10 +126,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { for (int k = 0; k < s_cnt; ++k) { ipc_ut::sender() << [name, &sw, r_cnt, k] { Que que { name, ipc::sender }; - ASSERT_TRUE(que.wait_for_recv(r_cnt)); + IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); for (int i = 0; i < (int)data_set__.get().size(); ++i) { - ASSERT_TRUE(que.send(data_set__.get()[i])); + IPC_ASSERT_TRUE(que.send(data_set__.get()[i])); } }; } @@ -122,17 +139,17 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { Que que { name, ipc::receiver }; for (;;) { rand_buf got { que.recv() }; - ASSERT_FALSE(got.empty()); + IPC_ASSERT_FALSE(got.empty()); int i = got.get_id(); if (i == -1) { return; } - ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + IPC_ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); auto const &data_set = data_set__.get()[i]; if (data_set != got) { printf("data_set__.get()[%d] != got, size = %zd/%zd\n", i, data_set.size(), got.size()); - ASSERT_TRUE(false); + IPC_ASSERT_TRUE(false); } } }; @@ -140,7 +157,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().wait_for_done(); Que que { name }; - ASSERT_TRUE(que.wait_for_recv(r_cnt)); + IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt)); for (int k = 0; k < r_cnt; ++k) { que.send(rand_buf{msg_head{-1}}); }