From eefea828f0d3428c67334502589fb2d6e614eddb Mon Sep 17 00:00:00 2001 From: mutouyun Date: Tue, 27 Nov 2018 10:47:23 +0800 Subject: [PATCH] not support N:M (prod-cons) --- src/circ_elem_array.h | 88 ++++++++--------------------------- test/test_circ_elem_array.cpp | 20 +++----- 2 files changed, 26 insertions(+), 82 deletions(-) diff --git a/src/circ_elem_array.h b/src/circ_elem_array.h index 79ec151..549a237 100644 --- a/src/circ_elem_array.h +++ b/src/circ_elem_array.h @@ -18,12 +18,10 @@ namespace circ { struct alignas(std::max_align_t) elem_array_head { using ui_t = std::uint8_t; using uc_t = std::uint16_t; - using ai_t = std::atomic; using ac_t = std::atomic; ac_t cc_ { 0 }; // connection counter, using for broadcast - ac_t cr_ { 0 }; // cursor - ai_t wt_ { 0 }; // write index + ac_t wt_ { 0 }; // write index }; enum : std::size_t { @@ -36,9 +34,7 @@ enum : std::size_t { template class elem_array : private elem_array_head { struct head_t { - ac_t rf_; // read flag - std::atomic_bool wf_; // write flag - std::atomic_flag acq_; // acquire flag + std::atomic rc_ { 0 }; // read counter }; public: @@ -56,6 +52,7 @@ private: struct elem_t { head_t head_; byte_t data_[data_size]; + elem_t(void) { ::memset(data_, 0, sizeof(data_)); } }; elem_t block_[elem_max]; @@ -80,9 +77,7 @@ private: } public: - elem_array(void) { - ::memset(block_, 0, sizeof(block_)); - } + elem_array(void) = default; ~elem_array(void) = delete; elem_array(const elem_array&) = delete; @@ -103,73 +98,28 @@ public: } void* acquire(void) { - elem_t* el; - while (1) { - // searching an available element - el = elem(wt_.fetch_add(1, std::memory_order_acquire)); - if (el->head_.acq_.test_and_set(std::memory_order_release)) { - std::this_thread::yield(); - continue; + elem_t* el = elem(wt_.load(std::memory_order_acquire)); + // check read finished by all consumers + while(1) { + std::size_t expected = 0; + if (el->head_.rc_.compare_exchange_weak( + expected, + static_cast(cc_.load(std::memory_order_relaxed)), + std::memory_order_release)) { + break; } - // check read finished by all consumers - while(1) { - uc_t expected = 0; - std::atomic_thread_fence(std::memory_order_acquire); - if (el->head_.rf_.compare_exchange_weak( - expected, cc_.load(std::memory_order_relaxed), std::memory_order_release)) { - break; - } - std::this_thread::yield(); - } - el->head_.acq_.clear(std::memory_order_release); - break; + std::this_thread::yield(); + std::atomic_thread_fence(std::memory_order_acquire); } return el->data_; } - void commit(void* ptr) { - auto el = elem(ptr); // get the commit element - ui_t wi = index_of(el); // get the index of this element (the write index) - do { - bool no_next, cas; - uc_t curr = cr_.load(std::memory_order_consume), next; - do { - next = curr; - if ((no_next = (index_of(curr) != wi)) /* assign & judge */) { - /* - * commit is not the current commit - * set wf_ for the other producer thread which is commiting - * the element matches cr_ could see it has commited - */ - el->head_.wf_.store(true, std::memory_order_release); - } - else { - /* - * commit is the current commit - * so we just increase the cursor & go check the next - */ - ++next; - el->head_.wf_.store(false, std::memory_order_release); - } - /* - * it needs to go back and judge again - * when cr_ has been changed by the other producer thread - */ - } while(!(cas = cr_.compare_exchange_weak(curr, next, std::memory_order_acq_rel)) && no_next); - /* - * if compare_exchange failed & !no_next, - * means there is another producer thread updated this commit, - * so in this case we could just return - */ - if (no_next || (!cas/* && !no_next*/)) return; - /* - * check next element has commited or not - */ - } while(el = elem(++wi), el->head_.wf_.exchange(false, std::memory_order_acq_rel)); + void commit(void* /*ptr*/) { + wt_.fetch_add(1, std::memory_order_release); } uc_t cursor(void) const { - return cr_.load(std::memory_order_consume); + return wt_.load(std::memory_order_consume); } void* take(uc_t cursor) { @@ -177,7 +127,7 @@ public: } void put(void* ptr) { - elem(ptr)->head_.rf_.fetch_sub(1, std::memory_order_release); + elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release); } }; diff --git a/test/test_circ_elem_array.cpp b/test/test_circ_elem_array.cpp index 87bfad6..8bc8e47 100644 --- a/test/test_circ_elem_array.cpp +++ b/test/test_circ_elem_array.cpp @@ -19,7 +19,6 @@ private slots: void test_inst(void); void test_prod_cons_1v1(void); void test_prod_cons_1vN(void); - void test_prod_cons_Nv1(void); } unit__; #include "test_circ_elem_array.moc" @@ -34,17 +33,16 @@ void Unit::test_inst(void) { std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; QCOMPARE(static_cast(cq_t::data_size) , static_cast(12)); - QCOMPARE(static_cast(cq_t::block_size), static_cast(4096)); QCOMPARE(sizeof(cq_t), static_cast(cq_t::block_size + cq_t::head_size)); cq__ = new cq_t; std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; -// auto a = cq__->take(1); -// auto b = cq__->take(2); -// QCOMPARE(static_cast(static_cast(b) - -// static_cast(a)), -// static_cast(cq_t::elem_size)); + auto a = cq__->take(1); + auto b = cq__->take(2); + QCOMPARE(static_cast(static_cast(b) - + static_cast(a)), + static_cast(cq_t::elem_size)); } template @@ -129,15 +127,11 @@ void test_prod_cons(void) { } void Unit::test_prod_cons_1v1(void) { -// test_prod_cons<1, 1>(); + test_prod_cons<1, 1>(); } void Unit::test_prod_cons_1vN(void) { -// test_prod_cons<1, 3>(); -} - -void Unit::test_prod_cons_Nv1(void) { - test_prod_cons<2, 1>(); + test_prod_cons<1, 3>(); } } // internal-linkage