diff --git a/src/circ_queue.h b/src/circ_queue.h index 8ae737c..e510760 100644 --- a/src/circ_queue.h +++ b/src/circ_queue.h @@ -7,17 +7,15 @@ #include #include #include -#include namespace ipc { struct circ_queue_head { - using u8_t = std::atomic; - using ms_t = std::atomic; // message head + using ui_t = std::atomic; + using el_t = std::atomic; // element head - ms_t mc_ { 0 }; // message counter - u8_t cc_ { 0 }; // connection counter - u8_t cr_ { 0 }; // write cursor + ui_t cc_ { 0 }; // connection counter + ui_t cr_ { 0 }; // write cursor }; template @@ -29,33 +27,36 @@ public: block_size = Size - head_size, elem_max = std::numeric_limits::max(), elem_size = (Size / (elem_max + 1)), - msg_size = elem_size - sizeof(ms_t) + data_size = elem_size - sizeof(el_t) }; static_assert(Size > head_size , "Size must > head_size"); static_assert(elem_size >= head_size , "elem_size must >= head_size"); - static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)"); + static_assert(elem_size > sizeof(el_t), "elem_size must > sizeof(el_t)"); static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); private: struct elem_t { - ms_t head_; - std::uint8_t msg_[msg_size]; + el_t head_; + std::uint8_t data_[data_size]; }; elem_t* elem_start(void) { return reinterpret_cast(this) + 1; } - static void next(u8_t& i) { - if (++i == elem_max) i.store(0, std::memory_order_release); + static std::uint8_t id(std::uint16_t i) { + return i & 0x00ff; } std::uint8_t block_[block_size]; public: - static void next(std::uint8_t& i) { - if (++i == elem_max) i = 0; + static std::uint16_t next(std::uint16_t i) { + if (id(++i) == elem_max) { + return ++i; + } + else return i; } circ_queue(void) { @@ -81,7 +82,7 @@ public: } void* acquire(void) { - auto st = elem_start() + cr_.load(std::memory_order_relaxed); + auto st = elem_start() + id(cr_.load(std::memory_order_relaxed)); do { // check remain count of consumers if (!st->head_.load(std::memory_order_acquire)) { @@ -89,30 +90,24 @@ public: break; } } while(1); - return st->msg_; + return st->data_; } void commit(void) { - next(cr_); - mc_.fetch_add(1, std::memory_order_release); + cr_.store(next(cr_.load(std::memory_order_relaxed)), std::memory_order_release); } - std::uint8_t cursor(void) const { + std::uint16_t cursor(void) const { return cr_.load(std::memory_order_acquire); } - constexpr std::size_t begin(void) const { - return 0; + void* get(std::uint16_t index) { + return (elem_start() + id(index))->data_; } - std::size_t end(void) const { - return mc_.load(std::memory_order_acquire); - } - - void* get(std::uint8_t index) { - auto st = elem_start() + index; + void commit(std::uint16_t index) { + auto st = elem_start() + id(index); st->head_.fetch_sub(1, std::memory_order_release); - return st->msg_; } }; diff --git a/test/test_circ_queue.cpp b/test/test_circ_queue.cpp index f4d31ec..dbdba49 100644 --- a/test/test_circ_queue.cpp +++ b/test/test_circ_queue.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include "circ_queue.h" #include "test.h" @@ -46,14 +48,15 @@ void Unit::test_producer(void) { auto disconn = [](cq_t* cq) { cq->disconnect(); }; std::unique_ptr guard(cq__, disconn); - auto it = cq__->begin(); int i = 0; do { - while (it != cq__->end()) { + while (cur != cq__->cursor()) { int d = *static_cast(cq__->get(cur)); + cq__->commit(cur); if (d < 0) return; - QCOMPARE(d, i); - ++i; ++it; cq__->next(cur); + cur = cq__->next(cur); + QCOMPARE(i, d); + ++i; } } while(1); }}; @@ -63,7 +66,7 @@ void Unit::test_producer(void) { std::this_thread::yield(); } capo::stopwatch<> sw; - constexpr static int loops = 267/*1000000*/; + constexpr static int loops = 1000000; std::cout << "start producer..." << std::endl; sw.start();