From 17621c1e835563a394250b9114282337722ab44f Mon Sep 17 00:00:00 2001 From: mutouyun Date: Fri, 4 Jan 2019 18:48:21 +0800 Subject: [PATCH] preparing to refactor --- build/ipc.pro | 1 + include/circ_elem_array.h | 124 ++++++++++++++++---- include/circ_elems_array.h | 224 +++++++++++++++++++++++++++++++++++++ include/circ_queue.h | 12 +- test/test_circ.cpp | 129 +++++++++++++++++++-- 5 files changed, 450 insertions(+), 40 deletions(-) create mode 100644 include/circ_elems_array.h diff --git a/build/ipc.pro b/build/ipc.pro index 8a0370c..c894a13 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -18,6 +18,7 @@ INCLUDEPATH += \ HEADERS += \ ../include/export.h \ ../include/shm.h \ + ../include/circ_elems_array.h \ ../include/circ_elem_array.h \ ../include/circ_queue.h \ ../include/ipc.h \ diff --git a/include/circ_elem_array.h b/include/circ_elem_array.h index 26af8c4..a5e2f23 100644 --- a/include/circ_elem_array.h +++ b/include/circ_elem_array.h @@ -18,7 +18,9 @@ struct alignas(std::max_align_t) elem_array_head { std::atomic cc_ { 0 }; // connection counter, using for broadcast std::atomic wt_ { 0 }; // write index - static u1_t index_of(u2_t c) noexcept { return static_cast(c); } + constexpr static u1_t index_of(u2_t c) noexcept { + return static_cast(c); + } std::size_t connect() noexcept { return cc_.fetch_add(1, std::memory_order_release); @@ -36,8 +38,8 @@ struct alignas(std::max_align_t) elem_array_head { return wt_.load(std::memory_order_acquire); } - auto acquire() noexcept { - return index_of(wt_.load(std::memory_order_relaxed)); + auto acquire(std::memory_order order = std::memory_order_acquire) noexcept { + return index_of(wt_.load(order)); } void commit() noexcept { @@ -56,7 +58,7 @@ struct elem_head { }; template -class elem_array : private elem_array_head { +class elem_array : protected elem_array_head { public: using base_t = elem_array_head; using head_t = elem_head; @@ -72,7 +74,7 @@ public: block_size = elem_size * elem_max }; -private: +protected: struct elem_t { head_t head_; byte_t data_[data_size] {}; @@ -86,6 +88,25 @@ private: static elem_t* elem(void* ptr) noexcept { return reinterpret_cast(static_cast(ptr) - sizeof(head_t)); } elem_t* elem(u1_t i ) noexcept { return elem_start() + i; } + template + void* acquire(std::memory_order order, Acq&& acq, P&&... params) noexcept { + uint_t<32> conn_cnt = cc_.load(order); + if (conn_cnt == 0) return nullptr; + elem_t* el = elem(std::forward(acq)(std::memory_order_relaxed, + std::forward

(params)...)); + // check all consumers have finished reading + while(1) { + uint_t<32> expected = 0; + if (el->head_.rc_.compare_exchange_weak( + expected, conn_cnt, std::memory_order_relaxed)) { + break; + } + std::this_thread::yield(); + conn_cnt = cc_.load(std::memory_order_acquire); + } + return el->data_; + } + public: elem_array() = default; @@ -99,36 +120,95 @@ public: using base_t::conn_count; using base_t::cursor; - void* acquire() noexcept { - uint_t<32> conn_cnt = static_cast>(conn_count()); // acquire - if (conn_cnt == 0) return nullptr; - elem_t* el = elem(base_t::acquire()); - // check all consumers have finished reading - while(1) { - uint_t<32> expected = 0; - if (el->head_.rc_.compare_exchange_weak( - expected, conn_cnt, std::memory_order_relaxed)) { - break; - } - std::this_thread::yield(); - conn_cnt = static_cast>(conn_count()); // acquire - } - return el->data_; + void* acquire(std::memory_order order = std::memory_order_acquire) noexcept { + return this->acquire(order, [this](auto o) { + return base_t::acquire(o); + }); } void commit(void* /*ptr*/) noexcept { base_t::commit(); } + template + bool fetch(F&& f) noexcept { + auto p = this->acquire(); + if (p == nullptr) return false; + std::forward(f)(p); + this->commit(p); + return true; + } + void* take(u2_t cursor) noexcept { - std::atomic_thread_fence(std::memory_order_acquire); return elem(base_t::index_of(cursor))->data_; } void put(void* ptr) noexcept { - elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release); + auto el = elem(ptr); + uint_t<32> cur_rc; + do { + cur_rc = el->head_.rc_.load(std::memory_order_relaxed); + if (cur_rc == 0) return; + } while (!el->head_.rc_.compare_exchange_weak( + cur_rc, cur_rc - 1, std::memory_order_release)); } }; +/* +template +class multi_write_array : protected elem_array { +public: + using base_t = elem_array; + using head_t = typename base_t::head_t; + using typename base_t::u1_t; + using typename base_t::u2_t; + + using base_t::head_size; + using base_t::data_size; + using base_t::elem_max; + using base_t::elem_size; + using base_t::block_size; + +protected: + std::atomic rd_ { 0 }; // ready index + +public: + using base_t::connect; + using base_t::disconnect; + using base_t::conn_count; + + u2_t cursor() const noexcept { + return rd_.load(std::memory_order_acquire); + } + + template + bool fetch(F&& f) noexcept { + u2_t cur_rd; + auto p = base_t::acquire(std::memory_order_acquire, [this, &cur_rd](auto o) { + while (1) { + u2_t cur_wt = wt_.load(o), nxt_wt = cur_wt + 1; + if (base_t::index_of(nxt_wt) == + base_t::index_of(cur_rd = rd_.load(std::memory_order_relaxed))) { + // is full + } + else if (wt_.compare_exchange_weak(cur_wt, nxt_wt, std::memory_order_relaxed)) { + return base_t::index_of(nxt_wt); + } + std::this_thread::yield(); + std::atomic_thread_fence(std::memory_order_acquire); + } + }); + if (p == nullptr) return false; + std::forward(f)(p); + while (1) { + if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { + break; + } + std::this_thread::yield(); + } + return true; + } +}; +*/ } // namespace circ } // namespace ipc diff --git a/include/circ_elems_array.h b/include/circ_elems_array.h new file mode 100644 index 0000000..1a3472d --- /dev/null +++ b/include/circ_elems_array.h @@ -0,0 +1,224 @@ +#pragma once + +#include +#include +#include + +#include "def.h" + +namespace ipc { +namespace circ { + +namespace detail { + +using u1_t = uint_t<8>; +using u2_t = uint_t<16>; + +constexpr u1_t index_of(u2_t c) noexcept { + return static_cast(c); +} + +struct elem_head { + std::atomic rc_ { 0 }; // read counter +}; + +template +struct elem_t { + elem_head head_; + byte_t data_[DataSize] {}; +}; + +template +elem_t* elem_of(void* ptr) noexcept { + return reinterpret_cast*>(static_cast(ptr) - sizeof(elem_head)); +} + +} // namespace detail + +enum class relat { // multiplicity of the relationship + single, + multi +}; + +enum class trans { // transmission + unicast, + multicast +}; + +//////////////////////////////////////////////////////////////// +/// producer-consumer policies +//////////////////////////////////////////////////////////////// + +template +struct prod_cons; + +template <> +struct prod_cons { + std::atomic rd_ { 0 }; // read index + std::atomic wt_ { 0 }; // write index + + template + constexpr static std::size_t elem_param = DataSize - sizeof(elem_head); + + constexpr detail::u2_t cursor() const noexcept { + return 0; + } + + template + bool push(E* /*elems*/, F&& f, detail::elem_t* elem_start) { + auto cur_wt = detail::index_of(wt_.load(std::memory_order_acquire)); + if (cur_wt == detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) { + return false; + } + std::forward(f)(elem_start + cur_wt); + wt_.fetch_add(1, std::memory_order_release); + return true; + } + + template + bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t* elem_start) noexcept { + auto cur_rd = detail::index_of(rd_.load(std::memory_order_acquire)); + if (cur_rd == detail::index_of(wt_.load(std::memory_order_relaxed))) { + return false; + } + std::forward(f)(elem_start + cur_rd); + rd_.fetch_add(1, std::memory_order_release); + return true; + } +}; + +template <> +struct prod_cons + : prod_cons { + + template + bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t* elem_start) noexcept { + byte_t buff[sizeof(detail::elem_t)]; + while (1) { + auto cur_rd = rd_.load(std::memory_order_acquire); + if (detail::index_of(cur_rd) == + detail::index_of(wt_.load(std::memory_order_relaxed))) { + return false; + } + std::memcpy(buff, elem_start + detail::index_of(cur_rd), sizeof(buff)); + if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { + std::forward(f)(buff); + return true; + } + std::this_thread::yield(); + } + } +}; + +template <> +struct prod_cons { + std::atomic wt_ { 0 }; // write index + + template + constexpr static std::size_t elem_param = DataSize; + + using rc_t = decltype(detail::elem_head::rc_)::value_type; + + detail::u2_t cursor() const noexcept { + return wt_.load(std::memory_order_acquire); + } + + template + bool push(E* elems, F&& f, detail::elem_t* elem_start) { + auto conn_cnt = elems->conn_count(); // acquire + if (conn_cnt == 0) return false; + auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed)); + // check all consumers have finished reading this element + rc_t expected = 0; + if (!el->head_.rc_.compare_exchange_weak( + expected, static_cast(conn_cnt), std::memory_order_relaxed)) { + return false; + } + std::forward(f)(el->data_); + wt_.fetch_add(1, std::memory_order_release); + return true; + } + + template + bool pop(E* /*elems*/, detail::u2_t& cur, F&& f, detail::elem_t* elem_start) noexcept { + if (cur == cursor()) return false; // acquire + auto el = elem_start + detail::index_of(cur++); + std::forward(f)(el->data_); + do { + rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire); + if (cur_rc == 0) { + return true; + } + if (el->head_.rc_.compare_exchange_weak( + cur_rc, cur_rc - 1, std::memory_order_release)) { + return true; + } + std::this_thread::yield(); + } while (1); + } +}; + +//////////////////////////////////////////////////////////////// +/// element-array implementation +//////////////////////////////////////////////////////////////// + +struct elems_head { + std::atomic cc_ { 0 }; // connection counter + + std::size_t connect() noexcept { + return cc_.fetch_add(1, std::memory_order_release); + } + + std::size_t disconnect() noexcept { + return cc_.fetch_sub(1, std::memory_order_release); + } + + std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept { + return cc_.load(order); + } +}; + +template +class elems_array : private Policy { +public: + using policy_t = Policy; + using base_t = Policy; + using head_t = elems_head; + using elem_t = detail::elem_t>; + + enum : std::size_t { + head_size = sizeof(head_t), + data_size = DataSize, + elem_max = (std::numeric_limits>::max)() + 1, // default is 255 + 1 + elem_size = sizeof(elem_t), + block_size = elem_size * elem_max + }; + +private: + head_t head_; + elem_t block_[elem_max]; + +public: + elems_array() = default; + elems_array(const elems_array&) = delete; + elems_array& operator=(const elems_array&) = delete; + + std::size_t connect () noexcept { return head_.connect (); } + std::size_t disconnect() noexcept { return head_.disconnect(); } + std::size_t conn_count() const noexcept { return head_.conn_count(); } + + using base_t::cursor; + + template + bool push(F&& f) noexcept { + return base_t::push(this, std::forward(f), block_); + } + + template + bool pop(detail::u2_t& cur, F&& f) noexcept { + return base_t::pop(this, cur, std::forward(f), block_); + } +}; + +} // namespace circ +} // namespace ipc diff --git a/include/circ_queue.h b/include/circ_queue.h index 81e32bc..10e09a7 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -14,10 +14,10 @@ namespace ipc { namespace circ { -template +template class ElemArray = elem_array> class queue { public: - using array_t = elem_array; + using array_t = ElemArray; private: array_t* elems_ = nullptr; @@ -88,11 +88,9 @@ public: template auto push(P&&... params) noexcept { if (elems_ == nullptr) return false; - auto ptr = elems_->acquire(); - if (ptr == nullptr) return false; - ::new (ptr) T(std::forward

