#pragma once #include #include #include #include #include "def.h" namespace ipc { namespace circ { template struct alignas(std::max_align_t) elem_array_head { using u1_t = uint_t; using u2_t = uint_t; std::atomic cc_ { 0 }; // connection counter, using for broadcast std::atomic wt_ { 0 }; // write index static u1_t index_of(u2_t c) noexcept { return static_cast(c); } std::size_t connect() noexcept { return cc_.fetch_add(1, std::memory_order_release); } std::size_t disconnect() noexcept { return cc_.fetch_sub(1, std::memory_order_release); } std::size_t conn_count() const noexcept { return cc_.load(std::memory_order_acquire); } u2_t cursor() const noexcept { return wt_.load(std::memory_order_acquire); } auto acquire() noexcept { return index_of(wt_.load(std::memory_order_relaxed)); } void commit() noexcept { wt_.fetch_add(1, std::memory_order_release); } }; template constexpr std::size_t elem_array_head_size = (sizeof(elem_array_head) % alignof(std::max_align_t)) ? ((sizeof(elem_array_head) / alignof(std::max_align_t)) + 1) * alignof(std::max_align_t) : sizeof(elem_array_head); struct elem_head { std::atomic> rc_ { 0 }; // read counter }; template class elem_array : private elem_array_head { public: using base_t = elem_array_head; using head_t = elem_head; using typename base_t::u1_t; using typename base_t::u2_t; enum : std::size_t { head_size = elem_array_head_size, data_size = DataSize, elem_max = (std::numeric_limits::max)() + 1, // default is 255 + 1 elem_size = sizeof(head_t) + DataSize, block_size = elem_size * elem_max }; private: struct elem_t { head_t head_; byte_t data_[data_size] {}; }; elem_t block_[elem_max]; elem_t* elem_start() noexcept { return block_; } static elem_t* elem(void* ptr) noexcept { return reinterpret_cast(static_cast(ptr) - sizeof(head_t)); } elem_t* elem(u1_t i ) noexcept { return elem_start() + i; } public: elem_array() = default; elem_array(const elem_array&) = delete; elem_array& operator=(const elem_array&) = delete; elem_array(elem_array&&) = delete; elem_array& operator=(elem_array&&) = delete; using base_t::connect; using base_t::disconnect; using base_t::conn_count; using base_t::cursor; void* acquire() noexcept { elem_t* el = elem(base_t::acquire()); // check all consumers have finished reading while(1) { uint_t<32> expected = 0, conn_cnt = static_cast>(conn_count()); // acquire if (el->head_.rc_.compare_exchange_weak( expected, conn_cnt, std::memory_order_relaxed)) { break; } std::this_thread::yield(); } return el->data_; } void commit(void* /*ptr*/) noexcept { base_t::commit(); } void* take(u2_t cursor) noexcept { std::atomic_thread_fence(std::memory_order_acquire); return elem(base_t::index_of(cursor))->data_; } void put(void* ptr) noexcept { elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release); } }; } // namespace circ } // namespace ipc