add circ_queue test (TBD)

This commit is contained in:
zhangyi 2018-11-28 11:53:10 +08:00
parent 2905b55fcf
commit c9ce4b466a
2 changed files with 231 additions and 64 deletions

View File

@ -4,6 +4,7 @@
#include <new> #include <new>
#include <exception> #include <exception>
#include <utility> #include <utility>
#include <algorithm>
#include "circ_elem_array.h" #include "circ_elem_array.h"
@ -25,18 +26,55 @@ private:
typename std::result_of<decltype(&array_t::cursor)(array_t)>::type cursor_ = 0; typename std::result_of<decltype(&array_t::cursor)(array_t)>::type cursor_ = 0;
public: public:
std::size_t connect(array_t* arr) { queue(void) = default;
if (arr == nullptr) return error_count;
elems_ = arr; explicit queue(array_t* arr) : queue() {
attch(arr);
}
queue(queue&& rhs) : queue() {
swap(rhs);
}
~queue(void) { detach(); }
void swap(queue& rhs) {
std::swap(elems_ , rhs.elems_ );
std::swap(cursor_, rhs.cursor_);
}
queue& operator=(queue rhs) {
swap(rhs);
return *this;
}
array_t * elems(void) {
return elems_;
}
std::size_t connect(void) {
if (elems_ == nullptr) return error_count;
cursor_ = elems_->cursor(); cursor_ = elems_->cursor();
return elems_->connect(); return elems_->connect();
} }
std::size_t disconnect(void) { std::size_t disconnect(void) {
if (elems_ == nullptr) return error_count; if (elems_ == nullptr) return error_count;
auto ret = elems_->disconnect(); return elems_->disconnect();
}
array_t* attch(array_t* arr) {
if (arr == nullptr) return nullptr;
auto old = elems_;
elems_ = arr;
return old;
}
array_t* detach(void) {
if (elems_ == nullptr) return nullptr;
auto old = elems_;
elems_ = nullptr; elems_ = nullptr;
return ret; return old;
} }
std::size_t conn_count(void) const { std::size_t conn_count(void) const {
@ -59,7 +97,7 @@ public:
std::this_thread::yield(); std::this_thread::yield();
} }
auto item_ptr = static_cast<T*>(elems_->take(cursor_)); auto item_ptr = static_cast<T*>(elems_->take(cursor_));
T item { *item_ptr }; T item = *item_ptr;
elems_->put(item_ptr); elems_->put(item_ptr);
return item; return item;
} }

View File

