From af6ac84110e96ebeb5112796f290a12065ca7256 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 1 Jan 2021 12:39:32 +0800 Subject: [PATCH 1/6] add comments --- include/libipc/ipc.h | 10 ++++++++-- src/ipc.cpp | 8 ++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index ed50d8e..b2569a9 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -117,6 +117,9 @@ public: return chan_wrapper(name).wait_for_recv(r_count, tm); } + /** + * If timeout, this function would call 'force_push' to send the data forcibly. + */ bool send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::send(h_, data, size, tm); } @@ -127,6 +130,9 @@ public: return this->send(str.c_str(), str.size() + 1, tm); } + /** + * If timeout, this function would just return false. + */ bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm); } @@ -149,7 +155,7 @@ public: template using chan = chan_wrapper>; -/* +/** * class route * * You could use one producer/server/sender for sending messages to a route, @@ -162,7 +168,7 @@ using chan = chan_wrapper>; using route = chan; -/* +/** * class channel * * You could use multi producers/writers for sending messages to a channel, diff --git a/src/ipc.cpp b/src/ipc.cpp index f2ddd37..9bfa291 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -357,14 +357,14 @@ static void disconnect(ipc::handle_t h) { } } -static bool reconnect(ipc::handle_t * ph, bool start) { +static bool reconnect(ipc::handle_t * ph, bool start_to_recv) { assert(ph != nullptr); assert(*ph != nullptr); auto que = queue_of(*ph); if (que == nullptr) { return false; } - if (start) { + if (start_to_recv) { if (que->connect()) { // wouldn't connect twice info_of(*ph)->cc_waiter_.broadcast(); } @@ -376,12 +376,12 @@ static bool reconnect(ipc::handle_t * ph, bool start) { return true; } -static bool connect(ipc::handle_t * ph, char const * name, bool start) { +static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) { assert(ph != nullptr); if (*ph == nullptr) { *ph = ipc::mem::alloc(name); } - return reconnect(ph, start); + return reconnect(ph, start_to_recv); } static void destroy(ipc::handle_t h) { From c4617a229030a49032ae5b7bc57639d35414218f Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 1 Jan 2021 13:28:25 +0800 Subject: [PATCH 2/6] 'elem-array::connect' should always return 0 when the connection-slot is full. --- src/ipc.cpp | 6 ++++++ src/libipc/circ/elem_def.h | 8 ++++++-- test/test_queue.cpp | 14 ++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index 9bfa291..46c2859 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -422,6 +422,12 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s ipc::error("fail: send, queue_of(h)->elems() == nullptr\n"); return false; } + /** + * If the que hasn't connected as a receiver, the 'connected-id' must be 0, + * and 'connections' equals to 0 only if there are no receivers. + * Or if the que has connected as a receiver, + * 'connections' equals to 'connected-id' of this que only if there are no other receivers. + */ if (que->elems()->connections(std::memory_order_relaxed) == que->connected_id()) { // there is no receiver on this connection return false; diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index 4b9feb8..b608581 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -23,9 +23,9 @@ constexpr u1_t index_of(u2_t c) noexcept { } class conn_head { - std::atomic cc_; // connections + std::atomic cc_{0}; // connections ipc::spin_lock lc_; - std::atomic constructed_; + std::atomic constructed_{false}; public: void init() { @@ -47,6 +47,10 @@ public: for (unsigned k = 0;;) { cc_t curr = cc_.load(std::memory_order_acquire); cc_t next = curr | (curr + 1); // find the first 0, and set it to 1. + if (next == 0) { + // connection-slot is full. + return 0; + } if (cc_.compare_exchange_weak(curr, next, std::memory_order_release)) { return next ^ curr; // return connected id } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index c10d3c9..68c190b 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -127,6 +127,20 @@ TEST(Queue, check_size) { std::cout << "sizeof(elems_t) = " << sizeof(el_t) << std::endl; } +TEST(Queue, connection) { + { + using el_t = elems_t; + el_t el; + el.init(); + for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { + EXPECT_NE(el.connect(), 0); + } + for (std::size_t i = 0; i < 10000; ++i) { + EXPECT_EQ(el.connect(), 0); + } + } +} + TEST(Queue, prod_cons_1v1_unicast) { test_sr(elems_t {}, 1, 1, "ssu"); test_sr(elems_t {}, 1, 1, "smu"); From 6163618433d1ec9e7d2ec6d92599061159562efe Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 3 Jan 2021 12:52:03 +0800 Subject: [PATCH 3/6] =?UTF-8?q?=E9=92=88=E5=AF=B9=E4=B8=8D=E5=90=8C?= =?UTF-8?q?=E7=B1=BB=E5=9E=8B=E7=9A=84=E7=AD=96=E7=95=A5=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E4=B8=8D=E5=90=8C=E7=9A=84sender/receiver=E4=B8=AA?= =?UTF-8?q?=E6=95=B0=E6=A3=80=E6=9F=A5=E3=80=82=20=20-=20is=5Fmulti=5Fprod?= =?UTF-8?q?ucer=EF=BC=9Asender=E6=97=A0=E9=99=90=E5=88=B6=EF=BC=9B?= =?UTF-8?q?=E5=90=A6=E5=88=99=E4=BB=85=E5=85=81=E8=AE=B8=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=20=20-=20is=5Fmulti=5Fconsumer=EF=BC=9Areceiver=E4=B8=AA?= =?UTF-8?q?=E6=95=B0=E4=B8=8A=E9=99=90=E4=BE=9D=E8=B5=96is=5Fbroadcast?= =?UTF-8?q?=E6=8C=87=E5=AE=9A=EF=BC=9B=E5=90=A6=E5=88=99=E4=BB=85=E5=85=81?= =?UTF-8?q?=E8=AE=B8=E4=B8=80=E4=B8=AA=20=20-=20is=5Fbroadcast=EF=BC=9Arec?= =?UTF-8?q?eiver=E4=B8=AA=E6=95=B0=E4=B8=8A=E9=99=90=E4=B8=BA32=EF=BC=88ui?= =?UTF-8?q?nt=5Ft<32>=E4=BD=8D=E6=95=B0=EF=BC=89=EF=BC=9B=E5=90=A6?= =?UTF-8?q?=E5=88=99=E6=97=A0=E9=99=90=E5=88=B6=EF=BC=88uint=5Ft<32>?= =?UTF-8?q?=E5=A4=A7=E5=B0=8F=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 行为变更: 1. 在连接时根据模式检查sender/receiver是否超出上限,超出则返回false 2. 在send时确认是否允许发送(对receiver模式来说,send之前不会尝试确认sender个数) 3. 修正若干bug --- demo/msg_que/main.cpp | 7 +- include/libipc/def.h | 13 +++ include/libipc/ipc.h | 37 ++++++--- src/ipc.cpp | 63 ++++++++------- src/libipc/circ/elem_array.h | 85 +++++++++++++++++++- src/libipc/circ/elem_def.h | 61 ++++++++++---- src/libipc/prod_cons.h | 103 ++++++++++++++---------- src/libipc/queue.h | 55 ++++++++----- test/test_queue.cpp | 151 ++++++++++++++++++++++++++++++----- 9 files changed, 431 insertions(+), 144 deletions(-) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index 2c19e76..654b4ff 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -111,10 +111,13 @@ void do_recv() { int main(int argc, char ** argv) { if (argc < 2) return 0; - ::signal(SIGINT, [](int) { + auto exit = [](int) { is_quit__.store(true, std::memory_order_release); que__.disconnect(); - }); + }; + ::signal(SIGINT , exit); + ::signal(SIGBREAK, exit); + ::signal(SIGTERM , exit); if (std::string{ argv[1] } == mode_s__) { do_send(); diff --git a/include/libipc/def.h b/include/libipc/def.h index 9732acf..d9cfdc0 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -48,4 +48,17 @@ enum class trans { // transmission template struct wr {}; +template +struct relat_trait; + +template +struct relat_trait> { + constexpr static bool is_multi_producer = (Rp == relat::multi); + constexpr static bool is_multi_consumer = (Rc == relat::multi); + constexpr static bool is_broadcast = (Ts == trans::broadcast); +}; + +template