diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index ea60f39..5635101 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16; std::atomic is_quit__{ false }; std::atomic size_counter__{ 0 }; -using msg_que_t = ipc::chan; +using msg_que_t = ipc::chan; msg_que_t que__{ name__ }; ipc::byte_t buff__[max_sz]; diff --git a/include/libipc/def.h b/include/libipc/def.h index 9be5206..2b80b81 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -29,7 +29,7 @@ enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, large_msg_limit = data_length, - large_msg_align = 512, + large_msg_align = 1024, large_msg_cache = 32, default_timeout = 100 // ms }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 8b18dde..dcf60b3 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -20,6 +20,7 @@ #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" +#include "libipc/utility/scope_guard.h" #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" @@ -39,7 +40,7 @@ struct msg_t; template struct msg_t<0, AlignSize> { - msg_id_t conn_; + msg_id_t cc_id_; msg_id_t id_; std::int32_t remain_; bool storage_; @@ -50,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) - : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { + msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { - if (d != nullptr) { + if (data != nullptr) { // copy storage-id - *reinterpret_cast(&data_) = *static_cast(d); + *reinterpret_cast(&data_) = + *static_cast(data); } } - else std::memcpy(&data_, d, s); + else std::memcpy(&data_, data, size); } }; @@ -90,26 +92,46 @@ auto cc_acc() { return static_cast(acc_h.get()); } +IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { + return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; +} + +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), align_chunk_size( + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)) + size)); +} + +struct chunk_t { + std::atomic &conns() noexcept { + return *reinterpret_cast *>(this); + } + + void *data() noexcept { + return reinterpret_cast(this) + + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)); + } +}; + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; - IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t)); - } - IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { - return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); + return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { - if (id == ipc::invalid_value) return nullptr; - return reinterpret_cast(this + 1) + (chunks_elem_size(chunk_size) * id); + ipc::byte_t *chunks_mem() noexcept { + return reinterpret_cast(this + 1); + } + + chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + if (id < 0) return nullptr; + return reinterpret_cast(chunks_mem() + (chunk_size * id)); } }; auto& chunk_storages() { - class chunk_t { + class chunk_handle_t { ipc::shm::handle handle_; public: @@ -128,31 +150,29 @@ auto& chunk_storages() { return info; } }; - static ipc::unordered_map chunk_s; - return chunk_s; + static ipc::map chunk_hs; + return chunk_hs; } -auto& chunk_lock() { - static ipc::spin_lock chunk_l; - return chunk_l; +chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + auto &storages = chunk_storages(); + std::decay_t::iterator it; + { + static ipc::rw_lock lock; + IPC_UNUSED_ std::shared_lock guard {lock}; + if ((it = storages.find(chunk_size)) == storages.end()) { + using chunk_handle_t = std::decay_t::value_type::second_type; + guard.unlock(); + IPC_UNUSED_ std::lock_guard guard {lock}; + it = storages.emplace(chunk_size, chunk_handle_t{}).first; + } + } + return it->second.get_info(chunk_size); } -constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { - return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align; -} - -auto& chunk_storage(std::size_t chunk_size) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock()); - return chunk_storages()[chunk_size]; -} - -std::pair apply_storage(std::size_t conn_count, std::size_t size) { - if (conn_count == 0) return {}; - +std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) { std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; info->lock_.lock(); @@ -161,90 +181,92 @@ std::pair apply_storage(std::size_t conn_count, std::size_t auto id = info->pool_.acquire(); info->lock_.unlock(); - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return {}; - reinterpret_cast(ptr + chunk_size)->store(static_cast(conn_count), std::memory_order_release); - return { id, ptr }; + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return {}; + chunk->conns().store(conns, std::memory_order_relaxed); + return { id, chunk->data() }; } -void *find_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size); +void *find_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { + ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return nullptr; - if (reinterpret_cast(ptr + chunk_size)->load(std::memory_order_acquire) == 0) { - ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size); - return nullptr; - } - return ptr; + return info->at(chunk_size, id)->data(); } -void recycle_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); +void release_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { + ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) { - ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size); - return; - } - if (reinterpret_cast(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { - // not the last receiver, just return - return; - } - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } -void clear_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); - return; - } +template +bool sub_rc(ipc::wr, + std::atomic &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept { + return true; +} - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); - if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return; - - auto cc_flag = reinterpret_cast(ptr + chunk_size); +template +bool sub_rc(ipc::wr, + std::atomic &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept { + auto last_conns = curr_conns & ~conn_id; for (unsigned k = 0;;) { - auto cc_curr = cc_flag->load(std::memory_order_acquire); - if (cc_curr == 0) return; // means this id has been cleared - if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { - break; + auto chunk_conns = conns.load(std::memory_order_acquire); + if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) { + return (chunk_conns & last_conns) == 0; } ipc::yield(k); } +} +template +void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) { + if (id < 0) { + ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + return; + } + std::size_t chunk_size = calc_chunk_size(size); + auto info = chunk_storage_info(chunk_size); + if (info == nullptr) return; + + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return; + + if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) { + return; + } info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } +template +bool clear_message(void* p) { + auto msg = static_cast(p); + if (msg->storage_) { + std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; + if (r_size <= 0) { + ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size); + return true; + } + release_storage( + *reinterpret_cast(&msg->data_), + static_cast(r_size)); + } + return true; +} + struct conn_info_head { ipc::string name_; @@ -324,8 +346,10 @@ struct queue_generator { template struct detail_impl { -using queue_t = typename queue_generator::queue_t; -using conn_info_t = typename queue_generator::conn_info_t; +using policy_t = Policy; +using flag_t = typename policy_t::flag_t; +using queue_t = typename queue_generator::queue_t; +using conn_info_t = typename queue_generator::conn_info_t; constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { return static_cast(h); @@ -419,7 +443,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s ipc::error("fail: send, que->ready_sending() == false\n"); return false; } - if (que->elems()->connections(std::memory_order_relaxed) == 0) { + ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed); + if (conns == 0) { ipc::error("fail: send, there is no receiver on this connection.\n"); return false; } @@ -432,7 +457,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(que->conn_count(), size); + auto dat = acquire_storage(size, conns); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -440,11 +465,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment - // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), static_cast(data) + offset, ipc::data_length)) { return false; @@ -466,18 +491,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + [](void*) { return true; }, + info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push([](void* p) { - auto tmp_msg = static_cast(p); - if (tmp_msg->storage_) { - clear_storage( - *reinterpret_cast(&tmp_msg->data_), - static_cast(ipc::data_length) + tmp_msg->remain_); - } - return true; - }, info->cc_id_, msg_id, remain, data, size)) { + if (!que->force_push( + clear_message, + info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -491,7 +512,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + [](void*) { return true; }, + info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; } @@ -512,34 +535,60 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } auto& rc = info_of(h)->recv_cache(); - while (1) { + for (;;) { // pop a new message typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { + return !que->pop(msg); + }, tm)) { // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); - if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) { + if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(ipc::data_length) + msg.remain_; + std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; + if (r_size <= 0) { + ipc::error("fail: recv, r_size = %d\n", (int)r_size); + return {}; + } + std::size_t msg_size = static_cast(r_size); + // large message + if (msg.storage_) { + ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); + void* buf = find_storage(buf_id, msg_size); + if (buf != nullptr) { + struct recycle_t { + ipc::storage_id_t storage_id; + ipc::circ::cc_t curr_conns; + ipc::circ::cc_t conn_id; + } *r_info = ipc::mem::alloc(recycle_t{ + buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id() + }); + if (r_info == nullptr) { + ipc::log("fail: ipc::mem::alloc.\n"); + return ipc::buff_t{buf, msg_size}; // no recycle + } else { + return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) { + auto r_info = static_cast(p_info); + IPC_UNUSED_ auto finally = ipc::guard([r_info] { + ipc::mem::free(r_info); + }); + recycle_storage(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id); + }, r_info}; + } + } else { + ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; + } + } // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= ipc::data_length) { - return make_cache(msg.data_, remain); - } - if (msg.storage_) { - std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, remain); - if (buf != nullptr) { - return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { - recycle_storage(reinterpret_cast(ptr) - 1, size); - }, reinterpret_cast(buf_id + 1) }; - } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); + if (msg_size <= ipc::data_length) { + return make_cache(msg.data_, msg_size); } // gc if (rc.size() > 1024) { @@ -553,14 +602,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { - cac.append(&(msg.data_), remain); + cac.append(&(msg.data_), msg_size); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 74f031a..0b21f48 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -130,10 +130,10 @@ public: return head_.force_push(que, std::forward(f), block_); } - template - bool pop(Q* que, cursor_t* cur, F&& f) { + template + bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), block_); + return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); } }; diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index f2379e2..4003948 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -16,7 +16,7 @@ namespace circ { using u1_t = ipc::uint_t<8>; using u2_t = ipc::uint_t<32>; -/** only supports max 32 connections */ +/** only supports max 32 connections in broadcast mode */ using cc_t = u2_t; constexpr u1_t index_of(u2_t c) noexcept { diff --git a/src/libipc/memory/resource.h b/src/libipc/memory/resource.h index 1084e64..063e8dc 100755 --- a/src/libipc/memory/resource.h +++ b/src/libipc/memory/resource.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,11 @@ using unordered_map = std::unordered_map< Key, T, std::hash, std::equal_to, ipc::mem::allocator> >; +template +using map = std::map< + Key, T, std::less, ipc::mem::allocator> +>; + template using basic_string = std::basic_string< Char, std::char_traits, ipc::mem::allocator diff --git a/src/libipc/platform/detail.h b/src/libipc/platform/detail.h index fbf539a..141de6e 100755 --- a/src/libipc/platform/detail.h +++ b/src/libipc/platform/detail.h @@ -111,5 +111,17 @@ constexpr const T& (min)(const T& a, const T& b) { #endif/*__cplusplus < 201703L*/ +template +auto horrible_cast(U rhs) noexcept + -> typename std::enable_if::value + && std::is_trivially_copyable::value, T>::type { + union { + T t; + U u; + } r = {}; + r.u = rhs; + return r.t; +} + } // namespace detail } // namespace ipc diff --git a/src/libipc/policy.h b/src/libipc/policy.h index d2ad313..8959607 100755 --- a/src/libipc/policy.h +++ b/src/libipc/policy.h @@ -15,8 +15,10 @@ struct choose; template struct choose { + using flag_t = Flag; + template - using elems_t = circ::elem_array, DataSize, AlignSize>; + using elems_t = circ::elem_array, DataSize, AlignSize>; }; } // namespace policy diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index d5684fa..28d99bd 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -58,13 +58,14 @@ struct prod_cons_impl> { return false; } - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); + std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -80,8 +81,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -92,6 +94,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -156,8 +159,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -179,6 +183,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -263,20 +268,20 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { if (cur == cursor()) return false; // acquire auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (rem_cc == 0) { + if ((cur_rc & ep_mask) == 0) { + std::forward(out)(true); return true; } - if (el->rc_.compare_exchange_weak(cur_rc, - cur_rc & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)((nxt_rc & ep_mask) == 0); return true; } ipc::yield(k); @@ -395,8 +400,8 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { auto* el = elems + circ::index_of(cur); auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if (cur_fl != ~static_cast(cur)) { @@ -406,17 +411,18 @@ struct prod_cons_impl> { std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (rem_cc == 0) { + if ((cur_rc & rc_mask) == 0) { + std::forward(out)(true); el->f_ct_.store(cur + N - 1, std::memory_order_release); return true; } - if ((rem_cc & ~wrapper->connected_id()) == 0) { + auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); + bool last_one = false; + if ((last_one = (nxt_rc & rc_mask) == 0)) { el->f_ct_.store(cur + N - 1, std::memory_order_release); } - if (el->rc_.compare_exchange_weak(cur_rc, - inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)(last_one); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f04febf..1a782a9 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -155,11 +155,11 @@ public: return !valid() || (cursor_ == elems_->cursor()); } - template - bool push(P&&... params) { + template + bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; return elems_->push(this, [&](void* p) { - ::new (p) T(std::forward

