mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
optimize
This commit is contained in:
parent
eb1f15583e
commit
46051733bb
@ -18,12 +18,7 @@ class elem_array {
|
|||||||
public:
|
public:
|
||||||
using policy_t = Policy;
|
using policy_t = Policy;
|
||||||
using cursor_t = decltype(std::declval<policy_t>().cursor());
|
using cursor_t = decltype(std::declval<policy_t>().cursor());
|
||||||
|
using elem_t = typename policy_t::template elem_t<DataSize>;
|
||||||
#if __cplusplus >= 201703L
|
|
||||||
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>>;
|
|
||||||
#else /*__cplusplus < 201703L*/
|
|
||||||
using elem_t = ipc::circ::elem_t<policy_t::template elem_param<DataSize>::value>;
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
|
||||||
|
|
||||||
enum : std::size_t {
|
enum : std::size_t {
|
||||||
data_size = DataSize,
|
data_size = DataSize,
|
||||||
|
|||||||
@ -13,28 +13,12 @@
|
|||||||
namespace ipc {
|
namespace ipc {
|
||||||
namespace circ {
|
namespace circ {
|
||||||
|
|
||||||
struct elem_head {
|
enum {
|
||||||
std::atomic<std::size_t> rc_ { 0 }; // read-counter
|
cache_line_size = 64
|
||||||
};
|
};
|
||||||
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
struct elem_t {
|
|
||||||
elem_head head_;
|
|
||||||
byte_t data_[DataSize] {};
|
|
||||||
};
|
|
||||||
|
|
||||||
template <>
|
|
||||||
struct elem_t<0> {
|
|
||||||
elem_head head_;
|
|
||||||
};
|
|
||||||
|
|
||||||
template <std::size_t S>
|
|
||||||
elem_t<S>* elem_of(void* ptr) noexcept {
|
|
||||||
return reinterpret_cast<elem_t<S>*>(static_cast<byte_t*>(ptr) - sizeof(elem_head));
|
|
||||||
}
|
|
||||||
|
|
||||||
using u1_t = ipc::uint_t<8>;
|
using u1_t = ipc::uint_t<8>;
|
||||||
using u2_t = ipc::uint_t<16>;
|
using u2_t = ipc::uint_t<32>;
|
||||||
|
|
||||||
constexpr u1_t index_of(u2_t c) noexcept {
|
constexpr u1_t index_of(u2_t c) noexcept {
|
||||||
return static_cast<u1_t>(c);
|
return static_cast<u1_t>(c);
|
||||||
|
|||||||
143
src/prod_cons.h
143
src/prod_cons.h
@ -20,43 +20,37 @@ struct prod_cons_impl;
|
|||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
||||||
std::atomic<circ::u2_t> rd_; // read index
|
|
||||||
std::atomic<circ::u2_t> wt_; // write index
|
|
||||||
|
|
||||||
#if __cplusplus >= 201703L
|
|
||||||
template <std::size_t DataSize>
|
template <std::size_t DataSize>
|
||||||
constexpr static std::size_t elem_param = DataSize - sizeof(circ::elem_head);
|
struct elem_t {
|
||||||
#else /*__cplusplus < 201703L*/
|
byte_t data_[DataSize] {};
|
||||||
template <std::size_t DataSize>
|
|
||||||
struct elem_param {
|
|
||||||
enum : std::size_t {
|
|
||||||
value = DataSize - sizeof(circ::elem_head)
|
|
||||||
};
|
};
|
||||||
};
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> rd_; // read index
|
||||||
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> wt_; // write index
|
||||||
|
|
||||||
constexpr circ::u2_t cursor() const noexcept {
|
constexpr circ::u2_t cursor() const noexcept {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
bool push(E* /*elems*/, F&& f, EB* elem_start) {
|
bool push(W* /*wrapper*/, F&& f, E<DataSize>* elems) {
|
||||||
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
||||||
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
||||||
return false; // full
|
return false; // full
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(elem_start + cur_wt);
|
std::forward<F>(f)(&(elems[cur_wt].data_));
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
wt_.fetch_add(1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) {
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DataSize>* elems) {
|
||||||
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
||||||
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(elem_start + cur_rd);
|
std::forward<F>(f)(&(elems[cur_rd].data_));
|
||||||
rd_.fetch_add(1, std::memory_order_release);
|
rd_.fetch_add(1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -66,16 +60,16 @@ template <>
|
|||||||
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
||||||
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
bool pop(E* /*elems*/, circ::u2_t& /*cur*/, F&& f, EB* elem_start) {
|
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DataSize>* elems) {
|
||||||
byte_t buff[sizeof(E)];
|
byte_t buff[DataSize];
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
if (circ::index_of(cur_rd) ==
|
if (circ::index_of(cur_rd) ==
|
||||||
circ::index_of(wt_.load(std::memory_order_acquire))) {
|
circ::index_of(wt_.load(std::memory_order_acquire))) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
}
|
}
|
||||||
std::memcpy(buff, elem_start + circ::index_of(cur_rd), sizeof(buff));
|
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
||||||
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
||||||
std::forward<F>(f)(buff);
|
std::forward<F>(f)(buff);
|
||||||
return true;
|
return true;
|
||||||
@ -89,12 +83,23 @@ template <>
|
|||||||
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
||||||
: prod_cons_impl<wr<relat::single, relat::multi, trans::unicast>> {
|
: prod_cons_impl<wr<relat::single, relat::multi, trans::unicast>> {
|
||||||
|
|
||||||
std::atomic<circ::u2_t> ct_; // commit index
|
enum : std::uint64_t {
|
||||||
|
invalid_index = (std::numeric_limits<std::uint64_t>::max)()
|
||||||
|
};
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <std::size_t DataSize>
|
||||||
bool push(E* /*elems*/, F&& f, EB* elem_start) {
|
struct elem_t {
|
||||||
|
byte_t data_[DataSize] {};
|
||||||
|
alignas(circ::cache_line_size) std::atomic<std::uint64_t> f_ct_ { invalid_index }; // commit flag
|
||||||
|
};
|
||||||
|
|
||||||
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
||||||
|
alignas(circ::cache_line_size) std::atomic<unsigned > barrier_;
|
||||||
|
|
||||||
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
|
bool push(W* /*wrapper*/, F&& f, E<DataSize>* elems) {
|
||||||
circ::u2_t cur_ct, nxt_ct;
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
while (1) {
|
for (unsigned k = 0;;) {
|
||||||
cur_ct = ct_.load(std::memory_order_relaxed);
|
cur_ct = ct_.load(std::memory_order_relaxed);
|
||||||
if (circ::index_of(nxt_ct = cur_ct + 1) ==
|
if (circ::index_of(nxt_ct = cur_ct + 1) ==
|
||||||
circ::index_of(rd_.load(std::memory_order_acquire))) {
|
circ::index_of(rd_.load(std::memory_order_acquire))) {
|
||||||
@ -103,15 +108,28 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) {
|
if (ct_.compare_exchange_weak(cur_ct, nxt_ct, std::memory_order_release)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(elem_start + circ::index_of(cur_ct));
|
auto* el = elems + circ::index_of(cur_ct);
|
||||||
|
std::forward<F>(f)(&(el->data_));
|
||||||
|
// set flag & try update wt
|
||||||
|
el->f_ct_.store(cur_ct, std::memory_order_release);
|
||||||
while (1) {
|
while (1) {
|
||||||
auto exp_wt = cur_ct;
|
barrier_.exchange(0, std::memory_order_acq_rel);
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
|
||||||
break;
|
if (cur_ct != wt_.load(std::memory_order_acquire)) {
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
if (cac_ct != cur_ct) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!el->f_ct_.compare_exchange_strong(cac_ct, invalid_index, std::memory_order_relaxed)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
wt_.store(nxt_ct, std::memory_order_release);
|
||||||
|
cur_ct = nxt_ct;
|
||||||
|
nxt_ct = cur_ct + 1;
|
||||||
|
el = elems + circ::index_of(cur_ct);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -119,58 +137,53 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
||||||
std::atomic<circ::u2_t> wt_; // write index
|
|
||||||
|
|
||||||
#if __cplusplus >= 201703L
|
using rc_t = std::size_t;
|
||||||
template <std::size_t DataSize>
|
|
||||||
constexpr static std::size_t elem_param = DataSize;
|
|
||||||
#else /*__cplusplus < 201703L*/
|
|
||||||
template <std::size_t DataSize>
|
|
||||||
struct elem_param { enum : std::size_t { value = DataSize }; };
|
|
||||||
#endif/*__cplusplus < 201703L*/
|
|
||||||
|
|
||||||
/*
|
template <std::size_t DataSize>
|
||||||
<Remarks> std::atomic<T> may not have value_type.
|
struct elem_t {
|
||||||
See: https://stackoverflow.com/questions/53648614/what-happened-to-stdatomicxvalue-type
|
byte_t data_[DataSize] {};
|
||||||
*/
|
alignas(circ::cache_line_size) std::atomic<rc_t> rc_ { 0 }; // read-counter
|
||||||
using rc_t = decltype(circ::elem_head::rc_.load());
|
};
|
||||||
|
|
||||||
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> wt_; // write index
|
||||||
|
|
||||||
circ::u2_t cursor() const noexcept {
|
circ::u2_t cursor() const noexcept {
|
||||||
return wt_.load(std::memory_order_acquire);
|
return wt_.load(std::memory_order_acquire);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
bool push(E* elems, F&& f, EB* elem_start) {
|
bool push(W* wrapper, F&& f, E<DataSize>* elems) {
|
||||||
auto conn_cnt = elems->conn_count(std::memory_order_relaxed);
|
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed);
|
||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
auto el = elem_start + circ::index_of(wt_.load(std::memory_order_acquire));
|
auto el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
|
||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
while (1) {
|
while (1) {
|
||||||
rc_t expected = 0;
|
rc_t expected = 0;
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
if (el->rc_.compare_exchange_weak(
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
conn_cnt = elems->conn_count(); // acquire
|
conn_cnt = wrapper->conn_count(); // acquire
|
||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(el->data_);
|
std::forward<F>(f)(&(el->data_));
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
wt_.fetch_add(1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
bool pop(E* /*elems*/, circ::u2_t& cur, F&& f, EB* elem_start) {
|
bool pop(W* /*wrapper*/, circ::u2_t& cur, F&& f, E<DataSize>* elems) {
|
||||||
if (cur == cursor()) return false; // acquire
|
if (cur == cursor()) return false; // acquire
|
||||||
auto el = elem_start + circ::index_of(cur++);
|
auto el = elems + circ::index_of(cur++);
|
||||||
std::forward<F>(f)(el->data_);
|
std::forward<F>(f)(&(el->data_));
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
rc_t cur_rc = el->head_.rc_.load(std::memory_order_acquire);
|
rc_t cur_rc = el->rc_.load(std::memory_order_acquire);
|
||||||
if (cur_rc == 0) {
|
if (cur_rc == 0) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
if (el->rc_.compare_exchange_weak(
|
||||||
cur_rc, cur_rc - 1, std::memory_order_release)) {
|
cur_rc, cur_rc - 1, std::memory_order_release)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -183,27 +196,27 @@ template <>
|
|||||||
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>>
|
struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>>
|
||||||
: prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
: prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
||||||
|
|
||||||
std::atomic<circ::u2_t> ct_; // commit index
|
alignas(circ::cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
||||||
|
|
||||||
template <typename E, typename F, typename EB>
|
template <typename W, typename F, template <std::size_t> class E, std::size_t DataSize>
|
||||||
bool push(E* elems, F&& f, EB* elem_start) {
|
bool push(W* wrapper, F&& f, E<DataSize>* elems) {
|
||||||
auto conn_cnt = elems->conn_count(std::memory_order_relaxed);
|
auto conn_cnt = wrapper->conn_count(std::memory_order_relaxed);
|
||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire),
|
circ::u2_t cur_ct = ct_.fetch_add(1, std::memory_order_acquire),
|
||||||
nxt_ct = cur_ct + 1;
|
nxt_ct = cur_ct + 1;
|
||||||
auto el = elem_start + circ::index_of(cur_ct);
|
auto el = elems + circ::index_of(cur_ct);
|
||||||
// check all consumers have finished reading this element
|
// check all consumers have finished reading this element
|
||||||
while (1) {
|
while (1) {
|
||||||
rc_t expected = 0;
|
rc_t expected = 0;
|
||||||
if (el->head_.rc_.compare_exchange_weak(
|
if (el->rc_.compare_exchange_weak(
|
||||||
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
expected, static_cast<rc_t>(conn_cnt), std::memory_order_release)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
conn_cnt = elems->conn_count(); // acquire
|
conn_cnt = wrapper->conn_count(); // acquire
|
||||||
if (conn_cnt == 0) return false;
|
if (conn_cnt == 0) return false;
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(el->data_);
|
std::forward<F>(f)(&(el->data_));
|
||||||
while (1) {
|
while (1) {
|
||||||
auto exp_wt = cur_ct;
|
auto exp_wt = cur_ct;
|
||||||
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
if (wt_.compare_exchange_weak(exp_wt, nxt_ct, std::memory_order_release)) {
|
||||||
|
|||||||
@ -213,7 +213,9 @@ struct test_cq<ipc::queue<T...>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void send(cn_t* /*cn*/, msg_t const & msg) {
|
void send(cn_t* /*cn*/, msg_t const & msg) {
|
||||||
cn_t{ ca_ }.push(msg);
|
while (!cn_t{ ca_ }.push(msg)) {
|
||||||
|
std::this_thread::yield();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user