From 85c9eecdfd3cb6f6c855b1ba39226da05a9f515d Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 20 Sep 2020 12:37:47 +0800 Subject: [PATCH] =?UTF-8?q?=E9=81=BF=E5=85=8D=E7=BC=96=E8=AF=91=E6=97=B6?= =?UTF-8?q?=E7=9A=84=E5=91=BD=E5=90=8D=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/libipc/ipc.h | 24 +++++------ src/ipc.cpp | 96 ++++++++++++++++++++++---------------------- 2 files changed, 60 insertions(+), 60 deletions(-) diff --git a/include/libipc/ipc.h b/include/libipc/ipc.h index 37f3c7e..f0eba66 100755 --- a/include/libipc/ipc.h +++ b/include/libipc/ipc.h @@ -19,20 +19,20 @@ enum : unsigned { template struct IPC_EXPORT chan_impl { - static bool connect (handle_t * ph, char const * name, unsigned mode); - static void disconnect(handle_t h); - static void destroy (handle_t h); + static bool connect (ipc::handle_t * ph, char const * name, unsigned mode); + static void disconnect(ipc::handle_t h); + static void destroy (ipc::handle_t h); - static char const * name(handle_t h); + static char const * name(ipc::handle_t h); - static std::size_t recv_count(handle_t h); - static bool wait_for_recv(handle_t h, std::size_t r_count, std::size_t tm); + static std::size_t recv_count(ipc::handle_t h); + static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm); - static bool send(handle_t h, void const * data, std::size_t size, std::size_t tm); - static buff_t recv(handle_t h, std::size_t tm); + static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm); + static buff_t recv(ipc::handle_t h, std::size_t tm); - static bool try_send(handle_t h, void const * data, std::size_t size, std::size_t tm); - static buff_t try_recv(handle_t h); + static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std::size_t tm); + static buff_t try_recv(ipc::handle_t h); }; template @@ -40,7 +40,7 @@ class chan_wrapper { private: using detail_t = chan_impl; - handle_t h_ = nullptr; + ipc::handle_t h_ = nullptr; unsigned mode_ = ipc::sender; public: @@ -71,7 +71,7 @@ public: return detail_t::name(h_); } - handle_t handle() const noexcept { + ipc::handle_t handle() const noexcept { return h_; } diff --git a/src/ipc.cpp b/src/ipc.cpp index 379799a..b30b971 100755 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -31,8 +31,6 @@ namespace { -using namespace ipc; - using msg_id_t = std::uint32_t; using acc_t = std::atomic; @@ -65,54 +63,54 @@ struct msg_t : msg_t<0, AlignSize> { }; template -buff_t make_cache(T& data, std::size_t size) { - auto ptr = mem::alloc(size); +ipc::buff_t make_cache(T& data, std::size_t size) { + auto ptr = ipc::mem::alloc(size); std::memcpy(ptr, &data, (ipc::detail::min)(sizeof(data), size)); - return { ptr, size, mem::free }; + return { ptr, size, ipc::mem::free }; } struct cache_t { std::size_t fill_; - buff_t buff_; + ipc::buff_t buff_; - cache_t(std::size_t f, buff_t&& b) + cache_t(std::size_t f, ipc::buff_t && b) : fill_(f), buff_(std::move(b)) {} void append(void const * data, std::size_t size) { if (fill_ >= buff_.size() || data == nullptr || size == 0) return; auto new_fill = (ipc::detail::min)(fill_ + size, buff_.size()); - std::memcpy(static_cast(buff_.data()) + fill_, data, new_fill - fill_); + std::memcpy(static_cast(buff_.data()) + fill_, data, new_fill - fill_); fill_ = new_fill; } }; auto cc_acc() { - static shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); + static ipc::shm::handle acc_h("__CA_CONN__", sizeof(acc_t)); return static_cast(acc_h.get()); } auto& cls_storages() { struct cls_t { - shm::handle id_info_; - std::array::max_count> mems_; + ipc::shm::handle id_info_; + std::array::max_count> mems_; }; static ipc::unordered_map cls_s; return cls_s; } auto& cls_lock() { - static spin_lock cls_l; + static ipc::spin_lock cls_l; return cls_l; } struct cls_info_t { - id_pool<> pool_; - spin_lock lock_; + ipc::id_pool<> pool_; + ipc::spin_lock lock_; }; constexpr std::size_t calc_cls_size(std::size_t size) noexcept { - return (((size - 1) / large_msg_limit) + 1) * large_msg_limit; + return (((size - 1) / ipc::large_msg_limit) + 1) * ipc::large_msg_limit; } auto& cls_storage(std::size_t cls_size) { @@ -136,8 +134,8 @@ cls_info_t* cls_storage_info(const char* func, T& cls_shm, std::size_t cls_size) } template -byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) { - if (id == invalid_value) { +ipc::byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std::size_t id) { + if (id == ipc::invalid_value) { return nullptr; } if (!cls_shm.mems_[id].valid() && @@ -148,7 +146,7 @@ byte_t* cls_storage_mem(const char* func, T& cls_shm, std::size_t cls_size, std: return nullptr; } - byte_t* ptr = static_cast(cls_shm.mems_[id].get()); + auto ptr = static_cast(cls_shm.mems_[id].get()); if (ptr == nullptr) { ipc::error("[%s] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", func, id, cls_size); return nullptr; @@ -191,7 +189,7 @@ void* find_storage(std::size_t id, std::size_t size) { } void recycle_storage(std::size_t id, std::size_t size) { - if (id == invalid_value) { + if (id == ipc::invalid_value) { ipc::error("[recycle_storage] id is invalid: id = %zd, size = %zd\n", id, size); return; } @@ -203,7 +201,7 @@ void recycle_storage(std::size_t id, std::size_t size) { ipc::error("[recycle_storage] should find storage first: id = %zd, cls_size = %zd\n", id, cls_size); return; } - byte_t* ptr = static_cast(cls_shm.mems_[id].get()); + auto ptr = static_cast(cls_shm.mems_[id].get()); if (ptr == nullptr) { ipc::error("[recycle_storage] cls_shm.mems_[id].get failed: id = %zd, cls_size = %zd\n", id, cls_size); return; @@ -222,7 +220,7 @@ void recycle_storage(std::size_t id, std::size_t size) { } void clear_storage(std::size_t id, std::size_t size) { - if (id == invalid_value) { + if (id == ipc::invalid_value) { ipc::error("[clear_storage] id is invalid: id = %zd, size = %zd\n", id, size); return; } @@ -255,8 +253,8 @@ struct conn_info_head { ipc::string name_; msg_id_t cc_id_; // connection-info id - waiter cc_waiter_, wt_waiter_, rd_waiter_; - shm::handle acc_h_; + ipc::waiter cc_waiter_, wt_waiter_, rd_waiter_; + ipc::shm::handle acc_h_; /* * thread_local may have some bugs. @@ -269,7 +267,7 @@ struct conn_info_head { * - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html * - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 */ - tls::pointer> recv_cache_; + ipc::tls::pointer> recv_cache_; conn_info_head(char const * name) : name_ (name) @@ -313,7 +311,7 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) { } template struct queue_generator { @@ -347,10 +345,10 @@ constexpr static queue_t* queue_of(ipc::handle_t h) { /* API implementations */ -static bool connect(handle_t * ph, char const * name, bool start) { +static bool connect(ipc::handle_t * ph, char const * name, bool start) { assert(ph != nullptr); if (*ph == nullptr) { - *ph = mem::alloc(name); + *ph = ipc::mem::alloc(name); } auto que = queue_of(*ph); if (que == nullptr) { @@ -378,13 +376,13 @@ static void disconnect(ipc::handle_t h) { static void destroy(ipc::handle_t h) { disconnect(h); - mem::free(info_of(h)); + ipc::mem::free(info_of(h)); } static std::size_t recv_count(ipc::handle_t h) { auto que = queue_of(h); if (que == nullptr) { - return invalid_value; + return ipc::invalid_value; } return que->conn_count(); } @@ -426,30 +424,31 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s } auto msg_id = acc->fetch_add(1, std::memory_order_relaxed); auto try_push = std::forward(gen_push)(info_of(h), que, msg_id); - if (size > large_msg_limit) { + if (size > ipc::large_msg_limit) { auto dat = apply_storage(que->conn_count(), size); void * buf = dat.second; if (buf != nullptr) { std::memcpy(buf, data, size); return try_push(static_cast(size) - - static_cast(data_length), &(dat.first), 0); + static_cast(ipc::data_length), &(dat.first), 0); } // try using message fragment // ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd\n", msg_id, size); } // push message fragment std::int32_t offset = 0; - for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { - if (!try_push(static_cast(size) - offset - static_cast(data_length), - static_cast(data) + offset, data_length)) { + for (int i = 0; i < static_cast(size / ipc::data_length); ++i, offset += ipc::data_length) { + if (!try_push(static_cast(size) - offset - static_cast(ipc::data_length), + static_cast(data) + offset, ipc::data_length)) { return false; } } // if remain > 0, this is the last message fragment std::int32_t remain = static_cast(size) - offset; if (remain > 0) { - if (!try_push(remain - static_cast(data_length), - static_cast(data) + offset, static_cast(remain))) { + if (!try_push(remain - static_cast(ipc::data_length), + static_cast(data) + offset, + static_cast(remain))) { return false; } } @@ -466,8 +465,9 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size if (!que->force_push([](void* p) { auto tmp_msg = static_cast(p); if (tmp_msg->storage_) { - clear_storage(*reinterpret_cast(&tmp_msg->data_), - static_cast(data_length) + tmp_msg->remain_); + clear_storage( + *reinterpret_cast(&tmp_msg->data_), + static_cast(ipc::data_length) + tmp_msg->remain_); } return true; }, info->cc_id_, msg_id, remain, data, size)) { @@ -494,7 +494,7 @@ static bool try_send(ipc::handle_t h, void const * data, std::size_t size, std:: }, h, data, size); } -static buff_t recv(ipc::handle_t h, std::size_t tm) { +static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) { auto que = queue_of(h); if (que == nullptr) { ipc::error("fail: recv, queue_of(h) == nullptr\n"); @@ -517,18 +517,18 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { continue; // ignore message to self } // msg.remain_ may minus & abs(msg.remain_) < data_length - std::size_t remain = static_cast(data_length) + msg.remain_; + std::size_t remain = static_cast(ipc::data_length) + msg.remain_; // find cache with msg.id_ auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { - if (remain <= data_length) { + if (remain <= ipc::data_length) { return make_cache(msg.data_, remain); } if (msg.storage_) { std::size_t buf_id = *reinterpret_cast(&msg.data_); void * buf = find_storage(buf_id, remain); if (buf != nullptr) { - return buff_t { buf, remain, [](void* ptr, std::size_t size) { + return ipc::buff_t { buf, remain, [](void* ptr, std::size_t size) { recycle_storage(reinterpret_cast(ptr) - 1, size); }, reinterpret_cast(buf_id + 1) }; } @@ -546,7 +546,7 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.id_, cache_t { data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { ipc::data_length, make_cache(msg.data_, remain) }); } // has cached before this message else { @@ -560,26 +560,26 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { return buff; } // there are remain datas after this message - cac.append(&(msg.data_), data_length); + cac.append(&(msg.data_), ipc::data_length); } } } -static buff_t try_recv(ipc::handle_t h) { +static ipc::buff_t try_recv(ipc::handle_t h) { return recv(h, 0); } }; // detail_impl template -using policy_t = policy::choose; +using policy_t = ipc::policy::choose; } // internal-linkage namespace ipc { template -bool chan_impl::connect(handle_t * ph, char const * name, unsigned mode) { +bool chan_impl::connect(ipc::handle_t * ph, char const * name, unsigned mode) { return detail_impl>::connect(ph, name, mode & receiver); } @@ -589,7 +589,7 @@ void chan_impl::disconnect(ipc::handle_t h) { } template -void chan_impl::destroy(handle_t h) { +void chan_impl::destroy(ipc::handle_t h) { detail_impl>::destroy(h); }