This commit is contained in:
mutouyun 2018-11-22 14:05:39 +08:00
parent a6ab7bcdac
commit daae701cec
2 changed files with 31 additions and 33 deletions

View File

@ -7,17 +7,15 @@
#include <utility> #include <utility>
#include <limits> #include <limits>
#include <algorithm> #include <algorithm>
#include <thread>
namespace ipc { namespace ipc {
struct circ_queue_head { struct circ_queue_head {
using u8_t = std::atomic<std::uint8_t>; using ui_t = std::atomic<std::uint16_t>;
using ms_t = std::atomic<std::size_t>; // message head using el_t = std::atomic<std::size_t>; // element head
ms_t mc_ { 0 }; // message counter ui_t cc_ { 0 }; // connection counter
u8_t cc_ { 0 }; // connection counter ui_t cr_ { 0 }; // write cursor
u8_t cr_ { 0 }; // write cursor
}; };
template <std::size_t Size> template <std::size_t Size>
@ -29,33 +27,36 @@ public:
block_size = Size - head_size, block_size = Size - head_size,
elem_max = std::numeric_limits<std::uint8_t>::max(), elem_max = std::numeric_limits<std::uint8_t>::max(),
elem_size = (Size / (elem_max + 1)), elem_size = (Size / (elem_max + 1)),
msg_size = elem_size - sizeof(ms_t) data_size = elem_size - sizeof(el_t)
}; };
static_assert(Size > head_size , "Size must > head_size"); static_assert(Size > head_size , "Size must > head_size");
static_assert(elem_size >= head_size , "elem_size must >= head_size"); static_assert(elem_size >= head_size , "elem_size must >= head_size");
static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)"); static_assert(elem_size > sizeof(el_t), "elem_size must > sizeof(el_t)");
static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size");
private: private:
struct elem_t { struct elem_t {
ms_t head_; el_t head_;
std::uint8_t msg_[msg_size]; std::uint8_t data_[data_size];
}; };
elem_t* elem_start(void) { elem_t* elem_start(void) {
return reinterpret_cast<elem_t*>(this) + 1; return reinterpret_cast<elem_t*>(this) + 1;
} }
static void next(u8_t& i) { static std::uint8_t id(std::uint16_t i) {
if (++i == elem_max) i.store(0, std::memory_order_release); return i & 0x00ff;
} }
std::uint8_t block_[block_size]; std::uint8_t block_[block_size];
public: public:
static void next(std::uint8_t& i) { static std::uint16_t next(std::uint16_t i) {
if (++i == elem_max) i = 0; if (id(++i) == elem_max) {
return ++i;
}
else return i;
} }
circ_queue(void) { circ_queue(void) {
@ -81,7 +82,7 @@ public:
} }
void* acquire(void) { void* acquire(void) {
auto st = elem_start() + cr_.load(std::memory_order_relaxed); auto st = elem_start() + id(cr_.load(std::memory_order_relaxed));
do { do {
// check remain count of consumers // check remain count of consumers
if (!st->head_.load(std::memory_order_acquire)) { if (!st->head_.load(std::memory_order_acquire)) {
@ -89,30 +90,24 @@ public:
break; break;
} }
} while(1); } while(1);
return st->msg_; return st->data_;
} }
void commit(void) { void commit(void) {
next(cr_); cr_.store(next(cr_.load(std::memory_order_relaxed)), std::memory_order_release);
mc_.fetch_add(1, std::memory_order_release);
} }
std::uint8_t cursor(void) const { std::uint16_t cursor(void) const {
return cr_.load(std::memory_order_acquire); return cr_.load(std::memory_order_acquire);
} }
constexpr std::size_t begin(void) const { void* get(std::uint16_t index) {
return 0; return (elem_start() + id(index))->data_;
} }
std::size_t end(void) const { void commit(std::uint16_t index) {
return mc_.load(std::memory_order_acquire); auto st = elem_start() + id(index);
}
void* get(std::uint8_t index) {
auto st = elem_start() + index;
st->head_.fetch_sub(1, std::memory_order_release); st->head_.fetch_sub(1, std::memory_order_release);
return st->msg_;
} }
}; };

View File

@ -3,6 +3,8 @@
#include <type_traits> #include <type_traits>
#include <memory> #include <memory>
#include <new> #include <new>
#include <vector>
#include <thread>
#include "circ_queue.h" #include "circ_queue.h"
#include "test.h" #include "test.h"
@ -46,14 +48,15 @@ void Unit::test_producer(void) {
auto disconn = [](cq_t* cq) { cq->disconnect(); }; auto disconn = [](cq_t* cq) { cq->disconnect(); };
std::unique_ptr<cq_t, decltype(disconn)> guard(cq__, disconn); std::unique_ptr<cq_t, decltype(disconn)> guard(cq__, disconn);
auto it = cq__->begin();
int i = 0; int i = 0;
do { do {
while (it != cq__->end()) { while (cur != cq__->cursor()) {
int d = *static_cast<const int*>(cq__->get(cur)); int d = *static_cast<const int*>(cq__->get(cur));
cq__->commit(cur);
if (d < 0) return; if (d < 0) return;
QCOMPARE(d, i); cur = cq__->next(cur);
++i; ++it; cq__->next(cur); QCOMPARE(i, d);
++i;
} }
} while(1); } while(1);
}}; }};
@ -63,7 +66,7 @@ void Unit::test_producer(void) {
std::this_thread::yield(); std::this_thread::yield();
} }
capo::stopwatch<> sw; capo::stopwatch<> sw;
constexpr static int loops = 267/*1000000*/; constexpr static int loops = 1000000;
std::cout << "start producer..." << std::endl; std::cout << "start producer..." << std::endl;
sw.start(); sw.start();