add test_prod_cons function template; fix some bugs for multi-producers

This commit is contained in:
mutouyun 2018-11-26 11:56:59 +08:00
parent 7e24273a64
commit 0222af2221
2 changed files with 90 additions and 53 deletions

View File

@ -107,8 +107,7 @@ public:
do { do {
uc_t expected = 0; uc_t expected = 0;
if (el->head_.rf_.compare_exchange_weak( if (el->head_.rf_.compare_exchange_weak(
expected, static_cast<uc_t>(conn_count()), expected, static_cast<uc_t>(conn_count()), std::memory_order_acq_rel)) {
std::memory_order_consume, std::memory_order_relaxed)) {
break; break;
} }
} while(1); } while(1);
@ -117,38 +116,41 @@ public:
void commit(void* ptr) { void commit(void* ptr) {
auto el = elem(ptr); // get the commit element auto el = elem(ptr); // get the commit element
ui_t cm = index_of(el); // get the index of this element ui_t wi = index_of(el); // get the index of this element (the write index)
do { do {
bool no_next_check; bool no_next, cas;
uc_t curr; uc_t curr = cr_.load(std::memory_order_relaxed), next;
do { do {
curr = cr_.load(std::memory_order_relaxed); next = curr;
no_next_check = (index_of(curr) != cm); if ((no_next = (index_of(curr) != wi)) /* assign & judge */) {
if (no_next_check) {
/* /*
* commit is not the current commit
* set wf_ for the other producer thread which is commiting * set wf_ for the other producer thread which is commiting
* the element matches cr_ could see it has commited * the element matches cr_ could see it has commited
*/ */
el->head_.wf_.store(1, std::memory_order_relaxed); el->head_.wf_.store(1, std::memory_order_release);
} }
else { else {
/* /*
* no thread changes the cr_ except current thread at here * commit is the current commit
* so we could just fetch_add & break, no need to check cr_ again * so we should increase the cursor & go check the next
*/ */
cr_.fetch_add(1, std::memory_order_relaxed); ++next;
el->head_.wf_.store(0, std::memory_order_release); el->head_.wf_.store(0, std::memory_order_release);
no_next_check = false;
break;
} }
/* /*
* it needs to go back and judge again * it needs to go back and judge again
* when cr_ has been changed by the other producer thread * when cr_ has been changed by the other producer thread
*/ */
} while(curr != cr_.load(std::memory_order_acq_rel)); } while(!(cas = cr_.compare_exchange_weak(curr, next, std::memory_order_acq_rel)) && no_next);
/*
* if compare_exchange failed & !no_next,
* means there is another producer thread just update this commit,
* so in this case we could just return
*/
if (no_next || (!cas/* && !no_next*/)) return;
// check next element has commited or not // check next element has commited or not
if (no_next_check) return; } while(el = elem(++wi), el->head_.wf_.load(std::memory_order_consume));
} while(el = elem(++cm), el->head_.wf_.load(std::memory_order_consume));
} }
uc_t cursor(void) const { uc_t cursor(void) const {

View File

@ -4,6 +4,7 @@
#include <memory> #include <memory>
#include <new> #include <new>
#include <vector> #include <vector>
#include <unordered_map>
#include <thread> #include <thread>
#include "circ_elem_array.h" #include "circ_elem_array.h"
@ -17,7 +18,9 @@ class Unit : public TestSuite {
private slots: private slots:
void test_inst(void); void test_inst(void);
void test_prod_cons_1v1(void);
void test_prod_cons_1vN(void); void test_prod_cons_1vN(void);
void test_prod_cons_Nv1(void);
} unit__; } unit__;
#include "test_circ_elem_array.moc" #include "test_circ_elem_array.moc"
@ -38,50 +41,60 @@ void Unit::test_inst(void) {
cq__ = new cq_t; cq__ = new cq_t;
std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl;
auto a = cq__->take(1); // auto a = cq__->take(1);
auto b = cq__->take(2); // auto b = cq__->take(2);
QCOMPARE(static_cast<std::size_t>(static_cast<ipc::byte_t*>(b) - // QCOMPARE(static_cast<std::size_t>(static_cast<ipc::byte_t*>(b) -
static_cast<ipc::byte_t*>(a)), // static_cast<ipc::byte_t*>(a)),
static_cast<std::size_t>(cq_t::elem_size)); // static_cast<std::size_t>(cq_t::elem_size));
} }
void Unit::test_prod_cons_1vN(void) { template <int N, int M, int Loops = 1000000>
void test_prod_cons(void) {
::new (cq__) cq_t; ::new (cq__) cq_t;
std::thread consumers[3]; std::thread producers[N];
std::thread consumers[M];
std::atomic_int fini { 0 }; std::atomic_int fini { 0 };
capo::stopwatch<> sw; capo::stopwatch<> sw;
constexpr static int loops = 10000000;
for (auto& c : consumers) { struct msg_t {
c = std::thread{[&] { int pid_;
int dat_;
};
for (auto& t : consumers) {
t = std::thread{[&] {
auto cur = cq__->cursor(); auto cur = cq__->cursor();
std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl; std::cout << "start consumer " << &t << ": cur = " << (int)cur << std::endl;
cq__->connect(); cq__->connect();
auto disconn = [](cq_t* cq) { cq->disconnect(); }; std::unique_ptr<cq_t, void(*)(cq_t*)> guard(cq__, [](cq_t* cq) { cq->disconnect(); });
std::unique_ptr<cq_t, decltype(disconn)> guard(cq__, disconn);
std::vector<int> list; std::unordered_map<int, std::vector<int>> list;
int i = 0;
do { do {
while (cur != cq__->cursor()) { while (cur != cq__->cursor()) {
auto p = static_cast<int*>(cq__->take(cur)); msg_t* pmsg = static_cast<msg_t*>(cq__->take(cur)),
int d = *p; msg = *pmsg;
cq__->put(p); cq__->put(pmsg);
if (d < 0) goto finished; if (msg.pid_ < 0) goto finished;
++cur; ++cur;
list.push_back(d); list[msg.pid_].push_back(msg.dat_);
} }
} while(1); } while(1);
finished: finished:
if (++fini == std::extent<decltype(consumers)>::value) { if (++fini == std::extent<decltype(consumers)>::value) {
auto ts = sw.elapsed<std::chrono::microseconds>(); auto ts = sw.elapsed<std::chrono::microseconds>();
std::cout << "performance: " << (double(ts) / double(loops)) << " us/d" << std::endl; std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl
<< "performance: " << (double(ts) / double(Loops)) << " us/d" << std::endl;
} }
std::cout << "confirming..." << std::endl; std::cout << "confirming..." << std::endl;
for (int d : list) { for (int n = 0; n < static_cast<int>(std::extent<decltype(producers)>::value); ++n) {
QCOMPARE(i, d); auto& vec = list[n];
++i; QCOMPARE(vec.size(), static_cast<std::size_t>(Loops));
int i = 0;
for (int d : vec) {
QCOMPARE(i, d);
++i;
}
} }
}}; }};
} }
@ -90,20 +103,42 @@ void Unit::test_prod_cons_1vN(void) {
std::this_thread::yield(); std::this_thread::yield();
} }
std::cout << "start producer..." << std::endl; std::cout << "start producers..." << std::endl;
sw.start(); std::atomic_flag started = ATOMIC_FLAG_INIT;
for (int i = 0; i < loops; ++i) { int pid = 0;
auto d = static_cast<int*>(cq__->acquire()); for (auto& t : producers) {
*d = i; t = std::thread{[&, pid] {
cq__->commit(d); if (!started.test_and_set()) {
sw.start();
}
for (int i = 0; i < Loops; ++i) {
msg_t* pmsg = static_cast<msg_t*>(cq__->acquire());
pmsg->pid_ = pid;
pmsg->dat_ = i;
cq__->commit(pmsg);
}
}};
++pid;
} }
auto d = static_cast<int*>(cq__->acquire()); for (auto& t : producers) t.join();
*d = -1; // quit
cq__->commit(d); msg_t* pmsg = static_cast<msg_t*>(cq__->acquire());
pmsg->pid_ = pmsg->dat_ = -1;
cq__->commit(pmsg);
for (auto& c : consumers) { for (auto& t : consumers) t.join();
c.join(); }
}
void Unit::test_prod_cons_1v1(void) {
test_prod_cons<1, 1>();
}
void Unit::test_prod_cons_1vN(void) {
test_prod_cons<1, 3>();
}
void Unit::test_prod_cons_Nv1(void) {
test_prod_cons<2, 1>();
} }
} // internal-linkage } // internal-linkage