From a7b8af7fa8364e83a1a95817447d23cff652290d Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 16 Apr 2023 15:40:46 +0800 Subject: [PATCH] add: [concur] producer --- include/libconcur/concurrent.h | 82 +++++++++++++++++++++++--- test/concur/test_concur_concurrent.cpp | 76 +++++++++++++++--------- 2 files changed, 121 insertions(+), 37 deletions(-) diff --git a/include/libconcur/concurrent.h b/include/libconcur/concurrent.h index 12058bf..63719a6 100644 --- a/include/libconcur/concurrent.h +++ b/include/libconcur/concurrent.h @@ -9,7 +9,9 @@ #include #include #include +#include #include +#include #include "libimp/span.h" #include "libimp/generic.h" @@ -110,7 +112,7 @@ class broadcast {}; /// \brief Determines whether type T can be implicitly converted to type U. template -using is_convertible = std::enable_if_t::value, bool>; +using is_convertible = typename std::enable_if::value, bool>::type; /// \brief Check whether the elems header type is valid. template @@ -289,10 +291,15 @@ template <> struct producer { struct header_impl { - public : std::atomic w_idx {0}; ///< write index - private: padding ___1; - public : std::atomic w_beg {0}; ///< write begin index - private: padding ___2; + std::atomic w_idx {0}; ///< write index + std::atomic w_beg {0}; ///< write begin index + private: padding> ___; + + public: + void get(index_t &idx, index_t &beg) const noexcept { + idx = w_idx.load(std::memory_order_relaxed); + beg = w_beg.load(std::memory_order_relaxed); + } }; template { /// \brief Multi-write producer model implementation. template <> -struct producer {}; +struct producer { + + struct header_impl { + std::atomic w_flags {0}; ///< write flags, combined current and starting index. + private: padding ___; + + public: + void get(index_t &idx, index_t &beg) const noexcept { + auto w_flags = this->w_flags.load(std::memory_order_relaxed); + idx = get_index(w_flags); + beg = get_begin(w_flags); + } + }; + + template = true, + is_convertible = true> + static bool enqueue(::LIBIMP::span> elems, H &hdr, C &/*ctx*/, U &&src) noexcept { + auto w_flags = hdr.w_flags.load(std::memory_order_acquire); + index_t w_idx; + for (;;) { + w_idx = get_index(w_flags); + auto w_beg = get_begin(w_flags); + // Move the queue head index. + if (w_beg + hdr.circ_size <= w_idx) { + w_beg += 1; + } + // Update flags. + auto n_flags = make_flags(w_idx + 1/*iterate backwards*/, w_beg); + if (hdr.w_flags.compare_exchange_weak(w_flags, n_flags, std::memory_order_acq_rel)) { + break; + } + } + // Get element. + auto w_cur = trunc_index(hdr, w_idx); + auto &elem = elems[w_cur]; + // Set data & flag. + elem.set_flag(w_idx | state::enqueue_mask); + elem.set_data(std::forward(src)); // Here should not be interrupted. + elem.set_flag(w_idx | state::commit_mask); + return true; + } + +private: + friend struct producer::header_impl; + + constexpr static index_t get_index(state::flag_t flags) noexcept { + return index_t(flags); + } + + constexpr static index_t get_begin(state::flag_t flags) noexcept { + constexpr auto index_bits = sizeof(index_t) * CHAR_BIT; + return index_t(flags >> index_bits); + } + + constexpr static state::flag_t make_flags(index_t idx, index_t beg) noexcept { + constexpr auto index_bits = sizeof(index_t) * CHAR_BIT; + return state::flag_t(idx) | (state::flag_t(beg) << index_bits); + } +}; /// \brief Multi-read consumer model implementation. /// Single-read consumer model is not required. @@ -336,13 +402,13 @@ struct consumer { is_convertible = true, std::enable_if_t::value, bool> = true> static bool dequeue(::LIBIMP::span> elems, H &hdr, C &ctx, U &des) noexcept { - auto w_idx = hdr.w_idx.load(std::memory_order_relaxed); + index_t w_idx, w_beg; + hdr.get(w_idx, w_beg); // Verify index. if (ctx.w_lst == w_idx) { return false; // not ready } // Obtain the queue head index if we need. - auto w_beg = hdr.w_beg.load(std::memory_order_relaxed); if ((ctx.r_idx < w_beg) || (ctx.r_idx >= w_beg + hdr.circ_size)) { ctx.r_idx = w_beg; diff --git a/test/concur/test_concur_concurrent.cpp b/test/concur/test_concur_concurrent.cpp index 91f16b7..8c919b7 100644 --- a/test/concur/test_concur_concurrent.cpp +++ b/test/concur/test_concur_concurrent.cpp @@ -31,7 +31,7 @@ TEST(concurrent, trunc_index) { } }; - /// @brief circ-size = 0 + /// \brief circ-size = 0 EXPECT_EQ(concur::trunc_index(header{0}, 0), 0); EXPECT_EQ(concur::trunc_index(header{0}, 1), 0); EXPECT_EQ(concur::trunc_index(header{0}, 2), 0); @@ -39,7 +39,7 @@ TEST(concurrent, trunc_index) { EXPECT_EQ(concur::trunc_index(header{0}, 111), 0); EXPECT_EQ(concur::trunc_index(header{0}, -1), 0); - /// @brief circ-size = 1 + /// \brief circ-size = 1 EXPECT_EQ(concur::trunc_index(header{1}, 0), 0); EXPECT_EQ(concur::trunc_index(header{1}, 1), 0); EXPECT_EQ(concur::trunc_index(header{1}, 2), 0); @@ -47,7 +47,7 @@ TEST(concurrent, trunc_index) { EXPECT_EQ(concur::trunc_index(header{1}, 111), 0); EXPECT_EQ(concur::trunc_index(header{1}, -1), 0); - /// @brief circ-size = 2 + /// \brief circ-size = 2 EXPECT_EQ(concur::trunc_index(header{2}, 0), 0); EXPECT_EQ(concur::trunc_index(header{2}, 1), 1); EXPECT_EQ(concur::trunc_index(header{2}, 2), 0); @@ -55,7 +55,7 @@ TEST(concurrent, trunc_index) { EXPECT_EQ(concur::trunc_index(header{2}, 111), 1); EXPECT_EQ(concur::trunc_index(header{2}, -1), 1); - /// @brief circ-size = 10 + /// \brief circ-size = 10 EXPECT_EQ(concur::trunc_index(header{10}, 0), 0); EXPECT_EQ(concur::trunc_index(header{10}, 1), 0); EXPECT_EQ(concur::trunc_index(header{10}, 2), 0); @@ -63,7 +63,7 @@ TEST(concurrent, trunc_index) { EXPECT_EQ(concur::trunc_index(header{10}, 111), 0); EXPECT_EQ(concur::trunc_index(header{10}, -1), 0); - /// @brief circ-size = 16 + /// \brief circ-size = 16 EXPECT_EQ(concur::trunc_index(header{16}, 0), 0); EXPECT_EQ(concur::trunc_index(header{16}, 1), 1); EXPECT_EQ(concur::trunc_index(header{16}, 2), 2); @@ -71,7 +71,7 @@ TEST(concurrent, trunc_index) { EXPECT_EQ(concur::trunc_index(header{16}, 111), 15); EXPECT_EQ(concur::trunc_index(header{16}, -1), 15); - /// @brief circ-size = (index_t)-1 + /// \brief circ-size = (index_t)-1 EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 0), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 1), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 2), 0); @@ -79,7 +79,7 @@ TEST(concurrent, trunc_index) { EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, 111), 0); EXPECT_EQ(concur::trunc_index(header{(concur::index_t)-1}, -1), 0); - /// @brief circ-size = 2147483648 (2^31) + /// \brief circ-size = 2147483648 (2^31) EXPECT_EQ(concur::trunc_index(header{2147483648u}, 0), 0); EXPECT_EQ(concur::trunc_index(header{2147483648u}, 1), 1); EXPECT_EQ(concur::trunc_index(header{2147483648u}, 2), 2); @@ -91,7 +91,7 @@ TEST(concurrent, trunc_index) { namespace { template -void test_unicast(std::size_t np, std::size_t nc, std::size_t k) { +void test_unicast(std::size_t np, std::size_t nc) { LIBIMP_LOG_(); log.info("\n\tStart with: ", imp::nameof(), ", ", np, " producers, ", nc, " consumers..."); @@ -139,7 +139,7 @@ void test_unicast(std::size_t np, std::size_t nc, std::size_t k) { for (auto &p : prods) p.join(); for (auto &c : conss) c.join(); - EXPECT_EQ(sum, k * np * (loop_size * std::uint64_t(loop_size + 1)) / 2); + EXPECT_EQ(sum, np * (loop_size * std::uint64_t(loop_size + 1)) / 2); } } // namespace @@ -147,28 +147,28 @@ void test_unicast(std::size_t np, std::size_t nc, std::size_t k) { TEST(concurrent, unicast) { using namespace concur; - /// @brief 1-1 - test_unicast>(1, 1, 1); - test_unicast>(1, 1, 1); - test_unicast>(1, 1, 1); - test_unicast>(1, 1, 1); + /// \brief 1-1 + test_unicast>(1, 1); + test_unicast>(1, 1); + test_unicast>(1, 1); + test_unicast>(1, 1); - /// @brief 8-1 - test_unicast>(8, 1, 1); - test_unicast>(8, 1, 1); + /// \brief 8-1 + test_unicast>(8, 1); + test_unicast>(8, 1); - /// @brief 1-8 - test_unicast>(1, 8, 1); - test_unicast>(1, 8, 1); + /// \brief 1-8 + test_unicast>(1, 8); + test_unicast>(1, 8); - /// @brief 8-8 - test_unicast>(8, 8, 1); + /// \brief 8-8 + test_unicast>(8, 8); } namespace { template -void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { +void test_broadcast(std::size_t np, std::size_t nc) { LIBIMP_LOG_(); { concur::element circ[32] {}; @@ -243,11 +243,18 @@ void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { for (std::uint32_t i = 1; i <= loop_size; ++i) { std::this_thread::yield(); counters[n] = 0; - do { + for (std::uint32_t k = 1;; ++k) { ASSERT_TRUE(pc.enqueue(imp::make_span(circ), hdr, ctx, Data{n, i})); - std::this_thread::yield(); // We need to wait for the consumer to consume the data. - } while (counters[n] < nc); + if (counters[n] >= nc) { + break; + } + std::this_thread::yield(); + if (k % (loop_size / 10) == 0) { + log.info("[", n, "] put count: ", i, ", retry: ", k, ", counters: ", counters[n]); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + } if (i % (loop_size / 10) == 0) { log.info("[", n, "] put count: ", i); } @@ -282,7 +289,7 @@ void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { for (auto &p : prods) p.join(); for (auto &c : conss) c.join(); - EXPECT_EQ(sum, k * np * (loop_size * std::uint64_t(loop_size + 1)) / 2); + EXPECT_EQ(sum, np * nc * (loop_size * std::uint64_t(loop_size + 1)) / 2); } } @@ -291,6 +298,17 @@ void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { TEST(concurrent, broadcast) { using namespace concur; - /// @brief 1-1 - test_broadcast>(1, 1, 1); + /// \brief 1-1 + test_broadcast>(1, 1); + test_broadcast>(1, 1); + + /// \brief 8-1 + test_broadcast>(8, 1); + + /// \brief 1-8 + test_broadcast>(1, 8); + test_broadcast>(1, 8); + + /// \brief 8-8 + test_broadcast>(8, 8); } \ No newline at end of file