ipc::circ::prod_cons => ipc::prod_cons_circ; ipc::circ::queue => ipc::queue

This commit is contained in:
mutouyun 2019-01-15 14:42:52 +08:00
parent 612a59ae31
commit b87e96b43c
6 changed files with 112 additions and 103 deletions

View File

@ -8,8 +8,8 @@
#include "rw_lock.h"
namespace ipc {
namespace circ {
namespace circ {
namespace detail {
using u1_t = uint_t<8>;
@ -40,30 +40,28 @@ elem_t<S>* elem_of(void* ptr) noexcept {
}
} // namespace detail
} // namespace circ
////////////////////////////////////////////////////////////////
/// producer-consumer policies
////////////////////////////////////////////////////////////////
template <relat Rp, relat Rc, trans Ts>
struct prod_cons;
template <>
struct prod_cons<relat::single, relat::single, trans::unicast> {
std::atomic<detail::u2_t> rd_ { 0 }; // read index
std::atomic<detail::u2_t> wt_ { 0 }; // write index
struct prod_cons<organ::cyclic, relat::single, relat::single, trans::unicast> {
std::atomic<circ::detail::u2_t> rd_ { 0 }; // read index
std::atomic<circ::detail::u2_t> wt_ { 0 }; // write index
template <std::size_t DataSize>
constexpr static std::size_t elem_param = DataSize - sizeof(detail::elem_head);
constexpr static std::size_t elem_param = DataSize - sizeof(circ::detail::elem_head);
constexpr detail::u2_t cursor() const noexcept {
constexpr circ::detail::u2_t cursor() const noexcept {
return 0;
}
template <typename E, typename F, std::size_t S>
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
auto cur_wt = detail::index_of(wt_.load(std::memory_order_acquire));
if (cur_wt == detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
bool push(E* /*elems*/, F&& f, circ::detail::elem_t<S>* elem_start) {
auto cur_wt = circ::detail::index_of(wt_.load(std::memory_order_acquire));
if (cur_wt == circ::detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
return false; // full
}
std::forward<F>(f)(elem_start + cur_wt);
@ -72,9 +70,9 @@ struct prod_cons<relat::single, relat::single, trans::unicast> {
}
template <typename E, typename F, std::size_t S>
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
auto cur_rd = detail::index_of(rd_.load(std::memory_order_acquire));
if (cur_rd == detail::index_of(wt_.load(std::memory_order_relaxed))) {
bool pop(E* /*elems*/, circ::detail::u2_t& /*cur*/, F&& f, circ::detail::elem_t<S>* elem_start) noexcept {
auto cur_rd = circ::detail::index_of(rd_.load(std::memory_order_acquire));
if (cur_rd == circ::detail::index_of(wt_.load(std::memory_order_relaxed))) {
return false; // empty
}
std::forward<F>(f)(elem_start + cur_rd);
@ -84,19 +82,19 @@ struct prod_cons<relat::single, relat::single, trans::unicast> {
};
template <>
struct prod_cons<relat::single, relat::multi, trans::unicast>
: prod_cons<relat::single, relat::single, trans::unicast> {
struct prod_cons<organ::cyclic, relat::single, relat::multi , trans::unicast>
: prod_cons<organ::cyclic, relat::single, relat::single, trans::unicast> {
template <typename E, typename F, std::size_t S>
bool pop(E* /*elems*/, detail::u2_t& /*cur*/, F&& f, detail::elem_t<S>* elem_start) noexcept {
byte_t buff[sizeof(detail::elem_t<S>)];
bool pop(E* /*elems*/, circ::detail::u2_t& /*cur*/, F&& f, circ::detail::elem_t<S>* elem_start) noexcept {
byte_t buff[sizeof(circ::detail::elem_t<S>)];
for (unsigned k = 0;;) {
auto cur_rd = rd_.load(std::memory_order_acquire);
if (detail::index_of(cur_rd) ==
detail::index_of(wt_.load(std::memory_order_relaxed))) {
if (circ::detail::index_of(cur_rd) ==
circ::detail::index_of(wt_.load(std::memory_order_relaxed))) {
return false; // empty
}
std::memcpy(buff, elem_start + detail::index_of(cur_rd), sizeof(buff));
std::memcpy(buff, elem_start + circ::detail::index_of(cur_rd), sizeof(buff));
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
std::forward<F>(f)(buff);
return true;
@ -107,18 +105,18 @@ struct prod_cons<relat::single, relat::multi, trans::unicast>
};
template <>
struct prod_cons<relat::multi, relat::multi, trans::unicast>
: prod_cons<relat::single, relat::multi, trans::unicast> {
struct prod_cons<organ::cyclic, relat::multi , relat::multi, trans::unicast>
: prod_cons<organ::cyclic, relat::single, relat::multi, trans::unicast> {
std::atomic<detail::u2_t> ct_ { 0 }; // commit index
std::atomic<circ::detail::u2_t> ct_ { 0 }; // commit index
template <typename E, typename F, std::size_t S>
bool push(E* /*elems*/, F&& f, detail::elem_t<S>* elem_start) {
detail::u2_t cur_ct, nxt_ct;
bool push(E* /*elems*/, F&& f, circ::detail::elem_t<S>* elem_start) {
circ::detail::u2_t cur_ct, nxt_ct;
while(1) {
cur_ct = ct_.load(std::memory_order_acquire);
if (detail::index_of(nxt_ct = cur_ct + 1) ==
detail::index_of(rd_.load(std::memory_order_relaxed))) {
if (circ::detail::index_of(nxt_ct = cur_ct + 1) ==
circ::detail::index_of(rd_.load(std::memory_order_relaxed))) {
return false; // full
}
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
@ -126,7 +124,7 @@ struct prod_cons<relat::multi, relat::multi, trans::unicast>
}
std::this_thread::yield();
}
std::forward<F>(f)(elem_start + detail::index_of(cur_ct));
std::forward<F>(f)(elem_start + circ::detail::index_of(cur_ct));
while(1) {
auto exp_wt = cur_ct;
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
@ -139,8 +137,8 @@ struct prod_cons<relat::multi, relat::multi, trans::unicast>
};
template <>
struct prod_cons<relat::single, relat::multi, trans::broadcast> {
std::atomic<detail::u2_t> wt_ { 0 }; // write index
struct prod_cons<organ::cyclic, relat::single, relat::multi, trans::broadcast> {
std::atomic<circ::detail::u2_t> wt_ { 0 }; // write index
template <std::size_t DataSize>
constexpr static std::size_t elem_param = DataSize;
@ -149,17 +147,17 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
<Remarks> std::atomic<T> may not have value_type.
See: https://stackoverflow.com/questions/53648614/what-happened-to-stdatomicxvalue-type
*/
using rc_t = decltype(detail::elem_head::rc_.load());
using rc_t = decltype(circ::detail::elem_head::rc_.load());
detail::u2_t cursor() const noexcept {
circ::detail::u2_t cursor() const noexcept {
return wt_.load(std::memory_order_acquire);
}
template <typename E, typename F, std::size_t S>
bool push(E* elems, F&& f, detail::elem_t<S>* elem_start) {
bool push(E* elems, F&& f, circ::detail::elem_t<S>* elem_start) {
auto conn_cnt = elems->conn_count(); // acquire
if (conn_cnt == 0) return false;
auto el = elem_start + detail::index_of(wt_.load(std::memory_order_relaxed));
auto el = elem_start + circ::detail::index_of(wt_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
while(1) {
rc_t expected = 0;
@ -177,9 +175,9 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
}
template <typename E, typename F, std::size_t S>
bool pop(E* /*elems*/, detail::u2_t& cur, F&& f, detail::elem_t<S>* elem_start) noexcept {
bool pop(E* /*elems*/, circ::detail::u2_t& cur, F&& f, circ::detail::elem_t<S>* elem_start) noexcept {
if (cur == cursor()) return false; // acquire
auto el = elem_start + detail::index_of(cur++);
auto el = elem_start + circ::detail::index_of(cur++);
std::forward<F>(f)(el->data_);
for (unsigned k = 0;;) {
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
@ -196,18 +194,18 @@ struct prod_cons<relat::single, relat::multi, trans::broadcast> {
};
template <>
struct prod_cons<relat::multi, relat::multi, trans::broadcast>
: prod_cons<relat::single, relat::multi, trans::broadcast> {
struct prod_cons<organ::cyclic, relat::multi , relat::multi, trans::broadcast>
: prod_cons<organ::cyclic, relat::single, relat::multi, trans::broadcast> {
std::atomic<detail::u2_t> ct_ { 0 }; // commit index
std::atomic<circ::detail::u2_t> ct_ { 0 }; // commit index
template <typename E, typename F, std::size_t S>
bool push(E* elems, F&& f, detail::elem_t<S>* elem_start) {
bool push(E* elems, F&& f, circ::detail::elem_t<S>* elem_start) {
auto conn_cnt = elems->conn_count(); // acquire
if (conn_cnt == 0) return false;
detail::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed),
nxt_ct = cur_ct + 1;
auto el = elem_start + detail::index_of(cur_ct);
circ::detail::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed),
nxt_ct = cur_ct + 1;
auto el = elem_start + circ::detail::index_of(cur_ct);
// check all consumers have finished reading this element
while(1) {
rc_t expected = 0;
@ -231,6 +229,11 @@ struct prod_cons<relat::multi, relat::multi, trans::broadcast>
}
};
template <relat Rp, relat Rc, trans Ts>
using prod_cons_circ = prod_cons<organ::cyclic, Rp, Rc, Ts>;
namespace circ {
////////////////////////////////////////////////////////////////
/// element-array implementation
////////////////////////////////////////////////////////////////

View File

@ -31,6 +31,11 @@ enum : std::size_t {
data_length = 16
};
enum class organ { // data structure organization
linked,
cyclic
};
enum class relat { // multiplicity of the relationship
single,
multi
@ -41,6 +46,11 @@ enum class trans { // transmission
broadcast
};
// producer-consumer policy declaration
template <organ Oz, relat Rp, relat Rc, trans Ts>
struct prod_cons;
// concept helpers
template <bool Cond, typename R>

View File

@ -7,7 +7,7 @@
#include "def.h"
#include "buffer.h"
#include "shm.h"
#include "circ_queue.h"
#include "queue.h"
namespace ipc {
@ -138,8 +138,7 @@ public:
* (one producer/server/sender to multi consumers/clients/receivers)
*/
using route = channel_ipml<channel_detail<
circ::queue,
circ::prod_cons<relat::single, relat::multi, trans::broadcast>
ipc::queue, ipc::prod_cons_circ<relat::single, relat::multi, trans::broadcast>
>>;
/*
@ -151,8 +150,7 @@ using route = channel_ipml<channel_detail<
*/
using channel = channel_ipml<channel_detail<
circ::queue,
circ::prod_cons<relat::multi, relat::multi, trans::broadcast>
ipc::queue, ipc::prod_cons_circ<relat::multi, relat::multi, trans::broadcast>
>>;
} // namespace ipc

View File

@ -14,13 +14,12 @@
#include "circ_elem_array.h"
namespace ipc {
namespace circ {
template <typename T,
typename Policy = circ::prod_cons<relat::single, relat::multi, trans::broadcast>>
typename Policy = ipc::prod_cons_circ<relat::single, relat::multi, trans::broadcast>>
class queue {
public:
using array_t = elem_array<sizeof(T), Policy>;
using array_t = circ::elem_array<sizeof(T), Policy>;
using policy_t = typename array_t::policy_t;
private:
@ -113,5 +112,4 @@ public:
}
};
} // namespace circ
} // namespace ipc

View File

@ -27,7 +27,7 @@ template <template <typename...> class Queue, typename Policy>
struct detail;
template <typename Policy>
struct detail<circ::queue, Policy> {
struct detail<ipc::queue, Policy> {
#pragma pack(1)
struct msg_t {
@ -38,7 +38,7 @@ struct msg_t {
};
#pragma pack()
using queue_t = circ::queue<msg_t, Policy>;
using queue_t = ipc::queue<msg_t, Policy>;
struct shm_info_t {
typename queue_t::array_t elems_; // the circ_elem_array in shm
@ -215,7 +215,7 @@ static buff_t recv(handle_t h) {
}
}
}; // detail<circ::queue>
}; // detail<ipc::queue>
} // internal-linkage
@ -261,10 +261,10 @@ buff_t channel_detail<Queue, Policy>::recv(handle_t h) {
return detail<Queue, Policy>::recv(h);
}
template struct channel_detail<circ::queue, circ::prod_cons<relat::single, relat::single, trans::unicast >>;
template struct channel_detail<circ::queue, circ::prod_cons<relat::single, relat::multi , trans::unicast >>;
template struct channel_detail<circ::queue, circ::prod_cons<relat::multi , relat::multi , trans::unicast >>;
template struct channel_detail<circ::queue, circ::prod_cons<relat::single, relat::multi , trans::broadcast>>;
template struct channel_detail<circ::queue, circ::prod_cons<relat::multi , relat::multi , trans::broadcast>>;
template struct channel_detail<ipc::queue, ipc::prod_cons_circ<relat::single, relat::single, trans::unicast >>;
template struct channel_detail<ipc::queue, ipc::prod_cons_circ<relat::single, relat::multi , trans::unicast >>;
template struct channel_detail<ipc::queue, ipc::prod_cons_circ<relat::multi , relat::multi , trans::unicast >>;
template struct channel_detail<ipc::queue, ipc::prod_cons_circ<relat::single, relat::multi , trans::broadcast>>;
template struct channel_detail<ipc::queue, ipc::prod_cons_circ<relat::multi , relat::multi , trans::broadcast>>;
} // namespace ipc

View File

@ -7,7 +7,7 @@
#include <unordered_map>
#include "circ_elem_array.h"
#include "circ_queue.h"
#include "queue.h"
#include "memory/resource.hpp"
#include "test.h"
@ -19,9 +19,9 @@ struct msg_t {
};
using cq_t = ipc::circ::elem_array<sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::single,
ipc::relat::multi,
ipc::trans::broadcast>>;
ipc::prod_cons_circ<ipc::relat::single,
ipc::relat::multi,
ipc::trans::broadcast>>;
cq_t* cq__;
bool operator==(msg_t const & m1, msg_t const & m2) {
@ -67,7 +67,7 @@ struct test_verify<ipc::circ::elem_array<D, P>> {
};
template <ipc::relat Rp>
struct test_verify<ipc::circ::prod_cons<Rp,
struct test_verify<ipc::prod_cons_circ<Rp,
ipc::relat::multi,
ipc::trans::unicast>
> : test_verify<cq_t> {
@ -94,12 +94,12 @@ template <typename P>
struct quit_mode;
template <ipc::relat Rp, ipc::relat Rc>
struct quit_mode<ipc::circ::prod_cons<Rp, Rc, ipc::trans::unicast>> {
struct quit_mode<ipc::prod_cons_circ<Rp, Rc, ipc::trans::unicast>> {
using type = volatile bool;
};
template <ipc::relat Rp, ipc::relat Rc>
struct quit_mode<ipc::circ::prod_cons<Rp, Rc, ipc::trans::broadcast>> {
struct quit_mode<ipc::prod_cons_circ<Rp, Rc, ipc::trans::broadcast>> {
struct type {
constexpr type(bool) {}
constexpr operator bool() const { return false; }
@ -164,8 +164,8 @@ struct test_cq<ipc::circ::elem_array<D, P>> {
};
template <typename... T>
struct test_cq<ipc::circ::queue<T...>> {
using cn_t = ipc::circ::queue<T...>;
struct test_cq<ipc::queue<T...>> {
using cn_t = ipc::queue<T...>;
using ca_t = typename cn_t::array_t;
ca_t* ca_;
@ -263,27 +263,27 @@ void test_prod_cons() {
void Unit::test_prod_cons_1v1() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::single,
ipc::relat::single,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::single,
ipc::relat::single,
ipc::trans::unicast>
> el_arr_ssu;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ssu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ssu);
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::single,
ipc::relat::multi,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::single,
ipc::relat::multi,
ipc::trans::unicast>
> el_arr_smu;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smu);
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::unicast>
> el_arr_mmu;
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu);
@ -293,9 +293,9 @@ void Unit::test_prod_cons_1v1() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::broadcast>
ipc::prod_cons_circ<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::broadcast>
> el_arr_mmb;
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
@ -304,18 +304,18 @@ void Unit::test_prod_cons_1v1() {
void Unit::test_prod_cons_1v3() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::single,
ipc::relat::multi,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::single,
ipc::relat::multi,
ipc::trans::unicast>
> el_arr_smu;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smu);
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::unicast>
> el_arr_mmu;
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu);
@ -325,9 +325,9 @@ void Unit::test_prod_cons_1v3() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::broadcast>
ipc::prod_cons_circ<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::broadcast>
> el_arr_mmb;
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_mmb);
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmb);
@ -336,9 +336,9 @@ void Unit::test_prod_cons_1v3() {
void Unit::test_prod_cons_performance() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::single,
ipc::relat::multi,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::single,
ipc::relat::multi,
ipc::trans::unicast>
> el_arr_smu;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_smu](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu);
@ -351,9 +351,9 @@ void Unit::test_prod_cons_performance() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::unicast>
ipc::prod_cons_circ<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::unicast>
> el_arr_mmu;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmu](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
@ -367,9 +367,9 @@ void Unit::test_prod_cons_performance() {
ipc::circ::elem_array<
sizeof(msg_t),
ipc::circ::prod_cons<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::broadcast>
ipc::prod_cons_circ<ipc::relat::multi,
ipc::relat::multi,
ipc::trans::broadcast>
> el_arr_mmb;
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [&el_arr_mmb](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
@ -383,7 +383,7 @@ void Unit::test_prod_cons_performance() {
}
void Unit::test_queue() {
ipc::circ::queue<msg_t> queue;
ipc::queue<msg_t> queue;
queue.push(msg_t { 1, 2 });
QCOMPARE(queue.pop(), msg_t{});
QVERIFY(sizeof(decltype(queue)::array_t) <= sizeof(*cq__));
@ -393,7 +393,7 @@ void Unit::test_queue() {
QVERIFY(queue.detach() != nullptr);
ipc::mem::detail::static_for(std::make_index_sequence<10>{}, [](auto index) {
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((ipc::circ::queue<msg_t>*)nullptr);
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((ipc::queue<msg_t>*)nullptr);
});
}