From 731f61a3c1c8730fe81835d1e2603ee14e34c660 Mon Sep 17 00:00:00 2001 From: zhangyi Date: Mon, 25 Mar 2019 20:14:59 +0800 Subject: [PATCH] optimize structure --- src/circ/elem_def.h | 8 -- src/ipc.cpp | 65 +++++++++++++--- src/platform/waiter_wrapper.h | 30 ++++++++ src/queue.h | 135 +++++++--------------------------- test/test.h | 1 + test/test_circ.cpp | 40 +++++----- test/test_ipc.cpp | 25 ++----- 7 files changed, 137 insertions(+), 167 deletions(-) diff --git a/src/circ/elem_def.h b/src/circ/elem_def.h index 7f77714..8787f95 100644 --- a/src/circ/elem_def.h +++ b/src/circ/elem_def.h @@ -7,7 +7,6 @@ #include "rw_lock.h" -#include "platform/waiter_wrapper.h" #include "platform/detail.h" namespace ipc { @@ -25,7 +24,6 @@ constexpr u1_t index_of(u2_t c) noexcept { } class conn_head { - ipc::detail::waiter cc_waiter_, waiter_; std::atomic cc_ { 0 }; // connection counter ipc::spin_lock lc_; @@ -47,12 +45,6 @@ public: conn_head(const conn_head&) = delete; conn_head& operator=(const conn_head&) = delete; - auto & waiter() noexcept { return this->waiter_; } - auto const & waiter() const noexcept { return this->waiter_; } - - auto & conn_waiter() noexcept { return this->cc_waiter_; } - auto const & conn_waiter() const noexcept { return this->cc_waiter_; } - std::size_t connect() noexcept { return cc_.fetch_add(1, std::memory_order_release); } diff --git a/src/ipc.cpp b/src/ipc.cpp index 8c9e9bf..7817ebc 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include "def.h" #include "shm.h" @@ -13,9 +14,12 @@ #include "pool_alloc.h" #include "queue.h" #include "policy.h" +#include "rw_lock.h" #include "memory/resource.h" + #include "platform/detail.h" +#include "platform/waiter_wrapper.h" namespace { @@ -76,17 +80,32 @@ struct detail_impl { using queue_t = ipc::queue, Policy>; +struct conn_info_t { + queue_t que_; + waiter cc_waiter_, wt_waiter_, rd_waiter_; + + conn_info_t(char const * name) + : que_ ((std::string{ "__QU_CONN__" } + name).c_str()) { + cc_waiter_.open((std::string{ "__CC_CONN__" } + name).c_str()); + wt_waiter_.open((std::string{ "__WT_CONN__" } + name).c_str()); + rd_waiter_.open((std::string{ "__RD_CONN__" } + name).c_str()); + } +}; + constexpr static void* head_of(queue_t* que) { return static_cast(que->elems()); } -constexpr static queue_t* queue_of(ipc::handle_t h) { - return static_cast(h); +constexpr static conn_info_t* info_of(ipc::handle_t h) { + return static_cast(h); } -static auto& queues_cache() { - static tls::pointer> qc; - return *qc.create(); +constexpr static queue_t* queue_of(ipc::handle_t h) { + auto info = info_of(h); + if (info == nullptr) { + return nullptr; + } + return &(info->que_); } static auto& recv_cache() { @@ -106,7 +125,7 @@ static auto& recv_cache() { /* API implementations */ static ipc::handle_t connect(char const * name) { - return mem::alloc(name); + return mem::alloc(name); } static void disconnect(ipc::handle_t h) { @@ -114,8 +133,10 @@ static void disconnect(ipc::handle_t h) { if (que == nullptr) { return; } - que->disconnect(); // needn't to detach, cause it will be deleted soon. - mem::free(que); + if (que->disconnect()) { + info_of(h)->cc_waiter_.broadcast(); + } + mem::free(info_of(h)); } static std::size_t recv_count(ipc::handle_t h) { @@ -131,7 +152,14 @@ static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) { if (que == nullptr) { return false; } - return que->wait_for_connect(r_count); + for (unsigned k = 0; que->conn_count() < r_count;) { + ipc::sleep(k, [h, que, r_count] { + return info_of(h)->cc_waiter_.wait_if([que, r_count] { + return que->conn_count() < r_count; + }); + }); + } + return true; } static bool send(ipc::handle_t h, void const * data, std::size_t size) { @@ -154,6 +182,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { static_cast(data) + offset, data_length)) { std::this_thread::yield(); } + info_of(h)->rd_waiter_.broadcast(); } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; @@ -162,6 +191,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { static_cast(data) + offset, static_cast(remain))) { std::this_thread::yield(); } + info_of(h)->rd_waiter_.broadcast(); } return true; } @@ -169,11 +199,22 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { static buff_t recv(ipc::handle_t h) { auto que = queue_of(h); if (que == nullptr) return {}; - que->connect(); // wouldn't connect twice + if (que->connect()) { // wouldn't connect twice + info_of(h)->cc_waiter_.broadcast(); + } auto& rc = recv_cache(); while (1) { // pop a new message - auto msg = que->pop(); + typename queue_t::value_t msg; + for (unsigned k = 0; !que->pop(msg);) { + bool succ = false; + ipc::sleep(k, [h, que, &msg, &succ] { + return info_of(h)->rd_waiter_.wait_if([que, &msg, &succ] { + return !(succ = que->pop(msg)); + }); + }); + if (succ) break; + } if (msg.head_.que_ == nullptr) return {}; if (msg.head_.que_ == que) continue; // pop next // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length @@ -217,7 +258,7 @@ namespace ipc { namespace detail { std::size_t calc_unique_id() { - static shm::handle g_shm { "__GLOBAL_ACC_STORAGE__", sizeof(std::atomic) }; + static shm::handle g_shm { "__IPC_GLOBAL_ACC_STORAGE__", sizeof(std::atomic) }; return static_cast*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed); } diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 8f9b55e..5bb16ec 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -219,4 +219,34 @@ public: }; } // namespace detail + +class waiter : public detail::waiter_wrapper { + + shm::handle shm_; + + using detail::waiter_wrapper::attach; + +public: + ~waiter() { + close(); + } + + bool open(char const * name) { + if (name == nullptr || name[0] == '\0') { + return false; + } + close(); + if (!shm_.acquire((std::string{ "__SHM_WAITER__" } + name).c_str(), sizeof(waiter_t))) { + return false; + } + attach(static_cast(shm_.get())); + return detail::waiter_wrapper::open((std::string{ "__IMP_WAITER__" } + name).c_str()); + } + + void close() { + detail::waiter_wrapper::close(); + shm_.release(); + } +}; + } // namespace ipc diff --git a/src/queue.h b/src/queue.h index 03ce4d0..521afe3 100644 --- a/src/queue.h +++ b/src/queue.h @@ -9,71 +9,48 @@ #include #include #include -#include #include "def.h" #include "shm.h" +#include "log.h" #include "rw_lock.h" -#include "platform/waiter_wrapper.h" #include "platform/detail.h" namespace ipc { namespace detail { -class queue_waiter { +class queue_conn { protected: - ipc::detail::waiter_wrapper waiter_; - ipc::detail::waiter_wrapper cc_waiter_; - bool connected_ = false; shm::handle elems_h_; template Elems* open(char const * name) { + if (name == nullptr || name[0] == '\0') { + ipc::error("fail open waiter: name is empty!\n"); + return nullptr; + } if (!elems_h_.acquire(name, sizeof(Elems))) { return nullptr; } auto elems = static_cast(elems_h_.get()); if (elems == nullptr) { + ipc::error("fail acquire elems: %s\n", name); return nullptr; } elems->init(); return elems; } - template - void open(Elems*(& elems), char const * name) { - assert(name != nullptr && name[0] != '\0'); - if (elems == nullptr) { - elems = open(name); - } - assert(elems != nullptr); - waiter_.attach(&(elems->waiter())); - if (!waiter_.open((std::string{ "__IPC_WAITER__" } + name).c_str())) { - return; - } - cc_waiter_.attach(&(elems->conn_waiter())); - cc_waiter_.open((std::string{ "__IPC_CC_WAITER__" } + name).c_str()); - } - void close() { - waiter_.close(); - waiter_.attach(nullptr); - cc_waiter_.close(); - cc_waiter_.attach(nullptr); - } - - template - void close(Elems* /*elems*/) { - close(); elems_h_.release(); } public: - queue_waiter() = default; - queue_waiter(const queue_waiter&) = delete; - queue_waiter& operator=(const queue_waiter&) = delete; + queue_conn() = default; + queue_conn(const queue_conn&) = delete; + queue_conn& operator=(const queue_conn&) = delete; bool connected() const noexcept { return connected_; @@ -89,9 +66,7 @@ public: } connected_ = true; elems->connect(); - auto ret = std::make_tuple(true, elems->cursor()); - cc_waiter_.broadcast(); - return ret; + return std::make_tuple(true, elems->cursor()); } template @@ -103,27 +78,13 @@ public: } connected_ = false; elems->disconnect(); - cc_waiter_.broadcast(); - return true; - } - - template - bool wait_for_connect(Elems* elems, std::size_t count) { - if (elems == nullptr) return false; - for (unsigned k = 0; elems->conn_count() < count;) { - ipc::sleep(k, [this, elems, count] { - return cc_waiter_.wait_if([elems, count] { - return elems->conn_count() < count; - }); - }); - } return true; } }; template -class queue_base : public queue_waiter { - using base_t = queue_waiter; +class queue_base : public queue_conn { + using base_t = queue_conn; public: using elems_t = Elems; @@ -140,16 +101,11 @@ public: explicit queue_base(char const * name) : queue_base() { - attach(nullptr, name); + elems_ = open(name); } - explicit queue_base(elems_t* els, char const * name = nullptr) - : queue_base() { - attach(els, name); - } - - /* not virtual */ ~queue_base(void) { - base_t::close(elems_); + /* not virtual */ ~queue_base() { + base_t::close(); } constexpr elems_t * elems() const noexcept { @@ -173,10 +129,6 @@ public: return (elems_ == nullptr) ? invalid_value : elems_->conn_count(); } - bool wait_for_connect(std::size_t count) { - return base_t::wait_for_connect(elems_, count); - } - bool valid() const noexcept { return elems_ != nullptr; } @@ -185,57 +137,22 @@ public: return (elems_ == nullptr) ? true : (cursor_ == elems_->cursor()); } - elems_t* attach(elems_t* els, char const * name = nullptr) noexcept { - auto old = elems_; - elems_ = els; - if (name == nullptr || name[0] == '\0') { - base_t::close(old); - } - else base_t::open(elems_, name); - return old; - } - - elems_t* detach() noexcept { - if (elems_ == nullptr) return nullptr; - base_t::close(nullptr); // not release shm - auto old = elems_; - elems_ = nullptr; - return old; - } - template auto push(P&&... params) { if (elems_ == nullptr) return false; - if (elems_->push([&](void* p) { + return elems_->push([&](void* p) { ::new (p) T(std::forward

