From b3c577988b2d7405607eed301ec5a88c2c70e4c6 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 16 Jun 2019 17:10:03 +0800 Subject: [PATCH] use storage-flag, simplify codes, use one acc handle for all connections & messages. --- src/ipc.cpp | 69 +++++++++++++++++++++++++---------------------------- 1 file changed, 33 insertions(+), 36 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index b4789e5..a0c6828 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -35,9 +35,10 @@ struct msg_t; template struct msg_t<0, AlignSize> { - void* que_; + msg_id_t conn_; msg_id_t id_; int remain_; + bool storage_; }; template @@ -46,11 +47,17 @@ struct msg_t { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s) - : head_ { q, i, r } { + msg_t(msg_id_t c, msg_id_t i, int r, void const * d, std::size_t s) + : head_ { c, i, r, false } { if ((d != nullptr) && (s > 0)) { std::memcpy(&data_, d, s); } + else { + head_.storage_ = true; + if (d != nullptr) { + std::memcpy(&data_, d, sizeof(msg_id_t)); + } + } } }; @@ -80,9 +87,13 @@ struct cache_t { struct conn_info_head { using acc_t = std::atomic; - std::string name_; + static auto acc() { + static shm::handle acc_h("__AC_CONN__", sizeof(acc_t)); + return static_cast(acc_h.get()); + } + + msg_id_t cc_id_; // connection-info id waiter cc_waiter_, wt_waiter_, rd_waiter_; - shm::handle acc_h_; struct simple_push { @@ -101,34 +112,23 @@ struct conn_info_head { ++ wt_; return true; } - - template - bool force_push(W* /*wrapper*/, F&& /*f*/, E* /*elems*/) { - return false; - } - - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& /*f*/, E* /*elems*/) { - return false; - } }; circ::elem_array msg_datas_; conn_info_head(char const * name) - : name_(name) - , cc_waiter_(("__CC_CONN__" + name_).c_str()) - , wt_waiter_(("__WT_CONN__" + name_).c_str()) - , rd_waiter_(("__RD_CONN__" + name_).c_str()) - , acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) { + : cc_id_ ((acc() == nullptr) ? 0 : acc()->fetch_add(1, std::memory_order_relaxed)) + , cc_waiter_((std::string { "__CC_CONN__" } + name).c_str()) + , wt_waiter_((std::string { "__WT_CONN__" } + name).c_str()) + , rd_waiter_((std::string { "__RD_CONN__" } + name).c_str()) { } - auto acc() { - return static_cast(acc_h_.get()); + static shm::handle apply_storage(msg_id_t msg_id, std::size_t size) { + return { ("__ST_CONN__" + std::to_string(msg_id)).c_str(), size, shm::create }; } - shm::handle apply_storage(msg_id_t msg_id, std::size_t size, unsigned mode) { - return { ("__IPC_DAT_STORAGE__" + name_ + "__" + std::to_string(msg_id)).c_str(), size, mode }; + static shm::handle apply_storage(msg_id_t msg_id) { + return { ("__ST_CONN__" + std::to_string(msg_id)).c_str(), 0, shm::open }; } void store(shm::handle && dat) { @@ -270,7 +270,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > small_msg_limit) { - auto dat = info_of(h)->apply_storage(msg_id, size, shm::create); + auto dat = info_of(h)->apply_storage(msg_id, size); void * buf = dat.get(); if (buf != nullptr) { std::memcpy(buf, data, size); @@ -305,10 +305,10 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { return send([](auto info, auto que, auto msg_id) { return [info, que, msg_id](int remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(que, msg_id, remain, data, size); + return !que->push(info->cc_id_, msg_id, remain, data, size); }, que->dis_flag() ? 0 : static_cast(default_timeut))) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push(que, msg_id, remain, data, size)) { + if (!que->force_push(info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -322,7 +322,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size) { return send([](auto info, auto que, auto msg_id) { return [info, que, msg_id](int remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(que, msg_id, remain, data, size); + return !que->push(info->cc_id_, msg_id, remain, data, size); }, 0)) { return false; } @@ -345,16 +345,13 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { while (1) { // pop a new message typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, - [que, &msg] { return !que->pop(msg); }, tm)) { + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { return {}; } info_of(h)->wt_waiter_.broadcast(); - if (msg.head_.que_ == nullptr) { - ipc::error("fail: recv, msg.head_.que_ == nullptr\n"); - return {}; + if ((info_of(h)->acc() != nullptr) && (msg.head_.conn_ == info_of(h)->cc_id_)) { + continue; // ignore message to self } - if (msg.head_.que_ == que) continue; // pop next // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length auto remain = static_cast(static_cast(data_length) + msg.head_.remain_); // find cache with msg.head_.id_ @@ -363,8 +360,8 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { if (remain <= data_length) { return make_cache(msg.data_, remain); } - if (remain > small_msg_limit) { - auto dat = info_of(h)->apply_storage(msg.head_.id_, 0, shm::open); + if (msg.head_.storage_) { + auto dat = info_of(h)->apply_storage(msg.head_.id_); void * buf = dat.get(); if (buf != nullptr && remain <= dat.size()) { auto id = dat.detach();