diff --git a/include/libipc/def.h b/include/libipc/def.h index 9be5206..2b80b81 100755 --- a/include/libipc/def.h +++ b/include/libipc/def.h @@ -29,7 +29,7 @@ enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, large_msg_limit = data_length, - large_msg_align = 512, + large_msg_align = 1024, large_msg_cache = 32, default_timeout = 100 // ms }; diff --git a/src/ipc.cpp b/src/ipc.cpp index 9835d50..ae6f270 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -51,15 +51,16 @@ struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; - msg_t(msg_id_t c, msg_id_t i, std::int32_t r, void const * d, std::size_t s) - : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { + msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size) + : msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} { if (this->storage_) { - if (d != nullptr) { + if (data != nullptr) { // copy storage-id - *reinterpret_cast(&data_) = *static_cast(d); + *reinterpret_cast(&data_) = + *static_cast(data); } } - else std::memcpy(&data_, d, s); + else std::memcpy(&data_, data, size); } }; @@ -95,17 +96,13 @@ struct chunk_info_t { ipc::id_pool<> pool_; ipc::spin_lock lock_; - IPC_CONSTEXPR_ static std::size_t chunks_elem_size(std::size_t chunk_size) noexcept { - return ipc::make_align(alignof(std::max_align_t), chunk_size); - } - IPC_CONSTEXPR_ static std::size_t chunks_mem_size(std::size_t chunk_size) noexcept { - return ipc::id_pool<>::max_count * chunks_elem_size(chunk_size); + return ipc::id_pool<>::max_count * chunk_size; } - ipc::byte_t *at(std::size_t chunk_size, std::size_t id) noexcept { - if (id == ipc::invalid_value) return nullptr; - return reinterpret_cast(this + 1) + (chunks_elem_size(chunk_size) * id); + ipc::byte_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept { + if (id < 0) return nullptr; + return reinterpret_cast(this + 1) + (chunk_size * id); } }; @@ -129,29 +126,22 @@ auto& chunk_storages() { return info; } }; - static ipc::unordered_map chunk_s; + thread_local ipc::unordered_map chunk_s; return chunk_s; } -auto& chunk_lock() { - static ipc::spin_lock chunk_l; - return chunk_l; +IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept { + return ipc::make_align(alignof(std::max_align_t), + (((size - 1) / ipc::large_msg_align) + 1) * ipc::large_msg_align); } -constexpr std::size_t calc_chunk_size(std::size_t size) noexcept { - return ( ((size - 1) / ipc::large_msg_align) + 1 ) * ipc::large_msg_align; +chunk_info_t *chunk_storage_info(std::size_t chunk_size) { + return chunk_storages()[chunk_size].get_info(chunk_size); } -auto& chunk_storage(std::size_t chunk_size) { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(chunk_lock()); - return chunk_storages()[chunk_size]; -} - -std::pair apply_storage(std::size_t size) { +std::pair apply_storage(std::size_t size) { std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return {}; info->lock_.lock(); @@ -163,27 +153,25 @@ std::pair apply_storage(std::size_t size) { return { id, info->at(chunk_size, id) }; } -void *find_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { +void *find_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { ipc::error("[find_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return nullptr; } std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return nullptr; return info->at(chunk_size, id); } -void clear_storage(std::size_t id, std::size_t size) { - if (id == ipc::invalid_value) { +void release_storage(ipc::storage_id_t id, std::size_t size) { + if (id < 0) { ipc::error("[clear_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size); return; } std::size_t chunk_size = calc_chunk_size(size); - auto & chunk_shm = chunk_storage(chunk_size); - auto info = chunk_shm.get_info(chunk_size); + auto info = chunk_storage_info(chunk_size); if (info == nullptr) return; info->lock_.lock(); @@ -195,9 +183,14 @@ template bool recycle_message(void* p) { auto msg = static_cast(p); if (msg->storage_) { - clear_storage( - *reinterpret_cast(&msg->data_), - static_cast(ipc::data_length) + msg->remain_); + std::int32_t r_size = static_cast(ipc::data_length) + msg->remain_; + if (r_size <= 0) { + ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size); + return true; + } + release_storage( + *reinterpret_cast(&msg->data_), + static_cast(r_size)); } return true; } @@ -220,7 +213,7 @@ struct conn_info_head { * - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html * - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 */ - ipc::tls::pointer> recv_cache_; + ipc::tls::pointer> recv_cache_; conn_info_head(char const * name) : name_ {name} @@ -409,11 +402,11 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment - // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); + //ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + for (std::int32_t i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), static_cast(data) + offset, ipc::data_length)) { return false; @@ -479,7 +472,7 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } auto& rc = info_of(h)->recv_cache(); - while (1) { + for (;;) { // pop a new message typename queue_t::value_t msg; if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) { @@ -491,20 +484,28 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(ipc::data_length) + msg.remain_; + std::int32_t r_size = static_cast(ipc::data_length) + msg.remain_; + if (r_size <= 0) { + ipc::error("fail: recv, r_size = %d\n", (int)r_size); + return {}; + } + std::size_t msg_size = static_cast(r_size); // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= ipc::data_length) { - return make_cache(msg.data_, remain); + if (msg_size <= ipc::data_length) { + return make_cache(msg.data_, msg_size); } if (msg.storage_) { std::size_t buf_id = *reinterpret_cast(&msg.data_); - void * buf = find_storage(buf_id, remain); + void * buf = find_storage(buf_id, msg_size); if (buf != nullptr) { - return ipc::buff_t{buf, remain}; + return ipc::buff_t{buf, msg_size}; + } + else { + ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size); + continue; } - else ipc::log("fail: shm::handle for big message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, remain); } // gc if (rc.size() > 1024) { @@ -518,14 +519,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, msg_size) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.remain_ <= 0) { - cac.append(&(msg.data_), remain); + cac.append(&(msg.data_), msg_size); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); diff --git a/src/libipc/memory/resource.h b/src/libipc/memory/resource.h index 1084e64..063e8dc 100755 --- a/src/libipc/memory/resource.h +++ b/src/libipc/memory/resource.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -49,6 +50,11 @@ using unordered_map = std::unordered_map< Key, T, std::hash, std::equal_to, ipc::mem::allocator> >; +template +using map = std::map< + Key, T, std::less, ipc::mem::allocator> +>; + template using basic_string = std::basic_string< Char, std::char_traits, ipc::mem::allocator diff --git a/src/libipc/utility/id_pool.h b/src/libipc/utility/id_pool.h index 7207aa9..24d32e4 100755 --- a/src/libipc/utility/id_pool.h +++ b/src/libipc/utility/id_pool.h @@ -2,13 +2,15 @@ #include // std::aligned_storage_t #include // std::memcmp +#include #include "libipc/def.h" - #include "libipc/platform/detail.h" namespace ipc { +using storage_id_t = std::int32_t; + template struct id_type; @@ -16,7 +18,7 @@ template struct id_type<0, AlignSize> { uint_t<8> id_; - id_type& operator=(std::size_t val) { + id_type& operator=(storage_id_t val) { id_ = static_cast>(val); return (*this); } @@ -57,7 +59,7 @@ public: } void init() { - for (std::size_t i = 0; i < max_count;) { + for (storage_id_t i = 0; i < max_count;) { i = next_[i] = (i + 1); } } @@ -71,22 +73,22 @@ public: return cursor_ == max_count; } - std::size_t acquire() { - if (empty()) return invalid_value; - std::size_t id = cursor_; + storage_id_t acquire() { + if (empty()) return -1; + storage_id_t id = cursor_; cursor_ = next_[id]; // point to next return id; } - bool release(std::size_t id) { - if (id == invalid_value) return false; + bool release(storage_id_t id) { + if (id < 0) return false; next_[id] = cursor_; cursor_ = static_cast>(id); // put it back return true; } - void * at(std::size_t id) { return &(next_[id].data_); } - void const * at(std::size_t id) const { return &(next_[id].data_); } + void * at(storage_id_t id) { return &(next_[id].data_); } + void const * at(storage_id_t id) const { return &(next_[id].data_); } }; template @@ -94,8 +96,8 @@ class obj_pool : public id_pool { using base_t = id_pool; public: - T * at(std::size_t id) { return reinterpret_cast(base_t::at(id)); } - T const * at(std::size_t id) const { return reinterpret_cast(base_t::at(id)); } + T * at(storage_id_t id) { return reinterpret_cast(base_t::at(id)); } + T const * at(storage_id_t id) const { return reinterpret_cast(base_t::at(id)); } }; } // namespace ipc diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index faae13c..5ca1a7c 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -1,6 +1,8 @@ #include #include +#include +#include #include #include "libipc/ipc.h" @@ -16,15 +18,28 @@ using namespace ipc; namespace { +constexpr int LoopCount = 10000; +constexpr int MultiMax = 8; + +struct msg_head { + int id_; +}; + class rand_buf : public buffer { public: rand_buf() { - int size = capo::random<>{1, 65536}(); + int size = capo::random<>{sizeof(msg_head), 65536}(); *this = buffer(new char[size], size, [](void * p, std::size_t) { delete [] static_cast(p); }); } + rand_buf(msg_head const &msg) { + *this = buffer(new msg_head{msg}, sizeof(msg), [](void * p, std::size_t) { + delete static_cast(p); + }); + } + rand_buf(rand_buf &&) = default; rand_buf(rand_buf const & rhs) { if (rhs.empty()) return; @@ -40,11 +55,11 @@ public: } void set_id(int k) noexcept { - *get() = static_cast(k); + get()->id_ = k; } int get_id() const noexcept { - return static_cast(*get()); + return get()->id_; } using buffer::operator=; @@ -67,57 +82,67 @@ void test_basic(char const * name) { EXPECT_EQ(que2.recv(), test2); } -template -void test_sr(char const * name, int size, int s_cnt, int r_cnt) { - using que_t = chan; +class data_set { + std::vector datas_; +public: + data_set() { + datas_.resize(LoopCount); + for (int i = 0; i < LoopCount; ++i) { + datas_[i].set_id(i); + } + } + + std::vector const &get() const noexcept { + return datas_; + } +} const data_set__; + +template > +void test_sr(char const * name, int s_cnt, int r_cnt) { ipc_ut::sender().start(static_cast(s_cnt)); ipc_ut::reader().start(static_cast(r_cnt)); + + std::atomic_thread_fence(std::memory_order_seq_cst); ipc_ut::test_stopwatch sw; - std::vector tests(size); for (int k = 0; k < s_cnt; ++k) { - ipc_ut::sender() << [name, &tests, &sw, r_cnt, k] { - que_t que1 { name }; - EXPECT_TRUE(que1.wait_for_recv(r_cnt)); + ipc_ut::sender() << [name, &sw, r_cnt, k] { + Que que { name, ipc::sender }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); sw.start(); - for (auto & buf : tests) { - rand_buf data { buf }; - data.set_id(k); - EXPECT_TRUE(que1.send(data)); + for (int i = 0; i < (int)data_set__.get().size(); ++i) { + EXPECT_TRUE(que.send(data_set__.get()[i])); } }; } for (int k = 0; k < r_cnt; ++k) { - ipc_ut::reader() << [name, &tests, s_cnt] { - que_t que2 { name, ipc::receiver }; - std::vector cursors(s_cnt); + ipc_ut::reader() << [name] { + Que que { name, ipc::receiver }; for (;;) { - rand_buf got { que2.recv() }; + rand_buf got { que.recv() }; ASSERT_FALSE(got.empty()); - int & cur = cursors.at(got.get_id()); - ASSERT_TRUE((cur >= 0) && (cur < static_cast(tests.size()))); - rand_buf buf { tests.at(cur++) }; - buf.set_id(got.get_id()); - EXPECT_EQ(got, buf); - int n = 0; - for (; n < static_cast(cursors.size()); ++n) { - if (cursors[n] < static_cast(tests.size())) break; + int i = got.get_id(); + if (i == -1) { + return; } - if (n == static_cast(cursors.size())) break; + ASSERT_TRUE((i >= 0) && (i < (int)data_set__.get().size())); + EXPECT_EQ(data_set__.get()[i], got); } }; } ipc_ut::sender().wait_for_done(); + Que que { name }; + EXPECT_TRUE(que.wait_for_recv(r_cnt)); + for (int k = 0; k < r_cnt; ++k) { + que.send(rand_buf{msg_head{-1}}); + } ipc_ut::reader().wait_for_done(); - sw.print_elapsed(s_cnt, r_cnt, size, name); + sw.print_elapsed(s_cnt, r_cnt, (int)data_set__.get().size(), name); } -constexpr int LoopCount = 10000; -constexpr int MultiMax = 8; - } // internal-linkage TEST(IPC, basic) { @@ -129,22 +154,26 @@ TEST(IPC, basic) { } TEST(IPC, 1v1) { - test_sr("ssu", LoopCount, 1, 1); - test_sr("smu", LoopCount, 1, 1); - test_sr("mmu", LoopCount, 1, 1); - test_sr("smb", LoopCount, 1, 1); - test_sr("mmb", LoopCount, 1, 1); + test_sr("ssu", 1, 1); + test_sr("smu", 1, 1); + test_sr("mmu", 1, 1); + test_sr("smb", 1, 1); + test_sr("mmb", 1, 1); } TEST(IPC, 1vN) { - test_sr("smb", LoopCount, 1, MultiMax); - test_sr("mmb", LoopCount, 1, MultiMax); + //test_sr("smu", 1, MultiMax); + //test_sr("mmu", 1, MultiMax); + test_sr("smb", 1, MultiMax); + test_sr("mmb", 1, MultiMax); } TEST(IPC, Nv1) { - test_sr("mmb", LoopCount, MultiMax, 1); + //test_sr("mmu", MultiMax, 1); + test_sr("mmb", MultiMax, 1); } TEST(IPC, NvN) { - test_sr("mmb", LoopCount, MultiMax, MultiMax); + //test_sr("mmu", MultiMax, MultiMax); + test_sr("mmb", MultiMax, MultiMax); } diff --git a/test/thread_pool.h b/test/thread_pool.h index a7d2b28..03b7e8f 100755 --- a/test/thread_pool.h +++ b/test/thread_pool.h @@ -9,6 +9,8 @@ #include // std::size_t #include // assert +#include "capo/scope_guard.hpp" + namespace ipc_ut { class thread_pool final { @@ -32,6 +34,9 @@ class thread_pool final { if (pool->quit_) return; if (pool->jobs_.empty()) { pool->waiting_cnt_ += 1; + CAPO_SCOPE_GUARD_ = [pool] { + pool->waiting_cnt_ -= 1; + }; if (pool->waiting_cnt_ == pool->workers_.size()) { pool->cv_empty_.notify_all(); @@ -41,8 +46,6 @@ class thread_pool final { pool->cv_jobs_.wait(guard); if (pool->quit_) return; } while (pool->jobs_.empty()); - - pool->waiting_cnt_ -= 1; } assert(!pool->jobs_.empty()); job = std::move(pool->jobs_.front()); @@ -71,6 +74,7 @@ public: } void start(std::size_t n) { + std::unique_lock guard { lock_ }; if (n <= workers_.size()) return; for (std::size_t i = workers_.size(); i < n; ++i) { workers_.push_back(std::thread { &thread_pool::proc, this });