add: [concur] producer-consumer implementation (TBD)

This commit is contained in:
mutouyun 2022-11-19 17:16:11 +08:00
parent 4d5939c513
commit 6c9926bb93
7 changed files with 391 additions and 24 deletions

View File

@ -6,26 +6,230 @@
*/ */
#pragma once #pragma once
#include <type_traits>
#include <atomic>
#include <cstdint> #include <cstdint>
#include <limits>
#include "libimp/span.h"
#include "libconcur/def.h" #include "libconcur/def.h"
#include "libconcur/element.h"
LIBCONCUR_NAMESPACE_BEG_ LIBCONCUR_NAMESPACE_BEG_
/// @brief The queue index type. /// @brief The queue index type.
using index_t = std::uint32_t; using index_t = std::uint32_t;
namespace state { /// @brief Multiplicity of the relationship.
namespace relation {
/// @brief The state flag type for the queue element. class single {};
using flag_t = std::uint64_t; class multi {};
enum : flag_t { } // namespace relation
/// @brief The invalid state value.
invalid_value = (std::numeric_limits<flag_t>::max)(), /// @brief Transmission mode
namespace trans {
class unicast {};
class broadcast {};
} // namespace trans
/// @brief Determines whether type T can be implicitly converted to type U.
template <typename T, typename U>
using is_convertible = typename std::enable_if<std::is_convertible<T *, U *>::value>::type;
/// @brief Check whether the context type is valid.
template <typename T>
using is_context = decltype(typename std::enable_if<std::is_convertible<decltype(
std::declval<T>().valid()), bool>::value>::type(),
std::declval<index_t>() % std::declval<T>().circ_size);
/**
* @brief Calculate the corresponding queue position modulo the index value.
*
* @tparam C a context type
* @param ctx a context object
* @param idx a context array index
* @return index_t - a corresponding queue position
*/
template <typename C, typename = is_context<C>>
constexpr index_t trunc_index(C const &ctx, index_t idx) noexcept {
/// @remark `circ_size == 2^N` => `idx & (circ_size - 1)`
return ctx.valid() ? (idx % ctx.circ_size) : 0;
}
/// @brief Producer algorithm implementation.
template <typename TransModT, typename RelationT>
struct producer;
/// @brief Consumer algorithm implementation.
template <typename TransModT, typename RelationT>
struct consumer;
/**
* @brief Algorithm definition under unicast transmission model.
*
* @ref A bounded wait-free(almost) zero-copy MPMC queue written in C++11.
* Modified from MengRao/WFMPMC.
* Copyright (c) 2018. Meng Rao (https://github.com/MengRao/WFMPMC).
*/
/// @brief Single-write producer model implementation.
template <>
struct producer<trans::unicast, relation::single> {
struct context_impl {
/// @brief Write index.
alignas(cache_line_size) index_t w_idx {0};
}; };
} // namespace state template <typename T, typename C, typename U,
typename = is_context<C>,
typename = is_convertible<C, context_impl>>
static bool enqueue(::LIBIMP_::span<element<T>> elems, C &ctx, U &&src) noexcept {
auto w_idx = ctx.w_idx;
auto w_cur = trunc_index(ctx, w_idx);
auto &elem = elems[w_cur];
auto f_ct = elem.get_flag();
/// @remark Verify index.
if ((f_ct != state::invalid_value) &&
(f_ct != static_cast<state::flag_t>(w_idx))) {
return false; // full
}
/// @remark Get a valid index and iterate backwards.
ctx.w_idx += 1;
/// @remark Set data & flag.
elem.set_data(std::forward<U>(src));
elem.set_flag(static_cast<state::flag_t>(~w_idx));
return true;
}
};
/// @brief Multi-write producer model implementation.
template <>
struct producer<trans::unicast, relation::multi> {
struct context_impl {
/// @brief Write index.
alignas(cache_line_size) std::atomic<index_t> w_idx {0};
};
template <typename T, typename C, typename U,
typename = is_context<C>,
typename = is_convertible<C, context_impl>>
static bool enqueue(::LIBIMP_::span<element<T>> elems, C &ctx, U &&src) noexcept {
auto w_idx = ctx.w_idx.load(std::memory_order_acquire);
for (;;) {
auto w_cur = trunc_index(ctx, w_idx);
auto &elem = elems[w_cur];
auto f_ct = elem.get_flag();
/// @remark Verify index.
if ((f_ct != state::invalid_value) &&
(f_ct != w_idx)) {
return false; // full
}
/// @remark Get a valid index and iterate backwards.
if (!ctx.w_idx.compare_exchange_week(w_idx, w_idx + 1, std::memory_order_acq_rel)) {
continue;
}
/// @remark Set data & flag.
elem.set_data(std::forward<U>(src));
elem.set_flag(~w_idx);
return true;
}
}
};
/// @brief Single-read consumer model implementation.
template <>
struct consumer<trans::unicast, relation::single> {
struct context_impl {
/// @brief Read index.
alignas(cache_line_size) index_t r_idx {0};
};
template <typename T, typename C, typename U,
typename = is_context<C>,
typename = is_convertible<C, context_impl>>
static bool dequeue(::LIBIMP_::span<element<T>> elems, C &ctx, U &des) noexcept {
auto r_idx = ctx.r_idx;
auto r_cur = trunc_index(ctx, r_idx);
auto &elem = elems[r_cur];
auto f_ct = elem.get_flag();
/// @remark Verify index.
if (f_ct != ~r_idx) {
return false; // empty
}
/// @remark Get a valid index and iterate backwards.
ctx.r_idx += 1;
/// @remark Get data & set flag.
des = elem.get_data();
elem.set_flag(r_idx + static_cast<index_t>(elems.size()));
return true;
}
};
/// @brief Multi-read consumer model implementation.
template <>
struct consumer<trans::unicast, relation::multi> {
struct context_impl {
/// @brief Read index.
alignas(cache_line_size) std::atomic<index_t> r_idx {0};
};
template <typename T, typename C, typename U,
typename = is_context<C>,
typename = is_convertible<C, context_impl>>
static bool dequeue(::LIBIMP_::span<element<T>> elems, C &ctx, U &des) noexcept {
auto r_idx = ctx.r_idx.load(std::memory_order_acquire);
for (;;) {
auto r_cur = trunc_index(ctx, r_idx);
auto &elem = elems[r_cur];
auto f_ct = elem.get_flag();
/// @remark Verify index.
if (f_ct != ~r_idx) {
return false; // empty
}
/// @remark Get a valid index and iterate backwards.
if (!ctx.r_idx.compare_exchange_week(r_idx, r_idx + 1, std::memory_order_acq_rel)) {
continue;
}
/// @remark Get data & set flag.
des = elem.get_data();
elem.set_flag(r_idx + static_cast<index_t>(elems.size()));
return true;
}
}
};
/**
* @brief Producer-consumer implementation.
*
* @tparam TransModT transmission mode (trans::unicast/trans::broadcast)
* @tparam ProdModT producer relationship model (relation::single/relation::multi)
* @tparam ConsModT consumer relationship model (relation::single/relation::multi)
*/
template <typename TransModT, typename ProdModT, typename ConsModT>
struct prod_cons : producer<TransModT, ProdModT>
, consumer<TransModT, ConsModT> {
/// @brief Mixing producer and consumer context definitions.
struct context : producer<TransModT, ProdModT>::context_impl
, consumer<TransModT, ConsModT>::context_impl {
index_t const circ_size;
constexpr context(index_t cs) noexcept
: circ_size(cs) {}
constexpr bool valid() const noexcept {
/// @remark circ_size must be a power of two.
return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0);
}
};
};
LIBCONCUR_NAMESPACE_END_ LIBCONCUR_NAMESPACE_END_

