add circ_queue

This commit is contained in:
mutouyun 2018-11-20 23:04:32 +08:00
parent 5fa9db52bf
commit 17375bb32c
7 changed files with 200 additions and 6 deletions

View File

@ -15,7 +15,7 @@ INCLUDEPATH += \
HEADERS += \ HEADERS += \
../include/export.h \ ../include/export.h \
../include/shm.h \ ../include/shm.h \
../test/test.h ../src/circ_queue.h
SOURCES += \ SOURCES += \
../src/shm.cpp ../src/shm.cpp

View File

@ -16,6 +16,7 @@ HEADERS += \
SOURCES += \ SOURCES += \
../test/main.cpp \ ../test/main.cpp \
../test/test_shm.cpp ../test/test_shm.cpp \
../test/test_circ_queue.cpp
LIBS += -L$${DESTDIR} -lipc LIBS += -L$${DESTDIR} -lipc

View File

@ -9,7 +9,7 @@
/* /*
* Compiler & system detection for IPC_DECL_EXPORT & IPC_DECL_IMPORT. * Compiler & system detection for IPC_DECL_EXPORT & IPC_DECL_IMPORT.
* Not using QtCore cause it shouldn't depend on Qt & C++. * Not using QtCore cause it shouldn't depend on Qt.
*/ */
#if defined(_MSC_VER) #if defined(_MSC_VER)

101
src/circ_queue.cpp Normal file
View File

@ -0,0 +1,101 @@
#include "circ_queue.h"
namespace ipc {
class circ_queue_ {
public:
};
circ_queue::circ_queue(void)
: p_(new circ_queue_) {
}
circ_queue::circ_queue(void* mem, std::size_t size)
: circ_queue() {
attach(mem, size);
}
circ_queue::circ_queue(circ_queue&& rhs)
: circ_queue() {
swap(rhs);
}
circ_queue::~circ_queue(void) {
delete p_;
}
void circ_queue::swap(circ_queue& rhs) {
std::swap(p_, rhs.p_);
}
circ_queue& circ_queue::operator=(circ_queue rhs) {
swap(rhs);
return *this;
}
bool circ_queue::valid(void) const {
return (p_ != nullptr) && (p_->elem_start_ != nullptr) && (p_->elem_size_ != 0);
}
std::size_t circ_queue::head_size(void) const {
if (!valid()) return 0;
return reinterpret_cast<std::uint8_t*>(p_->start_) - p_->elem_start_;
}
std::size_t circ_queue::elem_size(void) const {
if (!valid()) return 0;
return p_->elem_size_;
}
bool circ_queue::attach(void* mem, std::size_t size) {
if (p_ == nullptr) return false;
if ((mem == nullptr) || (size == 0)) return false;
if (size < p_->size_min) return false;
detach();
p_->start_ = static_cast<std::atomic_uint8_t*>(mem);
p_->count_ = p_->start_ + 1;
p_->flag_ = reinterpret_cast<std::atomic_flag*>(p_->count_ + 1);
std::size_t hs = std::max(size / (p_->elem_max + 1), static_cast<std::size_t>(p_->head_size));
p_->elem_start_ = reinterpret_cast<std::uint8_t*>(p_->start_) + hs;
p_->elem_size_ = (size - hs) / p_->elem_max;
p_->lc_start_ = p_->start_->load();
p_->lc_count_ = p_->count_->load();
return true;
}
void circ_queue::detach(void) {
if (!valid()) return;
p_->elem_start_ = nullptr;
p_->elem_size_ = 0;
p_->start_ = nullptr;
p_->count_ = nullptr;
p_->lc_start_ = 0;
p_->lc_count_ = 0;
}
void* circ_queue::alloc(void) {
}
bool circ_queue::push(void* elem) {
if (!valid()) return false;
if (elem == nullptr) return false;
do {
std::uint8_t offset = p_->start_->load() + p_->count_->load(); // overflow is mod
std::size_t lastps = offset * p_->elem_size_;
if (p_->flag_->test_and_set(std::memory_order_acquire)) {
continue;
}
memcpy(p_->elem_start_ + lastps, elem, p_->elem_size_);
} while(1);
return true;
}
} // namespace ipc

65
src/circ_queue.h Normal file
View File

@ -0,0 +1,65 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <atomic>
#include <utility>
#include <limits>
#include <algorithm>
#include <tuple>
namespace ipc {
struct circ_queue_head {
using u8_t = std::atomic<std::uint8_t>;
u8_t cc_; // connection count
u8_t rd_; // read-index
u8_t wt_; // write-index
};
template <std::size_t Size>
class circ_queue : public circ_queue_head {
public:
enum : std::size_t {
total_size = Size,
head_size = sizeof(circ_queue_head),
block_size = Size - head_size,
elem_max = std::numeric_limits<std::uint8_t>::max(),
elem_size = Size / (elem_max + 1)
};
static_assert(Size > head_size , "Size must > head_size");
static_assert(elem_size >= head_size, "elem_size must >= head_size");
static_assert(Size % elem_size == 0 , "Size must be multiple of elem_size");
circ_queue(void) = default;
~circ_queue(void) = delete;
circ_queue(const circ_queue&) = delete;
circ_queue& operator=(const circ_queue&) = delete;
circ_queue(circ_queue&&) = delete;
circ_queue& operator=(circ_queue&&) = delete;
std::size_t connect(void) {
return cc_.fetch_add(1, std::memory_order_relaxed);
}
std::size_t disconnect(void) {
return cc_.fetch_sub(1, std::memory_order_relaxed);
}
std::tuple<void*, std::uint8_t> acquire(void) {
std::uint8_t wt_id = wt_.fetch_add(1, std::memory_order_relaxed);
return std::make_tuple(&(block_[wt_id]), wt_id);
}
void commit() {
}
private:
std::uint8_t block_[block_size];
};
} // namespace ipc

View File

@ -6,12 +6,16 @@
namespace { namespace {
QVector<QObject*> suites__; QVector<QObject*>* suites__ = nullptr;
} // internal-linkage } // internal-linkage
TestSuite::TestSuite(void) { TestSuite::TestSuite(void) {
suites__ << this; static struct __ {
QVector<QObject*> suites_;
__(void) { suites__ = &suites_; }
} _;
_.suites_ << this;
} }
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
@ -19,7 +23,7 @@ int main(int argc, char* argv[]) {
Q_UNUSED(app) Q_UNUSED(app)
int failed_count = 0; int failed_count = 0;
for (const auto& suite : suites__) { for (const auto& suite : (*suites__)) {
if (QTest::qExec(suite, argc, argv) != 0) if (QTest::qExec(suite, argc, argv) != 0)
++failed_count; ++failed_count;
} }

23
test/test_circ_queue.cpp Normal file
View File

@ -0,0 +1,23 @@
#include <atomic>
#include <iostream>
#include "circ_queue.h"
#include "test.h"
namespace {
class Unit : public TestSuite {
Q_OBJECT
private slots:
void test_(void);
} unit__;
#include "test_circ_queue.moc"
void Unit::test_(void) {
auto* cq = new ipc::circ_queue<4096>;
QCOMPARE(sizeof(*cq), static_cast<std::size_t>(4096));
}
} // internal-linkage