use one shm-block for big message (>= 4096)

This commit is contained in:
zhangyi 2019-05-06 16:19:19 +08:00
parent 518550070d
commit ce3e9869fb
5 changed files with 117 additions and 34 deletions

View File

@ -4,6 +4,7 @@
#include <tuple>
#include <vector>
#include <type_traits>
#include <functional>
#include "export.h"
#include "def.h"
@ -14,9 +15,14 @@ class IPC_EXPORT buffer {
public:
using destructor_t = void (*)(void*, std::size_t);
enum class use {
functor
};
buffer();
buffer(void* p, std::size_t s, destructor_t d);
buffer(void* p, std::size_t s, std::function<void(void*, std::size_t)> d, use);
buffer(void* p, std::size_t s);
template <std::size_t N>

View File

@ -28,6 +28,7 @@ using uint_t = typename uint<N>::type;
enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64,
small_msg_limit = data_length * 64 - 1, // 4095
default_timeut = 100 // ms
};

View File

@ -13,14 +13,14 @@ class buffer::buffer_ : public pimpl<buffer_> {
public:
void* p_;
std::size_t s_;
destructor_t d_;
std::function<void(void*, std::size_t)> d_;
buffer_(void* p, std::size_t s, destructor_t d)
: p_(p), s_(s), d_(d) {
buffer_(void* p, std::size_t s, std::function<void(void*, std::size_t)> d)
: p_(p), s_(s), d_(std::move(d)) {
}
~buffer_() {
if (d_ == nullptr) return;
if (!d_) return;
d_(p_, s_);
}
};
@ -33,6 +33,10 @@ buffer::buffer(void* p, std::size_t s, destructor_t d)
: p_(p_->make(p, s, d)) {
}
buffer::buffer(void* p, std::size_t s, std::function<void(void*, std::size_t)> d, use)
: p_(p_->make(p, s, std::move(d))) {
}
buffer::buffer(void* p, std::size_t s)
: buffer(p, s, nullptr) {
}

93
src/ipc.cpp Executable file → Normal file
View File

@ -23,6 +23,8 @@
#include "platform/detail.h"
#include "platform/waiter_wrapper.h"
#include "circ/elem_array.h"
namespace {
using namespace ipc;
@ -44,12 +46,12 @@ struct msg_t {
std::aligned_storage_t<DataSize, AlignSize> data_ {};
msg_t() = default;
msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s) {
head_.que_ = q;
head_.id_ = i;
head_.remain_ = r;
msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s)
: head_ { q, i, r } {
if ((d != nullptr) && (s > 0)) {
std::memcpy(&data_, d, s);
}
}
};
template <typename T>
@ -78,19 +80,64 @@ struct cache_t {
struct conn_info_head {
using acc_t = std::atomic<msg_id_t>;
std::string name_;
waiter cc_waiter_, wt_waiter_, rd_waiter_;
shm::handle acc_h_;
struct simple_push {
template <std::size_t, std::size_t>
using elem_t = shm::handle;
circ::u2_t wt_; // write index
constexpr circ::u2_t cursor() const noexcept {
return 0;
}
template <typename W, typename F, typename E>
bool push(W* /*wrapper*/, F&& f, E* elems) {
std::forward<F>(f)(&(elems[circ::index_of(wt_)]));
++ wt_;
return true;
}
template <typename W, typename F, typename E>
bool force_push(W* /*wrapper*/, F&& /*f*/, E* /*elems*/) {
return false;
}
template <typename W, typename F, typename E>
bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& /*f*/, E* /*elems*/) {
return false;
}
};
circ::elem_array<simple_push, sizeof(shm::handle), 0> msg_datas_;
conn_info_head(char const * name)
: cc_waiter_((std::string{ "__CC_CONN__" } + name).c_str())
, wt_waiter_((std::string{ "__WT_CONN__" } + name).c_str())
, rd_waiter_((std::string{ "__RD_CONN__" } + name).c_str())
, acc_h_ ((std::string{ "__AC_CONN__" } + name).c_str(), sizeof(acc_t)) {
: name_(name)
, cc_waiter_(("__CC_CONN__" + name_).c_str())
, wt_waiter_(("__WT_CONN__" + name_).c_str())
, rd_waiter_(("__RD_CONN__" + name_).c_str())
, acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) {
}
auto acc() {
return static_cast<acc_t*>(acc_h_.get());
}
shm::handle apply_storage(msg_id_t msg_id, std::size_t size, unsigned mode) {
return { ("__IPC_DAT_STORAGE__" + name_ + "__" + std::to_string(msg_id)).c_str(), size, mode };
}
void store(shm::handle && dat) {
msg_datas_.push([&dat](shm::handle * p) { p->swap(dat); });
}
void clear_store() {
msg_datas_.push([](shm::handle * p) { p->release(); });
}
};
template <typename W, typename F>
@ -112,7 +159,7 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) {
}
template <typename Policy,
std::size_t DataSize,
std::size_t DataSize = data_length,
std::size_t AlignSize = (ipc::detail::min)(DataSize, alignof(std::max_align_t))>
struct queue_generator {
@ -222,11 +269,23 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
}
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
if (size > small_msg_limit) {
auto dat = info_of(h)->apply_storage(msg_id, size, shm::create);
void * buf = dat.get();
if (buf != nullptr) {
std::memcpy(buf, data, size);
info_of(h)->store(std::move(dat));
return try_push(static_cast<int>(size) - static_cast<int>(data_length), nullptr, 0);
}
// try using message fragment
ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size);
}
// push message fragment
int offset = 0;
for (int i = 0; i < static_cast<int>(size / data_length); ++i, offset += data_length) {
if (!try_push(static_cast<int>(size) - offset - static_cast<int>(data_length),
static_cast<byte_t const *>(data) + offset, data_length)) {
info_of(h)->clear_store();
return false;
}
}
@ -235,6 +294,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
if (remain > 0) {
if (!try_push(remain - static_cast<int>(data_length),
static_cast<byte_t const *>(data) + offset, static_cast<std::size_t>(remain))) {
info_of(h)->clear_store();
return false;
}
}
@ -303,7 +363,19 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
if (remain <= data_length) {
return make_cache(msg.data_, remain);
}
else {
if (remain > small_msg_limit) {
auto dat = info_of(h)->apply_storage(msg.head_.id_, 0, shm::open);
void * buf = dat.get();
if (buf != nullptr && remain <= dat.size()) {
auto id = dat.detach();
return buff_t { buf, remain, [id](void *, std::size_t) {
shm::handle dat;
dat.attach(id);
}, buff_t::use::functor };
}
else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd, shm.size: %zd\n",
msg.head_.id_, remain, dat.size());
}
// gc
if (rc.size() > 1024) {
std::vector<msg_id_t> need_del;
@ -318,7 +390,6 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
// cache the first message fragment
rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(msg.data_, remain) });
}
}
// has cached before this message
else {
auto& cac = cac_it->second;

View File

@ -4,6 +4,7 @@
#include <utility>
#include "concept.h"
#include "pool_alloc.h"
namespace ipc {
@ -34,12 +35,12 @@ constexpr auto clear_impl(T* p) -> IsImplComfortable<T, void> {
template <typename T, typename... P>
constexpr auto make_impl(P&&... params) -> IsImplUncomfortable<T> {
return new T { std::forward<P>(params)... };
return mem::alloc<T>(std::forward<P>(params)...);
}
template <typename T>
constexpr auto clear_impl(T* p) -> IsImplUncomfortable<T, void> {
delete p;
mem::free(p);
}
template <typename T>