diff --git a/src/circ_elem_array.h b/src/circ_elem_array.h index 38f6e9c..4969a1e 100644 --- a/src/circ_elem_array.h +++ b/src/circ_elem_array.h @@ -103,7 +103,7 @@ public: void* acquire(void) { auto el = elem(wt_.fetch_add(1, std::memory_order_consume)); - // check read flag + // check read finished by all consumers do { uc_t expected = 0; if (el->head_.rf_.compare_exchange_weak( @@ -116,26 +116,39 @@ public: } void commit(void* ptr) { - auto el = elem(ptr); - ui_t wt = index_of(el); + auto el = elem(ptr); // get the commit element + ui_t cm = index_of(el); // get the index of this element do { - bool no_next; + bool no_next_check; uc_t curr; do { curr = cr_.load(std::memory_order_relaxed); - no_next = (index_of(curr) != wt); - if (no_next) { + no_next_check = (index_of(curr) != cm); + if (no_next_check) { + /* + * set wf_ for the other producer thread which is commiting + * the element matches cr_ could see it has commited + */ el->head_.wf_.store(1, std::memory_order_relaxed); } else { + /* + * no thread changes the cr_ except current thread at here + * so we could just fetch_add & break, no need to check cr_ again + */ cr_.fetch_add(1, std::memory_order_relaxed); el->head_.wf_.store(0, std::memory_order_release); - no_next = false; + no_next_check = false; break; } + /* + * it needs to go back and judge again + * when cr_ has been changed by the other producer thread + */ } while(curr != cr_.load(std::memory_order_acq_rel)); - if (no_next) return; - } while(el = elem(++wt), el->head_.wf_.load(std::memory_order_consume)); + // check next element has commited or not + if (no_next_check) return; + } while(el = elem(++cm), el->head_.wf_.load(std::memory_order_consume)); } uc_t cursor(void) const { diff --git a/test/test_circ_elem_array.cpp b/test/test_circ_elem_array.cpp index 80e64d2..d563766 100644 --- a/test/test_circ_elem_array.cpp +++ b/test/test_circ_elem_array.cpp @@ -31,9 +31,9 @@ void Unit::test_inst(void) { std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl; std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; - QCOMPARE(cq_t::data_size , 12); - QCOMPARE(cq_t::block_size, 4096); - QCOMPARE(sizeof(cq_t), cq_t::block_size + cq_t::head_size); + QCOMPARE(static_cast(cq_t::data_size) , static_cast(12)); + QCOMPARE(static_cast(cq_t::block_size), static_cast(4096)); + QCOMPARE(sizeof(cq_t), static_cast(cq_t::block_size + cq_t::head_size)); cq__ = new cq_t; std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; @@ -50,7 +50,7 @@ void Unit::test_prod_cons_1vN(void) { std::thread consumers[1]; std::atomic_int fini { 0 }; capo::stopwatch<> sw; - constexpr static int loops = 1000000; + constexpr static int loops = 10000000; for (auto& c : consumers) { c = std::thread{[&] { @@ -78,6 +78,7 @@ void Unit::test_prod_cons_1vN(void) { auto ts = sw.elapsed(); std::cout << "performance: " << (double(ts) / double(loops)) << " us/d" << std::endl; } + std::cout << "confirming..." << std::endl; for (int d : list) { QCOMPARE(i, d); ++i;