针对不同类型的策略,增加不同的sender/receiver个数检查。

- is_multi_producer:sender无限制;否则仅允许一个
 - is_multi_consumer:receiver个数上限依赖is_broadcast指定;否则仅允许一个
 - is_broadcast:receiver个数上限为32(uint_t<32>位数);否则无限制(uint_t<32>大小)

行为变更:
1. 在连接时根据模式检查sender/receiver是否超出上限,超出则返回false
2. 在send时确认是否允许发送(对receiver模式来说,send之前不会尝试确认sender个数)
3. 修正若干bug
This commit is contained in:
mutouyun 2021-01-03 12:52:03 +08:00
parent c4617a2290
commit 6163618433
9 changed files with 431 additions and 144 deletions

View File

@ -111,10 +111,13 @@ void do_recv() {
int main(int argc, char ** argv) {
if (argc < 2) return 0;
::signal(SIGINT, [](int) {
auto exit = [](int) {
is_quit__.store(true, std::memory_order_release);
que__.disconnect();
});
};
::signal(SIGINT , exit);
::signal(SIGBREAK, exit);
::signal(SIGTERM , exit);
if (std::string{ argv[1] } == mode_s__) {
do_send();

View File

@ -48,4 +48,17 @@ enum class trans { // transmission
template <relat Rp, relat Rc, trans Ts>
struct wr {};
template <typename WR>
struct relat_trait;
template <relat Rp, relat Rc, trans Ts>
struct relat_trait<wr<Rp, Rc, Ts>> {
constexpr static bool is_multi_producer = (Rp == relat::multi);
constexpr static bool is_multi_consumer = (Rc == relat::multi);
constexpr static bool is_broadcast = (Ts == trans::broadcast);
};
template <template <typename> class Policy, typename Flag>
struct relat_trait<Policy<Flag>> : relat_trait<Flag> {};
} // namespace ipc

View File

@ -41,17 +41,19 @@ class chan_wrapper {
private:
using detail_t = chan_impl<Flag>;
ipc::handle_t h_ = nullptr;
unsigned mode_ = ipc::sender;
ipc::handle_t h_ = nullptr;
unsigned mode_ = ipc::sender;
bool connected_ = false;
public:
chan_wrapper() = default;
chan_wrapper() noexcept = default;
explicit chan_wrapper(char const * name, unsigned mode = ipc::sender) {
this->connect(name, mode);
explicit chan_wrapper(char const * name, unsigned mode = ipc::sender)
: connected_{this->connect(name, mode)} {
}
chan_wrapper(chan_wrapper&& rhs) noexcept {
chan_wrapper(chan_wrapper&& rhs) noexcept
: chan_wrapper{} {
swap(rhs);
}
@ -60,15 +62,17 @@ public:
}
void swap(chan_wrapper& rhs) noexcept {
std::swap(h_, rhs.h_);
std::swap(h_ , rhs.h_);
std::swap(mode_ , rhs.mode_);
std::swap(connected_, rhs.connected_);
}
chan_wrapper& operator=(chan_wrapper rhs) {
chan_wrapper& operator=(chan_wrapper rhs) noexcept {
swap(rhs);
return *this;
}
char const * name() const {
char const * name() const noexcept {
return detail_t::name(h_);
}
@ -88,21 +92,28 @@ public:
return chan_wrapper { name(), mode_ };
}
/**
* Building handle, then try connecting with name & mode flags.
*/
bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) {
if (name == nullptr || name[0] == '\0') return false;
this->disconnect();
return detail_t::connect(&h_, name, mode_ = mode);
detail_t::disconnect(h_); // clear old connection
return connected_ = detail_t::connect(&h_, name, mode_ = mode);
}
/**
* Try connecting with new mode flags.
*/
bool reconnect(unsigned mode) {
if (!valid()) return false;
if (mode_ == mode) return true;
return detail_t::reconnect(&h_, mode_ = mode);
if (connected_ && (mode_ == mode)) return true;
return connected_ = detail_t::reconnect(&h_, mode_ = mode);
}
void disconnect() {
if (!valid()) return;
detail_t::disconnect(h_);
connected_ = false;
}
std::size_t recv_count() const {

View File

@ -270,12 +270,12 @@ struct conn_info_head {
ipc::tls::pointer<ipc::unordered_map<msg_id_t, cache_t>> recv_cache_;
conn_info_head(char const * name)
: name_ (name)
, cc_id_ ((cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed))
, cc_waiter_(("__CC_CONN__" + name_).c_str())
, wt_waiter_(("__WT_CONN__" + name_).c_str())
, rd_waiter_(("__RD_CONN__" + name_).c_str())
, acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) {
: name_ {name}
, cc_id_ {(cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)}
, cc_waiter_{("__CC_CONN__" + name_).c_str()}
, wt_waiter_{("__WT_CONN__" + name_).c_str()}
, rd_waiter_{("__RD_CONN__" + name_).c_str()}
, acc_h_ {("__AC_CONN__" + name_).c_str(), sizeof(acc_t)} {
}
void quit_waiting() {
@ -321,10 +321,18 @@ struct queue_generator {
queue_t que_;
conn_info_t(char const * name)
: conn_info_head(name)
, que_(("__QU_CONN__" +
ipc::to_string(DataSize) + "__" +
ipc::to_string(AlignSize) + "__" + name).c_str()) {
: conn_info_head{name}
, que_{("__QU_CONN__" +
ipc::to_string(DataSize) + "__" +
ipc::to_string(AlignSize) + "__" + name).c_str()} {
}
void disconnect_receiver() {
bool dis = que_.disconnect();
this->quit_waiting();
if (dis) {
this->recv_cache().clear();
}
}
};
};
@ -335,11 +343,11 @@ struct detail_impl {
using queue_t = typename queue_generator<Policy>::queue_t;
using conn_info_t = typename queue_generator<Policy>::conn_info_t;
constexpr static conn_info_t* info_of(ipc::handle_t h) {
constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept {
return static_cast<conn_info_t*>(h);
}
constexpr static queue_t* queue_of(ipc::handle_t h) {
constexpr static queue_t* queue_of(ipc::handle_t h) noexcept {
return (info_of(h) == nullptr) ? nullptr : &(info_of(h)->que_);
}
@ -350,11 +358,9 @@ static void disconnect(ipc::handle_t h) {
if (que == nullptr) {
return;
}
bool dis = que->disconnect();
info_of(h)->quit_waiting();
if (dis) {
info_of(h)->recv_cache().clear();
}
que->shut_sending();
assert(info_of(h) != nullptr);
info_of(h)->disconnect_receiver();
}
static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
@ -365,15 +371,18 @@ static bool reconnect(ipc::handle_t * ph, bool start_to_recv) {
return false;
}
if (start_to_recv) {
que->shut_sending();
if (que->connect()) { // wouldn't connect twice
info_of(*ph)->cc_waiter_.broadcast();
return true;
}
return false;
}
// start == false
else if (que->connected()) {
disconnect(*ph);
// start_to_recv == false
if (que->connected()) {
info_of(*ph)->disconnect_receiver();
}
return true;
return que->ready_sending();
}
static bool connect(ipc::handle_t * ph, char const * name, bool start_to_recv) {
@ -389,7 +398,7 @@ static void destroy(ipc::handle_t h) {
ipc::mem::free(info_of(h));
}
static std::size_t recv_count(ipc::handle_t h) {
static std::size_t recv_count(ipc::handle_t h) noexcept {
auto que = queue_of(h);
if (que == nullptr) {
return ipc::invalid_value;
@ -422,14 +431,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
ipc::error("fail: send, queue_of(h)->elems() == nullptr\n");
return false;
}
/**
* If the que hasn't connected as a receiver, the 'connected-id' must be 0,
* and 'connections' equals to 0 only if there are no receivers.
* Or if the que has connected as a receiver,
* 'connections' equals to 'connected-id' of this que only if there are no other receivers.
*/
if (que->elems()->connections(std::memory_order_relaxed) == que->connected_id()) {
// there is no receiver on this connection
if (!que->ready_sending()) {
ipc::error("fail: send, que->ready_sending() == false\n");
return false;
}
// calc a new message id

View File

@ -1,10 +1,12 @@
#pragma once
#include <atomic> // std::atomic<?>
#include <limits>
#include <utility>
#include <type_traits>
#include "libipc/def.h"
#include "libipc/rw_lock.h"
#include "libipc/circ/elem_def.h"
#include "libipc/platform/detail.h"
@ -15,9 +17,9 @@ namespace circ {
template <typename Policy,
std::size_t DataSize,
std::size_t AlignSize = (ipc::detail::min)(DataSize, alignof(std::max_align_t))>
class elem_array : public ipc::circ::conn_head {
class elem_array : public ipc::circ::conn_head<Policy> {
public:
using base_t = ipc::circ::conn_head;
using base_t = ipc::circ::conn_head<Policy>;
using policy_t = Policy;
using cursor_t = decltype(std::declval<policy_t>().cursor());
using elem_t = typename policy_t::template elem_t<DataSize, AlignSize>;
@ -34,7 +36,86 @@ private:
policy_t head_;
elem_t block_[elem_max] {};
/**
* @remarks 'warning C4348: redefinition of default parameter' with MSVC.
* @see
* - https://stackoverflow.com/questions/12656239/redefinition-of-default-template-parameter
* - https://developercommunity.visualstudio.com/content/problem/425978/incorrect-c4348-warning-in-nested-template-declara.html
*/
template <typename P, bool/* = relat_trait<P>::is_multi_producer*/>
struct sender_checker;
template <typename P>
struct sender_checker<P, true> {
constexpr static bool connect() noexcept {
// always return true
return true;
}
constexpr static void disconnect() noexcept {}
};
template <typename P>
struct sender_checker<P, false> {
bool connect() noexcept {
return !flag_.test_and_set(std::memory_order_acq_rel);
}
void disconnect() noexcept {
flag_.clear();
}
private:
// in shm, it should be 0 whether it's initialized or not.
std::atomic_flag flag_ = ATOMIC_FLAG_INIT;
};
template <typename P, bool/* = relat_trait<P>::is_multi_consumer*/>
struct receiver_checker;
template <typename P>
struct receiver_checker<P, true> {
constexpr static cc_t connect(base_t &conn) noexcept {
return conn.connect();
}
constexpr static cc_t disconnect(base_t &conn, cc_t cc_id) noexcept {
return conn.disconnect(cc_id);
}
};
template <typename P>
struct receiver_checker<P, false> : protected sender_checker<P, false> {
cc_t connect(base_t &conn) noexcept {
return sender_checker<P, false>::connect() ? conn.connect() : 0;
}
cc_t disconnect(base_t &conn, cc_t cc_id) noexcept {
sender_checker<P, false>::disconnect();
return conn.disconnect(cc_id);
}
};
sender_checker <policy_t, relat_trait<policy_t>::is_multi_producer> s_ckr_;
receiver_checker<policy_t, relat_trait<policy_t>::is_multi_consumer> r_ckr_;
// make these be private
using base_t::connect;
using base_t::disconnect;
public:
bool connect_sender() noexcept {
return s_ckr_.connect();
}
void disconnect_sender() noexcept {
return s_ckr_.disconnect();
}
cc_t connect_receiver() noexcept {
return r_ckr_.connect(*this);
}
cc_t disconnect_receiver(cc_t cc_id) noexcept {
return r_ckr_.disconnect(*this, cc_id);
}
cursor_t cursor() const noexcept {
return head_.cursor();
}

View File

@ -5,6 +5,7 @@
#include <cstdint>
#include <new>
#include "libipc/def.h"
#include "libipc/rw_lock.h"
#include "libipc/platform/detail.h"
@ -22,7 +23,8 @@ constexpr u1_t index_of(u2_t c) noexcept {
return static_cast<u1_t>(c);
}
class conn_head {
class conn_head_base {
protected:
std::atomic<cc_t> cc_{0}; // connections
ipc::spin_lock lc_;
std::atomic<bool> constructed_{false};
@ -33,46 +35,75 @@ public:
if (!constructed_.load(std::memory_order_acquire)) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lc_);
if (!constructed_.load(std::memory_order_relaxed)) {
::new (this) conn_head;
::new (this) conn_head_base;
constructed_.store(true, std::memory_order_release);
}
}
}
conn_head() = default;
conn_head(const conn_head&) = delete;
conn_head& operator=(const conn_head&) = delete;
conn_head_base() = default;
conn_head_base(conn_head_base const &) = delete;
conn_head_base &operator=(conn_head_base const &) = delete;
cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->cc_.load(order);
}
};
template <typename P, bool = relat_trait<P>::is_broadcast>
class conn_head;
template <typename P>
class conn_head<P, true> : public conn_head_base {
public:
cc_t connect() noexcept {
for (unsigned k = 0;;) {
cc_t curr = cc_.load(std::memory_order_acquire);
for (unsigned k = 0;; ipc::yield(k)) {
cc_t curr = this->cc_.load(std::memory_order_acquire);
cc_t next = curr | (curr + 1); // find the first 0, and set it to 1.
if (next == 0) {
// connection-slot is full.
return 0;
}
if (cc_.compare_exchange_weak(curr, next, std::memory_order_release)) {
if (this->cc_.compare_exchange_weak(curr, next, std::memory_order_release)) {
return next ^ curr; // return connected id
}
ipc::yield(k);
}
}
cc_t disconnect(cc_t cc_id) noexcept {
return cc_.fetch_and(~cc_id) & ~cc_id;
}
cc_t connections(std::memory_order order = std::memory_order_acquire) const noexcept {
return cc_.load(order);
return this->cc_.fetch_and(~cc_id, std::memory_order_acq_rel) & ~cc_id;
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
cc_t cur = cc_.load(order);
cc_t cur = this->cc_.load(order);
cc_t cnt; // accumulates the total bits set in cc
for (cnt = 0; cur; ++cnt) cur &= cur - 1;
return cnt;
}
};
template <typename P>
class conn_head<P, false> : public conn_head_base {
public:
cc_t connect() noexcept {
return this->cc_.fetch_add(1, std::memory_order_relaxed) + 1;
}
cc_t disconnect(cc_t cc_id) noexcept {
if (cc_id == ~static_cast<circ::cc_t>(0u)) {
// clear all connections
this->cc_.store(0, std::memory_order_relaxed);
return 0u;
}
else {
return this->cc_.fetch_sub(1, std::memory_order_relaxed) - 1;
}
}
std::size_t conn_count(std::memory_order order = std::memory_order_acquire) const noexcept {
return this->connections(order);
}
};
} // namespace circ
} // namespace ipc

View File

@ -4,6 +4,7 @@
#include <utility>
#include <cstring>
#include <type_traits>
#include <cstdint>
#include "libipc/def.h"
@ -47,23 +48,14 @@ struct prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
return true;
}
/**
* In single-single-unicast, 'force_push' means 'no reader' or 'the only one reader is dead'.
* So we could just disconnect all connections of receiver, and return false.
*/
template <typename W, typename F, typename E>
bool force_push(W* /*wrapper*/, F&& f, E* elems) {
auto cur_wt = circ::index_of(wt_.load(std::memory_order_relaxed));
for (unsigned k = 0;;) {
auto cur_rd = rd_.load(std::memory_order_acquire);
if (cur_wt != circ::index_of(cur_rd - 1)) {
break;
}
// full
if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_acq_rel)) {
break;
}
ipc::yield(k);
}
std::forward<F>(f)(&(elems[cur_wt].data_));
wt_.fetch_add(1, std::memory_order_release);
return true;
bool force_push(W* wrapper, F&&, E*) {
wrapper->elems()->disconnect_receiver(~static_cast<circ::cc_t>(0u));
return false;
}
template <typename W, typename F, typename E>
@ -82,6 +74,12 @@ template <>
struct prod_cons_impl<wr<relat::single, relat::multi , trans::unicast>>
: prod_cons_impl<wr<relat::single, relat::single, trans::unicast>> {
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&&, E*) {
wrapper->elems()->disconnect_receiver(1);
return false;
}
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E<DS, AS>* elems) {
byte_t buff[DS];
@ -153,8 +151,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
}
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
return push(wrapper, std::forward<F>(f), elems); /* TBD */
bool force_push(W* wrapper, F&&, E*) {
wrapper->elems()->disconnect_receiver(1);
return false;
}
template <typename W, typename F, template <std::size_t, std::size_t> class E, std::size_t DS, std::size_t AS>
@ -191,7 +190,12 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::unicast>>
template <>
struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
using rc_t = std::uint32_t;
using rc_t = std::uint64_t;
enum : rc_t {
ep_mask = 0x00000000ffffffffull,
ep_incr = 0x0000000100000000ull
};
template <std::size_t DataSize, std::size_t AlignSize>
struct elem_t {
@ -199,7 +203,8 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
std::atomic<rc_t> rc_ { 0 }; // read-counter
};
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
alignas(cache_line_size) std::atomic<circ::u2_t> wt_; // write index
alignas(cache_line_size) rc_t epoch_ { 0 }; // only one writer
circ::u2_t cursor() const noexcept {
return wt_.load(std::memory_order_acquire);
@ -211,15 +216,16 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
for (unsigned k = 0;;) {
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cc & cur_rc) {
circ::cc_t rem_cc = cur_rc & ep_mask;
if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch_)) {
return false; // has not finished yet
}
// cur_rc should be 0 here
// consider rem_cc to be 0 here
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc), std::memory_order_release)) {
cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
@ -232,20 +238,22 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
template <typename W, typename F, typename E>
bool force_push(W* wrapper, F&& f, E* elems) {
E* el;
epoch_ += ep_incr;
for (unsigned k = 0;;) {
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
el = elems + circ::index_of(wt_.load(std::memory_order_acquire));
el = elems + circ::index_of(wt_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
if (cc & cur_rc) {
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, cur_rc);
cc = wrapper->elems()->disconnect(cur_rc); // disconnect all remained readers
circ::cc_t rem_cc = cur_rc & ep_mask;
if (cc & rem_cc) {
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers
if (cc == 0) return false; // no reader
}
// just compare & exchange
if (el->rc_.compare_exchange_weak(
cur_rc, static_cast<rc_t>(cc), std::memory_order_release)) {
cur_rc, epoch_ | static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
@ -261,8 +269,9 @@ struct prod_cons_impl<wr<relat::single, relat::multi, trans::broadcast>> {
auto* el = elems + circ::index_of(cur++);
std::forward<F>(f)(&(el->data_));
for (unsigned k = 0;;) {
rc_t cur_rc = el->rc_.load(std::memory_order_acquire);
if (cur_rc == 0) {
auto cur_rc = el->rc_.load(std::memory_order_acquire);
circ::cc_t rem_cc = cur_rc & ep_mask;
if (rem_cc == 0) {
return true;
}
if (el->rc_.compare_exchange_weak(cur_rc,
@ -283,7 +292,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
enum : rc_t {
rc_mask = 0x00000000ffffffffull,
rc_incr = 0x0000000100000000ull
ep_mask = 0x0000ffffffffffffull,
ep_incr = 0x0001000000000000ull,
ic_mask = 0xffff0000ffffffffull
};
template <std::size_t DataSize, std::size_t AlignSize>
@ -293,12 +304,22 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
std::atomic<flag_t> f_ct_ { 0 }; // commit flag
};
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
alignas(cache_line_size) std::atomic<circ::u2_t> ct_; // commit index
alignas(cache_line_size) std::atomic<rc_t> epoch_ { 0 };
circ::u2_t cursor() const noexcept {
return ct_.load(std::memory_order_acquire);
}
constexpr static rc_t inc_rc(rc_t rc) noexcept {
auto ic = static_cast<std::uint16_t>((rc & ~ic_mask) >> 32);
return (rc & ic_mask) | (static_cast<rc_t>(ic + 1) << 32);
}
constexpr static rc_t inc_mask(rc_t rc) noexcept {
return inc_rc(rc) & ~rc_mask;
}
template <typename W, typename F, typename E>
bool push(W* wrapper, F&& f, E* elems) {
E* el;
@ -308,9 +329,10 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
if (cc == 0) return false; // no reader
el = elems + circ::index_of(cur_ct = ct_.load(std::memory_order_relaxed));
// check all consumers have finished reading this element
auto cur_rc = el->rc_.load(std::memory_order_acquire);
auto cur_rc = el->rc_.load(std::memory_order_relaxed);
circ::cc_t rem_cc = cur_rc & rc_mask;
if (cc & rem_cc) {
rc_t epoch = epoch_.load(std::memory_order_acquire);
if ((cc & rem_cc) && ((cur_rc & ~ep_mask) == epoch)) {
return false; // has not finished yet
}
else if (!rem_cc) {
@ -319,9 +341,9 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
return false; // full
}
}
// (cur_rc & rc_mask) should be 0 here
// consider rem_cc to be 0 here
if (el->rc_.compare_exchange_weak(
cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast<rc_t>(cc), std::memory_order_release)) {
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
@ -338,6 +360,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
bool force_push(W* wrapper, F&& f, E* elems) {
E* el;
circ::u2_t cur_ct;
rc_t epoch = epoch_.fetch_add(ep_incr, std::memory_order_release) + ep_incr;
for (unsigned k = 0;;) {
circ::cc_t cc = wrapper->elems()->connections(std::memory_order_relaxed);
if (cc == 0) return false; // no reader
@ -347,12 +370,12 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
circ::cc_t rem_cc = cur_rc & rc_mask;
if (cc & rem_cc) {
ipc::log("force_push: k = %u, cc = %u, rem_cc = %u\n", k, cc, rem_cc);
cc = wrapper->elems()->disconnect(rem_cc); // disconnect all remained readers
cc = wrapper->elems()->disconnect_receiver(rem_cc); // disconnect all invalid readers
if (cc == 0) return false; // no reader
}
// just compare & exchange
if (el->rc_.compare_exchange_weak(
cur_rc, ((cur_rc + rc_incr) & ~rc_mask) | static_cast<rc_t>(cc), std::memory_order_release)) {
cur_rc, inc_mask(epoch | (cur_rc & ep_mask)) | static_cast<rc_t>(cc), std::memory_order_release)) {
break;
}
ipc::yield(k);
@ -385,7 +408,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
el->f_ct_.store(cur + N - 1, std::memory_order_release);
}
if (el->rc_.compare_exchange_weak(cur_rc,
(cur_rc + rc_incr) & ~static_cast<rc_t>(wrapper->connected_id()),
inc_rc(cur_rc) & ~static_cast<rc_t>(wrapper->connected_id()),
std::memory_order_release)) {
return true;
}

View File

@ -2,7 +2,7 @@
#include <type_traits>
#include <new>
#include <utility>
#include <utility> // [[since C++14]]: std::exchange
#include <algorithm>
#include <atomic>
#include <tuple>
@ -63,21 +63,22 @@ public:
}
template <typename Elems>
auto connect(Elems* elems)
-> std::tuple<bool, decltype(std::declval<Elems>().cursor())> {
auto connect(Elems* elems) noexcept
/*needs 'optional' here*/
-> std::tuple<bool, bool, decltype(std::declval<Elems>().cursor())> {
if (elems == nullptr) return {};
// if it's already connected, just return false
if (connected()) return {};
connected_ = elems->connect();
return std::make_tuple(connected(), elems->cursor());
// if it's already connected, just return
if (connected()) return {connected(), false, 0};
connected_ = elems->connect_receiver();
return {connected(), true, elems->cursor()};
}
template <typename Elems>
bool disconnect(Elems* elems) {
bool disconnect(Elems* elems) noexcept {
if (elems == nullptr) return false;
// if it's already disconnected, just return false
if (!connected()) return false;
elems->disconnect(connected_);
elems->disconnect_receiver(std::exchange(connected_, 0));
return true;
}
};
@ -93,6 +94,7 @@ public:
protected:
elems_t * elems_ = nullptr;
decltype(std::declval<elems_t>().cursor()) cursor_ = 0;
bool sender_flag_ = false;
public:
using base_t::base_t;
@ -100,12 +102,12 @@ public:
queue_base() = default;
explicit queue_base(char const * name)
: queue_base() {
: queue_base{} {
elems_ = open<elems_t>(name);
}
explicit queue_base(elems_t * elems)
: queue_base() {
explicit queue_base(elems_t * elems) noexcept
: queue_base{} {
assert(elems != nullptr);
elems_ = elems;
}
@ -117,16 +119,27 @@ public:
elems_t * elems() noexcept { return elems_; }
elems_t const * elems() const noexcept { return elems_; }
bool connect() {
auto tp = base_t::connect(elems_);
if (std::get<0>(tp)) {
cursor_ = std::get<1>(tp);
return true;
}
return false;
bool ready_sending() noexcept {
if (elems_ == nullptr) return false;
return sender_flag_ || (sender_flag_ = elems_->connect_sender());
}
bool disconnect() {
void shut_sending() noexcept {
if (elems_ == nullptr) return;
if (!sender_flag_) return;
elems_->disconnect_sender();
}
bool connect() noexcept {
auto tp = base_t::connect(elems_);
if (std::get<0>(tp) && std::get<1>(tp)) {
cursor_ = std::get<2>(tp);
return true;
}
return std::get<0>(tp);
}
bool disconnect() noexcept {
return base_t::disconnect(elems_);
}
@ -172,7 +185,7 @@ public:
} // namespace detail
template <typename T, typename Policy>
class queue : public detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>> {
class queue final : public detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>> {
using base_t = detail::queue_base<typename Policy::template elems_t<sizeof(T), alignof(T)>>;
public:

View File

@ -5,6 +5,7 @@
#include <new>
#include <vector>
#include <unordered_map>
#include <climits> // CHAR_BIT
#include "libipc/prod_cons.h"
#include "libipc/policy.h"
@ -127,69 +128,177 @@ TEST(Queue, check_size) {
std::cout << "sizeof(elems_t<s, m, b>) = " << sizeof(el_t) << std::endl;
}
TEST(Queue, connection) {
TEST(Queue, el_connection) {
{
using el_t = elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast>;
el_t el;
el.init();
elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el;
EXPECT_TRUE(el.connect_sender());
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_FALSE(el.connect_sender());
}
el.disconnect_sender();
EXPECT_TRUE(el.connect_sender());
}
{
elems_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::unicast> el;
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(el.connect_sender());
}
}
{
elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el;
auto cc = el.connect_receiver();
EXPECT_NE(cc, 0);
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_EQ(el.connect_receiver(), 0);
}
EXPECT_EQ(el.disconnect_receiver(cc), 0);
EXPECT_EQ(el.connect_receiver(), cc);
}
{
elems_t<ipc::relat::single, ipc::relat::multi, ipc::trans::broadcast> el;
for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) {
EXPECT_NE(el.connect(), 0);
EXPECT_NE(el.connect_receiver(), 0);
}
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_EQ(el.connect(), 0);
EXPECT_EQ(el.connect_receiver(), 0);
}
}
}
TEST(Queue, connection) {
{
elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> el;
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
// sending
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.ready_sending());
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_FALSE(que.ready_sending());
}
for (std::size_t i = 0; i < 10000; ++i) {
que.shut_sending();
}
{
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_TRUE(que.ready_sending());
}
// receiving
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.connect());
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_FALSE(que.connect());
}
EXPECT_TRUE(que.disconnect());
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_FALSE(que.disconnect());
}
{
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_TRUE(que.connect());
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> que{&el};
EXPECT_FALSE(que.connect());
}
}
{
elems_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> el;
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
// sending
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.ready_sending());
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.ready_sending());
}
for (std::size_t i = 0; i < 10000; ++i) {
que.shut_sending();
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.ready_sending());
}
// receiving
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_TRUE(que.connect());
}
for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.connect());
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_FALSE(que.connect());
}
EXPECT_TRUE(que.disconnect());
for (std::size_t i = 0; i < 10000; ++i) {
EXPECT_FALSE(que.disconnect());
}
{
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_TRUE(que.connect());
}
for (std::size_t i = 0; i < 10000; ++i) {
queue_t<ipc::relat::multi, ipc::relat::multi, ipc::trans::broadcast> que{&el};
EXPECT_FALSE(que.connect());
}
}
}
TEST(Queue, prod_cons_1v1_unicast) {
test_sr(elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast> {}, 1, 1, "ssu");
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::unicast> {}, 1, 1, "smu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, 1, 1, "mmu");
test_sr(elems_t<ipc::relat::single, ipc::relat::single, ipc::trans::unicast>{}, 1, 1, "ssu");
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::unicast>{}, 1, 1, "smu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast>{}, 1, 1, "mmu");
}
TEST(Queue, prod_cons_1v1_broadcast) {
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::broadcast> {}, 1, 1, "smb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, 1, 1, "mmb");
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::broadcast>{}, 1, 1, "smb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast>{}, 1, 1, "mmb");
}
TEST(Queue, prod_cons_1vN_unicast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::unicast> {}, 1, i, "smu");
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::unicast>{}, 1, i, "smu");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, 1, i, "mmu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast>{}, 1, i, "mmu");
}
}
TEST(Queue, prod_cons_1vN_broadcast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::broadcast> {}, 1, i, "smb");
test_sr(elems_t<ipc::relat::single, ipc::relat::multi , ipc::trans::broadcast>{}, 1, i, "smb");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, 1, i, "mmb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast>{}, 1, i, "mmb");
}
}
TEST(Queue, prod_cons_NvN_unicast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, 1, i, "mmu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast>{}, 1, i, "mmu");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, i, 1, "mmu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast>{}, i, 1, "mmu");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast> {}, i, i, "mmu");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::unicast>{}, i, i, "mmu");
}
}
TEST(Queue, prod_cons_NvN_broadcast) {
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, 1, i, "mmb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast>{}, 1, i, "mmb");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, i, 1, "mmb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast>{}, i, 1, "mmb");
}
for (int i = 1; i <= ThreadMax; ++i) {
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast> {}, i, i, "mmb");
test_sr(elems_t<ipc::relat::multi , ipc::relat::multi , ipc::trans::broadcast>{}, i, i, "mmb");
}
}