mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
add ut for circ_queue
This commit is contained in:
parent
17375bb32c
commit
19b40e23cb
@ -1,101 +0,0 @@
|
||||
|
||||
#include "circ_queue.h"
|
||||
|
||||
namespace ipc {
|
||||
|
||||
class circ_queue_ {
|
||||
public:
|
||||
};
|
||||
|
||||
circ_queue::circ_queue(void)
|
||||
: p_(new circ_queue_) {
|
||||
}
|
||||
|
||||
circ_queue::circ_queue(void* mem, std::size_t size)
|
||||
: circ_queue() {
|
||||
attach(mem, size);
|
||||
}
|
||||
|
||||
circ_queue::circ_queue(circ_queue&& rhs)
|
||||
: circ_queue() {
|
||||
swap(rhs);
|
||||
}
|
||||
|
||||
circ_queue::~circ_queue(void) {
|
||||
delete p_;
|
||||
}
|
||||
|
||||
void circ_queue::swap(circ_queue& rhs) {
|
||||
std::swap(p_, rhs.p_);
|
||||
}
|
||||
|
||||
circ_queue& circ_queue::operator=(circ_queue rhs) {
|
||||
swap(rhs);
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool circ_queue::valid(void) const {
|
||||
return (p_ != nullptr) && (p_->elem_start_ != nullptr) && (p_->elem_size_ != 0);
|
||||
}
|
||||
|
||||
std::size_t circ_queue::head_size(void) const {
|
||||
if (!valid()) return 0;
|
||||
return reinterpret_cast<std::uint8_t*>(p_->start_) - p_->elem_start_;
|
||||
}
|
||||
|
||||
std::size_t circ_queue::elem_size(void) const {
|
||||
if (!valid()) return 0;
|
||||
return p_->elem_size_;
|
||||
}
|
||||
|
||||
bool circ_queue::attach(void* mem, std::size_t size) {
|
||||
if (p_ == nullptr) return false;
|
||||
if ((mem == nullptr) || (size == 0)) return false;
|
||||
if (size < p_->size_min) return false;
|
||||
|
||||
detach();
|
||||
|
||||
p_->start_ = static_cast<std::atomic_uint8_t*>(mem);
|
||||
p_->count_ = p_->start_ + 1;
|
||||
p_->flag_ = reinterpret_cast<std::atomic_flag*>(p_->count_ + 1);
|
||||
|
||||
std::size_t hs = std::max(size / (p_->elem_max + 1), static_cast<std::size_t>(p_->head_size));
|
||||
p_->elem_start_ = reinterpret_cast<std::uint8_t*>(p_->start_) + hs;
|
||||
p_->elem_size_ = (size - hs) / p_->elem_max;
|
||||
|
||||
p_->lc_start_ = p_->start_->load();
|
||||
p_->lc_count_ = p_->count_->load();
|
||||
return true;
|
||||
}
|
||||
|
||||
void circ_queue::detach(void) {
|
||||
if (!valid()) return;
|
||||
p_->elem_start_ = nullptr;
|
||||
p_->elem_size_ = 0;
|
||||
p_->start_ = nullptr;
|
||||
p_->count_ = nullptr;
|
||||
p_->lc_start_ = 0;
|
||||
p_->lc_count_ = 0;
|
||||
}
|
||||
|
||||
void* circ_queue::alloc(void) {
|
||||
|
||||
}
|
||||
|
||||
bool circ_queue::push(void* elem) {
|
||||
if (!valid()) return false;
|
||||
if (elem == nullptr) return false;
|
||||
|
||||
do {
|
||||
std::uint8_t offset = p_->start_->load() + p_->count_->load(); // overflow is mod
|
||||
std::size_t lastps = offset * p_->elem_size_;
|
||||
if (p_->flag_->test_and_set(std::memory_order_acquire)) {
|
||||
continue;
|
||||
}
|
||||
memcpy(p_->elem_start_ + lastps, elem, p_->elem_size_);
|
||||
} while(1);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace ipc
|
||||
@ -6,7 +6,6 @@
|
||||
#include <utility>
|
||||
#include <limits>
|
||||
#include <algorithm>
|
||||
#include <tuple>
|
||||
|
||||
namespace ipc {
|
||||
|
||||
@ -14,7 +13,6 @@ struct circ_queue_head {
|
||||
using u8_t = std::atomic<std::uint8_t>;
|
||||
|
||||
u8_t cc_; // connection count
|
||||
u8_t rd_; // read-index
|
||||
u8_t wt_; // write-index
|
||||
};
|
||||
|
||||
@ -42,23 +40,38 @@ public:
|
||||
circ_queue& operator=(circ_queue&&) = delete;
|
||||
|
||||
std::size_t connect(void) {
|
||||
return cc_.fetch_add(1, std::memory_order_relaxed);
|
||||
return cc_.fetch_add(1, std::memory_order_acq_rel);
|
||||
}
|
||||
|
||||
std::size_t disconnect(void) {
|
||||
return cc_.fetch_sub(1, std::memory_order_relaxed);
|
||||
return cc_.fetch_sub(1, std::memory_order_acq_rel);
|
||||
}
|
||||
|
||||
std::tuple<void*, std::uint8_t> acquire(void) {
|
||||
std::uint8_t wt_id = wt_.fetch_add(1, std::memory_order_relaxed);
|
||||
return std::make_tuple(&(block_[wt_id]), wt_id);
|
||||
void* acquire(void) {
|
||||
return begin() + wt_.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void commit() {
|
||||
void commit(void) {
|
||||
wt_.fetch_add(1, std::memory_order_release);
|
||||
}
|
||||
|
||||
std::uint8_t cursor(void) const {
|
||||
return wt_.load(std::memory_order_acquire);
|
||||
}
|
||||
|
||||
void const * get(std::uint8_t index) const {
|
||||
return begin() + index;
|
||||
}
|
||||
|
||||
private:
|
||||
using elem_t = std::uint8_t[elem_size];
|
||||
elem_t const * begin(void) const {
|
||||
return reinterpret_cast<elem_t const *>(
|
||||
reinterpret_cast<std::uint8_t const *>(this) + elem_size);
|
||||
}
|
||||
elem_t * begin(void) {
|
||||
return const_cast<elem_t *>(static_cast<circ_queue const *>(this)->begin());
|
||||
}
|
||||
std::uint8_t block_[block_size];
|
||||
};
|
||||
|
||||
|
||||
@ -1,23 +1,80 @@
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
|
||||
#include "circ_queue.h"
|
||||
#include "test.h"
|
||||
|
||||
namespace {
|
||||
|
||||
using namespace std::string_literals;
|
||||
|
||||
class Unit : public TestSuite {
|
||||
Q_OBJECT
|
||||
|
||||
private slots:
|
||||
void test_(void);
|
||||
void test_inst(void);
|
||||
void test_producer(void);
|
||||
} unit__;
|
||||
|
||||
#include "test_circ_queue.moc"
|
||||
|
||||
void Unit::test_(void) {
|
||||
auto* cq = new ipc::circ_queue<4096>;
|
||||
QCOMPARE(sizeof(*cq), static_cast<std::size_t>(4096));
|
||||
using cq_t = ipc::circ_queue<4096>;
|
||||
cq_t* cq__;
|
||||
|
||||
void Unit::test_inst(void) {
|
||||
cq__ = new cq_t;
|
||||
QCOMPARE(sizeof(*cq__), static_cast<std::size_t>(cq_t::total_size));
|
||||
auto a = cq__->get(1);
|
||||
auto b = cq__->get(2);
|
||||
QCOMPARE(static_cast<std::size_t>(
|
||||
static_cast<std::uint8_t const *>(b) -
|
||||
static_cast<std::uint8_t const *>(a)), cq_t::elem_size);
|
||||
}
|
||||
|
||||
void Unit::test_producer(void) {
|
||||
std::thread consumers[3];
|
||||
std::atomic_int flag(0);
|
||||
|
||||
for (auto& c : consumers) {
|
||||
c = std::thread{[&c, &flag] {
|
||||
auto cur = cq__->cursor();
|
||||
std::cout << &c << ": cur = " << (int)cur << std::endl;
|
||||
flag.fetch_add(1, std::memory_order_release);
|
||||
do {
|
||||
while (cur != cq__->cursor()) {
|
||||
auto data = static_cast<const char*>(cq__->get(cur));
|
||||
std::cout << &c << ": " << data << std::endl;
|
||||
if (data == "quit"s) {
|
||||
return;
|
||||
}
|
||||
else QCOMPARE(data, std::to_string(cur).c_str());
|
||||
++cur;
|
||||
}
|
||||
std::this_thread::yield();
|
||||
} while(1);
|
||||
}};
|
||||
}
|
||||
|
||||
while (flag.load(std::memory_order_acquire) != std::extent<decltype(consumers)>::value) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
|
||||
for (int i = 0; i < 10; ++i) {
|
||||
auto str = static_cast<char*>(cq__->acquire());
|
||||
strcpy(str, std::to_string(i).c_str());
|
||||
std::cout << "put: " << str << std::endl;
|
||||
cq__->commit();
|
||||
}
|
||||
auto str = static_cast<char*>(cq__->acquire());
|
||||
strcpy(str, "quit");
|
||||
std::cout << "put: " << str << std::endl;
|
||||
cq__->commit();
|
||||
|
||||
for (auto& c : consumers) {
|
||||
c.join();
|
||||
}
|
||||
}
|
||||
|
||||
} // internal-linkage
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user