(params)...); - elems_->commit(ptr); - return true; + return elems_->fetch([&](void* p) { + ::new (p) T(std::forward

(params)...); + }); } template diff --git a/test/test_circ.cpp b/test/test_circ.cpp index 6611848..2654a2a 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -6,6 +6,7 @@ #include #include +#include "circ_elems_array.h" #include "circ_elem_array.h" #include "circ_queue.h" #include "memory/resource.hpp" @@ -13,14 +14,14 @@ namespace { -using cq_t = ipc::circ::elem_array<12>; -cq_t* cq__; - struct msg_t { int pid_; int dat_; }; +using cq_t = ipc::circ::elem_array; +cq_t* cq__; + bool operator==(msg_t const & m1, msg_t const & m2) { return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_); } @@ -46,9 +47,8 @@ struct test_verify { void verify(int N, int Loops) { std::cout << "verifying..." << std::endl; for (auto& c_dats : list_) { - auto& cons_vec = c_dats; for (int n = 0; n < N; ++n) { - auto& vec = cons_vec[n]; + auto& vec = c_dats[n]; QCOMPARE(vec.size(), static_cast(Loops)); int i = 0; for (int d : vec) { @@ -60,6 +60,87 @@ struct test_verify { } }; +template <> +struct test_verify + > : test_verify { + using test_verify::test_verify; + + void verify(int N, int Loops) { + std::cout << "verifying..." << std::endl; + for (int n = 0; n < N; ++n) { + std::vector datas; + std::uint64_t sum = 0; + for (auto& c_dats : list_) { + for (int d : c_dats[n]) { + datas.push_back(d); + sum += d; + } + } + QCOMPARE(datas.size(), static_cast(Loops)); + QCOMPARE(sum, (Loops * std::uint64_t(Loops - 1)) / 2); + } + } +}; + +template +struct test_cq> { + using ca_t = ipc::circ::elems_array; + + volatile bool quit_ = false; + ca_t* ca_; + + test_cq(ca_t* ca) : ca_(ca) {} + + auto connect() { + auto cur = ca_->cursor(); + ca_->connect(); + return cur; + } + + void disconnect(int) { + ca_->disconnect(); + } + + void wait_start(int M) { + while (ca_->conn_count() != static_cast(M)) { + std::this_thread::yield(); + } + } + + template + void recv(decltype(std::declval().cursor()) cur, F&& proc) { + while(1) { + msg_t* pmsg; + while (ca_->pop(cur, [&pmsg](void* p) { + pmsg = static_cast(p); + })) { + if (pmsg->pid_ < 0) { + quit_ = true; + return; + } + proc(*pmsg); + } + if (quit_) return; + std::this_thread::yield(); + } + } + + ca_t* connect_send() { + return ca_; + } + + void send(ca_t* ca, msg_t const & msg) { + while (!ca->push([&msg](void* p) { + (*static_cast(p)) = msg; + })) { + std::this_thread::yield(); + } + } +}; + template struct test_cq> { using ca_t = ipc::circ::elem_array; @@ -89,7 +170,7 @@ struct test_cq> { template void recv(cn_t cur, F&& proc) { - do { + while(1) { while (cur != ca_->cursor()) { msg_t* pmsg = static_cast(ca_->take(cur)), msg = *pmsg; @@ -99,7 +180,7 @@ struct test_cq> { proc(msg); } std::this_thread::yield(); - } while(1); + } } ca_t* connect_send() { @@ -144,11 +225,11 @@ struct test_cq> { template void recv(cn_t* queue, F&& proc) { - do { + while(1) { auto msg = queue->pop(); if (msg.pid_ < 0) return; proc(msg); - } while(1); + } } cn_t connect_send() { @@ -182,7 +263,7 @@ private slots: #include "test_circ.moc" -constexpr int LoopCount = 1000000; +constexpr int LoopCount = 10000000; void Unit::initTestCase() { TestSuite::initTestCase(); @@ -199,7 +280,7 @@ void Unit::test_inst() { std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl; std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; - QCOMPARE(static_cast(cq_t::data_size) , static_cast(12)); + QCOMPARE(static_cast(cq_t::data_size), sizeof(msg_t)); QCOMPARE(sizeof(cq_t), static_cast(cq_t::block_size + cq_t::head_size)); std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; @@ -218,10 +299,36 @@ void test_prod_cons() { void Unit::test_prod_cons_1v1() { test_prod_cons<1, 1>(); + + ipc::circ::elems_array< + sizeof(msg_t), + ipc::circ::prod_cons + > el_arr_ss; + benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ss); + benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ss); } void Unit::test_prod_cons_1v3() { test_prod_cons<1, 3>(); + + ipc::circ::elems_array< + sizeof(msg_t), + ipc::circ::prod_cons + > el_arr_smn; + benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn); + benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smn); + +// ipc::circ::elems_array< +// sizeof(msg_t), +// ipc::circ::prod_cons +// > el_arr_smm; +// benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_smm); } void Unit::test_prod_cons_performance() {