mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-07 01:06:45 +08:00
rename: circ_queue.h => circ_elem_array.h; support N:M (TBD)
This commit is contained in:
parent
6802d12912
commit
3e7c97d9b6
@ -15,7 +15,7 @@ INCLUDEPATH += \
|
|||||||
HEADERS += \
|
HEADERS += \
|
||||||
../include/export.h \
|
../include/export.h \
|
||||||
../include/shm.h \
|
../include/shm.h \
|
||||||
../src/circ_queue.h
|
../src/circ_elem_array.h
|
||||||
|
|
||||||
SOURCES += \
|
SOURCES += \
|
||||||
../src/shm.cpp
|
../src/shm.cpp
|
||||||
|
|||||||
@ -17,6 +17,6 @@ HEADERS += \
|
|||||||
SOURCES += \
|
SOURCES += \
|
||||||
../test/main.cpp \
|
../test/main.cpp \
|
||||||
../test/test_shm.cpp \
|
../test/test_shm.cpp \
|
||||||
../test/test_circ_queue.cpp
|
../test/test_circ_elem_array.cpp
|
||||||
|
|
||||||
LIBS += -L$${DESTDIR} -lipc
|
LIBS += -L$${DESTDIR} -lipc
|
||||||
|
|||||||
155
src/circ_elem_array.h
Normal file
155
src/circ_elem_array.h
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <cstring>
|
||||||
|
#include <atomic>
|
||||||
|
#include <utility>
|
||||||
|
#include <limits>
|
||||||
|
#include <algorithm>
|
||||||
|
|
||||||
|
namespace ipc {
|
||||||
|
|
||||||
|
using byte_t = std::uint8_t;
|
||||||
|
|
||||||
|
namespace circ {
|
||||||
|
|
||||||
|
struct alignas(std::max_align_t) elem_array_head {
|
||||||
|
using ui_t = std::uint8_t;
|
||||||
|
using uc_t = std::uint16_t;
|
||||||
|
using ai_t = std::atomic<ui_t>;
|
||||||
|
using ac_t = std::atomic<uc_t>;
|
||||||
|
|
||||||
|
ac_t cc_ { 0 }; // connection counter, using for broadcast
|
||||||
|
ac_t cr_ { 0 }; // cursor
|
||||||
|
ai_t wt_ { 0 }; // write index
|
||||||
|
};
|
||||||
|
|
||||||
|
enum : std::size_t {
|
||||||
|
elem_array_head_size =
|
||||||
|
(sizeof(elem_array_head) % alignof(std::max_align_t)) ?
|
||||||
|
((sizeof(elem_array_head) / alignof(std::max_align_t)) + 1) * alignof(std::max_align_t) :
|
||||||
|
sizeof(elem_array_head)
|
||||||
|
};
|
||||||
|
|
||||||
|
template <std::size_t DataSize>
|
||||||
|
class elem_array : private elem_array_head {
|
||||||
|
struct head_t {
|
||||||
|
ac_t rf_; // read flag
|
||||||
|
ac_t wf_; // write flag
|
||||||
|
};
|
||||||
|
|
||||||
|
public:
|
||||||
|
enum : std::size_t {
|
||||||
|
head_size = elem_array_head_size,
|
||||||
|
data_size = DataSize,
|
||||||
|
elem_max = std::numeric_limits<ui_t>::max() + 1, // default is 255 + 1
|
||||||
|
elem_size = sizeof(head_t) + DataSize,
|
||||||
|
block_size = elem_size * elem_max
|
||||||
|
};
|
||||||
|
|
||||||
|
static_assert(data_size % alignof(head_t) == 0, "data_size must be multiple of alignof(head_t)");
|
||||||
|
|
||||||
|
private:
|
||||||
|
byte_t block_[block_size];
|
||||||
|
|
||||||
|
struct elem_t {
|
||||||
|
head_t head_;
|
||||||
|
byte_t data_[data_size];
|
||||||
|
};
|
||||||
|
|
||||||
|
elem_t* elem_start(void) {
|
||||||
|
return reinterpret_cast<elem_t*>(block_);
|
||||||
|
}
|
||||||
|
|
||||||
|
static elem_t* elem(void* ptr) {
|
||||||
|
return reinterpret_cast<elem_t*>(static_cast<byte_t*>(ptr) - sizeof(head_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
elem_t* elem(ui_t i) {
|
||||||
|
return elem_start() + i;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ui_t index_of(uc_t c) {
|
||||||
|
return static_cast<ui_t>(c & std::numeric_limits<ui_t>::max());
|
||||||
|
}
|
||||||
|
|
||||||
|
ui_t index_of(elem_t* el) {
|
||||||
|
return static_cast<ui_t>(el - elem_start());
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
elem_array(void) {
|
||||||
|
::memset(block_, 0, sizeof(block_));
|
||||||
|
}
|
||||||
|
~elem_array(void) = delete;
|
||||||
|
|
||||||
|
elem_array(const elem_array&) = delete;
|
||||||
|
elem_array& operator=(const elem_array&) = delete;
|
||||||
|
elem_array(elem_array&&) = delete;
|
||||||
|
elem_array& operator=(elem_array&&) = delete;
|
||||||
|
|
||||||
|
std::size_t connect(void) {
|
||||||
|
return cc_.fetch_add(1, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t disconnect(void) {
|
||||||
|
return cc_.fetch_sub(1, std::memory_order_release);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::size_t conn_count(void) const {
|
||||||
|
return cc_.load(std::memory_order_consume);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* acquire(void) {
|
||||||
|
auto el = elem(wt_.fetch_add(1, std::memory_order_consume));
|
||||||
|
// check read flag
|
||||||
|
do {
|
||||||
|
uc_t expected = 0;
|
||||||
|
if (el->head_.rf_.compare_exchange_weak(
|
||||||
|
expected, static_cast<uc_t>(conn_count()),
|
||||||
|
std::memory_order_consume, std::memory_order_relaxed)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} while(1);
|
||||||
|
return el->data_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void commit(void* ptr) {
|
||||||
|
auto el = elem(ptr);
|
||||||
|
ui_t wt = index_of(el);
|
||||||
|
do {
|
||||||
|
bool no_next;
|
||||||
|
uc_t curr;
|
||||||
|
do {
|
||||||
|
curr = cr_.load(std::memory_order_relaxed);
|
||||||
|
no_next = (index_of(curr) != wt);
|
||||||
|
if (no_next) {
|
||||||
|
el->head_.wf_.store(1, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
cr_.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
el->head_.wf_.store(0, std::memory_order_release);
|
||||||
|
no_next = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} while(curr != cr_.load(std::memory_order_acq_rel));
|
||||||
|
if (no_next) return;
|
||||||
|
} while(el = elem(++wt), el->head_.wf_.load(std::memory_order_consume));
|
||||||
|
}
|
||||||
|
|
||||||
|
uc_t cursor(void) const {
|
||||||
|
return cr_.load(std::memory_order_consume);
|
||||||
|
}
|
||||||
|
|
||||||
|
void* take(uc_t cursor) {
|
||||||
|
return elem(index_of(cursor))->data_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void put(void* ptr) {
|
||||||
|
elem(ptr)->head_.rf_.fetch_sub(1, std::memory_order_release);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace circ
|
||||||
|
} // namespace ipc
|
||||||
117
src/circ_queue.h
117
src/circ_queue.h
@ -1,117 +0,0 @@
|
|||||||
#pragma once
|
|
||||||
|
|
||||||
#include <cstddef>
|
|
||||||
#include <cstdint>
|
|
||||||
#include <cstring>
|
|
||||||
#include <atomic>
|
|
||||||
#include <utility>
|
|
||||||
#include <limits>
|
|
||||||
#include <algorithm>
|
|
||||||
|
|
||||||
namespace ipc {
|
|
||||||
|
|
||||||
struct circ_queue_head {
|
|
||||||
using ui_t = std::atomic<std::uint16_t>;
|
|
||||||
using el_t = std::atomic<std::size_t>; // element head
|
|
||||||
|
|
||||||
ui_t cc_ { 0 }; // connection counter
|
|
||||||
ui_t rd_ { 0 }; // read cursor
|
|
||||||
ui_t wt_ { 0 }; // write cursor
|
|
||||||
};
|
|
||||||
|
|
||||||
template <std::size_t Size>
|
|
||||||
class circ_queue : private circ_queue_head {
|
|
||||||
public:
|
|
||||||
enum : std::size_t {
|
|
||||||
total_size = Size,
|
|
||||||
head_size = sizeof(circ_queue_head),
|
|
||||||
block_size = Size - head_size,
|
|
||||||
elem_max = std::numeric_limits<std::uint8_t>::max(),
|
|
||||||
elem_size = (Size / (elem_max + 1)),
|
|
||||||
data_size = elem_size - sizeof(el_t)
|
|
||||||
};
|
|
||||||
|
|
||||||
static_assert(Size > head_size , "Size must > head_size");
|
|
||||||
static_assert(elem_size >= head_size , "elem_size must >= head_size");
|
|
||||||
static_assert(elem_size > sizeof(el_t), "elem_size must > sizeof(el_t)");
|
|
||||||
static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size");
|
|
||||||
|
|
||||||
private:
|
|
||||||
struct elem_t {
|
|
||||||
el_t head_;
|
|
||||||
std::uint8_t data_[data_size];
|
|
||||||
};
|
|
||||||
|
|
||||||
elem_t* elem_start(void) {
|
|
||||||
return reinterpret_cast<elem_t*>(this) + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static std::uint8_t id(std::uint16_t i) {
|
|
||||||
return i & 0x00ff;
|
|
||||||
}
|
|
||||||
|
|
||||||
static elem_t* elem(void* ptr) {
|
|
||||||
return reinterpret_cast<elem_t*>(static_cast<std::uint8_t*>(ptr) - sizeof(el_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::uint8_t block_[block_size];
|
|
||||||
|
|
||||||
public:
|
|
||||||
static std::uint16_t next(std::uint16_t i) {
|
|
||||||
return (id(++i) == elem_max) ? ++i : i;
|
|
||||||
}
|
|
||||||
|
|
||||||
circ_queue(void) {
|
|
||||||
::memset(block_, 0, sizeof(block_));
|
|
||||||
}
|
|
||||||
~circ_queue(void) = delete;
|
|
||||||
|
|
||||||
circ_queue(const circ_queue&) = delete;
|
|
||||||
circ_queue& operator=(const circ_queue&) = delete;
|
|
||||||
circ_queue(circ_queue&&) = delete;
|
|
||||||
circ_queue& operator=(circ_queue&&) = delete;
|
|
||||||
|
|
||||||
std::size_t connect(void) {
|
|
||||||
return cc_.fetch_add(1, std::memory_order_release);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t disconnect(void) {
|
|
||||||
return cc_.fetch_sub(1, std::memory_order_release);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::size_t conn_count(void) const {
|
|
||||||
return cc_.load(std::memory_order_consume);
|
|
||||||
}
|
|
||||||
|
|
||||||
void* acquire(void) {
|
|
||||||
auto st = elem_start() + id(wt_.load(std::memory_order_relaxed));
|
|
||||||
// check remain count of consumers
|
|
||||||
do {
|
|
||||||
std::size_t expected = 0;
|
|
||||||
if (st->head_.compare_exchange_weak(expected, conn_count(),
|
|
||||||
std::memory_order_consume, std::memory_order_relaxed)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} while(1);
|
|
||||||
return st->data_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void commit(void) {
|
|
||||||
wt_.store(next(wt_.load(std::memory_order_relaxed)), std::memory_order_release);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::uint16_t cursor(void) const {
|
|
||||||
return wt_.load(std::memory_order_consume);
|
|
||||||
}
|
|
||||||
|
|
||||||
void* take(std::uint16_t index) {
|
|
||||||
return (elem_start() + id(index))->data_;
|
|
||||||
}
|
|
||||||
|
|
||||||
void put(void* ptr) {
|
|
||||||
auto st = elem(ptr);
|
|
||||||
st->head_.fetch_sub(1, std::memory_order_release);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace ipc
|
|
||||||
@ -6,7 +6,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
#include "circ_queue.h"
|
#include "circ_elem_array.h"
|
||||||
#include "test.h"
|
#include "test.h"
|
||||||
#include "stopwatch.hpp"
|
#include "stopwatch.hpp"
|
||||||
|
|
||||||
@ -17,30 +17,43 @@ class Unit : public TestSuite {
|
|||||||
|
|
||||||
private slots:
|
private slots:
|
||||||
void test_inst(void);
|
void test_inst(void);
|
||||||
void test_producer(void);
|
void test_prod_cons_1vN(void);
|
||||||
} unit__;
|
} unit__;
|
||||||
|
|
||||||
#include "test_circ_queue.moc"
|
#include "test_circ_elem_array.moc"
|
||||||
|
|
||||||
using cq_t = ipc::circ_queue<4096>;
|
using cq_t = ipc::circ::elem_array<12>;
|
||||||
cq_t* cq__;
|
cq_t* cq__;
|
||||||
|
|
||||||
void Unit::test_inst(void) {
|
void Unit::test_inst(void) {
|
||||||
|
std::cout << "cq_t::head_size = " << cq_t::head_size << std::endl;
|
||||||
|
std::cout << "cq_t::data_size = " << cq_t::data_size << std::endl;
|
||||||
|
std::cout << "cq_t::elem_size = " << cq_t::elem_size << std::endl;
|
||||||
|
std::cout << "cq_t::block_size = " << cq_t::block_size << std::endl;
|
||||||
|
|
||||||
|
QCOMPARE(cq_t::data_size , 12);
|
||||||
|
QCOMPARE(cq_t::block_size, 4096);
|
||||||
|
QCOMPARE(sizeof(cq_t), cq_t::block_size + cq_t::head_size);
|
||||||
|
|
||||||
cq__ = new cq_t;
|
cq__ = new cq_t;
|
||||||
QCOMPARE(sizeof(*cq__), static_cast<std::size_t>(cq_t::total_size));
|
std::cout << "sizeof(ipc::circ::elem_array<4096>) = " << sizeof(*cq__) << std::endl;
|
||||||
|
|
||||||
auto a = cq__->take(1);
|
auto a = cq__->take(1);
|
||||||
auto b = cq__->take(2);
|
auto b = cq__->take(2);
|
||||||
QCOMPARE(static_cast<std::size_t>(static_cast<std::uint8_t const *>(b) -
|
QCOMPARE(static_cast<std::size_t>(static_cast<ipc::byte_t*>(b) -
|
||||||
static_cast<std::uint8_t const *>(a)),
|
static_cast<ipc::byte_t*>(a)),
|
||||||
static_cast<std::size_t>(cq_t::elem_size));
|
static_cast<std::size_t>(cq_t::elem_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
void Unit::test_producer(void) {
|
void Unit::test_prod_cons_1vN(void) {
|
||||||
::new (cq__) cq_t;
|
::new (cq__) cq_t;
|
||||||
std::thread consumers[3];
|
std::thread consumers[1];
|
||||||
|
std::atomic_int fini { 0 };
|
||||||
|
capo::stopwatch<> sw;
|
||||||
|
constexpr static int loops = 1000000;
|
||||||
|
|
||||||
for (auto& c : consumers) {
|
for (auto& c : consumers) {
|
||||||
c = std::thread{[&c] {
|
c = std::thread{[&] {
|
||||||
auto cur = cq__->cursor();
|
auto cur = cq__->cursor();
|
||||||
std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl;
|
std::cout << "start consumer " << &c << ": cur = " << (int)cur << std::endl;
|
||||||
|
|
||||||
@ -55,43 +68,41 @@ void Unit::test_producer(void) {
|
|||||||
auto p = static_cast<int*>(cq__->take(cur));
|
auto p = static_cast<int*>(cq__->take(cur));
|
||||||
int d = *p;
|
int d = *p;
|
||||||
cq__->put(p);
|
cq__->put(p);
|
||||||
if (d < 0) return;
|
if (d < 0) goto finished;
|
||||||
cur = cq__->next(cur);
|
++cur;
|
||||||
list.push_back(d);
|
list.push_back(d);
|
||||||
}
|
}
|
||||||
for (int d : list) {
|
|
||||||
QCOMPARE(i, d);
|
|
||||||
++i;
|
|
||||||
}
|
|
||||||
list.clear();
|
|
||||||
} while(1);
|
} while(1);
|
||||||
|
finished:
|
||||||
|
if (++fini == std::extent<decltype(consumers)>::value) {
|
||||||
|
auto ts = sw.elapsed<std::chrono::microseconds>();
|
||||||
|
std::cout << "performance: " << (double(ts) / double(loops)) << " us/d" << std::endl;
|
||||||
|
}
|
||||||
|
for (int d : list) {
|
||||||
|
QCOMPARE(i, d);
|
||||||
|
++i;
|
||||||
|
}
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
|
|
||||||
while (cq__->conn_count() != std::extent<decltype(consumers)>::value) {
|
while (cq__->conn_count() != std::extent<decltype(consumers)>::value) {
|
||||||
std::this_thread::yield();
|
std::this_thread::yield();
|
||||||
}
|
}
|
||||||
capo::stopwatch<> sw;
|
|
||||||
constexpr static int loops = 1000000;
|
|
||||||
|
|
||||||
std::cout << "start producer..." << std::endl;
|
std::cout << "start producer..." << std::endl;
|
||||||
sw.start();
|
sw.start();
|
||||||
for (int i = 0; i < loops; ++i) {
|
for (int i = 0; i < loops; ++i) {
|
||||||
auto d = static_cast<int*>(cq__->acquire());
|
auto d = static_cast<int*>(cq__->acquire());
|
||||||
*d = i;
|
*d = i;
|
||||||
cq__->commit();
|
cq__->commit(d);
|
||||||
}
|
}
|
||||||
auto d = static_cast<int*>(cq__->acquire());
|
auto d = static_cast<int*>(cq__->acquire());
|
||||||
*d = -1;
|
*d = -1;
|
||||||
cq__->commit();
|
cq__->commit(d);
|
||||||
|
|
||||||
for (auto& c : consumers) {
|
for (auto& c : consumers) {
|
||||||
c.join();
|
c.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto ts = sw.elapsed<std::chrono::microseconds>();
|
|
||||||
std::cout << "time spent : " << (ts / 1000) << " ms" << std::endl;
|
|
||||||
std::cout << "performance: " << (double(ts) / double(loops)) << " us/msg" << std::endl;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // internal-linkage
|
} // internal-linkage
|
||||||
Loading…
x
Reference in New Issue
Block a user