upd: [concur] (TBD) continue the implementation of the broadcast pattern.

This commit is contained in:
mutouyun 2023-04-09 15:17:10 +08:00
parent a80d7f7590
commit 32585a39f3
3 changed files with 105 additions and 33 deletions

View File

@ -289,8 +289,10 @@ template <>
struct producer<trans::broadcast, relation::single> {
struct header_impl {
std::atomic<index_t> w_idx {0}; ///< write index
private: padding<decltype(w_idx)> ___;
public : std::atomic<index_t> w_idx {0}; ///< write index
private: padding<decltype(w_idx)> ___1;
public : std::atomic<index_t> w_beg {0}; ///< write begin index
private: padding<decltype(w_beg)> ___2;
};
template <typename T, typename H, typename C, typename U,
@ -298,19 +300,18 @@ struct producer<trans::broadcast, relation::single> {
is_convertible<H, header_impl> = true>
static bool enqueue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &&src) noexcept {
auto w_idx = hdr.w_idx.load(std::memory_order_acquire);
auto w_beg = hdr.w_beg.load(std::memory_order_relaxed);
auto w_cur = trunc_index(hdr, w_idx);
auto &elem = elems[w_cur];
auto f_ct = elem.get_flag();
// Verify index.
if ((f_ct != state::invalid_value) &&
(f_ct != state::dequeued)) {
return false; // full
// Move the queue head index.
if (w_beg + hdr.circ_size <= w_idx) {
hdr.w_beg.fetch_add(1, std::memory_order_release);
}
// Get a valid index and iterate backwards.
hdr.w_idx.fetch_add(1, std::memory_order_release);
// Set data & flag.
elem.set_flag(w_idx | state::enqueue_mask);
elem.set_data(std::forward<U>(src));
elem.set_data(std::forward<U>(src)); // Here should not be interrupted.
elem.set_flag(w_idx | state::commit_mask);
return true;
}
@ -335,32 +336,36 @@ struct consumer<trans::broadcast, relation::multi> {
is_convertible<C, context_impl> = true,
std::enable_if_t<std::is_nothrow_copy_assignable<U>::value, bool> = true>
static bool dequeue(::LIBIMP::span<element<T>> elems, H &hdr, C &ctx, U &des) noexcept {
auto r_idx = ctx.r_idx;
auto w_idx = static_cast<index_t>(hdr.w_idx.load(std::memory_order_relaxed));
auto r_cur = trunc_index(hdr, r_idx);
auto const &elem = elems[r_cur];
auto w_idx = hdr.w_idx.load(std::memory_order_relaxed);
// Verify index.
if ((ctx.w_lst != state::invalid_value) &&
(ctx.w_lst + hdr.circ_size > w_idx)) {
if (ctx.w_lst == w_idx) {
return false; // not ready
}
// Obtain the queue head index if we need.
auto w_beg = hdr.w_beg.load(std::memory_order_relaxed);
if ((ctx.r_idx < w_beg) ||
(ctx.r_idx >= w_beg + hdr.circ_size)) {
ctx.r_idx = w_beg;
}
auto r_cur = trunc_index(hdr, ctx.r_idx);
auto &elem = elems[r_cur];
auto f_ct = elem.get_flag();
if (f_ct == state::invalid_value) {
return false; // empty
}
// Try getting data.
for (;;) {
if (f_ct & state::enqueue_mask == state::enqueue_mask) {
if ((f_ct & state::enqueue_mask) == state::enqueue_mask) {
return false; // unreadable
}
des = LIBCONCUR::get(elem);
// Correct data can be obtained only if
// the elem data is not modified during the getting process.
if (elem.cas_flag(f_ct, state::dequeued)) break;
if (elem.cas_flag(f_ct, f_ct)) break;
}
ctx.w_lst = (f_ct & ~state::enqueue_mask) + 1;
// Get a valid index and iterate backwards.
ctx.r_idx += 1;
ctx.w_lst = w_idx;
return true;
}
};
@ -393,6 +398,10 @@ struct prod_cons : producer<TransModT, ProdModT>
return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0);
}
};
/// \brief Mixing producer and consumer context definitions.
struct context : traits<producer<TransModT, ProdModT>>::context
, traits<consumer<TransModT, ConsModT>>::context {};
};
LIBCONCUR_NAMESPACE_END_

View File

