diff --git a/src/ipc.cpp b/src/ipc.cpp old mode 100644 new mode 100755 index 8e2524f..00c27ce --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -42,18 +42,13 @@ struct msg_t<0, AlignSize> { }; template -struct msg_t { - msg_t<0, AlignSize> head_ { 0, 0, 0, false }; +struct msg_t : msg_t<0, AlignSize> { std::aligned_storage_t data_ {}; msg_t() = default; msg_t(msg_id_t c, msg_id_t i, int r, void const * d, std::size_t s) - : head_ { c, i, r, false } { - if (d != nullptr) { - if (s == 0) { - head_.storage_ = true; - s = sizeof(msg_id_t); - } + : msg_t<0, AlignSize> { c, i, r, (d == nullptr) || (s == 0) } { + if (!this->storage_) { std::memcpy(&data_, d, s); } } @@ -144,12 +139,16 @@ struct conn_info_head { return *recv_cache_.create(); } - static shm::id_t apply_storage(msg_id_t msg_id, std::size_t size) { - return shm::acquire(("__ST_CONN__" + ipc::to_string(msg_id)).c_str(), size, shm::create); + shm::id_t apply_storage(msg_id_t msg_id, std::size_t size) { + return shm::acquire( + ("__ST_CONN__" + ipc::to_string(cc_id_) + + "__" + ipc::to_string(msg_id)).c_str(), size, shm::create); } - static shm::id_t apply_storage(msg_id_t msg_id) { - return shm::acquire(("__ST_CONN__" + ipc::to_string(msg_id)).c_str(), 0, shm::open); + static shm::id_t acquire_storage(msg_id_t cc_id, msg_id_t msg_id) { + return shm::acquire( + ("__ST_CONN__" + ipc::to_string(cc_id) + + "__" + ipc::to_string(msg_id)).c_str(), 0, shm::open); } void store(shm::id_t dat) { @@ -207,8 +206,8 @@ struct queue_generator { template struct detail_impl { -using queue_t = typename queue_generator::queue_t; -using conn_info_t = typename queue_generator::conn_info_t; +using queue_t = typename queue_generator::queue_t; +using conn_info_t = typename queue_generator::conn_info_t; constexpr static conn_info_t* info_of(ipc::handle_t h) { return static_cast(h); @@ -362,34 +361,34 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { return {}; } info_of(h)->wt_waiter_.broadcast(); - if ((info_of(h)->acc() != nullptr) && (msg.head_.conn_ == info_of(h)->cc_id_)) { + if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) { continue; // ignore message to self } - // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length - auto remain = static_cast(static_cast(data_length) + msg.head_.remain_); - // find cache with msg.head_.id_ - auto cac_it = rc.find(msg.head_.id_); + // msg.remain_ may minus & abs(msg.remain_) < data_length + auto remain = static_cast(static_cast(data_length) + msg.remain_); + // find cache with msg.id_ + auto cac_it = rc.find(msg.id_); if (cac_it == rc.end()) { if (remain <= data_length) { return make_cache(msg.data_, remain); } - if (msg.head_.storage_) { - auto dat = info_of(h)->apply_storage(msg.head_.id_); + if (msg.storage_) { + auto dat = info_of(h)->acquire_storage(msg.conn_, msg.id_); std::size_t dat_sz = 0; void * buf = shm::get_mem(dat, &dat_sz); if (buf != nullptr && remain <= dat_sz) { - return buff_t { buf, remain, [](void * p, std::size_t) { - shm::release(p); + return buff_t { buf, remain, [](void * dat, std::size_t) { + shm::release(dat); }, dat }; } else ipc::log("fail: shm::handle for big message. msg_id: %zd, size: %zd, shm.size: %zd\n", - msg.head_.id_, remain, dat_sz); + msg.id_, remain, dat_sz); } // gc if (rc.size() > 1024) { std::vector need_del; for (auto const & pair : rc) { - auto cmp = std::minmax(msg.head_.id_, pair.first); + auto cmp = std::minmax(msg.id_, pair.first); if (cmp.second - cmp.first > 8192) { need_del.push_back(pair.first); } @@ -397,13 +396,13 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) { for (auto id : need_del) rc.erase(id); } // cache the first message fragment - rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(msg.data_, remain) }); + rc.emplace(msg.id_, cache_t { data_length, make_cache(msg.data_, remain) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment - if (msg.head_.remain_ <= 0) { + if (msg.remain_ <= 0) { cac.append(&(msg.data_), remain); // finish this message, erase it from cache auto buff = std::move(cac.buff_); diff --git a/src/memory/resource.h b/src/memory/resource.h old mode 100644 new mode 100755 index 72d4718..d30cb17 --- a/src/memory/resource.h +++ b/src/memory/resource.h @@ -50,13 +50,13 @@ using unordered_map = std::unordered_map< Key, T, std::hash, std::equal_to, ipc::mem::allocator> >; -using string = std::basic_string< - char, std::char_traits, ipc::mem::allocator +template +using basic_string = std::basic_string< + Char, std::char_traits, ipc::mem::allocator >; -using wstring = std::basic_string< - wchar_t, std::char_traits, ipc::mem::allocator ->; +using string = basic_string; +using wstring = basic_string; template ipc::string to_string(T val) { diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index 4935dd5..8c4fe99 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -38,15 +38,15 @@ public: bool wait(std::size_t tm = invalid_value) { DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); - switch ((ret = ::WaitForSingleObject(h_, ms))) { - case WAIT_OBJECT_0: - return true; - case WAIT_ABANDONED: - case WAIT_TIMEOUT: - default: - ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); - return false; - } + switch ((ret = ::WaitForSingleObject(h_, ms))) { + case WAIT_OBJECT_0: + return true; + case WAIT_ABANDONED: + case WAIT_TIMEOUT: + default: + ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); + return false; + } } bool post(long count = 1) {