mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
尝试去除恶心的连接检测(TBD)
This commit is contained in:
parent
912c1bfc64
commit
20168fb869
@ -31,8 +31,6 @@ struct IPC_EXPORT chan_impl {
|
|||||||
|
|
||||||
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm);
|
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm);
|
||||||
static buff_t recv(ipc::handle_t h, std::uint64_t tm);
|
static buff_t recv(ipc::handle_t h, std::uint64_t tm);
|
||||||
|
|
||||||
static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm);
|
|
||||||
static buff_t try_recv(ipc::handle_t h);
|
static buff_t try_recv(ipc::handle_t h);
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -128,9 +126,6 @@ public:
|
|||||||
return chan_wrapper(name).wait_for_recv(r_count, tm);
|
return chan_wrapper(name).wait_for_recv(r_count, tm);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* If timeout, this function would call 'force_push' to send the data forcibly.
|
|
||||||
*/
|
|
||||||
bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) {
|
bool send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) {
|
||||||
return detail_t::send(h_, data, size, tm);
|
return detail_t::send(h_, data, size, tm);
|
||||||
}
|
}
|
||||||
@ -141,23 +136,9 @@ public:
|
|||||||
return this->send(str.c_str(), str.size() + 1, tm);
|
return this->send(str.c_str(), str.size() + 1, tm);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* If timeout, this function would just return false.
|
|
||||||
*/
|
|
||||||
bool try_send(void const * data, std::size_t size, std::uint64_t tm = default_timeout) {
|
|
||||||
return detail_t::try_send(h_, data, size, tm);
|
|
||||||
}
|
|
||||||
bool try_send(buff_t const & buff, std::uint64_t tm = default_timeout) {
|
|
||||||
return this->try_send(buff.data(), buff.size(), tm);
|
|
||||||
}
|
|
||||||
bool try_send(std::string const & str, std::uint64_t tm = default_timeout) {
|
|
||||||
return this->try_send(str.c_str(), str.size() + 1, tm);
|
|
||||||
}
|
|
||||||
|
|
||||||
buff_t recv(std::uint64_t tm = invalid_value) {
|
buff_t recv(std::uint64_t tm = invalid_value) {
|
||||||
return detail_t::recv(h_, tm);
|
return detail_t::recv(h_, tm);
|
||||||
}
|
}
|
||||||
|
|
||||||
buff_t try_recv() {
|
buff_t try_recv() {
|
||||||
return detail_t::try_recv(h_);
|
return detail_t::try_recv(h_);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -120,20 +120,15 @@ public:
|
|||||||
return head_.cursor();
|
return head_.cursor();
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Q, typename F>
|
template <typename F>
|
||||||
bool push(Q* que, F&& f) {
|
bool push(F&& f) {
|
||||||
return head_.push(que, std::forward<F>(f), block_);
|
return head_.push(std::forward<F>(f), block_);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Q, typename F>
|
template <typename F>
|
||||||
bool force_push(Q* que, F&& f) {
|
bool pop(cursor_t* cur, F&& f) {
|
||||||
return head_.force_push(que, std::forward<F>(f), block_);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename Q, typename F, typename R>
|
|
||||||
bool pop(Q* que, cursor_t* cur, F&& f, R&& out) {
|
|
||||||
if (cur == nullptr) return false;
|
if (cur == nullptr) return false;
|
||||||
return head_.pop(que, *cur, std::forward<F>(f), std::forward<R>(out), block_);
|
return head_.pop(*cur, std::forward<F>(f), block_);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -50,40 +50,8 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
template <typename P, bool = relat_trait<P>::is_broadcast>
|
|
||||||
class conn_head;
|
|
||||||
|
|
||||||
template <typename P>
|
template <typename P>
|
||||||
class conn_head<P, true> : public conn_head_base {
|
class conn_head : public conn_head_base {
|
||||||
public:
|
|
||||||
cc_t connect() noexcept {
|
|
||||||
for (unsigned k = 0;; ipc::yield(k)) {
|
|
||||||
cc_t curr = this->cc_.load(std::memory_order_acquire);
|
|
||||||
cc_t next = curr | (curr + 1); // find the first 0, and set it to 1.
|
|
||||||
if (next == 0) {
|
|
||||||
// connection-slot is full.
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
if (this->cc_.compare_exchange_weak(curr, next, std::memory_order_release)) {
|
|
||||||
return next ^ curr; // return connected id
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cc_t disconnect(cc_t cc_id) noexcept {
|
|
||||||
return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
|
|
||||||
cc_t cur = this->cc_.load(order);
|
|
||||||
cc_t cnt; // accumulates the total bits set in cc
|
|
||||||
for (cnt = 0; cur; ++cnt) cur &= cur - 1;
|
|
||||||
return cnt;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
template <typename P>
|
|
||||||
class conn_head<P, false> : public conn_head_base {
|
|
||||||
public:
|
public:
|
||||||
cc_t connect() noexcept {
|
cc_t connect() noexcept {
|
||||||
return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1;
|
return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1;
|
||||||
|
|||||||
@ -484,27 +484,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
|
|||||||
}
|
}
|
||||||
|
|
||||||
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
static bool send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
||||||
return send([tm](auto info, auto que, auto msg_id) {
|
|
||||||
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
|
||||||
return !que->push(
|
|
||||||
[](void*) { return true; },
|
|
||||||
info->cc_id_, msg_id, remain, data, size);
|
|
||||||
}, tm)) {
|
|
||||||
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
|
|
||||||
if (!que->force_push(
|
|
||||||
clear_message<typename queue_t::value_t>,
|
|
||||||
info->cc_id_, msg_id, remain, data, size)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
info->rd_waiter_.broadcast();
|
|
||||||
return true;
|
|
||||||
};
|
|
||||||
}, h, data, size);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
|
||||||
return send([tm](auto info, auto que, auto msg_id) {
|
return send([tm](auto info, auto que, auto msg_id) {
|
||||||
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) {
|
||||||
if (!wait_for(info->wt_waiter_, [&] {
|
if (!wait_for(info->wt_waiter_, [&] {
|
||||||
@ -676,11 +655,6 @@ buff_t chan_impl<Flag>::recv(ipc::handle_t h, std::uint64_t tm) {
|
|||||||
return detail_impl<policy_t<Flag>>::recv(h, tm);
|
return detail_impl<policy_t<Flag>>::recv(h, tm);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename Flag>
|
|
||||||
bool chan_impl<Flag>::try_send(ipc::handle_t h, void const * data, std::size_t size, std::uint64_t tm) {
|
|
||||||
return detail_impl<policy_t<Flag>>::try_send(h, data, size, tm);
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename Flag>
|
template <typename Flag>
|
||||||
buff_t chan_impl<Flag>::try_recv(ipc::handle_t h) {
|
buff_t chan_impl<Flag>::try_recv(ipc::handle_t h) {
|
||||||
return detail_impl<policy_t<Flag>>::try_recv(h);
|
return detail_impl<policy_t<Flag>>::try_recv(h);
|
||||||
|
|||||||
@ -37,8 +37,8 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename F, typename E>
|
||||||
bool push(W* /*wrapper*/, F&& f, E* elems) {
|
bool push(F&& f, E* elems) {
|
||||||
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
|
||||||
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
if (cur_wt == circ::index_of(rd_.load(std::memory_order_acquire) - 1)) {
|
||||||
return false; // full
|
return false; // full
|
||||||
@ -48,24 +48,13 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
template <typename F, typename E>
|
||||||
* In single-single-unicast, 'force_push' means 'no reader' or 'the only one reader is dead'.
|
bool pop(circ::u2_t& /*cur*/, F&& f, E* elems) {
|
||||||
* So we could just disconnect all connections of receiver, and return false.
|
|
||||||
*/
|
|
||||||
template <typename W, typename F, typename E>
|
|
||||||
bool force_push(W* wrapper, F&&, E*) {
|
|
||||||
wrapper->elems()->disconnect_receiver(~static_cast<circ::cc_t>(0u));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename W, typename F, typename R, typename E>
|
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) {
|
|
||||||
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed));
|
||||||
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(&(elems[cur_rd].data_));
|
std::forward<F>(f)(&(elems[cur_rd].data_));
|
||||||
std::forward<R>(out)(true);
|
|
||||||
rd_.fetch_add(1, std::memory_order_release);
|
rd_.fetch_add(1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -75,26 +64,17 @@ template <>
|
|||||||
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
|
||||||
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename F,
|
||||||
bool force_push(W* wrapper, F&&, E*) {
|
|
||||||
wrapper->elems()->disconnect_receiver(1);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename W, typename F, typename R,
|
|
||||||
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
|
bool pop(circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
|
||||||
byte_t buff[DS];
|
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
if (circ::index_of(cur_rd) ==
|
if (circ::index_of(cur_rd) ==
|
||||||
circ::index_of(wt_.load(std::memory_order_acquire))) {
|
circ::index_of(wt_.load(std::memory_order_acquire))) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
}
|
}
|
||||||
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
|
||||||
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
||||||
std::forward<F>(f)(buff);
|
std::forward<F>(f)(&(elems[circ::index_of(cur_rd)].data_));
|
||||||
std::forward<R>(out)(true);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
@ -116,8 +96,8 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
|
|
||||||
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename F, typename E>
|
||||||
bool push(W* /*wrapper*/, F&& f, E* elems) {
|
bool push(F&& f, E* elems) {
|
||||||
circ::u2_t cur_ct, nxt_ct;
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
cur_ct = ct_.load(std::memory_order_relaxed);
|
cur_ct = ct_.load(std::memory_order_relaxed);
|
||||||
@ -153,16 +133,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename F,
|
||||||
bool force_push(W* wrapper, F&&, E*) {
|
|
||||||
wrapper->elems()->disconnect_receiver(1);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename W, typename F, typename R,
|
|
||||||
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
|
||||||
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E<DS, AS>* elems) {
|
bool pop(circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
|
||||||
byte_t buff[DS];
|
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
auto cur_rd = rd_.load(std::memory_order_relaxed);
|
||||||
auto cur_wt = wt_.load(std::memory_order_acquire);
|
auto cur_wt = wt_.load(std::memory_order_acquire);
|
||||||
@ -179,250 +152,159 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
|
|||||||
}
|
}
|
||||||
k = 0;
|
k = 0;
|
||||||
}
|
}
|
||||||
else {
|
else if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
||||||
std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff));
|
std::forward<F>(f)(&(elems[id_rd].data_));
|
||||||
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) {
|
|
||||||
std::forward<F>(f)(buff);
|
|
||||||
std::forward<R>(out)(true);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
|
||||||
|
|
||||||
using rc_t = std::uint64_t;
|
using flag_t = std::uint64_t;
|
||||||
|
|
||||||
enum : rc_t {
|
|
||||||
ep_mask = 0x00000000ffffffffull,
|
|
||||||
ep_incr = 0x0000000100000000ull
|
|
||||||
};
|
|
||||||
|
|
||||||
template <std::size_t DataSize, std::size_t AlignSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct elem_t {
|
struct elem_t {
|
||||||
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
std::atomic<rc_t> rc_ { 0 }; // read-counter
|
std::atomic<flag_t> f_rc_ { 0 }; // read-flag
|
||||||
};
|
};
|
||||||
|
|
||||||
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
|
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
|
||||||
alignas(cache_line_size) rc_t epoch_ { 0 }; // only one writer
|
|
||||||
|
|
||||||
circ::u2_t cursor() const noexcept {
|
circ::u2_t cursor() const noexcept {
|
||||||
return wt_.load(std::memory_order_acquire);
|
return wt_.load(std::memory_order_acquire);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename F, typename E>
|
||||||
bool push(W* wrapper, F&& f, E* elems) {
|
bool push(F&& f, E* elems) {
|
||||||
E* el;
|
E* el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
|
||||||
for (unsigned k = 0;;) {
|
auto cur_rc = el->f_rc_.exchange(~0ull, std::memory_order_acq_rel);
|
||||||
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
// check for consumers to read this element
|
||||||
if (cc == 0) return false; // no reader
|
if (cur_rc != 0) {
|
||||||
el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
|
return false; // full
|
||||||
// check all consumers have finished reading this element
|
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
|
||||||
circ::cc_t rem_cc = cur_rc & ep_mask;
|
|
||||||
if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch_)) {
|
|
||||||
return false; // has not finished yet
|
|
||||||
}
|
|
||||||
// consider rem_cc to be 0 here
|
|
||||||
if (el->rc_.compare_exchange_weak(
|
|
||||||
cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
|
||||||
}
|
}
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
wt_.fetch_add(1, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
template <typename F, typename E>
|
||||||
bool force_push(W* wrapper, F&& f, E* elems) {
|
bool pop(circ::u2_t& cur, F&& f, E* elems) {
|
||||||
E* el;
|
if (cur == cursor()) return false; // empty
|
||||||
epoch_ += ep_incr;
|
E* el = elems + circ::index_of(cur++);
|
||||||
for (unsigned k = 0;;) {
|
|
||||||
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
|
||||||
if (cc == 0) return false; // no reader
|
|
||||||
el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
|
|
||||||
// check all consumers have finished reading this element
|
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
|
||||||
circ::cc_t rem_cc = cur_rc & ep_mask;
|
|
||||||
if (cc & rem_cc) {
|
|
||||||
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
|
|
||||||
cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers
|
|
||||||
if (cc == 0) return false; // no reader
|
|
||||||
}
|
|
||||||
// just compare & exchange
|
|
||||||
if (el->rc_.compare_exchange_weak(
|
|
||||||
cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
|
||||||
}
|
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
wt_.fetch_add(1, std::memory_order_release);
|
el->f_rc_.store(0, std::memory_order_release);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename R, typename E>
|
|
||||||
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) {
|
|
||||||
if (cur == cursor()) return false; // acquire
|
|
||||||
auto* el = elems + circ::index_of(cur++);
|
|
||||||
std::forward<F>(f)(&(el->data_));
|
|
||||||
for (unsigned k = 0;;) {
|
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
|
||||||
if ((cur_rc & ep_mask) == 0) {
|
|
||||||
std::forward<R>(out)(true);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
auto nxt_rc = cur_rc & ~static_cast<rc_t>(wrapper->connected_id());
|
|
||||||
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
|
|
||||||
std::forward<R>(out)((nxt_rc & ep_mask) == 0);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
|
struct prod_cons_impl<wr<relat::multi, relat::multi, trans::broadcast>> {
|
||||||
|
|
||||||
using rc_t = std::uint64_t;
|
|
||||||
using flag_t = std::uint64_t;
|
using flag_t = std::uint64_t;
|
||||||
|
|
||||||
enum : rc_t {
|
enum : flag_t {
|
||||||
rc_mask = 0x00000000ffffffffull,
|
pushing = 1ull,
|
||||||
ep_mask = 0x00ffffffffffffffull,
|
pushed = ~0ull,
|
||||||
ep_incr = 0x0100000000000000ull,
|
popped = 0ull,
|
||||||
ic_mask = 0xff000000ffffffffull,
|
|
||||||
ic_incr = 0x0000000100000000ull
|
|
||||||
};
|
};
|
||||||
|
|
||||||
template <std::size_t DataSize, std::size_t AlignSize>
|
template <std::size_t DataSize, std::size_t AlignSize>
|
||||||
struct elem_t {
|
struct elem_t {
|
||||||
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
std::aligned_storage_t<DataSize, AlignSize> data_ {};
|
||||||
std::atomic<rc_t > rc_ { 0 }; // read-counter
|
std::atomic<flag_t> f_rc_ { 0 }; // read-flag
|
||||||
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
|
std::atomic<flag_t> f_ct_ { 0 }; // commit-flag
|
||||||
};
|
};
|
||||||
|
|
||||||
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
|
||||||
alignas(cache_line_size) std::atomic<rc_t> epoch_ { 0 };
|
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
|
||||||
|
|
||||||
circ::u2_t cursor() const noexcept {
|
circ::u2_t cursor() const noexcept {
|
||||||
return ct_.load(std::memory_order_acquire);
|
return wt_.load(std::memory_order_acquire);
|
||||||
}
|
}
|
||||||
|
|
||||||
constexpr static rc_t inc_rc(rc_t rc) noexcept {
|
template <typename F, typename E>
|
||||||
return (rc & ic_mask) | ((rc + ic_incr) & ~ic_mask);
|
bool push(F&& f, E* elems) {
|
||||||
}
|
|
||||||
|
|
||||||
constexpr static rc_t inc_mask(rc_t rc) noexcept {
|
|
||||||
return inc_rc(rc) & ~rc_mask;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
|
||||||
bool push(W* wrapper, F&& f, E* elems) {
|
|
||||||
E* el;
|
E* el;
|
||||||
circ::u2_t cur_ct;
|
circ::u2_t cur_ct, nxt_ct;
|
||||||
rc_t epoch = epoch_.load(std::memory_order_acquire);
|
|
||||||
for (unsigned k = 0;;) {
|
for (unsigned k = 0;;) {
|
||||||
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
auto cac_ct = ct_.load(std::memory_order_relaxed);
|
||||||
if (cc == 0) return false; // no reader
|
cur_ct = cac_ct;
|
||||||
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
|
nxt_ct = cur_ct + 1;
|
||||||
// check all consumers have finished reading this element
|
el = elems + circ::index_of(cac_ct);
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_relaxed);
|
for (unsigned k = 0;;) {
|
||||||
circ::cc_t rem_cc = cur_rc & rc_mask;
|
auto cur_rc = el->f_rc_.load(std::memory_order_acquire);
|
||||||
if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) {
|
switch (cur_rc) {
|
||||||
return false; // has not finished yet
|
// helper
|
||||||
|
case pushing:
|
||||||
|
ct_.compare_exchange_strong(cac_ct, nxt_ct, std::memory_order_release);
|
||||||
|
goto try_next;
|
||||||
|
// full
|
||||||
|
case pushed:
|
||||||
|
return false;
|
||||||
|
// writable
|
||||||
|
default:
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
else if (!rem_cc) {
|
if (el->f_rc_.compare_exchange_weak(cur_rc, pushing, std::memory_order_release)) {
|
||||||
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
|
||||||
if ((cur_fl != cur_ct) && cur_fl) {
|
|
||||||
return false; // full
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// consider rem_cc to be 0 here
|
|
||||||
if (el->rc_.compare_exchange_weak(
|
|
||||||
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed) &&
|
|
||||||
epoch_.compare_exchange_weak(epoch, epoch, std::memory_order_acq_rel)) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
// only one thread/process would touch here at one time
|
ct_.compare_exchange_strong(cac_ct, nxt_ct, std::memory_order_relaxed);
|
||||||
ct_.store(cur_ct + 1, std::memory_order_release);
|
el->f_rc_.store(pushed, std::memory_order_relaxed);
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::atomic_thread_fence(std::memory_order_release);
|
||||||
// set flag & try update wt
|
|
||||||
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename W, typename F, typename E>
|
|
||||||
bool force_push(W* wrapper, F&& f, E* elems) {
|
|
||||||
E* el;
|
|
||||||
circ::u2_t cur_ct;
|
|
||||||
rc_t epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
|
|
||||||
for (unsigned k = 0;;) {
|
|
||||||
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
|
|
||||||
if (cc == 0) return false; // no reader
|
|
||||||
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
|
|
||||||
// check all consumers have finished reading this element
|
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
|
||||||
circ::cc_t rem_cc = cur_rc & rc_mask;
|
|
||||||
if (cc & rem_cc) {
|
|
||||||
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
|
|
||||||
cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers
|
|
||||||
if (cc == 0) return false; // no reader
|
|
||||||
}
|
|
||||||
// just compare & exchange
|
|
||||||
if (el->rc_.compare_exchange_weak(
|
|
||||||
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_relaxed)) {
|
|
||||||
if (epoch == epoch_.load(std::memory_order_acquire)) {
|
|
||||||
break;
|
break;
|
||||||
}
|
try_next:
|
||||||
else if (push(wrapper, std::forward<F>(f), elems)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
|
|
||||||
}
|
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
}
|
}
|
||||||
// only one thread/process would touch here at one time
|
|
||||||
ct_.store(cur_ct + 1, std::memory_order_release);
|
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
// set flag & try update wt
|
// set flag & try update wt
|
||||||
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
|
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
|
||||||
|
while (1) {
|
||||||
|
auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
|
||||||
|
if (cur_ct != wt_.load(std::memory_order_relaxed)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if ((~cac_ct) != cur_ct) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!el->f_ct_.compare_exchange_strong(cac_ct, 0, std::memory_order_relaxed)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
wt_.store(nxt_ct, std::memory_order_release);
|
||||||
|
cur_ct = nxt_ct;
|
||||||
|
nxt_ct = cur_ct + 1;
|
||||||
|
el = elems + circ::index_of(cur_ct);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename W, typename F, typename R, typename E, std::size_t N>
|
template <typename F, typename E, std::size_t N>
|
||||||
bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) {
|
bool pop(circ::u2_t& cur, F&& f, E(& elems)[N]) {
|
||||||
auto* el = elems + circ::index_of(cur);
|
for (unsigned k = 0;;) {
|
||||||
auto cur_fl = el->f_ct_.load(std::memory_order_acquire);
|
auto cur_wt = wt_.load(std::memory_order_acquire);
|
||||||
if (cur_fl != ~static_cast<flag_t>(cur)) {
|
auto id_rd = circ::index_of(cur);
|
||||||
|
auto id_wt = circ::index_of(cur_wt);
|
||||||
|
if (id_rd == id_wt) {
|
||||||
|
auto* el = elems + id_wt;
|
||||||
|
auto cac_ct = el->f_ct_.load(std::memory_order_acquire);
|
||||||
|
if ((~cac_ct) != cur_wt) {
|
||||||
return false; // empty
|
return false; // empty
|
||||||
}
|
}
|
||||||
|
if (el->f_ct_.compare_exchange_weak(cac_ct, 0, std::memory_order_relaxed)) {
|
||||||
|
wt_.store(cur_wt + 1, std::memory_order_release);
|
||||||
|
}
|
||||||
|
k = 0;
|
||||||
|
}
|
||||||
|
else {
|
||||||
++cur;
|
++cur;
|
||||||
|
auto* el = elems + id_rd;
|
||||||
std::forward<F>(f)(&(el->data_));
|
std::forward<F>(f)(&(el->data_));
|
||||||
for (unsigned k = 0;;) {
|
el->f_rc_.store(popped, std::memory_order_release);
|
||||||
auto cur_rc = el->rc_.load(std::memory_order_acquire);
|
|
||||||
if ((cur_rc & rc_mask) == 0) {
|
|
||||||
std::forward<R>(out)(true);
|
|
||||||
el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
auto nxt_rc = inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id());
|
|
||||||
bool last_one = false;
|
|
||||||
if ((last_one = (nxt_rc & rc_mask) == 0)) {
|
|
||||||
el->f_ct_.store(cur + N - 1, std::memory_order_release);
|
|
||||||
}
|
|
||||||
if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) {
|
|
||||||
std::forward<R>(out)(last_one);
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
ipc::yield(k);
|
ipc::yield(k);
|
||||||
|
|||||||
@ -158,27 +158,19 @@ public:
|
|||||||
template <typename T, typename F, typename... P>
|
template <typename T, typename F, typename... P>
|
||||||
bool push(F&& prep, P&&... params) {
|
bool push(F&& prep, P&&... params) {
|
||||||
if (elems_ == nullptr) return false;
|
if (elems_ == nullptr) return false;
|
||||||
return elems_->push(this, [&](void* p) {
|
return elems_->push([&](void* p) {
|
||||||
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
|
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename T, typename F, typename... P>
|
template <typename T>
|
||||||
bool force_push(F&& prep, P&&... params) {
|
bool pop(T& item) {
|
||||||
if (elems_ == nullptr) return false;
|
|
||||||
return elems_->force_push(this, [&](void* p) {
|
|
||||||
if (prep(p)) ::new (p) T(std::forward<P>(params)...);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
template <typename T, typename F>
|
|
||||||
bool pop(T& item, F&& out) {
|
|
||||||
if (elems_ == nullptr) {
|
if (elems_ == nullptr) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return elems_->pop(this, &(this->cursor_), [&item](void* p) {
|
return elems_->pop(&(this->cursor_), [&item](void* p) {
|
||||||
::new (&item) T(std::move(*static_cast<T*>(p)));
|
::new (&item) T(std::move(*static_cast<T*>(p)));
|
||||||
}, std::forward<F>(out));
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -198,18 +190,8 @@ public:
|
|||||||
return base_t::template push<T>(std::forward<P>(params)...);
|
return base_t::template push<T>(std::forward<P>(params)...);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <typename... P>
|
|
||||||
bool force_push(P&&... params) {
|
|
||||||
return base_t::template force_push<T>(std::forward<P>(params)...);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool pop(T& item) {
|
bool pop(T& item) {
|
||||||
return base_t::pop(item, [](bool) {});
|
return base_t::pop(item);
|
||||||
}
|
|
||||||
|
|
||||||
template <typename F>
|
|
||||||
bool pop(T& item, F&& out) {
|
|
||||||
return base_t::pop(item, std::forward<F>(out));
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -73,11 +73,9 @@ void test_basic(char const * name) {
|
|||||||
|
|
||||||
que_t que1 { name };
|
que_t que1 { name };
|
||||||
EXPECT_FALSE(que1.send(test1));
|
EXPECT_FALSE(que1.send(test1));
|
||||||
EXPECT_FALSE(que1.try_send(test2));
|
|
||||||
|
|
||||||
que_t que2 { que1.name(), ipc::receiver };
|
que_t que2 { que1.name(), ipc::receiver };
|
||||||
ASSERT_TRUE(que1.send(test1));
|
ASSERT_TRUE(que1.send(test1));
|
||||||
ASSERT_TRUE(que1.try_send(test2));
|
|
||||||
|
|
||||||
EXPECT_EQ(que2.recv(), test1);
|
EXPECT_EQ(que2.recv(), test1);
|
||||||
EXPECT_EQ(que2.recv(), test2);
|
EXPECT_EQ(que2.recv(), test2);
|
||||||
|
|||||||
@ -5,7 +5,6 @@
|
|||||||
#include <new>
|
#include <new>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
#include <climits> // CHAR_BIT
|
|
||||||
|
|
||||||
#include "libipc/prod_cons.h"
|
#include "libipc/prod_cons.h"
|
||||||
#include "libipc/policy.h"
|
#include "libipc/policy.h"
|
||||||
@ -143,6 +142,8 @@ TEST(Queue, el_connection) {
|
|||||||
for (std::size_t i = 0; i < 10000; ++i) {
|
for (std::size_t i = 0; i < 10000; ++i) {
|
||||||
ASSERT_TRUE(el.connect_sender());
|
ASSERT_TRUE(el.connect_sender());
|
||||||
}
|
}
|
||||||
|
el.disconnect_sender();
|
||||||
|
EXPECT_TRUE(el.connect_sender());
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el;
|
elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el;
|
||||||
@ -156,12 +157,13 @@ TEST(Queue, el_connection) {
|
|||||||
}
|
}
|
||||||
{
|
{
|
||||||
elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast> el;
|
elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast> el;
|
||||||
for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) {
|
auto cc = el.connect_receiver();
|
||||||
|
EXPECT_EQ(cc, 1);
|
||||||
|
for (std::size_t i = 0; i < 10000; ++i) {
|
||||||
ASSERT_NE(el.connect_receiver(), 0);
|
ASSERT_NE(el.connect_receiver(), 0);
|
||||||
}
|
}
|
||||||
for (std::size_t i = 0; i < 10000; ++i) {
|
EXPECT_EQ(el.disconnect_receiver(cc), 10000);
|
||||||
ASSERT_EQ(el.connect_receiver(), 0);
|
EXPECT_EQ(el.connect_receiver(), 10000 + cc);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -227,25 +229,18 @@ TEST(Queue, connection) {
|
|||||||
for (std::size_t i = 0; i < 10000; ++i) {
|
for (std::size_t i = 0; i < 10000; ++i) {
|
||||||
ASSERT_TRUE(que.connect());
|
ASSERT_TRUE(que.connect());
|
||||||
}
|
}
|
||||||
for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) {
|
|
||||||
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
|
|
||||||
ASSERT_TRUE(que.connect());
|
|
||||||
}
|
|
||||||
for (std::size_t i = 0; i < 10000; ++i) {
|
for (std::size_t i = 0; i < 10000; ++i) {
|
||||||
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
|
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
|
||||||
ASSERT_FALSE(que.connect());
|
ASSERT_TRUE(que.connect());
|
||||||
}
|
}
|
||||||
ASSERT_TRUE(que.disconnect());
|
ASSERT_TRUE(que.disconnect());
|
||||||
for (std::size_t i = 0; i < 10000; ++i) {
|
for (std::size_t i = 0; i < 10000; ++i) {
|
||||||
ASSERT_FALSE(que.disconnect());
|
ASSERT_FALSE(que.disconnect());
|
||||||
}
|
}
|
||||||
{
|
|
||||||
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
|
|
||||||
ASSERT_TRUE(que.connect());
|
|
||||||
}
|
|
||||||
for (std::size_t i = 0; i < 10000; ++i) {
|
for (std::size_t i = 0; i < 10000; ++i) {
|
||||||
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
|
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
|
||||||
ASSERT_FALSE(que.connect());
|
ASSERT_TRUE(que.connect());
|
||||||
|
ASSERT_TRUE(que.disconnect());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user