diff --git a/build/src.pro b/build/src.pro index faffb18..1ef35f2 100644 --- a/build/src.pro +++ b/build/src.pro @@ -15,7 +15,7 @@ INCLUDEPATH += \ HEADERS += \ ../include/export.h \ ../include/shm.h \ - ../src/circ_queue.h + ../src/circ_elem_array.h SOURCES += \ ../src/shm.cpp diff --git a/build/test.pro b/build/test.pro index fe568b6..1cf95bb 100644 --- a/build/test.pro +++ b/build/test.pro @@ -17,6 +17,6 @@ HEADERS += \ SOURCES += \ ../test/main.cpp \ ../test/test_shm.cpp \ - ../test/test_circ_queue.cpp + ../test/test_circ_elem_array.cpp LIBS += -L$${DESTDIR} -lipc diff --git a/src/circ_elem_array.h b/src/circ_elem_array.h new file mode 100644 index 0000000..38f6e9c --- /dev/null +++ b/src/circ_elem_array.h @@ -0,0 +1,155 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +namespace ipc { + +using byte_t = std::uint8_t; + +namespace circ { + +struct alignas(std::max_align_t) elem_array_head { + using ui_t = std::uint8_t; + using uc_t = std::uint16_t; + using ai_t = std::atomic; + using ac_t = std::atomic; + + ac_t cc_ { 0 }; // connection counter, using for broadcast + ac_t cr_ { 0 }; // cursor + ai_t wt_ { 0 }; // write index +}; + +enum : std::size_t { + elem_array_head_size = + (sizeof(elem_array_head) % alignof(std::max_align_t)) ? + ((sizeof(elem_array_head) / alignof(std::max_align_t)) + 1) * alignof(std::max_align_t) : + sizeof(elem_array_head) +}; + +template +class elem_array : private elem_array_head { + struct head_t { + ac_t rf_; // read flag + ac_t wf_; // write flag + }; + +public: + enum : std::size_t { + head_size = elem_array_head_size, + data_size = DataSize, + elem_max = std::numeric_limits::max() + 1, // default is 255 + 1 + elem_size = sizeof(head_t) + DataSize, + block_size = elem_size * elem_max + }; + + static_assert(data_size % alignof(head_t) == 0, "data_size must be multiple of alignof(head_t)"); + +private: + byte_t block_[block_size]; + + struct elem_t { + head_t head_; + byte_t data_[data_size]; + }; + + elem_t* elem_start(void) { + return reinterpret_cast(block_); + } + + static elem_t* elem(void* ptr) { + return reinterpret_cast(static_cast(ptr) - sizeof(head_t)); + } + + elem_t* elem(ui_t i) { + return elem_start() + i; + } + + static ui_t index_of(uc_t c) { + return static_cast(c & std::numeric_limits::max()); + } + + ui_t index_of(elem_t* el) { + return static_cast(el - elem_start()); + } + +public: + elem_array(void) { + ::memset(block_, 0, sizeof(block_)); + } + ~elem_array(void) = delete; + + elem_array(const elem_array&) = delete; + elem_array& operator=(const elem_array&) = delete; + elem_array(elem_array&&) = delete; + elem_array& operator=(elem_array&&) = delete; + + std::size_t connect(void) { + return cc_.fetch_add(1, std::memory_order_release); + } + + std::size_t disconnect(void) { + return cc_.fetch_sub(1, std::memory_order_release); + } + + std::size_t conn_count(void) const { + return cc_.load(std::memory_order_consume); + } + + void* acquire(void) { + auto el = elem(wt_.fetch_add(1, std::memory_order_consume)); + // check read flag + do { + uc_t expected = 0; + if (el->head_.rf_.compare_exchange_weak( + expected, static_cast(conn_count()), + std::memory_order_consume, std::memory_order_relaxed)) { + break; + } + } while(1); + return el->data_; + } + + void commit(void* ptr) { + auto el = elem(ptr); + ui_t wt = index_of(el); + do { + bool no_next; + uc_t curr; + do { + curr = cr_.load(std::memory_order_relaxed); + no_next = (index_of(curr) != wt); + if (no_next) { + el->head_.wf_.store(1, std::memory_order_relaxed); + } + else { + cr_.fetch_add(1, std::memory_order_relaxed); + el->head_.wf_.store(0, std::memory_order_release); + no_next = false; + break; + } + } while(curr != cr_.load(std::memory_order_acq_rel)); + if (no_next) return; + } while(el = elem(++wt), el->head_.wf_.load(std::memory_order_consume)); + } + + uc_t cursor(void) const { + return cr_.load(std::memory_order_consume); + } + + void* take(uc_t cursor) { + return elem(index_of(cursor))->data_; + } + + void put(void* ptr) { + elem(ptr)->head_.rf_.fetch_sub(1, std::memory_order_release); + } +}; + +} // namespace circ +} // namespace ipc diff --git a/src/circ_queue.h b/src/circ_queue.h deleted file mode 100644 index 78e7273..0000000 --- a/src/circ_queue.h +++ /dev/null @@ -1,117 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace ipc { - -struct circ_queue_head { - using ui_t = std::atomic; - using el_t = std::atomic; // element head - - ui_t cc_ { 0 }; // connection counter - ui_t rd_ { 0 }; // read cursor - ui_t wt_ { 0 }; // write cursor -}; - -template -class circ_queue : private circ_queue_head { -public: - enum : std::size_t { - total_size = Size, - head_size = sizeof(circ_queue_head), - block_size = Size - head_size, - elem_max = std::numeric_limits::max(), - elem_size = (Size / (elem_max + 1)), - data_size = elem_size - sizeof(el_t) - }; - - static_assert(Size > head_size , "Size must > head_size"); - static_assert(elem_size >= head_size , "elem_size must >= head_size"); - static_assert(elem_size > sizeof(el_t), "elem_size must > sizeof(el_t)"); - static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size"); - -private: - struct elem_t { - el_t head_; - std::uint8_t data_[data_size]; - }; - - elem_t* elem_start(void) { - return reinterpret_cast(this) + 1; - } - - static std::uint8_t id(std::uint16_t i) { - return i & 0x00ff; - } - - static elem_t* elem(void* ptr) { - return reinterpret_cast(static_cast(ptr) - sizeof(el_t)); - } - - std::uint8_t block_[block_size]; - -public: - static std::uint16_t next(std::uint16_t i) { - return (id(++i) == elem_max) ? ++i : i; - } - - circ_queue(void) { - ::memset(block_, 0, sizeof(block_)); - } - ~circ_queue(void) = delete; - - circ_queue(const circ_queue&) = delete; - circ_queue& operator=(const circ_queue&) = delete; - circ_queue(circ_queue&&) = delete; - circ_queue& operator=(circ_queue&&) = delete; - - std::size_t connect(void) { - return cc_.fetch_add(1, std::memory_order_release); - } - - std::size_t disconnect(void) { - return cc_.fetch_sub(1, std::memory_order_release); - } - - std::size_t conn_count(void) const { - return cc_.load(std::memory_order_consume); - } - - void* acquire(void) { - auto st = elem_start() + id(wt_.load(std::memory_order_relaxed)); - // check remain count of consumers - do { - std::size_t expected = 0; - if (st->head_.compare_exchange_weak(expected, conn_count(), - std::memory_order_consume, std::memory_order_relaxed)) { - break; - } - } while(1); - return st->data_; - } - - void commit(void) { - wt_.store(next(wt_.load(std::memory_order_relaxed)), std::memory_order_release); - } - - std::uint16_t cursor(void) const { - return wt_.load(std::memory_order_consume); - } - - void* take(std::uint16_t index) { - return (elem_start() + id(index))->data_; - } - - void put(void* ptr) { - auto st = elem(ptr); - st->head_.fetch_sub(1, std::memory_order_release); - } -}; - -} // namespace ipc diff --git a/test/test_circ_queue.cpp b/test/test_circ_elem_array.cpp similarity index 54% rename from test/test_circ_queue.cpp rename to test/test_circ_elem_array.cpp index d78e23c..80e64d2 100644 --- a/test/test_circ_queue.cpp +++ b/test/test_circ_elem_array.cpp @@ -6,7 +6,7 @@ #include #include -#include "circ_queue.h" +#include "circ_elem_array.h" #include "test.h" #include "stopwatch.hpp" @@ -17,30 +17,43 @@ class Unit : public TestSuite { private slots: void test_inst(void); - void test_producer(void); + void test_prod_cons_1vN(void); } unit__; -#include "test_circ_queue.moc" +#include "test_circ_elem_array.moc" -using cq_t = ipc::circ_queue<4096>; +using cq_t = ipc::circ::elem_array<12>; cq_t* cq__; void Unit::test_inst(void) { + std::cout << "cq_t::head_size = " << cq_t::head_size << std::endl; + std::cout << "cq_t::data_size = " << cq_t::data_size << std::endl; + std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl; + std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl; + + QCOMPARE(cq_t::data_size , 12); + QCOMPARE(cq_t::block_size, 4096); + QCOMPARE(sizeof(cq_t), cq_t::block_size + cq_t::head_size); + cq__ = new cq_t; - QCOMPARE(sizeof(*cq__), static_cast(cq_t::total_size)); + std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl; + auto a = cq__->take(1); auto b = cq__->take(2); - QCOMPARE(static_cast(static_cast(b) - - static_cast(a)), + QCOMPARE(static_cast(static_cast(b) - + static_cast(a)), static_cast(cq_t::elem_size)); } -void Unit::test_producer(void) { +void Unit::test_prod_cons_1vN(void) { ::new (cq__) cq_t; - std::thread consumers[3]; + std::thread consumers[1]; + std::atomic_int fini { 0 }; + capo::stopwatch<> sw; + constexpr static int loops = 1000000; for (auto& c : consumers) { - c = std::thread{[&c] { + c = std::thread{[&] { auto cur = cq__->cursor(); std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl; @@ -55,43 +68,41 @@ void Unit::test_producer(void) { auto p = static_cast(cq__->take(cur)); int d = *p; cq__->put(p); - if (d < 0) return; - cur = cq__->next(cur); + if (d < 0) goto finished; + ++cur; list.push_back(d); } - for (int d : list) { - QCOMPARE(i, d); - ++i; - } - list.clear(); } while(1); + finished: + if (++fini == std::extent::value) { + auto ts = sw.elapsed(); + std::cout << "performance: " << (double(ts) / double(loops)) << " us/d" << std::endl; + } + for (int d : list) { + QCOMPARE(i, d); + ++i; + } }}; } while (cq__->conn_count() != std::extent::value) { std::this_thread::yield(); } - capo::stopwatch<> sw; - constexpr static int loops = 1000000; std::cout << "start producer..." << std::endl; sw.start(); for (int i = 0; i < loops; ++i) { auto d = static_cast(cq__->acquire()); *d = i; - cq__->commit(); + cq__->commit(d); } auto d = static_cast(cq__->acquire()); *d = -1; - cq__->commit(); + cq__->commit(d); for (auto& c : consumers) { c.join(); } - - auto ts = sw.elapsed(); - std::cout << "time spent : " << (ts / 1000) << " ms" << std::endl; - std::cout << "performance: " << (double(ts) / double(loops)) << " us/msg" << std::endl; } } // internal-linkage