#pragma once

#include <atomic>
#include <limits>
#include <cstdint>
#include <cstddef>
#include <cstring>

namespace ipc {

using byte_t = std::uint8_t;

namespace circ {

struct alignas(std::max_align_t) elem_array_head {
    using ui_t = std::uint8_t;
    using uc_t = std::uint16_t;
    using ai_t = std::atomic<ui_t>;
    using ac_t = std::atomic<uc_t>;

    ac_t cc_ { 0 }; // connection counter, used for broadcast
    ac_t cr_ { 0 }; // cursor
    ai_t wt_ { 0 }; // write index
};

enum : std::size_t {
    elem_array_head_size =
        (sizeof(elem_array_head) % alignof(std::max_align_t)) ?
        ((sizeof(elem_array_head) / alignof(std::max_align_t)) + 1) * alignof(std::max_align_t) :
          sizeof(elem_array_head)
};

template <std::size_t DataSize>
class elem_array : private elem_array_head {

    struct head_t {
        ac_t rf_; // read flag
        ac_t wf_; // write flag
    };

public:
    enum : std::size_t {
        head_size  = elem_array_head_size,
        data_size  = DataSize,
        elem_max   = std::numeric_limits<ui_t>::max() + 1, // default is 255 + 1
        elem_size  = sizeof(head_t) + DataSize,
        block_size = elem_size * elem_max
    };

    static_assert(data_size % alignof(head_t) == 0, "data_size must be a multiple of alignof(head_t)");

private:
    byte_t block_[block_size];

    struct elem_t {
        head_t head_;
        byte_t data_[data_size];
    };

    elem_t* elem_start(void) {
        return reinterpret_cast<elem_t*>(block_);
    }

    static elem_t* elem(void* ptr) {
        return reinterpret_cast<elem_t*>(static_cast<byte_t*>(ptr) - sizeof(head_t));
    }

    elem_t* elem(ui_t i) {
        return elem_start() + i;
    }

    static ui_t index_of(uc_t c) {
        return static_cast<ui_t>(c & std::numeric_limits<ui_t>::max());
    }

    ui_t index_of(elem_t* el) {
        return static_cast<ui_t>(el - elem_start());
    }

public:
    elem_array(void) {
        std::memset(block_, 0, sizeof(block_));
    }

    ~elem_array(void) = delete;

    elem_array(const elem_array&) = delete;
    elem_array& operator=(const elem_array&) = delete;
    elem_array(elem_array&&) = delete;
    elem_array& operator=(elem_array&&) = delete;

    std::size_t connect(void) {
        return cc_.fetch_add(1, std::memory_order_release);
    }

    std::size_t disconnect(void) {
        return cc_.fetch_sub(1, std::memory_order_release);
    }

    std::size_t conn_count(void) const {
        return cc_.load(std::memory_order_consume);
    }

    void* acquire(void) {
        auto el = elem(wt_.fetch_add(1, std::memory_order_consume));
        // check that reading has finished for every connected consumer
        do {
            uc_t expected = 0;
            if (el->head_.rf_.compare_exchange_weak(
                        expected, static_cast<uc_t>(conn_count()),
                        std::memory_order_consume, std::memory_order_relaxed)) {
                break;
            }
        } while (true);
        return el->data_;
    }

    void commit(void* ptr) {
        auto el = elem(ptr);    // get the committed element
        ui_t cm = index_of(el); // get the index of this element
        do {
            bool no_next_check;
            uc_t curr;
            do {
                curr = cr_.load(std::memory_order_relaxed);
                no_next_check = (index_of(curr) != cm);
                if (no_next_check) {
                    /*
                     * Set wf_ so that the other producer thread, the one
                     * committing the element that matches cr_, can see this
                     * element has been committed.
                     */
                    el->head_.wf_.store(1, std::memory_order_relaxed);
                }
                else {
                    /*
                     * No thread other than the current one changes cr_ here,
                     * so we can just fetch_add & break; there is no need to
                     * check cr_ again.
                     */
                    cr_.fetch_add(1, std::memory_order_relaxed);
                    el->head_.wf_.store(0, std::memory_order_release);
                    no_next_check = false;
                    break;
                }
                /*
                 * Go back and judge again when cr_ has been changed by
                 * another producer thread.
                 */
            } while (curr != cr_.load(std::memory_order_acquire));
            // check whether the next element has been committed or not
            if (no_next_check) return;
        } while (el = elem(++cm), el->head_.wf_.load(std::memory_order_consume));
    }

    uc_t cursor(void) const {
        return cr_.load(std::memory_order_consume);
    }

    void* take(uc_t cursor) {
        return elem(index_of(cursor))->data_;
    }

    void put(void* ptr) {
        elem(ptr)->head_.rf_.fetch_sub(1, std::memory_order_release);
    }
};

} // namespace circ
} // namespace ipc
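
/*
 * Usage sketch (illustrative only; the element size 64, the mapped-memory
 * pointer `shm`, and the polling loop below are assumptions, not part of
 * this header):
 *
 *   // Construct in a shared/mapped region (needs <new>); the destructor is
 *   // deleted, so the object is placement-new'd and never destroyed.
 *   auto* arr = new (shm) ipc::circ::elem_array<64>;
 *
 *   // consumer side
 *   arr->connect();
 *   auto cur = arr->cursor();        // remember where reading starts
 *
 *   // producer side
 *   void* wp = arr->acquire();       // waits until all readers released this slot
 *   std::memcpy(wp, "hello", 6);
 *   arr->commit(wp);                 // publish; advances the cursor in order
 *
 *   // consumer side
 *   while (cur != arr->cursor()) {   // new elements are available
 *       void* rp = arr->take(cur++);
 *       // ... read up to data_size bytes from rp ...
 *       arr->put(rp);                // mark this slot as read by this consumer
 *   }
 *   arr->disconnect();
 */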