From d4bf94c2a36e1fb1df96c1136dca5d7e9ac4e852 Mon Sep 17 00:00:00 2001 From: zhangyi Date: Wed, 23 Oct 2019 16:23:07 +0800 Subject: [PATCH] use big message cache --- include/def.h | 2 +- src/id_pool.h | 104 +++++++++++++++++++++++++ src/ipc.cpp | 194 +++++++++++++++++++++++++++++----------------- test/test_ipc.cpp | 5 ++ 4 files changed, 232 insertions(+), 73 deletions(-) create mode 100644 src/id_pool.h diff --git a/include/def.h b/include/def.h index 0360ecf..2658ca4 100644 --- a/include/def.h +++ b/include/def.h @@ -28,7 +28,7 @@ using uint_t = typename uint::type; enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, - small_msg_limit = data_length * 64 - 1, // 4095 + small_msg_limit = data_length, default_timeut = 100 // ms }; diff --git a/src/id_pool.h b/src/id_pool.h new file mode 100644 index 0000000..2801877 --- /dev/null +++ b/src/id_pool.h @@ -0,0 +1,104 @@ +#pragma once + +#include // std::aligned_storage_t +#include // std::memcmp + +#include "def.h" + +#include "platform/detail.h" + +namespace ipc { + +template +struct id_type { + uint_t<8> id_; + std::aligned_storage_t data_; + + id_type& operator=(uint_t<8> val) { + id_ = val; + return (*this); + } + + operator uint_t<8>() const { + return id_; + } +}; + +template +struct id_type<0, AlignSize> { + uint_t<8> id_; + + id_type& operator=(uint_t<8> val) { + id_ = val; + return (*this); + } + + operator uint_t<8>() const { + return id_; + } +}; + +template +class id_pool { +public: + enum : std::size_t { + max_count = (std::numeric_limits>::max)() // 255 + }; + +private: + id_type next_[max_count]; + uint_t<8> cursor_ = 0; + bool prepared_ = false; + +public: + void prepare() { + if (!prepared_ && this->invalid()) this->init(); + prepared_ = true; + } + + void init() { + for (std::size_t i = 0; i < max_count;) { + i = next_[i] = static_cast>(i + 1); + } + } + + bool invalid() const { + static id_pool inv; + return std::memcmp(this, &inv, sizeof(id_pool)) == 0; + } + + bool empty() const { + return cursor_ == max_count; + } + + std::size_t acquire() { + if (empty()) { + return invalid_value; + } + std::size_t id = cursor_; + cursor_ = next_[id]; // point to next + return id; + } + + bool release(std::size_t id) { + if (id == invalid_value) return false; + next_[id] = cursor_; + cursor_ = static_cast>(id); // put it back + return true; + } + + void * at(std::size_t id) { return &(next_[id].data_); } + void const * at(std::size_t id) const { return &(next_[id].data_); } +}; + +template +class obj_pool : public id_pool { + using base_t = id_pool; + +public: + T * at(std::size_t id) { return reinterpret_cast(base_t::at(id)); } + T const * at(std::size_t id) const { return reinterpret_cast(base_t::at(id)); } +}; + +} // namespace ipc diff --git a/src/ipc.cpp b/src/ipc.cpp index 14c7329..ec0138b 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -3,11 +3,12 @@ #include #include #include -#include +#include // std::pair, std::move, std::forward #include -#include // aligned_storage_t +#include // aligned_storage_t #include #include +#include #include "def.h" #include "shm.h" @@ -17,6 +18,7 @@ #include "policy.h" #include "rw_lock.h" #include "log.h" +#include "id_pool.h" #include "memory/resource.h" @@ -28,7 +30,9 @@ namespace { using namespace ipc; + using msg_id_t = std::uint32_t; +using acc_t = std::atomic; template struct msg_t; @@ -48,9 +52,13 @@ struct msg_t : msg_t<0, AlignSize> { msg_t() = default; msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { - if (!this->storage_) { - std::memcpy(&data_, d, s); + if (this->storage_) { + if (d != nullptr) { + // copy storage-id + *reinterpret_cast(&data_) = *static_cast(d); + } } + else std::memcpy(&data_, d, s); } }; @@ -77,13 +85,107 @@ struct cache_t { } }; -struct conn_info_head { - using acc_t = std::atomic; +auto cc_acc() { + static shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); + return static_cast(acc_h.get()); +} - static auto cc_acc() { - static shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); - return static_cast(acc_h.get()); +auto& cls_storages() { + struct cls_t { + shm::handle id_info_; + std::array::max_count> mems_; + }; + static ipc::unordered_map cls_s; + return cls_s; +} + +auto& cls_lock() { + static spin_lock cls_l; + return cls_l; +} + +struct cls_info_t { + id_pool<> pool_; + spin_lock lock_; +}; + +constexpr std::size_t calc_cls_size(std::size_t size) noexcept { + return (((size - 1) / small_msg_limit) + 1) * small_msg_limit; +} + +std::pair apply_storage(std::size_t size) { + std::size_t cls_size = calc_cls_size(size); + + cls_lock().lock(); + auto& cls_shm = cls_storages()[cls_size]; + cls_lock().unlock(); + + if (!cls_shm.id_info_.valid() && + !cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) { + return {}; } + auto info = static_cast(cls_shm.id_info_.get()); + if (info == nullptr) { + return {}; + } + + info->lock_.lock(); + info->pool_.prepare(); + // got an unique id + auto id = info->pool_.acquire(); + info->lock_.unlock(); + + if (id == invalid_value) { + return {}; + } + if (!cls_shm.mems_[id].valid() && + !cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) + + "__" + ipc::to_string(id)).c_str(), cls_size)) { + return {}; + } + return { id, cls_shm.mems_[id].get() }; +} + +void* find_storage(msg_id_t id, std::size_t size) { + std::size_t cls_size = calc_cls_size(size); + + cls_lock().lock(); + auto& cls_shm = cls_storages()[cls_size]; + cls_lock().unlock(); + + if (id == invalid_value) { + return nullptr; + } + if (!cls_shm.mems_[id].valid() && + !cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) + + "__" + ipc::to_string(id)).c_str(), cls_size)) { + return nullptr; + } + return cls_shm.mems_[id].get(); +} + +void recycle_storage(msg_id_t id, std::size_t size) { + std::size_t cls_size = calc_cls_size(size); + + cls_lock().lock(); + auto& cls_shm = cls_storages()[cls_size]; + cls_lock().unlock(); + + if (!cls_shm.id_info_.valid() && + !cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) { + return; + } + auto info = static_cast(cls_shm.id_info_.get()); + if (info == nullptr) { + return; + } + + info->lock_.lock(); + info->pool_.release(id); + info->lock_.unlock(); +} + +struct conn_info_head { ipc::string name_; msg_id_t cc_id_; // connection-info id @@ -103,27 +205,6 @@ struct conn_info_head { */ tls::pointer> recv_cache_; - struct simple_push { - - template - using elem_t = shm::id_t; - - circ::u2_t wt_; // write index - - constexpr circ::u2_t cursor() const noexcept { - return 0; - } - - template - bool push(W* /*wrapper*/, F&& f, E* elems) { - std::forward(f)(&(elems[circ::index_of(wt_)])); - ++ wt_; - return true; - } - }; - - circ::elem_array msg_datas_; - conn_info_head(char const * name) : name_ (name) , cc_id_ ((cc_acc() == nullptr) ? 0 : cc_acc()->fetch_add(1, std::memory_order_relaxed)) @@ -140,32 +221,6 @@ struct conn_info_head { auto& recv_cache() { return *recv_cache_.create(); } - - shm::id_t apply_storage(msg_id_t msg_id, std::size_t size) { - return shm::acquire( - ("__ST_CONN__" + ipc::to_string(cc_id_) + - "__" + ipc::to_string(msg_id)).c_str(), size, shm::create); - } - - static shm::id_t acquire_storage(msg_id_t cc_id, msg_id_t msg_id) { - return shm::acquire( - ("__ST_CONN__" + ipc::to_string(cc_id) + - "__" + ipc::to_string(msg_id)).c_str(), 0, shm::open); - } - - void store(shm::id_t dat) { - msg_datas_.push([dat](shm::id_t * id) { - (*id) = dat; - }); - } - - void clear_store() { - msg_datas_.push([](shm::id_t * id) { - if (*id == nullptr) return; - shm::remove(*id); - (*id) = nullptr; - }); - } }; template @@ -284,15 +339,14 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > small_msg_limit) { - auto dat = info_of(h)->apply_storage(msg_id, size); - void * buf = shm::get_mem(dat, nullptr); + auto dat = apply_storage(size); + void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); - info_of(h)->store(dat); - return try_push(static_cast(size) - static_cast(data_length), nullptr, 0); + return try_push(static_cast(size) - static_cast(data_length), &(dat.first), 0); } // try using message fragment - ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment int offset = 0; @@ -301,7 +355,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(data) + offset, data_length)) { return false; } - info_of(h)->clear_store(); } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; @@ -310,7 +363,6 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(data) + offset, static_cast(remain))) { return false; } - info_of(h)->clear_store(); } return true; } @@ -375,16 +427,14 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { return make_cache(msg.data_, remain); } if (msg.storage_) { - auto dat = info_of(h)->acquire_storage(msg.conn_, msg.id_); - std::size_t dat_sz = 0; - void * buf = shm::get_mem(dat, &dat_sz); - if (buf != nullptr && remain <= dat_sz) { - return buff_t { buf, remain, [](void * dat, std::size_t) { - shm::release(dat); - }, dat }; + std::size_t buf_id = *reinterpret_cast(&msg.data_); + void * buf = find_storage(buf_id, remain); + if (buf != nullptr) { + return buff_t { buf, remain, [](void* ptr, std::size_t size) { + recycle_storage(reinterpret_cast(ptr) - 1, size); + }, reinterpret_cast(buf_id + 1) }; } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd, shm.size: %zd\n", - msg.id_, remain, dat_sz); + else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg.id_, remain); } // gc if (rc.size() > 1024) { diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 5fbe8d1..65d94a2 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -353,6 +353,7 @@ void Unit::test_route() { } void Unit::test_route_rtt() { + // return; test_stopwatch sw; std::thread t1 {[&] { @@ -392,6 +393,7 @@ void Unit::test_route_rtt() { } void Unit::test_route_performance() { + // return; ipc::detail::static_for<8>([](auto index) { test_prod_cons(); }); @@ -399,6 +401,7 @@ void Unit::test_route_performance() { } void Unit::test_channel() { + // return; std::thread t1 {[&] { ipc::channel cc { "my-ipc-channel" }; for (std::size_t i = 0;; ++i) { @@ -423,6 +426,7 @@ void Unit::test_channel() { } void Unit::test_channel_rtt() { + // return; test_stopwatch sw; std::thread t1 {[&] { @@ -465,6 +469,7 @@ void Unit::test_channel_rtt() { } void Unit::test_channel_performance() { + // return; ipc::detail::static_for<8>([](auto index) { test_prod_cons(); });