From 6c9926bb93f7d0512bdfe83fe049b178e4dba2ce Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 19 Nov 2022 17:16:11 +0800 Subject: [PATCH] add: [concur] producer-consumer implementation (TBD) --- include/libconcur/concurrent.h | 220 ++++++++++++++++++++++++- include/libconcur/element.h | 76 +++++++++ include/libconcur/queue.h | 16 ++ test/CMakeLists.txt | 1 + test/concur/test_concur_concurrent.cpp | 82 +++++++++ test/concur/test_concur_queue.cpp | 4 + test/test_concur_concurrent.cpp | 16 -- 7 files changed, 391 insertions(+), 24 deletions(-) create mode 100644 include/libconcur/element.h create mode 100644 include/libconcur/queue.h create mode 100644 test/concur/test_concur_concurrent.cpp create mode 100644 test/concur/test_concur_queue.cpp delete mode 100644 test/test_concur_concurrent.cpp diff --git a/include/libconcur/concurrent.h b/include/libconcur/concurrent.h index 8d46d74..476e384 100644 --- a/include/libconcur/concurrent.h +++ b/include/libconcur/concurrent.h @@ -6,26 +6,230 @@ */ #pragma once +#include +#include #include -#include + +#include "libimp/span.h" #include "libconcur/def.h" +#include "libconcur/element.h" LIBCONCUR_NAMESPACE_BEG_ /// @brief The queue index type. using index_t = std::uint32_t; -namespace state { +/// @brief Multiplicity of the relationship. +namespace relation { -/// @brief The state flag type for the queue element. -using flag_t = std::uint64_t; +class single {}; +class multi {}; -enum : flag_t { - /// @brief The invalid state value. - invalid_value = (std::numeric_limits::max)(), +} // namespace relation + +/// @brief Transmission mode +namespace trans { + +class unicast {}; +class broadcast {}; + +} // namespace trans + +/// @brief Determines whether type T can be implicitly converted to type U. +template +using is_convertible = typename std::enable_if::value>::type; + +/// @brief Check whether the context type is valid. +template +using is_context = decltype(typename std::enable_if().valid()), bool>::value>::type(), + std::declval() % std::declval().circ_size); + +/** + * @brief Calculate the corresponding queue position modulo the index value. + * + * @tparam C a context type + * @param ctx a context object + * @param idx a context array index + * @return index_t - a corresponding queue position + */ +template > +constexpr index_t trunc_index(C const &ctx, index_t idx) noexcept { + /// @remark `circ_size == 2^N` => `idx & (circ_size - 1)` + return ctx.valid() ? (idx % ctx.circ_size) : 0; +} + +/// @brief Producer algorithm implementation. +template +struct producer; + +/// @brief Consumer algorithm implementation. +template +struct consumer; + +/** + * @brief Algorithm definition under unicast transmission model. + * + * @ref A bounded wait-free(almost) zero-copy MPMC queue written in C++11. + * Modified from MengRao/WFMPMC. + * Copyright (c) 2018. Meng Rao (https://github.com/MengRao/WFMPMC). +*/ + +/// @brief Single-write producer model implementation. +template <> +struct producer { + + struct context_impl { + /// @brief Write index. + alignas(cache_line_size) index_t w_idx {0}; + }; + + template , + typename = is_convertible> + static bool enqueue(::LIBIMP_::span> elems, C &ctx, U &&src) noexcept { + auto w_idx = ctx.w_idx; + auto w_cur = trunc_index(ctx, w_idx); + auto &elem = elems[w_cur]; + auto f_ct = elem.get_flag(); + /// @remark Verify index. + if ((f_ct != state::invalid_value) && + (f_ct != static_cast(w_idx))) { + return false; // full + } + /// @remark Get a valid index and iterate backwards. + ctx.w_idx += 1; + /// @remark Set data & flag. + elem.set_data(std::forward(src)); + elem.set_flag(static_cast(~w_idx)); + return true; + } }; -} // namespace state +/// @brief Multi-write producer model implementation. +template <> +struct producer { + + struct context_impl { + /// @brief Write index. + alignas(cache_line_size) std::atomic w_idx {0}; + }; + + template , + typename = is_convertible> + static bool enqueue(::LIBIMP_::span> elems, C &ctx, U &&src) noexcept { + auto w_idx = ctx.w_idx.load(std::memory_order_acquire); + for (;;) { + auto w_cur = trunc_index(ctx, w_idx); + auto &elem = elems[w_cur]; + auto f_ct = elem.get_flag(); + /// @remark Verify index. + if ((f_ct != state::invalid_value) && + (f_ct != w_idx)) { + return false; // full + } + /// @remark Get a valid index and iterate backwards. + if (!ctx.w_idx.compare_exchange_week(w_idx, w_idx + 1, std::memory_order_acq_rel)) { + continue; + } + /// @remark Set data & flag. + elem.set_data(std::forward(src)); + elem.set_flag(~w_idx); + return true; + } + } +}; + +/// @brief Single-read consumer model implementation. +template <> +struct consumer { + + struct context_impl { + /// @brief Read index. + alignas(cache_line_size) index_t r_idx {0}; + }; + + template , + typename = is_convertible> + static bool dequeue(::LIBIMP_::span> elems, C &ctx, U &des) noexcept { + auto r_idx = ctx.r_idx; + auto r_cur = trunc_index(ctx, r_idx); + auto &elem = elems[r_cur]; + auto f_ct = elem.get_flag(); + /// @remark Verify index. + if (f_ct != ~r_idx) { + return false; // empty + } + /// @remark Get a valid index and iterate backwards. + ctx.r_idx += 1; + /// @remark Get data & set flag. + des = elem.get_data(); + elem.set_flag(r_idx + static_cast(elems.size())); + return true; + } +}; + +/// @brief Multi-read consumer model implementation. +template <> +struct consumer { + + struct context_impl { + /// @brief Read index. + alignas(cache_line_size) std::atomic r_idx {0}; + }; + + template , + typename = is_convertible> + static bool dequeue(::LIBIMP_::span> elems, C &ctx, U &des) noexcept { + auto r_idx = ctx.r_idx.load(std::memory_order_acquire); + for (;;) { + auto r_cur = trunc_index(ctx, r_idx); + auto &elem = elems[r_cur]; + auto f_ct = elem.get_flag(); + /// @remark Verify index. + if (f_ct != ~r_idx) { + return false; // empty + } + /// @remark Get a valid index and iterate backwards. + if (!ctx.r_idx.compare_exchange_week(r_idx, r_idx + 1, std::memory_order_acq_rel)) { + continue; + } + /// @remark Get data & set flag. + des = elem.get_data(); + elem.set_flag(r_idx + static_cast(elems.size())); + return true; + } + } +}; + +/** + * @brief Producer-consumer implementation. + * + * @tparam TransModT transmission mode (trans::unicast/trans::broadcast) + * @tparam ProdModT producer relationship model (relation::single/relation::multi) + * @tparam ConsModT consumer relationship model (relation::single/relation::multi) + */ +template +struct prod_cons : producer + , consumer { + + /// @brief Mixing producer and consumer context definitions. + struct context : producer::context_impl + , consumer::context_impl { + index_t const circ_size; + + constexpr context(index_t cs) noexcept + : circ_size(cs) {} + + constexpr bool valid() const noexcept { + /// @remark circ_size must be a power of two. + return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0); + } + }; +}; LIBCONCUR_NAMESPACE_END_ diff --git a/include/libconcur/element.h b/include/libconcur/element.h new file mode 100644 index 0000000..b45dd28 --- /dev/null +++ b/include/libconcur/element.h @@ -0,0 +1,76 @@ +/** + * @file libconcur/element.h + * @author mutouyun (orz@orzz.org) + * @brief Define concurrent queue element abstraction. + * @date 2022-11-19 + */ +#pragma once + +#include +#include +#include // std::numeric_limits +#include +#include + +#include "libimp/detect_plat.h" +#include "libimp/log.h" + +#include "libconcur/def.h" + +LIBCONCUR_NAMESPACE_BEG_ +namespace state { + +/// @brief The state flag type for the queue element. +using flag_t = std::uint64_t; + +enum : flag_t { + /// @brief The invalid state value. + invalid_value = (std::numeric_limits::max)(), +}; + +} // namespace state + +template +class element { + /// @brief Committed flag. + alignas(cache_line_size) std::atomic f_ct_; + /// @brief The user data segment. + T data_; + + /// @brief Disable copy & move. + element(element const &) = delete; + element &operator=(element const &) = delete; + +public: + template + element(A &&... args) + noexcept(noexcept(T{std::forward(args)...})) + : f_ct_{state::invalid_value} + , data_{std::forward(args)...} {} + + template + void set_data(U &&src) noexcept { + LIBIMP_LOG_(); + LIBIMP_TRY { + data_ = std::forward(src); + } LIBIMP_CATCH(std::exception const &e) { + log.error("failed: `data = std::forward(src)`. error = {}", e.what()); + } LIBIMP_CATCH(...) { + log.error("failed: `data = std::forward(src)`. error = unknown"); + } + } + + T &&get_data() noexcept { + return std::move(data_); + } + + void set_flag(state::flag_t flag) noexcept { + f_ct_.store(flag, std::memory_order_release); + } + + state::flag_t get_flag() const noexcept { + return f_ct_.load(std::memory_order_acquire); + } +}; + +LIBCONCUR_NAMESPACE_END_ diff --git a/include/libconcur/queue.h b/include/libconcur/queue.h new file mode 100644 index 0000000..485045b --- /dev/null +++ b/include/libconcur/queue.h @@ -0,0 +1,16 @@ +/** + * @file libconcur/queue.h + * @author mutouyun (orz@orzz.org) + * @brief Define concurrent queue. + * @date 2022-11-19 + */ +#pragma once + +#include "libconcur/def.h" +#include "libconcur/element.h" + +LIBCONCUR_NAMESPACE_BEG_ + + + +LIBCONCUR_NAMESPACE_END_ diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 76cfa6a..f534f3f 100755 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -16,6 +16,7 @@ include_directories( file(GLOB SRC_FILES ${LIBIPC_PROJECT_DIR}/test/imp/*.cpp ${LIBIPC_PROJECT_DIR}/test/pmr/*.cpp + ${LIBIPC_PROJECT_DIR}/test/concur/*.cpp ${LIBIPC_PROJECT_DIR}/test/*.cpp) file(GLOB HEAD_FILES diff --git a/test/concur/test_concur_concurrent.cpp b/test/concur/test_concur_concurrent.cpp new file mode 100644 index 0000000..e07a3f1 --- /dev/null +++ b/test/concur/test_concur_concurrent.cpp @@ -0,0 +1,82 @@ + +#include +#include + +#include "gtest/gtest.h" + +#include "libconcur/concurrent.h" + +TEST(concurrent, cache_line_size) { + std::cout << concur::cache_line_size << "\n"; + EXPECT_TRUE(concur::cache_line_size >= alignof(std::max_align_t)); +} + +TEST(concurrent, index_and_flag) { + EXPECT_TRUE(sizeof(concur::index_t) < sizeof(concur::state::flag_t)); +} + +TEST(concurrent, trunc_index) { + struct context { + concur::index_t circ_size; + + bool valid() const noexcept { + return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0); + } + }; + + /// @brief circ-size = 0 + EXPECT_EQ(concur::trunc_index(context{0}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{0}, 1), 0); + EXPECT_EQ(concur::trunc_index(context{0}, 2), 0); + EXPECT_EQ(concur::trunc_index(context{0}, 16), 0); + EXPECT_EQ(concur::trunc_index(context{0}, 111), 0); + EXPECT_EQ(concur::trunc_index(context{0}, -1), 0); + + /// @brief circ-size = 1 + EXPECT_EQ(concur::trunc_index(context{1}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{1}, 1), 0); + EXPECT_EQ(concur::trunc_index(context{1}, 2), 0); + EXPECT_EQ(concur::trunc_index(context{1}, 16), 0); + EXPECT_EQ(concur::trunc_index(context{1}, 111), 0); + EXPECT_EQ(concur::trunc_index(context{1}, -1), 0); + + /// @brief circ-size = 2 + EXPECT_EQ(concur::trunc_index(context{2}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{2}, 1), 1); + EXPECT_EQ(concur::trunc_index(context{2}, 2), 0); + EXPECT_EQ(concur::trunc_index(context{2}, 16), 0); + EXPECT_EQ(concur::trunc_index(context{2}, 111), 1); + EXPECT_EQ(concur::trunc_index(context{2}, -1), 1); + + /// @brief circ-size = 10 + EXPECT_EQ(concur::trunc_index(context{10}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{10}, 1), 0); + EXPECT_EQ(concur::trunc_index(context{10}, 2), 0); + EXPECT_EQ(concur::trunc_index(context{10}, 16), 0); + EXPECT_EQ(concur::trunc_index(context{10}, 111), 0); + EXPECT_EQ(concur::trunc_index(context{10}, -1), 0); + + /// @brief circ-size = 16 + EXPECT_EQ(concur::trunc_index(context{16}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{16}, 1), 1); + EXPECT_EQ(concur::trunc_index(context{16}, 2), 2); + EXPECT_EQ(concur::trunc_index(context{16}, 16), 0); + EXPECT_EQ(concur::trunc_index(context{16}, 111), 15); + EXPECT_EQ(concur::trunc_index(context{16}, -1), 15); + + /// @brief circ-size = (index_t)-1 + EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 1), 0); + EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 2), 0); + EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 16), 0); + EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 111), 0); + EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, -1), 0); + + /// @brief circ-size = 2147483648 (2^31) + EXPECT_EQ(concur::trunc_index(context{2147483648u}, 0), 0); + EXPECT_EQ(concur::trunc_index(context{2147483648u}, 1), 1); + EXPECT_EQ(concur::trunc_index(context{2147483648u}, 2), 2); + EXPECT_EQ(concur::trunc_index(context{2147483648u}, 16), 16); + EXPECT_EQ(concur::trunc_index(context{2147483648u}, 111), 111); + EXPECT_EQ(concur::trunc_index(context{2147483648u}, -1), 2147483647); +} \ No newline at end of file diff --git a/test/concur/test_concur_queue.cpp b/test/concur/test_concur_queue.cpp new file mode 100644 index 0000000..e05c1b1 --- /dev/null +++ b/test/concur/test_concur_queue.cpp @@ -0,0 +1,4 @@ + +#include "gtest/gtest.h" + +#include "libconcur/queue.h" diff --git a/test/test_concur_concurrent.cpp b/test/test_concur_concurrent.cpp deleted file mode 100644 index df8e3a7..0000000 --- a/test/test_concur_concurrent.cpp +++ /dev/null @@ -1,16 +0,0 @@ - -#include -#include - -#include "gtest/gtest.h" - -#include "libconcur/concurrent.h" - -TEST(concurrent, cache_line_size) { - std::cout << concur::cache_line_size << "\n"; - EXPECT_TRUE(concur::cache_line_size >= alignof(std::max_align_t)); -} - -TEST(concurrent, index_and_flag) { - EXPECT_TRUE(sizeof(concur::index_t) < sizeof(concur::state::flag_t)); -} \ No newline at end of file