View File

@ -0,0 +1,76 @@
/**
* @file libconcur/element.h
* @author mutouyun (orz@orzz.org)
* @brief Define concurrent queue element abstraction.
* @date 2022-11-19
*/
#pragma once
#include <atomic>
#include <utility>
#include <limits> // std::numeric_limits
#include <cstdint>
#include <exception>
#include "libimp/detect_plat.h"
#include "libimp/log.h"
#include "libconcur/def.h"
LIBCONCUR_NAMESPACE_BEG_
namespace state {
/// @brief The state flag type for the queue element.
using flag_t = std::uint64_t;
enum : flag_t {
/// @brief The invalid state value.
invalid_value = (std::numeric_limits<flag_t>::max)(),
};
} // namespace state
template <typename T>
class element {
/// @brief Committed flag.
alignas(cache_line_size) std::atomic<state::flag_t> f_ct_;
/// @brief The user data segment.
T data_;
/// @brief Disable copy & move.
element(element const &) = delete;
element &operator=(element const &) = delete;
public:
template <typename... A>
element(A &&... args)
noexcept(noexcept(T{std::forward<A>(args)...}))
: f_ct_{state::invalid_value}
, data_{std::forward<A>(args)...} {}
template <typename U>
void set_data(U &&src) noexcept {
LIBIMP_LOG_();
LIBIMP_TRY {
data_ = std::forward<U>(src);
} LIBIMP_CATCH(std::exception const &e) {
log.error("failed: `data = std::forward<U>(src)`. error = {}", e.what());
} LIBIMP_CATCH(...) {
log.error("failed: `data = std::forward<U>(src)`. error = unknown");
}
}
T &&get_data() noexcept {
return std::move(data_);
}
void set_flag(state::flag_t flag) noexcept {
f_ct_.store(flag, std::memory_order_release);
}
state::flag_t get_flag() const noexcept {
return f_ct_.load(std::memory_order_acquire);
}
};
LIBCONCUR_NAMESPACE_END_

16
include/libconcur/queue.h Normal file
View File

@ -0,0 +1,16 @@
/**
* @file libconcur/queue.h
* @author mutouyun (orz@orzz.org)
* @brief Define concurrent queue.
* @date 2022-11-19
*/
#pragma once
#include "libconcur/def.h"
#include "libconcur/element.h"
LIBCONCUR_NAMESPACE_BEG_
LIBCONCUR_NAMESPACE_END_