(params)...); - })) { - this->waiter_.broadcast(); - return true; - } - return false; + }); } template - T pop() { + bool pop(T& item) { if (elems_ == nullptr) { - return {}; - } - T item; - auto pop_item = [this, &item] { - return elems_->pop(&this->cursor_, [&item](void* p) { - ::new (&item) T(std::move(*static_cast(p))); - }); - }; - for (unsigned k = 0;;) { - if (pop_item()) return item; - bool succ = false; - ipc::sleep(k, [this, &succ, &pop_item] { - return this->waiter_.wait_if([&succ, &pop_item] { - return !(succ = pop_item()); - }); - }); - if (succ) return item; + return false; } + return elems_->pop(&(this->cursor_), [&item](void* p) { + ::new (&item) T(std::move(*static_cast(p))); + }); } }; @@ -246,6 +163,8 @@ class queue : public detail::queue_base>; public: + using value_t = T; + using base_t::base_t; using base_t::push; using base_t::pop; @@ -255,8 +174,8 @@ public: return base_t::template push(std::forward

(params)...); } - T pop() { - return base_t::template pop(); + bool pop(T& item) { + return base_t::pop(item); } }; diff --git a/test/test.h b/test/test.h index 8b3ed16..2a09a5d 100644 --- a/test/test.h +++ b/test/test.h @@ -146,6 +146,7 @@ void benchmark_prod_cons(T* cq) { } // quit tcq.send(cn, { -1, -1 }); + tcq.disconnect(cn); }}; ++pid; } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index e377c8e..7deeea1 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -133,6 +133,9 @@ struct test_cq> { ca_->disconnect(); } + void disconnect(ca_t*) { + } + void wait_start(int M) { while (ca_->conn_count() != static_cast(M)) { std::this_thread::yield(); @@ -173,28 +176,23 @@ struct test_cq> { template struct test_cq> { using cn_t = ipc::queue; - using ca_t = typename cn_t::elems_t; - ca_t* ca_; - - test_cq(void*) : ca_(reinterpret_cast(cq__)) { - ::new (ca_) ca_t; - } + test_cq(void*) {} cn_t* connect() { - cn_t* queue = new cn_t { ca_ }; + cn_t* queue = new cn_t { "test-ipc-queue" }; [&] { QVERIFY(queue->connect()); } (); return queue; } void disconnect(cn_t* queue) { - QVERIFY(queue->disconnect()); - QVERIFY(queue->detach() != nullptr); + queue->disconnect(); delete queue; } void wait_start(int M) { - while (ca_->conn_count() != static_cast(M)) { + cn_t que("test-ipc-queue"); + while (que.conn_count() != static_cast(M)) { std::this_thread::yield(); } } @@ -202,18 +200,21 @@ struct test_cq> { template void recv(cn_t* queue, F&& proc) { while(1) { - auto msg = queue->pop(); + typename cn_t::value_t msg; + while (!queue->pop(msg)) { + std::this_thread::yield(); + } if (msg.pid_ < 0) return; proc(msg); } } cn_t* connect_send() { - return nullptr; + return new cn_t { "test-ipc-queue" }; } - void send(cn_t* /*cn*/, msg_t const & msg) { - while (!cn_t{ ca_ }.push(msg)) { + void send(cn_t* cn, msg_t const & msg) { + while (!cn->push(msg)) { std::this_thread::yield(); } } @@ -385,15 +386,12 @@ void Unit::test_queue() { >>; queue_t queue; - queue.push(msg_t { 1, 2 }); - QCOMPARE(queue.pop(), msg_t{}); + QVERIFY(!queue.push(msg_t { 1, 2 })); + msg_t msg {}; + QVERIFY(!queue.pop(msg)); + QCOMPARE(msg, (msg_t {})); QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__)); - std::memset(cq__, 0, sizeof(decltype(queue)::elems_t)); - auto cq = reinterpret_cast(cq__); - queue.attach(cq); - QVERIFY(queue.detach() != nullptr); - ipc::detail::static_for(std::make_index_sequence<16>{}, [](auto index) { benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr); }); diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 6d0c544..91e8a62 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -113,17 +113,11 @@ struct test_cq { std::string conn_name_; int m_ = 0; - std::vector s_cns_; - ipc::rw_lock lc_; test_cq(void*) : conn_name_("test-ipc-channel") { } - ~test_cq() { - for (auto p : s_cns_) delete p; - } - cn_t connect() { return cn_t { conn_name_.c_str() }; } @@ -146,27 +140,22 @@ struct test_cq { } while(1); } - cn_t* connect_send() { - auto p = new cn_t { conn_name_.c_str() }; - { - std::unique_lock guard { lc_ }; - s_cns_.push_back(p); - } - return p; + cn_t connect_send() { + return connect(); } - void send(cn_t* cn, const std::array& info) { + void send(cn_t& cn, const std::array& info) { thread_local struct s_dummy { - s_dummy(cn_t* cn, int m) { - cn->wait_for_recv(static_cast(m)); + s_dummy(cn_t& cn, int m) { + cn.wait_for_recv(static_cast(m)); // std::printf("start to send: %d.\n", m); } } _(cn, m_); int n = info[1]; if (n < 0) { - /*QVERIFY*/(cn->send(ipc::buff_t('\0'))); + /*QVERIFY*/(cn.send(ipc::buff_t('\0'))); } - else /*QVERIFY*/(cn->send(datas__[static_cast(n)])); + else /*QVERIFY*/(cn.send(datas__[static_cast(n)])); } };