This commit is contained in:
mutouyun 2018-11-21 19:03:39 +08:00
parent 19b40e23cb
commit de39be3c4f
2 changed files with 67 additions and 39 deletions

View File

@ -2,18 +2,21 @@
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <atomic>
#include <utility>
#include <limits>
#include <algorithm>
#include <thread>
namespace ipc {
struct circ_queue_head {
using u8_t = std::atomic<std::uint8_t>;
using ms_t = std::atomic<std::size_t>; // message head
u8_t cc_; // connection count
u8_t wt_; // write-index
u8_t cc_ { 0 }; // connection count
u8_t cr_ { 0 }; // write cursor
};
template <std::size_t Size>
@ -24,14 +27,18 @@ public:
head_size = sizeof(circ_queue_head),
block_size = Size - head_size,
elem_max = std::numeric_limits<std::uint8_t>::max(),
elem_size = Size / (elem_max + 1)
elem_size = (Size / (elem_max + 1)),
msg_size = elem_size - sizeof(ms_t)
};
static_assert(Size > head_size , "Size must > head_size");
static_assert(elem_size >= head_size, "elem_size must >= head_size");
static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size");
static_assert(Size > head_size , "Size must > head_size");
static_assert(elem_size >= head_size , "elem_size must >= head_size");
static_assert(elem_size > sizeof(ms_t), "elem_size must > sizeof(ms_t)");
static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size");
circ_queue(void) = default;
circ_queue(void) {
::memset(block_, 0, sizeof(block_));
}
~circ_queue(void) = delete;
circ_queue(const circ_queue&) = delete;
@ -47,30 +54,49 @@ public:
return cc_.fetch_sub(1, std::memory_order_acq_rel);
}
std::size_t conn_count(void) const {
return cc_.load(std::memory_order_acquire);
}
void* acquire(void) {
return begin() + wt_.load(std::memory_order_relaxed);
auto st = begin() + id(cr_.load(std::memory_order_relaxed));
do {
// check remain count of consumers
if (st->head_.load(std::memory_order_acquire)) {
std::this_thread::yield();
}
else {
st->head_.store(conn_count());
break;
}
} while(1);
return st->msg_;
}
void commit(void) {
wt_.fetch_add(1, std::memory_order_release);
cr_.fetch_add(1, std::memory_order_release);
}
std::uint8_t cursor(void) const {
return wt_.load(std::memory_order_acquire);
return cr_.load(std::memory_order_acquire);
}
void const * get(std::uint8_t index) const {
return begin() + index;
void* get(std::uint8_t index) {
auto st = begin() + id(index);
st->head_.fetch_sub(1, std::memory_order_release);
return st->msg_;
}
private:
using elem_t = std::uint8_t[elem_size];
elem_t const * begin(void) const {
return reinterpret_cast<elem_t const *>(
reinterpret_cast<std::uint8_t const *>(this) + elem_size);
struct elem_t {
ms_t head_;
std::uint8_t msg_[msg_size];
};
elem_t* begin(void) {
return reinterpret_cast<elem_t*>(this);
}
elem_t * begin(void) {
return const_cast<elem_t *>(static_cast<circ_queue const *>(this)->begin());
static std::uint8_t id(std::uint8_t i) {
return (i += 1) ? i : 1;
}
std::uint8_t block_[block_size];
};

View File

@ -1,15 +1,13 @@
#include <thread>
#include <iostream>
#include <string>
#include <type_traits>
#include <memory>
#include "circ_queue.h"
#include "test.h"
namespace {
using namespace std::string_literals;
class Unit : public TestSuite {
Q_OBJECT
@ -34,42 +32,46 @@ void Unit::test_inst(void) {
}
void Unit::test_producer(void) {
cq__ = new cq_t;
std::thread consumers[3];
std::atomic_int flag(0);
for (auto& c : consumers) {
c = std::thread{[&c, &flag] {
c = std::thread{[&c] {
auto cur = cq__->cursor();
std::cout << &c << ": cur = " << (int)cur << std::endl;
flag.fetch_add(1, std::memory_order_release);
std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl;
cq__->connect();
auto disconn = [](cq_t* cq) { cq->disconnect(); };
std::unique_ptr<cq_t, decltype(disconn)> guard(cq__, disconn);
int i = 0;
do {
while (cur != cq__->cursor()) {
auto data = static_cast<const char*>(cq__->get(cur));
std::cout << &c << ": " << data << std::endl;
if (data == "quit"s) {
int d = *static_cast<const int*>(cq__->get(cur));
// std::cout << &c << ": cur = " << (int)cur << ", " << d << std::endl;
if (d < 0) {
return;
}
else QCOMPARE(data, std::to_string(cur).c_str());
else QCOMPARE(d, i);
++cur;
++i;
}
std::this_thread::yield();
} while(1);
}};
}
while (flag.load(std::memory_order_acquire) != std::extent<decltype(consumers)>::value) {
while (cq__->conn_count() != std::extent<decltype(consumers)>::value) {
std::this_thread::yield();
}
for (int i = 0; i < 10; ++i) {
auto str = static_cast<char*>(cq__->acquire());
strcpy(str, std::to_string(i).c_str());
std::cout << "put: " << str << std::endl;
std::cout << "start producer..." << std::endl;
for (int i = 0; i < 1000; ++i) {
auto d = static_cast<int*>(cq__->acquire());
*d = i;
cq__->commit();
}
auto str = static_cast<char*>(cq__->acquire());
strcpy(str, "quit");
std::cout << "put: " << str << std::endl;
auto d = static_cast<int*>(cq__->acquire());
*d = -1;
std::cout << "put: quit..." << std::endl;
cq__->commit();
for (auto& c : consumers) {