From bbd241948b8bdedf26fd72bc46092e2f451435fb Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 26 Jun 2021 12:46:55 +0800 Subject: [PATCH 01/14] fix: large message cache may not be recycled with multiple receivers --- demo/msg_que/main.cpp | 2 +- src/ipc.cpp | 104 ++++++++++++------------------------------ src/libipc/queue.h | 6 +-- test/test_queue.cpp | 2 +- 4 files changed, 33 insertions(+), 81 deletions(-) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index ea60f39..5635101 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16; std::atomic is_quit__{ false }; std::atomic size_counter__{ 0 }; -using msg_que_t = ipc::chan; +using msg_que_t = ipc::chan; msg_que_t que__{ name__ }; ipc::byte_t buff__[max_sz]; diff --git a/src/ipc.cpp b/src/ipc.cpp index 12a629b..9835d50 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -96,7 +96,7 @@ struct chunk_info_t { ipc::spin_lock lock_; IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t)); + return ipc::make_align(alignof(std::max_align_t), chunk_size); } IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { @@ -147,9 +147,7 @@ auto& chunk_storage(std::size_t chunk_size) { return chunk_storages()[chunk_size]; } -std::pair apply_storage(std::size_t conn_count, std::size_t size) { - if (conn_count == 0) return {}; - +std::pair apply_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); @@ -162,90 +160,48 @@ std::pair apply_storage(std::size_t conn_count, std::size_t auto id = info->pool_.acquire(); info->lock_.unlock(); - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return {}; - reinterpret_cast(ptr + chunk_size)->store(static_cast(conn_count), std::memory_order_release); - return { id, ptr }; + return { id, info->at(chunk_size, id) }; } void *find_storage(std::size_t id, std::size_t size) { if (id == ipc::invalid_value) { - ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size); + ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } - std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); if (info == nullptr) return nullptr; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return nullptr; - if (reinterpret_cast(ptr + chunk_size)->load(std::memory_order_acquire) == 0) { - ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size); - return nullptr; - } - return ptr; -} - -void recycle_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); - return; - } - - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); - if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) { - ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size); - return; - } - if (reinterpret_cast(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { - // not the last receiver, just return - return; - } - - info->lock_.lock(); - info->pool_.release(id); - info->lock_.unlock(); + return info->at(chunk_size, id); } void clear_storage(std::size_t id, std::size_t size) { if (id == ipc::invalid_value) { - ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); + ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); if (info == nullptr) return; - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return; - - auto cc_flag = reinterpret_cast(ptr + chunk_size); - for (unsigned k = 0;;) { - auto cc_curr = cc_flag->load(std::memory_order_acquire); - if (cc_curr == 0) return; // means this id has been cleared - if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { - break; - } - ipc::yield(k); - } - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } +template +bool recycle_message(void* p) { + auto msg = static_cast(p); + if (msg->storage_) { + clear_storage( + *reinterpret_cast(&msg->data_), + static_cast(ipc::data_length) + msg->remain_); + } + return true; +} + struct conn_info_head { ipc::string name_; @@ -445,7 +401,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(que->conn_count(), size); + auto dat = apply_storage(size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -479,18 +435,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + recycle_message, + info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push([](void* p) { - auto tmp_msg = static_cast(p); - if (tmp_msg->storage_) { - clear_storage( - *reinterpret_cast(&tmp_msg->data_), - static_cast(ipc::data_length) + tmp_msg->remain_); - } - return true; - }, info->cc_id_, msg_id, remain, data, size)) { + if (!que->force_push( + recycle_message, + info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -504,7 +456,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + recycle_message, + info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; } @@ -548,9 +502,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { std::size_t buf_id = *reinterpret_cast(&msg.data_); void * buf = find_storage(buf_id, remain); if (buf != nullptr) { - return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { - recycle_storage(reinterpret_cast(ptr) - 1, size); - }, reinterpret_cast(buf_id + 1) }; + return ipc::buff_t{buf, remain}; } else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f04febf..07aac57 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -155,11 +155,11 @@ public: return !valid() || (cursor_ == elems_->cursor()); } - template - bool push(P&&... params) { + template + bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; return elems_->push(this, [&](void* p) { - ::new (p) T(std::forward

(params)...); + if (prep(p)) ::new (p) T(std::forward

(params)...); }); } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index c1114ef..a59b3a7 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -44,7 +44,7 @@ constexpr int ThreadMax = 8; template void push(Que & que, int p, int d) { - for (int n = 0; !que.push(p, d); ++n) { + for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { ASSERT_NE(n, PushRetry); std::this_thread::yield(); } From 2179ce2a19bf3a7c7646369d5fcea3a7e3096272 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 15:46:21 +0800 Subject: [PATCH 02/14] fix some bugs, adjust the test cases --- include/libipc/def.h | 2 +- src/ipc.cpp | 103 +++++++++++++++++---------------- src/libipc/memory/resource.h | 6 ++ src/libipc/utility/id_pool.h | 26 +++++---- test/test_ipc.cpp | 109 ++++++++++++++++++++++------------- test/thread_pool.h | 8 ++- 6 files changed, 148 insertions(+), 106 deletions(-) diff --git a/include/libipc/def.h b/include/libipc/def.h index 9be5206..2b80b81 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -29,7 +29,7 @@ enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, large_msg_limit = data_length, - large_msg_align = 512, + large_msg_align = 1024, large_msg_cache = 32, default_timeout = 100 // ms }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 9835d50..ae6f270 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -51,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) - : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { + msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { - if (d != nullptr) { + if (data != nullptr) { // copy storage-id - *reinterpret_cast(&data_) = *static_cast(d); + *reinterpret_cast(&data_) = + *static_cast(data); } } - else std::memcpy(&data_, d, s); + else std::memcpy(&data_, data, size); } }; @@ -95,17 +96,13 @@ struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; - IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size); - } - IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { - return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); + return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { - if (id == ipc::invalid_value) return nullptr; - return reinterpret_cast(this + 1) + (chunks_elem_size(chunk_size) * id); + ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + if (id < 0) return nullptr; + return reinterpret_cast(this + 1) + (chunk_size * id); } }; @@ -129,29 +126,22 @@ auto& chunk_storages() { return info; } }; - static ipc::unordered_map chunk_s; + thread_local ipc::unordered_map chunk_s; return chunk_s; } -auto& chunk_lock() { - static ipc::spin_lock chunk_l; - return chunk_l; +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), + (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align); } -constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { - return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align; +chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + return chunk_storages()[chunk_size].get_info(chunk_size); } -auto& chunk_storage(std::size_t chunk_size) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock()); - return chunk_storages()[chunk_size]; -} - -std::pair apply_storage(std::size_t size) { +std::pair apply_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; info->lock_.lock(); @@ -163,27 +153,25 @@ std::pair apply_storage(std::size_t size) { return { id, info->at(chunk_size, id) }; } -void *find_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { +void *find_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; return info->at(chunk_size, id); } -void clear_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { +void release_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; info->lock_.lock(); @@ -195,9 +183,14 @@ template bool recycle_message(void* p) { auto msg = static_cast(p); if (msg->storage_) { - clear_storage( - *reinterpret_cast(&msg->data_), - static_cast(ipc::data_length) + msg->remain_); + std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; + if (r_size <= 0) { + ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size); + return true; + } + release_storage( + *reinterpret_cast(&msg->data_), + static_cast(r_size)); } return true; } @@ -220,7 +213,7 @@ struct conn_info_head { * - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html * - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 */ - ipc::tls::pointer> recv_cache_; + ipc::tls::pointer> recv_cache_; conn_info_head(char const * name) : name_ {name} @@ -409,11 +402,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment - // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), static_cast(data) + offset, ipc::data_length)) { return false; @@ -479,7 +472,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } auto& rc = info_of(h)->recv_cache(); - while (1) { + for (;;) { // pop a new message typename queue_t::value_t msg; if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { @@ -491,20 +484,28 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(ipc::data_length) + msg.remain_; + std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; + if (r_size <= 0) { + ipc::error("fail: recv, r_size = %d\n", (int)r_size); + return {}; + } + std::size_t msg_size = static_cast(r_size); // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= ipc::data_length) { - return make_cache(msg.data_, remain); + if (msg_size <= ipc::data_length) { + return make_cache(msg.data_, msg_size); } if (msg.storage_) { std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, remain); + void * buf = find_storage(buf_id, msg_size); if (buf != nullptr) { - return ipc::buff_t{buf, remain}; + return ipc::buff_t{buf, msg_size}; + } + else { + ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } // gc if (rc.size() > 1024) { @@ -518,14 +519,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { - cac.append(&(msg.data_), remain); + cac.append(&(msg.data_), msg_size); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); diff --git a/src/libipc/memory/resource.h b/src/libipc/memory/resource.h index 1084e64..063e8dc 100755 --- a/src/libipc/memory/resource.h +++ b/src/libipc/memory/resource.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,11 @@ using unordered_map = std::unordered_map< Key, T, std::hash, std::equal_to, ipc::mem::allocator> >; +template +using map = std::map< + Key, T, std::less, ipc::mem::allocator> +>; + template using basic_string = std::basic_string< Char, std::char_traits, ipc::mem::allocator diff --git a/src/libipc/utility/id_pool.h b/src/libipc/utility/id_pool.h index 7207aa9..24d32e4 100755 --- a/src/libipc/utility/id_pool.h +++ b/src/libipc/utility/id_pool.h @@ -2,13 +2,15 @@ #include // std::aligned_storage_t #include // std::memcmp +#include #include "libipc/def.h" - #include "libipc/platform/detail.h" namespace ipc { +using storage_id_t = std::int32_t; + template struct id_type; @@ -16,7 +18,7 @@ template struct id_type<0, AlignSize> { uint_t<8> id_; - id_type& operator=(std::size_t val) { + id_type& operator=(storage_id_t val) { id_ = static_cast>(val); return (*this); } @@ -57,7 +59,7 @@ public: } void init() { - for (std::size_t i = 0; i < max_count;) { + for (storage_id_t i = 0; i < max_count;) { i = next_[i] = (i + 1); } } @@ -71,22 +73,22 @@ public: return cursor_ == max_count; } - std::size_t acquire() { - if (empty()) return invalid_value; - std::size_t id = cursor_; + storage_id_t acquire() { + if (empty()) return -1; + storage_id_t id = cursor_; cursor_ = next_[id]; // point to next return id; } - bool release(std::size_t id) { - if (id == invalid_value) return false; + bool release(storage_id_t id) { + if (id < 0) return false; next_[id] = cursor_; cursor_ = static_cast>(id); // put it back return true; } - void * at(std::size_t id) { return &(next_[id].data_); } - void const * at(std::size_t id) const { return &(next_[id].data_); } + void * at(storage_id_t id) { return &(next_[id].data_); } + void const * at(storage_id_t id) const { return &(next_[id].data_); } }; template @@ -94,8 +96,8 @@ class obj_pool : public id_pool { using base_t = id_pool; public: - T * at(std::size_t id) { return reinterpret_cast(base_t::at(id)); } - T const * at(std::size_t id) const { return reinterpret_cast(base_t::at(id)); } + T * at(storage_id_t id) { return reinterpret_cast(base_t::at(id)); } + T const * at(storage_id_t id) const { return reinterpret_cast(base_t::at(id)); } }; } // namespace ipc diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index faae13c..5ca1a7c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -1,6 +1,8 @@ #include #include +#include +#include #include #include "libipc/ipc.h" @@ -16,15 +18,28 @@ using namespace ipc; namespace { +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; + +struct msg_head { + int id_; +}; + class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{1, 65536}(); + int size = capo::random<>{sizeof(msg_head), 65536}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); } + rand_buf(msg_head const &msg) { + *this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) { + delete static_cast(p); + }); + } + rand_buf(rand_buf &&) = default; rand_buf(rand_buf const & rhs) { if (rhs.empty()) return; @@ -40,11 +55,11 @@ public: } void set_id(int k) noexcept { - *get() = static_cast(k); + get()->id_ = k; } int get_id() const noexcept { - return static_cast(*get()); + return get()->id_; } using buffer::operator=; @@ -67,57 +82,67 @@ void test_basic(char const * name) { EXPECT_EQ(que2.recv(), test2); } -template -void test_sr(char const * name, int size, int s_cnt, int r_cnt) { - using que_t = chan; +class data_set { + std::vector datas_; +public: + data_set() { + datas_.resize(LoopCount); + for (int i = 0; i < LoopCount; ++i) { + datas_[i].set_id(i); + } + } + + std::vector const &get() const noexcept { + return datas_; + } +} const data_set__; + +template > +void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); ipc_ut::reader().start(static_cast(r_cnt)); + + std::atomic_thread_fence(std::memory_order_seq_cst); ipc_ut::test_stopwatch sw; - std::vector tests(size); for (int k = 0; k < s_cnt; ++k) { - ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { - que_t que1 { name }; - EXPECT_TRUE(que1.wait_for_recv(r_cnt)); + ipc_ut::sender() << [name, &sw, r_cnt, k] { + Que que { name, ipc::sender }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); - for (auto & buf : tests) { - rand_buf data { buf }; - data.set_id(k); - EXPECT_TRUE(que1.send(data)); + for (int i = 0; i < (int)data_set__.get().size(); ++i) { + EXPECT_TRUE(que.send(data_set__.get()[i])); } }; } for (int k = 0; k < r_cnt; ++k) { - ipc_ut::reader() << [name, &tests, s_cnt] { - que_t que2 { name, ipc::receiver }; - std::vector cursors(s_cnt); + ipc_ut::reader() << [name] { + Que que { name, ipc::receiver }; for (;;) { - rand_buf got { que2.recv() }; + rand_buf got { que.recv() }; ASSERT_FALSE(got.empty()); - int & cur = cursors.at(got.get_id()); - ASSERT_TRUE((cur >= 0) && (cur < static_cast(tests.size()))); - rand_buf buf { tests.at(cur++) }; - buf.set_id(got.get_id()); - EXPECT_EQ(got, buf); - int n = 0; - for (; n < static_cast(cursors.size()); ++n) { - if (cursors[n] < static_cast(tests.size())) break; + int i = got.get_id(); + if (i == -1) { + return; } - if (n == static_cast(cursors.size())) break; + ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + EXPECT_EQ(data_set__.get()[i], got); } }; } ipc_ut::sender().wait_for_done(); + Que que { name }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); + for (int k = 0; k < r_cnt; ++k) { + que.send(rand_buf{msg_head{-1}}); + } ipc_ut::reader().wait_for_done(); - sw.print_elapsed(s_cnt, r_cnt, size, name); + sw.print_elapsed(s_cnt, r_cnt, (int)data_set__.get().size(), name); } -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; - } // internal-linkage TEST(IPC, basic) { @@ -129,22 +154,26 @@ TEST(IPC, basic) { } TEST(IPC, 1v1) { - test_sr("ssu", LoopCount, 1, 1); - test_sr("smu", LoopCount, 1, 1); - test_sr("mmu", LoopCount, 1, 1); - test_sr("smb", LoopCount, 1, 1); - test_sr("mmb", LoopCount, 1, 1); + test_sr("ssu", 1, 1); + test_sr("smu", 1, 1); + test_sr("mmu", 1, 1); + test_sr("smb", 1, 1); + test_sr("mmb", 1, 1); } TEST(IPC, 1vN) { - test_sr("smb", LoopCount, 1, MultiMax); - test_sr("mmb", LoopCount, 1, MultiMax); + //test_sr("smu", 1, MultiMax); + //test_sr("mmu", 1, MultiMax); + test_sr("smb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { - test_sr("mmb", LoopCount, MultiMax, 1); + //test_sr("mmu", MultiMax, 1); + test_sr("mmb", MultiMax, 1); } TEST(IPC, NvN) { - test_sr("mmb", LoopCount, MultiMax, MultiMax); + //test_sr("mmu", MultiMax, MultiMax); + test_sr("mmb", MultiMax, MultiMax); } diff --git a/test/thread_pool.h b/test/thread_pool.h index a7d2b28..03b7e8f 100755 --- a/test/thread_pool.h +++ b/test/thread_pool.h @@ -9,6 +9,8 @@ #include // std::size_t #include // assert +#include "capo/scope_guard.hpp" + namespace ipc_ut { class thread_pool final { @@ -32,6 +34,9 @@ class thread_pool final { if (pool->quit_) return; if (pool->jobs_.empty()) { pool->waiting_cnt_ += 1; + CAPO_SCOPE_GUARD_ = [pool] { + pool->waiting_cnt_ -= 1; + }; if (pool->waiting_cnt_ == pool->workers_.size()) { pool->cv_empty_.notify_all(); @@ -41,8 +46,6 @@ class thread_pool final { pool->cv_jobs_.wait(guard); if (pool->quit_) return; } while (pool->jobs_.empty()); - - pool->waiting_cnt_ -= 1; } assert(!pool->jobs_.empty()); job = std::move(pool->jobs_.front()); @@ -71,6 +74,7 @@ public: } void start(std::size_t n) { + std::unique_lock guard { lock_ }; if (n <= workers_.size()) return; for (std::size_t i = workers_.size(); i < n; ++i) { workers_.push_back(std::thread { &thread_pool::proc, this }); From 7dd993767026d4b61b9f6b51ee7b8a2b82069401 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:03:17 +0800 Subject: [PATCH 03/14] try to adjust recycling strategy for large message cache --- src/ipc.cpp | 72 +++++++++++++++++++++--------------- src/libipc/circ/elem_array.h | 6 +-- src/libipc/prod_cons.h | 48 +++++++++++++----------- src/libipc/queue.h | 13 +++++-- 4 files changed, 82 insertions(+), 57 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index ae6f270..b3b0921 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -32,8 +32,8 @@ namespace { -using msg_id_t = std::uint32_t; -using acc_t = std::atomic; +using msg_id_t = std::uint32_t; +using acc_t = std::atomic; template struct msg_t; @@ -92,6 +92,14 @@ auto cc_acc() { return static_cast(acc_h.get()); } +IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { + return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; +} + +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size)); +} + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; @@ -100,9 +108,13 @@ struct chunk_info_t { return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { - if (id < 0) return nullptr; - return reinterpret_cast(this + 1) + (chunk_size * id); + ipc::byte_t* chunks_mem() noexcept { + return reinterpret_cast(this + 1); + } + + ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + assert(id >= 0); + return chunks_mem() + (chunk_size * id); } }; @@ -130,16 +142,11 @@ auto& chunk_storages() { return chunk_s; } -IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { - return ipc::make_align(alignof(std::max_align_t), - (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align); -} - chunk_info_t *chunk_storage_info(std::size_t chunk_size) { return chunk_storages()[chunk_size].get_info(chunk_size); } -std::pair apply_storage(std::size_t size) { +std::pair acquire_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; @@ -166,14 +173,12 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) { void release_storage(ipc::storage_id_t id, std::size_t size) { if (id < 0) { - ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } - std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); @@ -394,7 +399,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(size); + auto dat = acquire_storage(size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -429,7 +434,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( - recycle_message, + [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); @@ -450,7 +455,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( - recycle_message, + [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; @@ -475,7 +480,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + bool recycled = false; + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] { + return !que->pop(msg, [&recycled](bool r) { recycled = r; }); + }, tm)) { // pop failed, just return. return {}; } @@ -490,23 +498,29 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } std::size_t msg_size = static_cast(r_size); + // large message + if (msg.storage_) { + ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); + void* buf = find_storage(buf_id, msg_size); + if (buf != nullptr) { + if (recycled) { + return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { + release_storage(reinterpret_cast(pmid) - 1, size); + }, reinterpret_cast(buf_id + 1)}; + } else { + return ipc::buff_t{buf, msg_size}; // no recycle + } + } else { + ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; + } + } // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { if (msg_size <= ipc::data_length) { return make_cache(msg.data_, msg_size); } - if (msg.storage_) { - std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, msg_size); - if (buf != nullptr) { - return ipc::buff_t{buf, msg_size}; - } - else { - ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); - continue; - } - } // gc if (rc.size() > 1024) { std::vector need_del; diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 74f031a..0b21f48 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -130,10 +130,10 @@ public: return head_.force_push(que, std::forward(f), block_); } - template - bool pop(Q* que, cursor_t* cur, F&& f) { + template + bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), block_); + return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); } }; diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index d5684fa..28d99bd 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -58,13 +58,14 @@ struct prod_cons_impl> { return false; } - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); + std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -80,8 +81,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -92,6 +94,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -156,8 +159,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -179,6 +183,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -263,20 +268,20 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { if (cur == cursor()) return false; // acquire auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (rem_cc == 0) { + if ((cur_rc & ep_mask) == 0) { + std::forward(out)(true); return true; } - if (el->rc_.compare_exchange_weak(cur_rc, - cur_rc & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)((nxt_rc & ep_mask) == 0); return true; } ipc::yield(k); @@ -395,8 +400,8 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { auto* el = elems + circ::index_of(cur); auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if (cur_fl != ~static_cast(cur)) { @@ -406,17 +411,18 @@ struct prod_cons_impl> { std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (rem_cc == 0) { + if ((cur_rc & rc_mask) == 0) { + std::forward(out)(true); el->f_ct_.store(cur + N - 1, std::memory_order_release); return true; } - if ((rem_cc & ~wrapper->connected_id()) == 0) { + auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); + bool last_one = false; + if ((last_one = (nxt_rc & rc_mask) == 0)) { el->f_ct_.store(cur + N - 1, std::memory_order_release); } - if (el->rc_.compare_exchange_weak(cur_rc, - inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)(last_one); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index 07aac57..1a782a9 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -171,14 +171,14 @@ public: }); } - template - bool pop(T& item) { + template + bool pop(T& item, F&& out) { if (elems_ == nullptr) { return false; } return elems_->pop(this, &(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }); + }, std::forward(out)); } }; @@ -204,7 +204,12 @@ public: } bool pop(T& item) { - return base_t::pop(item); + return base_t::pop(item, [](bool) {}); + } + + template + bool pop(T& item, F&& out) { + return base_t::pop(item, std::forward(out)); } }; From 7c063f8c17814c3db8a6b6c8de1852e8eb4e2c71 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E5=A4=B4=E4=BA=91?= Date: Sun, 27 Jun 2021 18:11:05 +0800 Subject: [PATCH 04/14] Update c-cpp.yml --- .github/workflows/c-cpp.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index 3ca4d7e..c5542c8 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -2,7 +2,7 @@ name: C/C++ CI on: push: - branches: [ master, develop ] + branches: [ master, develop, issue-* ] pull_request: branches: [ master, develop ] From 74f080361e7736aca03d4028565df4f12bc47b74 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:24:30 +0800 Subject: [PATCH 05/14] horrible_cast --- src/ipc.cpp | 8 ++++---- src/libipc/platform/detail.h | 12 ++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index b3b0921..31c0a0e 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -32,8 +32,8 @@ namespace { -using msg_id_t = std::uint32_t; -using acc_t = std::atomic; +using msg_id_t = std::uint32_t; +using acc_t = std::atomic; template struct msg_t; @@ -505,8 +505,8 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { if (buf != nullptr) { if (recycled) { return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { - release_storage(reinterpret_cast(pmid) - 1, size); - }, reinterpret_cast(buf_id + 1)}; + release_storage(ipc::detail::horrible_cast(pmid) - 1, size); + }, ipc::detail::horrible_cast(buf_id + 1)}; } else { return ipc::buff_t{buf, msg_size}; // no recycle } diff --git a/src/libipc/platform/detail.h b/src/libipc/platform/detail.h index fbf539a..141de6e 100755 --- a/src/libipc/platform/detail.h +++ b/src/libipc/platform/detail.h @@ -111,5 +111,17 @@ constexpr const T& (min)(const T& a, const T& b) { #endif/*__cplusplus < 201703L*/ +template +auto horrible_cast(U rhs) noexcept + -> typename std::enable_if::value + && std::is_trivially_copyable::value, T>::type { + union { + T t; + U u; + } r = {}; + r.u = rhs; + return r.t; +} + } // namespace detail } // namespace ipc From 9f235616b38dda8fc20e2108ae6855b643abb11b Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:49:23 +0800 Subject: [PATCH 06/14] adjust test, chunk_storages revert to static --- src/ipc.cpp | 4 +++- test/test_ipc.cpp | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index 31c0a0e..bd8257b 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -138,11 +138,13 @@ auto& chunk_storages() { return info; } }; - thread_local ipc::unordered_map chunk_s; + static ipc::unordered_map chunk_s; return chunk_s; } chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + static std::mutex lock; + IPC_UNUSED_ std::lock_guard guard {lock}; return chunk_storages()[chunk_size].get_info(chunk_size); } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 5ca1a7c..a76b365 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -75,8 +75,8 @@ void test_basic(char const * name) { EXPECT_FALSE(que1.try_send(test2)); que_t que2 { que1.name(), ipc::receiver }; - EXPECT_TRUE(que1.send(test1)); - EXPECT_TRUE(que1.try_send(test2)); + ASSERT_TRUE(que1.send(test1)); + ASSERT_TRUE(que1.try_send(test2)); EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test2); @@ -128,7 +128,11 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { return; } ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); - EXPECT_EQ(data_set__.get()[i], got); + if (data_set__.get()[i] != got) { + printf("data_set__.get()[%d] != got, size = %zd/%zd\n", + i, data_set__.get()[i].size(), got.size()); + EXPECT_TRUE(false); + } } }; } From 7bedfbfb5be595e9f5d44bde14ee54ccf10f05f6 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sat, 26 Jun 2021 12:46:55 +0800 Subject: [PATCH 07/14] fix: large message cache may not be recycled with multiple receivers --- demo/msg_que/main.cpp | 2 +- src/ipc.cpp | 104 ++++++++++++------------------------------ src/libipc/queue.h | 6 +-- test/test_queue.cpp | 2 +- 4 files changed, 33 insertions(+), 81 deletions(-) diff --git a/demo/msg_que/main.cpp b/demo/msg_que/main.cpp index ea60f39..5635101 100644 --- a/demo/msg_que/main.cpp +++ b/demo/msg_que/main.cpp @@ -23,7 +23,7 @@ constexpr std::size_t const max_sz = 1024 * 16; std::atomic is_quit__{ false }; std::atomic size_counter__{ 0 }; -using msg_que_t = ipc::chan; +using msg_que_t = ipc::chan; msg_que_t que__{ name__ }; ipc::byte_t buff__[max_sz]; diff --git a/src/ipc.cpp b/src/ipc.cpp index 8b18dde..920250a 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -95,7 +95,7 @@ struct chunk_info_t { ipc::spin_lock lock_; IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size + sizeof(acc_t)); + return ipc::make_align(alignof(std::max_align_t), chunk_size); } IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { @@ -146,9 +146,7 @@ auto& chunk_storage(std::size_t chunk_size) { return chunk_storages()[chunk_size]; } -std::pair apply_storage(std::size_t conn_count, std::size_t size) { - if (conn_count == 0) return {}; - +std::pair apply_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); @@ -161,90 +159,48 @@ std::pair apply_storage(std::size_t conn_count, std::size_t auto id = info->pool_.acquire(); info->lock_.unlock(); - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return {}; - reinterpret_cast(ptr + chunk_size)->store(static_cast(conn_count), std::memory_order_release); - return { id, ptr }; + return { id, info->at(chunk_size, id) }; } void *find_storage(std::size_t id, std::size_t size) { if (id == ipc::invalid_value) { - ipc::error("[find_storage] id is invalid: id = %zd, size = %zd\n", id, size); + ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } - std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); if (info == nullptr) return nullptr; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return nullptr; - if (reinterpret_cast(ptr + chunk_size)->load(std::memory_order_acquire) == 0) { - ipc::error("[find_storage] cc test failed: id = %zd, chunk_size = %zd\n", id, chunk_size); - return nullptr; - } - return ptr; -} - -void recycle_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { - ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); - return; - } - - std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); - if (info == nullptr) return; - - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) { - ipc::error("[recycle_storage] chunk_shm.mems[%zd] failed: chunk_size = %zd\n", id, chunk_size); - return; - } - if (reinterpret_cast(ptr + chunk_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { - // not the last receiver, just return - return; - } - - info->lock_.lock(); - info->pool_.release(id); - info->lock_.unlock(); + return info->at(chunk_size, id); } void clear_storage(std::size_t id, std::size_t size) { if (id == ipc::invalid_value) { - ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); + ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); if (info == nullptr) return; - auto ptr = info->at(chunk_size, id); - if (ptr == nullptr) return; - - auto cc_flag = reinterpret_cast(ptr + chunk_size); - for (unsigned k = 0;;) { - auto cc_curr = cc_flag->load(std::memory_order_acquire); - if (cc_curr == 0) return; // means this id has been cleared - if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { - break; - } - ipc::yield(k); - } - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); } +template +bool recycle_message(void* p) { + auto msg = static_cast(p); + if (msg->storage_) { + clear_storage( + *reinterpret_cast(&msg->data_), + static_cast(ipc::data_length) + msg->remain_); + } + return true; +} + struct conn_info_head { ipc::string name_; @@ -432,7 +388,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(que->conn_count(), size); + auto dat = apply_storage(size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -466,18 +422,14 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + recycle_message, + info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push([](void* p) { - auto tmp_msg = static_cast(p); - if (tmp_msg->storage_) { - clear_storage( - *reinterpret_cast(&tmp_msg->data_), - static_cast(ipc::data_length) + tmp_msg->remain_); - } - return true; - }, info->cc_id_, msg_id, remain, data, size)) { + if (!que->force_push( + recycle_message, + info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -491,7 +443,9 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return send([tm](auto info, auto que, auto msg_id) { return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { - return !que->push(info->cc_id_, msg_id, remain, data, size); + return !que->push( + recycle_message, + info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; } @@ -535,9 +489,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { std::size_t buf_id = *reinterpret_cast(&msg.data_); void * buf = find_storage(buf_id, remain); if (buf != nullptr) { - return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { - recycle_storage(reinterpret_cast(ptr) - 1, size); - }, reinterpret_cast(buf_id + 1) }; + return ipc::buff_t{buf, remain}; } else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } diff --git a/src/libipc/queue.h b/src/libipc/queue.h index f04febf..07aac57 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -155,11 +155,11 @@ public: return !valid() || (cursor_ == elems_->cursor()); } - template - bool push(P&&... params) { + template + bool push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; return elems_->push(this, [&](void* p) { - ::new (p) T(std::forward

(params)...); + if (prep(p)) ::new (p) T(std::forward

(params)...); }); } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index c1114ef..a59b3a7 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -44,7 +44,7 @@ constexpr int ThreadMax = 8; template void push(Que & que, int p, int d) { - for (int n = 0; !que.push(p, d); ++n) { + for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { ASSERT_NE(n, PushRetry); std::this_thread::yield(); } From 98a34498656f22cd66911e61572d91fdacb9a94f Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 15:46:21 +0800 Subject: [PATCH 08/14] fix some bugs, adjust the test cases --- include/libipc/def.h | 2 +- src/ipc.cpp | 101 ++++++++++++++++---------------- src/libipc/memory/resource.h | 6 ++ src/libipc/utility/id_pool.h | 26 +++++---- test/test_ipc.cpp | 109 ++++++++++++++++++++++------------- test/thread_pool.h | 8 ++- 6 files changed, 147 insertions(+), 105 deletions(-) diff --git a/include/libipc/def.h b/include/libipc/def.h index 9be5206..2b80b81 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -29,7 +29,7 @@ enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, large_msg_limit = data_length, - large_msg_align = 512, + large_msg_align = 1024, large_msg_cache = 32, default_timeout = 100 // ms }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 920250a..2739692 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -50,15 +50,16 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) - : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { + msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { - if (d != nullptr) { + if (data != nullptr) { // copy storage-id - *reinterpret_cast(&data_) = *static_cast(d); + *reinterpret_cast(&data_) = + *static_cast(data); } } - else std::memcpy(&data_, d, s); + else std::memcpy(&data_, data, size); } }; @@ -94,17 +95,13 @@ struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; - IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size); - } - IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { - return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); + return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { - if (id == ipc::invalid_value) return nullptr; - return reinterpret_cast(this + 1) + (chunks_elem_size(chunk_size) * id); + ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + if (id < 0) return nullptr; + return reinterpret_cast(this + 1) + (chunk_size * id); } }; @@ -128,29 +125,22 @@ auto& chunk_storages() { return info; } }; - static ipc::unordered_map chunk_s; + thread_local ipc::unordered_map chunk_s; return chunk_s; } -auto& chunk_lock() { - static ipc::spin_lock chunk_l; - return chunk_l; +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), + (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align); } -constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { - return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align; +chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + return chunk_storages()[chunk_size].get_info(chunk_size); } -auto& chunk_storage(std::size_t chunk_size) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock()); - return chunk_storages()[chunk_size]; -} - -std::pair apply_storage(std::size_t size) { +std::pair apply_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; info->lock_.lock(); @@ -162,27 +152,25 @@ std::pair apply_storage(std::size_t size) { return { id, info->at(chunk_size, id) }; } -void *find_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { +void *find_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; return info->at(chunk_size, id); } -void clear_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { +void release_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; info->lock_.lock(); @@ -194,9 +182,14 @@ template bool recycle_message(void* p) { auto msg = static_cast(p); if (msg->storage_) { - clear_storage( - *reinterpret_cast(&msg->data_), - static_cast(ipc::data_length) + msg->remain_); + std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; + if (r_size <= 0) { + ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size); + return true; + } + release_storage( + *reinterpret_cast(&msg->data_), + static_cast(r_size)); } return true; } @@ -396,11 +389,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment - // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), static_cast(data) + offset, ipc::data_length)) { return false; @@ -466,7 +459,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } auto& rc = info_of(h)->recv_cache(); - while (1) { + for (;;) { // pop a new message typename queue_t::value_t msg; if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { @@ -478,20 +471,28 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(ipc::data_length) + msg.remain_; + std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; + if (r_size <= 0) { + ipc::error("fail: recv, r_size = %d\n", (int)r_size); + return {}; + } + std::size_t msg_size = static_cast(r_size); // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= ipc::data_length) { - return make_cache(msg.data_, remain); + if (msg_size <= ipc::data_length) { + return make_cache(msg.data_, msg_size); } if (msg.storage_) { std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, remain); + void * buf = find_storage(buf_id, msg_size); if (buf != nullptr) { - return ipc::buff_t{buf, remain}; + return ipc::buff_t{buf, msg_size}; + } + else { + ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } // gc if (rc.size() > 1024) { @@ -505,14 +506,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { - cac.append(&(msg.data_), remain); + cac.append(&(msg.data_), msg_size); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); diff --git a/src/libipc/memory/resource.h b/src/libipc/memory/resource.h index 1084e64..063e8dc 100755 --- a/src/libipc/memory/resource.h +++ b/src/libipc/memory/resource.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,11 @@ using unordered_map = std::unordered_map< Key, T, std::hash, std::equal_to, ipc::mem::allocator> >; +template +using map = std::map< + Key, T, std::less, ipc::mem::allocator> +>; + template using basic_string = std::basic_string< Char, std::char_traits, ipc::mem::allocator diff --git a/src/libipc/utility/id_pool.h b/src/libipc/utility/id_pool.h index 7207aa9..24d32e4 100755 --- a/src/libipc/utility/id_pool.h +++ b/src/libipc/utility/id_pool.h @@ -2,13 +2,15 @@ #include // std::aligned_storage_t #include // std::memcmp +#include #include "libipc/def.h" - #include "libipc/platform/detail.h" namespace ipc { +using storage_id_t = std::int32_t; + template struct id_type; @@ -16,7 +18,7 @@ template struct id_type<0, AlignSize> { uint_t<8> id_; - id_type& operator=(std::size_t val) { + id_type& operator=(storage_id_t val) { id_ = static_cast>(val); return (*this); } @@ -57,7 +59,7 @@ public: } void init() { - for (std::size_t i = 0; i < max_count;) { + for (storage_id_t i = 0; i < max_count;) { i = next_[i] = (i + 1); } } @@ -71,22 +73,22 @@ public: return cursor_ == max_count; } - std::size_t acquire() { - if (empty()) return invalid_value; - std::size_t id = cursor_; + storage_id_t acquire() { + if (empty()) return -1; + storage_id_t id = cursor_; cursor_ = next_[id]; // point to next return id; } - bool release(std::size_t id) { - if (id == invalid_value) return false; + bool release(storage_id_t id) { + if (id < 0) return false; next_[id] = cursor_; cursor_ = static_cast>(id); // put it back return true; } - void * at(std::size_t id) { return &(next_[id].data_); } - void const * at(std::size_t id) const { return &(next_[id].data_); } + void * at(storage_id_t id) { return &(next_[id].data_); } + void const * at(storage_id_t id) const { return &(next_[id].data_); } }; template @@ -94,8 +96,8 @@ class obj_pool : public id_pool { using base_t = id_pool; public: - T * at(std::size_t id) { return reinterpret_cast(base_t::at(id)); } - T const * at(std::size_t id) const { return reinterpret_cast(base_t::at(id)); } + T * at(storage_id_t id) { return reinterpret_cast(base_t::at(id)); } + T const * at(storage_id_t id) const { return reinterpret_cast(base_t::at(id)); } }; } // namespace ipc diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index faae13c..5ca1a7c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -1,6 +1,8 @@ #include #include +#include +#include #include #include "libipc/ipc.h" @@ -16,15 +18,28 @@ using namespace ipc; namespace { +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; + +struct msg_head { + int id_; +}; + class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{1, 65536}(); + int size = capo::random<>{sizeof(msg_head), 65536}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); } + rand_buf(msg_head const &msg) { + *this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) { + delete static_cast(p); + }); + } + rand_buf(rand_buf &&) = default; rand_buf(rand_buf const & rhs) { if (rhs.empty()) return; @@ -40,11 +55,11 @@ public: } void set_id(int k) noexcept { - *get() = static_cast(k); + get()->id_ = k; } int get_id() const noexcept { - return static_cast(*get()); + return get()->id_; } using buffer::operator=; @@ -67,57 +82,67 @@ void test_basic(char const * name) { EXPECT_EQ(que2.recv(), test2); } -template -void test_sr(char const * name, int size, int s_cnt, int r_cnt) { - using que_t = chan; +class data_set { + std::vector datas_; +public: + data_set() { + datas_.resize(LoopCount); + for (int i = 0; i < LoopCount; ++i) { + datas_[i].set_id(i); + } + } + + std::vector const &get() const noexcept { + return datas_; + } +} const data_set__; + +template > +void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); ipc_ut::reader().start(static_cast(r_cnt)); + + std::atomic_thread_fence(std::memory_order_seq_cst); ipc_ut::test_stopwatch sw; - std::vector tests(size); for (int k = 0; k < s_cnt; ++k) { - ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { - que_t que1 { name }; - EXPECT_TRUE(que1.wait_for_recv(r_cnt)); + ipc_ut::sender() << [name, &sw, r_cnt, k] { + Que que { name, ipc::sender }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); - for (auto & buf : tests) { - rand_buf data { buf }; - data.set_id(k); - EXPECT_TRUE(que1.send(data)); + for (int i = 0; i < (int)data_set__.get().size(); ++i) { + EXPECT_TRUE(que.send(data_set__.get()[i])); } }; } for (int k = 0; k < r_cnt; ++k) { - ipc_ut::reader() << [name, &tests, s_cnt] { - que_t que2 { name, ipc::receiver }; - std::vector cursors(s_cnt); + ipc_ut::reader() << [name] { + Que que { name, ipc::receiver }; for (;;) { - rand_buf got { que2.recv() }; + rand_buf got { que.recv() }; ASSERT_FALSE(got.empty()); - int & cur = cursors.at(got.get_id()); - ASSERT_TRUE((cur >= 0) && (cur < static_cast(tests.size()))); - rand_buf buf { tests.at(cur++) }; - buf.set_id(got.get_id()); - EXPECT_EQ(got, buf); - int n = 0; - for (; n < static_cast(cursors.size()); ++n) { - if (cursors[n] < static_cast(tests.size())) break; + int i = got.get_id(); + if (i == -1) { + return; } - if (n == static_cast(cursors.size())) break; + ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + EXPECT_EQ(data_set__.get()[i], got); } }; } ipc_ut::sender().wait_for_done(); + Que que { name }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); + for (int k = 0; k < r_cnt; ++k) { + que.send(rand_buf{msg_head{-1}}); + } ipc_ut::reader().wait_for_done(); - sw.print_elapsed(s_cnt, r_cnt, size, name); + sw.print_elapsed(s_cnt, r_cnt, (int)data_set__.get().size(), name); } -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; - } // internal-linkage TEST(IPC, basic) { @@ -129,22 +154,26 @@ TEST(IPC, basic) { } TEST(IPC, 1v1) { - test_sr("ssu", LoopCount, 1, 1); - test_sr("smu", LoopCount, 1, 1); - test_sr("mmu", LoopCount, 1, 1); - test_sr("smb", LoopCount, 1, 1); - test_sr("mmb", LoopCount, 1, 1); + test_sr("ssu", 1, 1); + test_sr("smu", 1, 1); + test_sr("mmu", 1, 1); + test_sr("smb", 1, 1); + test_sr("mmb", 1, 1); } TEST(IPC, 1vN) { - test_sr("smb", LoopCount, 1, MultiMax); - test_sr("mmb", LoopCount, 1, MultiMax); + //test_sr("smu", 1, MultiMax); + //test_sr("mmu", 1, MultiMax); + test_sr("smb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { - test_sr("mmb", LoopCount, MultiMax, 1); + //test_sr("mmu", MultiMax, 1); + test_sr("mmb", MultiMax, 1); } TEST(IPC, NvN) { - test_sr("mmb", LoopCount, MultiMax, MultiMax); + //test_sr("mmu", MultiMax, MultiMax); + test_sr("mmb", MultiMax, MultiMax); } diff --git a/test/thread_pool.h b/test/thread_pool.h index a7d2b28..03b7e8f 100755 --- a/test/thread_pool.h +++ b/test/thread_pool.h @@ -9,6 +9,8 @@ #include // std::size_t #include // assert +#include "capo/scope_guard.hpp" + namespace ipc_ut { class thread_pool final { @@ -32,6 +34,9 @@ class thread_pool final { if (pool->quit_) return; if (pool->jobs_.empty()) { pool->waiting_cnt_ += 1; + CAPO_SCOPE_GUARD_ = [pool] { + pool->waiting_cnt_ -= 1; + }; if (pool->waiting_cnt_ == pool->workers_.size()) { pool->cv_empty_.notify_all(); @@ -41,8 +46,6 @@ class thread_pool final { pool->cv_jobs_.wait(guard); if (pool->quit_) return; } while (pool->jobs_.empty()); - - pool->waiting_cnt_ -= 1; } assert(!pool->jobs_.empty()); job = std::move(pool->jobs_.front()); @@ -71,6 +74,7 @@ public: } void start(std::size_t n) { + std::unique_lock guard { lock_ }; if (n <= workers_.size()) return; for (std::size_t i = workers_.size(); i < n; ++i) { workers_.push_back(std::thread { &thread_pool::proc, this }); From 628914d4288a3471e0d1d8f0145bc4c0fd225d34 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:03:17 +0800 Subject: [PATCH 09/14] try to adjust recycling strategy for large message cache --- src/ipc.cpp | 72 +++++++++++++++++++++--------------- src/libipc/circ/elem_array.h | 6 +-- src/libipc/prod_cons.h | 48 +++++++++++++----------- src/libipc/queue.h | 13 +++++-- 4 files changed, 82 insertions(+), 57 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index 2739692..dc15588 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -31,8 +31,8 @@ namespace { -using msg_id_t = std::uint32_t; -using acc_t = std::atomic; +using msg_id_t = std::uint32_t; +using acc_t = std::atomic; template struct msg_t; @@ -91,6 +91,14 @@ auto cc_acc() { return static_cast(acc_h.get()); } +IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { + return (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align; +} + +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size)); +} + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; @@ -99,9 +107,13 @@ struct chunk_info_t { return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { - if (id < 0) return nullptr; - return reinterpret_cast(this + 1) + (chunk_size * id); + ipc::byte_t* chunks_mem() noexcept { + return reinterpret_cast(this + 1); + } + + ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + assert(id >= 0); + return chunks_mem() + (chunk_size * id); } }; @@ -129,16 +141,11 @@ auto& chunk_storages() { return chunk_s; } -IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { - return ipc::make_align(alignof(std::max_align_t), - (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align); -} - chunk_info_t *chunk_storage_info(std::size_t chunk_size) { return chunk_storages()[chunk_size].get_info(chunk_size); } -std::pair apply_storage(std::size_t size) { +std::pair acquire_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; @@ -165,14 +172,12 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) { void release_storage(ipc::storage_id_t id, std::size_t size) { if (id < 0) { - ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + ipc::error("[release_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } - std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; - info->lock_.lock(); info->pool_.release(id); info->lock_.unlock(); @@ -381,7 +386,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = apply_storage(size); + auto dat = acquire_storage(size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -416,7 +421,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( - recycle_message, + [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); @@ -437,7 +442,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: return [tm, info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push( - recycle_message, + [](void*) { return true; }, info->cc_id_, msg_id, remain, data, size); }, tm)) { return false; @@ -462,7 +467,10 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { + bool recycled = false; + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] { + return !que->pop(msg, [&recycled](bool r) { recycled = r; }); + }, tm)) { // pop failed, just return. return {}; } @@ -477,23 +485,29 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } std::size_t msg_size = static_cast(r_size); + // large message + if (msg.storage_) { + ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); + void* buf = find_storage(buf_id, msg_size); + if (buf != nullptr) { + if (recycled) { + return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { + release_storage(reinterpret_cast(pmid) - 1, size); + }, reinterpret_cast(buf_id + 1)}; + } else { + return ipc::buff_t{buf, msg_size}; // no recycle + } + } else { + ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; + } + } // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { if (msg_size <= ipc::data_length) { return make_cache(msg.data_, msg_size); } - if (msg.storage_) { - std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, msg_size); - if (buf != nullptr) { - return ipc::buff_t{buf, msg_size}; - } - else { - ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); - continue; - } - } // gc if (rc.size() > 1024) { std::vector need_del; diff --git a/src/libipc/circ/elem_array.h b/src/libipc/circ/elem_array.h index 74f031a..0b21f48 100755 --- a/src/libipc/circ/elem_array.h +++ b/src/libipc/circ/elem_array.h @@ -130,10 +130,10 @@ public: return head_.force_push(que, std::forward(f), block_); } - template - bool pop(Q* que, cursor_t* cur, F&& f) { + template + bool pop(Q* que, cursor_t* cur, F&& f, R&& out) { if (cur == nullptr) return false; - return head_.pop(que, *cur, std::forward(f), block_); + return head_.pop(que, *cur, std::forward(f), std::forward(out), block_); } }; diff --git a/src/libipc/prod_cons.h b/src/libipc/prod_cons.h index d5684fa..28d99bd 100755 --- a/src/libipc/prod_cons.h +++ b/src/libipc/prod_cons.h @@ -58,13 +58,14 @@ struct prod_cons_impl> { return false; } - template - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { auto cur_rd = circ::index_of(rd_.load(std::memory_order_relaxed)); if (cur_rd == circ::index_of(wt_.load(std::memory_order_acquire))) { return false; // empty } std::forward(f)(&(elems[cur_rd].data_)); + std::forward(out)(true); rd_.fetch_add(1, std::memory_order_release); return true; } @@ -80,8 +81,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -92,6 +94,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -156,8 +159,9 @@ struct prod_cons_impl> return false; } - template class E, std::size_t DS, std::size_t AS> - bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, E* elems) { + template class E, std::size_t DS, std::size_t AS> + bool pop(W* /*wrapper*/, circ::u2_t& /*cur*/, F&& f, R&& out, E* elems) { byte_t buff[DS]; for (unsigned k = 0;;) { auto cur_rd = rd_.load(std::memory_order_relaxed); @@ -179,6 +183,7 @@ struct prod_cons_impl> std::memcpy(buff, &(elems[circ::index_of(cur_rd)].data_), sizeof(buff)); if (rd_.compare_exchange_weak(cur_rd, cur_rd + 1, std::memory_order_release)) { std::forward(f)(buff); + std::forward(out)(true); return true; } ipc::yield(k); @@ -263,20 +268,20 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E* elems) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E* elems) { if (cur == cursor()) return false; // acquire auto* el = elems + circ::index_of(cur++); std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & ep_mask; - if (rem_cc == 0) { + if ((cur_rc & ep_mask) == 0) { + std::forward(out)(true); return true; } - if (el->rc_.compare_exchange_weak(cur_rc, - cur_rc & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + auto nxt_rc = cur_rc & ~static_cast(wrapper->connected_id()); + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)((nxt_rc & ep_mask) == 0); return true; } ipc::yield(k); @@ -395,8 +400,8 @@ struct prod_cons_impl> { return true; } - template - bool pop(W* wrapper, circ::u2_t& cur, F&& f, E(& elems)[N]) { + template + bool pop(W* wrapper, circ::u2_t& cur, F&& f, R&& out, E(& elems)[N]) { auto* el = elems + circ::index_of(cur); auto cur_fl = el->f_ct_.load(std::memory_order_acquire); if (cur_fl != ~static_cast(cur)) { @@ -406,17 +411,18 @@ struct prod_cons_impl> { std::forward(f)(&(el->data_)); for (unsigned k = 0;;) { auto cur_rc = el->rc_.load(std::memory_order_acquire); - circ::cc_t rem_cc = cur_rc & rc_mask; - if (rem_cc == 0) { + if ((cur_rc & rc_mask) == 0) { + std::forward(out)(true); el->f_ct_.store(cur + N - 1, std::memory_order_release); return true; } - if ((rem_cc & ~wrapper->connected_id()) == 0) { + auto nxt_rc = inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()); + bool last_one = false; + if ((last_one = (nxt_rc & rc_mask) == 0)) { el->f_ct_.store(cur + N - 1, std::memory_order_release); } - if (el->rc_.compare_exchange_weak(cur_rc, - inc_rc(cur_rc) & ~static_cast(wrapper->connected_id()), - std::memory_order_release)) { + if (el->rc_.compare_exchange_weak(cur_rc, nxt_rc, std::memory_order_release)) { + std::forward(out)(last_one); return true; } ipc::yield(k); diff --git a/src/libipc/queue.h b/src/libipc/queue.h index 07aac57..1a782a9 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -171,14 +171,14 @@ public: }); } - template - bool pop(T& item) { + template + bool pop(T& item, F&& out) { if (elems_ == nullptr) { return false; } return elems_->pop(this, &(this->cursor_), [&item](void* p) { ::new (&item) T(std::move(*static_cast(p))); - }); + }, std::forward(out)); } }; @@ -204,7 +204,12 @@ public: } bool pop(T& item) { - return base_t::pop(item); + return base_t::pop(item, [](bool) {}); + } + + template + bool pop(T& item, F&& out) { + return base_t::pop(item, std::forward(out)); } }; From 349094561f769067c1b51abe0fe8f1311312decc Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:24:30 +0800 Subject: [PATCH 10/14] horrible_cast --- src/ipc.cpp | 8 ++++---- src/libipc/platform/detail.h | 12 ++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index dc15588..4e96c09 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -31,8 +31,8 @@ namespace { -using msg_id_t = std::uint32_t; -using acc_t = std::atomic; +using msg_id_t = std::uint32_t; +using acc_t = std::atomic; template struct msg_t; @@ -492,8 +492,8 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { if (buf != nullptr) { if (recycled) { return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { - release_storage(reinterpret_cast(pmid) - 1, size); - }, reinterpret_cast(buf_id + 1)}; + release_storage(ipc::detail::horrible_cast(pmid) - 1, size); + }, ipc::detail::horrible_cast(buf_id + 1)}; } else { return ipc::buff_t{buf, msg_size}; // no recycle } diff --git a/src/libipc/platform/detail.h b/src/libipc/platform/detail.h index fbf539a..141de6e 100755 --- a/src/libipc/platform/detail.h +++ b/src/libipc/platform/detail.h @@ -111,5 +111,17 @@ constexpr const T& (min)(const T& a, const T& b) { #endif/*__cplusplus < 201703L*/ +template +auto horrible_cast(U rhs) noexcept + -> typename std::enable_if::value + && std::is_trivially_copyable::value, T>::type { + union { + T t; + U u; + } r = {}; + r.u = rhs; + return r.t; +} + } // namespace detail } // namespace ipc From e3c8f8edc1834b07ba7017c6d2c97e2f03ae0067 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 27 Jun 2021 18:49:23 +0800 Subject: [PATCH 11/14] adjust test, chunk_storages revert to static --- src/ipc.cpp | 4 +++- test/test_ipc.cpp | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index 4e96c09..96aecea 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -137,11 +137,13 @@ auto& chunk_storages() { return info; } }; - thread_local ipc::unordered_map chunk_s; + static ipc::unordered_map chunk_s; return chunk_s; } chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + static std::mutex lock; + IPC_UNUSED_ std::lock_guard guard {lock}; return chunk_storages()[chunk_size].get_info(chunk_size); } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 5ca1a7c..a76b365 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -75,8 +75,8 @@ void test_basic(char const * name) { EXPECT_FALSE(que1.try_send(test2)); que_t que2 { que1.name(), ipc::receiver }; - EXPECT_TRUE(que1.send(test1)); - EXPECT_TRUE(que1.try_send(test2)); + ASSERT_TRUE(que1.send(test1)); + ASSERT_TRUE(que1.try_send(test2)); EXPECT_EQ(que2.recv(), test1); EXPECT_EQ(que2.recv(), test2); @@ -128,7 +128,11 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { return; } ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); - EXPECT_EQ(data_set__.get()[i], got); + if (data_set__.get()[i] != got) { + printf("data_set__.get()[%d] != got, size = %zd/%zd\n", + i, data_set__.get()[i].size(), got.size()); + EXPECT_TRUE(false); + } } }; } From 57a62bc073f623d442d5e4f5863f4c025f4b8eb2 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 11 Jul 2021 13:11:09 +0800 Subject: [PATCH 12/14] fix some bugs --- src/ipc.cpp | 6 ++---- test/test_ipc.cpp | 7 ++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index 96aecea..f4c5fa8 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -112,7 +112,7 @@ struct chunk_info_t { } ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { - assert(id >= 0); + if (id < 0) return nullptr; return chunks_mem() + (chunk_size * id); } }; @@ -137,13 +137,11 @@ auto& chunk_storages() { return info; } }; - static ipc::unordered_map chunk_s; + thread_local ipc::unordered_map chunk_s; return chunk_s; } chunk_info_t *chunk_storage_info(std::size_t chunk_size) { - static std::mutex lock; - IPC_UNUSED_ std::lock_guard guard {lock}; return chunk_storages()[chunk_size].get_info(chunk_size); } diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index a76b365..45590e3 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -128,9 +128,10 @@ void test_sr(char const * name, int s_cnt, int r_cnt) { return; } ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); - if (data_set__.get()[i] != got) { + auto const &data_set = data_set__.get()[i]; + if (data_set != got) { printf("data_set__.get()[%d] != got, size = %zd/%zd\n", - i, data_set__.get()[i].size(), got.size()); + i, data_set.size(), got.size()); EXPECT_TRUE(false); } } @@ -169,7 +170,7 @@ TEST(IPC, 1vN) { //test_sr("smu", 1, MultiMax); //test_sr("mmu", 1, MultiMax); test_sr("smb", 1, MultiMax); - test_sr("mmb", 1, MultiMax); + //test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { From d0f965359d4bd1c92835fb6253070b0849a30396 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 11 Jul 2021 15:56:30 +0800 Subject: [PATCH 13/14] recycle storage for large message --- src/ipc.cpp | 124 +++++++++++++++++++++++++++++-------- src/libipc/circ/elem_def.h | 2 +- src/libipc/policy.h | 4 +- test/test_ipc.cpp | 2 +- 4 files changed, 102 insertions(+), 30 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index f4c5fa8..a9916ce 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -20,6 +20,7 @@ #include "libipc/utility/log.h" #include "libipc/utility/id_pool.h" +#include "libipc/utility/scope_guard.h" #include "libipc/utility/utility.h" #include "libipc/memory/resource.h" @@ -39,7 +40,7 @@ struct msg_t; template struct msg_t<0, AlignSize> { - msg_id_t conn_; + msg_id_t cc_id_; msg_id_t id_; std::int32_t remain_; bool storage_; @@ -50,8 +51,8 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) - : msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} { + msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { if (data != nullptr) { // copy storage-id @@ -96,9 +97,21 @@ IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept { } IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { - return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size)); + return ipc::make_align(alignof(std::max_align_t), align_chunk_size( + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)) + size)); } +struct chunk_t { + std::atomic &conns() noexcept { + return *reinterpret_cast *>(this); + } + + void *data() noexcept { + return reinterpret_cast(this) + + ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic)); + } +}; + struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; @@ -107,13 +120,13 @@ struct chunk_info_t { return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t* chunks_mem() noexcept { - return reinterpret_cast(this + 1); + ipc::byte_t *chunks_mem() noexcept { + return reinterpret_cast(this + 1); } - ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { if (id < 0) return nullptr; - return chunks_mem() + (chunk_size * id); + return reinterpret_cast(chunks_mem() + (chunk_size * id)); } }; @@ -145,7 +158,7 @@ chunk_info_t *chunk_storage_info(std::size_t chunk_size) { return chunk_storages()[chunk_size].get_info(chunk_size); } -std::pair acquire_storage(std::size_t size) { +std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; @@ -156,7 +169,10 @@ std::pair acquire_storage(std::size_t size) { auto id = info->pool_.acquire(); info->lock_.unlock(); - return { id, info->at(chunk_size, id) }; + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return {}; + chunk->conns().store(conns, std::memory_order_relaxed); + return { id, chunk->data() }; } void *find_storage(ipc::storage_id_t id, std::size_t size) { @@ -167,7 +183,7 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; - return info->at(chunk_size, id); + return info->at(chunk_size, id)->data(); } void release_storage(ipc::storage_id_t id, std::size_t size) { @@ -183,13 +199,53 @@ void release_storage(ipc::storage_id_t id, std::size_t size) { info->lock_.unlock(); } +template +bool sub_rc(ipc::wr, + std::atomic &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept { + return true; +} + +template +bool sub_rc(ipc::wr, + std::atomic &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept { + auto last_conns = curr_conns & ~conn_id; + for (unsigned k = 0;;) { + auto chunk_conns = conns.load(std::memory_order_acquire); + if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) { + return (chunk_conns & last_conns) == 0; + } + ipc::yield(k); + } +} + +template +void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) { + if (id < 0) { + ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); + return; + } + std::size_t chunk_size = calc_chunk_size(size); + auto info = chunk_storage_info(chunk_size); + if (info == nullptr) return; + + auto chunk = info->at(chunk_size, id); + if (chunk == nullptr) return; + + if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) { + return; + } + info->lock_.lock(); + info->pool_.release(id); + info->lock_.unlock(); +} + template -bool recycle_message(void* p) { +bool clear_message(void* p) { auto msg = static_cast(p); if (msg->storage_) { std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; if (r_size <= 0) { - ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size); + ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size); return true; } release_storage( @@ -278,8 +334,10 @@ struct queue_generator { template struct detail_impl { -using queue_t = typename queue_generator::queue_t; -using conn_info_t = typename queue_generator::conn_info_t; +using policy_t = Policy; +using flag_t = typename policy_t::flag_t; +using queue_t = typename queue_generator::queue_t; +using conn_info_t = typename queue_generator::conn_info_t; constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept { return static_cast(h); @@ -373,7 +431,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s ipc::error("fail: send, que->ready_sending() == false\n"); return false; } - if (que->elems()->connections(std::memory_order_relaxed) == 0) { + ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed); + if (conns == 0) { ipc::error("fail: send, there is no receiver on this connection.\n"); return false; } @@ -386,7 +445,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > ipc::large_msg_limit) { - auto dat = acquire_storage(size); + auto dat = acquire_storage(size, conns); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); @@ -426,7 +485,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size }, tm)) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); if (!que->force_push( - recycle_message, + clear_message, info->cc_id_, msg_id, remain, data, size)) { return false; } @@ -467,15 +526,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (;;) { // pop a new message typename queue_t::value_t msg; - bool recycled = false; - if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] { - return !que->pop(msg, [&recycled](bool r) { recycled = r; }); + if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { + return !que->pop(msg); }, tm)) { // pop failed, just return. return {}; } info_of(h)->wt_waiter_.broadcast(); - if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) { + if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length @@ -490,12 +548,24 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { ipc::storage_id_t buf_id = *reinterpret_cast(&msg.data_); void* buf = find_storage(buf_id, msg_size); if (buf != nullptr) { - if (recycled) { - return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) { - release_storage(ipc::detail::horrible_cast(pmid) - 1, size); - }, ipc::detail::horrible_cast(buf_id + 1)}; - } else { + struct recycle_t { + ipc::storage_id_t storage_id; + ipc::circ::cc_t curr_conns; + ipc::circ::cc_t conn_id; + } *r_info = ipc::mem::alloc(recycle_t{ + buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id() + }); + if (r_info == nullptr) { + ipc::log("fail: ipc::mem::alloc.\n"); return ipc::buff_t{buf, msg_size}; // no recycle + } else { + return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) { + auto r_info = static_cast(p_info); + IPC_UNUSED_ auto finally = ipc::guard([r_info] { + ipc::mem::free(r_info); + }); + recycle_storage(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id); + }, r_info}; } } else { ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); diff --git a/src/libipc/circ/elem_def.h b/src/libipc/circ/elem_def.h index f2379e2..4003948 100755 --- a/src/libipc/circ/elem_def.h +++ b/src/libipc/circ/elem_def.h @@ -16,7 +16,7 @@ namespace circ { using u1_t = ipc::uint_t<8>; using u2_t = ipc::uint_t<32>; -/** only supports max 32 connections */ +/** only supports max 32 connections in broadcast mode */ using cc_t = u2_t; constexpr u1_t index_of(u2_t c) noexcept { diff --git a/src/libipc/policy.h b/src/libipc/policy.h index d2ad313..8959607 100755 --- a/src/libipc/policy.h +++ b/src/libipc/policy.h @@ -15,8 +15,10 @@ struct choose; template struct choose { + using flag_t = Flag; + template - using elems_t = circ::elem_array, DataSize, AlignSize>; + using elems_t = circ::elem_array, DataSize, AlignSize>; }; } // namespace policy diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 45590e3..16c076c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -170,7 +170,7 @@ TEST(IPC, 1vN) { //test_sr("smu", 1, MultiMax); //test_sr("mmu", 1, MultiMax); test_sr("smb", 1, MultiMax); - //test_sr("mmb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { From 1bb96fe2f152b1e46858435272ecffb3ce915242 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 11 Jul 2021 23:01:36 +0800 Subject: [PATCH 14/14] update chunk_storage_info --- src/ipc.cpp | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/src/ipc.cpp b/src/ipc.cpp index a9916ce..dcf60b3 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -131,7 +131,7 @@ struct chunk_info_t { }; auto& chunk_storages() { - class chunk_t { + class chunk_handle_t { ipc::shm::handle handle_; public: @@ -150,12 +150,24 @@ auto& chunk_storages() { return info; } }; - thread_local ipc::unordered_map chunk_s; - return chunk_s; + static ipc::map chunk_hs; + return chunk_hs; } chunk_info_t *chunk_storage_info(std::size_t chunk_size) { - return chunk_storages()[chunk_size].get_info(chunk_size); + auto &storages = chunk_storages(); + std::decay_t::iterator it; + { + static ipc::rw_lock lock; + IPC_UNUSED_ std::shared_lock guard {lock}; + if ((it = storages.find(chunk_size)) == storages.end()) { + using chunk_handle_t = std::decay_t::value_type::second_type; + guard.unlock(); + IPC_UNUSED_ std::lock_guard guard {lock}; + it = storages.emplace(chunk_size, chunk_handle_t{}).first; + } + } + return it->second.get_info(chunk_size); } std::pair acquire_storage(std::size_t size, ipc::circ::cc_t conns) {