From bbd241948b8bdedf26fd72bc46092e2f451435fb Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 26 Jun 2021 12:46:55 +0800 Subject: [PATCH] fix: large message cache may not be recycled with multiple receivers --- demo/msg_que/main.cpp | 2 +- src/ipc.cpp | 104 ++++++++++++------------------------------ src/libipc/queue.h | 6 +-- test/test_queue.cpp | 2 +- 4 files changed, 33 insertions(+), 81 deletions(-) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index ea60f39..5635101 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16; std::atomic is_quit__{ false }; std::atomic size_counter__{ 0 }; -using msg_que_t = ipc::chan; +using msg_que_t = ipc::chan; msg_que_t que__{ name__ }; ipc::byte_t buff__[max_sz]; diff --git a/src/ipc.cpp b/src/ipc.cpp index 12a629b..9835d50 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -96,7 +96,7 @@ struct chunk_info_t { ipc::spin_lock lock_; IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t)); + return ipc::make_align(alignof(std::max_align_t), chunk_size); } IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { @@ -147,9 +147,7 @@ auto& chunk_storage(std::size_t chunk_size) { return chunk_storages()[chunk_size]; } -std::pair apply_storage(std::size_t conn_count, std::size_t size) { - if (conn_count == 0) return {}; - +std::pair apply_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); @@ -162,90 +160,48 @@ std::pair apply_storage(std::size_t conn_count, std::size_t auto id = info->pool_.acquire(); info->lock_.unlock(); - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return {}; - reinterpret_cast(ptr + chunk_size)->store(static_cast(conn_count), std::memory_order_release); - return { id, ptr }; + return { id, info->at(chunk_size, id) }; } void *find_storage(std::size_t id, std::size_t size) { if (id == ipc::invalid_value) { - ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size); + ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } - std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); if (info == nullptr) return nullptr; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return nullptr; - if (reinterpret_cast(ptr + chunk_size)->load(std::memory_order_acquire) == 0) { - ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size); - return nullptr; - } - return ptr; -} - -void recycle_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); - return; - } - - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); - if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) { - ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size); - return; - } - if (reinterpret_cast(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { - // not the last receiver, just return - return; - } - - info->lock_.lock(); - info->pool_.release(id); - info->lock_.unlock(); + return info->at(chunk_size, id); } void clear_storage(std::size_t id, std::size_t size) { if (id == ipc::invalid_value) { - ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); + ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); if (info == nullptr) return; - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return; - - auto cc_flag = reinterpret_cast(ptr + chunk_size); - for (unsigned k = 0;;) { - auto cc_curr = cc_flag->load(std::memory_order_acquire); - if (cc_curr == 0) return; // means this id has been cleared - if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { - break; - } - ipc::yield(k); - } - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } +template +bool recycle_message(void* p) { + auto msg = static_cast(p); + if (msg->storage_) { + clear_storage( + *reinterpret_cast(&msg->data_), + static_cast(ipc::data_length) + msg->remain_); + } + return true; +} + struct conn_info_head { ipc::string name_; @@ -445,7 +401,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(que->conn_count(), size); + auto dat = apply_storage(size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -479,18 +435,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + recycle_message, + info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push([](void* p) { - auto tmp_msg = static_cast(p); - if (tmp_msg->storage_) { - clear_storage( - *reinterpret_cast(&tmp_msg->data_), - static_cast(ipc::data_length) + tmp_msg->remain_); - } - return true; - }, info->cc_id_, msg_id, remain, data, size)) { + if (!que->force_push( + recycle_message, + info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -504,7 +456,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + recycle_message, + info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; } @@ -548,9 +502,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { std::size_t buf_id = *reinterpret_cast(&msg.data_); void * buf = find_storage(buf_id, remain); if (buf != nullptr) { - return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { - recycle_storage(reinterpret_cast(ptr) - 1, size); - }, reinterpret_cast(buf_id + 1) }; + return ipc::buff_t{buf, remain}; } else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f04febf..07aac57 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -155,11 +155,11 @@ public: return !valid() || (cursor_ == elems_->cursor()); } - template - bool push(P&&... params) { + template + bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; return elems_->push(this, [&](void* p) { - ::new (p) T(std::forward

(params)...); + if (prep(p)) ::new (p) T(std::forward

(params)...); }); } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index c1114ef..a59b3a7 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -44,7 +44,7 @@ constexpr int ThreadMax = 8; template void push(Que & que, int p, int d) { - for (int n = 0; !que.push(p, d); ++n) { + for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { ASSERT_NE(n, PushRetry); std::this_thread::yield(); }