diff --git a/CMakeLists.txt b/CMakeLists.txt index b4845b0..6d9b2a5 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,7 +13,7 @@ if(NOT MSVC) set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") endif() -if (MSVC AND LIBIPC_USE_STATIC_CRT) +if (MSVC) set(CompilerFlags CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS_DEBUG @@ -22,9 +22,17 @@ if (MSVC AND LIBIPC_USE_STATIC_CRT) CMAKE_C_FLAGS_DEBUG CMAKE_C_FLAGS_RELEASE ) - foreach(CompilerFlag ${CompilerFlags}) - string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") - endforeach() + if (LIBIPC_USE_STATIC_CRT) + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MDd" "/MTd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + else() + foreach(CompilerFlag ${CompilerFlags}) + string(REPLACE "/MT" "/MD" ${CompilerFlag} "${${CompilerFlag}}") + string(REPLACE "/MTd" "/MDd" ${CompilerFlag} "${${CompilerFlag}}") + endforeach() + endif() endif() set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) @@ -50,6 +58,7 @@ endif() if (LIBIPC_BUILD_DEMOS) add_subdirectory(demo/chat) add_subdirectory(demo/msg_que) + add_subdirectory(demo/send_recv) endif() install( diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index 5635101..27c5ce6 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -127,10 +127,10 @@ int main(int argc, char ** argv) { ::signal(SIGHUP , exit); #endif - if (std::string{ argv[1] } == mode_s__) { + std::string mode {argv[1]}; + if (mode == mode_s__) { do_send(); - } - else if (std::string{ argv[1] } == mode_r__) { + } else if (mode == mode_r__) { do_recv(); } return 0; diff --git a/demo/send_recv/CMakeLists.txt b/demo/send_recv/CMakeLists.txt new file mode 100644 index 0000000..9d3f0fb --- /dev/null +++ b/demo/send_recv/CMakeLists.txt @@ -0,0 +1,11 @@ +project(send_recv) + +include_directories( + ${LIBIPC_PROJECT_DIR}/3rdparty) + +file(GLOB SRC_FILES ./*.cpp) +file(GLOB HEAD_FILES ./*.h) + +add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) + +target_link_libraries(${PROJECT_NAME} ipc) diff --git a/demo/send_recv/main.cpp b/demo/send_recv/main.cpp new file mode 100644 index 0000000..3acc52a --- /dev/null +++ b/demo/send_recv/main.cpp @@ -0,0 +1,46 @@ + +#include +#include +#include +#include + +#include "libipc/ipc.h" + +namespace { + +void do_send(int size, int interval) { + ipc::channel ipc {"ipc", ipc::sender}; + std::string buffer(size, 'A'); + while (true) { + std::cout << "send size: " << buffer.size() + 1 << "\n"; + ipc.send(buffer, 0/*tm*/); + std::this_thread::sleep_for(std::chrono::milliseconds(interval)); + } +} + +void do_recv(int interval) { + ipc::channel ipc {"ipc", ipc::receiver}; + while (true) { + ipc::buff_t recv; + for (int k = 1; recv.empty(); ++k) { + std::cout << "recv waiting... " << k << "\n"; + recv = ipc.recv(interval); + } + std::cout << "recv size: " << recv.size() << "\n"; + } +} + +} // namespace + +int main(int argc, char ** argv) { + if (argc < 3) return -1; + std::string mode {argv[1]}; + if (mode == "send") { + if (argc < 4) return -1; + do_send(std::stoi(argv[2]) /*size*/, + std::stoi(argv[3]) /*interval*/); + } else if (mode == "recv") { + do_recv(std::stoi(argv[2]) /*interval*/); + } + return 0; +} diff --git a/src/libipc/platform/waiter_linux.h b/src/libipc/platform/waiter_linux.h index 006ba6a..fffe8d1 100755 --- a/src/libipc/platform/waiter_linux.h +++ b/src/libipc/platform/waiter_linux.h @@ -223,8 +223,6 @@ public: static bool wait(handle_t h, std::uint64_t tm = invalid_value) { if (h == invalid()) return false; switch (tm) { - case 0: - return true; case invalid_value: IPC_SEMAPHORE_FUNC_(sem_wait, h); default: { diff --git a/src/libipc/waiter_helper.h b/src/libipc/waiter_helper.h index 7bc6a08..39031f1 100644 --- a/src/libipc/waiter_helper.h +++ b/src/libipc/waiter_helper.h @@ -13,7 +13,7 @@ namespace detail { struct waiter_helper { struct wait_counter { - std::atomic waiting_ { 0 }; + std::atomic waiting_ { 0 }; long counter_ = 0; }; @@ -33,14 +33,18 @@ struct waiter_helper { auto & counter = ctrl.counter(); counter.waiting_.fetch_add(1, std::memory_order_release); flags.is_waiting_.store(true, std::memory_order_relaxed); - auto finally = ipc::guard([&counter, &flags] { - counter.waiting_.fetch_sub(1, std::memory_order_release); + auto finally = ipc::guard([&ctrl, &counter, &flags] { + for (auto curr_wait = counter.waiting_.load(std::memory_order_relaxed); curr_wait > 0;) { + if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_acq_rel)) { + break; + } + } flags.is_waiting_.store(false, std::memory_order_relaxed); }); { IPC_UNUSED_ auto guard = ctrl.get_lock(); if (!std::forward(pred)()) return true; - counter.counter_ += 1; + counter.counter_ = counter.waiting_.load(std::memory_order_relaxed); } mtx.unlock(); @@ -69,6 +73,11 @@ struct waiter_helper { return ret; } + template + static void clear_handshake(Ctrl & ctrl) { + while (ctrl.handshake_wait(0)) ; + } + template static bool notify(Ctrl & ctrl) { auto & counter = ctrl.counter(); @@ -77,6 +86,7 @@ struct waiter_helper { } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); + clear_handshake(ctrl); if (counter.counter_ > 0) { ret = ctrl.sema_post(1); counter.counter_ -= 1; @@ -93,12 +103,15 @@ struct waiter_helper { } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); + clear_handshake(ctrl); if (counter.counter_ > 0) { ret = ctrl.sema_post(counter.counter_); + auto tm = default_timeout / counter.counter_; do { counter.counter_ -= 1; - ret = ret && ctrl.handshake_wait(default_timeout); + ret = ret && ctrl.handshake_wait(tm); } while (counter.counter_ > 0); + counter.waiting_.store(0, std::memory_order_release); } return ret; } @@ -116,6 +129,7 @@ struct waiter_helper { } bool ret = true; IPC_UNUSED_ auto guard = ctrl.get_lock(); + clear_handshake(ctrl); if (counter.counter_ > 0) { ret = ctrl.sema_post(counter.counter_); counter.counter_ -= 1; diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 16c076c..79f3b26 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -18,8 +18,9 @@ using namespace ipc; namespace { -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; +constexpr int TestBuffMax = 65536; struct msg_head { int id_; @@ -28,7 +29,7 @@ struct msg_head { class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{sizeof(msg_head), 65536}(); + int size = capo::random<>{sizeof(msg_head), TestBuffMax}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); @@ -98,6 +99,22 @@ public: } } const data_set__; +#define IPC_ASSERT_TRUE(condition) \ + do { \ + bool check = !!(condition); \ + GTEST_TEST_BOOLEAN_(check, #condition, false, true, \ + GTEST_NONFATAL_FAILURE_); \ + if (!check) return; \ + } while (0) + +#define IPC_ASSERT_FALSE(condition) \ + do { \ + bool check = !!(condition); \ + GTEST_TEST_BOOLEAN_(!check, #condition, true, false, \ + GTEST_NONFATAL_FAILURE_); \ + if (check) return; \ + } while (0) + template > void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); @@ -109,10 +126,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { for (int k = 0; k < s_cnt; ++k) { ipc_ut::sender() << [name, &sw, r_cnt, k] { Que que { name, ipc::sender }; - EXPECT_TRUE(que.wait_for_recv(r_cnt)); + IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); for (int i = 0; i < (int)data_set__.get().size(); ++i) { - EXPECT_TRUE(que.send(data_set__.get()[i])); + IPC_ASSERT_TRUE(que.send(data_set__.get()[i])); } }; } @@ -122,17 +139,17 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { Que que { name, ipc::receiver }; for (;;) { rand_buf got { que.recv() }; - ASSERT_FALSE(got.empty()); + IPC_ASSERT_FALSE(got.empty()); int i = got.get_id(); if (i == -1) { return; } - ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + IPC_ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); auto const &data_set = data_set__.get()[i]; if (data_set != got) { printf("data_set__.get()[%d] != got, size = %zd/%zd\n", i, data_set.size(), got.size()); - EXPECT_TRUE(false); + IPC_ASSERT_TRUE(false); } } }; @@ -140,7 +157,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().wait_for_done(); Que que { name }; - EXPECT_TRUE(que.wait_for_recv(r_cnt)); + IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt)); for (int k = 0; k < r_cnt; ++k) { que.send(rand_buf{msg_head{-1}}); } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index a59b3a7..e85d39f 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -103,7 +103,7 @@ void test_sr(elems_t && elems, int s_cnt, int r_cnt, char const * me queue_t que { &elems }; ASSERT_TRUE(que.connect()); while (pop(que).pid_ >= 0) ; - EXPECT_TRUE(que.disconnect()); + ASSERT_TRUE(que.disconnect()); }; } @@ -133,7 +133,7 @@ TEST(Queue, el_connection) { elems_t el; EXPECT_TRUE(el.connect_sender()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(el.connect_sender()); + ASSERT_FALSE(el.connect_sender()); } el.disconnect_sender(); EXPECT_TRUE(el.connect_sender()); @@ -141,7 +141,7 @@ TEST(Queue, el_connection) { { elems_t el; for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(el.connect_sender()); + ASSERT_TRUE(el.connect_sender()); } } { @@ -149,7 +149,7 @@ TEST(Queue, el_connection) { auto cc = el.connect_receiver(); EXPECT_NE(cc, 0); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_EQ(el.connect_receiver(), 0); + ASSERT_EQ(el.connect_receiver(), 0); } EXPECT_EQ(el.disconnect_receiver(cc), 0); EXPECT_EQ(el.connect_receiver(), cc); @@ -157,10 +157,10 @@ TEST(Queue, el_connection) { { elems_t el; for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - EXPECT_NE(el.connect_receiver(), 0); + ASSERT_NE(el.connect_receiver(), 0); } for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_EQ(el.connect_receiver(), 0); + ASSERT_EQ(el.connect_receiver(), 0); } } } @@ -171,11 +171,11 @@ TEST(Queue, connection) { queue_t que{&el}; // sending for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.ready_sending()); + ASSERT_FALSE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { que.shut_sending(); @@ -186,15 +186,15 @@ TEST(Queue, connection) { } // receiving for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } EXPECT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(que.disconnect()); + ASSERT_FALSE(que.disconnect()); } { queue_t que{&el}; @@ -202,7 +202,7 @@ TEST(Queue, connection) { } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } } { @@ -210,42 +210,42 @@ TEST(Queue, connection) { queue_t que{&el}; // sending for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } for (std::size_t i = 0; i < 10000; ++i) { que.shut_sending(); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); + ASSERT_TRUE(que.ready_sending()); } // receiving for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { queue_t que{&el}; - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } - EXPECT_TRUE(que.disconnect()); + ASSERT_TRUE(que.disconnect()); for (std::size_t i = 0; i < 10000; ++i) { - EXPECT_FALSE(que.disconnect()); + ASSERT_FALSE(que.disconnect()); } { queue_t que{&el}; - EXPECT_TRUE(que.connect()); + ASSERT_TRUE(que.connect()); } for (std::size_t i = 0; i < 10000; ++i) { queue_t que{&el}; - EXPECT_FALSE(que.connect()); + ASSERT_FALSE(que.connect()); } } }