From de39be3c4f02557e1b866afefb4a72a627b267ed Mon Sep 17 00:00:00 2001 From: mutouyun Date: Wed, 21 Nov 2018 19:03:39 +0800 Subject: [PATCH] fix bugs --- src/circ_queue.h | 62 ++++++++++++++++++++++++++++------------ test/test_circ_queue.cpp | 44 ++++++++++++++-------------- 2 files changed, 67 insertions(+), 39 deletions(-) diff --git a/src/circ_queue.h b/src/circ_queue.h index e0037d0..4167308 100644 --- a/src/circ_queue.h +++ b/src/circ_queue.h @@ -2,18 +2,21 @@ #include #include +#include #include #include #include #include +#include namespace ipc { struct circ_queue_head { using u8_t = std::atomic; + using ms_t = std::atomic; // message head - u8_t cc_; // connection count - u8_t wt_; // write-index + u8_t cc_ { 0 }; // connection count + u8_t cr_ { 0 }; // write cursor }; template @@ -24,14 +27,18 @@ public: head_size = sizeof(circ_queue_head), block_size = Size - head_size, elem_max = std::numeric_limits::max(), - elem_size = Size / (elem_max + 1) + elem_size = (Size / (elem_max + 1)), + msg_size = elem_size - sizeof(ms_t) }; - static_assert(Size > head_size , "Size must > head_size"); - static_assert(elem_size >= head_size, "elem_size must >= head_size"); - static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); + static_assert(Size > head_size , "Size must > head_size"); + static_assert(elem_size >= head_size , "elem_size must >= head_size"); + static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)"); + static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); - circ_queue(void) = default; + circ_queue(void) { + ::memset(block_, 0, sizeof(block_)); + } ~circ_queue(void) = delete; circ_queue(const circ_queue&) = delete; @@ -47,30 +54,49 @@ public: return cc_.fetch_sub(1, std::memory_order_acq_rel); } + std::size_t conn_count(void) const { + return cc_.load(std::memory_order_acquire); + } + void* acquire(void) { - return begin() + wt_.load(std::memory_order_relaxed); + auto st = begin() + id(cr_.load(std::memory_order_relaxed)); + do { + // check remain count of consumers + if (st->head_.load(std::memory_order_acquire)) { + std::this_thread::yield(); + } + else { + st->head_.store(conn_count()); + break; + } + } while(1); + return st->msg_; } void commit(void) { - wt_.fetch_add(1, std::memory_order_release); + cr_.fetch_add(1, std::memory_order_release); } std::uint8_t cursor(void) const { - return wt_.load(std::memory_order_acquire); + return cr_.load(std::memory_order_acquire); } - void const * get(std::uint8_t index) const { - return begin() + index; + void* get(std::uint8_t index) { + auto st = begin() + id(index); + st->head_.fetch_sub(1, std::memory_order_release); + return st->msg_; } private: - using elem_t = std::uint8_t[elem_size]; - elem_t const * begin(void) const { - return reinterpret_cast( - reinterpret_cast(this) + elem_size); + struct elem_t { + ms_t head_; + std::uint8_t msg_[msg_size]; + }; + elem_t* begin(void) { + return reinterpret_cast(this); } - elem_t * begin(void) { - return const_cast(static_cast(this)->begin()); + static std::uint8_t id(std::uint8_t i) { + return (i += 1) ? i : 1; } std::uint8_t block_[block_size]; }; diff --git a/test/test_circ_queue.cpp b/test/test_circ_queue.cpp index e96a178..40a97f5 100644 --- a/test/test_circ_queue.cpp +++ b/test/test_circ_queue.cpp @@ -1,15 +1,13 @@ -#include #include #include #include +#include #include "circ_queue.h" #include "test.h" namespace { -using namespace std::string_literals; - class Unit : public TestSuite { Q_OBJECT @@ -34,42 +32,46 @@ void Unit::test_inst(void) { } void Unit::test_producer(void) { + cq__ = new cq_t; std::thread consumers[3]; - std::atomic_int flag(0); for (auto& c : consumers) { - c = std::thread{[&c, &flag] { + c = std::thread{[&c] { auto cur = cq__->cursor(); - std::cout << &c << ": cur = " << (int)cur << std::endl; - flag.fetch_add(1, std::memory_order_release); + std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl; + + cq__->connect(); + auto disconn = [](cq_t* cq) { cq->disconnect(); }; + std::unique_ptr guard(cq__, disconn); + + int i = 0; do { while (cur != cq__->cursor()) { - auto data = static_cast(cq__->get(cur)); - std::cout << &c << ": " << data << std::endl; - if (data == "quit"s) { + int d = *static_cast(cq__->get(cur)); +// std::cout << &c << ": cur = " << (int)cur << ", " << d << std::endl; + if (d < 0) { return; } - else QCOMPARE(data, std::to_string(cur).c_str()); + else QCOMPARE(d, i); ++cur; + ++i; } - std::this_thread::yield(); } while(1); }}; } - while (flag.load(std::memory_order_acquire) != std::extent::value) { + while (cq__->conn_count() != std::extent::value) { std::this_thread::yield(); } - - for (int i = 0; i < 10; ++i) { - auto str = static_cast(cq__->acquire()); - strcpy(str, std::to_string(i).c_str()); - std::cout << "put: " << str << std::endl; + std::cout << "start producer..." << std::endl; + for (int i = 0; i < 1000; ++i) { + auto d = static_cast(cq__->acquire()); + *d = i; cq__->commit(); } - auto str = static_cast(cq__->acquire()); - strcpy(str, "quit"); - std::cout << "put: " << str << std::endl; + auto d = static_cast(cq__->acquire()); + *d = -1; + std::cout << "put: quit..." << std::endl; cq__->commit(); for (auto& c : consumers) {