preparing to refactor

This commit is contained in:
mutouyun 2019-01-04 18:48:21 +08:00
parent 3db21b7bfa
commit 17621c1e83
5 changed files with 450 additions and 40 deletions

View File

@ -18,6 +18,7 @@ INCLUDEPATH += \
HEADERS += \
../include/export.h \
../include/shm.h \
../include/circ_elems_array.h \
../include/circ_elem_array.h \
../include/circ_queue.h \
../include/ipc.h \

View File

@ -18,7 +18,9 @@ struct alignas(std::max_align_t) elem_array_head {
std::atomic<u2_t> cc_ { 0 }; // connection counter, using for broadcast
std::atomic<u2_t> wt_ { 0 }; // write index
static u1_t index_of(u2_t c) noexcept { return static_cast<u1_t>(c); }
constexpr static u1_t index_of(u2_t c) noexcept {
return static_cast<u1_t>(c);
}
std::size_t connect() noexcept {
return cc_.fetch_add(1, std::memory_order_release);
@ -36,8 +38,8 @@ struct alignas(std::max_align_t) elem_array_head {
return wt_.load(std::memory_order_acquire);
}
auto acquire() noexcept {
return index_of(wt_.load(std::memory_order_relaxed));
auto acquire(std::memory_order order = std::memory_order_acquire) noexcept {
return index_of(wt_.load(order));
}
void commit() noexcept {
@ -56,7 +58,7 @@ struct elem_head {
};
template <std::size_t DataSize, std::size_t BaseIntSize = 8>
class elem_array : private elem_array_head<BaseIntSize> {
class elem_array : protected elem_array_head<BaseIntSize> {
public:
using base_t = elem_array_head<BaseIntSize>;
using head_t = elem_head;
@ -72,7 +74,7 @@ public:
block_size = elem_size * elem_max
};
private:
protected:
struct elem_t {
head_t head_;
byte_t data_[data_size] {};
@ -86,6 +88,25 @@ private:
static elem_t* elem(void* ptr) noexcept { return reinterpret_cast<elem_t*>(static_cast<byte_t*>(ptr) - sizeof(head_t)); }
elem_t* elem(u1_t i ) noexcept { return elem_start() + i; }
template <typename Acq, typename... P>
void* acquire(std::memory_order order, Acq&& acq, P&&... params) noexcept {
uint_t<32> conn_cnt = cc_.load(order);
if (conn_cnt == 0) return nullptr;
elem_t* el = elem(std::forward<Acq>(acq)(std::memory_order_relaxed,
std::forward<P>(params)...));
// check all consumers have finished reading
while(1) {
uint_t<32> expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, conn_cnt, std::memory_order_relaxed)) {
break;
}
std::this_thread::yield();
conn_cnt = cc_.load(std::memory_order_acquire);
}
return el->data_;
}
public:
elem_array() = default;
@ -99,36 +120,95 @@ public:
using base_t::conn_count;
using base_t::cursor;
void* acquire() noexcept {
uint_t<32> conn_cnt = static_cast<uint_t<32>>(conn_count()); // acquire
if (conn_cnt == 0) return nullptr;
elem_t* el = elem(base_t::acquire());
// check all consumers have finished reading
while(1) {
uint_t<32> expected = 0;
if (el->head_.rc_.compare_exchange_weak(
expected, conn_cnt, std::memory_order_relaxed)) {
break;
}
std::this_thread::yield();
conn_cnt = static_cast<uint_t<32>>(conn_count()); // acquire
}
return el->data_;
void* acquire(std::memory_order order = std::memory_order_acquire) noexcept {
return this->acquire(order, [this](auto o) {
return base_t::acquire(o);
});
}
void commit(void* /*ptr*/) noexcept {
base_t::commit();
}
template <typename F>
bool fetch(F&& f) noexcept {
auto p = this->acquire();
if (p == nullptr) return false;
std::forward<F>(f)(p);
this->commit(p);
return true;
}
void* take(u2_t cursor) noexcept {
std::atomic_thread_fence(std::memory_order_acquire);
return elem(base_t::index_of(cursor))->data_;
}
void put(void* ptr) noexcept {
elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release);
auto el = elem(ptr);
uint_t<32> cur_rc;
do {
cur_rc = el->head_.rc_.load(std::memory_order_relaxed);
if (cur_rc == 0) return;
} while (!el->head_.rc_.compare_exchange_weak(
cur_rc, cur_rc - 1, std::memory_order_release));
}
};
/*
template <std::size_t DataSize, std::size_t BaseIntSize = 8>
class multi_write_array : protected elem_array<DataSize, BaseIntSize> {
public:
using base_t = elem_array<DataSize, BaseIntSize>;
using head_t = typename base_t::head_t;
using typename base_t::u1_t;
using typename base_t::u2_t;
using base_t::head_size;
using base_t::data_size;
using base_t::elem_max;
using base_t::elem_size;
using base_t::block_size;
protected:
std::atomic<u2_t> rd_ { 0 }; // ready index
public:
using base_t::connect;
using base_t::disconnect;
using base_t::conn_count;
u2_t cursor() const noexcept {
return rd_.load(std::memory_order_acquire);
}
template <typename F>
bool fetch(F&& f) noexcept {
u2_t cur_rd;
auto p = base_t::acquire(std::memory_order_acquire, [this, &cur_rd](auto o) {
while (1) {
u2_t cur_wt = wt_.load(o), nxt_wt = cur_wt + 1;
if (base_t::index_of(nxt_wt) ==
base_t::index_of(cur_rd = rd_.load(std::memory_order_relaxed))) {
// is full
}
else if (wt_.compare_exchange_weak(cur_wt, nxt_wt, std::memory_order_relaxed)) {
return base_t::index_of(nxt_wt);
}
std::this_thread::yield();
std::atomic_thread_fence(std::memory_order_acquire);
}
});
if (p == nullptr) return false;
std::forward<F>(f)(p);
while (1) {
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
break;
}
std::this_thread::yield();
}
return true;
}
};
*/
} // namespace circ
} // namespace ipc

224
include/circ_elems_array.h Normal file
View File

@ -0,0 +1,224 @@
#pragma once
#include <atomic>
#include <thread>
#include <cstring>
#include "def.h"
namespace ipc {
namespace circ {
namespace detail {
using u1_t = uint_t<8>;
using u2_t = uint_t<16>;
constexpr u1_t index_of(u2_t c) noexcept {
return static_cast<u1_t>(c);
}
struct elem_head {
std::atomic<std::size_t> rc_ { 0 }; // read counter
};
template <std::size_t DataSize>
struct elem_t {
elem_head head_;
byte_t data_[DataSize] {};
};
template <std::size_t S>
elem_t<S>* elem_of(void* ptr) noexcept {
return reinterpret_cast<elem_t<S>*>(static_cast<byte_t*>(ptr) - sizeof(elem_head));
}
} // namespace detail
enum class relat { // multiplicity of the relationship
single,
multi
};
enum class trans { // transmission
unicast,
multicast
};
////////////////////////////////////////////////////////////////
/// producer-consumer policies
////////////////////////////////////////////////////////////////
template <relat Rp, relat Rc, trans Ts>
struct prod_cons;
template <>
struct prod_cons<relat::single, relat::single, trans::unicast> {
std::atomic<detail::u2_t> rd_ { 0 }; // read index
std::atomic<detail::u2_t> wt_ { 0 }; // write index
template <std::size_t DataSize>
constexpr static std::size_t elem_param = DataSize - sizeof(elem_head);
constexpr detail::u2_t cursor() const noexcept {
return 0;
}
template <typename E, typename F, std::size_t S>
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
auto cur_wt = detail::index_of(wt_.load(std::memory_order_acquire));
if (cur_wt == detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
return false;
}
std::forward<F>(f)(elem_start + cur_wt);
wt_.fetch_add(1, std::memory_order_release);
return true;
}
template <typename E, typename F, std::size_t S>
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
auto cur_rd = detail::index_of(rd_.load(std::memory_order_acquire));
if (cur_rd == detail::index_of(wt_.load(std::memory_order_relaxed))) {
return false;
}
std::forward<F>(f)(elem_start + cur_rd);
rd_.fetch_add(1, std::memory_order_release);
return true;
}
};
template <>
struct prod_cons<relat::single, relat::multi, trans::unicast>
: prod_cons<relat::single, relat::single, trans::unicast> {
template <typename E, typename F, std::size_t S>
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
byte_t buff[sizeof(detail::elem_t<S>)];
while (1) {
auto cur_rd = rd_.load(std::memory_order_acquire);
if (detail::index_of(cur_rd) ==
detail::index_of(wt_.load(std::memory_order_relaxed))) {
return false;
}
std::memcpy(buff, elem_start + detail::index_of(cur_rd), sizeof(buff));
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
std::forward<F>(f)(buff);
return true;
}
std::this_thread::yield();
}
}
};
template <>
struct prod_cons<relat::single, relat::multi, trans::multicast> {
std::atomic<detail::u2_t> wt_ { 0 }; // write index
template <std::size_t DataSize>
constexpr static std::size_t elem_param = DataSize;
using rc_t = decltype(detail::elem_head::rc_)::value_type;
detail::u2_t cursor() const noexcept {
return wt_.load(std::memory_order_acquire);
}
template <typename E, typename F, std::size_t S>
bool push(E* elems, F&& f, detail::elem_t<S>* elem_start) {
auto conn_cnt = elems->conn_count(); // acquire
if (conn_cnt == 0) return false;
auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
rc_t expected = 0;
if (!el->head_.rc_.compare_exchange_weak(
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
return false;
}
std::forward<F>(f)(el->data_);
wt_.fetch_add(1, std::memory_order_release);
return true;
}
template <typename E, typename F, std::size_t S>
bool pop(E* /*elems*/, detail::u2_t& cur, F&& f, detail::elem_t<S>* elem_start) noexcept {
if (cur == cursor()) return false; // acquire
auto el = elem_start + detail::index_of(cur++);
std::forward<F>(f)(el->data_);
do {
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
if (cur_rc == 0) {
return true;
}
if (el->head_.rc_.compare_exchange_weak(
cur_rc, cur_rc - 1, std::memory_order_release)) {
return true;
}
std::this_thread::yield();
} while (1);
}
};
////////////////////////////////////////////////////////////////
/// element-array implementation
////////////////////////////////////////////////////////////////
struct elems_head {
std::atomic<detail::u2_t> cc_ { 0 }; // connection counter
std::size_t connect() noexcept {
return cc_.fetch_add(1, std::memory_order_release);
}
std::size_t disconnect() noexcept {
return cc_.fetch_sub(1, std::memory_order_release);
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return cc_.load(order);
}
};
template <std::size_t DataSize, typename Policy>
class elems_array : private Policy {
public:
using policy_t = Policy;
using base_t = Policy;
using head_t = elems_head;
using elem_t = detail::elem_t<policy_t::template elem_param<DataSize>>;
enum : std::size_t {
head_size = sizeof(head_t),
data_size = DataSize,
elem_max = (std::numeric_limits<uint_t<8>>::max)() + 1, // default is 255 + 1
elem_size = sizeof(elem_t),
block_size = elem_size * elem_max
};
private:
head_t head_;
elem_t block_[elem_max];
public:
elems_array() = default;
elems_array(const elems_array&) = delete;
elems_array& operator=(const elems_array&) = delete;
std::size_t connect () noexcept { return head_.connect (); }
std::size_t disconnect() noexcept { return head_.disconnect(); }
std::size_t conn_count() const noexcept { return head_.conn_count(); }
using base_t::cursor;
template <typename F>
bool push(F&& f) noexcept {
return base_t::push(this, std::forward<F>(f), block_);
}
template <typename F>
bool pop(detail::u2_t& cur, F&& f) noexcept {
return base_t::pop(this, cur, std::forward<F>(f), block_);
}
};
} // namespace circ
} // namespace ipc

