From d223e3abb87035eef0fc5a97aa2f8710a7e48510 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Mon, 25 Mar 2019 16:25:14 +0800 Subject: [PATCH] fix bugs; modify shm interfaces --- include/def.h | 3 +- include/shm.h | 7 ++-- src/circ/elem_array.h | 10 +++--- src/ipc.cpp | 68 +++++++++++++++++++++----------------- src/platform/shm_linux.cpp | 61 +++++++++++++++++----------------- src/platform/shm_win.cpp | 46 ++++++++------------------ src/policy.h | 7 ++-- src/prod_cons.h | 59 +++++++++++++++++---------------- src/queue.h | 21 ++++++------ src/shm.cpp | 15 +++++---- test/test_circ.cpp | 4 +-- test/test_shm.cpp | 2 ++ test/test_waiter.cpp | 2 ++ 13 files changed, 151 insertions(+), 154 deletions(-) diff --git a/include/def.h b/include/def.h index fd3a608..6764d31 100644 --- a/include/def.h +++ b/include/def.h @@ -28,7 +28,8 @@ using uint_t = typename uint::type; enum : std::size_t { invalid_value = (std::numeric_limits::max)(), - data_length = 64 + data_length = 64, + name_length = 64 }; enum class relat { // multiplicity of the relationship diff --git a/include/shm.h b/include/shm.h index 02f0c0c..163d3bd 100644 --- a/include/shm.h +++ b/include/shm.h @@ -7,8 +7,11 @@ namespace ipc { namespace shm { -IPC_EXPORT void* acquire(char const * name, std::size_t size); -IPC_EXPORT void release(void* mem, std::size_t size); +using id_t = void*; + +IPC_EXPORT id_t acquire(char const * name, std::size_t size); +IPC_EXPORT void * to_mem (id_t id); +IPC_EXPORT void release(id_t id, void * mem, std::size_t size); class IPC_EXPORT handle { public: diff --git a/src/circ/elem_array.h b/src/circ/elem_array.h index 33c8fab..b5ef71f 100644 --- a/src/circ/elem_array.h +++ b/src/circ/elem_array.h @@ -13,12 +13,12 @@ namespace ipc { namespace circ { namespace detail { -template +template class elem_array { public: using policy_t = Policy; using cursor_t = decltype(std::declval().cursor()); - using elem_t = typename policy_t::template elem_t; + using elem_t = typename policy_t::template elem_t; enum : std::size_t { data_size = DataSize, @@ -29,7 +29,7 @@ public: private: policy_t head_; - elem_t block_[elem_max]; + elem_t block_[elem_max]; public: cursor_t cursor() const noexcept { @@ -50,11 +50,11 @@ public: } // namespace detail -template +template class elem_array : public ipc::circ::conn_head { public: using base_t = ipc::circ::conn_head; - using array_t = detail::elem_array; + using array_t = detail::elem_array; using policy_t = typename array_t::policy_t; using cursor_t = typename array_t::cursor_t; using elem_t = typename array_t::elem_t; diff --git a/src/ipc.cpp b/src/ipc.cpp index 8d85411..8c9e9bf 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include "def.h" #include "shm.h" @@ -38,8 +39,16 @@ struct msg_t<0, AlignSize> { template struct msg_t { - msg_t<0, AlignSize> head_; - alignas(AlignSize) byte_t data_[DataSize]; + msg_t<0, AlignSize> head_ {}; + std::aligned_storage_t data_ {}; + + msg_t() = default; + msg_t(void* q, msg_id_t i, int r, void const * d, std::size_t s) { + head_.que_ = q; + head_.id_ = i; + head_.remain_ = r; + std::memcpy(&data_, d, s); + } }; buff_t make_cache(void const * data, std::size_t size) { @@ -62,20 +71,6 @@ struct cache_t { } }; -auto& recv_cache() { - /* - thread_local may have some bugs. - See: https://sourceforge.net/p/mingw-w64/bugs/727/ - https://sourceforge.net/p/mingw-w64/bugs/527/ - https://github.com/Alexpux/MINGW-packages/issues/2519 - https://github.com/ChaiScript/ChaiScript/issues/402 - https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html - https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 - */ - static tls::pointer> rc; - return *rc.create(); -} - template struct detail_impl { @@ -94,6 +89,20 @@ static auto& queues_cache() { return *qc.create(); } +static auto& recv_cache() { + /* + thread_local may have some bugs. + See: https://sourceforge.net/p/mingw-w64/bugs/727/ + https://sourceforge.net/p/mingw-w64/bugs/527/ + https://github.com/Alexpux/MINGW-packages/issues/2519 + https://github.com/ChaiScript/ChaiScript/issues/402 + https://developercommunity.visualstudio.com/content/problem/124121/thread-local-variables-fail-to-be-initialized-when.html + https://software.intel.com/en-us/forums/intel-c-compiler/topic/684827 + */ + static tls::pointer> rc; + return *rc.create(); +} + /* API implementations */ static ipc::handle_t connect(char const * name) { @@ -141,21 +150,18 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { // push message fragment int offset = 0; for (int i = 0; i < static_cast(size / data_length); ++i, offset += data_length) { - msg_t msg { - { que, msg_id, static_cast(size) - offset - static_cast(data_length) }, {} - }; - std::memcpy(msg.data_, static_cast(data) + offset, data_length); - while (!que->push(msg)) std::this_thread::yield(); + while (!que->push(que, msg_id, static_cast(size) - offset - static_cast(data_length), + static_cast(data) + offset, data_length)) { + std::this_thread::yield(); + } } // if remain > 0, this is the last message fragment int remain = static_cast(size) - offset; if (remain > 0) { - msg_t msg { - { que, msg_id, remain - static_cast(data_length) }, {} - }; - std::memcpy(msg.data_, static_cast(data) + offset, - static_cast(remain)); - while (!que->push(msg)) std::this_thread::yield(); + while (!que->push(que, msg_id, remain - static_cast(data_length), + static_cast(data) + offset, static_cast(remain))) { + std::this_thread::yield(); + } } return true; } @@ -177,24 +183,24 @@ static buff_t recv(ipc::handle_t h) { auto cac_it = rc.find(msg.head_.id_); if (cac_it == rc.end()) { if (remain <= data_length) { - return make_cache(msg.data_, remain); + return make_cache(&(msg.data_), remain); } // cache the first message fragment - else rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(msg.data_, remain) }); + else rc.emplace(msg.head_.id_, cache_t { data_length, make_cache(&(msg.data_), remain) }); } // has cached before this message else { auto& cac = cac_it->second; // this is the last message fragment if (msg.head_.remain_ <= 0) { - cac.append(msg.data_, remain); + cac.append(&(msg.data_), remain); // finish this message, erase it from cache auto buff = std::move(cac.buff_); rc.erase(cac_it); return buff; } // there are remain datas after this message - cac.append(msg.data_, data_length); + cac.append(&(msg.data_), data_length); } } } diff --git a/src/platform/shm_linux.cpp b/src/platform/shm_linux.cpp index 8f9f25d..a291da1 100644 --- a/src/platform/shm_linux.cpp +++ b/src/platform/shm_linux.cpp @@ -10,7 +10,7 @@ #include #include #include -#include +#include #include "def.h" #include "log.h" @@ -18,22 +18,21 @@ namespace { -using acc_t = std::atomic_size_t; +struct info_t { + std::atomic_size_t acc_; + char name_[ipc::name_length]; +}; -constexpr acc_t* acc_of(void* mem) { - return static_cast(mem); +constexpr std::size_t calc_size(std::size_t size) { + return ((((size - 1) / alignof(info_t)) + 1) * alignof(info_t)) + sizeof(info_t); } -constexpr void* mem_of(void* mem) { - return static_cast(mem) - 1; +inline auto& acc_of(void* mem, std::size_t size) { + return reinterpret_cast(static_cast(mem) + size - sizeof(info_t))->acc_; } -inline auto* m2h() { - static struct { - std::mutex lc_; - ipc::mem::unordered_map cache_; - } m2h_; - return &m2h_; +inline auto& str_of(void* mem, std::size_t size) { + return reinterpret_cast(static_cast(mem) + size - sizeof(info_t))->name_; } } // internal-linkage @@ -41,11 +40,15 @@ inline auto* m2h() { namespace ipc { namespace shm { -void* acquire(char const * name, std::size_t size) { +id_t acquire(char const * name, std::size_t size) { if (name == nullptr || name[0] == '\0' || size == 0) { return nullptr; } std::string op_name = std::string{"__IPC_SHM__"} + name; + if (op_name.size() >= ipc::name_length) { + ipc::error("name is too long!: [%d]%s\n", static_cast(op_name.size()), op_name.c_str()); + return nullptr; + } int fd = ::shm_open(op_name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | @@ -54,7 +57,7 @@ void* acquire(char const * name, std::size_t size) { ipc::error("fail shm_open[%d]: %s\n", errno, name); return nullptr; } - size += sizeof(acc_t); + size = calc_size(size); if (::ftruncate(fd, static_cast(size)) != 0) { ipc::error("fail ftruncate[%d]: %s\n", errno, name); ::close(fd); @@ -68,33 +71,31 @@ void* acquire(char const * name, std::size_t size) { ::shm_unlink(op_name.c_str()); return nullptr; } - auto acc = acc_of(mem); - acc->fetch_add(1, std::memory_order_release); - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_); - m2h()->cache_.emplace(++acc, std::move(op_name)); + if (acc_of(mem, size).fetch_add(1, std::memory_order_release) == 0) { + std::memcpy(&str_of(mem, size), op_name.c_str(), op_name.size()); } - return acc; + return static_cast(mem); } -void release(void* mem, std::size_t size) { +void * to_mem(id_t id) { + return static_cast(id); +} + +void release(id_t id, void * mem, std::size_t size) { if (mem == nullptr) { return; } - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_); - auto& cc = m2h()->cache_; - auto it = cc.find(mem); - if (it == cc.end()) { + if (mem != to_mem(id)) { return; } - mem = mem_of(mem); - size += sizeof(acc_t); - if (acc_of(mem)->fetch_sub(1, std::memory_order_acquire) == 1) { + size = calc_size(size); + if (acc_of(mem, size).fetch_sub(1, std::memory_order_acquire) == 1) { + char name[ipc::name_length] = {}; + std::memcpy(name, &str_of(mem, size), sizeof(name)); ::munmap(mem, size); - ::shm_unlink(it->second.c_str()); + ::shm_unlink(name); } else ::munmap(mem, size); - cc.erase(it); } } // namespace shm diff --git a/src/platform/shm_win.cpp b/src/platform/shm_win.cpp index b2d6815..0d24bf4 100644 --- a/src/platform/shm_win.cpp +++ b/src/platform/shm_win.cpp @@ -4,29 +4,16 @@ #include #include -#include #include "def.h" #include "log.h" #include "memory/resource.h" #include "platform/to_tchar.h" -namespace { - -inline auto* m2h() { - static struct { - std::mutex lc_; - ipc::mem::unordered_map cache_; - } m2h_; - return &m2h_; -} - -} // internal-linkage - namespace ipc { namespace shm { -void* acquire(char const * name, std::size_t size) { +id_t acquire(char const * name, std::size_t size) { if (name == nullptr || name[0] == '\0' || size == 0) { return nullptr; } @@ -38,32 +25,25 @@ void* acquire(char const * name, std::size_t size) { ipc::error("fail CreateFileMapping[%d]: %s\n", static_cast(::GetLastError()), name); return nullptr; } + return static_cast(h); +} + +void * to_mem(id_t id) { + if (id == nullptr) return nullptr; + HANDLE h = static_cast(id); LPVOID mem = ::MapViewOfFile(h, FILE_MAP_ALL_ACCESS, 0, 0, 0); if (mem == NULL) { ipc::error("fail MapViewOfFile[%d]: %s\n", static_cast(::GetLastError()), name); - ::CloseHandle(h); return nullptr; } - { - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_); - m2h()->cache_.emplace(mem, h); - } - return mem; + return static_cast(mem); } -void release(void* mem, std::size_t /*size*/) { - if (mem == nullptr) { - return; - } - IPC_UNUSED_ auto guard = ipc::detail::unique_lock(m2h()->lc_); - auto& cc = m2h()->cache_; - auto it = cc.find(mem); - if (it == cc.end()) { - return; - } - ::UnmapViewOfFile(mem); - ::CloseHandle(it->second); - cc.erase(it); +void release(id_t id, void * mem, std::size_t /*size*/) { + if (id == nullptr) return; + if (mem == nullptr) return; + ::UnmapViewOfFile(static_cast(mem)); + ::CloseHandle(static_cast(id)); } } // namespace shm diff --git a/src/policy.h b/src/policy.h index 79da3ae..3cc5006 100644 --- a/src/policy.h +++ b/src/policy.h @@ -10,14 +10,13 @@ namespace ipc { namespace policy { -template