#pragma once #include #include #include #include #include "def.h" namespace ipc { namespace circ { struct alignas(std::max_align_t) elem_array_head { using ui_t = std::uint8_t; using uc_t = std::uint16_t; using ac_t = std::atomic; std::atomic lc_ { 0 }; // write spin lock flag ac_t cc_ { 0 }; // connection counter, using for broadcast ac_t wt_ { 0 }; // write index }; enum : std::size_t { elem_array_head_size = (sizeof(elem_array_head) % alignof(std::max_align_t)) ? ((sizeof(elem_array_head) / alignof(std::max_align_t)) + 1) * alignof(std::max_align_t) : sizeof(elem_array_head) }; template class elem_array : private elem_array_head { struct head_t { std::atomic rc_ { 0 }; // read counter }; public: enum : std::size_t { head_size = elem_array_head_size, data_size = DataSize, elem_max = std::numeric_limits::max() + 1, // default is 255 + 1 elem_size = sizeof(head_t) + DataSize, block_size = elem_size * elem_max }; private: struct elem_t { head_t head_; byte_t data_[data_size] {}; }; elem_t block_[elem_max]; elem_t* elem_start(void) { return block_; } static elem_t* elem(void* ptr) { return reinterpret_cast(static_cast(ptr) - sizeof(head_t)); } elem_t* elem(ui_t i) { return elem_start() + i; } static ui_t index_of(uc_t c) { return static_cast(c); } ui_t index_of(elem_t* el) { return static_cast(el - elem_start()); } public: elem_array(void) = default; elem_array(const elem_array&) = delete; elem_array& operator=(const elem_array&) = delete; elem_array(elem_array&&) = delete; elem_array& operator=(elem_array&&) = delete; std::size_t connect(void) { return cc_.fetch_add(1, std::memory_order_release); } std::size_t disconnect(void) { return cc_.fetch_sub(1, std::memory_order_release); } std::size_t conn_count(void) const { return cc_.load(std::memory_order_consume); } void* acquire(void) { while (lc_.exchange(1, std::memory_order_acquire)) { std::this_thread::yield(); } elem_t* el = elem(index_of(wt_.load(std::memory_order_relaxed))); // check all consumers have finished reading while(1) { std::uint32_t expected = 0; if (el->head_.rc_.compare_exchange_weak( expected, static_cast(cc_.load(std::memory_order_relaxed)), std::memory_order_release)) { break; } std::this_thread::yield(); std::atomic_thread_fence(std::memory_order_acquire); } return el->data_; } void commit(void* /*ptr*/) { wt_.fetch_add(1, std::memory_order_relaxed); lc_.store(0, std::memory_order_release); } uc_t cursor(void) const { return wt_.load(std::memory_order_consume); } void* take(uc_t cursor) { return elem(index_of(cursor))->data_; } void put(void* ptr) { elem(ptr)->head_.rc_.fetch_sub(1, std::memory_order_release); } }; } // namespace circ } // namespace ipc