diff --git a/include/buffer.h b/include/buffer.h index 0f93cd7..4895a72 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "export.h" #include "def.h" @@ -14,9 +15,14 @@ class IPC_EXPORT buffer { public: using destructor_t = void (*)(void*, std::size_t); + enum class use { + functor + }; + buffer(); buffer(void* p, std::size_t s, destructor_t d); + buffer(void* p, std::size_t s, std::function d, use); buffer(void* p, std::size_t s); template diff --git a/include/def.h b/include/def.h index 3f438eb..d02f112 100644 --- a/include/def.h +++ b/include/def.h @@ -26,9 +26,10 @@ using uint_t = typename uint::type; // constants enum : std::size_t { - invalid_value = (std::numeric_limits::max)(), - data_length = 64, - default_timeut = 100 // ms + invalid_value = (std::numeric_limits::max)(), + data_length = 64, + small_msg_limit = data_length * 64 - 1, // 4095 + default_timeut = 100 // ms }; enum class relat { // multiplicity of the relationship diff --git a/src/buffer.cpp b/src/buffer.cpp index ad077bf..1ed36f9 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -11,16 +11,16 @@ bool operator==(buffer const & b1, buffer const & b2) { class buffer::buffer_ : public pimpl { public: - void* p_; - std::size_t s_; - destructor_t d_; + void* p_; + std::size_t s_; + std::function d_; - buffer_(void* p, std::size_t s, destructor_t d) - : p_(p), s_(s), d_(d) { + buffer_(void* p, std::size_t s, std::function d) + : p_(p), s_(s), d_(std::move(d)) { } ~buffer_() { - if (d_ == nullptr) return; + if (!d_) return; d_(p_, s_); } }; @@ -33,6 +33,10 @@ buffer::buffer(void* p, std::size_t s, destructor_t d) : p_(p_->make(p, s, d)) { } +buffer::buffer(void* p, std::size_t s, std::function d, use) + : p_(p_->make(p, s, std::move(d))) { +} + buffer::buffer(void* p, std::size_t s) : buffer(p, s, nullptr) { } diff --git a/src/ipc.cpp b/src/ipc.cpp old mode 100755 new mode 100644 index 6536cff..7b3e2e2 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -23,6 +23,8 @@ #include "platform/detail.h" #include "platform/waiter_wrapper.h" +#include "circ/elem_array.h" + namespace { using namespace ipc; @@ -44,11 +46,11 @@ struct msg_t { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s) { - head_.que_ = q; - head_.id_ = i; - head_.remain_ = r; - std::memcpy(&data_, d, s); + msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s) + : head_ { q, i, r } { + if ((d != nullptr) && (s > 0)) { + std::memcpy(&data_, d, s); + } } }; @@ -78,19 +80,64 @@ struct cache_t { struct conn_info_head { using acc_t = std::atomic; + std::string name_; waiter cc_waiter_, wt_waiter_, rd_waiter_; shm::handle acc_h_; + struct simple_push { + + template + using elem_t = shm::handle; + + circ::u2_t wt_; // write index + + constexpr circ::u2_t cursor() const noexcept { + return 0; + } + + template + bool push(W* /*wrapper*/, F&& f, E* elems) { + std::forward(f)(&(elems[circ::index_of(wt_)])); + ++ wt_; + return true; + } + + template + bool force_push(W* /*wrapper*/, F&& /*f*/, E* /*elems*/) { + return false; + } + + template + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& /*f*/, E* /*elems*/) { + return false; + } + }; + + circ::elem_array msg_datas_; + conn_info_head(char const * name) - : cc_waiter_((std::string{ "__CC_CONN__" } + name).c_str()) - , wt_waiter_((std::string{ "__WT_CONN__" } + name).c_str()) - , rd_waiter_((std::string{ "__RD_CONN__" } + name).c_str()) - , acc_h_ ((std::string{ "__AC_CONN__" } + name).c_str(), sizeof(acc_t)) { + : name_(name) + , cc_waiter_(("__CC_CONN__" + name_).c_str()) + , wt_waiter_(("__WT_CONN__" + name_).c_str()) + , rd_waiter_(("__RD_CONN__" + name_).c_str()) + , acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) { } auto acc() { return static_cast(acc_h_.get()); } + + shm::handle apply_storage(msg_id_t msg_id, std::size_t size, unsigned mode) { + return { ("__IPC_DAT_STORAGE__" + name_ + "__" + std::to_string(msg_id)).c_str(), size, mode }; + } + + void store(shm::handle && dat) { + msg_datas_.push([&dat](shm::handle * p) { p->swap(dat); }); + } + + void clear_store() { + msg_datas_.push([](shm::handle * p) { p->release(); }); + } }; template @@ -112,7 +159,7 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) { } template struct queue_generator { @@ -222,11 +269,23 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s } auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); + if (size > small_msg_limit) { + auto dat = info_of(h)->apply_storage(msg_id, size, shm::create); + void * buf = dat.get(); + if (buf != nullptr) { + std::memcpy(buf, data, size); + info_of(h)->store(std::move(dat)); + return try_push(static_cast(size) - static_cast(data_length), nullptr, 0); + } + // try using message fragment + ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + } // push message fragment int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { if (!try_push(static_cast(size) - offset - static_cast(data_length), static_cast(data) + offset, data_length)) { + info_of(h)->clear_store(); return false; } } @@ -235,6 +294,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s if (remain > 0) { if (!try_push(remain - static_cast(data_length), static_cast(data) + offset, static_cast(remain))) { + info_of(h)->clear_store(); return false; } } @@ -303,21 +363,32 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { if (remain <= data_length) { return make_cache(msg.data_, remain); } - else { - // gc - if (rc.size() > 1024) { - std::vector need_del; - for (auto const & pair : rc) { - auto cmp = std::minmax(msg.head_.id_, pair.first); - if (cmp.second - cmp.first > 8192) { - need_del.push_back(pair.first); - } - } - for (auto id : need_del) rc.erase(id); + if (remain > small_msg_limit) { + auto dat = info_of(h)->apply_storage(msg.head_.id_, 0, shm::open); + void * buf = dat.get(); + if (buf != nullptr && remain <= dat.size()) { + auto id = dat.detach(); + return buff_t { buf, remain, [id](void *, std::size_t) { + shm::handle dat; + dat.attach(id); + }, buff_t::use::functor }; } - // cache the first message fragment - rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(msg.data_, remain) }); + else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd, shm.size: %zd\n", + msg.head_.id_, remain, dat.size()); } + // gc + if (rc.size() > 1024) { + std::vector need_del; + for (auto const & pair : rc) { + auto cmp = std::minmax(msg.head_.id_, pair.first); + if (cmp.second - cmp.first > 8192) { + need_del.push_back(pair.first); + } + } + for (auto id : need_del) rc.erase(id); + } + // cache the first message fragment + rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(msg.data_, remain) }); } // has cached before this message else { diff --git a/src/pimpl.h b/src/pimpl.h index 9e7d228..6fdc0a5 100644 --- a/src/pimpl.h +++ b/src/pimpl.h @@ -4,6 +4,7 @@ #include #include "concept.h" +#include "pool_alloc.h" namespace ipc { @@ -34,12 +35,12 @@ constexpr auto clear_impl(T* p) -> IsImplComfortable { template constexpr auto make_impl(P&&... params) -> IsImplUncomfortable { - return new T { std::forward

(params)... }; + return mem::alloc(std::forward

(params)...); } template constexpr auto clear_impl(T* p) -> IsImplUncomfortable { - delete p; + mem::free(p); } template