View File

@ -16,6 +16,7 @@ include_directories(
file(GLOB SRC_FILES file(GLOB SRC_FILES
${LIBIPC_PROJECT_DIR}/test/imp/*.cpp ${LIBIPC_PROJECT_DIR}/test/imp/*.cpp
${LIBIPC_PROJECT_DIR}/test/pmr/*.cpp ${LIBIPC_PROJECT_DIR}/test/pmr/*.cpp
${LIBIPC_PROJECT_DIR}/test/concur/*.cpp
${LIBIPC_PROJECT_DIR}/test/*.cpp) ${LIBIPC_PROJECT_DIR}/test/*.cpp)
file(GLOB HEAD_FILES file(GLOB HEAD_FILES

View File

@ -0,0 +1,82 @@
#include <iostream>
#include <cstddef>
#include "gtest/gtest.h"
#include "libconcur/concurrent.h"
TEST(concurrent, cache_line_size) {
std::cout << concur::cache_line_size << "\n";
EXPECT_TRUE(concur::cache_line_size >= alignof(std::max_align_t));
}
TEST(concurrent, index_and_flag) {
EXPECT_TRUE(sizeof(concur::index_t) < sizeof(concur::state::flag_t));
}
TEST(concurrent, trunc_index) {
struct context {
concur::index_t circ_size;
bool valid() const noexcept {
return (circ_size > 1) && ((circ_size & (circ_size - 1)) == 0);
}
};
/// @brief circ-size = 0
EXPECT_EQ(concur::trunc_index(context{0}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{0}, 1), 0);
EXPECT_EQ(concur::trunc_index(context{0}, 2), 0);
EXPECT_EQ(concur::trunc_index(context{0}, 16), 0);
EXPECT_EQ(concur::trunc_index(context{0}, 111), 0);
EXPECT_EQ(concur::trunc_index(context{0}, -1), 0);
/// @brief circ-size = 1
EXPECT_EQ(concur::trunc_index(context{1}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{1}, 1), 0);
EXPECT_EQ(concur::trunc_index(context{1}, 2), 0);
EXPECT_EQ(concur::trunc_index(context{1}, 16), 0);
EXPECT_EQ(concur::trunc_index(context{1}, 111), 0);
EXPECT_EQ(concur::trunc_index(context{1}, -1), 0);
/// @brief circ-size = 2
EXPECT_EQ(concur::trunc_index(context{2}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{2}, 1), 1);
EXPECT_EQ(concur::trunc_index(context{2}, 2), 0);
EXPECT_EQ(concur::trunc_index(context{2}, 16), 0);
EXPECT_EQ(concur::trunc_index(context{2}, 111), 1);
EXPECT_EQ(concur::trunc_index(context{2}, -1), 1);
/// @brief circ-size = 10
EXPECT_EQ(concur::trunc_index(context{10}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{10}, 1), 0);
EXPECT_EQ(concur::trunc_index(context{10}, 2), 0);
EXPECT_EQ(concur::trunc_index(context{10}, 16), 0);
EXPECT_EQ(concur::trunc_index(context{10}, 111), 0);
EXPECT_EQ(concur::trunc_index(context{10}, -1), 0);
/// @brief circ-size = 16
EXPECT_EQ(concur::trunc_index(context{16}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{16}, 1), 1);
EXPECT_EQ(concur::trunc_index(context{16}, 2), 2);
EXPECT_EQ(concur::trunc_index(context{16}, 16), 0);
EXPECT_EQ(concur::trunc_index(context{16}, 111), 15);
EXPECT_EQ(concur::trunc_index(context{16}, -1), 15);
/// @brief circ-size = (index_t)-1
EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 1), 0);
EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 2), 0);
EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 16), 0);
EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, 111), 0);
EXPECT_EQ(concur::trunc_index(context{(concur::index_t)-1}, -1), 0);
/// @brief circ-size = 2147483648 (2^31)
EXPECT_EQ(concur::trunc_index(context{2147483648u}, 0), 0);
EXPECT_EQ(concur::trunc_index(context{2147483648u}, 1), 1);
EXPECT_EQ(concur::trunc_index(context{2147483648u}, 2), 2);
EXPECT_EQ(concur::trunc_index(context{2147483648u}, 16), 16);
EXPECT_EQ(concur::trunc_index(context{2147483648u}, 111), 111);
EXPECT_EQ(concur::trunc_index(context{2147483648u}, -1), 2147483647);
}

View File

@ -0,0 +1,4 @@
#include "gtest/gtest.h"
#include "libconcur/queue.h"

View File

@ -1,16 +0,0 @@
#include <iostream>
#include <cstddef>
#include "gtest/gtest.h"
#include "libconcur/concurrent.h"
TEST(concurrent, cache_line_size) {
std::cout << concur::cache_line_size << "\n";
EXPECT_TRUE(concur::cache_line_size >= alignof(std::max_align_t));
}
TEST(concurrent, index_and_flag) {
EXPECT_TRUE(sizeof(concur::index_t) < sizeof(concur::state::flag_t));
}