/** * \file libconcur/concurrent.h * \author mutouyun (orz@orzz.org) * \brief Define different policies for concurrent queue. * \date 2022-11-07 */ #pragma once #include #include #include #include "libimp/span.h" #include "libconcur/def.h" #include "libconcur/element.h" LIBCONCUR_NAMESPACE_BEG_ /// \brief The queue index type. using index_t = std::uint32_t; /// \brief Multiplicity of the relationship. namespace relation { class single {}; class multi {}; } // namespace relation /// \brief Transmission mode namespace trans { class unicast {}; class broadcast {}; } // namespace trans /// \brief Determines whether type T can be implicitly converted to type U. template using is_convertible = typename std::enable_if::value>::type; /// \brief Check whether the context type is valid. template using is_context = decltype(typename std::enable_if().valid()), bool>::value>::type(), std::declval() % std::declval().circ_size); /** * \brief Calculate the corresponding queue position modulo the index value. * * \tparam C a context type * \param ctx a context object * \param idx a context array index * \return index_t - a corresponding queue position */ template > constexpr index_t trunc_index(C const &ctx, index_t idx) noexcept { /// \remark `circ_size == 2^N` => `idx & (circ_size - 1)` return ctx.valid() ? (idx % ctx.circ_size) : 0; } /// \brief Producer algorithm implementation. template struct producer; /// \brief Consumer algorithm implementation. template struct consumer; /** * \brief Algorithm definition under unicast transmission model. * * \ref A bounded wait-free(almost) zero-copy MPMC queue written in C++11. * Modified from MengRao/WFMPMC. * Copyright (c) 2018. Meng Rao (https://github.com/MengRao/WFMPMC). */ /// \brief Single-write producer model implementation. template <> struct producer { struct context_impl { /// \brief Write index. alignas(cache_line_size) index_t w_idx {0}; }; template , typename = is_convertible> static bool enqueue(::LIBIMP::span> elems, C &ctx, U &&src) noexcept { auto w_idx = ctx.w_idx; auto w_cur = trunc_index(ctx, w_idx); auto &elem = elems[w_cur]; auto f_ct = elem.get_flag(); /// \remark Verify index. if ((f_ct != state::invalid_value) && (f_ct != w_idx)) { return false; // full } /// \remark Get a valid index and iterate backwards. ctx.w_idx += 1; /// \remark Set data & flag. elem.set_data(std::forward(src)); elem.set_flag(static_cast(~w_idx)); return true; } }; /// \brief Multi-write producer model implementation. template <> struct producer { struct context_impl { /// \brief Write index. alignas(cache_line_size) std::atomic w_idx {0}; }; template , typename = is_convertible> static bool enqueue(::LIBIMP::span> elems, C &ctx, U &&src) noexcept { auto w_idx = ctx.w_idx.load(std::memory_order_acquire); for (;;) { auto w_cur = trunc_index(ctx, w_idx); auto &elem = elems[w_cur]; auto f_ct = elem.get_flag(); /// \remark Verify index. if ((f_ct != state::invalid_value) && (f_ct != w_idx)) { return false; // full } /// \remark Get a valid index and iterate backwards. if (!ctx.w_idx.compare_exchange_weak(w_idx, w_idx + 1, std::memory_order_acq_rel)) { continue; } /// \remark Set data & flag. elem.set_data(std::forward(src)); elem.set_flag(static_cast(~w_idx)); return true; } } }; /// \brief Single-read consumer model implementation. template <> struct consumer { struct context_impl { /// \brief Read index. alignas(cache_line_size) index_t r_idx {0}; }; template , typename = is_convertible> static bool dequeue(::LIBIMP::span> elems, C &ctx, U &des) noexcept { auto r_idx = ctx.r_idx; auto r_cur = trunc_index(ctx, r_idx); auto &elem = elems[r_cur]; auto f_ct = elem.get_flag(); /// \remark Verify index. if (f_ct != static_cast(~r_idx)) { return false; // empty } /// \remark Get a valid index and iterate backwards. ctx.r_idx += 1; /// \remark Get data & set flag. des = elem.get_data(); elem.set_flag(r_idx + static_cast(elems.size())); return true; } }; /// \brief Multi-read consumer model implementation. template <> struct consumer { struct context_impl { /// \brief Read index. alignas(cache_line_size) std::atomic r_idx {0}; }; template , typename = is_convertible> static bool dequeue(::LIBIMP::span> elems, C &ctx, U &des) noexcept { auto r_idx = ctx.r_idx.load(std::memory_order_acquire); for (;;) { auto r_cur = trunc_index(ctx, r_idx); auto &elem = elems[r_cur]; auto f_ct = elem.get_flag(); /// \remark Verify index. if (f_ct != static_cast(~r_idx)) { return false; // empty } /// \remark Get a valid index and iterate backwards. if (!ctx.r_idx.compare_exchange_weak(r_idx, r_idx + 1, std::memory_order_acq_rel)) { continue; } /// \remark Get data & set flag. des = elem.get_data(); elem.set_flag(r_idx + static_cast(elems.size())); return true; } } }; /** * \brief Algorithm definition under broadcast transmission model. */ /// \brief Single-write producer model implementation. template <> struct producer { }; /// \brief Multi-write producer model implementation. template <> struct producer { }; /// \brief Single-read consumer model implementation. template <> struct consumer { }; /// \brief Multi-read consumer model implementation. template <> struct consumer { }; /** * \brief Producer-consumer implementation. * * \tparam TransModT transmission mode (trans::unicast/trans::broadcast) * \tparam ProdModT producer relationship model (relation::single/relation::multi) * \tparam ConsModT consumer relationship model (relation::single/relation::multi) */ template struct prod_cons : producer , consumer { /// \brief Mixing producer and consumer context definitions. struct context : producer::context_impl , consumer::context_impl { index_t const circ_size; constexpr context(index_t cs) noexcept : circ_size(cs) {} template constexpr context(::LIBIMP::span> elems) noexcept : circ_size(static_cast(elems.size())) {} constexpr bool valid() const noexcept { /// \remark circ_size must be a power of two. return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0); } }; }; LIBCONCUR_NAMESPACE_END_