From a6ab7bcdac452aa5906a02542ccf0ad2006187b0 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Wed, 21 Nov 2018 23:10:12 +0800 Subject: [PATCH] fix some bugs --- src/circ_queue.h | 54 ++++++++++++++++++++++++++-------------- test/test_circ_queue.cpp | 14 +++++------ 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/src/circ_queue.h b/src/circ_queue.h index 1485503..8ae737c 100644 --- a/src/circ_queue.h +++ b/src/circ_queue.h @@ -15,12 +15,13 @@ struct circ_queue_head { using u8_t = std::atomic; using ms_t = std::atomic; // message head - u8_t cc_ { 0 }; // connection count + ms_t mc_ { 0 }; // message counter + u8_t cc_ { 0 }; // connection counter u8_t cr_ { 0 }; // write cursor }; template -class circ_queue : public circ_queue_head { +class circ_queue : private circ_queue_head { public: enum : std::size_t { total_size = Size, @@ -36,6 +37,27 @@ public: static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)"); static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); +private: + struct elem_t { + ms_t head_; + std::uint8_t msg_[msg_size]; + }; + + elem_t* elem_start(void) { + return reinterpret_cast(this) + 1; + } + + static void next(u8_t& i) { + if (++i == elem_max) i.store(0, std::memory_order_release); + } + + std::uint8_t block_[block_size]; + +public: + static void next(std::uint8_t& i) { + if (++i == elem_max) i = 0; + } + circ_queue(void) { ::memset(block_, 0, sizeof(block_)); } @@ -59,7 +81,7 @@ public: } void* acquire(void) { - auto st = begin() + id(cr_.load(std::memory_order_relaxed)); + auto st = elem_start() + cr_.load(std::memory_order_relaxed); do { // check remain count of consumers if (!st->head_.load(std::memory_order_acquire)) { @@ -71,31 +93,27 @@ public: } void commit(void) { - cr_.fetch_add(1, std::memory_order_release); + next(cr_); + mc_.fetch_add(1, std::memory_order_release); } std::uint8_t cursor(void) const { return cr_.load(std::memory_order_acquire); } + constexpr std::size_t begin(void) const { + return 0; + } + + std::size_t end(void) const { + return mc_.load(std::memory_order_acquire); + } + void* get(std::uint8_t index) { - auto st = begin() + id(index); + auto st = elem_start() + index; st->head_.fetch_sub(1, std::memory_order_release); return st->msg_; } - -private: - struct elem_t { - ms_t head_; - std::uint8_t msg_[msg_size]; - }; - elem_t* begin(void) { - return reinterpret_cast(this); - } - static std::uint8_t id(std::uint8_t i) { - return (i += 1) ? i : 1; - } - std::uint8_t block_[block_size]; }; } // namespace ipc diff --git a/test/test_circ_queue.cpp b/test/test_circ_queue.cpp index 618d453..f4d31ec 100644 --- a/test/test_circ_queue.cpp +++ b/test/test_circ_queue.cpp @@ -28,9 +28,9 @@ void Unit::test_inst(void) { QCOMPARE(sizeof(*cq__), static_cast(cq_t::total_size)); auto a = cq__->get(1); auto b = cq__->get(2); - QCOMPARE(static_cast( - static_cast(b) - - static_cast(a)), cq_t::elem_size); + QCOMPARE(static_cast(static_cast(b) - + static_cast(a)), + static_cast(cq_t::elem_size)); } void Unit::test_producer(void) { @@ -46,14 +46,14 @@ void Unit::test_producer(void) { auto disconn = [](cq_t* cq) { cq->disconnect(); }; std::unique_ptr guard(cq__, disconn); + auto it = cq__->begin(); int i = 0; do { - while (cur != cq__->cursor()) { + while (it != cq__->end()) { int d = *static_cast(cq__->get(cur)); if (d < 0) return; QCOMPARE(d, i); - ++cur; - ++i; + ++i; ++it; cq__->next(cur); } } while(1); }}; @@ -63,7 +63,7 @@ void Unit::test_producer(void) { std::this_thread::yield(); } capo::stopwatch<> sw; - constexpr static int loops = 1000000; + constexpr static int loops = 267/*1000000*/; std::cout << "start producer..." << std::endl; sw.start();