/** * @file libconcur/concurrent.h * @author mutouyun (orz@orzz.org) * @brief Define different policies for concurrent queue. * @date 2022-11-07 */ #pragma once #include #include #include #include "libimp/span.h" #include "libconcur/def.h" #include "libconcur/element.h" LIBCONCUR_NAMESPACE_BEG_ /// @brief The queue index type. using index_t = std::uint32_t; /// @brief Multiplicity of the relationship. namespace relation { class single {}; class multi {}; } // namespace relation /// @brief Transmission mode namespace trans { class unicast {}; class broadcast {}; } // namespace trans /// @brief Determines whether type T can be implicitly converted to type U. template using is_convertible = typename std::enable_if::value>::type; /// @brief Check whether the context type is valid. template using is_context = decltype(typename std::enable_if().valid()), bool>::value>::type(), std::declval() % std::declval().circ_size); /** * @brief Calculate the corresponding queue position modulo the index value. * * @tparam C a context type * @param ctx a context object * @param idx a context array index * @return index_t - a corresponding queue position */ template > constexpr index_t trunc_index(C const &ctx, index_t idx) noexcept { /// @remark `circ_size == 2^N` => `idx & (circ_size - 1)` return ctx.valid() ? (idx % ctx.circ_size) : 0; } /// @brief Producer algorithm implementation. template struct producer; /// @brief Consumer algorithm implementation. template struct consumer; /** * @brief Algorithm definition under unicast transmission model. * * @ref A bounded wait-free(almost) zero-copy MPMC queue written in C++11. * Modified from MengRao/WFMPMC. * Copyright (c) 2018. Meng Rao (https://github.com/MengRao/WFMPMC). */ /// @brief Single-write producer model implementation. template <> struct producer { struct context_impl { /// @brief Write index. alignas(cache_line_size) index_t w_idx {0}; }; template , typename = is_convertible> static bool enqueue(::LIBIMP_::span> elems, C &ctx, U &&src) noexcept { auto w_idx = ctx.w_idx; auto w_cur = trunc_index(ctx, w_idx); auto &elem = elems[w_cur]; auto f_ct = elem.get_flag(); /// @remark Verify index. if ((f_ct != state::invalid_value) && (f_ct != w_idx)) { return false; // full } /// @remark Get a valid index and iterate backwards. ctx.w_idx += 1; /// @remark Set data & flag. elem.set_data(std::forward(src)); elem.set_flag(static_cast(~w_idx)); return true; } }; /// @brief Multi-write producer model implementation. template <> struct producer { struct context_impl { /// @brief Write index. alignas(cache_line_size) std::atomic w_idx {0}; }; template , typename = is_convertible> static bool enqueue(::LIBIMP_::span> elems, C &ctx, U &&src) noexcept { auto w_idx = ctx.w_idx.load(std::memory_order_acquire); for (;;) { auto w_cur = trunc_index(ctx, w_idx); auto &elem = elems[w_cur]; auto f_ct = elem.get_flag(); /// @remark Verify index. if ((f_ct != state::invalid_value) && (f_ct != w_idx)) { return false; // full } /// @remark Get a valid index and iterate backwards. if (!ctx.w_idx.compare_exchange_weak(w_idx, w_idx + 1, std::memory_order_acq_rel)) { continue; } /// @remark Set data & flag. elem.set_data(std::forward(src)); elem.set_flag(static_cast(~w_idx)); return true; } } }; /// @brief Single-read consumer model implementation. template <> struct consumer { struct context_impl { /// @brief Read index. alignas(cache_line_size) index_t r_idx {0}; }; template , typename = is_convertible> static bool dequeue(::LIBIMP_::span> elems, C &ctx, U &des) noexcept { auto r_idx = ctx.r_idx; auto r_cur = trunc_index(ctx, r_idx); auto &elem = elems[r_cur]; auto f_ct = elem.get_flag(); /// @remark Verify index. if (f_ct != static_cast(~r_idx)) { return false; // empty } /// @remark Get a valid index and iterate backwards. ctx.r_idx += 1; /// @remark Get data & set flag. des = elem.get_data(); elem.set_flag(r_idx + static_cast(elems.size())); return true; } }; /// @brief Multi-read consumer model implementation. template <> struct consumer { struct context_impl { /// @brief Read index. alignas(cache_line_size) std::atomic r_idx {0}; }; template , typename = is_convertible> static bool dequeue(::LIBIMP_::span> elems, C &ctx, U &des) noexcept { auto r_idx = ctx.r_idx.load(std::memory_order_acquire); for (;;) { auto r_cur = trunc_index(ctx, r_idx); auto &elem = elems[r_cur]; auto f_ct = elem.get_flag(); /// @remark Verify index. if (f_ct != static_cast(~r_idx)) { return false; // empty } /// @remark Get a valid index and iterate backwards. if (!ctx.r_idx.compare_exchange_weak(r_idx, r_idx + 1, std::memory_order_acq_rel)) { continue; } /// @remark Get data & set flag. des = elem.get_data(); elem.set_flag(r_idx + static_cast(elems.size())); return true; } } }; /** * @brief Algorithm definition under broadcast transmission model. */ /// @brief Single-write producer model implementation. template <> struct producer { }; /// @brief Multi-write producer model implementation. template <> struct producer { }; /// @brief Single-read consumer model implementation. template <> struct consumer { }; /// @brief Multi-read consumer model implementation. template <> struct consumer { }; /** * @brief Producer-consumer implementation. * * @tparam TransModT transmission mode (trans::unicast/trans::broadcast) * @tparam ProdModT producer relationship model (relation::single/relation::multi) * @tparam ConsModT consumer relationship model (relation::single/relation::multi) */ template struct prod_cons : producer , consumer { /// @brief Mixing producer and consumer context definitions. struct context : producer::context_impl , consumer::context_impl { index_t const circ_size; constexpr context(index_t cs) noexcept : circ_size(cs) {} template constexpr context(::LIBIMP_::span> elems) noexcept : circ_size(static_cast(elems.size())) {} constexpr bool valid() const noexcept { /// @remark circ_size must be a power of two. return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0); } }; }; LIBCONCUR_NAMESPACE_END_