From 0222af22219b2a23b9ce88e643005192ad56ea83 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Mon, 26 Nov 2018 11:56:59 +0800 Subject: [PATCH] add test_prod_cons function template; fix some bugs for multi-producers --- src/circ_elem_array.h | 36 ++++++------ test/test_circ_elem_array.cpp | 107 ++++++++++++++++++++++------------ 2 files changed, 90 insertions(+), 53 deletions(-) diff --git a/src/circ_elem_array.h b/src/circ_elem_array.h index 4969a1e..50d3282 100644 --- a/src/circ_elem_array.h +++ b/src/circ_elem_array.h @@ -107,8 +107,7 @@ public: do { uc_t expected = 0; if (el->head_.rf_.compare_exchange_weak( - expected, static_cast(conn_count()), - std::memory_order_consume, std::memory_order_relaxed)) { + expected, static_cast(conn_count()), std::memory_order_acq_rel)) { break; } } while(1); @@ -117,38 +116,41 @@ public: void commit(void* ptr) { auto el = elem(ptr); // get the commit element - ui_t cm = index_of(el); // get the index of this element + ui_t wi = index_of(el); // get the index of this element (the write index) do { - bool no_next_check; - uc_t curr; + bool no_next, cas; + uc_t curr = cr_.load(std::memory_order_relaxed), next; do { - curr = cr_.load(std::memory_order_relaxed); - no_next_check = (index_of(curr) != cm); - if (no_next_check) { + next = curr; + if ((no_next = (index_of(curr) != wi)) /* assign & judge */) { /* + * commit is not the current commit * set wf_ for the other producer thread which is commiting * the element matches cr_ could see it has commited */ - el->head_.wf_.store(1, std::memory_order_relaxed); + el->head_.wf_.store(1, std::memory_order_release); } else { /* - * no thread changes the cr_ except current thread at here - * so we could just fetch_add & break, no need to check cr_ again + * commit is the current commit + * so we should increase the cursor & go check the next */ - cr_.fetch_add(1, std::memory_order_relaxed); + ++next; el->head_.wf_.store(0, std::memory_order_release); - no_next_check = false; - break; } /* * it needs to go back and judge again * when cr_ has been changed by the other producer thread */ - } while(curr != cr_.load(std::memory_order_acq_rel)); + } while(!(cas = cr_.compare_exchange_weak(curr, next, std::memory_order_acq_rel)) && no_next); + /* + * if compare_exchange failed & !no_next, + * means there is another producer thread just update this commit, + * so in this case we could just return + */ + if (no_next || (!cas/* && !no_next*/)) return; // check next element has commited or not - if (no_next_check) return; - } while(el = elem(++cm), el->head_.wf_.load(std::memory_order_consume)); + } while(el = elem(++wi), el->head_.wf_.load(std::memory_order_consume)); } uc_t cursor(void) const { diff --git a/test/test_circ_elem_array.cpp b/test/test_circ_elem_array.cpp index d1c01fb..10c8f4b 100644 --- a/test/test_circ_elem_array.cpp +++ b/test/test_circ_elem_array.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include "circ_elem_array.h" @@ -17,7 +18,9 @@ class Unit : public TestSuite { private slots: void test_inst(void); + void test_prod_cons_1v1(void); void test_prod_cons_1vN(void); + void test_prod_cons_Nv1(void); } unit__; #include "test_circ_elem_array.moc" @@ -38,50 +41,60 @@ void Unit::test_inst(void) { cq__ = new cq_t; std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; - auto a = cq__->take(1); - auto b = cq__->take(2); - QCOMPARE(static_cast(static_cast(b) - - static_cast(a)), - static_cast(cq_t::elem_size)); +// auto a = cq__->take(1); +// auto b = cq__->take(2); +// QCOMPARE(static_cast(static_cast(b) - +// static_cast(a)), +// static_cast(cq_t::elem_size)); } -void Unit::test_prod_cons_1vN(void) { +template +void test_prod_cons(void) { ::new (cq__) cq_t; - std::thread consumers[3]; + std::thread producers[N]; + std::thread consumers[M]; std::atomic_int fini { 0 }; capo::stopwatch<> sw; - constexpr static int loops = 10000000; - for (auto& c : consumers) { - c = std::thread{[&] { + struct msg_t { + int pid_; + int dat_; + }; + + for (auto& t : consumers) { + t = std::thread{[&] { auto cur = cq__->cursor(); - std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl; + std::cout << "start consumer " << &t << ": cur = " << (int)cur << std::endl; cq__->connect(); - auto disconn = [](cq_t* cq) { cq->disconnect(); }; - std::unique_ptr guard(cq__, disconn); + std::unique_ptr guard(cq__, [](cq_t* cq) { cq->disconnect(); }); - std::vector list; - int i = 0; + std::unordered_map> list; do { while (cur != cq__->cursor()) { - auto p = static_cast(cq__->take(cur)); - int d = *p; - cq__->put(p); - if (d < 0) goto finished; + msg_t* pmsg = static_cast(cq__->take(cur)), + msg = *pmsg; + cq__->put(pmsg); + if (msg.pid_ < 0) goto finished; ++cur; - list.push_back(d); + list[msg.pid_].push_back(msg.dat_); } } while(1); finished: if (++fini == std::extent::value) { auto ts = sw.elapsed(); - std::cout << "performance: " << (double(ts) / double(loops)) << " us/d" << std::endl; + std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl + << "performance: " << (double(ts) / double(Loops)) << " us/d" << std::endl; } std::cout << "confirming..." << std::endl; - for (int d : list) { - QCOMPARE(i, d); - ++i; + for (int n = 0; n < static_cast(std::extent::value); ++n) { + auto& vec = list[n]; + QCOMPARE(vec.size(), static_cast(Loops)); + int i = 0; + for (int d : vec) { + QCOMPARE(i, d); + ++i; + } } }}; } @@ -90,20 +103,42 @@ void Unit::test_prod_cons_1vN(void) { std::this_thread::yield(); } - std::cout << "start producer..." << std::endl; - sw.start(); - for (int i = 0; i < loops; ++i) { - auto d = static_cast(cq__->acquire()); - *d = i; - cq__->commit(d); + std::cout << "start producers..." << std::endl; + std::atomic_flag started = ATOMIC_FLAG_INIT; + int pid = 0; + for (auto& t : producers) { + t = std::thread{[&, pid] { + if (!started.test_and_set()) { + sw.start(); + } + for (int i = 0; i < Loops; ++i) { + msg_t* pmsg = static_cast(cq__->acquire()); + pmsg->pid_ = pid; + pmsg->dat_ = i; + cq__->commit(pmsg); + } + }}; + ++pid; } - auto d = static_cast(cq__->acquire()); - *d = -1; - cq__->commit(d); + for (auto& t : producers) t.join(); + // quit + msg_t* pmsg = static_cast(cq__->acquire()); + pmsg->pid_ = pmsg->dat_ = -1; + cq__->commit(pmsg); - for (auto& c : consumers) { - c.join(); - } + for (auto& t : consumers) t.join(); +} + +void Unit::test_prod_cons_1v1(void) { + test_prod_cons<1, 1>(); +} + +void Unit::test_prod_cons_1vN(void) { + test_prod_cons<1, 3>(); +} + +void Unit::test_prod_cons_Nv1(void) { + test_prod_cons<2, 1>(); } } // internal-linkage