#pragma once #include #include #include #include #include "def.h" namespace ipc { namespace circ { template struct alignas(std::max_align_t) elem_array_head { using u1_t = uint_t; using u2_t = uint_t; std::atomic cc_ { 0 }; // connection counter, using for broadcast std::atomic wt_ { 0 }; // write index std::atomic lc_ { 0 }; // write spin lock flag static u1_t index_of(u2_t c) { return static_cast(c); } std::size_t connect(void) { return cc_.fetch_add(1, std::memory_order_release); } std::size_t disconnect(void) { return cc_.fetch_sub(1, std::memory_order_release); } std::size_t conn_count(void) const { return cc_.load(std::memory_order_acquire); } u2_t cursor(void) const { return wt_.load(std::memory_order_acquire); } auto acquire(void) { while (lc_.exchange(1, std::memory_order_acquire)) { std::this_thread::yield(); } return index_of(wt_.load(std::memory_order_relaxed)); } void commit(void) { wt_.fetch_add(1, std::memory_order_relaxed); lc_.store(0, std::memory_order_release); } }; template constexpr std::size_t elem_array_head_size = (sizeof(elem_array_head) % alignof(std::max_align_t)) ? ((sizeof(elem_array_head) / alignof(std::max_align_t)) + 1) * alignof(std::max_align_t) : sizeof(elem_array_head); struct elem_head { std::atomic> rc_ { 0 }; // read counter }; template class elem_array : private elem_array_head { public: using base_t = elem_array_head; using head_t = elem_head; using typename base_t::u1_t; using typename base_t::u2_t; enum : std::size_t { head_size = elem_array_head_size, data_size = DataSize, elem_max = std::numeric_limits::max() + 1, // default is 255 + 1 elem_size = sizeof(head_t) + DataSize, block_size = elem_size * elem_max }; private: struct elem_t { head_t head_; byte_t data_[data_size] {}; }; elem_t block_[elem_max]; elem_t* elem_start(void) { return block_; } static elem_t* elem(void* ptr) { return reinterpret_cast(static_cast(ptr) - sizeof(head_t)); } elem_t* elem(u1_t i ) { return elem_start() + i; } public: elem_array(void) = default; elem_array(const elem_array&) = delete; elem_array& operator=(const elem_array&) = delete; elem_array(elem_array&&) = delete; elem_array& operator=(elem_array&&) = delete; using base_t::connect; using base_t::disconnect; using base_t::conn_count; using base_t::cursor; void* acquire(void) { elem_t* el = elem(base_t::acquire()); // check all consumers have finished reading while(1) { uint_t<32> expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, static_cast>(conn_count()), std::memory_order_release)) { break; } std::this_thread::yield(); } return el->data_; } void commit(void* /*ptr*/) { base_t::commit(); } void* take(u2_t cursor) { return elem(base_t::index_of(cursor))->data_; } void put(void* ptr) { elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release); } }; } // namespace circ } // namespace ipc