@ -65,50 +65,49 @@ struct msg_t {
int dat_; int dat_;
}; };
template <int N, int M, bool Confirmation = true, int Loops = 1000000> struct test_stopwatch {
void test_prod_cons(void) { capo::stopwatch<> sw_;
::new (cq__) cq_t; std::atomic_flag started_ = ATOMIC_FLAG_INIT;
std::thread producers[N];
std::thread consumers[M];
std::atomic_int fini { 0 };
capo::stopwatch<> sw;
std::unordered_map<int, std::vector<int>> list[std::extent<decltype(consumers)>::value]; void start(void) {
auto push_data = Confirmation ? [](std::vector<int>& l, int dat) { if (!started_.test_and_set()) {
l.push_back(dat); sw_.start();
} : [](std::vector<int>&, int) {}; }
int cid = 0;
for (auto& t : consumers) {
t = std::thread{[&, cid] {
auto cur = cq__->cursor();
if (Confirmation) {
std::cout << "start consumer " << &t << ": cur = " << (int)cur << std::endl;
} }
cq__->connect(); void print_elapsed(int N, int M, int Loops) {
std::unique_ptr<cq_t, void(*)(cq_t*)> guard(cq__, [](cq_t* cq) { cq->disconnect(); }); auto ts = sw_.elapsed<std::chrono::microseconds>();
do {
while (cur != cq__->cursor()) {
msg_t* pmsg = static_cast<msg_t*>(cq__->take(cur)),
msg = *pmsg;
cq__->put(pmsg);
if (msg.pid_ < 0) goto finished;
++cur;
push_data(list[cid][msg.pid_], msg.dat_);
}
std::this_thread::yield();
} while(1);
finished:
if (++fini != std::extent<decltype(consumers)>::value) return;
auto ts = sw.elapsed<std::chrono::microseconds>();
std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl std::cout << "[" << N << ":" << M << ", " << Loops << "]" << std::endl
<< "performance: " << (double(ts) / double(Loops * N)) << " us/d" << std::endl; << "performance: " << (double(ts) / double(Loops * N)) << " us/d" << std::endl;
if (!Confirmation) return; }
};
template <bool Confirmation>
struct test_confirm {
std::unordered_map<int, std::vector<int>>* list_;
int lcount_;
test_confirm(int M) {
list_ = new std::remove_reference_t<decltype(*list_)>[lcount_ = M];
}
~test_confirm(void) {
delete [] list_;
}
void prepare(void* pt) {
std::cout << "start consumer: " << pt << std::endl;
}
void push_data(int cid, msg_t const & msg) {
list_[cid][msg.pid_].push_back(msg.dat_);
}
void verify(int N, int Loops) {
std::cout << "confirming..." << std::endl; std::cout << "confirming..." << std::endl;
for (auto& cons_vec : list) { for (int m = 0; m < lcount_; ++m) {
for (int n = 0; n < static_cast<int>(std::extent<decltype(producers)>::value); ++n) { auto& cons_vec = list_[m];
for (int n = 0; n < N; ++n) {
auto& vec = cons_vec[n]; auto& vec = cons_vec[n];
QCOMPARE(vec.size(), static_cast<std::size_t>(Loops)); QCOMPARE(vec.size(), static_cast<std::size_t>(Loops));
int i = 0; int i = 0;
@ -118,40 +117,167 @@ void test_prod_cons(void) {
} }
} }
} }
}
};
template <>
struct test_confirm<false> {
test_confirm (int) {}
void prepare (void*) {}
void push_data(int, msg_t const &) {}
void verify (int, int) {}
};
template <typename T>
struct test_cq;
template <std::size_t D>
struct test_cq<ipc::circ::elem_array<D>> {
using ca_t = ipc::circ::elem_array<D>;
using cn_t = typename std::result_of<decltype(&ca_t::cursor)(ca_t)>::type;
ca_t* ca_;
test_cq(ca_t* ca) : ca_(ca) {
::new (ca) ca_t;
}
cn_t connect(void) {
auto cur = ca_->cursor();
ca_->connect();
return cur;
}
void disconnect(cn_t) {
ca_->disconnect();
}
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(cn_t cur, F&& proc) {
do {
while (cur != ca_->cursor()) {
msg_t* pmsg = static_cast<msg_t*>(ca_->take(cur)),
msg = *pmsg;
ca_->put(pmsg);
if (msg.pid_ < 0) return;
++cur;
proc(msg);
}
std::this_thread::yield();
} while(1);
}
void send(msg_t const & msg) {
msg_t* pmsg = static_cast<msg_t*>(ca_->acquire());
(*pmsg) = msg;
ca_->commit(pmsg);
}
};
template <typename T>
struct test_cq<ipc::circ::queue<T>> {
using cn_t = ipc::circ::queue<T>;
using ca_t = typename cn_t::array_t;
ca_t* ca_;
test_cq(void*) : ca_(reinterpret_cast<ca_t*>(cq__)) {
::new (ca_) ca_t;
}
cn_t connect(void) {
cn_t queue;
[&] {
queue.attch(ca_);
QVERIFY(queue.connect() != ipc::error_count);
} ();
return queue;
}
void disconnect(cn_t& queue) {
QVERIFY(queue.disconnect() != ipc::error_count);
QVERIFY(queue.detach() != nullptr);
}
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(cn_t& queue, F&& proc) {
do {
auto msg = queue.pop();
if (msg.pid_ < 0) return;
proc(msg);
} while(1);
}
void send(msg_t const & msg) {
cn_t{ ca_ }.push(msg);
}
};
template <int N, int M, bool C = true, int Loops = 1000000, typename T>
void test_prod_cons(T* cq) {
test_cq<T> tcq { cq };
std::thread producers[N];
std::thread consumers[M];
std::atomic_int fini { 0 };
test_stopwatch sw;
test_confirm<C> cf { M };
int cid = 0;
for (auto& t : consumers) {
t = std::thread{[&, cid] {
cf.prepare(&t);
auto cn = tcq.connect();
using namespace std::placeholders;
tcq.recv(cn, std::bind(&test_confirm<C>::push_data, &cf, cid, _1));
tcq.disconnect(cn);
if (++fini != std::extent<decltype(consumers)>::value) return;
sw.print_elapsed(N, M, Loops);
cf.verify(N, Loops);
}}; }};
++cid; ++cid;
} }
while (cq__->conn_count() != std::extent<decltype(consumers)>::value) { tcq.wait_start(M);
std::this_thread::yield();
}
std::cout << "start producers..." << std::endl; std::cout << "start producers..." << std::endl;
std::atomic_flag started = ATOMIC_FLAG_INIT;
int pid = 0; int pid = 0;
for (auto& t : producers) { for (auto& t : producers) {
t = std::thread{[&, pid] { t = std::thread{[&, pid] {
if (!started.test_and_set()) {
sw.start(); sw.start();
}
for (int i = 0; i < Loops; ++i) { for (int i = 0; i < Loops; ++i) {
msg_t* pmsg = static_cast<msg_t*>(cq__->acquire()); tcq.send({ pid, i });
pmsg->pid_ = pid;
pmsg->dat_ = i;
cq__->commit(pmsg);
} }
}}; }};
++pid; ++pid;
} }
for (auto& t : producers) t.join(); for (auto& t : producers) t.join();
// quit // quit
msg_t* pmsg = static_cast<msg_t*>(cq__->acquire()); tcq.send({ -1, -1 });
pmsg->pid_ = pmsg->dat_ = -1;
cq__->commit(pmsg);
for (auto& t : consumers) t.join(); for (auto& t : consumers) t.join();
} }
template <int N, int M, bool C = true, int Loops = 1000000>
void test_prod_cons(void) {
test_prod_cons<N, M, C, Loops>(cq__);
}
void Unit::test_prod_cons_1v1(void) { void Unit::test_prod_cons_1v1(void) {
test_prod_cons<1, 1>(); test_prod_cons<1, 1>();
} }
@ -186,7 +312,10 @@ void Unit::test_queue(void) {
QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__)); QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__));
auto cq = ::new (cq__) decltype(queue)::array_t; auto cq = ::new (cq__) decltype(queue)::array_t;
QVERIFY(queue.connect(cq) != ipc::error_count); queue.attch(cq);
QVERIFY(queue.detach() != nullptr);
test_prod_cons<1, 1>((ipc::circ::queue<msg_t>*)nullptr);
} }
} // internal-linkage } // internal-linkage