From 7dd993767026d4b61b9f6b51ee7b8a2b82069401 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:03:17 +0800 Subject: [PATCH] try to adjust recycling strategy for large message cache --- src/ipc.cpp | 72 +++++++++++++++++++++--------------- src/libipc/circ/elem_array.h | 6 +-- src/libipc/prod_cons.h | 48 +++++++++++++----------- src/libipc/queue.h | 13 +++++-- 4 files changed, 82 insertions(+), 57 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index ae6f270..b3b0921 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -32,8 +32,8 @@ namespace { -using msg_id_t = std::uint32_t; -using acc_t = std::atomic; +using msg_id_t = std::uint32_t; +using acc_t = std::atomic; template struct msg_t; @@ -92,6 +92,14 @@ auto cc_acc() { return static_cast(acc_h.get()); } +IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { + return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; +} + +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size)); +} + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; @@ -100,9 +108,13 @@ struct chunk_info_t { return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { - if (id < 0) return nullptr; - return reinterpret_cast(this + 1) + (chunk_size * id); + ipc::byte_t* chunks_mem() noexcept { + return reinterpret_cast(this + 1); + } + + ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + assert(id >= 0); + return chunks_mem() + (chunk_size * id); } }; @@ -130,16 +142,11 @@ auto& chunk_storages() { return chunk_s; } -IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { - return ipc::make_align(alignof(std::max_align_t), - (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align); -} - chunk_info_t *chunk_storage_info(std::size_t chunk_size) { return chunk_storages()[chunk_size].get_info(chunk_size); } -std::pair apply_storage(std::size_t size) { +std::pair acquire_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; @@ -166,14 +173,12 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) { void release_storage(ipc::storage_id_t id, std::size_t size) { if (id < 0) { - ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } - std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); @@ -394,7 +399,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(size); + auto dat = acquire_storage(size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -429,7 +434,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( - recycle_message, + [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); @@ -450,7 +455,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( - recycle_message, + [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; @@ -475,7 +480,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + bool recycled = false; + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] { + return !que->pop(msg, [&recycled](bool r) { recycled = r; }); + }, tm)) { // pop failed, just return. return {}; } @@ -490,23 +498,29 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } std::size_t msg_size = static_cast(r_size); + // large message + if (msg.storage_) { + ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); + void* buf = find_storage(buf_id, msg_size); + if (buf != nullptr) { + if (recycled) { + return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { + release_storage(reinterpret_cast(pmid) - 1, size); + }, reinterpret_cast(buf_id + 1)}; + } else { + return ipc::buff_t{buf, msg_size}; // no recycle + } + } else { + ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; + } + } // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { if (msg_size <= ipc::data_length) { return make_cache(msg.data_, msg_size); } - if (msg.storage_) { - std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, msg_size); - if (buf != nullptr) { - return ipc::buff_t{buf, msg_size}; - } - else { - ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); - continue; - } - } // gc if (rc.size() > 1024) { std::vector need_del; diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 74f031a..0b21f48 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -130,10 +130,10 @@ public: return head_.force_push(que, std::forward(f), block_); } - template - bool pop(Q* que, cursor_t* cur, F&& f) { + template + bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), block_); + return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); } }; diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index d5684fa..28d99bd 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -58,13 +58,14 @@ struct prod_cons_impl> { return false; } - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); + std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -80,8 +81,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -92,6 +94,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -156,8 +159,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -179,6 +183,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -263,20 +268,20 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { if (cur == cursor()) return false; // acquire auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (rem_cc == 0) { + if ((cur_rc & ep_mask) == 0) { + std::forward(out)(true); return true; } - if (el->rc_.compare_exchange_weak(cur_rc, - cur_rc & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)((nxt_rc & ep_mask) == 0); return true; } ipc::yield(k); @@ -395,8 +400,8 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { auto* el = elems + circ::index_of(cur); auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if (cur_fl != ~static_cast(cur)) { @@ -406,17 +411,18 @@ struct prod_cons_impl> { std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (rem_cc == 0) { + if ((cur_rc & rc_mask) == 0) { + std::forward(out)(true); el->f_ct_.store(cur + N - 1, std::memory_order_release); return true; } - if ((rem_cc & ~wrapper->connected_id()) == 0) { + auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); + bool last_one = false; + if ((last_one = (nxt_rc & rc_mask) == 0)) { el->f_ct_.store(cur + N - 1, std::memory_order_release); } - if (el->rc_.compare_exchange_weak(cur_rc, - inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)(last_one); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index 07aac57..1a782a9 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -171,14 +171,14 @@ public: }); } - template - bool pop(T& item) { + template + bool pop(T& item, F&& out) { if (elems_ == nullptr) { return false; } return elems_->pop(this, &(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }); + }, std::forward(out)); } }; @@ -204,7 +204,12 @@ public: } bool pop(T& item) { - return base_t::pop(item); + return base_t::pop(item, [](bool) {}); + } + + template + bool pop(T& item, F&& out) { + return base_t::pop(item, std::forward(out)); } };