diff --git a/src/ipc.cpp b/src/ipc.cpp index f4c5fa8..a9916ce 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -20,6 +20,7 @@ #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" +#include "libipc/utility/scope_guard.h" #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" @@ -39,7 +40,7 @@ struct msg_t; template struct msg_t<0, AlignSize> { - msg_id_t conn_; + msg_id_t cc_id_; msg_id_t id_; std::int32_t remain_; bool storage_; @@ -50,8 +51,8 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) - : msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} { + msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { if (data != nullptr) { // copy storage-id @@ -96,9 +97,21 @@ IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { } IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { - return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size)); + return ipc::make_align(alignof(std::max_align_t), align_chunk_size( + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)) + size)); } +struct chunk_t { + std::atomic &conns() noexcept { + return *reinterpret_cast *>(this); + } + + void *data() noexcept { + return reinterpret_cast(this) + + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)); + } +}; + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; @@ -107,13 +120,13 @@ struct chunk_info_t { return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t* chunks_mem() noexcept { - return reinterpret_cast(this + 1); + ipc::byte_t *chunks_mem() noexcept { + return reinterpret_cast(this + 1); } - ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { if (id < 0) return nullptr; - return chunks_mem() + (chunk_size * id); + return reinterpret_cast(chunks_mem() + (chunk_size * id)); } }; @@ -145,7 +158,7 @@ chunk_info_t *chunk_storage_info(std::size_t chunk_size) { return chunk_storages()[chunk_size].get_info(chunk_size); } -std::pair acquire_storage(std::size_t size) { +std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; @@ -156,7 +169,10 @@ std::pair acquire_storage(std::size_t size) { auto id = info->pool_.acquire(); info->lock_.unlock(); - return { id, info->at(chunk_size, id) }; + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return {}; + chunk->conns().store(conns, std::memory_order_relaxed); + return { id, chunk->data() }; } void *find_storage(ipc::storage_id_t id, std::size_t size) { @@ -167,7 +183,7 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; - return info->at(chunk_size, id); + return info->at(chunk_size, id)->data(); } void release_storage(ipc::storage_id_t id, std::size_t size) { @@ -183,13 +199,53 @@ void release_storage(ipc::storage_id_t id, std::size_t size) { info->lock_.unlock(); } +template +bool sub_rc(ipc::wr, + std::atomic &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept { + return true; +} + +template +bool sub_rc(ipc::wr, + std::atomic &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept { + auto last_conns = curr_conns & ~conn_id; + for (unsigned k = 0;;) { + auto chunk_conns = conns.load(std::memory_order_acquire); + if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) { + return (chunk_conns & last_conns) == 0; + } + ipc::yield(k); + } +} + +template +void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) { + if (id < 0) { + ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + return; + } + std::size_t chunk_size = calc_chunk_size(size); + auto info = chunk_storage_info(chunk_size); + if (info == nullptr) return; + + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return; + + if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) { + return; + } + info->lock_.lock(); + info->pool_.release(id); + info->lock_.unlock(); +} + template -bool recycle_message(void* p) { +bool clear_message(void* p) { auto msg = static_cast(p); if (msg->storage_) { std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; if (r_size <= 0) { - ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size); + ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size); return true; } release_storage( @@ -278,8 +334,10 @@ struct queue_generator { template struct detail_impl { -using queue_t = typename queue_generator::queue_t; -using conn_info_t = typename queue_generator::conn_info_t; +using policy_t = Policy; +using flag_t = typename policy_t::flag_t; +using queue_t = typename queue_generator::queue_t; +using conn_info_t = typename queue_generator::conn_info_t; constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { return static_cast(h); @@ -373,7 +431,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s ipc::error("fail: send, que->ready_sending() == false\n"); return false; } - if (que->elems()->connections(std::memory_order_relaxed) == 0) { + ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed); + if (conns == 0) { ipc::error("fail: send, there is no receiver on this connection.\n"); return false; } @@ -386,7 +445,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = acquire_storage(size); + auto dat = acquire_storage(size, conns); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -426,7 +485,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); if (!que->force_push( - recycle_message, + clear_message, info->cc_id_, msg_id, remain, data, size)) { return false; } @@ -467,15 +526,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg; - bool recycled = false; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] { - return !que->pop(msg, [&recycled](bool r) { recycled = r; }); + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { + return !que->pop(msg); }, tm)) { // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); - if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) { + if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length @@ -490,12 +548,24 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); void* buf = find_storage(buf_id, msg_size); if (buf != nullptr) { - if (recycled) { - return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { - release_storage(ipc::detail::horrible_cast(pmid) - 1, size); - }, ipc::detail::horrible_cast(buf_id + 1)}; - } else { + struct recycle_t { + ipc::storage_id_t storage_id; + ipc::circ::cc_t curr_conns; + ipc::circ::cc_t conn_id; + } *r_info = ipc::mem::alloc(recycle_t{ + buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id() + }); + if (r_info == nullptr) { + ipc::log("fail: ipc::mem::alloc.\n"); return ipc::buff_t{buf, msg_size}; // no recycle + } else { + return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) { + auto r_info = static_cast(p_info); + IPC_UNUSED_ auto finally = ipc::guard([r_info] { + ipc::mem::free(r_info); + }); + recycle_storage(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id); + }, r_info}; } } else { ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index f2379e2..4003948 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -16,7 +16,7 @@ namespace circ { using u1_t = ipc::uint_t<8>; using u2_t = ipc::uint_t<32>; -/** only supports max 32 connections */ +/** only supports max 32 connections in broadcast mode */ using cc_t = u2_t; constexpr u1_t index_of(u2_t c) noexcept { diff --git a/src/libipc/policy.h b/src/libipc/policy.h index d2ad313..8959607 100755 --- a/src/libipc/policy.h +++ b/src/libipc/policy.h @@ -15,8 +15,10 @@ struct choose; template struct choose { + using flag_t = Flag; + template - using elems_t = circ::elem_array, DataSize, AlignSize>; + using elems_t = circ::elem_array, DataSize, AlignSize>; }; } // namespace policy diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 45590e3..16c076c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -170,7 +170,7 @@ TEST(IPC, 1vN) { //test_sr("smu", 1, MultiMax); //test_sr("mmu", 1, MultiMax); test_sr("smb", 1, MultiMax); - //test_sr("mmb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) {