View File

@ -14,10 +14,10 @@
namespace ipc {
namespace circ {
template <typename T>
template <typename T, template <std::size_t...> class ElemArray = elem_array>
class queue {
public:
using array_t = elem_array<sizeof(T)>;
using array_t = ElemArray<sizeof(T)>;
private:
array_t* elems_ = nullptr;
@ -88,11 +88,9 @@ public:
template <typename... P>
auto push(P&&... params) noexcept {
if (elems_ == nullptr) return false;
auto ptr = elems_->acquire();
if (ptr == nullptr) return false;
::new (ptr) T(std::forward<P>(params)...);
elems_->commit(ptr);
return true;
return elems_->fetch([&](void* p) {
::new (p) T(std::forward<P>(params)...);
});
}
template <typename F>

View File

@ -6,6 +6,7 @@
#include <vector>
#include <unordered_map>
#include "circ_elems_array.h"
#include "circ_elem_array.h"
#include "circ_queue.h"
#include "memory/resource.hpp"
@ -13,14 +14,14 @@
namespace {
using cq_t = ipc::circ::elem_array<12>;
cq_t* cq__;
struct msg_t {
int pid_;
int dat_;
};
using cq_t = ipc::circ::elem_array<sizeof(msg_t)>;
cq_t* cq__;
bool operator==(msg_t const & m1, msg_t const & m2) {
return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_);
}
@ -46,9 +47,8 @@ struct test_verify<cq_t> {
void verify(int N, int Loops) {
std::cout << "verifying..." << std::endl;
for (auto& c_dats : list_) {
auto& cons_vec = c_dats;
for (int n = 0; n < N; ++n) {
auto& vec = cons_vec[n];
auto& vec = c_dats[n];
QCOMPARE(vec.size(), static_cast<std::size_t>(Loops));
int i = 0;
for (int d : vec) {
@ -60,6 +60,87 @@ struct test_verify<cq_t> {
}
};
template <>
struct test_verify<ipc::circ::prod_cons<
ipc::circ::relat::single,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> : test_verify<cq_t> {
using test_verify<cq_t>::test_verify;
void verify(int N, int Loops) {
std::cout << "verifying..." << std::endl;
for (int n = 0; n < N; ++n) {
std::vector<int> datas;
std::uint64_t sum = 0;
for (auto& c_dats : list_) {
for (int d : c_dats[n]) {
datas.push_back(d);
sum += d;
}
}
QCOMPARE(datas.size(), static_cast<std::size_t>(Loops));
QCOMPARE(sum, (Loops * std::uint64_t(Loops - 1)) / 2);
}
}
};
template <std::size_t D, typename P>
struct test_cq<ipc::circ::elems_array<D, P>> {
using ca_t = ipc::circ::elems_array<D, P>;
volatile bool quit_ = false;
ca_t* ca_;
test_cq(ca_t* ca) : ca_(ca) {}
auto connect() {
auto cur = ca_->cursor();
ca_->connect();
return cur;
}
void disconnect(int) {
ca_->disconnect();
}
void wait_start(int M) {
while (ca_->conn_count() != static_cast<std::size_t>(M)) {
std::this_thread::yield();
}
}
template <typename F>
void recv(decltype(std::declval<ca_t>().cursor()) cur, F&& proc) {
while(1) {
msg_t* pmsg;
while (ca_->pop(cur, [&pmsg](void* p) {
pmsg = static_cast<msg_t*>(p);
})) {
if (pmsg->pid_ < 0) {
quit_ = true;
return;
}
proc(*pmsg);
}
if (quit_) return;
std::this_thread::yield();
}
}
ca_t* connect_send() {
return ca_;
}
void send(ca_t* ca, msg_t const & msg) {
while (!ca->push([&msg](void* p) {
(*static_cast<msg_t*>(p)) = msg;
})) {
std::this_thread::yield();
}
}
};
template <std::size_t D>
struct test_cq<ipc::circ::elem_array<D>> {
using ca_t = ipc::circ::elem_array<D>;
@ -89,7 +170,7 @@ struct test_cq<ipc::circ::elem_array<D>> {
template <typename F>
void recv(cn_t cur, F&& proc) {
do {
while(1) {
while (cur != ca_->cursor()) {
msg_t* pmsg = static_cast<msg_t*>(ca_->take(cur)),
msg = *pmsg;
@ -99,7 +180,7 @@ struct test_cq<ipc::circ::elem_array<D>> {
proc(msg);
}
std::this_thread::yield();
} while(1);
}
}
ca_t* connect_send() {
@ -144,11 +225,11 @@ struct test_cq<ipc::circ::queue<T>> {
template <typename F>
void recv(cn_t* queue, F&& proc) {
do {
while(1) {
auto msg = queue->pop();
if (msg.pid_ < 0) return;
proc(msg);
} while(1);
}
}
cn_t connect_send() {
@ -182,7 +263,7 @@ private slots:
#include "test_circ.moc"
constexpr int LoopCount = 1000000;
constexpr int LoopCount = 10000000;
void Unit::initTestCase() {
TestSuite::initTestCase();
@ -199,7 +280,7 @@ void Unit::test_inst() {
std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl;
std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl;
QCOMPARE(static_cast<std::size_t>(cq_t::data_size) , static_cast<std::size_t>(12));
QCOMPARE(static_cast<std::size_t>(cq_t::data_size), sizeof(msg_t));
QCOMPARE(sizeof(cq_t), static_cast<std::size_t>(cq_t::block_size + cq_t::head_size));
std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl;
@ -218,10 +299,36 @@ void test_prod_cons() {
void Unit::test_prod_cons_1v1() {
test_prod_cons<1, 1>();
ipc::circ::elems_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::single,
ipc::circ::relat::single,
ipc::circ::trans::unicast>
> el_arr_ss;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ss);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ss);
}
void Unit::test_prod_cons_1v3() {
test_prod_cons<1, 3>();
ipc::circ::elems_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::circ::relat::single,
ipc::circ::relat::multi,
ipc::circ::trans::unicast>
> el_arr_smn;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smn)::policy_t>(&el_arr_smn);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smn);
// ipc::circ::elems_array<
// sizeof(msg_t),
// ipc::circ::prod_cons<ipc::circ::relat::single,
// ipc::circ::relat::multi,
// ipc::circ::trans::multicast>
// > el_arr_smm;
// benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_smm);
}
void Unit::test_prod_cons_performance() {