(params)...); + if (prep(p)) ::new (p) T(std::forward

(params)...); }); } @@ -171,14 +171,14 @@ public: }); } - template - bool pop(T& item) { + template + bool pop(T& item, F&& out) { if (elems_ == nullptr) { return false; } return elems_->pop(this, &(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }); + }, std::forward(out)); } }; @@ -204,7 +204,12 @@ public: } bool pop(T& item) { - return base_t::pop(item); + return base_t::pop(item, [](bool) {}); + } + + template + bool pop(T& item, F&& out) { + return base_t::pop(item, std::forward(out)); } }; diff --git a/src/libipc/utility/id_pool.h b/src/libipc/utility/id_pool.h index 7207aa9..24d32e4 100755 --- a/src/libipc/utility/id_pool.h +++ b/src/libipc/utility/id_pool.h @@ -2,13 +2,15 @@ #include // std::aligned_storage_t #include // std::memcmp +#include #include "libipc/def.h" - #include "libipc/platform/detail.h" namespace ipc { +using storage_id_t = std::int32_t; + template struct id_type; @@ -16,7 +18,7 @@ template struct id_type<0, AlignSize> { uint_t<8> id_; - id_type& operator=(std::size_t val) { + id_type& operator=(storage_id_t val) { id_ = static_cast>(val); return (*this); } @@ -57,7 +59,7 @@ public: } void init() { - for (std::size_t i = 0; i < max_count;) { + for (storage_id_t i = 0; i < max_count;) { i = next_[i] = (i + 1); } } @@ -71,22 +73,22 @@ public: return cursor_ == max_count; } - std::size_t acquire() { - if (empty()) return invalid_value; - std::size_t id = cursor_; + storage_id_t acquire() { + if (empty()) return -1; + storage_id_t id = cursor_; cursor_ = next_[id]; // point to next return id; } - bool release(std::size_t id) { - if (id == invalid_value) return false; + bool release(storage_id_t id) { + if (id < 0) return false; next_[id] = cursor_; cursor_ = static_cast>(id); // put it back return true; } - void * at(std::size_t id) { return &(next_[id].data_); } - void const * at(std::size_t id) const { return &(next_[id].data_); } + void * at(storage_id_t id) { return &(next_[id].data_); } + void const * at(storage_id_t id) const { return &(next_[id].data_); } }; template @@ -94,8 +96,8 @@ class obj_pool : public id_pool { using base_t = id_pool; public: - T * at(std::size_t id) { return reinterpret_cast(base_t::at(id)); } - T const * at(std::size_t id) const { return reinterpret_cast(base_t::at(id)); } + T * at(storage_id_t id) { return reinterpret_cast(base_t::at(id)); } + T const * at(storage_id_t id) const { return reinterpret_cast(base_t::at(id)); } }; } // namespace ipc diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index faae13c..16c076c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -1,6 +1,8 @@ #include #include +#include +#include #include #include "libipc/ipc.h" @@ -16,15 +18,28 @@ using namespace ipc; namespace { +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; + +struct msg_head { + int id_; +}; + class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{1, 65536}(); + int size = capo::random<>{sizeof(msg_head), 65536}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); } + rand_buf(msg_head const &msg) { + *this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) { + delete static_cast(p); + }); + } + rand_buf(rand_buf &&) = default; rand_buf(rand_buf const & rhs) { if (rhs.empty()) return; @@ -40,11 +55,11 @@ public: } void set_id(int k) noexcept { - *get() = static_cast(k); + get()->id_ = k; } int get_id() const noexcept { - return static_cast(*get()); + return get()->id_; } using buffer::operator=; @@ -60,64 +75,79 @@ void test_basic(char const * name) { EXPECT_FALSE(que1.try_send(test2)); que_t que2 { que1.name(), ipc::receiver }; - EXPECT_TRUE(que1.send(test1)); - EXPECT_TRUE(que1.try_send(test2)); + ASSERT_TRUE(que1.send(test1)); + ASSERT_TRUE(que1.try_send(test2)); EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test2); } -template -void test_sr(char const * name, int size, int s_cnt, int r_cnt) { - using que_t = chan; +class data_set { + std::vector datas_; +public: + data_set() { + datas_.resize(LoopCount); + for (int i = 0; i < LoopCount; ++i) { + datas_[i].set_id(i); + } + } + + std::vector const &get() const noexcept { + return datas_; + } +} const data_set__; + +template > +void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); ipc_ut::reader().start(static_cast(r_cnt)); + + std::atomic_thread_fence(std::memory_order_seq_cst); ipc_ut::test_stopwatch sw; - std::vector tests(size); for (int k = 0; k < s_cnt; ++k) { - ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { - que_t que1 { name }; - EXPECT_TRUE(que1.wait_for_recv(r_cnt)); + ipc_ut::sender() << [name, &sw, r_cnt, k] { + Que que { name, ipc::sender }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); - for (auto & buf : tests) { - rand_buf data { buf }; - data.set_id(k); - EXPECT_TRUE(que1.send(data)); + for (int i = 0; i < (int)data_set__.get().size(); ++i) { + EXPECT_TRUE(que.send(data_set__.get()[i])); } }; } for (int k = 0; k < r_cnt; ++k) { - ipc_ut::reader() << [name, &tests, s_cnt] { - que_t que2 { name, ipc::receiver }; - std::vector cursors(s_cnt); + ipc_ut::reader() << [name] { + Que que { name, ipc::receiver }; for (;;) { - rand_buf got { que2.recv() }; + rand_buf got { que.recv() }; ASSERT_FALSE(got.empty()); - int & cur = cursors.at(got.get_id()); - ASSERT_TRUE((cur >= 0) && (cur < static_cast(tests.size()))); - rand_buf buf { tests.at(cur++) }; - buf.set_id(got.get_id()); - EXPECT_EQ(got, buf); - int n = 0; - for (; n < static_cast(cursors.size()); ++n) { - if (cursors[n] < static_cast(tests.size())) break; + int i = got.get_id(); + if (i == -1) { + return; + } + ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + auto const &data_set = data_set__.get()[i]; + if (data_set != got) { + printf("data_set__.get()[%d] != got, size = %zd/%zd\n", + i, data_set.size(), got.size()); + EXPECT_TRUE(false); } - if (n == static_cast(cursors.size())) break; } }; } ipc_ut::sender().wait_for_done(); + Que que { name }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); + for (int k = 0; k < r_cnt; ++k) { + que.send(rand_buf{msg_head{-1}}); + } ipc_ut::reader().wait_for_done(); - sw.print_elapsed(s_cnt, r_cnt, size, name); + sw.print_elapsed(s_cnt, r_cnt, (int)data_set__.get().size(), name); } -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; - } // internal-linkage TEST(IPC, basic) { @@ -129,22 +159,26 @@ TEST(IPC, basic) { } TEST(IPC, 1v1) { - test_sr("ssu", LoopCount, 1, 1); - test_sr("smu", LoopCount, 1, 1); - test_sr("mmu", LoopCount, 1, 1); - test_sr("smb", LoopCount, 1, 1); - test_sr("mmb", LoopCount, 1, 1); + test_sr("ssu", 1, 1); + test_sr("smu", 1, 1); + test_sr("mmu", 1, 1); + test_sr("smb", 1, 1); + test_sr("mmb", 1, 1); } TEST(IPC, 1vN) { - test_sr("smb", LoopCount, 1, MultiMax); - test_sr("mmb", LoopCount, 1, MultiMax); + //test_sr("smu", 1, MultiMax); + //test_sr("mmu", 1, MultiMax); + test_sr("smb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { - test_sr("mmb", LoopCount, MultiMax, 1); + //test_sr("mmu", MultiMax, 1); + test_sr("mmb", MultiMax, 1); } TEST(IPC, NvN) { - test_sr("mmb", LoopCount, MultiMax, MultiMax); + //test_sr("mmu", MultiMax, MultiMax); + test_sr("mmb", MultiMax, MultiMax); } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index c1114ef..a59b3a7 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -44,7 +44,7 @@ constexpr int ThreadMax = 8; template void push(Que & que, int p, int d) { - for (int n = 0; !que.push(p, d); ++n) { + for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { ASSERT_NE(n, PushRetry); std::this_thread::yield(); } diff --git a/test/thread_pool.h b/test/thread_pool.h index a7d2b28..03b7e8f 100755 --- a/test/thread_pool.h +++ b/test/thread_pool.h @@ -9,6 +9,8 @@ #include // std::size_t #include // assert +#include "capo/scope_guard.hpp" + namespace ipc_ut { class thread_pool final { @@ -32,6 +34,9 @@ class thread_pool final { if (pool->quit_) return; if (pool->jobs_.empty()) { pool->waiting_cnt_ += 1; + CAPO_SCOPE_GUARD_ = [pool] { + pool->waiting_cnt_ -= 1; + }; if (pool->waiting_cnt_ == pool->workers_.size()) { pool->cv_empty_.notify_all(); @@ -41,8 +46,6 @@ class thread_pool final { pool->cv_jobs_.wait(guard); if (pool->quit_) return; } while (pool->jobs_.empty()); - - pool->waiting_cnt_ -= 1; } assert(!pool->jobs_.empty()); job = std::move(pool->jobs_.front()); @@ -71,6 +74,7 @@ public: } void start(std::size_t n) { + std::unique_lock guard { lock_ }; if (n <= workers_.size()) return; for (std::size_t i = workers_.size(); i < n; ++i) { workers_.push_back(std::thread { &thread_pool::proc, this });