From d762634380825da7fa7daf2f79f1e5fcdcd4a35a Mon Sep 17 00:00:00 2001 From: zhangyi Date: Sun, 29 Mar 2020 14:28:53 +0800 Subject: [PATCH] add impl of force_push for single-single-unicast; make test_circ works --- src/prod_cons.h | 22 +++++++++-- test/test_circ.cpp | 92 +++++++++++++++++++++++----------------------- 2 files changed, 64 insertions(+), 50 deletions(-) diff --git a/src/prod_cons.h b/src/prod_cons.h index 0470bee..27e45d2 100644 --- a/src/prod_cons.h +++ b/src/prod_cons.h @@ -45,8 +45,22 @@ struct prod_cons_impl> { } template - bool force_push(W* wrapper, F&& f, E* elems) { - return push(wrapper, std::forward(f), elems); + bool force_push(W* /*wrapper*/, F&& f, E* elems) { + auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed)); + for (unsigned k = 0;;) { + auto cur_rd = rd_.load(std::memory_order_acquire); + if (cur_wt != circ::index_of(cur_rd - 1)) { + break; + } + // full + if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_acq_rel)) { + break; + } + ipc::yield(k); + } + std::forward(f)(&(elems[cur_wt].data_)); + wt_.fetch_add(1, std::memory_order_release); + return true; } template @@ -107,7 +121,7 @@ struct prod_cons_impl> circ::index_of(rd_.load(std::memory_order_acquire))) { return false; // full } - if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) { + if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_acq_rel)) { break; } ipc::yield(k); @@ -118,7 +132,7 @@ struct prod_cons_impl> el->f_ct_.store(~static_cast(cur_ct), std::memory_order_release); while (1) { auto cac_ct = el->f_ct_.load(std::memory_order_acquire); - if (cur_ct != wt_.load(std::memory_order_acquire)) { + if (cur_ct != wt_.load(std::memory_order_relaxed)) { return true; } if ((~cac_ct) != cur_ct) { diff --git a/test/test_circ.cpp b/test/test_circ.cpp index cff69a6..d076eac 100755 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -32,7 +32,6 @@ using cq_t = ea_t< sizeof(msg_t), pc_t >; -cq_t* cq__; bool operator==(msg_t const & m1, msg_t const & m2) { return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_); @@ -117,28 +116,28 @@ template struct test_cq> { using ca_t = ea_t; using cn_t = decltype(std::declval().connect()); + using cr_t = decltype(std::declval().cursor()); typename quit_mode

::type quit_ = false; ca_t* ca_; - cn_t cc_id_; + std::unordered_map conns_; test_cq(ca_t* ca) : ca_(ca) {} cn_t connect() { - return cc_id_ = ca_->connect(); + auto it = conns_.emplace(ca_->connect(), ca_->cursor()); + return it.first->first; + } + + ca_t* connect_send() { + return ca_; } void disconnect(cn_t cc_id) { ca_->disconnect(cc_id); } - void disconnect(ca_t* ca) { - ca->disconnect(cc_id_); - } - - cn_t connected_id() const noexcept { - return cc_id_; - } + void disconnect(ca_t*) {} ca_t * elems() noexcept { return ca_; } ca_t const * elems() const noexcept { return ca_; } @@ -150,10 +149,18 @@ struct test_cq> { } template - void recv(cn_t cur, F&& proc) { + void recv(cn_t cn, F&& proc) { + + struct que { + cn_t cn_; + // for ca_->pop + cn_t connected_id() const noexcept { return cn_; } + } q { cn }; + + cr_t& cur = conns_[cn]; while (1) { msg_t msg; - while (ca_->pop(this, &cur, [&msg](void* p) { + while (ca_->pop(&q, &cur, [&msg](void* p) { msg = *static_cast(p); })) { if (msg.pid_ < 0) { @@ -167,10 +174,6 @@ struct test_cq> { } } - ca_t* connect_send() { - return ca_; - } - void send(ca_t* ca, msg_t const & msg) { while (!ca->push(this, [&msg](void* p) { (*static_cast(p)) = msg; @@ -238,15 +241,13 @@ class Unit : public TestSuite { private slots: void initTestCase(); - void cleanupTestCase(); - void test_inst(); void test_prod_cons_1v1(); void test_prod_cons_1v3(); void test_prod_cons_performance(); void test_queue(); -}; -// } unit__; +//}; +} unit__; #include "test_circ.moc" @@ -255,11 +256,6 @@ constexpr int LoopCount = 1000000; void Unit::initTestCase() { TestSuite::initTestCase(); - cq__ = new cq_t; -} - -void Unit::cleanupTestCase() { - delete cq__; } void Unit::test_inst() { @@ -270,22 +266,10 @@ void Unit::test_inst() { QCOMPARE(static_cast(cq_t::data_size), sizeof(msg_t)); - std::cout << "sizeof(ea_t) = " << sizeof(*cq__) << std::endl; -} - -template -void test_prod_cons() { - benchmark_prod_cons>(cq__); + std::cout << "sizeof(ea_t) = " << sizeof(cq_t) << std::endl; } void Unit::test_prod_cons_1v1() { -// ea_t< -// sizeof(msg_t), -// pc_t -// > el_arr_mmb; -// benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb); -// benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb); - ea_t< sizeof(msg_t), pc_t @@ -307,8 +291,12 @@ void Unit::test_prod_cons_1v1() { benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu); benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu); - test_prod_cons<1, 1>(); - test_prod_cons<1, 1, false>(); + ea_t< + sizeof(msg_t), + pc_t + > el_arr_smb; + benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_smb); + benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smb); ea_t< sizeof(msg_t), @@ -316,6 +304,7 @@ void Unit::test_prod_cons_1v1() { > el_arr_mmb; benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb); benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb); + benchmark_prod_cons<2, 1, LoopCount, void>(&el_arr_mmb); } void Unit::test_prod_cons_1v3() { @@ -333,8 +322,12 @@ void Unit::test_prod_cons_1v3() { benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu); benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu); - test_prod_cons<1, 3>(); - test_prod_cons<1, 3, false>(); + ea_t< + sizeof(msg_t), + pc_t + > el_arr_smb; + benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_smb); + benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smb); ea_t< sizeof(msg_t), @@ -353,10 +346,17 @@ void Unit::test_prod_cons_performance() { benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu); }); - ipc::detail::static_for<8>([](auto index) { - test_prod_cons<1, decltype(index)::value + 1, false>(); + ea_t< + sizeof(msg_t), + pc_t + > el_arr_smb; + ipc::detail::static_for<8>([&el_arr_smb](auto index) { + benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smb); + }); + // test & verify + ipc::detail::static_for<8>([&el_arr_smb](auto index) { + benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, cq_t>(&el_arr_smb); }); - test_prod_cons<1, 8>(); // test & verify ea_t< sizeof(msg_t), @@ -398,7 +398,7 @@ void Unit::test_queue() { msg_t msg {}; QVERIFY(!queue.pop(msg)); QCOMPARE(msg, (msg_t {})); - QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__)); + QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(cq_t)); ipc::detail::static_for<16>([](auto index) { benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr);