From c9ce4b466a337fa1482fc5fbf705243bec3fe6b4 Mon Sep 17 00:00:00 2001 From: zhangyi Date: Wed, 28 Nov 2018 11:53:10 +0800 Subject: [PATCH] add circ_queue test (TBD) --- include/circ_queue.h | 52 +++++++-- test/test_circ.cpp | 243 +++++++++++++++++++++++++++++++++---------- 2 files changed, 231 insertions(+), 64 deletions(-) diff --git a/include/circ_queue.h b/include/circ_queue.h index 45e6a94..0230253 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "circ_elem_array.h" @@ -21,22 +22,59 @@ public: using array_t = elem_array; private: - array_t * elems_ = nullptr; + array_t* elems_ = nullptr; typename std::result_of::type cursor_ = 0; public: - std::size_t connect(array_t* arr) { - if (arr == nullptr) return error_count; - elems_ = arr; + queue(void) = default; + + explicit queue(array_t* arr) : queue() { + attch(arr); + } + + queue(queue&& rhs) : queue() { + swap(rhs); + } + + ~queue(void) { detach(); } + + void swap(queue& rhs) { + std::swap(elems_ , rhs.elems_ ); + std::swap(cursor_, rhs.cursor_); + } + + queue& operator=(queue rhs) { + swap(rhs); + return *this; + } + + array_t * elems(void) { + return elems_; + } + + std::size_t connect(void) { + if (elems_ == nullptr) return error_count; cursor_ = elems_->cursor(); return elems_->connect(); } std::size_t disconnect(void) { if (elems_ == nullptr) return error_count; - auto ret = elems_->disconnect(); + return elems_->disconnect(); + } + + array_t* attch(array_t* arr) { + if (arr == nullptr) return nullptr; + auto old = elems_; + elems_ = arr; + return old; + } + + array_t* detach(void) { + if (elems_ == nullptr) return nullptr; + auto old = elems_; elems_ = nullptr; - return ret; + return old; } std::size_t conn_count(void) const { @@ -59,7 +97,7 @@ public: std::this_thread::yield(); } auto item_ptr = static_cast(elems_->take(cursor_)); - T item { *item_ptr }; + T item = *item_ptr; elems_->put(item_ptr); return item; } diff --git a/test/test_circ.cpp b/test/test_circ.cpp index c99121b..a535335 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -65,93 +65,219 @@ struct msg_t { int dat_; }; -template -void test_prod_cons(void) { - ::new (cq__) cq_t; +struct test_stopwatch { + capo::stopwatch<> sw_; + std::atomic_flag started_ = ATOMIC_FLAG_INIT; + + void start(void) { + if (!started_.test_and_set()) { + sw_.start(); + } + } + + void print_elapsed(int N, int M, int Loops) { + auto ts = sw_.elapsed(); + std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl + << "performance: " << (double(ts) / double(Loops * N)) << " us/d" << std::endl; + } +}; + +template +struct test_confirm { + std::unordered_map>* list_; + int lcount_; + + test_confirm(int M) { + list_ = new std::remove_reference_t[lcount_ = M]; + } + + ~test_confirm(void) { + delete [] list_; + } + + void prepare(void* pt) { + std::cout << "start consumer: " << pt << std::endl; + } + + void push_data(int cid, msg_t const & msg) { + list_[cid][msg.pid_].push_back(msg.dat_); + } + + void verify(int N, int Loops) { + std::cout << "confirming..." << std::endl; + for (int m = 0; m < lcount_; ++m) { + auto& cons_vec = list_[m]; + for (int n = 0; n < N; ++n) { + auto& vec = cons_vec[n]; + QCOMPARE(vec.size(), static_cast(Loops)); + int i = 0; + for (int d : vec) { + QCOMPARE(i, d); + ++i; + } + } + } + } +}; + +template <> +struct test_confirm { + test_confirm (int) {} + void prepare (void*) {} + void push_data(int, msg_t const &) {} + void verify (int, int) {} +}; + +template +struct test_cq; + +template +struct test_cq> { + using ca_t = ipc::circ::elem_array; + using cn_t = typename std::result_of::type; + + ca_t* ca_; + + test_cq(ca_t* ca) : ca_(ca) { + ::new (ca) ca_t; + } + + cn_t connect(void) { + auto cur = ca_->cursor(); + ca_->connect(); + return cur; + } + + void disconnect(cn_t) { + ca_->disconnect(); + } + + void wait_start(int M) { + while (ca_->conn_count() != static_cast(M)) { + std::this_thread::yield(); + } + } + + template + void recv(cn_t cur, F&& proc) { + do { + while (cur != ca_->cursor()) { + msg_t* pmsg = static_cast(ca_->take(cur)), + msg = *pmsg; + ca_->put(pmsg); + if (msg.pid_ < 0) return; + ++cur; + proc(msg); + } + std::this_thread::yield(); + } while(1); + } + + void send(msg_t const & msg) { + msg_t* pmsg = static_cast(ca_->acquire()); + (*pmsg) = msg; + ca_->commit(pmsg); + } +}; + +template +struct test_cq> { + using cn_t = ipc::circ::queue; + using ca_t = typename cn_t::array_t; + + ca_t* ca_; + + test_cq(void*) : ca_(reinterpret_cast(cq__)) { + ::new (ca_) ca_t; + } + + cn_t connect(void) { + cn_t queue; + [&] { + queue.attch(ca_); + QVERIFY(queue.connect() != ipc::error_count); + } (); + return queue; + } + + void disconnect(cn_t& queue) { + QVERIFY(queue.disconnect() != ipc::error_count); + QVERIFY(queue.detach() != nullptr); + } + + void wait_start(int M) { + while (ca_->conn_count() != static_cast(M)) { + std::this_thread::yield(); + } + } + + template + void recv(cn_t& queue, F&& proc) { + do { + auto msg = queue.pop(); + if (msg.pid_ < 0) return; + proc(msg); + } while(1); + } + + void send(msg_t const & msg) { + cn_t{ ca_ }.push(msg); + } +}; + +template +void test_prod_cons(T* cq) { + test_cq tcq { cq }; + std::thread producers[N]; std::thread consumers[M]; std::atomic_int fini { 0 }; - capo::stopwatch<> sw; - std::unordered_map> list[std::extent::value]; - auto push_data = Confirmation ? [](std::vector& l, int dat) { - l.push_back(dat); - } : [](std::vector&, int) {}; + test_stopwatch sw; + test_confirm cf { M }; int cid = 0; for (auto& t : consumers) { t = std::thread{[&, cid] { - auto cur = cq__->cursor(); - if (Confirmation) { - std::cout << "start consumer " << &t << ": cur = " << (int)cur << std::endl; - } + cf.prepare(&t); + auto cn = tcq.connect(); - cq__->connect(); - std::unique_ptr guard(cq__, [](cq_t* cq) { cq->disconnect(); }); + using namespace std::placeholders; + tcq.recv(cn, std::bind(&test_confirm::push_data, &cf, cid, _1)); - do { - while (cur != cq__->cursor()) { - msg_t* pmsg = static_cast(cq__->take(cur)), - msg = *pmsg; - cq__->put(pmsg); - if (msg.pid_ < 0) goto finished; - ++cur; - push_data(list[cid][msg.pid_], msg.dat_); - } - std::this_thread::yield(); - } while(1); - finished: + tcq.disconnect(cn); if (++fini != std::extent::value) return; - auto ts = sw.elapsed(); - std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl - << "performance: " << (double(ts) / double(Loops * N)) << " us/d" << std::endl; - if (!Confirmation) return; - std::cout << "confirming..." << std::endl; - for (auto& cons_vec : list) { - for (int n = 0; n < static_cast(std::extent::value); ++n) { - auto& vec = cons_vec[n]; - QCOMPARE(vec.size(), static_cast(Loops)); - int i = 0; - for (int d : vec) { - QCOMPARE(i, d); - ++i; - } - } - } + sw.print_elapsed(N, M, Loops); + cf.verify(N, Loops); }}; ++cid; } - while (cq__->conn_count() != std::extent::value) { - std::this_thread::yield(); - } + tcq.wait_start(M); std::cout << "start producers..." << std::endl; - std::atomic_flag started = ATOMIC_FLAG_INIT; int pid = 0; for (auto& t : producers) { t = std::thread{[&, pid] { - if (!started.test_and_set()) { - sw.start(); - } + sw.start(); for (int i = 0; i < Loops; ++i) { - msg_t* pmsg = static_cast(cq__->acquire()); - pmsg->pid_ = pid; - pmsg->dat_ = i; - cq__->commit(pmsg); + tcq.send({ pid, i }); } }}; ++pid; } for (auto& t : producers) t.join(); // quit - msg_t* pmsg = static_cast(cq__->acquire()); - pmsg->pid_ = pmsg->dat_ = -1; - cq__->commit(pmsg); + tcq.send({ -1, -1 }); for (auto& t : consumers) t.join(); } +template +void test_prod_cons(void) { + test_prod_cons(cq__); +} + void Unit::test_prod_cons_1v1(void) { test_prod_cons<1, 1>(); } @@ -186,7 +312,10 @@ void Unit::test_queue(void) { QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__)); auto cq = ::new (cq__) decltype(queue)::array_t; - QVERIFY(queue.connect(cq) != ipc::error_count); + queue.attch(cq); + QVERIFY(queue.detach() != nullptr); + + test_prod_cons<1, 1>((ipc::circ::queue*)nullptr); } } // internal-linkage