diff --git a/include/libconcur/concurrent.h b/include/libconcur/concurrent.h index 3e0489f..12058bf 100644 --- a/include/libconcur/concurrent.h +++ b/include/libconcur/concurrent.h @@ -289,8 +289,10 @@ template <> struct producer { struct header_impl { - std::atomic w_idx {0}; ///< write index - private: padding ___; + public : std::atomic w_idx {0}; ///< write index + private: padding ___1; + public : std::atomic w_beg {0}; ///< write begin index + private: padding ___2; }; template { is_convertible = true> static bool enqueue(::LIBIMP::span> elems, H &hdr, C &/*ctx*/, U &&src) noexcept { auto w_idx = hdr.w_idx.load(std::memory_order_acquire); + auto w_beg = hdr.w_beg.load(std::memory_order_relaxed); auto w_cur = trunc_index(hdr, w_idx); auto &elem = elems[w_cur]; - auto f_ct = elem.get_flag(); - // Verify index. - if ((f_ct != state::invalid_value) && - (f_ct != state::dequeued)) { - return false; // full + // Move the queue head index. + if (w_beg + hdr.circ_size <= w_idx) { + hdr.w_beg.fetch_add(1, std::memory_order_release); } // Get a valid index and iterate backwards. hdr.w_idx.fetch_add(1, std::memory_order_release); // Set data & flag. elem.set_flag(w_idx | state::enqueue_mask); - elem.set_data(std::forward(src)); + elem.set_data(std::forward(src)); // Here should not be interrupted. elem.set_flag(w_idx | state::commit_mask); return true; } @@ -335,32 +336,36 @@ struct consumer { is_convertible = true, std::enable_if_t::value, bool> = true> static bool dequeue(::LIBIMP::span> elems, H &hdr, C &ctx, U &des) noexcept { - auto r_idx = ctx.r_idx; - auto w_idx = static_cast(hdr.w_idx.load(std::memory_order_relaxed)); - auto r_cur = trunc_index(hdr, r_idx); - auto const &elem = elems[r_cur]; + auto w_idx = hdr.w_idx.load(std::memory_order_relaxed); // Verify index. - if ((ctx.w_lst != state::invalid_value) && - (ctx.w_lst + hdr.circ_size > w_idx)) { + if (ctx.w_lst == w_idx) { return false; // not ready } + // Obtain the queue head index if we need. + auto w_beg = hdr.w_beg.load(std::memory_order_relaxed); + if ((ctx.r_idx < w_beg) || + (ctx.r_idx >= w_beg + hdr.circ_size)) { + ctx.r_idx = w_beg; + } + auto r_cur = trunc_index(hdr, ctx.r_idx); + auto &elem = elems[r_cur]; auto f_ct = elem.get_flag(); if (f_ct == state::invalid_value) { return false; // empty } // Try getting data. for (;;) { - if (f_ct & state::enqueue_mask == state::enqueue_mask) { + if ((f_ct & state::enqueue_mask) == state::enqueue_mask) { return false; // unreadable } des = LIBCONCUR::get(elem); // Correct data can be obtained only if // the elem data is not modified during the getting process. - if (elem.cas_flag(f_ct, state::dequeued)) break; + if (elem.cas_flag(f_ct, f_ct)) break; } + ctx.w_lst = (f_ct & ~state::enqueue_mask) + 1; // Get a valid index and iterate backwards. ctx.r_idx += 1; - ctx.w_lst = w_idx; return true; } }; @@ -393,6 +398,10 @@ struct prod_cons : producer return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0); } }; + + /// \brief Mixing producer and consumer context definitions. + struct context : traits>::context + , traits>::context {}; }; LIBCONCUR_NAMESPACE_END_ diff --git a/include/libconcur/element.h b/include/libconcur/element.h index 8dd3f39..ba08690 100644 --- a/include/libconcur/element.h +++ b/include/libconcur/element.h @@ -26,11 +26,8 @@ using flag_t = std::uint64_t; enum : flag_t { invalid_value = ~flag_t(0), - committed = ~flag_t(1), - dequeued = ~flag_t(2), - enqueue_mask = invalid_value << 32, - commit_mask = committed << 32, + commit_mask = ~flag_t(1) << 32, }; } // namespace state diff --git a/test/concur/test_concur_concurrent.cpp b/test/concur/test_concur_concurrent.cpp index 0f2d153..80c6210 100644 --- a/test/concur/test_concur_concurrent.cpp +++ b/test/concur/test_concur_concurrent.cpp @@ -91,7 +91,7 @@ TEST(concurrent, trunc_index) { namespace { template -void test_concur(std::size_t np, std::size_t nc, std::size_t k) { +void test_unicast(std::size_t np, std::size_t nc, std::size_t k) { LIBIMP_LOG_(); log.info("\n\tStart with: ", imp::nameof(), ", ", np, " producers, ", nc, " consumers..."); @@ -100,13 +100,13 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) { concur::element circ[32] {}; PC pc; typename concur::traits::header hdr {imp::make_span(circ)}; - typename concur::traits::context ctx {}; ASSERT_TRUE(hdr.valid()); std::atomic sum {0}; std::atomic running {np}; auto prod_call = [&](std::size_t n) { + typename concur::traits::context ctx {}; for (std::uint32_t i = 1; i <= loop_size; ++i) { std::this_thread::yield(); while (!pc.enqueue(imp::make_span(circ), hdr, ctx, i)) { @@ -119,6 +119,7 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) { --running; }; auto cons_call = [&] { + typename concur::traits::context ctx {}; for (;;) { std::this_thread::yield(); std::uint64_t i; @@ -143,23 +144,88 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) { } // namespace -TEST(concurrent, unicast_prod_cons) { +TEST(concurrent, unicast) { using namespace concur; /// @brief 1-1 - test_concur>(1, 1, 1); - test_concur>(1, 1, 1); - test_concur>(1, 1, 1); - test_concur>(1, 1, 1); + test_unicast>(1, 1, 1); + test_unicast>(1, 1, 1); + test_unicast>(1, 1, 1); + test_unicast>(1, 1, 1); /// @brief 8-1 - test_concur>(8, 1, 1); - test_concur>(8, 1, 1); + test_unicast>(8, 1, 1); + test_unicast>(8, 1, 1); /// @brief 1-8 - test_concur>(1, 8, 1); - test_concur>(1, 8, 1); + test_unicast>(1, 8, 1); + test_unicast>(1, 8, 1); /// @brief 8-8 - test_concur>(8, 8, 1); + test_unicast>(8, 8, 1); +} + +namespace { + +template +void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { + LIBIMP_LOG_(); + + concur::element circ[32] {}; + PC pc; + typename concur::traits::header hdr {imp::make_span(circ)}; + ASSERT_TRUE(hdr.valid()); + + auto push_one = [&, ctx = typename concur::traits::context{}](std::uint32_t i) mutable { + return pc.enqueue(imp::make_span(circ), hdr, ctx, i); + }; + auto pop_one = [&, ctx = typename concur::traits::context{}]() mutable { + std::uint64_t i; + if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) { + return i; + } + return (std::uint64_t)-1; + }; + auto pop_one_2 = pop_one; + + // empty queue pop + ASSERT_EQ(pop_one(), (std::uint64_t)-1); + + // test one push & pop + for (int i = 0; i < 32; ++i) { + ASSERT_TRUE(push_one(i)); + ASSERT_EQ(pop_one(), i); + } + for (int i = 0; i < 100; ++i) { + ASSERT_TRUE(push_one(i)); + ASSERT_EQ(pop_one(), i); + } + ASSERT_EQ(pop_one(), (std::uint64_t)-1); + + // test loop push & pop + for (int i = 0; i < 10; ++i) ASSERT_TRUE(push_one(i)); + for (int i = 0; i < 10; ++i) ASSERT_EQ(pop_one(), i); + ASSERT_EQ(pop_one(), (std::uint64_t)-1); + + // other loop pop + for (int i = 0; i < 32; ++i) ASSERT_TRUE(push_one(i)); + for (int i = 0; i < 32; ++i) ASSERT_EQ(pop_one_2(), i); + ASSERT_EQ(pop_one_2(), (std::uint64_t)-1); + + // overwrite + ASSERT_TRUE(push_one(123)); + for (int i = 1; i < 32; ++i) { + ASSERT_EQ(pop_one(), i); + } + ASSERT_EQ(pop_one(), 123); + ASSERT_EQ(pop_one(), (std::uint64_t)-1); +} + +} // namespace + +TEST(concurrent, broadcast) { + using namespace concur; + + /// @brief 1-1 + test_broadcast>(1, 1, 1); } \ No newline at end of file