@ -26,11 +26,8 @@ using flag_t = std::uint64_t;
enum : flag_t {
invalid_value = ~flag_t(0),
committed = ~flag_t(1),
dequeued = ~flag_t(2),
enqueue_mask = invalid_value << 32,
commit_mask = committed << 32,
commit_mask = ~flag_t(1) << 32,
};
} // namespace state

View File

@ -91,7 +91,7 @@ TEST(concurrent, trunc_index) {
namespace {
template <typename PC>
void test_concur(std::size_t np, std::size_t nc, std::size_t k) {
void test_unicast(std::size_t np, std::size_t nc, std::size_t k) {
LIBIMP_LOG_();
log.info("\n\tStart with: ", imp::nameof<PC>(), ", ", np, " producers, ", nc, " consumers...");
@ -100,13 +100,13 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) {
concur::element<std::uint64_t> circ[32] {};
PC pc;
typename concur::traits<PC>::header hdr {imp::make_span(circ)};
typename concur::traits<PC>::context ctx {};
ASSERT_TRUE(hdr.valid());
std::atomic<std::uint64_t> sum {0};
std::atomic<std::size_t> running {np};
auto prod_call = [&](std::size_t n) {
typename concur::traits<PC>::context ctx {};
for (std::uint32_t i = 1; i <= loop_size; ++i) {
std::this_thread::yield();
while (!pc.enqueue(imp::make_span(circ), hdr, ctx, i)) {
@ -119,6 +119,7 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) {
--running;
};
auto cons_call = [&] {
typename concur::traits<PC>::context ctx {};
for (;;) {
std::this_thread::yield();
std::uint64_t i;
@ -143,23 +144,88 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) {
} // namespace
TEST(concurrent, unicast_prod_cons) {
TEST(concurrent, unicast) {
using namespace concur;
/// @brief 1-1
test_concur<prod_cons<trans::unicast, relation::single, relation::single>>(1, 1, 1);
test_concur<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 1, 1);
test_concur<prod_cons<trans::unicast, relation::multi , relation::single>>(1, 1, 1);
test_concur<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 1, 1);
test_unicast<prod_cons<trans::unicast, relation::single, relation::single>>(1, 1, 1);
test_unicast<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 1, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::single>>(1, 1, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 1, 1);
/// @brief 8-1
test_concur<prod_cons<trans::unicast, relation::multi , relation::single>>(8, 1, 1);
test_concur<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 1, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::single>>(8, 1, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 1, 1);
/// @brief 1-8
test_concur<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 8, 1);
test_concur<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 8, 1);
test_unicast<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 8, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 8, 1);
/// @brief 8-8
test_concur<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 8, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 8, 1);
}
namespace {
template <typename PC>
void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) {
LIBIMP_LOG_();
concur::element<std::uint64_t> circ[32] {};
PC pc;
typename concur::traits<PC>::header hdr {imp::make_span(circ)};
ASSERT_TRUE(hdr.valid());
auto push_one = [&, ctx = typename concur::traits<PC>::context{}](std::uint32_t i) mutable {
return pc.enqueue(imp::make_span(circ), hdr, ctx, i);
};
auto pop_one = [&, ctx = typename concur::traits<PC>::context{}]() mutable {
std::uint64_t i;
if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) {
return i;
}
return (std::uint64_t)-1;
};
auto pop_one_2 = pop_one;
// empty queue pop
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// test one push & pop
for (int i = 0; i < 32; ++i) {
ASSERT_TRUE(push_one(i));
ASSERT_EQ(pop_one(), i);
}
for (int i = 0; i < 100; ++i) {
ASSERT_TRUE(push_one(i));
ASSERT_EQ(pop_one(), i);
}
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// test loop push & pop
for (int i = 0; i < 10; ++i) ASSERT_TRUE(push_one(i));
for (int i = 0; i < 10; ++i) ASSERT_EQ(pop_one(), i);
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// other loop pop
for (int i = 0; i < 32; ++i) ASSERT_TRUE(push_one(i));
for (int i = 0; i < 32; ++i) ASSERT_EQ(pop_one_2(), i);
ASSERT_EQ(pop_one_2(), (std::uint64_t)-1);
// overwrite
ASSERT_TRUE(push_one(123));
for (int i = 1; i < 32; ++i) {
ASSERT_EQ(pop_one(), i);
}
ASSERT_EQ(pop_one(), 123);
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
}
} // namespace
TEST(concurrent, broadcast) {
using namespace concur;
/// @brief 1-1
test_broadcast<prod_cons<trans::broadcast, relation::single, relation::multi>>(1, 1, 1);
}