From a80d7f7590528c2def13808d8cf18ce04e29c2f6 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Wed, 5 Apr 2023 19:21:15 +0800 Subject: [PATCH] add: [imp] (TBD) impl trans::broadcast --- include/libconcur/concurrent.h | 87 +++++++++++++++++++++++--- include/libconcur/element.h | 16 ++++- test/concur/test_concur_concurrent.cpp | 2 +- 3 files changed, 91 insertions(+), 14 deletions(-) diff --git a/include/libconcur/concurrent.h b/include/libconcur/concurrent.h index 23b330d..3e0489f 100644 --- a/include/libconcur/concurrent.h +++ b/include/libconcur/concurrent.h @@ -225,7 +225,8 @@ struct consumer { template = true, - is_convertible = true> + is_convertible = true, + std::enable_if_t::value, bool> = true> static bool dequeue(::LIBIMP::span> elems, H &hdr, C &/*ctx*/, U &des) noexcept { auto r_idx = hdr.r_idx; auto r_cur = trunc_index(hdr, r_idx); @@ -238,8 +239,8 @@ struct consumer { // Get a valid index and iterate backwards. hdr.r_idx += 1; // Get data & set flag. - des = LIBCONCUR::get(elem); - elem.set_flag(r_idx + static_cast(elems.size())/*avoid overflow*/); + des = LIBCONCUR::get(std::move(elem)); + elem.set_flag(r_idx + hdr.circ_size); return true; } }; @@ -255,7 +256,8 @@ struct consumer { template = true, - is_convertible = true> + is_convertible = true, + std::enable_if_t::value, bool> = true> static bool dequeue(::LIBIMP::span> elems, H &hdr, C &/*ctx*/, U &des) noexcept { auto r_idx = hdr.r_idx.load(std::memory_order_acquire); for (;;) { @@ -271,8 +273,8 @@ struct consumer { continue; } // Get data & set flag. - des = LIBCONCUR::get(elem); - elem.set_flag(r_idx + static_cast(elems.size())/*avoid overflow*/); + des = LIBCONCUR::get(std::move(elem)); + elem.set_flag(r_idx + hdr.circ_size); return true; } } @@ -285,17 +287,82 @@ struct consumer { /// \brief Single-write producer model implementation. template <> struct producer { + + struct header_impl { + std::atomic w_idx {0}; ///< write index + private: padding ___; + }; + + template = true, + is_convertible = true> + static bool enqueue(::LIBIMP::span> elems, H &hdr, C &/*ctx*/, U &&src) noexcept { + auto w_idx = hdr.w_idx.load(std::memory_order_acquire); + auto w_cur = trunc_index(hdr, w_idx); + auto &elem = elems[w_cur]; + auto f_ct = elem.get_flag(); + // Verify index. + if ((f_ct != state::invalid_value) && + (f_ct != state::dequeued)) { + return false; // full + } + // Get a valid index and iterate backwards. + hdr.w_idx.fetch_add(1, std::memory_order_release); + // Set data & flag. + elem.set_flag(w_idx | state::enqueue_mask); + elem.set_data(std::forward(src)); + elem.set_flag(w_idx | state::commit_mask); + return true; + } }; /// \brief Multi-write producer model implementation. template <> -struct producer { -}; +struct producer {}; /// \brief Multi-read consumer model implementation. /// Single-read consumer model is not required. template <> struct consumer { + + struct context_impl { + index_t r_idx {0}; ///< read index + state::flag_t w_lst {state::invalid_value}; ///< last write index + }; + + template = true, + is_convertible = true, + std::enable_if_t::value, bool> = true> + static bool dequeue(::LIBIMP::span> elems, H &hdr, C &ctx, U &des) noexcept { + auto r_idx = ctx.r_idx; + auto w_idx = static_cast(hdr.w_idx.load(std::memory_order_relaxed)); + auto r_cur = trunc_index(hdr, r_idx); + auto const &elem = elems[r_cur]; + // Verify index. + if ((ctx.w_lst != state::invalid_value) && + (ctx.w_lst + hdr.circ_size > w_idx)) { + return false; // not ready + } + auto f_ct = elem.get_flag(); + if (f_ct == state::invalid_value) { + return false; // empty + } + // Try getting data. + for (;;) { + if (f_ct & state::enqueue_mask == state::enqueue_mask) { + return false; // unreadable + } + des = LIBCONCUR::get(elem); + // Correct data can be obtained only if + // the elem data is not modified during the getting process. + if (elem.cas_flag(f_ct, state::dequeued)) break; + } + // Get a valid index and iterate backwards. + ctx.r_idx += 1; + ctx.w_lst = w_idx; + return true; + } }; /** @@ -310,8 +377,8 @@ struct prod_cons : producer , consumer { /// \brief Mixing producer and consumer header definitions. - struct header : producer::header_impl - , consumer::header_impl { + struct header : traits>::header + , traits>::header { index_t const circ_size; constexpr header(index_t cs) noexcept diff --git a/include/libconcur/element.h b/include/libconcur/element.h index e2e4823..8dd3f39 100644 --- a/include/libconcur/element.h +++ b/include/libconcur/element.h @@ -25,8 +25,12 @@ namespace state { using flag_t = std::uint64_t; enum : flag_t { - /// \brief The invalid state value. invalid_value = ~flag_t(0), + committed = ~flag_t(1), + dequeued = ~flag_t(2), + + enqueue_mask = invalid_value << 32, + commit_mask = committed << 32, }; } // namespace state @@ -80,8 +84,14 @@ public: } } - void set_flag(state::flag_t flag) noexcept { - f_ct_.store(flag, std::memory_order_release); + void set_flag(state::flag_t flag, + std::memory_order const order = std::memory_order_release) noexcept { + f_ct_.store(flag, order); + } + + bool cas_flag(state::flag_t &expected, state::flag_t flag, + std::memory_order const order = std::memory_order_acq_rel) noexcept { + return f_ct_.compare_exchange_weak(expected, flag, order); } state::flag_t get_flag() const noexcept { diff --git a/test/concur/test_concur_concurrent.cpp b/test/concur/test_concur_concurrent.cpp index ac6446b..0f2d153 100644 --- a/test/concur/test_concur_concurrent.cpp +++ b/test/concur/test_concur_concurrent.cpp @@ -143,7 +143,7 @@ void test_concur(std::size_t np, std::size_t nc, std::size_t k) { } // namespace -TEST(concurrent, prod_cons) { +TEST(concurrent, unicast_prod_cons) { using namespace concur; /// @brief 1-1