From cca70b018cb0548e3c5d51db020da8b043a25254 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Mon, 17 Dec 2018 19:07:18 +0800 Subject: [PATCH] improve test cases (has bugs) --- include/circ_elem_array.h | 18 ++--- include/circ_queue.h | 16 ++-- include/ipc.h | 15 ++-- include/shm.h | 16 ++-- src/ipc.cpp | 36 +++++++-- src/shm.cpp | 18 ++--- test/random.hpp | 52 +++++++++++++ test/test.h | 63 +++++++++++++++ test/test_circ.cpp | 157 +++++++++++++------------------------- test/test_ipc.cpp | 109 ++++++++++++++++++++++++++ 10 files changed, 346 insertions(+), 154 deletions(-) create mode 100644 test/random.hpp diff --git a/include/circ_elem_array.h b/include/circ_elem_array.h index 9606172..28e16ca 100644 --- a/include/circ_elem_array.h +++ b/include/circ_elem_array.h @@ -21,30 +21,30 @@ struct alignas(std::max_align_t) elem_array_head { static u1_t index_of(u2_t c) { return static_cast(c); } - std::size_t connect(void) { + std::size_t connect() { return cc_.fetch_add(1, std::memory_order_release); } - std::size_t disconnect(void) { + std::size_t disconnect() { return cc_.fetch_sub(1, std::memory_order_release); } - std::size_t conn_count(void) const { + std::size_t conn_count() const { return cc_.load(std::memory_order_acquire); } - u2_t cursor(void) const { + u2_t cursor() const { return wt_.load(std::memory_order_acquire); } - auto acquire(void) { + auto acquire() { while (lc_.exchange(1, std::memory_order_acquire)) { std::this_thread::yield(); } return index_of(wt_.load(std::memory_order_relaxed)); } - void commit(void) { + void commit() { wt_.fetch_add(1, std::memory_order_relaxed); lc_.store(0, std::memory_order_release); } @@ -84,7 +84,7 @@ private: }; elem_t block_[elem_max]; - elem_t* elem_start(void) { + elem_t* elem_start() { return block_; } @@ -92,7 +92,7 @@ private: elem_t* elem(u1_t i ) { return elem_start() + i; } public: - elem_array(void) = default; + elem_array() = default; elem_array(const elem_array&) = delete; elem_array& operator=(const elem_array&) = delete; @@ -104,7 +104,7 @@ public: using base_t::conn_count; using base_t::cursor; - void* acquire(void) { + void* acquire() { elem_t* el = elem(base_t::acquire()); // check all consumers have finished reading while(1) { diff --git a/include/circ_queue.h b/include/circ_queue.h index c64b9a7..6884c91 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -23,7 +23,7 @@ private: bool connected_ = false; public: - queue(void) = default; + queue() = default; explicit queue(array_t* arr) : queue() { attach(arr); @@ -44,29 +44,29 @@ public: return *this; } - array_t * elems(void) { + array_t * elems() { return elems_; } - std::size_t connect(void) { + std::size_t connect() { if (elems_ == nullptr) return error_count; if (connected_) return error_count; connected_ = true; return elems_->connect(); } - std::size_t disconnect(void) { + std::size_t disconnect() { if (elems_ == nullptr) return error_count; if (!connected_) return error_count; connected_ = false; return elems_->disconnect(); } - std::size_t conn_count(void) const { + std::size_t conn_count() const { return (elems_ == nullptr) ? error_count : elems_->conn_count(); } - bool connected(void) const { + bool connected() const { return connected_; } @@ -78,7 +78,7 @@ public: return old; } - array_t* detach(void) { + array_t* detach() { if (elems_ == nullptr) return nullptr; auto old = elems_; elems_ = nullptr; @@ -113,7 +113,7 @@ public: return true; } - T pop(void) { + T pop() { if (elems_ == nullptr) throw std::invalid_argument { "This queue hasn't attached any elem_array." }; diff --git a/include/ipc.h b/include/ipc.h index 99a8a65..9829e19 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -20,27 +20,30 @@ buff_t make_buff(byte_t const (& data)[N]) { return make_buff(data, N); } IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT void disconnect(handle_t h); +IPC_EXPORT std::size_t conn_count(handle_t h); + IPC_EXPORT bool send(handle_t h, void const * data, std::size_t size); IPC_EXPORT buff_t recv(handle_t h); class IPC_EXPORT channel { public: - channel(void); + channel(); channel(char const * name); channel(channel&& rhs); - ~channel(void); + ~channel(); void swap(channel& rhs); channel& operator=(channel rhs); - bool valid(void) const; - char const * name (void) const; + bool valid() const; + char const * name () const; - channel clone(void) const; + channel clone() const; bool connect(char const * name); - void disconnect(void); + void disconnect(); + std::size_t conn_count() const; bool send(void const * data, std::size_t size); bool send(buff_t const & buff); diff --git a/include/shm.h b/include/shm.h index 97e31fc..6e5762a 100644 --- a/include/shm.h +++ b/include/shm.h @@ -16,24 +16,24 @@ IPC_EXPORT void close (void* mem); class IPC_EXPORT handle { public: - handle(void); + handle(); handle(char const * name, std::size_t size); handle(handle&& rhs); - ~handle(void); + ~handle(); void swap(handle& rhs); handle& operator=(handle rhs); - bool valid(void) const; - std::size_t size (void) const; - char const * name (void) const; + bool valid() const; + std::size_t size () const; + char const * name () const; bool acquire(char const * name, std::size_t size); - void release(void); + void release(); - void* get (void); - void close(void); + void* get (); + void close(); private: class handle_; diff --git a/src/ipc.cpp b/src/ipc.cpp index ce2f89f..932a879 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -90,15 +90,24 @@ handle_t connect(char const * name) { if (mem == nullptr) { return nullptr; } + queue_t* queue; { std::unique_lock guard { h2q_lc__ }; - h2q__[h].attach(&(static_cast(mem)->elems_)); + queue = &(h2q__[h]); + if (queue == nullptr) { + return nullptr; + } + queue->attach(&(static_cast(mem)->elems_)); + queue->connect(); } h_guard.release(); return h; } void disconnect(handle_t h) { + if (h == nullptr) { + return; + } void* mem = nullptr; { std::unique_lock guard { h2q_lc__ }; @@ -112,6 +121,14 @@ void disconnect(handle_t h) { shm::release(h, sizeof(queue_t)); } +std::size_t conn_count(handle_t h) { + auto queue = queue_of(h); + if (queue == nullptr) { + return error_count; + } + return queue->conn_count(); +} + bool send(handle_t h, void const * data, std::size_t size) { if (data == nullptr) { return false; @@ -196,7 +213,7 @@ public: std::string n_; }; -channel::channel(void) +channel::channel() : p_(p_->make()) { } @@ -210,7 +227,7 @@ channel::channel(channel&& rhs) swap(rhs); } -channel::~channel(void) { +channel::~channel() { p_->clear(); } @@ -223,15 +240,15 @@ channel& channel::operator=(channel rhs) { return *this; } -bool channel::valid(void) const { +bool channel::valid() const { return (impl(p_)->h_ != nullptr); } -char const * channel::name(void) const { +char const * channel::name() const { return impl(p_)->n_.c_str(); } -channel channel::clone(void) const { +channel channel::clone() const { return { name() }; } @@ -242,11 +259,14 @@ bool channel::connect(char const * name) { return valid(); } -void channel::disconnect(void) { - if (!valid()) return; +void channel::disconnect() { ipc::disconnect(impl(p_)->h_); } +std::size_t channel::conn_count() const { + return ipc::conn_count(impl(p_)->h_); +} + bool channel::send(void const *data, std::size_t size) { return ipc::send(impl(p_)->h_, data, size); } diff --git a/src/shm.cpp b/src/shm.cpp index 695c17b..2f1b3e2 100644 --- a/src/shm.cpp +++ b/src/shm.cpp @@ -20,13 +20,13 @@ public: handle_() = default; handle_(handle* t) : t_{t} {} - ~handle_(void) { + ~handle_() { t_->close(); t_->release(); } }; -handle::handle(void) +handle::handle() : p_(p_->make(this)) { } @@ -40,7 +40,7 @@ handle::handle(handle&& rhs) swap(rhs); } -handle::~handle(void) { +handle::~handle() { p_->clear(); } @@ -53,15 +53,15 @@ handle& handle::operator=(handle rhs) { return *this; } -bool handle::valid(void) const { +bool handle::valid() const { return impl(p_)->h_ != nullptr; } -std::size_t handle::size(void) const { +std::size_t handle::size() const { return impl(p_)->s_; } -char const * handle::name(void) const { +char const * handle::name() const { return impl(p_)->n_.c_str(); } @@ -73,7 +73,7 @@ bool handle::acquire(char const * name, std::size_t size) { return valid(); } -void handle::release(void) { +void handle::release() { if (!valid()) return; shm::release(impl(p_)->h_, impl(p_)->s_); impl(p_)->h_ = nullptr; @@ -81,7 +81,7 @@ void handle::release(void) { impl(p_)->n_.clear(); } -void* handle::get(void) { +void* handle::get() { if (!valid()) return nullptr; if (impl(p_)->m_ == nullptr) { return impl(p_)->m_ = shm::open(impl(p_)->h_); @@ -89,7 +89,7 @@ void* handle::get(void) { else return impl(p_)->m_; } -void handle::close(void) { +void handle::close() { if (!valid()) return; shm::close(impl(p_)->m_); impl(p_)->m_ = nullptr; diff --git a/test/random.hpp b/test/random.hpp new file mode 100644 index 0000000..78f0e60 --- /dev/null +++ b/test/random.hpp @@ -0,0 +1,52 @@ +/* + The Capo Library + Code covered by the MIT License + + Author: mutouyun (http://orzz.org) +*/ + +#pragma once + +#include // std::default_random_engine, std::uniform_int_distribution +#include // std::forward + +namespace capo { + +//////////////////////////////////////////////////////////////// +/// Simple way of generating random numbers +//////////////////////////////////////////////////////////////// + +template > +class random : public Distribution +{ + using base_t = Distribution; + +public: + using engine_type = Engine; + using distribution_type = Distribution; + using result_type = typename distribution_type::result_type; + using param_type = typename distribution_type::param_type; + +private: + engine_type engine_; + +public: + template + random(T&&... args) + : base_t (std::forward(args)...) + , engine_(std::random_device{}()) + {} + + result_type operator()(void) + { + return base_t::operator()(engine_); + } + + result_type operator()(const param_type& parm) + { + return base_t::operator()(engine_, parm); + } +}; + +} // namespace capo diff --git a/test/test.h b/test/test.h index b7e929c..174b756 100644 --- a/test/test.h +++ b/test/test.h @@ -4,6 +4,7 @@ #include #include +#include #include "stopwatch.hpp" @@ -38,3 +39,65 @@ struct test_stopwatch { << (double(ts) / double(Loops * N)) << " us/d" << std::endl; } }; + +template +struct test_verify; + +template <> +struct test_verify { + test_verify (int) {} + void prepare (void*) {} + void push_data(int, ...) {} + void verify (int, int) {} +}; + +template +struct test_cq; + +template +void benchmark_prod_cons(T* cq) { + test_cq tcq { cq }; + + std::thread producers[N]; + std::thread consumers[M]; + std::atomic_int fini { 0 }; + + test_stopwatch sw; + test_verify vf { M }; + + int cid = 0; + for (auto& t : consumers) { + t = std::thread{[&, cid] { + vf.prepare(&t); + auto cn = tcq.connect(); + + using namespace std::placeholders; + tcq.recv(cn, std::bind(&test_verify::push_data, &vf, cid, _1)); + + tcq.disconnect(cn); + if (++fini != std::extent::value) return; + sw.print_elapsed(N, M, Loops); + vf.verify(N, Loops); + }}; + ++cid; + } + + tcq.wait_start(M); + + std::cout << "start producers..." << std::endl; + int pid = 0; + for (auto& t : producers) { + t = std::thread{[&, pid] { + sw.start(); + for (int i = 0; i < Loops; ++i) { + tcq.send({ pid, i }); + } + }}; + ++pid; + } + for (auto& t : producers) t.join(); + // quit + tcq.send({ -1, -1 }); + + for (auto& t : consumers) t.join(); +} diff --git a/test/test_circ.cpp b/test/test_circ.cpp index af1be9b..9363e2c 100644 --- a/test/test_circ.cpp +++ b/test/test_circ.cpp @@ -13,65 +13,16 @@ namespace { -class Unit : public TestSuite { - Q_OBJECT - - const char* name() const { - return "test_circ"; - } - -private slots: - void initTestCase(); - void cleanupTestCase(); - - void test_inst(); - void test_prod_cons_1v1(); - void test_prod_cons_1v3(); - void test_prod_cons_3v1(); - void test_prod_cons_performance(); - - void test_queue(); -} unit__; - -#include "test_circ.moc" - using cq_t = ipc::circ::elem_array<12>; cq_t* cq__; -constexpr int LoopCount = 1000000; - -void Unit::initTestCase() { - TestSuite::initTestCase(); - cq__ = new cq_t; -} - -void Unit::cleanupTestCase() { - delete cq__; -} - -void Unit::test_inst() { - std::cout << "cq_t::head_size = " << cq_t::head_size << std::endl; - std::cout << "cq_t::data_size = " << cq_t::data_size << std::endl; - std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl; - std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; - - QCOMPARE(static_cast(cq_t::data_size) , static_cast(12)); - QCOMPARE(sizeof(cq_t), static_cast(cq_t::block_size + cq_t::head_size)); - - std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; - - auto a = cq__->take(1); - auto b = cq__->take(2); - QCOMPARE(static_cast(static_cast(b) - - static_cast(a)), - static_cast(cq_t::elem_size)); -} - struct msg_t { int pid_; int dat_; }; +} // internal-linkage + template struct test_verify { std::unordered_map>* list_; @@ -112,17 +63,6 @@ struct test_verify { } }; -template <> -struct test_verify { - test_verify (int) {} - void prepare (void*) {} - void push_data(int, msg_t const &) {} - void verify (int, int) {} -}; - -template -struct test_cq; - template struct test_cq> { using ca_t = ipc::circ::elem_array; @@ -217,57 +157,62 @@ struct test_cq> { } }; -template -void test_prod_cons(T* cq) { - test_cq tcq { cq }; +namespace { - std::thread producers[N]; - std::thread consumers[M]; - std::atomic_int fini { 0 }; +class Unit : public TestSuite { + Q_OBJECT - test_stopwatch sw; - test_verify vf { M }; - - int cid = 0; - for (auto& t : consumers) { - t = std::thread{[&, cid] { - vf.prepare(&t); - auto cn = tcq.connect(); - - using namespace std::placeholders; - tcq.recv(cn, std::bind(&test_verify::push_data, &vf, cid, _1)); - - tcq.disconnect(cn); - if (++fini != std::extent::value) return; - sw.print_elapsed(N, M, Loops); - vf.verify(N, Loops); - }}; - ++cid; + const char* name() const { + return "test_circ"; } - tcq.wait_start(M); +private slots: + void initTestCase(); + void cleanupTestCase(); - std::cout << "start producers..." << std::endl; - int pid = 0; - for (auto& t : producers) { - t = std::thread{[&, pid] { - sw.start(); - for (int i = 0; i < Loops; ++i) { - tcq.send({ pid, i }); - } - }}; - ++pid; - } - for (auto& t : producers) t.join(); - // quit - tcq.send({ -1, -1 }); + void test_inst(); + void test_prod_cons_1v1(); + void test_prod_cons_1v3(); + void test_prod_cons_3v1(); + void test_prod_cons_performance(); - for (auto& t : consumers) t.join(); + void test_queue(); +} unit__; + +#include "test_circ.moc" + +constexpr int LoopCount = 1000000; + +void Unit::initTestCase() { + TestSuite::initTestCase(); + cq__ = new cq_t; +} + +void Unit::cleanupTestCase() { + delete cq__; +} + +void Unit::test_inst() { + std::cout << "cq_t::head_size = " << cq_t::head_size << std::endl; + std::cout << "cq_t::data_size = " << cq_t::data_size << std::endl; + std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl; + std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; + + QCOMPARE(static_cast(cq_t::data_size) , static_cast(12)); + QCOMPARE(sizeof(cq_t), static_cast(cq_t::block_size + cq_t::head_size)); + + std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; + + auto a = cq__->take(1); + auto b = cq__->take(2); + QCOMPARE(static_cast(static_cast(b) - + static_cast(a)), + static_cast(cq_t::elem_size)); } template void test_prod_cons() { - test_prod_cons(cq__); + benchmark_prod_cons(cq__); } void Unit::test_prod_cons_1v1() { @@ -354,9 +299,9 @@ void Unit::test_queue() { queue.attach(cq); QVERIFY(queue.detach() != nullptr); - test_prod_cons<1, 3>((ipc::circ::queue*)nullptr); - test_prod_cons<3, 1>((ipc::circ::queue*)nullptr); - test_prod_cons<3, 3>((ipc::circ::queue*)nullptr); + benchmark_prod_cons<1, 3, LoopCount>((ipc::circ::queue*)nullptr); + benchmark_prod_cons<3, 1, LoopCount>((ipc::circ::queue*)nullptr); + benchmark_prod_cons<3, 3, LoopCount>((ipc::circ::queue*)nullptr); } } // internal-linkage diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 5be252e..92d88a4 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -9,6 +9,9 @@ #include #include #include +#include +#include +#include #if defined(__GNUC__) # include // abi::__cxa_demangle @@ -18,10 +21,88 @@ #include "rw_lock.h" #include "stopwatch.hpp" #include "spin_lock.hpp" +#include "random.hpp" #include "test.h" namespace { +std::vector datas__; + +constexpr int DataMin = 2; +constexpr int DataMax = 512; +constexpr int LoopCount = 100/*0*//*000*/; + +} // internal-linkage + +template <> +struct test_cq { + using cn_t = ipc::channel; + + std::string conn_name_; + std::size_t conn_count_ = 0; + + test_cq(void*) + : conn_name_("test-ipc-channel") + {} + + cn_t connect() { + cn_t cn { conn_name_.c_str() }; + conn_count_ = cn.conn_count(); + return cn; + } + + void disconnect(cn_t&) {} + + void wait_start(int M) { + while (conn_count_ != static_cast(M)) { + std::this_thread::yield(); + } + } + + template + void recv(cn_t& cn, F&& proc) { + do { + auto msg = cn.recv(); + if (msg.size() < 2) return; + proc(msg); + } while(1); + } + + void send(const std::array& info) { + thread_local auto cn = connect(); + int n = info[1]; + if (n < 0) { + cn.send(ipc::buff_t { '\0' }); + } + else cn.send(datas__[n]); + } +}; + +//template <> +//struct test_verify { +// std::unordered_map> list_; +// int lcount_; + +// test_verify(int M) : lcount_{ M } {} + +// void prepare(void* pt) { +// std::cout << "start consumer: " << pt << std::endl; +// } + +// void push_data(int cid, ipc::buff_t const & msg) { +// list_[cid].emplace_back(std::move(msg)); +// } + +// void verify(int /*N*/, int /*Loops*/) { +// std::cout << "verifying..." << std::endl; +// for (int m = 0; m < lcount_; ++m) { +// QCOMPARE(datas__, list_[m]); +// } +// } +//}; + +namespace { + class Unit : public TestSuite { Q_OBJECT @@ -30,13 +111,37 @@ class Unit : public TestSuite { } private slots: + void initTestCase(); + void cleanupTestCase(); + void test_rw_lock(); void test_send_recv(); void test_channel(); + void test_channel_performance(); } unit__; #include "test_ipc.moc" +void Unit::initTestCase() { + TestSuite::initTestCase(); + + capo::random<> rdm { DataMin, DataMax }; + capo::random<> bit { 0, (std::numeric_limits::max)() }; + + for (int i = 0; i < LoopCount; ++i) { + auto n = rdm(); + ipc::buff_t buff(static_cast(n)); + for (std::size_t k = 0; k < buff.size(); ++k) { + buff[k] = static_cast(bit()); + } + datas__.emplace_back(std::move(buff)); + } +} + +void Unit::cleanupTestCase() { + datas__.clear(); +} + template constexpr T acc(T b, T e) { return (e + b) * (e - b + 1) / 2; @@ -235,4 +340,8 @@ void Unit::test_channel() { t2.join(); } +void Unit::test_channel_performance() { + benchmark_prod_cons<1, 1, LoopCount, false>((ipc::channel*)nullptr); +} + } // internal-linkage