mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
refactor
This commit is contained in:
parent
799e5db963
commit
12119f0f65
@ -17,9 +17,6 @@ HEADERS += \
|
|||||||
../include/export.h \
|
../include/export.h \
|
||||||
../include/def.h \
|
../include/def.h \
|
||||||
../include/shm.h \
|
../include/shm.h \
|
||||||
../include/elem_def.h \
|
|
||||||
../include/elem_circ.h \
|
|
||||||
../include/elem_link.h \
|
|
||||||
../include/waiter.h \
|
../include/waiter.h \
|
||||||
../include/queue.h \
|
../include/queue.h \
|
||||||
../include/ipc.h \
|
../include/ipc.h \
|
||||||
@ -28,11 +25,15 @@ HEADERS += \
|
|||||||
../include/pool_alloc.h \
|
../include/pool_alloc.h \
|
||||||
../include/buffer.h \
|
../include/buffer.h \
|
||||||
../src/memory/detail.h \
|
../src/memory/detail.h \
|
||||||
../src/memory/alloc.hpp \
|
../src/memory/alloc.h \
|
||||||
../src/memory/wrapper.hpp \
|
../src/memory/wrapper.h \
|
||||||
../src/memory/resource.hpp \
|
../src/memory/resource.h \
|
||||||
../src/platform/detail.h \
|
../src/platform/detail.h \
|
||||||
../src/platform/waiter.h
|
../src/platform/waiter.h \
|
||||||
|
../src/circ/elem_def.h \
|
||||||
|
../src/circ/elem_array.h \
|
||||||
|
../src/prod_cons.h \
|
||||||
|
../src/policy.h
|
||||||
|
|
||||||
SOURCES += \
|
SOURCES += \
|
||||||
../src/shm.cpp \
|
../src/shm.cpp \
|
||||||
|
|||||||
@ -49,11 +49,6 @@ enum : std::size_t {
|
|||||||
data_length = 16
|
data_length = 16
|
||||||
};
|
};
|
||||||
|
|
||||||
enum class orgnz { // data structure organization
|
|
||||||
linked,
|
|
||||||
cyclic
|
|
||||||
};
|
|
||||||
|
|
||||||
enum class relat { // multiplicity of the relationship
|
enum class relat { // multiplicity of the relationship
|
||||||
single,
|
single,
|
||||||
multi
|
multi
|
||||||
@ -64,10 +59,10 @@ enum class trans { // transmission
|
|||||||
broadcast
|
broadcast
|
||||||
};
|
};
|
||||||
|
|
||||||
// producer-consumer policy declaration
|
// producer-consumer policy flag
|
||||||
|
|
||||||
template <orgnz Oz, relat Rp, relat Rc, trans Ts>
|
template <relat Rp, relat Rc, trans Ts>
|
||||||
struct prod_cons;
|
struct prod_cons {};
|
||||||
|
|
||||||
// concept helpers
|
// concept helpers
|
||||||
|
|
||||||
|
|||||||
@ -1,323 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
#include <thread>
|
|
||||||
#include <cstring>
|
|
||||||
#include <utility>
|
|
||||||
#include <type_traits>
|
|
||||||
|
|
||||||
#include "def.h"
|
|
||||||
#include "rw_lock.h"
|
|
||||||
#include "elem_def.h"
|
|
||||||
|
|
||||||
#include "platform/waiter.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
|
||||||
|
|
||||||
namespace circ {
|
|
||||||
namespace detail {
|
|
||||||
|
|
||||||
using u1_t = uint_t<8>;
|
|
||||||
using u2_t = uint_t<16>;
|
|
||||||
|
|
||||||
constexpr u1_t index_of(u2_t c) noexcept {
|
|
||||||
return static_cast<u1_t>(c);
|
|
||||||
}
|
|
||||||
|
|
||||||
struct elem_head {
|
|
||||||
std::atomic<std::size_t> rc_ { 0 }; // read counter
|
|
||||||
};
|
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
struct elem_t {
|
|
||||||
elem_head head_;
|
|
||||||
byte_t data_[DataSize] {};
|
|
||||||
};
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct elem_t<0> {
|
|
||||||
elem_head head_;
|
|
||||||
};
|
|
||||||
|
|
||||||
template <std::size_t S>
|
|
||||||
elem_t<S>* elem_of(void* ptr) noexcept {
|
|
||||||
return reinterpret_cast<elem_t<S>*>(static_cast<byte_t*>(ptr) - sizeof(elem_head));
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace detail
|
|
||||||
} // namespace circ
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////
|
|
||||||
/// producer-consumer policies
|
|
||||||
////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct prod_cons<orgnz::cyclic, relat::single, relat::single, trans::unicast> {
|
|
||||||
std::atomic<circ::detail::u2_t> rd_ { 0 }; // read index
|
|
||||||
std::atomic<circ::detail::u2_t> wt_ { 0 }; // write index
|
|
||||||
|
|
||||||
#if __cplusplus >= 201703L
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
constexpr static std::size_t elem_param = DataSize - sizeof(circ::detail::elem_head);
|
|
||||||
#else /*__cplusplus < 201703L*/
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
struct elem_param {
|
|
||||||
enum : std::size_t {
|
|
||||||
value = DataSize - sizeof(circ::detail::elem_head)
|
|
||||||
};
|
|
||||||
};
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
|
||||||
|
|
||||||
constexpr circ::detail::u2_t cursor() const noexcept {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool push(E* /*elems*/, F&& f, circ::detail::elem_t<S>* elem_start) {
|
|
||||||
auto cur_wt = circ::detail::index_of(wt_.load(std::memory_order_acquire));
|
|
||||||
if (cur_wt == circ::detail::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
|
|
||||||
return false; // full
|
|
||||||
}
|
|
||||||
std::forward<F>(f)(elem_start + cur_wt);
|
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool pop(E* /*elems*/, circ::detail::u2_t& /*cur*/, F&& f, circ::detail::elem_t<S>* elem_start) noexcept {
|
|
||||||
auto cur_rd = circ::detail::index_of(rd_.load(std::memory_order_acquire));
|
|
||||||
if (cur_rd == circ::detail::index_of(wt_.load(std::memory_order_relaxed))) {
|
|
||||||
return false; // empty
|
|
||||||
}
|
|
||||||
std::forward<F>(f)(elem_start + cur_rd);
|
|
||||||
rd_.fetch_add(1, std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct prod_cons<orgnz::cyclic, relat::single, relat::multi , trans::unicast>
|
|
||||||
: prod_cons<orgnz::cyclic, relat::single, relat::single, trans::unicast> {
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool pop(E* /*elems*/, circ::detail::u2_t& /*cur*/, F&& f, circ::detail::elem_t<S>* elem_start) noexcept {
|
|
||||||
byte_t buff[sizeof(circ::detail::elem_t<S>)];
|
|
||||||
for (unsigned k = 0;;) {
|
|
||||||
auto cur_rd = rd_.load(std::memory_order_acquire);
|
|
||||||
if (circ::detail::index_of(cur_rd) ==
|
|
||||||
circ::detail::index_of(wt_.load(std::memory_order_relaxed))) {
|
|
||||||
return false; // empty
|
|
||||||
}
|
|
||||||
std::memcpy(buff, elem_start + circ::detail::index_of(cur_rd), sizeof(buff));
|
|
||||||
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
|
||||||
std::forward<F>(f)(buff);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct prod_cons<orgnz::cyclic, relat::multi , relat::multi, trans::unicast>
|
|
||||||
: prod_cons<orgnz::cyclic, relat::single, relat::multi, trans::unicast> {
|
|
||||||
|
|
||||||
std::atomic<circ::detail::u2_t> ct_ { 0 }; // commit index
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool push(E* /*elems*/, F&& f, circ::detail::elem_t<S>* elem_start) {
|
|
||||||
circ::detail::u2_t cur_ct, nxt_ct;
|
|
||||||
while(1) {
|
|
||||||
cur_ct = ct_.load(std::memory_order_acquire);
|
|
||||||
if (circ::detail::index_of(nxt_ct = cur_ct + 1) ==
|
|
||||||
circ::detail::index_of(rd_.load(std::memory_order_relaxed))) {
|
|
||||||
return false; // full
|
|
||||||
}
|
|
||||||
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
|
||||||
std::forward<F>(f)(elem_start + circ::detail::index_of(cur_ct));
|
|
||||||
while(1) {
|
|
||||||
auto exp_wt = cur_ct;
|
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct prod_cons<orgnz::cyclic, relat::single, relat::multi, trans::broadcast> {
|
|
||||||
std::atomic<circ::detail::u2_t> wt_ { 0 }; // write index
|
|
||||||
|
|
||||||
#if __cplusplus >= 201703L
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
constexpr static std::size_t elem_param = DataSize;
|
|
||||||
#else /*__cplusplus < 201703L*/
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
struct elem_param { enum : std::size_t { value = DataSize }; };
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
|
||||||
|
|
||||||
/*
|
|
||||||
<Remarks> std::atomic<T> may not have value_type.
|
|
||||||
See: https://stackoverflow.com/questions/53648614/what-happened-to-stdatomicxvalue-type
|
|
||||||
*/
|
|
||||||
using rc_t = decltype(circ::detail::elem_head::rc_.load());
|
|
||||||
|
|
||||||
circ::detail::u2_t cursor() const noexcept {
|
|
||||||
return wt_.load(std::memory_order_acquire);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool push(E* elems, F&& f, circ::detail::elem_t<S>* elem_start) {
|
|
||||||
auto conn_cnt = elems->conn_count(); // acquire
|
|
||||||
if (conn_cnt == 0) return false;
|
|
||||||
auto el = elem_start + circ::detail::index_of(wt_.load(std::memory_order_relaxed));
|
|
||||||
// check all consumers have finished reading this element
|
|
||||||
while(1) {
|
|
||||||
rc_t expected = 0;
|
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
std::this_thread::yield();
|
|
||||||
conn_cnt = elems->conn_count(); // acquire
|
|
||||||
if (conn_cnt == 0) return false;
|
|
||||||
}
|
|
||||||
std::forward<F>(f)(el->data_);
|
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool pop(E* /*elems*/, circ::detail::u2_t& cur, F&& f, circ::detail::elem_t<S>* elem_start) noexcept {
|
|
||||||
if (cur == cursor()) return false; // acquire
|
|
||||||
auto el = elem_start + circ::detail::index_of(cur++);
|
|
||||||
std::forward<F>(f)(el->data_);
|
|
||||||
for (unsigned k = 0;;) {
|
|
||||||
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
|
|
||||||
if (cur_rc == 0) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
|
||||||
cur_rc, cur_rc - 1, std::memory_order_release)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct prod_cons<orgnz::cyclic, relat::multi , relat::multi, trans::broadcast>
|
|
||||||
: prod_cons<orgnz::cyclic, relat::single, relat::multi, trans::broadcast> {
|
|
||||||
|
|
||||||
std::atomic<circ::detail::u2_t> ct_ { 0 }; // commit index
|
|
||||||
|
|
||||||
template <typename E, typename F, std::size_t S>
|
|
||||||
bool push(E* elems, F&& f, circ::detail::elem_t<S>* elem_start) {
|
|
||||||
auto conn_cnt = elems->conn_count(); // acquire
|
|
||||||
if (conn_cnt == 0) return false;
|
|
||||||
circ::detail::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed),
|
|
||||||
nxt_ct = cur_ct + 1;
|
|
||||||
auto el = elem_start + circ::detail::index_of(cur_ct);
|
|
||||||
// check all consumers have finished reading this element
|
|
||||||
while(1) {
|
|
||||||
rc_t expected = 0;
|
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
std::this_thread::yield();
|
|
||||||
conn_cnt = elems->conn_count(); // acquire
|
|
||||||
if (conn_cnt == 0) return false;
|
|
||||||
}
|
|
||||||
std::forward<F>(f)(el->data_);
|
|
||||||
while(1) {
|
|
||||||
auto exp_wt = cur_ct;
|
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
std::this_thread::yield();
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <relat Rp, relat Rc, trans Ts>
|
|
||||||
using prod_cons_circ = prod_cons<orgnz::cyclic, Rp, Rc, Ts>;
|
|
||||||
|
|
||||||
namespace circ {
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////
|
|
||||||
/// element-array implementation
|
|
||||||
////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
template <std::size_t DataSize, typename Policy>
|
|
||||||
class elem_array : private Policy {
|
|
||||||
public:
|
|
||||||
using policy_t = Policy;
|
|
||||||
using base_t = Policy;
|
|
||||||
using head_t = ipc::conn_head<detail::u2_t>;
|
|
||||||
#if __cplusplus >= 201703L
|
|
||||||
using elem_t = detail::elem_t<policy_t::template elem_param<DataSize>>;
|
|
||||||
#else /*__cplusplus < 201703L*/
|
|
||||||
using elem_t = detail::elem_t<policy_t::template elem_param<DataSize>::value>;
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
|
||||||
|
|
||||||
enum : std::size_t {
|
|
||||||
head_size = sizeof(policy_t) + sizeof(head_t),
|
|
||||||
data_size = DataSize,
|
|
||||||
elem_max = (std::numeric_limits<uint_t<8>>::max)() + 1, // default is 255 + 1
|
|
||||||
elem_size = sizeof(elem_t),
|
|
||||||
block_size = elem_size * elem_max
|
|
||||||
};
|
|
||||||
|
|
||||||
private:
|
|
||||||
head_t head_;
|
|
||||||
ipc::detail::waiter waiter_;
|
|
||||||
elem_t block_[elem_max];
|
|
||||||
|
|
||||||
public:
|
|
||||||
elem_array() = default;
|
|
||||||
elem_array(const elem_array&) = delete;
|
|
||||||
elem_array& operator=(const elem_array&) = delete;
|
|
||||||
|
|
||||||
auto & waiter() { return this->waiter_; }
|
|
||||||
auto const & waiter() const { return this->waiter_; }
|
|
||||||
|
|
||||||
auto & conn_waiter() { return head_.conn_waiter(); }
|
|
||||||
auto const & conn_waiter() const { return head_.conn_waiter(); }
|
|
||||||
|
|
||||||
std::size_t connect () noexcept { return head_.connect (); }
|
|
||||||
std::size_t disconnect() noexcept { return head_.disconnect(); }
|
|
||||||
std::size_t conn_count() const noexcept { return head_.conn_count(); }
|
|
||||||
|
|
||||||
using base_t::cursor;
|
|
||||||
|
|
||||||
template <typename F, typename... P>
|
|
||||||
bool push(F&& f) noexcept {
|
|
||||||
return base_t::push(this, std::forward<F>(f), block_);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename F>
|
|
||||||
bool pop(detail::u2_t* cur, F&& f) noexcept {
|
|
||||||
if (cur == nullptr) return false;
|
|
||||||
return base_t::pop(this, *cur, std::forward<F>(f), block_);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <relat Rp, relat Rc, trans Ts>
|
|
||||||
struct prod_cons {
|
|
||||||
using is_fixed = std::true_type;
|
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
using elems_t = elem_array<DataSize, prod_cons_circ<Rp, Rc, Ts>>;
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace circ
|
|
||||||
} // namespace ipc
|
|
||||||
@ -1,32 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
#include <cstddef>
|
|
||||||
#include <cstdint>
|
|
||||||
|
|
||||||
#include "platform/waiter.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
|
||||||
|
|
||||||
template <typename U2>
|
|
||||||
struct conn_head {
|
|
||||||
ipc::detail::waiter cc_waiter_;
|
|
||||||
std::atomic<U2> cc_ { 0 }; // connection counter
|
|
||||||
|
|
||||||
auto & conn_waiter() { return this->cc_waiter_; }
|
|
||||||
auto const & conn_waiter() const { return this->cc_waiter_; }
|
|
||||||
|
|
||||||
std::size_t connect() noexcept {
|
|
||||||
return cc_.fetch_add(1, std::memory_order_release);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t disconnect() noexcept {
|
|
||||||
return cc_.fetch_sub(1, std::memory_order_release);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
|
|
||||||
return cc_.load(order);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace ipc
|
|
||||||
@ -1,18 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <atomic>
|
|
||||||
#include <cstddef>
|
|
||||||
#include <cstdint>
|
|
||||||
|
|
||||||
#include "def.h"
|
|
||||||
#include "rw_lock.h"
|
|
||||||
#include "elem_def.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
|
||||||
|
|
||||||
namespace link {
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
} // namespace link
|
|
||||||
} // namespace ipc
|
|
||||||
@ -7,14 +7,13 @@
|
|||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "buffer.h"
|
#include "buffer.h"
|
||||||
#include "shm.h"
|
#include "shm.h"
|
||||||
#include "elem_circ.h"
|
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
using handle_t = void*;
|
using handle_t = void*;
|
||||||
using buff_t = buffer;
|
using buff_t = buffer;
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
struct IPC_EXPORT channel_detail {
|
struct IPC_EXPORT channel_detail {
|
||||||
static handle_t connect (char const * name);
|
static handle_t connect (char const * name);
|
||||||
static void disconnect(handle_t h);
|
static void disconnect(handle_t h);
|
||||||
@ -138,7 +137,7 @@ public:
|
|||||||
* (one producer/server/sender to multi consumers/clients/receivers)
|
* (one producer/server/sender to multi consumers/clients/receivers)
|
||||||
*/
|
*/
|
||||||
using route = channel_impl<channel_detail<
|
using route = channel_impl<channel_detail<
|
||||||
ipc::circ::prod_cons<relat::single, relat::multi, trans::broadcast>
|
ipc::prod_cons<relat::single, relat::multi, trans::broadcast>
|
||||||
>>;
|
>>;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -150,7 +149,7 @@ using route = channel_impl<channel_detail<
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
using channel = channel_impl<channel_detail<
|
using channel = channel_impl<channel_detail<
|
||||||
ipc::circ::prod_cons<relat::multi, relat::multi, trans::broadcast>
|
ipc::prod_cons<relat::multi, relat::multi, trans::broadcast>
|
||||||
>>;
|
>>;
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
61
src/circ/elem_array.h
Normal file
61
src/circ/elem_array.h
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <thread>
|
||||||
|
#include <cstring>
|
||||||
|
#include <utility>
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
#include "def.h"
|
||||||
|
#include "rw_lock.h"
|
||||||
|
|
||||||
|
#include "circ/elem_def.h"
|
||||||
|
#include "platform/detail.h"
|
||||||
|
|
||||||
|
namespace ipc {
|
||||||
|
namespace circ {
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////
|
||||||
|
/// element-array implementation
|
||||||
|
////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
template <std::size_t DataSize, typename Policy>
|
||||||
|
class elem_array : public ipc::circ::conn_head {
|
||||||
|
public:
|
||||||
|
using base_t = ipc::circ::conn_head;
|
||||||
|
using policy_t = Policy;
|
||||||
|
#if __cplusplus >= 201703L
|
||||||
|
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>>;
|
||||||
|
#else /*__cplusplus < 201703L*/
|
||||||
|
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>::value>;
|
||||||
|
#endif/*__cplusplus < 201703L*/
|
||||||
|
|
||||||
|
enum : std::size_t {
|
||||||
|
head_size = sizeof(base_t) + sizeof(policy_t),
|
||||||
|
data_size = DataSize,
|
||||||
|
elem_max = (std::numeric_limits<uint_t<8>>::max)() + 1, // default is 255 + 1
|
||||||
|
elem_size = sizeof(elem_t),
|
||||||
|
block_size = elem_size * elem_max
|
||||||
|
};
|
||||||
|
|
||||||
|
private:
|
||||||
|
policy_t head_;
|
||||||
|
elem_t block_[elem_max];
|
||||||
|
|
||||||
|
public:
|
||||||
|
auto cursor() const noexcept { return head_.cursor(); }
|
||||||
|
|
||||||
|
template <typename F>
|
||||||
|
bool push(F&& f) {
|
||||||
|
return head_.push(this, std::forward<F>(f), block_);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename F>
|
||||||
|
bool pop(decltype(std::declval<policy_t>().cursor())* cur, F&& f) {
|
||||||
|
if (cur == nullptr) return false;
|
||||||
|
return head_.pop(this, *cur, std::forward<F>(f), block_);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace circ
|
||||||
|
} // namespace ipc
|
||||||
68
src/circ/elem_def.h
Normal file
68
src/circ/elem_def.h
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
#include "platform/waiter.h"
|
||||||
|
|
||||||
|
namespace ipc {
|
||||||
|
namespace circ {
|
||||||
|
|
||||||
|
struct elem_head {
|
||||||
|
std::atomic<std::size_t> rc_ { 0 }; // read-counter
|
||||||
|
};
|
||||||
|
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
struct elem_t {
|
||||||
|
elem_head head_;
|
||||||
|
byte_t data_[DataSize] {};
|
||||||
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct elem_t<0> {
|
||||||
|
elem_head head_;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <std::size_t S>
|
||||||
|
elem_t<S>* elem_of(void* ptr) noexcept {
|
||||||
|
return reinterpret_cast<elem_t<S>*>(static_cast<byte_t*>(ptr) - sizeof(elem_head));
|
||||||
|
}
|
||||||
|
|
||||||
|
using u1_t = ipc::uint_t<8>;
|
||||||
|
using u2_t = ipc::uint_t<16>;
|
||||||
|
|
||||||
|
constexpr u1_t index_of(u2_t c) noexcept {
|
||||||
|
return static_cast<u1_t>(c);
|
||||||
|
}
|
||||||
|
|
||||||
|
class conn_head {
|
||||||
|
ipc::detail::waiter cc_waiter_, waiter_;
|
||||||
|
std::atomic<std::size_t> cc_ { 0 }; // connection counter
|
||||||
|
|
||||||
|
public:
|
||||||
|
conn_head() = default;
|
||||||
|
conn_head(const conn_head&) = delete;
|
||||||
|
conn_head& operator=(const conn_head&) = delete;
|
||||||
|
|
||||||
|
auto & waiter() noexcept { return this->waiter_; }
|
||||||
|
auto const & waiter() const noexcept { return this->waiter_; }
|
||||||
|
|
||||||
|
auto & conn_waiter() noexcept { return this->cc_waiter_; }
|
||||||
|
auto const & conn_waiter() const noexcept { return this->cc_waiter_; }
|
||||||
|
|
||||||
|
std::size_t connect() noexcept {
|
||||||
|
return cc_.fetch_add(1, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t disconnect() noexcept {
|
||||||
|
return cc_.fetch_sub(1, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
|
||||||
|
return cc_.load(order);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace circ
|
||||||
|
} // namespace ipc
|
||||||
64
src/ipc.cpp
64
src/ipc.cpp
@ -11,7 +11,8 @@
|
|||||||
#include "tls_pointer.h"
|
#include "tls_pointer.h"
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
|
|
||||||
#include "memory/resource.hpp"
|
#include "policy.h"
|
||||||
|
#include "memory/resource.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
@ -217,54 +218,57 @@ static buff_t recv(ipc::handle_t h) {
|
|||||||
|
|
||||||
}; // detail_impl<Policy>
|
}; // detail_impl<Policy>
|
||||||
|
|
||||||
|
template <typename Flag>
|
||||||
|
using policy_t = policy::choose<circ::elem_array, Flag>;
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
ipc::handle_t channel_detail<Policy>::connect(char const * name) {
|
ipc::handle_t channel_detail<Flag>::connect(char const * name) {
|
||||||
return detail_impl<Policy>::connect(name);
|
return detail_impl<policy_t<Flag>>::connect(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
void channel_detail<Policy>::disconnect(ipc::handle_t h) {
|
void channel_detail<Flag>::disconnect(ipc::handle_t h) {
|
||||||
detail_impl<Policy>::disconnect(h);
|
detail_impl<policy_t<Flag>>::disconnect(h);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
std::size_t channel_detail<Policy>::recv_count(ipc::handle_t h) {
|
std::size_t channel_detail<Flag>::recv_count(ipc::handle_t h) {
|
||||||
return detail_impl<Policy>::recv_count(h);
|
return detail_impl<policy_t<Flag>>::recv_count(h);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
bool channel_detail<Policy>::wait_for_recv(ipc::handle_t h, std::size_t r_count) {
|
bool channel_detail<Flag>::wait_for_recv(ipc::handle_t h, std::size_t r_count) {
|
||||||
return detail_impl<Policy>::wait_for_recv(h, r_count);
|
return detail_impl<policy_t<Flag>>::wait_for_recv(h, r_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
void channel_detail<Policy>::clear_recv(ipc::handle_t h) {
|
void channel_detail<Flag>::clear_recv(ipc::handle_t h) {
|
||||||
detail_impl<Policy>::clear_recv(h);
|
detail_impl<policy_t<Flag>>::clear_recv(h);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
void channel_detail<Policy>::clear_recv(char const * name) {
|
void channel_detail<Flag>::clear_recv(char const * name) {
|
||||||
detail_impl<Policy>::clear_recv(name);
|
detail_impl<policy_t<Flag>>::clear_recv(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
bool channel_detail<Policy>::send(ipc::handle_t h, void const * data, std::size_t size) {
|
bool channel_detail<Flag>::send(ipc::handle_t h, void const * data, std::size_t size) {
|
||||||
return detail_impl<Policy>::send(h, data, size);
|
return detail_impl<policy_t<Flag>>::send(h, data, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Policy>
|
template <typename Flag>
|
||||||
buff_t channel_detail<Policy>::recv(ipc::handle_t h) {
|
buff_t channel_detail<Flag>::recv(ipc::handle_t h) {
|
||||||
return detail_impl<Policy>::recv(h);
|
return detail_impl<policy_t<Flag>>::recv(h);
|
||||||
}
|
}
|
||||||
|
|
||||||
template struct channel_detail<ipc::circ::prod_cons<relat::single, relat::single, trans::unicast >>;
|
template struct channel_detail<ipc::prod_cons<relat::single, relat::single, trans::unicast >>;
|
||||||
template struct channel_detail<ipc::circ::prod_cons<relat::single, relat::multi , trans::unicast >>;
|
template struct channel_detail<ipc::prod_cons<relat::single, relat::multi , trans::unicast >>;
|
||||||
template struct channel_detail<ipc::circ::prod_cons<relat::multi , relat::multi , trans::unicast >>;
|
template struct channel_detail<ipc::prod_cons<relat::multi , relat::multi , trans::unicast >>;
|
||||||
template struct channel_detail<ipc::circ::prod_cons<relat::single, relat::multi , trans::broadcast>>;
|
template struct channel_detail<ipc::prod_cons<relat::single, relat::multi , trans::broadcast>>;
|
||||||
template struct channel_detail<ipc::circ::prod_cons<relat::multi , relat::multi , trans::broadcast>>;
|
template struct channel_detail<ipc::prod_cons<relat::multi , relat::multi , trans::broadcast>>;
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
|
|||||||
@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
#include <cstddef>
|
#include <cstddef>
|
||||||
|
|
||||||
#include "memory/alloc.hpp"
|
#include "memory/alloc.h"
|
||||||
#include "platform/detail.h"
|
#include "platform/detail.h"
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
|
|||||||
@ -9,8 +9,8 @@
|
|||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
|
|
||||||
#include "memory/alloc.hpp"
|
#include "memory/alloc.h"
|
||||||
#include "memory/wrapper.hpp"
|
#include "memory/wrapper.h"
|
||||||
#include "memory/detail.h"
|
#include "memory/detail.h"
|
||||||
#include "platform/detail.h"
|
#include "platform/detail.h"
|
||||||
|
|
||||||
@ -15,7 +15,7 @@
|
|||||||
#include "rw_lock.h"
|
#include "rw_lock.h"
|
||||||
#include "tls_pointer.h"
|
#include "tls_pointer.h"
|
||||||
|
|
||||||
#include "memory/alloc.hpp"
|
#include "memory/alloc.h"
|
||||||
#include "memory/detail.h"
|
#include "memory/detail.h"
|
||||||
#include "platform/detail.h"
|
#include "platform/detail.h"
|
||||||
|
|
||||||
@ -4,6 +4,8 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <type_traits>
|
#include <type_traits>
|
||||||
|
|
||||||
|
#include "def.h"
|
||||||
|
|
||||||
#if __cplusplus >= 201703L
|
#if __cplusplus >= 201703L
|
||||||
|
|
||||||
namespace std {
|
namespace std {
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "memory/resource.hpp"
|
#include "memory/resource.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
|
|||||||
@ -8,7 +8,7 @@
|
|||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
|
|
||||||
#include "memory/resource.hpp"
|
#include "memory/resource.h"
|
||||||
#include "platform/to_tchar.h"
|
#include "platform/to_tchar.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|||||||
25
src/policy.h
Normal file
25
src/policy.h
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <type_traits>
|
||||||
|
|
||||||
|
#include "def.h"
|
||||||
|
#include "prod_cons.h"
|
||||||
|
|
||||||
|
#include "circ/elem_array.h"
|
||||||
|
|
||||||
|
namespace ipc {
|
||||||
|
namespace policy {
|
||||||
|
|
||||||
|
template <template <std::size_t, typename> class Elems, typename Flag>
|
||||||
|
struct choose;
|
||||||
|
|
||||||
|
template <typename Flag>
|
||||||
|
struct choose<circ::elem_array, Flag> {
|
||||||
|
using is_fixed = std::true_type;
|
||||||
|
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
using elems_t = circ::elem_array<DataSize, ipc::prod_cons_impl<Flag>>;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace policy
|
||||||
|
} // namespace ipc
|
||||||
@ -1,6 +1,6 @@
|
|||||||
#include "pool_alloc.h"
|
#include "pool_alloc.h"
|
||||||
|
|
||||||
#include "memory/resource.hpp"
|
#include "memory/resource.h"
|
||||||
|
|
||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace mem {
|
namespace mem {
|
||||||
|
|||||||
218
src/prod_cons.h
Normal file
218
src/prod_cons.h
Normal file
@ -0,0 +1,218 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <thread>
|
||||||
|
#include <utility>
|
||||||
|
#include <cstring>
|
||||||
|
|
||||||
|
#include "def.h"
|
||||||
|
|
||||||
|
#include "circ/elem_def.h"
|
||||||
|
|
||||||
|
namespace ipc {
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////
|
||||||
|
/// producer-consumer implementation
|
||||||
|
////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
template <typename Flag>
|
||||||
|
struct prod_cons_impl;
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct prod_cons_impl<prod_cons<relat::single, relat::single, trans::unicast>> {
|
||||||
|
std::atomic<circ::u2_t> rd_ { 0 }; // read index
|
||||||
|
std::atomic<circ::u2_t> wt_ { 0 }; // write index
|
||||||
|
|
||||||
|
#if __cplusplus >= 201703L
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
constexpr static std::size_t elem_param = DataSize - sizeof(circ::elem_head);
|
||||||
|
#else /*__cplusplus < 201703L*/
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
struct elem_param {
|
||||||
|
enum : std::size_t {
|
||||||
|
value = DataSize - sizeof(circ::elem_head)
|
||||||
|
};
|
||||||
|
};
|
||||||
|
#endif/*__cplusplus < 201703L*/
|
||||||
|
|
||||||
|
constexpr circ::u2_t cursor() const noexcept {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool push(E* /*elems*/, F&& f, EB* elem_start) {
|
||||||
|
auto cur_wt = circ::index_of(wt_.load(std::memory_order_acquire));
|
||||||
|
if (cur_wt == circ::index_of(rd_.load(std::memory_order_relaxed) - 1)) {
|
||||||
|
return false; // full
|
||||||
|
}
|
||||||
|
std::forward<F>(f)(elem_start + cur_wt);
|
||||||
|
wt_.fetch_add(1, std::memory_order_release);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) {
|
||||||
|
auto cur_rd = circ::index_of(rd_.load(std::memory_order_acquire));
|
||||||
|
if (cur_rd == circ::index_of(wt_.load(std::memory_order_relaxed))) {
|
||||||
|
return false; // empty
|
||||||
|
}
|
||||||
|
std::forward<F>(f)(elem_start + cur_rd);
|
||||||
|
rd_.fetch_add(1, std::memory_order_release);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct prod_cons_impl<prod_cons<relat::single, relat::multi , trans::unicast>>
|
||||||
|
: prod_cons_impl<prod_cons<relat::single, relat::single, trans::unicast>> {
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) {
|
||||||
|
byte_t buff[sizeof(E)];
|
||||||
|
for (unsigned k = 0;;) {
|
||||||
|
auto cur_rd = rd_.load(std::memory_order_acquire);
|
||||||
|
if (circ::index_of(cur_rd) ==
|
||||||
|
circ::index_of(wt_.load(std::memory_order_relaxed))) {
|
||||||
|
return false; // empty
|
||||||
|
}
|
||||||
|
std::memcpy(buff, elem_start + circ::index_of(cur_rd), sizeof(buff));
|
||||||
|
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
||||||
|
std::forward<F>(f)(buff);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
ipc::yield(k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::unicast>>
|
||||||
|
: prod_cons_impl<prod_cons<relat::single, relat::multi, trans::unicast>> {
|
||||||
|
|
||||||
|
std::atomic<circ::u2_t> ct_ { 0 }; // commit index
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool push(E* /*elems*/, F&& f, EB* elem_start) {
|
||||||
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
|
while(1) {
|
||||||
|
cur_ct = ct_.load(std::memory_order_acquire);
|
||||||
|
if (circ::index_of(nxt_ct = cur_ct + 1) ==
|
||||||
|
circ::index_of(rd_.load(std::memory_order_relaxed))) {
|
||||||
|
return false; // full
|
||||||
|
}
|
||||||
|
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_relaxed)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
std::forward<F>(f)(elem_start + circ::index_of(cur_ct));
|
||||||
|
while(1) {
|
||||||
|
auto exp_wt = cur_ct;
|
||||||
|
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct prod_cons_impl<prod_cons<relat::single, relat::multi, trans::broadcast>> {
|
||||||
|
std::atomic<circ::u2_t> wt_ { 0 }; // write index
|
||||||
|
|
||||||
|
#if __cplusplus >= 201703L
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
constexpr static std::size_t elem_param = DataSize;
|
||||||
|
#else /*__cplusplus < 201703L*/
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
struct elem_param { enum : std::size_t { value = DataSize }; };
|
||||||
|
#endif/*__cplusplus < 201703L*/
|
||||||
|
|
||||||
|
/*
|
||||||
|
<Remarks> std::atomic<T> may not have value_type.
|
||||||
|
See: https://stackoverflow.com/questions/53648614/what-happened-to-stdatomicxvalue-type
|
||||||
|
*/
|
||||||
|
using rc_t = decltype(circ::elem_head::rc_.load());
|
||||||
|
|
||||||
|
circ::u2_t cursor() const noexcept {
|
||||||
|
return wt_.load(std::memory_order_acquire);
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool push(E* elems, F&& f, EB* elem_start) {
|
||||||
|
auto conn_cnt = elems->conn_count(); // acquire
|
||||||
|
if (conn_cnt == 0) return false;
|
||||||
|
auto el = elem_start + circ::index_of(wt_.load(std::memory_order_relaxed));
|
||||||
|
// check all consumers have finished reading this element
|
||||||
|
while(1) {
|
||||||
|
rc_t expected = 0;
|
||||||
|
if (el->head_.rc_.compare_exchange_weak(
|
||||||
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::yield();
|
||||||
|
conn_cnt = elems->conn_count(); // acquire
|
||||||
|
if (conn_cnt == 0) return false;
|
||||||
|
}
|
||||||
|
std::forward<F>(f)(el->data_);
|
||||||
|
wt_.fetch_add(1, std::memory_order_release);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool pop(E* /*elems*/, circ::u2_t& cur, F&& f, EB* elem_start) {
|
||||||
|
if (cur == cursor()) return false; // acquire
|
||||||
|
auto el = elem_start + circ::index_of(cur++);
|
||||||
|
std::forward<F>(f)(el->data_);
|
||||||
|
for (unsigned k = 0;;) {
|
||||||
|
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
|
||||||
|
if (cur_rc == 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (el->head_.rc_.compare_exchange_weak(
|
||||||
|
cur_rc, cur_rc - 1, std::memory_order_release)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
ipc::yield(k);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
template <>
|
||||||
|
struct prod_cons_impl<prod_cons<relat::multi , relat::multi, trans::broadcast>>
|
||||||
|
: prod_cons_impl<prod_cons<relat::single, relat::multi, trans::broadcast>> {
|
||||||
|
|
||||||
|
std::atomic<circ::u2_t> ct_ { 0 }; // commit index
|
||||||
|
|
||||||
|
template <typename E, typename F, typename EB>
|
||||||
|
bool push(E* elems, F&& f, EB* elem_start) {
|
||||||
|
auto conn_cnt = elems->conn_count(); // acquire
|
||||||
|
if (conn_cnt == 0) return false;
|
||||||
|
circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_relaxed),
|
||||||
|
nxt_ct = cur_ct + 1;
|
||||||
|
auto el = elem_start + circ::index_of(cur_ct);
|
||||||
|
// check all consumers have finished reading this element
|
||||||
|
while(1) {
|
||||||
|
rc_t expected = 0;
|
||||||
|
if (el->head_.rc_.compare_exchange_weak(
|
||||||
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_relaxed)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::yield();
|
||||||
|
conn_cnt = elems->conn_count(); // acquire
|
||||||
|
if (conn_cnt == 0) return false;
|
||||||
|
}
|
||||||
|
std::forward<F>(f)(el->data_);
|
||||||
|
while(1) {
|
||||||
|
auto exp_wt = cur_ct;
|
||||||
|
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace ipc
|
||||||
@ -12,7 +12,6 @@
|
|||||||
|
|
||||||
#include "def.h"
|
#include "def.h"
|
||||||
#include "rw_lock.h"
|
#include "rw_lock.h"
|
||||||
#include "elem_circ.h"
|
|
||||||
|
|
||||||
#include "platform/waiter.h"
|
#include "platform/waiter.h"
|
||||||
|
|
||||||
@ -146,6 +145,34 @@ public:
|
|||||||
elems_ = nullptr;
|
elems_ = nullptr;
|
||||||
return old;
|
return old;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T, typename F, typename... P>
|
||||||
|
auto push(F&& f, P&&... params) {
|
||||||
|
if (elems_ == nullptr) return false;
|
||||||
|
if (std::forward<F>(f)([&](void* p) {
|
||||||
|
::new (p) T(std::forward<P>(params)...);
|
||||||
|
})) {
|
||||||
|
this->waiter_.broadcast();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
T pop() {
|
||||||
|
if (elems_ == nullptr) {
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
T item;
|
||||||
|
for (unsigned k = 0;;) {
|
||||||
|
if (elems_->pop(&this->cursor_, [&item](void* p) {
|
||||||
|
::new (&item) T(std::move(*static_cast<T*>(p)));
|
||||||
|
})) {
|
||||||
|
return item;
|
||||||
|
}
|
||||||
|
ipc::sleep(k, [this] { return this->waiter_.wait(); });
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename Elems, typename IsFixed>
|
template <typename Elems, typename IsFixed>
|
||||||
@ -159,51 +186,46 @@ public:
|
|||||||
|
|
||||||
template <typename T, typename... P>
|
template <typename T, typename... P>
|
||||||
auto push(P&&... params) {
|
auto push(P&&... params) {
|
||||||
if (this->elems_ == nullptr) return false;
|
return base_t::template push<T>([this](auto&& f) {
|
||||||
if (this->elems_->push([&](void* p) {
|
return this->elems_->push(std::forward<decltype(f)>(f));
|
||||||
::new (p) T(std::forward<P>(params)...);
|
}, std::forward<P>(params)...);
|
||||||
})) {
|
|
||||||
this->waiter_.broadcast();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
template <typename T>
|
template <typename Elems>
|
||||||
T pop() {
|
class queue<Elems, std::false_type> : public queue_base<Elems> {
|
||||||
if (this->elems_ == nullptr) {
|
using base_t = queue_base<Elems>;
|
||||||
return {};
|
|
||||||
}
|
public:
|
||||||
T item;
|
using is_fixed = std::false_type;
|
||||||
for (unsigned k = 0;;) {
|
|
||||||
if (this->elems_->pop(&this->cursor_, [&item](void* p) {
|
using base_t::base_t;
|
||||||
::new (&item) T(std::move(*static_cast<T*>(p)));
|
|
||||||
})) {
|
template <typename T, typename... P>
|
||||||
return item;
|
auto push(P&&... params) {
|
||||||
}
|
return base_t::template push<T>([this](auto&& f) {
|
||||||
ipc::sleep(k, [this] { return this->waiter_.wait(); });
|
return this->elems_->template push<sizeof(T), alignof(T)>(std::forward<decltype(f)>(f));
|
||||||
}
|
}, std::forward<P>(params)...);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace detail
|
} // namespace detail
|
||||||
|
|
||||||
template <typename T,
|
template <typename T, typename Policy>
|
||||||
typename Policy = ipc::circ::prod_cons<relat::single, relat::multi, trans::broadcast>>
|
|
||||||
class queue : public detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed> {
|
class queue : public detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed> {
|
||||||
using base_t = detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed>;
|
using base_t = detail::queue<typename Policy::template elems_t<sizeof(T)>, typename Policy::is_fixed>;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
using base_t::base_t;
|
using base_t::base_t;
|
||||||
|
using base_t::push;
|
||||||
|
using base_t::pop;
|
||||||
|
|
||||||
template <typename... P>
|
template <typename... P>
|
||||||
auto push(P&&... params) {
|
auto push(P&&... params) {
|
||||||
return base_t::template push<T>(std::forward<P>(params)...);
|
return base_t::template push<T>(std::forward<P>(params)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
T pop() {
|
T pop() { return base_t::template pop<T>(); }
|
||||||
return base_t::template pop<T>();
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace ipc
|
} // namespace ipc
|
||||||
@ -6,9 +6,11 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
|
|
||||||
#include "elem_circ.h"
|
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
#include "memory/resource.hpp"
|
#include "prod_cons.h"
|
||||||
|
#include "policy.h"
|
||||||
|
#include "circ/elem_array.h"
|
||||||
|
#include "memory/resource.h"
|
||||||
#include "test.h"
|
#include "test.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
@ -18,10 +20,13 @@ struct msg_t {
|
|||||||
int dat_;
|
int dat_;
|
||||||
};
|
};
|
||||||
|
|
||||||
using cq_t = ipc::circ::elem_array<sizeof(msg_t),
|
template <ipc::relat Rp, ipc::relat Rc, ipc::trans Ts>
|
||||||
ipc::prod_cons_circ<ipc::relat::single,
|
using pc_t = ipc::prod_cons_impl<ipc::prod_cons<Rp, Rc, Ts>>;
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::broadcast>>;
|
using cq_t = ipc::circ::elem_array<
|
||||||
|
sizeof(msg_t),
|
||||||
|
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
|
||||||
|
>;
|
||||||
cq_t* cq__;
|
cq_t* cq__;
|
||||||
|
|
||||||
bool operator==(msg_t const & m1, msg_t const & m2) {
|
bool operator==(msg_t const & m1, msg_t const & m2) {
|
||||||
@ -67,10 +72,7 @@ struct test_verify<ipc::circ::elem_array<D, P>> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
template <ipc::relat Rp>
|
template <ipc::relat Rp>
|
||||||
struct test_verify<ipc::prod_cons_circ<Rp,
|
struct test_verify<pc_t<Rp, ipc::relat::multi, ipc::trans::unicast>> : test_verify<cq_t> {
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> : test_verify<cq_t> {
|
|
||||||
using test_verify<cq_t>::test_verify;
|
using test_verify<cq_t>::test_verify;
|
||||||
|
|
||||||
void verify(int N, int Loops) {
|
void verify(int N, int Loops) {
|
||||||
@ -94,12 +96,12 @@ template <typename P>
|
|||||||
struct quit_mode;
|
struct quit_mode;
|
||||||
|
|
||||||
template <ipc::relat Rp, ipc::relat Rc>
|
template <ipc::relat Rp, ipc::relat Rc>
|
||||||
struct quit_mode<ipc::prod_cons_circ<Rp, Rc, ipc::trans::unicast>> {
|
struct quit_mode<pc_t<Rp, Rc, ipc::trans::unicast>> {
|
||||||
using type = volatile bool;
|
using type = volatile bool;
|
||||||
};
|
};
|
||||||
|
|
||||||
template <ipc::relat Rp, ipc::relat Rc>
|
template <ipc::relat Rp, ipc::relat Rc>
|
||||||
struct quit_mode<ipc::prod_cons_circ<Rp, Rc, ipc::trans::broadcast>> {
|
struct quit_mode<pc_t<Rp, Rc, ipc::trans::broadcast>> {
|
||||||
struct type {
|
struct type {
|
||||||
constexpr type(bool) {}
|
constexpr type(bool) {}
|
||||||
constexpr operator bool() const { return false; }
|
constexpr operator bool() const { return false; }
|
||||||
@ -263,27 +265,21 @@ void test_prod_cons() {
|
|||||||
void Unit::test_prod_cons_1v1() {
|
void Unit::test_prod_cons_1v1() {
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::single,
|
pc_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>
|
||||||
ipc::relat::single,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_ssu;
|
> el_arr_ssu;
|
||||||
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ssu);
|
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_ssu);
|
||||||
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ssu);
|
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_ssu);
|
||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::single,
|
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::unicast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_smu;
|
> el_arr_smu;
|
||||||
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
|
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
|
||||||
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smu);
|
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_smu);
|
||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::multi,
|
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_mmu;
|
> el_arr_mmu;
|
||||||
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
|
benchmark_prod_cons<1, 1, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
|
||||||
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu);
|
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmu);
|
||||||
@ -293,9 +289,7 @@ void Unit::test_prod_cons_1v1() {
|
|||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::multi,
|
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::broadcast>
|
|
||||||
> el_arr_mmb;
|
> el_arr_mmb;
|
||||||
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb);
|
benchmark_prod_cons<1, 1, LoopCount, cq_t>(&el_arr_mmb);
|
||||||
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
|
benchmark_prod_cons<1, 1, LoopCount, void>(&el_arr_mmb);
|
||||||
@ -304,18 +298,14 @@ void Unit::test_prod_cons_1v1() {
|
|||||||
void Unit::test_prod_cons_1v3() {
|
void Unit::test_prod_cons_1v3() {
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::single,
|
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::unicast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_smu;
|
> el_arr_smu;
|
||||||
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
|
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_smu)::policy_t>(&el_arr_smu);
|
||||||
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smu);
|
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_smu);
|
||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::multi,
|
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_mmu;
|
> el_arr_mmu;
|
||||||
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
|
benchmark_prod_cons<1, 3, LoopCount, decltype(el_arr_mmu)::policy_t>(&el_arr_mmu);
|
||||||
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu);
|
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmu);
|
||||||
@ -325,9 +315,7 @@ void Unit::test_prod_cons_1v3() {
|
|||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::multi,
|
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::broadcast>
|
|
||||||
> el_arr_mmb;
|
> el_arr_mmb;
|
||||||
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_mmb);
|
benchmark_prod_cons<1, 3, LoopCount, cq_t>(&el_arr_mmb);
|
||||||
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmb);
|
benchmark_prod_cons<1, 3, LoopCount, void>(&el_arr_mmb);
|
||||||
@ -336,9 +324,7 @@ void Unit::test_prod_cons_1v3() {
|
|||||||
void Unit::test_prod_cons_performance() {
|
void Unit::test_prod_cons_performance() {
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::single,
|
pc_t<ipc::relat::single, ipc::relat::multi, ipc::trans::unicast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_smu;
|
> el_arr_smu;
|
||||||
ipc::detail::static_for(std::make_index_sequence<8>{}, [&el_arr_smu](auto index) {
|
ipc::detail::static_for(std::make_index_sequence<8>{}, [&el_arr_smu](auto index) {
|
||||||
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu);
|
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_smu);
|
||||||
@ -351,9 +337,7 @@ void Unit::test_prod_cons_performance() {
|
|||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::multi,
|
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::unicast>
|
|
||||||
> el_arr_mmu;
|
> el_arr_mmu;
|
||||||
ipc::detail::static_for(std::make_index_sequence<8>{}, [&el_arr_mmu](auto index) {
|
ipc::detail::static_for(std::make_index_sequence<8>{}, [&el_arr_mmu](auto index) {
|
||||||
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
|
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmu);
|
||||||
@ -367,9 +351,7 @@ void Unit::test_prod_cons_performance() {
|
|||||||
|
|
||||||
ipc::circ::elem_array<
|
ipc::circ::elem_array<
|
||||||
sizeof(msg_t),
|
sizeof(msg_t),
|
||||||
ipc::prod_cons_circ<ipc::relat::multi,
|
pc_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast>
|
||||||
ipc::relat::multi,
|
|
||||||
ipc::trans::broadcast>
|
|
||||||
> el_arr_mmb;
|
> el_arr_mmb;
|
||||||
ipc::detail::static_for(std::make_index_sequence<8>{}, [&el_arr_mmb](auto index) {
|
ipc::detail::static_for(std::make_index_sequence<8>{}, [&el_arr_mmb](auto index) {
|
||||||
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
|
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount, void>(&el_arr_mmb);
|
||||||
@ -383,7 +365,12 @@ void Unit::test_prod_cons_performance() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void Unit::test_queue() {
|
void Unit::test_queue() {
|
||||||
ipc::queue<msg_t> queue;
|
using queue_t = ipc::queue<msg_t, ipc::policy::choose<
|
||||||
|
ipc::circ::elem_array,
|
||||||
|
ipc::prod_cons<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>
|
||||||
|
>>;
|
||||||
|
queue_t queue;
|
||||||
|
|
||||||
queue.push(msg_t { 1, 2 });
|
queue.push(msg_t { 1, 2 });
|
||||||
QCOMPARE(queue.pop(), msg_t{});
|
QCOMPARE(queue.pop(), msg_t{});
|
||||||
QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__));
|
QVERIFY(sizeof(decltype(queue)::elems_t) <= sizeof(*cq__));
|
||||||
@ -393,7 +380,7 @@ void Unit::test_queue() {
|
|||||||
QVERIFY(queue.detach() != nullptr);
|
QVERIFY(queue.detach() != nullptr);
|
||||||
|
|
||||||
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) {
|
ipc::detail::static_for(std::make_index_sequence<8>{}, [](auto index) {
|
||||||
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((ipc::queue<msg_t>*)nullptr);
|
benchmark_prod_cons<1, decltype(index)::value + 1, LoopCount>((queue_t*)nullptr);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -19,7 +19,7 @@
|
|||||||
|
|
||||||
#include "ipc.h"
|
#include "ipc.h"
|
||||||
#include "rw_lock.h"
|
#include "rw_lock.h"
|
||||||
#include "memory/resource.hpp"
|
#include "memory/resource.h"
|
||||||
|
|
||||||
#include "test.h"
|
#include "test.h"
|
||||||
|
|
||||||
|
|||||||
@ -5,7 +5,7 @@
|
|||||||
|
|
||||||
#include "random.hpp"
|
#include "random.hpp"
|
||||||
|
|
||||||
#include "memory/resource.hpp"
|
#include "memory/resource.h"
|
||||||
#include "pool_alloc.h"
|
#include "pool_alloc.h"
|
||||||
|
|
||||||
#include "test.h"
|
#include "test.h"
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user