fix some bugs

This commit is contained in:
mutouyun 2018-11-21 23:10:12 +08:00
parent 7718a63b24
commit a6ab7bcdac
2 changed files with 43 additions and 25 deletions

View File

@ -15,12 +15,13 @@ struct circ_queue_head {
using u8_t = std::atomic<std::uint8_t>; using u8_t = std::atomic<std::uint8_t>;
using ms_t = std::atomic<std::size_t>; // message head using ms_t = std::atomic<std::size_t>; // message head
u8_t cc_ { 0 }; // connection count ms_t mc_ { 0 }; // message counter
u8_t cc_ { 0 }; // connection counter
u8_t cr_ { 0 }; // write cursor u8_t cr_ { 0 }; // write cursor
}; };
template <std::size_t Size> template <std::size_t Size>
class circ_queue : public circ_queue_head { class circ_queue : private circ_queue_head {
public: public:
enum : std::size_t { enum : std::size_t {
total_size = Size, total_size = Size,
@ -36,6 +37,27 @@ public:
static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)"); static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)");
static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size");
private:
struct elem_t {
ms_t head_;
std::uint8_t msg_[msg_size];
};
elem_t* elem_start(void) {
return reinterpret_cast<elem_t*>(this) + 1;
}
static void next(u8_t& i) {
if (++i == elem_max) i.store(0, std::memory_order_release);
}
std::uint8_t block_[block_size];
public:
static void next(std::uint8_t& i) {
if (++i == elem_max) i = 0;
}
circ_queue(void) { circ_queue(void) {
::memset(block_, 0, sizeof(block_)); ::memset(block_, 0, sizeof(block_));
} }
@ -59,7 +81,7 @@ public:
} }
void* acquire(void) { void* acquire(void) {
auto st = begin() + id(cr_.load(std::memory_order_relaxed)); auto st = elem_start() + cr_.load(std::memory_order_relaxed);
do { do {
// check remain count of consumers // check remain count of consumers
if (!st->head_.load(std::memory_order_acquire)) { if (!st->head_.load(std::memory_order_acquire)) {
@ -71,31 +93,27 @@ public:
} }
void commit(void) { void commit(void) {
cr_.fetch_add(1, std::memory_order_release); next(cr_);
mc_.fetch_add(1, std::memory_order_release);
} }
std::uint8_t cursor(void) const { std::uint8_t cursor(void) const {
return cr_.load(std::memory_order_acquire); return cr_.load(std::memory_order_acquire);
} }
constexpr std::size_t begin(void) const {
return 0;
}
std::size_t end(void) const {
return mc_.load(std::memory_order_acquire);
}
void* get(std::uint8_t index) { void* get(std::uint8_t index) {
auto st = begin() + id(index); auto st = elem_start() + index;
st->head_.fetch_sub(1, std::memory_order_release); st->head_.fetch_sub(1, std::memory_order_release);
return st->msg_; return st->msg_;
} }
private:
struct elem_t {
ms_t head_;
std::uint8_t msg_[msg_size];
};
elem_t* begin(void) {
return reinterpret_cast<elem_t*>(this);
}
static std::uint8_t id(std::uint8_t i) {
return (i += 1) ? i : 1;
}
std::uint8_t block_[block_size];
}; };
} // namespace ipc } // namespace ipc

View File

@ -28,9 +28,9 @@ void Unit::test_inst(void) {
QCOMPARE(sizeof(*cq__), static_cast<std::size_t>(cq_t::total_size)); QCOMPARE(sizeof(*cq__), static_cast<std::size_t>(cq_t::total_size));
auto a = cq__->get(1); auto a = cq__->get(1);
auto b = cq__->get(2); auto b = cq__->get(2);
QCOMPARE(static_cast<std::size_t>( QCOMPARE(static_cast<std::size_t>(static_cast<std::uint8_t const *>(b) -
static_cast<std::uint8_t const *>(b) - static_cast<std::uint8_t const *>(a)),
static_cast<std::uint8_t const *>(a)), cq_t::elem_size); static_cast<std::size_t>(cq_t::elem_size));
} }
void Unit::test_producer(void) { void Unit::test_producer(void) {
@ -46,14 +46,14 @@ void Unit::test_producer(void) {
auto disconn = [](cq_t* cq) { cq->disconnect(); }; auto disconn = [](cq_t* cq) { cq->disconnect(); };
std::unique_ptr<cq_t, decltype(disconn)> guard(cq__, disconn); std::unique_ptr<cq_t, decltype(disconn)> guard(cq__, disconn);
auto it = cq__->begin();
int i = 0; int i = 0;
do { do {
while (cur != cq__->cursor()) { while (it != cq__->end()) {
int d = *static_cast<const int*>(cq__->get(cur)); int d = *static_cast<const int*>(cq__->get(cur));
if (d < 0) return; if (d < 0) return;
QCOMPARE(d, i); QCOMPARE(d, i);
++cur; ++i; ++it; cq__->next(cur);
++i;
} }
} while(1); } while(1);
}}; }};
@ -63,7 +63,7 @@ void Unit::test_producer(void) {
std::this_thread::yield(); std::this_thread::yield();
} }
capo::stopwatch<> sw; capo::stopwatch<> sw;
constexpr static int loops = 1000000; constexpr static int loops = 267/*1000000*/;
std::cout << "start producer..." << std::endl; std::cout << "start producer..." << std::endl;
sw.start(); sw.start();