diff --git a/include/buffer.h b/include/buffer.h index 719c1fd..5fd9494 100644 --- a/include/buffer.h +++ b/include/buffer.h @@ -37,27 +37,15 @@ public: void * data() noexcept; void const * data() const noexcept; -# if __cplusplus >= 201703L template - auto data() noexcept -> std::enable_if_t, T*> { + auto data() noexcept -> typename std::enable_if::value, T*>::type { return static_cast(data()); } template - auto data() const noexcept -> std::enable_if_t, T*> { + auto data() const noexcept -> typename std::enable_if::value, T*>::type { return static_cast(data()); } -# else /*__cplusplus < 201703L*/ - template - auto data() noexcept -> std::enable_if_t::value, T*> { - return static_cast(data()); - } - - template - auto data() const noexcept -> std::enable_if_t::value, T*> { - return static_cast(data()); - } -# endif/*__cplusplus < 201703L*/ std::size_t size() const noexcept; @@ -84,6 +72,7 @@ public: } friend IPC_EXPORT bool operator==(buffer const & b1, buffer const & b2); + friend IPC_EXPORT bool operator!=(buffer const & b1, buffer const & b2); private: class buffer_; diff --git a/include/def.h b/include/def.h index 2658ca4..ea2bf65 100644 --- a/include/def.h +++ b/include/def.h @@ -29,6 +29,7 @@ enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, small_msg_limit = data_length, + large_msg_cache = 32, default_timeut = 100 // ms }; diff --git a/src/buffer.cpp b/src/buffer.cpp index b014d6b..ce56bce 100644 --- a/src/buffer.cpp +++ b/src/buffer.cpp @@ -9,6 +9,10 @@ bool operator==(buffer const & b1, buffer const & b2) { return (b1.size() == b2.size()) && (std::memcmp(b1.data(), b2.data(), b1.size()) == 0); } +bool operator!=(buffer const & b1, buffer const & b2) { + return !(b1 == b2); +} + class buffer::buffer_ : public pimpl { public: void* p_; diff --git a/src/id_pool.h b/src/id_pool.h index 2801877..e009a37 100644 --- a/src/id_pool.h +++ b/src/id_pool.h @@ -10,12 +10,14 @@ namespace ipc { template -struct id_type { - uint_t<8> id_; - std::aligned_storage_t data_; +struct id_type; - id_type& operator=(uint_t<8> val) { - id_ = val; +template +struct id_type<0, AlignSize> { + uint_t<8> id_; + + id_type& operator=(std::size_t val) { + id_ = static_cast>(val); return (*this); } @@ -24,18 +26,9 @@ struct id_type { } }; -template -struct id_type<0, AlignSize> { - uint_t<8> id_; - - id_type& operator=(uint_t<8> val) { - id_ = val; - return (*this); - } - - operator uint_t<8>() const { - return id_; - } +template +struct id_type : id_type<0, AlignSize> { + std::aligned_storage_t data_; }; template >::max)() // 255 + max_count = ipc::detail::min(large_msg_cache, (std::numeric_limits>::max)()) }; private: @@ -59,7 +52,7 @@ public: void init() { for (std::size_t i = 0; i < max_count;) { - i = next_[i] = static_cast>(i + 1); + i = next_[i] = (i + 1); } } @@ -73,9 +66,7 @@ public: } std::size_t acquire() { - if (empty()) { - return invalid_value; - } + if (empty()) return invalid_value; std::size_t id = cursor_; cursor_ = next_[id]; // point to next return id; diff --git a/src/ipc.cpp b/src/ipc.cpp index d38a5cc..b28244b 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -113,21 +113,55 @@ constexpr std::size_t calc_cls_size(std::size_t size) noexcept { return (((size - 1) / small_msg_limit) + 1) * small_msg_limit; } -std::pair apply_storage(std::size_t size) { - std::size_t cls_size = calc_cls_size(size); - - cls_lock().lock(); - auto& cls_shm = cls_storages()[cls_size]; - cls_lock().unlock(); +auto& cls_storage(std::size_t cls_size) { + IPC_UNUSED_ auto guard = ipc::detail::unique_lock(cls_lock()); + return cls_storages()[cls_size]; +} +template +cls_info_t* cls_storage_info(const char* func, T& cls_shm, std::size_t cls_size) { if (!cls_shm.id_info_.valid() && !cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) { - return {}; + ipc::error("[%s] cls_shm.id_info_.acquire failed: cls_size = %zd\n", func, cls_size); + return nullptr; } auto info = static_cast(cls_shm.id_info_.get()); if (info == nullptr) { - return {}; + ipc::error("[%s] cls_shm.id_info_.get failed: cls_size = %zd\n", func, cls_size); + return nullptr; } + return info; +} + +template +byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) { + if (id == invalid_value) { + return nullptr; + } + if (!cls_shm.mems_[id].valid() && + !cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) + + "__" + ipc::to_string(id)).c_str(), + cls_size + sizeof(acc_t))) { + ipc::error("[%s] cls_shm.mems_[id].acquire failed: id = %zd, cls_size = %zd\n", func, id, cls_size); + return nullptr; + } + + byte_t* ptr = static_cast(cls_shm.mems_[id].get()); + if (ptr == nullptr) { + ipc::error("[%s] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", func, id, cls_size); + return nullptr; + } + return ptr; +} + +std::pair apply_storage(std::size_t conn_count, std::size_t size) { + if (conn_count == 0) return {}; + + std::size_t cls_size = calc_cls_size(size); + auto & cls_shm = cls_storage(cls_size); + + auto info = cls_storage_info("apply_storage", cls_shm, cls_size); + if (info == nullptr) return {}; info->lock_.lock(); info->pool_.prepare(); @@ -135,50 +169,80 @@ std::pair apply_storage(std::size_t size) { auto id = info->pool_.acquire(); info->lock_.unlock(); - if (id == invalid_value) { - return {}; - } - if (!cls_shm.mems_[id].valid() && - !cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) + - "__" + ipc::to_string(id)).c_str(), cls_size)) { - return {}; - } - return { id, cls_shm.mems_[id].get() }; + auto ptr = cls_storage_mem("apply_storage", cls_shm, cls_size, id); + if (ptr == nullptr) return {}; + reinterpret_cast(ptr + cls_size)->store(static_cast(conn_count), std::memory_order_release); + return { id, ptr }; } void* find_storage(std::size_t id, std::size_t size) { std::size_t cls_size = calc_cls_size(size); + auto & cls_shm = cls_storage(cls_size); - cls_lock().lock(); - auto& cls_shm = cls_storages()[cls_size]; - cls_lock().unlock(); - - if (id == invalid_value) { + auto ptr = cls_storage_mem("find_storage", cls_shm, cls_size, id); + if (ptr == nullptr) return nullptr; + if (reinterpret_cast(ptr + cls_size)->load(std::memory_order_acquire) == 0) { + ipc::error("[find_storage] cc test failed: id = %zd, cls_size = %zd\n", id, cls_size); return nullptr; } - if (!cls_shm.mems_[id].valid() && - !cls_shm.mems_[id].acquire(("__CLS_MEM_BLOCK__" + ipc::to_string(cls_size) + - "__" + ipc::to_string(id)).c_str(), cls_size)) { - return nullptr; - } - return cls_shm.mems_[id].get(); + return ptr; } void recycle_storage(std::size_t id, std::size_t size) { + if (id == invalid_value) { + ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); + return; + } + std::size_t cls_size = calc_cls_size(size); + auto & cls_shm = cls_storage(cls_size); - cls_lock().lock(); - auto& cls_shm = cls_storages()[cls_size]; - cls_lock().unlock(); - - if (!cls_shm.id_info_.valid() && - !cls_shm.id_info_.acquire(("__CLS_INFO__" + ipc::to_string(cls_size)).c_str(), sizeof(cls_info_t))) { + if (!cls_shm.mems_[id].valid()) { + ipc::error("[recycle_storage] should find storage first: id = %zd, cls_size = %zd\n", id, cls_size); return; } - auto info = static_cast(cls_shm.id_info_.get()); - if (info == nullptr) { + byte_t* ptr = static_cast(cls_shm.mems_[id].get()); + if (ptr == nullptr) { + ipc::error("[recycle_storage] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", id, cls_size); return; } + if (reinterpret_cast(ptr + cls_size)->fetch_sub(1, std::memory_order_acq_rel) > 1) { + // not the last receiver, just return + return; + } + + auto info = cls_storage_info("recycle_storage", cls_shm, cls_size); + if (info == nullptr) return; + + info->lock_.lock(); + info->pool_.release(id); + info->lock_.unlock(); +} + +void clear_storage(std::size_t id, std::size_t size) { + if (id == invalid_value) { + ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); + return; + } + + std::size_t cls_size = calc_cls_size(size); + auto & cls_shm = cls_storage(cls_size); + + auto ptr = cls_storage_mem("clear_storage", cls_shm, cls_size, id); + if (ptr == nullptr) return; + + auto cc_flag = reinterpret_cast(ptr + cls_size); + for (unsigned k = 0;;) { + auto cc_curr = cc_flag->load(std::memory_order_acquire); + if (cc_curr == 0) return; // means this id has been cleared + if (cc_flag->compare_exchange_weak(cc_curr, 0, std::memory_order_release)) { + break; + } + ipc::yield(k); + } + + auto info = cls_storage_info("clear_storage", cls_shm, cls_size); + if (info == nullptr) return; info->lock_.lock(); info->pool_.release(id); @@ -339,27 +403,28 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); if (size > small_msg_limit) { - auto dat = apply_storage(size); + auto dat = apply_storage(que->conn_count(), size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); - return try_push(static_cast(size) - static_cast(data_length), &(dat.first), 0); + return try_push(static_cast(size) - + static_cast(data_length), &(dat.first), 0); } // try using message fragment // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment - int offset = 0; + std::int32_t offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { - if (!try_push(static_cast(size) - offset - static_cast(data_length), + if (!try_push(static_cast(size) - offset - static_cast(data_length), static_cast(data) + offset, data_length)) { return false; } } // if remain > 0, this is the last message fragment - int remain = static_cast(size) - offset; + std::int32_t remain = static_cast(size) - offset; if (remain > 0) { - if (!try_push(remain - static_cast(data_length), + if (!try_push(remain - static_cast(data_length), static_cast(data) + offset, static_cast(remain))) { return false; } @@ -369,12 +434,19 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s static bool send(ipc::handle_t h, void const * data, std::size_t size) { return send([](auto info, auto que, auto msg_id) { - return [info, que, msg_id](int remain, void const * data, std::size_t size) { + return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push(info->cc_id_, msg_id, remain, data, size); }, que->dis_flag() ? 0 : static_cast(default_timeut))) { ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size); - if (!que->force_push(info->cc_id_, msg_id, remain, data, size)) { + if (!que->force_push([](void* p) { + auto tmp_msg = static_cast(p); + if (tmp_msg->storage_) { + clear_storage(*reinterpret_cast(&tmp_msg->data_), + static_cast(data_length) + tmp_msg->remain_); + } + return true; + }, info->cc_id_, msg_id, remain, data, size)) { return false; } } @@ -386,7 +458,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { static bool try_send(ipc::handle_t h, void const * data, std::size_t size) { return send([](auto info, auto que, auto msg_id) { - return [info, que, msg_id](int remain, void const * data, std::size_t size) { + return [info, que, msg_id](std::int32_t remain, void const * data, std::size_t size) { if (!wait_for(info->wt_waiter_, [&] { return !que->push(info->cc_id_, msg_id, remain, data, size); }, 0)) { @@ -419,7 +491,7 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - auto remain = static_cast(static_cast(data_length) + msg.remain_); + std::size_t remain = static_cast(data_length) + msg.remain_; // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { diff --git a/src/queue.h b/src/queue.h index 8c67aa8..857ce67 100644 --- a/src/queue.h +++ b/src/queue.h @@ -149,11 +149,11 @@ public: }); } - template - auto force_push(P&&... params) { + template + auto force_push(F&& prep, P&&... params) { if (elems_ == nullptr) return false; return elems_->force_push([&](void* p) { - ::new (p) T(std::forward

