Merge branch 'issue-61' into develop

This commit is contained in:
mutouyun 2021-09-19 16:29:31 +08:00
commit 4ca300b3e5
8 changed files with 140 additions and 45 deletions

View File

@ -13,7 +13,7 @@ if(NOT MSVC)
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
endif() endif()
if (MSVC AND LIBIPC_USE_STATIC_CRT) if (MSVC)
set(CompilerFlags set(CompilerFlags
CMAKE_CXX_FLAGS CMAKE_CXX_FLAGS
CMAKE_CXX_FLAGS_DEBUG CMAKE_CXX_FLAGS_DEBUG
@ -22,9 +22,17 @@ if (MSVC AND LIBIPC_USE_STATIC_CRT)
CMAKE_C_FLAGS_DEBUG CMAKE_C_FLAGS_DEBUG
CMAKE_C_FLAGS_RELEASE CMAKE_C_FLAGS_RELEASE
) )
if (LIBIPC_USE_STATIC_CRT)
foreach(CompilerFlag ${CompilerFlags}) foreach(CompilerFlag ${CompilerFlags})
string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}") string(REPLACE "/MD" "/MT" ${CompilerFlag} "${${CompilerFlag}}")
string(REPLACE "/MDd" "/MTd" ${CompilerFlag} "${${CompilerFlag}}")
endforeach() endforeach()
else()
foreach(CompilerFlag ${CompilerFlags})
string(REPLACE "/MT" "/MD" ${CompilerFlag} "${${CompilerFlag}}")
string(REPLACE "/MTd" "/MDd" ${CompilerFlag} "${${CompilerFlag}}")
endforeach()
endif()
endif() endif()
set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin) set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/bin)
@ -50,6 +58,7 @@ endif()
if (LIBIPC_BUILD_DEMOS) if (LIBIPC_BUILD_DEMOS)
add_subdirectory(demo/chat) add_subdirectory(demo/chat)
add_subdirectory(demo/msg_que) add_subdirectory(demo/msg_que)
add_subdirectory(demo/send_recv)
endif() endif()
install( install(

View File

@ -127,10 +127,10 @@ int main(int argc, char ** argv) {
::signal(SIGHUP , exit); ::signal(SIGHUP , exit);
#endif #endif
if (std::string{ argv[1] } == mode_s__) { std::string mode {argv[1]};
if (mode == mode_s__) {
do_send(); do_send();
} } else if (mode == mode_r__) {
else if (std::string{ argv[1] } == mode_r__) {
do_recv(); do_recv();
} }
return 0; return 0;

View File

@ -0,0 +1,11 @@
project(send_recv)
include_directories(
${LIBIPC_PROJECT_DIR}/3rdparty)
file(GLOB SRC_FILES ./*.cpp)
file(GLOB HEAD_FILES ./*.h)
add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES})
target_link_libraries(${PROJECT_NAME} ipc)

46
demo/send_recv/main.cpp Normal file
View File

@ -0,0 +1,46 @@
#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include "libipc/ipc.h"
namespace {
void do_send(int size, int interval) {
ipc::channel ipc {"ipc", ipc::sender};
std::string buffer(size, 'A');
while (true) {
std::cout << "send size: " << buffer.size() + 1 << "\n";
ipc.send(buffer, 0/*tm*/);
std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
}
void do_recv(int interval) {
ipc::channel ipc {"ipc", ipc::receiver};
while (true) {
ipc::buff_t recv;
for (int k = 1; recv.empty(); ++k) {
std::cout << "recv waiting... " << k << "\n";
recv = ipc.recv(interval);
}
std::cout << "recv size: " << recv.size() << "\n";
}
}
} // namespace
int main(int argc, char ** argv) {
if (argc < 3) return -1;
std::string mode {argv[1]};
if (mode == "send") {
if (argc < 4) return -1;
do_send(std::stoi(argv[2]) /*size*/,
std::stoi(argv[3]) /*interval*/);
} else if (mode == "recv") {
do_recv(std::stoi(argv[2]) /*interval*/);
}
return 0;
}

View File

@ -223,8 +223,6 @@ public:
static bool wait(handle_t h, std::uint64_t tm = invalid_value) { static bool wait(handle_t h, std::uint64_t tm = invalid_value) {
if (h == invalid()) return false; if (h == invalid()) return false;
switch (tm) { switch (tm) {
case 0:
return true;
case invalid_value: case invalid_value:
IPC_SEMAPHORE_FUNC_(sem_wait, h); IPC_SEMAPHORE_FUNC_(sem_wait, h);
default: { default: {

View File

@ -13,7 +13,7 @@ namespace detail {
struct waiter_helper { struct waiter_helper {
struct wait_counter { struct wait_counter {
std::atomic<unsigned> waiting_ { 0 }; std::atomic<long> waiting_ { 0 };
long counter_ = 0; long counter_ = 0;
}; };
@ -33,14 +33,18 @@ struct waiter_helper {
auto & counter = ctrl.counter(); auto & counter = ctrl.counter();
counter.waiting_.fetch_add(1, std::memory_order_release); counter.waiting_.fetch_add(1, std::memory_order_release);
flags.is_waiting_.store(true, std::memory_order_relaxed); flags.is_waiting_.store(true, std::memory_order_relaxed);
auto finally = ipc::guard([&counter, &flags] { auto finally = ipc::guard([&ctrl, &counter, &flags] {
counter.waiting_.fetch_sub(1, std::memory_order_release); for (auto curr_wait = counter.waiting_.load(std::memory_order_relaxed); curr_wait > 0;) {
if (counter.waiting_.compare_exchange_weak(curr_wait, curr_wait - 1, std::memory_order_acq_rel)) {
break;
}
}
flags.is_waiting_.store(false, std::memory_order_relaxed); flags.is_waiting_.store(false, std::memory_order_relaxed);
}); });
{ {
IPC_UNUSED_ auto guard = ctrl.get_lock(); IPC_UNUSED_ auto guard = ctrl.get_lock();
if (!std::forward<F>(pred)()) return true; if (!std::forward<F>(pred)()) return true;
counter.counter_ += 1; counter.counter_ = counter.waiting_.load(std::memory_order_relaxed);
} }
mtx.unlock(); mtx.unlock();
@ -69,6 +73,11 @@ struct waiter_helper {
return ret; return ret;
} }
template <typename Ctrl>
static void clear_handshake(Ctrl & ctrl) {
while (ctrl.handshake_wait(0)) ;
}
template <typename Ctrl> template <typename Ctrl>
static bool notify(Ctrl & ctrl) { static bool notify(Ctrl & ctrl) {
auto & counter = ctrl.counter(); auto & counter = ctrl.counter();
@ -77,6 +86,7 @@ struct waiter_helper {
} }
bool ret = true; bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock(); IPC_UNUSED_ auto guard = ctrl.get_lock();
clear_handshake(ctrl);
if (counter.counter_ > 0) { if (counter.counter_ > 0) {
ret = ctrl.sema_post(1); ret = ctrl.sema_post(1);
counter.counter_ -= 1; counter.counter_ -= 1;
@ -93,12 +103,15 @@ struct waiter_helper {
} }
bool ret = true; bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock(); IPC_UNUSED_ auto guard = ctrl.get_lock();
clear_handshake(ctrl);
if (counter.counter_ > 0) { if (counter.counter_ > 0) {
ret = ctrl.sema_post(counter.counter_); ret = ctrl.sema_post(counter.counter_);
auto tm = default_timeout / counter.counter_;
do { do {
counter.counter_ -= 1; counter.counter_ -= 1;
ret = ret && ctrl.handshake_wait(default_timeout); ret = ret && ctrl.handshake_wait(tm);
} while (counter.counter_ > 0); } while (counter.counter_ > 0);
counter.waiting_.store(0, std::memory_order_release);
} }
return ret; return ret;
} }
@ -116,6 +129,7 @@ struct waiter_helper {
} }
bool ret = true; bool ret = true;
IPC_UNUSED_ auto guard = ctrl.get_lock(); IPC_UNUSED_ auto guard = ctrl.get_lock();
clear_handshake(ctrl);
if (counter.counter_ > 0) { if (counter.counter_ > 0) {
ret = ctrl.sema_post(counter.counter_); ret = ctrl.sema_post(counter.counter_);
counter.counter_ -= 1; counter.counter_ -= 1;

View File

@ -20,6 +20,7 @@ namespace {
constexpr int LoopCount = 10000; constexpr int LoopCount = 10000;
constexpr int MultiMax = 8; constexpr int MultiMax = 8;
constexpr int TestBuffMax = 65536;
struct msg_head { struct msg_head {
int id_; int id_;
@ -28,7 +29,7 @@ struct msg_head {
class rand_buf : public buffer { class rand_buf : public buffer {
public: public:
rand_buf() { rand_buf() {
int size = capo::random<>{sizeof(msg_head), 65536}(); int size = capo::random<>{sizeof(msg_head), TestBuffMax}();
*this = buffer(new char[size], size, [](void * p, std::size_t) { *this = buffer(new char[size], size, [](void * p, std::size_t) {
delete [] static_cast<char *>(p); delete [] static_cast<char *>(p);
}); });
@ -98,6 +99,22 @@ public:
} }
} const data_set__; } const data_set__;
#define IPC_ASSERT_TRUE(condition) \
do { \
bool check = !!(condition); \
GTEST_TEST_BOOLEAN_(check, #condition, false, true, \
GTEST_NONFATAL_FAILURE_); \
if (!check) return; \
} while (0)
#define IPC_ASSERT_FALSE(condition) \
do { \
bool check = !!(condition); \
GTEST_TEST_BOOLEAN_(!check, #condition, true, false, \
GTEST_NONFATAL_FAILURE_); \
if (check) return; \
} while (0)
template <relat Rp, relat Rc, trans Ts, typename Que = chan<Rp, Rc, Ts>> template <relat Rp, relat Rc, trans Ts, typename Que = chan<Rp, Rc, Ts>>
void test_sr(char const * name, int s_cnt, int r_cnt) { void test_sr(char const * name, int s_cnt, int r_cnt) {
ipc_ut::sender().start(static_cast<std::size_t>(s_cnt)); ipc_ut::sender().start(static_cast<std::size_t>(s_cnt));
@ -109,10 +126,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
for (int k = 0; k < s_cnt; ++k) { for (int k = 0; k < s_cnt; ++k) {
ipc_ut::sender() << [name, &sw, r_cnt, k] { ipc_ut::sender() << [name, &sw, r_cnt, k] {
Que que { name, ipc::sender }; Que que { name, ipc::sender };
EXPECT_TRUE(que.wait_for_recv(r_cnt)); IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt));
sw.start(); sw.start();
for (int i = 0; i < (int)data_set__.get().size(); ++i) { for (int i = 0; i < (int)data_set__.get().size(); ++i) {
EXPECT_TRUE(que.send(data_set__.get()[i])); IPC_ASSERT_TRUE(que.send(data_set__.get()[i]));
} }
}; };
} }
@ -122,17 +139,17 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
Que que { name, ipc::receiver }; Que que { name, ipc::receiver };
for (;;) { for (;;) {
rand_buf got { que.recv() }; rand_buf got { que.recv() };
ASSERT_FALSE(got.empty()); IPC_ASSERT_FALSE(got.empty());
int i = got.get_id(); int i = got.get_id();
if (i == -1) { if (i == -1) {
return; return;
} }
ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); IPC_ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size()));
auto const &data_set = data_set__.get()[i]; auto const &data_set = data_set__.get()[i];
if (data_set != got) { if (data_set != got) {
printf("data_set__.get()[%d] != got, size = %zd/%zd\n", printf("data_set__.get()[%d] != got, size = %zd/%zd\n",
i, data_set.size(), got.size()); i, data_set.size(), got.size());
EXPECT_TRUE(false); IPC_ASSERT_TRUE(false);
} }
} }
}; };
@ -140,7 +157,7 @@ void test_sr(char const * name, int s_cnt, int r_cnt) {
ipc_ut::sender().wait_for_done(); ipc_ut::sender().wait_for_done();
Que que { name }; Que que { name };
EXPECT_TRUE(que.wait_for_recv(r_cnt)); IPC_ASSERT_TRUE(que.wait_for_recv(r_cnt));
for (int k = 0; k < r_cnt; ++k) { for (int k = 0; k < r_cnt; ++k) {
que.send(rand_buf{msg_head{-1}}); que.send(rand_buf{msg_head{-1}});
} }

View File

@ -103,7 +103,7 @@ void test_sr(elems_t<Rp, Rc, Ts> && elems, int s_cnt, int r_cnt, char const * me
queue_t<Rp, Rc, Ts> que { &elems }; queue_t<Rp, Rc, Ts> que { &elems };
ASSERT_TRUE(que.connect()); ASSERT_TRUE(que.connect());
while (pop(que).pid_ >= 0) ; while (pop(que).pid_ >= 0) ;
EXPECT_TRUE(que.disconnect()); ASSERT_TRUE(que.disconnect());
}; };
} }
@ -133,7 +133,7 @@ TEST(Queue, el_connection) {
elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el; elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el;
EXPECT_TRUE(el.connect_sender()); EXPECT_TRUE(el.connect_sender());
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_FALSE(el.connect_sender()); ASSERT_FALSE(el.connect_sender());
} }
el.disconnect_sender(); el.disconnect_sender();
EXPECT_TRUE(el.connect_sender()); EXPECT_TRUE(el.connect_sender());
@ -141,7 +141,7 @@ TEST(Queue, el_connection) {
{ {
elems_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast> el; elems_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast> el;
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(el.connect_sender()); ASSERT_TRUE(el.connect_sender());
} }
} }
{ {
@ -149,7 +149,7 @@ TEST(Queue, el_connection) {
auto cc = el.connect_receiver(); auto cc = el.connect_receiver();
EXPECT_NE(cc, 0); EXPECT_NE(cc, 0);
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_EQ(el.connect_receiver(), 0); ASSERT_EQ(el.connect_receiver(), 0);
} }
EXPECT_EQ(el.disconnect_receiver(cc), 0); EXPECT_EQ(el.disconnect_receiver(cc), 0);
EXPECT_EQ(el.connect_receiver(), cc); EXPECT_EQ(el.connect_receiver(), cc);
@ -157,10 +157,10 @@ TEST(Queue, el_connection) {
{ {
elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast> el; elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast> el;
for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) {
EXPECT_NE(el.connect_receiver(), 0); ASSERT_NE(el.connect_receiver(), 0);
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_EQ(el.connect_receiver(), 0); ASSERT_EQ(el.connect_receiver(), 0);
} }
} }
} }
@ -171,11 +171,11 @@ TEST(Queue, connection) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el}; queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
// sending // sending
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.ready_sending()); ASSERT_TRUE(que.ready_sending());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el}; queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_FALSE(que.ready_sending()); ASSERT_FALSE(que.ready_sending());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
que.shut_sending(); que.shut_sending();
@ -186,15 +186,15 @@ TEST(Queue, connection) {
} }
// receiving // receiving
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.connect()); ASSERT_TRUE(que.connect());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el}; queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_FALSE(que.connect()); ASSERT_FALSE(que.connect());
} }
EXPECT_TRUE(que.disconnect()); EXPECT_TRUE(que.disconnect());
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_FALSE(que.disconnect()); ASSERT_FALSE(que.disconnect());
} }
{ {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el}; queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
@ -202,7 +202,7 @@ TEST(Queue, connection) {
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el}; queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_FALSE(que.connect()); ASSERT_FALSE(que.connect());
} }
} }
{ {
@ -210,42 +210,42 @@ TEST(Queue, connection) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
// sending // sending
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.ready_sending()); ASSERT_TRUE(que.ready_sending());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.ready_sending()); ASSERT_TRUE(que.ready_sending());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
que.shut_sending(); que.shut_sending();
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.ready_sending()); ASSERT_TRUE(que.ready_sending());
} }
// receiving // receiving
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.connect()); ASSERT_TRUE(que.connect());
} }
for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.connect()); ASSERT_TRUE(que.connect());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_FALSE(que.connect()); ASSERT_FALSE(que.connect());
} }
EXPECT_TRUE(que.disconnect()); ASSERT_TRUE(que.disconnect());
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_FALSE(que.disconnect()); ASSERT_FALSE(que.disconnect());
} }
{ {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.connect()); ASSERT_TRUE(que.connect());
} }
for (std::size_t i = 0; i < 10000; ++i) { for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el}; queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_FALSE(que.connect()); ASSERT_FALSE(que.connect());
} }
} }
} }