From 70e1ac68652f4bd6076a8c417f57d5e68d4a83e1 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Thu, 20 Dec 2018 11:38:40 +0800 Subject: [PATCH] linux shm should clear itself when all handles pointed it are released --- include/circ_queue.h | 6 ++--- src/ipc.cpp | 4 ++-- src/platform/shm_linux.cpp | 45 ++++++++++++++++++++++++++++++++++++-- src/platform/shm_win.cpp | 14 +++++++----- src/route.hpp | 2 +- test/test_shm.cpp | 13 +++++++++-- 6 files changed, 69 insertions(+), 15 deletions(-) diff --git a/include/circ_queue.h b/include/circ_queue.h index 32f23d2..84b4ffc 100644 --- a/include/circ_queue.h +++ b/include/circ_queue.h @@ -90,7 +90,7 @@ public: } template - auto push(P&& param) // disable this if P is the same as T + auto push(P&& param) // disable this if P is as same as T -> Requires, T>::value, bool> { if (elems_ == nullptr) return false; auto ptr = elems_->acquire(); @@ -100,7 +100,7 @@ public: } template - auto push(P&&... params) // some old compilers are not support this well + auto push(P&&... params) // some compilers are not support this well -> Requires<(sizeof...(P) != 1), bool> { if (elems_ == nullptr) return false; auto ptr = elems_->acquire(); @@ -114,7 +114,7 @@ public: while (1) { auto [ques, size] = upd(); if (size == 0) throw std::invalid_argument { "Invalid size." }; - for (std::size_t i = 0; i < size; ++i) { + for (std::size_t i = 0; i < static_cast(size); ++i) { queue* que = ques[i]; if (que->elems_ == nullptr) throw std::logic_error { "This queue hasn't attached any elem_array." diff --git a/src/ipc.cpp b/src/ipc.cpp index eb37088..644bab4 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -140,7 +140,7 @@ bool send(handle_t h, void const * data, std::size_t size) { } template -buff_t updating_recv(F&& upd) { +buff_t multi_recv(F&& upd) { auto& rc = recv_cache(); while(1) { // pop a new message @@ -186,7 +186,7 @@ buff_t recv(handle_t const * hs, std::size_t size) { if (q_arr.empty()) { return {}; } - return updating_recv([&] { + return multi_recv([&] { return std::make_tuple(q_arr.data(), q_arr.size()); }); } diff --git a/src/platform/shm_linux.cpp b/src/platform/shm_linux.cpp index e70d654..7693b27 100644 --- a/src/platform/shm_linux.cpp +++ b/src/platform/shm_linux.cpp @@ -6,6 +6,29 @@ #include #include +#include +#include +#include + +namespace { + +using acc_t = std::atomic_size_t; + +constexpr acc_t* acc_of(void* mem) { + return static_cast(mem); +} + +constexpr void* mem_of(void* mem) { + return static_cast(mem) - 1; +} + +inline auto& m2h() { + thread_local std::unordered_map cache; + return cache; +} + +} // internal-linkage + namespace ipc { namespace shm { @@ -20,23 +43,41 @@ void* acquire(char const * name, std::size_t size) { if (fd == -1) { return nullptr; } + size += sizeof(acc_t); if (::ftruncate(fd, static_cast(size)) != 0) { ::close(fd); + ::shm_unlink(name); return nullptr; } void* mem = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); ::close(fd); if (mem == MAP_FAILED) { + ::shm_unlink(name); return nullptr; } - return mem; + auto acc = acc_of(mem); + acc->fetch_add(1, std::memory_order_release); + m2h().emplace(++acc, name); + return acc; } void release(void* mem, std::size_t size) { if (mem == nullptr) { return; } - ::munmap(mem, size); + auto& cc = m2h(); + auto it = cc.find(mem); + if (it == cc.end()) { + return; + } + mem = mem_of(mem); + size += sizeof(acc_t); + if (acc_of(mem)->fetch_sub(1, std::memory_order_acquire) == 1) { + ::munmap(mem, size); + ::shm_unlink(it->second.c_str()); + } + else ::munmap(mem, size); + cc.erase(it); } } // namespace shm diff --git a/src/platform/shm_win.cpp b/src/platform/shm_win.cpp index e1f7a68..f0684e2 100644 --- a/src/platform/shm_win.cpp +++ b/src/platform/shm_win.cpp @@ -26,7 +26,10 @@ constexpr auto to_tchar(std::string && str) -> IsSame { return std::wstring_convert>{}.from_bytes(std::move(str)); } -std::unordered_map m2h__; +inline auto& m2h() { + thread_local std::unordered_map cache; + return cache; +} } // internal-linkage @@ -49,7 +52,7 @@ void* acquire(char const * name, std::size_t size) { ::CloseHandle(h); return nullptr; } - m2h__.emplace(mem, h); + m2h().emplace(mem, h); return mem; } @@ -57,13 +60,14 @@ void release(void* mem, std::size_t /*size*/) { if (mem == nullptr) { return; } - auto it = m2h__.find(mem); - if (it == m2h__.end()) { + auto& cc = m2h(); + auto it = cc.find(mem); + if (it == cc.end()) { return; } ::UnmapViewOfFile(mem); ::CloseHandle(it->second); - m2h__.erase(it); + cc.erase(it); } } // namespace shm diff --git a/src/route.hpp b/src/route.hpp index 2b684ff..4558cf5 100644 --- a/src/route.hpp +++ b/src/route.hpp @@ -69,7 +69,7 @@ std::size_t route::recv_count() const { return ipc::recv_count(impl(p_)->h_); } -bool route::send(void const *data, std::size_t size) { +bool route::send(void const * data, std::size_t size) { return ipc::send(impl(p_)->h_, data, size); } diff --git a/test/test_shm.cpp b/test/test_shm.cpp index 42046f5..de6d8a3 100644 --- a/test/test_shm.cpp +++ b/test/test_shm.cpp @@ -58,8 +58,7 @@ void Unit::test_get() { QVERIFY(mem != nullptr); QVERIFY(mem == shm_hd__.get()); - std::uint8_t buf[1024]; - memset(buf, 0, sizeof(buf)); + std::uint8_t buf[1024] = {}; QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0); handle shm_other(shm_hd__.name(), shm_hd__.size()); @@ -73,6 +72,16 @@ void Unit::test_hello() { constexpr char hello[] = "hello!"; std::memcpy(mem, hello, sizeof(hello)); QCOMPARE((char*)shm_hd__.get(), hello); + + shm_hd__.release(); + QVERIFY(shm_hd__.get() == nullptr); + QVERIFY(shm_hd__.acquire("my-test", 1024)); + + std::uint8_t buf[1024] = {}; + QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0); + + std::memcpy(mem, hello, sizeof(hello)); + QCOMPARE((char*)shm_hd__.get(), hello); } void Unit::test_mt() {