From 8a4e6be9dae1b7fba2faa0b41b0453e750ed55b1 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 28 May 2023 19:05:17 +0800 Subject: [PATCH] add: [concur] (TBD) bus --- benchmark/CMakeLists.txt | 2 +- include/libconcur/bus.h | 19 +++- include/libconcur/data_model.h | 167 ++++++++++++++++++++++++++++++++ include/libconcur/queue.h | 148 ++-------------------------- test/concur/test_concur_bus.cpp | 4 + 5 files changed, 198 insertions(+), 142 deletions(-) create mode 100644 include/libconcur/data_model.h diff --git a/benchmark/CMakeLists.txt b/benchmark/CMakeLists.txt index f8dbd5d..a81d85f 100644 --- a/benchmark/CMakeLists.txt +++ b/benchmark/CMakeLists.txt @@ -37,4 +37,4 @@ add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES}) target_link_libraries(${PROJECT_NAME} benchmark_main fmt - imp ipc) + imp pmr ipc) diff --git a/include/libconcur/bus.h b/include/libconcur/bus.h index 5f68c64..0b1a80f 100644 --- a/include/libconcur/bus.h +++ b/include/libconcur/bus.h @@ -7,10 +7,27 @@ #pragma once #include "libconcur/def.h" -#include "libconcur/concurrent.h" +#include "libconcur/data_model.h" LIBCONCUR_NAMESPACE_BEG_ +template +class bus + : public data_model { + using base_t = data_model; + +public: + using base_t::base_t; + + template + bool broadcast(U &&value) noexcept { + return this->enqueue(std::forward(value)); + } + + bool receive(typename base_t::value_type &value) noexcept { + return this->dequeue(value); + } +}; LIBCONCUR_NAMESPACE_END_ diff --git a/include/libconcur/data_model.h b/include/libconcur/data_model.h new file mode 100644 index 0000000..5c5b5c4 --- /dev/null +++ b/include/libconcur/data_model.h @@ -0,0 +1,167 @@ +/** + * \file libconcur/data_model.h + * \author mutouyun (orz@orzz.org) + * \brief Define the concurrent data model. + * \date 2023-05-28 + */ +#pragma once + +#include +#include +#include +#include + +#include "libimp/construct.h" +#include "libimp/detect_plat.h" +#include "libimp/aligned.h" + +#include "libpmr/allocator.h" +#include "libpmr/memory_resource.h" + +#include "libconcur/def.h" +#include "libconcur/concurrent.h" + +LIBCONCUR_NAMESPACE_BEG_ + +template +class data_model { +public: + using producer_relation_t = PRelationT; + using consumer_relation_t = CRelationT; + using transmission_mode_t = TransModT; + + using model_type = prod_cons; + using value_type = T; + using size_type = std::int64_t; + +private: + struct data { + model_type model_; + typename concur::traits::header header_; + ::LIBIMP::aligned> elements_start_; + + template + data(U &&model) : header_(std::forward(model)) { + auto elements = this->elements(); + typename decltype(elements)::size_type i = 0; + LIBIMP_TRY { + for (; i < elements.size(); ++i) { + (void)::LIBIMP::construct>(&elements[i]); + } + } LIBIMP_CATCH(...) { + for (decltype(i) k = 0; k < i; ++k) { + (void)::LIBIMP::destroy>(&elements[k]); + } + throw; + } + } + + ~data() noexcept { + for (auto &elem : this->elements()) { + (void)::LIBIMP::destroy>(&elem); + } + } + + static std::size_t size_of(index_t circ_size) noexcept { + return sizeof(struct data) + ( (circ_size - 1) * sizeof(element) ); + } + + std::size_t byte_size() const noexcept { + return size_of(header_.circ_size); + } + + /// \brief element elements[0]; + ::LIBIMP::span> elements() noexcept { + return {elements_start_.ptr(), header_.circ_size}; + } + }; + + data *init(index_t circ_size) noexcept { + if (!data_allocator_) { + return nullptr; + } + void *data_ptr = nullptr; + LIBIMP_TRY { + data_ptr = data_allocator_.alloc(data::size_of(circ_size)); + if (data_ptr == nullptr) { + return nullptr; + } + return ::LIBIMP::construct(data_ptr, circ_size); + } LIBIMP_CATCH(...) { + data_allocator_.dealloc(data_ptr, data::size_of(circ_size)); + return nullptr; + } + } + + ::LIBPMR::allocator data_allocator_; + std::atomic size_ {0}; + data *data_; + typename concur::traits::context context_ {}; + +protected: + template + bool enqueue(U &&value) noexcept { + if (!valid()) return false; + if (!data_->model_.enqueue(data_->elements(), data_->header_, context_, std::forward(value))) { + return false; + } + size_.fetch_add(1, std::memory_order_relaxed); + return true; + } + + bool dequeue(value_type &value) noexcept { + if (!valid()) return false; + if (!data_->model_.dequeue(data_->elements(), data_->header_, context_, value)) { + return false; + } + size_.fetch_sub(1, std::memory_order_relaxed); + return true; + } + +public: + data_model(data_model const &) = delete; + data_model(data_model &&) = delete; + data_model &operator=(data_model const &) = delete; + data_model &operator=(data_model &&) = delete; + + ~data_model() noexcept { + if (valid()) { + auto sz = data_->byte_size(); + (void)::LIBIMP::destroy(data_); + data_allocator_.dealloc(data_, sz); + } + } + + template = true> + explicit data_model(index_t circ_size, MR *memory_resource) noexcept + : data_allocator_(memory_resource) + , data_(init(circ_size)) {} + + template = true> + explicit data_model(MR *memory_resource) noexcept + : data_model(default_circle_buffer_size, memory_resource) {} + + explicit data_model(index_t circ_size) noexcept + : data_model(circ_size, ::LIBPMR::new_delete_resource::get()) {} + + data_model() noexcept + : data_model(default_circle_buffer_size) {} + + bool valid() const noexcept { + return (data_ != nullptr) && data_allocator_.valid(); + } + + explicit operator bool() const noexcept { + return valid(); + } + + size_type approx_size() const noexcept { + return size_.load(std::memory_order_relaxed); + } + + bool empty() const noexcept { + return !valid() || (approx_size() == 0); + } +}; + +LIBCONCUR_NAMESPACE_END_ diff --git a/include/libconcur/queue.h b/include/libconcur/queue.h index 544964b..a8daed0 100644 --- a/include/libconcur/queue.h +++ b/include/libconcur/queue.h @@ -6,160 +6,28 @@ */ #pragma once -#include -#include -#include -#include - -#include "libimp/construct.h" -#include "libimp/detect_plat.h" -#include "libimp/aligned.h" - -#include "libpmr/allocator.h" -#include "libpmr/memory_resource.h" - #include "libconcur/def.h" -#include "libconcur/concurrent.h" +#include "libconcur/data_model.h" LIBCONCUR_NAMESPACE_BEG_ template -class queue { -public: - using producer_relation_t = PRelationT; - using consumer_relation_t = CRelationT; - using model_type = prod_cons; - using value_type = T; - using size_type = std::int64_t; +class queue + : public data_model { -private: - struct data { - model_type model_; - typename concur::traits::header header_; - ::LIBIMP::aligned> elements_start_; - - template - data(U &&model) - : header_(std::forward(model)) { - auto elements = this->elements(); - typename decltype(elements)::size_type i = 0; - LIBIMP_TRY { - for (; i < elements.size(); ++i) { - (void)::LIBIMP::construct>(&elements[i]); - } - } LIBIMP_CATCH(...) { - for (decltype(i) k = 0; k < i; ++k) { - (void)::LIBIMP::destroy>(&elements[k]); - } - throw; - } - } - - ~data() noexcept { - for (auto &elem : this->elements()) { - (void)::LIBIMP::destroy>(&elem); - } - } - - static std::size_t size_of(index_t circ_size) noexcept { - return sizeof(struct data) + ( (circ_size - 1) * sizeof(element) ); - } - - std::size_t byte_size() const noexcept { - return size_of(header_.circ_size); - } - - /// \brief element elements[0]; - ::LIBIMP::span> elements() noexcept { - return {elements_start_.ptr(), header_.circ_size}; - } - }; - - data *init(index_t circ_size) noexcept { - if (!data_allocator_) { - return nullptr; - } - void *data_ptr = nullptr; - LIBIMP_TRY { - data_ptr = data_allocator_.alloc(data::size_of(circ_size)); - if (data_ptr == nullptr) { - return nullptr; - } - return ::LIBIMP::construct(data_ptr, circ_size); - } LIBIMP_CATCH(...) { - data_allocator_.dealloc(data_ptr, data::size_of(circ_size)); - return nullptr; - } - } - - ::LIBPMR::allocator data_allocator_; - std::atomic size_ {0}; - data *data_; - typename concur::traits::context context_ {}; + using base_t = data_model; public: - queue(queue const &) = delete; - queue(queue &&) = delete; - queue &operator=(queue const &) = delete; - queue &operator=(queue &&) = delete; - - ~queue() noexcept { - if (valid()) { - auto sz = data_->byte_size(); - (void)::LIBIMP::destroy(data_); - data_allocator_.dealloc(data_, sz); - } - } - - template = true> - explicit queue(index_t circ_size, MR *memory_resource) noexcept - : data_allocator_(memory_resource) - , data_(init(circ_size)) {} - - template = true> - explicit queue(MR *memory_resource) noexcept - : queue(default_circle_buffer_size, memory_resource) {} - - explicit queue(index_t circ_size) noexcept - : queue(circ_size, ::LIBPMR::new_delete_resource::get()) {} - - queue() noexcept - : queue(default_circle_buffer_size) {} - - bool valid() const noexcept { - return (data_ != nullptr) && data_allocator_.valid(); - } - - explicit operator bool() const noexcept { - return valid(); - } - - size_type approx_size() const noexcept { - return size_.load(std::memory_order_relaxed); - } - - bool empty() const noexcept { - return !valid() || (approx_size() == 0); - } + using base_t::base_t; template bool push(U &&value) noexcept { - if (!valid()) return false; - if (!data_->model_.enqueue(data_->elements(), data_->header_, context_, std::forward(value))) { - return false; - } - size_.fetch_add(1, std::memory_order_relaxed); - return true; + return this->enqueue(std::forward(value)); } - bool pop(value_type &value) noexcept { - if (!valid()) return false; - if (!data_->model_.dequeue(data_->elements(), data_->header_, context_, value)) { - return false; - } - size_.fetch_sub(1, std::memory_order_relaxed); - return true; + bool pop(typename base_t::value_type &value) noexcept { + return this->dequeue(value); } }; diff --git a/test/concur/test_concur_bus.cpp b/test/concur/test_concur_bus.cpp index 87776ec..b2933df 100644 --- a/test/concur/test_concur_bus.cpp +++ b/test/concur/test_concur_bus.cpp @@ -2,3 +2,7 @@ #include "gtest/gtest.h" #include "libconcur/bus.h" +#include "libimp/span.h" + +TEST(bus, construct) { +} \ No newline at end of file