diff --git a/build/ipc.pro b/build/ipc.pro index 652f270..8a0370c 100644 --- a/build/ipc.pro +++ b/build/ipc.pro @@ -25,6 +25,7 @@ HEADERS += \ ../include/rw_lock.h \ ../include/tls_pointer.h \ ../include/pool_alloc.h \ + ../include/buffer.h \ ../src/channel.inc \ ../src/route.inc \ ../src/id_pool.inc \ @@ -35,7 +36,8 @@ HEADERS += \ SOURCES += \ ../src/shm.cpp \ ../src/ipc.cpp \ - ../src/pool_alloc.cpp + ../src/pool_alloc.cpp \ + ../src/buffer.cpp unix { diff --git a/include/buffer.h b/include/buffer.h new file mode 100644 index 0000000..1835ba5 --- /dev/null +++ b/include/buffer.h @@ -0,0 +1,63 @@ +#pragma once + +#include +#include +#include + +#include "export.h" +#include "def.h" + +namespace ipc { + +class IPC_EXPORT buffer { +public: + using destructor_t = void (*)(void*, std::size_t); + + buffer(); + + buffer(void* p, std::size_t s, destructor_t d); + buffer(void* p, std::size_t s); + + template + explicit buffer(byte_t const (& data)[N]) + : buffer(data, sizeof(data)) { + } + explicit buffer(char const & c); + + buffer(buffer&& rhs); + ~buffer(); + + void swap(buffer& rhs); + buffer& operator=(buffer rhs); + + bool empty() const noexcept; + + void * data() noexcept; + void const * data() const noexcept; + + std::size_t size() const noexcept; + + std::tuple to_tuple() { + return std::make_tuple(data(), size()); + } + + std::tuple to_tuple() const { + return std::make_tuple(data(), size()); + } + + std::vector to_vector() const { + auto [d, s] = to_tuple(); + return { + static_cast(d), + static_cast(d) + s + }; + } + + friend IPC_EXPORT bool operator==(buffer const & b1, buffer const & b2); + +private: + class buffer_; + buffer_* p_; +}; + +} // namespace ipc diff --git a/include/ipc.h b/include/ipc.h index 2036e7d..43b189c 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -5,17 +5,13 @@ #include "export.h" #include "def.h" +#include "buffer.h" #include "shm.h" namespace ipc { using handle_t = void*; -using buff_t = std::vector; - -IPC_EXPORT buff_t make_buff(void const * data, std::size_t size); - -template -buff_t make_buff(byte_t const (& data)[N]) { return make_buff(data, N); } +using buff_t = buffer; IPC_EXPORT handle_t connect (char const * name); IPC_EXPORT void disconnect(handle_t h); diff --git a/src/buffer.cpp b/src/buffer.cpp new file mode 100644 index 0000000..1625c5b --- /dev/null +++ b/src/buffer.cpp @@ -0,0 +1,77 @@ +#include "buffer.h" + +#include + +namespace ipc { + +bool operator==(buffer const & b1, buffer const & b2) { + return (b1.size() == b2.size()) && (std::memcmp(b1.data(), b2.data(), b1.size()) == 0); +} + +class buffer::buffer_ : public pimpl { +public: + void* p_; + std::size_t s_; + destructor_t d_; + + buffer_(void* p, std::size_t s, destructor_t d) + : p_(p), s_(s), d_(d) { + } + + ~buffer_() { + if (d_ == nullptr) return; + d_(p_, s_); + } +}; + +buffer::buffer() + : buffer(nullptr, 0, nullptr) { +} + +buffer::buffer(void* p, std::size_t s, destructor_t d) + : p_(p_->make(p, s, d)) { +} + +buffer::buffer(void* p, std::size_t s) + : buffer(p, s, nullptr) { +} + +buffer::buffer(char const & c) + : buffer(const_cast(&c), 1, nullptr) { +} + +buffer::buffer(buffer&& rhs) + : p_(nullptr) { + swap(rhs); +} + +buffer::~buffer() { + p_->clear(); +} + +void buffer::swap(buffer& rhs) { + std::swap(p_, rhs.p_); +} + +buffer& buffer::operator=(buffer rhs) { + swap(rhs); + return *this; +} + +bool buffer::empty() const noexcept { + return (impl(p_)->p_ == nullptr) || (impl(p_)->s_ == 0); +} + +void* buffer::data() noexcept { + return impl(p_)->p_; +} + +void const * buffer::data() const noexcept { + return impl(p_)->p_; +} + +std::size_t buffer::size() const noexcept { + return impl(p_)->s_; +} + +} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 2a90285..3490b55 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -46,28 +46,25 @@ constexpr queue_t* queue_of(handle_t h) { return static_cast(h); } -using cache_t = mem::vector; - -template -cache_t make_cache(byte_t const (& data)[N]) { - return { - static_cast(data), - static_cast(data) + N - }; +inline buff_t make_cache(void const * data, std::size_t size) { + auto ptr = mem::detail::pool_alloc::alloc(size); + std::memcpy(ptr, data, size); + return { ptr, size, mem::detail::pool_alloc::free }; } -template -using remove_cv_ref_t = std::remove_cv_t>; +struct cache_t { + std::size_t fill_; + buff_t buff_; -template -constexpr auto to_buff(Cache&& cac) -> Requires, buff_t>::value, Cache&&> { - return std::forward(cac); -} + cache_t(std::size_t f, buff_t&& b) + : fill_(f), buff_(std::move(b)) + {} -template -auto to_buff(Cache&& cac) -> Requires, buff_t>::value, buff_t> { - return make_buff(cac.data(), cac.size()); -} + void append(void const * data, std::size_t size) { + std::memcpy(static_cast(buff_.data()) + fill_, data, size); + fill_ += size; + } +}; inline auto& recv_cache() { /* @@ -92,13 +89,6 @@ inline auto& queues_cache() { namespace ipc { -buff_t make_buff(void const * data, std::size_t size) { - return { - static_cast(data), - static_cast(data) + size - }; -} - handle_t connect(char const * name) { auto mem = shm::acquire(name, sizeof(shm_info_t)); if (mem == nullptr) { @@ -185,27 +175,27 @@ buff_t multi_recv(F&& upd) { std::size_t remain = static_cast( static_cast(data_length) + msg.remain_); // find cache with msg.id_ - auto cache_it = rc.find(msg.id_); - if (cache_it == rc.end()) { + auto cac_it = rc.find(msg.id_); + if (cac_it == rc.end()) { if (remain <= data_length) { - return make_buff(msg.data_, remain); + return make_cache(msg.data_, remain); } // cache the first message fragment - else rc.emplace(msg.id_, make_cache(msg.data_)); + else rc.try_emplace(msg.id_, data_length, make_cache(msg.data_, remain)); } // has cached before this message else { - auto& cache = cache_it->second; + auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { - cache.insert(cache.end(), msg.data_, msg.data_ + remain); + cac.append(msg.data_, remain); // finish this message, erase it from cache - auto cac = std::move(cache); - rc.erase(cache_it); - return to_buff(std::move(cac)); + auto buff = std::move(cac.buff_); + rc.erase(cac_it); + return buff; } // there are remain datas after this message - cache.insert(cache.end(), msg.data_, msg.data_ + data_length); + cac.append(msg.data_, data_length); } } } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 9c18e62..04a86ea 100644 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -42,14 +42,18 @@ struct test_verify { void prepare(void* /*pt*/) {} - void push_data(int cid, ipc::buff_t const & msg) { + void push_data(int cid, ipc::buff_t & msg) { list_[cid].emplace_back(std::move(msg)); } void verify(int /*N*/, int /*Loops*/) { std::cout << "verifying..." << std::endl; for (auto& c_dats : list_) { - QCOMPARE(datas__, c_dats); + QCOMPARE(datas__.size(), c_dats.size()); + std::size_t i = 0; + for (auto& d : c_dats) { + QCOMPARE(datas__[i++], d); + } } } }; @@ -85,7 +89,7 @@ struct test_cq { do { auto msg = cn.recv(); if (msg.size() < 2) { - QCOMPARE(msg, ipc::buff_t { '\0' }); + QCOMPARE(msg, ipc::buff_t('\0')); return; } proc(msg); @@ -99,7 +103,7 @@ struct test_cq { void send(cn_t& cn, const std::array& info) { int n = info[1]; if (n < 0) { - /*QVERIFY*/(cn.send(ipc::buff_t { '\0' })); + /*QVERIFY*/(cn.send(ipc::buff_t('\0'))); } else /*QVERIFY*/(cn.send(datas__[static_cast(n)])); } @@ -137,7 +141,7 @@ struct test_cq { do { auto msg = cn.recv(); if (msg.size() < 2) { - QCOMPARE(msg, ipc::buff_t { '\0' }); + QCOMPARE(msg, ipc::buff_t('\0')); return; } proc(msg); @@ -162,7 +166,7 @@ struct test_cq { } _(cn, m_); int n = info[1]; if (n < 0) { - /*QVERIFY*/(cn->send(ipc::buff_t { '\0' })); + /*QVERIFY*/(cn->send(ipc::buff_t('\0'))); } else /*QVERIFY*/(cn->send(datas__[static_cast(n)])); } @@ -197,13 +201,18 @@ void Unit::initTestCase() { TestSuite::initTestCase(); capo::random<> rdm { DataMin, DataMax }; - capo::random<> bit { 0, (std::numeric_limits::max)() }; + capo::random<> bit { 0, (std::numeric_limits::max)() }; for (int i = 0; i < LoopCount; ++i) { - auto n = rdm(); - ipc::buff_t buff(static_cast(n)); + std::size_t n = static_cast(rdm()); + ipc::buff_t buff { + new ipc::byte_t[n], n, + [](void* p, std::size_t) { + delete [] static_cast(p); + } + }; for (std::size_t k = 0; k < buff.size(); ++k) { - buff[k] = static_cast(bit()); + static_cast(buff.data())[k] = static_cast(bit()); } datas__.emplace_back(std::move(buff)); } @@ -381,7 +390,7 @@ void Unit::test_route_rtt() { auto dd = cc.recv(); if (dd.size() < 2) return; // std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; - while (!cr.send(ipc::buff_t { 'a' })) { + while (!cr.send(ipc::buff_t('a'))) { std::this_thread::yield(); } } @@ -402,7 +411,7 @@ void Unit::test_route_rtt() { // QVERIFY(false); // } } - cc.send(ipc::buff_t { '\0' }); + cc.send(ipc::buff_t('\0')); t1.join(); sw.print_elapsed(1, 1, LoopCount); }}; @@ -466,7 +475,7 @@ void Unit::test_channel() { std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; cc.send(datas__[i]); } - cc.send(ipc::buff_t { '\0' }); + cc.send(ipc::buff_t('\0')); t1.join(); }}; @@ -482,7 +491,7 @@ void Unit::test_channel_rtt() { auto dd = cc.recv(); if (dd.size() < 2) return; // std::cout << "recving: " << i << "-[" << dd.size() << "]" << std::endl; - while (!cc.send(ipc::buff_t { 'a' })) { + while (!cc.send(ipc::buff_t('a'))) { cc.wait_for_recv(1); } } @@ -500,7 +509,7 @@ void Unit::test_channel_rtt() { // QVERIFY(false); // } } - cc.send(ipc::buff_t { '\0' }); + cc.send(ipc::buff_t('\0')); t1.join(); sw.print_elapsed(1, 1, LoopCount); }};