(params)...); + if (prep(p)) ::new (p) T(std::forward

(params)...); }); } diff --git a/test/test.h b/test/test.h index 96a5173..49b630e 100755 --- a/test/test.h +++ b/test/test.h @@ -8,6 +8,7 @@ #include #include #include +#include #if defined(__GNUC__) # include // abi::__cxa_demangle @@ -106,7 +107,7 @@ void benchmark_prod_cons(T* cq) { // std::unique_lock guard { lc }; // std::cout << cid << "-recving: " << (i * 100) / (Loops * N) << "%" << std::endl; // } - vf.push_data(cid, msg); + vf.push_data(cid, std::forward(msg)); ++i; }); // { diff --git a/test/test_ipc.cpp b/test/test_ipc.cpp index 65d94a2..d8deb48 100755 --- a/test/test_ipc.cpp +++ b/test/test_ipc.cpp @@ -44,7 +44,7 @@ struct test_verify { void prepare(void* /*pt*/) {} - void push_data(int cid, ipc::buff_t & msg) { + void push_data(int cid, ipc::buff_t && msg) { list_[cid].emplace_back(std::move(msg)); } @@ -90,7 +90,7 @@ struct test_cq { QCOMPARE(msg, ipc::buff_t('\0')); return; } - proc(msg); + proc(std::move(msg)); } while(1); } @@ -136,7 +136,7 @@ struct test_cq { QCOMPARE(msg, ipc::buff_t('\0')); return; } - proc(msg); + proc(std::move(msg)); } while(1); } @@ -313,7 +313,7 @@ void test_prod_cons() { } void Unit::test_route() { - //return; + // return; std::vector const datas = { "hello!", "foo", @@ -402,12 +402,24 @@ void Unit::test_route_performance() { void Unit::test_channel() { // return; + int fail_v = 0; + std::thread t1 {[&] { ipc::channel cc { "my-ipc-channel" }; for (std::size_t i = 0;; ++i) { ipc::buff_t dd = cc.recv(); if (dd.size() < 2) return; - QCOMPARE(dd, datas__[i]); + if (dd != datas__[i]) { + std::printf("fail recv: %zd-[%zd, %zd]\n", i, dd.size(), datas__[i].size()); + // for (std::size_t k = 0; k < dd.size(); ++k) { + // if (dd.data()[k] != datas__[i].data()[k]) { + // std::printf("fail check: %zd-%zd, %02x != %02x\n", + // i, k, (unsigned)dd .data()[k], + // (unsigned)datas__[i].data()[k]); + // } + // } + ++fail_v; + } } }}; @@ -415,7 +427,7 @@ void Unit::test_channel() { ipc::channel cc { "my-ipc-channel" }; cc.wait_for_recv(1); for (std::size_t i = 0; i < static_cast((std::min)(100, LoopCount)); ++i) { - std::cout << "sending: " << i << "-[" << datas__[i].size() << "]" << std::endl; + std::printf("sending: %zd-[%zd]\n", i, datas__[i].size()); cc.send(datas__[i]); } cc.send(ipc::buff_t('\0')); @@ -423,6 +435,8 @@ void Unit::test_channel() { }}; t2.join(); + + QCOMPARE(fail_v, 0); } void Unit::test_channel_rtt() {