#include #include #include #include #include #include #include #include #include "circ_elem_array.h" #include "test.h" #include "stopwatch.hpp" namespace { class Unit : public TestSuite { Q_OBJECT private slots: void test_inst(void); void test_prod_cons_1v1(void); void test_prod_cons_1v3(void); void test_prod_cons_performance(void); } unit__; #include "test_circ_elem_array.moc" using cq_t = ipc::circ::elem_array<12>; cq_t* cq__; void Unit::test_inst(void) { std::cout << "cq_t::head_size = " << cq_t::head_size << std::endl; std::cout << "cq_t::data_size = " << cq_t::data_size << std::endl; std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl; std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; QCOMPARE(static_cast(cq_t::data_size) , static_cast(12)); QCOMPARE(sizeof(cq_t), static_cast(cq_t::block_size + cq_t::head_size)); cq__ = new cq_t; std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; auto a = cq__->take(1); auto b = cq__->take(2); QCOMPARE(static_cast(static_cast(b) - static_cast(a)), static_cast(cq_t::elem_size)); } template void test_prod_cons(void) { ::new (cq__) cq_t; std::thread producers[N]; std::thread consumers[M]; std::atomic_int fini { 0 }; capo::stopwatch<> sw; struct msg_t { int pid_; int dat_; }; std::unordered_map> list[std::extent::value]; auto push_data = Confirmation ? [](std::vector& l, int dat) { l.push_back(dat); } : [](std::vector&, int) {}; int cid = 0; for (auto& t : consumers) { t = std::thread{[&, cid] { auto cur = cq__->cursor(); if (Confirmation) { std::cout << "start consumer " << &t << ": cur = " << (int)cur << std::endl; } cq__->connect(); std::unique_ptr guard(cq__, [](cq_t* cq) { cq->disconnect(); }); do { while (cur != cq__->cursor()) { msg_t* pmsg = static_cast(cq__->take(cur)), msg = *pmsg; cq__->put(pmsg); if (msg.pid_ < 0) goto finished; ++cur; push_data(list[cid][msg.pid_], msg.dat_); } std::this_thread::yield(); } while(1); finished: if (++fini != std::extent::value) return; auto ts = sw.elapsed(); std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl << "performance: " << (double(ts) / double(Loops * N)) << " us/d" << std::endl; if (!Confirmation) return; std::cout << "confirming..." << std::endl; for (auto& cons_vec : list) { for (int n = 0; n < static_cast(std::extent::value); ++n) { auto& vec = cons_vec[n]; QCOMPARE(vec.size(), static_cast(Loops)); int i = 0; for (int d : vec) { QCOMPARE(i, d); ++i; } } } }}; ++cid; } while (cq__->conn_count() != std::extent::value) { std::this_thread::yield(); } std::cout << "start producers..." << std::endl; std::atomic_flag started = ATOMIC_FLAG_INIT; int pid = 0; for (auto& t : producers) { t = std::thread{[&, pid] { if (!started.test_and_set()) { sw.start(); } for (int i = 0; i < Loops; ++i) { msg_t* pmsg = static_cast(cq__->acquire()); pmsg->pid_ = pid; pmsg->dat_ = i; cq__->commit(pmsg); } }}; ++pid; } for (auto& t : producers) t.join(); // quit msg_t* pmsg = static_cast(cq__->acquire()); pmsg->pid_ = pmsg->dat_ = -1; cq__->commit(pmsg); for (auto& t : consumers) t.join(); } void Unit::test_prod_cons_1v1(void) { test_prod_cons<1, 1>(); } void Unit::test_prod_cons_1v3(void) { test_prod_cons<1, 3>(); } template struct test_performance { static void start(void) { test_prod_cons<1, B, false>(); test_performance::start(); } }; template struct test_performance { static void start(void) { test_prod_cons<1, E, false>(); } }; void Unit::test_prod_cons_performance(void) { test_performance<1, 10>::start(); } } // internal-linkage