using ipc::buffer instead of std::vector<byte_t>

This commit is contained in:
mutouyun 2018-12-31 22:22:54 +08:00
parent 3bea8af992
commit e1c3594ca5
6 changed files with 194 additions and 57 deletions

View File

@ -25,6 +25,7 @@ HEADERS += \
../include/rw_lock.h \ ../include/rw_lock.h \
../include/tls_pointer.h \ ../include/tls_pointer.h \
../include/pool_alloc.h \ ../include/pool_alloc.h \
../include/buffer.h \
../src/channel.inc \ ../src/channel.inc \
../src/route.inc \ ../src/route.inc \
../src/id_pool.inc \ ../src/id_pool.inc \
@ -35,7 +36,8 @@ HEADERS += \
SOURCES += \ SOURCES += \
../src/shm.cpp \ ../src/shm.cpp \
../src/ipc.cpp \ ../src/ipc.cpp \
../src/pool_alloc.cpp ../src/pool_alloc.cpp \
../src/buffer.cpp
unix { unix {

63
include/buffer.h Normal file
View File

@ -0,0 +1,63 @@
#pragma once
#include <cstddef>
#include <tuple>
#include <vector>
#include "export.h"
#include "def.h"
namespace ipc {
class IPC_EXPORT buffer {
public:
using destructor_t = void (*)(void*, std::size_t);
buffer();
buffer(void* p, std::size_t s, destructor_t d);
buffer(void* p, std::size_t s);
template <std::size_t N>
explicit buffer(byte_t const (& data)[N])
: buffer(data, sizeof(data)) {
}
explicit buffer(char const & c);
buffer(buffer&& rhs);
~buffer();
void swap(buffer& rhs);
buffer& operator=(buffer rhs);
bool empty() const noexcept;
void * data() noexcept;
void const * data() const noexcept;
std::size_t size() const noexcept;
std::tuple<void*, std::size_t> to_tuple() {
return std::make_tuple(data(), size());
}
std::tuple<void const *, std::size_t> to_tuple() const {
return std::make_tuple(data(), size());
}
std::vector<byte_t> to_vector() const {
auto [d, s] = to_tuple();
return {
static_cast<byte_t const *>(d),
static_cast<byte_t const *>(d) + s
};
}
friend IPC_EXPORT bool operator==(buffer const & b1, buffer const & b2);
private:
class buffer_;
buffer_* p_;
};
} // namespace ipc

View File

@ -5,17 +5,13 @@
#include "export.h" #include "export.h"
#include "def.h" #include "def.h"
#include "buffer.h"
#include "shm.h" #include "shm.h"
namespace ipc { namespace ipc {
using handle_t = void*; using handle_t = void*;
using buff_t = std::vector<byte_t>; using buff_t = buffer;
IPC_EXPORT buff_t make_buff(void const * data, std::size_t size);
template <std::size_t N>
buff_t make_buff(byte_t const (& data)[N]) { return make_buff(data, N); }
IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT handle_t connect (char const * name);
IPC_EXPORT void disconnect(handle_t h); IPC_EXPORT void disconnect(handle_t h);

77
src/buffer.cpp Normal file
View File

@ -0,0 +1,77 @@
#include "buffer.h"
#include <cstring>
namespace ipc {
bool operator==(buffer const & b1, buffer const & b2) {
return (b1.size() == b2.size()) && (std::memcmp(b1.data(), b2.data(), b1.size()) == 0);
}
class buffer::buffer_ : public pimpl<buffer_> {
public:
void* p_;
std::size_t s_;
destructor_t d_;
buffer_(void* p, std::size_t s, destructor_t d)
: p_(p), s_(s), d_(d) {
}
~buffer_() {
if (d_ == nullptr) return;
d_(p_, s_);
}
};
buffer::buffer()
: buffer(nullptr, 0, nullptr) {
}
buffer::buffer(void* p, std::size_t s, destructor_t d)
: p_(p_->make(p, s, d)) {
}
buffer::buffer(void* p, std::size_t s)
: buffer(p, s, nullptr) {
}
buffer::buffer(char const & c)
: buffer(const_cast<char*>(&c), 1, nullptr) {
}
buffer::buffer(buffer&& rhs)
: p_(nullptr) {
swap(rhs);
}
buffer::~buffer() {
p_->clear();
}
void buffer::swap(buffer& rhs) {
std::swap(p_, rhs.p_);
}
buffer& buffer::operator=(buffer rhs) {
swap(rhs);
return *this;
}
bool buffer::empty() const noexcept {
return (impl(p_)->p_ == nullptr) || (impl(p_)->s_ == 0);
}
void* buffer::data() noexcept {
return impl(p_)->p_;
}
void const * buffer::data() const noexcept {
return impl(p_)->p_;
}
std::size_t buffer::size() const noexcept {
return impl(p_)->s_;
}
} // namespace ipc

View File

@ -46,28 +46,25 @@ constexpr queue_t* queue_of(handle_t h) {
return static_cast<queue_t*>(h); return static_cast<queue_t*>(h);
} }
using cache_t = mem::vector<byte_t>; inline buff_t make_cache(void const * data, std::size_t size) {
auto ptr = mem::detail::pool_alloc::alloc(size);
std::memcpy(ptr, data, size);
return { ptr, size, mem::detail::pool_alloc::free };
}
template <std::size_t N> struct cache_t {
cache_t make_cache(byte_t const (& data)[N]) { std::size_t fill_;
return { buff_t buff_;
static_cast<cache_t::value_type const *>(data),
static_cast<cache_t::value_type const *>(data) + N cache_t(std::size_t f, buff_t&& b)
: fill_(f), buff_(std::move(b))
{}
void append(void const * data, std::size_t size) {
std::memcpy(static_cast<byte_t*>(buff_.data()) + fill_, data, size);
fill_ += size;
}
}; };
}
template <typename T>
using remove_cv_ref_t = std::remove_cv_t<std::remove_reference_t<T>>;
template <typename Cache>
constexpr auto to_buff(Cache&& cac) -> Requires<std::is_same<remove_cv_ref_t<Cache>, buff_t>::value, Cache&&> {
return std::forward<Cache>(cac);
}
template <typename Cache>
auto to_buff(Cache&& cac) -> Requires<!std::is_same<remove_cv_ref_t<Cache>, buff_t>::value, buff_t> {
return make_buff(cac.data(), cac.size());
}
inline auto& recv_cache() { inline auto& recv_cache() {
/* /*
@ -92,13 +89,6 @@ inline auto& queues_cache() {
namespace ipc { namespace ipc {
buff_t make_buff(void const * data, std::size_t size) {
return {
static_cast<buff_t::value_type const *>(data),
static_cast<buff_t::value_type const *>(data) + size
};
}
handle_t connect(char const * name) { handle_t connect(char const * name) {
auto mem = shm::acquire(name, sizeof(shm_info_t)); auto mem = shm::acquire(name, sizeof(shm_info_t));
if (mem == nullptr) { if (mem == nullptr) {
@ -185,27 +175,27 @@ buff_t multi_recv(F&& upd) {
std::size_t remain = static_cast<std::size_t>( std::size_t remain = static_cast<std::size_t>(
static_cast<int>(data_length) + msg.remain_); static_cast<int>(data_length) + msg.remain_);
// find cache with msg.id_ // find cache with msg.id_
auto cache_it = rc.find(msg.id_); auto cac_it = rc.find(msg.id_);
if (cache_it == rc.end()) { if (cac_it == rc.end()) {
if (remain <= data_length) { if (remain <= data_length) {
return make_buff(msg.data_, remain); return make_cache(msg.data_, remain);
} }
// cache the first message fragment // cache the first message fragment
else rc.emplace(msg.id_, make_cache(msg.data_)); else rc.try_emplace(msg.id_, data_length, make_cache(msg.data_, remain));
} }
// has cached before this message // has cached before this message
else { else {
auto& cache = cache_it->second; auto& cac = cac_it->second;
// this is the last message fragment // this is the last message fragment
if (msg.remain_ <= 0) { if (msg.remain_ <= 0) {
cache.insert(cache.end(), msg.data_, msg.data_ + remain); cac.append(msg.data_, remain);
// finish this message, erase it from cache // finish this message, erase it from cache
auto cac = std::move(cache); auto buff = std::move(cac.buff_);
rc.erase(cache_it); rc.erase(cac_it);
return to_buff(std::move(cac)); return buff;
} }
// there are remain datas after this message // there are remain datas after this message
cache.insert(cache.end(), msg.data_, msg.data_ + data_length); cac.append(msg.data_, data_length);
} }
} }
} }

View File

@ -42,14 +42,18 @@ struct test_verify {
void prepare(void* /*pt*/) {} void prepare(void* /*pt*/) {}
void push_data(int cid, ipc::buff_t const & msg) { void push_data(int cid, ipc::buff_t & msg) {
list_[cid].emplace_back(std::move(msg)); list_[cid].emplace_back(std::move(msg));
} }
void verify(int /*N*/, int /*Loops*/) { void verify(int /*N*/, int /*Loops*/) {
std::cout << "verifying..." << std::endl; std::cout << "verifying..." << std::endl;
for (auto& c_dats : list_) { for (auto& c_dats : list_) {
QCOMPARE(datas__, c_dats); QCOMPARE(datas__.size(), c_dats.size());
std::size_t i = 0;
for (auto& d : c_dats) {
QCOMPARE(datas__[i++], d);
}
} }
} }
}; };
@ -85,7 +89,7 @@ struct test_cq<ipc::route> {
do { do {
auto msg = cn.recv(); auto msg = cn.recv();
if (msg.size() < 2) { if (msg.size() < 2) {
QCOMPARE(msg, ipc::buff_t { '\0' }); QCOMPARE(msg, ipc::buff_t('\0'));
return; return;
} }
proc(msg); proc(msg);
@ -99,7 +103,7 @@ struct test_cq<ipc::route> {
void send(cn_t& cn, const std::array<int, 2>& info) { void send(cn_t& cn, const std::array<int, 2>& info) {
int n = info[1]; int n = info[1];
if (n < 0) { if (n < 0) {
/*QVERIFY*/(cn.send(ipc::buff_t { '\0' })); /*QVERIFY*/(cn.send(ipc::buff_t('\0')));
} }
else /*QVERIFY*/(cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)])); else /*QVERIFY*/(cn.send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
} }
@ -137,7 +141,7 @@ struct test_cq<ipc::channel> {
do { do {
auto msg = cn.recv(); auto msg = cn.recv();
if (msg.size() < 2) { if (msg.size() < 2) {
QCOMPARE(msg, ipc::buff_t { '\0' }); QCOMPARE(msg, ipc::buff_t('\0'));
return; return;
} }
proc(msg); proc(msg);
@ -162,7 +166,7 @@ struct test_cq<ipc::channel> {
} _(cn, m_); } _(cn, m_);
int n = info[1]; int n = info[1];
if (n < 0) { if (n < 0) {
/*QVERIFY*/(cn->send(ipc::buff_t { '\0' })); /*QVERIFY*/(cn->send(ipc::buff_t('\0')));
} }
else /*QVERIFY*/(cn->send(datas__[static_cast<decltype(datas__)::size_type>(n)])); else /*QVERIFY*/(cn->send(datas__[static_cast<decltype(datas__)::size_type>(n)]));
} }
@ -197,13 +201,18 @@ void Unit::initTestCase() {
TestSuite::initTestCase(); TestSuite::initTestCase();
capo::random<> rdm { DataMin, DataMax }; capo::random<> rdm { DataMin, DataMax };
capo::random<> bit { 0, (std::numeric_limits<ipc::buff_t::value_type>::max)() }; capo::random<> bit { 0, (std::numeric_limits<ipc::byte_t>::max)() };
for (int i = 0; i < LoopCount; ++i) { for (int i = 0; i < LoopCount; ++i) {
auto n = rdm(); std::size_t n = static_cast<std::size_t>(rdm());
ipc::buff_t buff(static_cast<ipc::buff_t::size_type>(n)); ipc::buff_t buff {
new ipc::byte_t[n], n,
[](void* p, std::size_t) {
delete [] static_cast<ipc::byte_t*>(p);
}
};
for (std::size_t k = 0; k < buff.size(); ++k) { for (std::size_t k = 0; k < buff.size(); ++k) {
buff[k] = static_cast<ipc::buff_t::value_type>(bit()); static_cast<ipc::byte_t*>(buff.data())[k] = static_cast<ipc::byte_t>(bit());
} }
datas__.emplace_back(std::move(buff)); datas__.emplace_back(std::move(buff));
} }
@ -381,7 +390,7 @@ void Unit::test_route_rtt() {
auto dd = cc.recv(); auto dd = cc.recv();
if (dd.size() < 2) return; if (dd.size() < 2) return;
// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; // std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl;
while (!cr.send(ipc::buff_t { 'a' })) { while (!cr.send(ipc::buff_t('a'))) {
std::this_thread::yield(); std::this_thread::yield();
} }
} }
@ -402,7 +411,7 @@ void Unit::test_route_rtt() {
// QVERIFY(false); // QVERIFY(false);
// } // }
} }
cc.send(ipc::buff_t { '\0' }); cc.send(ipc::buff_t('\0'));
t1.join(); t1.join();
sw.print_elapsed(1, 1, LoopCount); sw.print_elapsed(1, 1, LoopCount);
}}; }};
@ -466,7 +475,7 @@ void Unit::test_channel() {
std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl;
cc.send(datas__[i]); cc.send(datas__[i]);
} }
cc.send(ipc::buff_t { '\0' }); cc.send(ipc::buff_t('\0'));
t1.join(); t1.join();
}}; }};
@ -482,7 +491,7 @@ void Unit::test_channel_rtt() {
auto dd = cc.recv(); auto dd = cc.recv();
if (dd.size() < 2) return; if (dd.size() < 2) return;
// std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; // std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl;
while (!cc.send(ipc::buff_t { 'a' })) { while (!cc.send(ipc::buff_t('a'))) {
cc.wait_for_recv(1); cc.wait_for_recv(1);
} }
} }
@ -500,7 +509,7 @@ void Unit::test_channel_rtt() {
// QVERIFY(false); // QVERIFY(false);
// } // }
} }
cc.send(ipc::buff_t { '\0' }); cc.send(ipc::buff_t('\0'));
t1.join(); t1.join();
sw.print_elapsed(1, 1, LoopCount); sw.print_elapsed(1, 1, LoopCount);
}}; }};