add: [concur] producer<trans::broadcast, relation::multi>

This commit is contained in:
mutouyun 2023-04-16 15:40:46 +08:00
parent 5578f48595
commit a7b8af7fa8
2 changed files with 121 additions and 37 deletions

View File

@ -9,7 +9,9 @@
#include <type_traits> #include <type_traits>
#include <atomic> #include <atomic>
#include <array> #include <array>
#include <tuple>
#include <cstdint> #include <cstdint>
#include <climits>
#include "libimp/span.h" #include "libimp/span.h"
#include "libimp/generic.h" #include "libimp/generic.h"
@ -110,7 +112,7 @@ class broadcast {};
/// \brief Determines whether type T can be implicitly converted to type U. /// \brief Determines whether type T can be implicitly converted to type U.
template <typename T, typename U> template <typename T, typename U>
using is_convertible = std::enable_if_t<std::is_convertible<T *, U *>::value, bool>; using is_convertible = typename std::enable_if<std::is_convertible<T *, U *>::value, bool>::type;
/// \brief Check whether the elems header type is valid. /// \brief Check whether the elems header type is valid.
template <typename T> template <typename T>
@ -289,10 +291,15 @@ template <>
struct producer<trans::broadcast, relation::single> { struct producer<trans::broadcast, relation::single> {
struct header_impl { struct header_impl {
public : std::atomic<index_t> w_idx {0}; ///< write index std::atomic<index_t> w_idx {0}; ///< write index
private: padding<decltype(w_idx)> ___1; std::atomic<index_t> w_beg {0}; ///< write begin index
public : std::atomic<index_t> w_beg {0}; ///< write begin index private: padding<std::tuple<decltype(w_idx), decltype(w_beg)>> ___;
private: padding<decltype(w_beg)> ___2;
public:
void get(index_t &idx, index_t &beg) const noexcept {
idx = w_idx.load(std::memory_order_relaxed);
beg = w_beg.load(std::memory_order_relaxed);
}
}; };
template <typename T, typename H, typename C, typename U, template <typename T, typename H, typename C, typename U,
@ -319,7 +326,66 @@ struct producer<trans::broadcast, relation::single> {
/// \brief Multi-write producer model implementation. /// \brief Multi-write producer model implementation.
template <> template <>
struct producer<trans::broadcast, relation::multi> {}; struct producer<trans::broadcast, relation::multi> {
struct header_impl {
std::atomic<state::flag_t> w_flags {0}; ///< write flags, combined current and starting index.
private: padding<decltype(w_flags)> ___;
public:
void get(index_t &idx, index_t &beg) const noexcept {
auto w_flags = this->w_flags.load(std::memory_order_relaxed);
idx = get_index(w_flags);
beg = get_begin(w_flags);
}
};
template <typename T, typename H, typename C, typename U,
is_elems_header<H> = true,
is_convertible<H, header_impl> = true>
static bool enqueue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &&src) noexcept {
auto w_flags = hdr.w_flags.load(std::memory_order_acquire);
index_t w_idx;
for (;;) {
w_idx = get_index(w_flags);
auto w_beg = get_begin(w_flags);
// Move the queue head index.
if (w_beg + hdr.circ_size <= w_idx) {
w_beg += 1;
}
// Update flags.
auto n_flags = make_flags(w_idx + 1/*iterate backwards*/, w_beg);
if (hdr.w_flags.compare_exchange_weak(w_flags, n_flags, std::memory_order_acq_rel)) {
break;
}
}
// Get element.
auto w_cur = trunc_index(hdr, w_idx);
auto &elem = elems[w_cur];
// Set data & flag.
elem.set_flag(w_idx | state::enqueue_mask);
elem.set_data(std::forward<U>(src)); // Here should not be interrupted.
elem.set_flag(w_idx | state::commit_mask);
return true;
}
private:
friend struct producer::header_impl;
constexpr static index_t get_index(state::flag_t flags) noexcept {
return index_t(flags);
}
constexpr static index_t get_begin(state::flag_t flags) noexcept {
constexpr auto index_bits = sizeof(index_t) * CHAR_BIT;
return index_t(flags >> index_bits);
}
constexpr static state::flag_t make_flags(index_t idx, index_t beg) noexcept {
constexpr auto index_bits = sizeof(index_t) * CHAR_BIT;
return state::flag_t(idx) | (state::flag_t(beg) << index_bits);
}
};
/// \brief Multi-read consumer model implementation. /// \brief Multi-read consumer model implementation.
/// Single-read consumer model is not required. /// Single-read consumer model is not required.
@ -336,13 +402,13 @@ struct consumer<trans::broadcast, relation::multi> {
is_convertible<C, context_impl> = true, is_convertible<C, context_impl> = true,
std::enable_if_t<std::is_nothrow_copy_assignable<U>::value, bool> = true> std::enable_if_t<std::is_nothrow_copy_assignable<U>::value, bool> = true>
static bool dequeue(::LIBIMP::span<element<T>> elems, H &hdr, C &ctx, U &des) noexcept { static bool dequeue(::LIBIMP::span<element<T>> elems, H &hdr, C &ctx, U &des) noexcept {
auto w_idx = hdr.w_idx.load(std::memory_order_relaxed); index_t w_idx, w_beg;
hdr.get(w_idx, w_beg);
// Verify index. // Verify index.
if (ctx.w_lst == w_idx) { if (ctx.w_lst == w_idx) {
return false; // not ready return false; // not ready
} }
// Obtain the queue head index if we need. // Obtain the queue head index if we need.
auto w_beg = hdr.w_beg.load(std::memory_order_relaxed);
if ((ctx.r_idx < w_beg) || if ((ctx.r_idx < w_beg) ||
(ctx.r_idx >= w_beg + hdr.circ_size)) { (ctx.r_idx >= w_beg + hdr.circ_size)) {
ctx.r_idx = w_beg; ctx.r_idx = w_beg;

View File

@ -31,7 +31,7 @@ TEST(concurrent, trunc_index) {
} }
}; };
/// @brief circ-size = 0 /// \brief circ-size = 0
EXPECT_EQ(concur::trunc_index(header{0}, 0), 0); EXPECT_EQ(concur::trunc_index(header{0}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{0}, 1), 0); EXPECT_EQ(concur::trunc_index(header{0}, 1), 0);
EXPECT_EQ(concur::trunc_index(header{0}, 2), 0); EXPECT_EQ(concur::trunc_index(header{0}, 2), 0);
@ -39,7 +39,7 @@ TEST(concurrent, trunc_index) {
EXPECT_EQ(concur::trunc_index(header{0}, 111), 0); EXPECT_EQ(concur::trunc_index(header{0}, 111), 0);
EXPECT_EQ(concur::trunc_index(header{0}, -1), 0); EXPECT_EQ(concur::trunc_index(header{0}, -1), 0);
/// @brief circ-size = 1 /// \brief circ-size = 1
EXPECT_EQ(concur::trunc_index(header{1}, 0), 0); EXPECT_EQ(concur::trunc_index(header{1}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{1}, 1), 0); EXPECT_EQ(concur::trunc_index(header{1}, 1), 0);
EXPECT_EQ(concur::trunc_index(header{1}, 2), 0); EXPECT_EQ(concur::trunc_index(header{1}, 2), 0);
@ -47,7 +47,7 @@ TEST(concurrent, trunc_index) {
EXPECT_EQ(concur::trunc_index(header{1}, 111), 0); EXPECT_EQ(concur::trunc_index(header{1}, 111), 0);
EXPECT_EQ(concur::trunc_index(header{1}, -1), 0); EXPECT_EQ(concur::trunc_index(header{1}, -1), 0);
/// @brief circ-size = 2 /// \brief circ-size = 2
EXPECT_EQ(concur::trunc_index(header{2}, 0), 0); EXPECT_EQ(concur::trunc_index(header{2}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{2}, 1), 1); EXPECT_EQ(concur::trunc_index(header{2}, 1), 1);
EXPECT_EQ(concur::trunc_index(header{2}, 2), 0); EXPECT_EQ(concur::trunc_index(header{2}, 2), 0);
@ -55,7 +55,7 @@ TEST(concurrent, trunc_index) {
EXPECT_EQ(concur::trunc_index(header{2}, 111), 1); EXPECT_EQ(concur::trunc_index(header{2}, 111), 1);
EXPECT_EQ(concur::trunc_index(header{2}, -1), 1); EXPECT_EQ(concur::trunc_index(header{2}, -1), 1);
/// @brief circ-size = 10 /// \brief circ-size = 10
EXPECT_EQ(concur::trunc_index(header{10}, 0), 0); EXPECT_EQ(concur::trunc_index(header{10}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{10}, 1), 0); EXPECT_EQ(concur::trunc_index(header{10}, 1), 0);
EXPECT_EQ(concur::trunc_index(header{10}, 2), 0); EXPECT_EQ(concur::trunc_index(header{10}, 2), 0);
@ -63,7 +63,7 @@ TEST(concurrent, trunc_index) {
EXPECT_EQ(concur::trunc_index(header{10}, 111), 0); EXPECT_EQ(concur::trunc_index(header{10}, 111), 0);
EXPECT_EQ(concur::trunc_index(header{10}, -1), 0); EXPECT_EQ(concur::trunc_index(header{10}, -1), 0);
/// @brief circ-size = 16 /// \brief circ-size = 16
EXPECT_EQ(concur::trunc_index(header{16}, 0), 0); EXPECT_EQ(concur::trunc_index(header{16}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{16}, 1), 1); EXPECT_EQ(concur::trunc_index(header{16}, 1), 1);
EXPECT_EQ(concur::trunc_index(header{16}, 2), 2); EXPECT_EQ(concur::trunc_index(header{16}, 2), 2);
@ -71,7 +71,7 @@ TEST(concurrent, trunc_index) {
EXPECT_EQ(concur::trunc_index(header{16}, 111), 15); EXPECT_EQ(concur::trunc_index(header{16}, 111), 15);
EXPECT_EQ(concur::trunc_index(header{16}, -1), 15); EXPECT_EQ(concur::trunc_index(header{16}, -1), 15);
/// @brief circ-size = (index_t)-1 /// \brief circ-size = (index_t)-1
EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 0), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 1), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 1), 0);
EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 2), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 2), 0);
@ -79,7 +79,7 @@ TEST(concurrent, trunc_index) {
EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 111), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 111), 0);
EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, -1), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, -1), 0);
/// @brief circ-size = 2147483648 (2^31) /// \brief circ-size = 2147483648 (2^31)
EXPECT_EQ(concur::trunc_index(header{2147483648u}, 0), 0); EXPECT_EQ(concur::trunc_index(header{2147483648u}, 0), 0);
EXPECT_EQ(concur::trunc_index(header{2147483648u}, 1), 1); EXPECT_EQ(concur::trunc_index(header{2147483648u}, 1), 1);
EXPECT_EQ(concur::trunc_index(header{2147483648u}, 2), 2); EXPECT_EQ(concur::trunc_index(header{2147483648u}, 2), 2);
@ -91,7 +91,7 @@ TEST(concurrent, trunc_index) {
namespace { namespace {
template <typename PC> template <typename PC>
void test_unicast(std::size_t np, std::size_t nc, std::size_t k) { void test_unicast(std::size_t np, std::size_t nc) {
LIBIMP_LOG_(); LIBIMP_LOG_();
log.info("\n\tStart with: ", imp::nameof<PC>(), ", ", np, " producers, ", nc, " consumers..."); log.info("\n\tStart with: ", imp::nameof<PC>(), ", ", np, " producers, ", nc, " consumers...");
@ -139,7 +139,7 @@ void test_unicast(std::size_t np, std::size_t nc, std::size_t k) {
for (auto &p : prods) p.join(); for (auto &p : prods) p.join();
for (auto &c : conss) c.join(); for (auto &c : conss) c.join();
EXPECT_EQ(sum, k * np * (loop_size * std::uint64_t(loop_size + 1)) / 2); EXPECT_EQ(sum, np * (loop_size * std::uint64_t(loop_size + 1)) / 2);
} }
} // namespace } // namespace
@ -147,28 +147,28 @@ void test_unicast(std::size_t np, std::size_t nc, std::size_t k) {
TEST(concurrent, unicast) { TEST(concurrent, unicast) {
using namespace concur; using namespace concur;
/// @brief 1-1 /// \brief 1-1
test_unicast<prod_cons<trans::unicast, relation::single, relation::single>>(1, 1, 1); test_unicast<prod_cons<trans::unicast, relation::single, relation::single>>(1, 1);
test_unicast<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 1, 1); test_unicast<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::single>>(1, 1, 1); test_unicast<prod_cons<trans::unicast, relation::multi , relation::single>>(1, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 1, 1); test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 1);
/// @brief 8-1 /// \brief 8-1
test_unicast<prod_cons<trans::unicast, relation::multi , relation::single>>(8, 1, 1); test_unicast<prod_cons<trans::unicast, relation::multi , relation::single>>(8, 1);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 1, 1); test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 1);
/// @brief 1-8 /// \brief 1-8
test_unicast<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 8, 1); test_unicast<prod_cons<trans::unicast, relation::single, relation::multi >>(1, 8);
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 8, 1); test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(1, 8);
/// @brief 8-8 /// \brief 8-8
test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 8, 1); test_unicast<prod_cons<trans::unicast, relation::multi , relation::multi >>(8, 8);
} }
namespace { namespace {
template <typename PC> template <typename PC>
void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { void test_broadcast(std::size_t np, std::size_t nc) {
LIBIMP_LOG_(); LIBIMP_LOG_();
{ {
concur::element<std::uint64_t> circ[32] {}; concur::element<std::uint64_t> circ[32] {};
@ -243,11 +243,18 @@ void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) {
for (std::uint32_t i = 1; i <= loop_size; ++i) { for (std::uint32_t i = 1; i <= loop_size; ++i) {
std::this_thread::yield(); std::this_thread::yield();
counters[n] = 0; counters[n] = 0;
do { for (std::uint32_t k = 1;; ++k) {
ASSERT_TRUE(pc.enqueue(imp::make_span(circ), hdr, ctx, Data{n, i})); ASSERT_TRUE(pc.enqueue(imp::make_span(circ), hdr, ctx, Data{n, i}));
std::this_thread::yield();
// We need to wait for the consumer to consume the data. // We need to wait for the consumer to consume the data.
} while (counters[n] < nc); if (counters[n] >= nc) {
break;
}
std::this_thread::yield();
if (k % (loop_size / 10) == 0) {
log.info("[", n, "] put count: ", i, ", retry: ", k, ", counters: ", counters[n]);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
if (i % (loop_size / 10) == 0) { if (i % (loop_size / 10) == 0) {
log.info("[", n, "] put count: ", i); log.info("[", n, "] put count: ", i);
} }
@ -282,7 +289,7 @@ void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) {
for (auto &p : prods) p.join(); for (auto &p : prods) p.join();
for (auto &c : conss) c.join(); for (auto &c : conss) c.join();
EXPECT_EQ(sum, k * np * (loop_size * std::uint64_t(loop_size + 1)) / 2); EXPECT_EQ(sum, np * nc * (loop_size * std::uint64_t(loop_size + 1)) / 2);
} }
} }
@ -291,6 +298,17 @@ void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) {
TEST(concurrent, broadcast) { TEST(concurrent, broadcast) {
using namespace concur; using namespace concur;
/// @brief 1-1 /// \brief 1-1
test_broadcast<prod_cons<trans::broadcast, relation::single, relation::multi>>(1, 1, 1); test_broadcast<prod_cons<trans::broadcast, relation::single, relation::multi>>(1, 1);
test_broadcast<prod_cons<trans::broadcast, relation::multi , relation::multi>>(1, 1);
/// \brief 8-1
test_broadcast<prod_cons<trans::broadcast, relation::multi , relation::multi>>(8, 1);
/// \brief 1-8
test_broadcast<prod_cons<trans::broadcast, relation::single, relation::multi>>(1, 8);
test_broadcast<prod_cons<trans::broadcast, relation::multi , relation::multi>>(1, 8);
/// \brief 8-8
test_broadcast<prod_cons<trans::broadcast, relation::multi , relation::multi>>(8, 8);
} }