add: [imp] (TBD) impl trans::broadcast

This commit is contained in:
mutouyun 2023-04-05 19:21:15 +08:00
parent 33768dd79d
commit a80d7f7590
3 changed files with 91 additions and 14 deletions

View File

@ -225,7 +225,8 @@ struct consumer<trans::unicast, relation::single> {
template <typename T, typename H, typename C, typename U,
is_elems_header<H> = true,
is_convertible<H, header_impl> = true>
is_convertible<H, header_impl> = true,
std::enable_if_t<std::is_nothrow_move_assignable<U>::value, bool> = true>
static bool dequeue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &des) noexcept {
auto r_idx = hdr.r_idx;
auto r_cur = trunc_index(hdr, r_idx);
@ -238,8 +239,8 @@ struct consumer<trans::unicast, relation::single> {
// Get a valid index and iterate backwards.
hdr.r_idx += 1;
// Get data & set flag.
des = LIBCONCUR::get(elem);
elem.set_flag(r_idx + static_cast<index_t>(elems.size())/*avoid overflow*/);
des = LIBCONCUR::get(std::move(elem));
elem.set_flag(r_idx + hdr.circ_size);
return true;
}
};
@ -255,7 +256,8 @@ struct consumer<trans::unicast, relation::multi> {
template <typename T, typename H, typename C, typename U,
is_elems_header<H> = true,
is_convertible<H, header_impl> = true>
is_convertible<H, header_impl> = true,
std::enable_if_t<std::is_nothrow_move_assignable<U>::value, bool> = true>
static bool dequeue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &des) noexcept {
auto r_idx = hdr.r_idx.load(std::memory_order_acquire);
for (;;) {
@ -271,8 +273,8 @@ struct consumer<trans::unicast, relation::multi> {
continue;
}
// Get data & set flag.
des = LIBCONCUR::get(elem);
elem.set_flag(r_idx + static_cast<index_t>(elems.size())/*avoid overflow*/);
des = LIBCONCUR::get(std::move(elem));
elem.set_flag(r_idx + hdr.circ_size);
return true;
}
}
@ -285,17 +287,82 @@ struct consumer<trans::unicast, relation::multi> {
/// \brief Single-write producer model implementation.
template <>
struct producer<trans::broadcast, relation::single> {
struct header_impl {
std::atomic<index_t> w_idx {0}; ///< write index
private: padding<decltype(w_idx)> ___;
};
template <typename T, typename H, typename C, typename U,
is_elems_header<H> = true,
is_convertible<H, header_impl> = true>
static bool enqueue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &&src) noexcept {
auto w_idx = hdr.w_idx.load(std::memory_order_acquire);
auto w_cur = trunc_index(hdr, w_idx);
auto &elem = elems[w_cur];
auto f_ct = elem.get_flag();
// Verify index.
if ((f_ct != state::invalid_value) &&
(f_ct != state::dequeued)) {
return false; // full
}
// Get a valid index and iterate backwards.
hdr.w_idx.fetch_add(1, std::memory_order_release);
// Set data & flag.
elem.set_flag(w_idx | state::enqueue_mask);
elem.set_data(std::forward<U>(src));
elem.set_flag(w_idx | state::commit_mask);
return true;
}
};
/// \brief Multi-write producer model implementation.
template <>
struct producer<trans::broadcast, relation::multi> {
};
struct producer<trans::broadcast, relation::multi> {};
/// \brief Multi-read consumer model implementation.
/// Single-read consumer model is not required.
template <>
struct consumer<trans::broadcast, relation::multi> {
struct context_impl {
index_t r_idx {0}; ///< read index
state::flag_t w_lst {state::invalid_value}; ///< last write index
};
template <typename T, typename H, typename C, typename U,
is_elems_header<H> = true,
is_convertible<C, context_impl> = true,
std::enable_if_t<std::is_nothrow_copy_assignable<U>::value, bool> = true>
static bool dequeue(::LIBIMP::span<element<T>> elems, H &hdr, C &ctx, U &des) noexcept {
auto r_idx = ctx.r_idx;
auto w_idx = static_cast<index_t>(hdr.w_idx.load(std::memory_order_relaxed));
auto r_cur = trunc_index(hdr, r_idx);
auto const &elem = elems[r_cur];
// Verify index.
if ((ctx.w_lst != state::invalid_value) &&
(ctx.w_lst + hdr.circ_size > w_idx)) {
return false; // not ready
}
auto f_ct = elem.get_flag();
if (f_ct == state::invalid_value) {
return false; // empty
}
// Try getting data.
for (;;) {
if (f_ct & state::enqueue_mask == state::enqueue_mask) {
return false; // unreadable
}
des = LIBCONCUR::get(elem);
// Correct data can be obtained only if
// the elem data is not modified during the getting process.
if (elem.cas_flag(f_ct, state::dequeued)) break;
}
// Get a valid index and iterate backwards.
ctx.r_idx += 1;
ctx.w_lst = w_idx;
return true;
}
};
/**
@ -310,8 +377,8 @@ struct prod_cons : producer<TransModT, ProdModT>
, consumer<TransModT, ConsModT> {
/// \brief Mixing producer and consumer header definitions.
struct header : producer<TransModT, ProdModT>::header_impl
, consumer<TransModT, ConsModT>::header_impl {
struct header : traits<producer<TransModT, ProdModT>>::header
, traits<consumer<TransModT, ConsModT>>::header {
index_t const circ_size;
constexpr header(index_t cs) noexcept

View File

@ -25,8 +25,12 @@ namespace state {
using flag_t = std::uint64_t;
enum : flag_t {
/// \brief The invalid state value.
invalid_value = ~flag_t(0),
committed = ~flag_t(1),
dequeued = ~flag_t(2),
enqueue_mask = invalid_value << 32,
commit_mask = committed << 32,
};
} // namespace state
@ -80,8 +84,14 @@ public:
}
}
void set_flag(state::flag_t flag) noexcept {
f_ct_.store(flag, std::memory_order_release);
void set_flag(state::flag_t flag,
std::memory_order const order = std::memory_order_release) noexcept {
f_ct_.store(flag, order);
}
bool cas_flag(state::flag_t &expected, state::flag_t flag,
std::memory_order const order = std::memory_order_acq_rel) noexcept {
return f_ct_.compare_exchange_weak(expected, flag, order);
}
state::flag_t get_flag() const noexcept {

View File

@ -143,7 +143,7 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) {
} // namespace
TEST(concurrent, prod_cons) {
TEST(concurrent, unicast_prod_cons) {
using namespace concur;
/// @brief 1-1