linux shm should clear itself when all handles pointed it are released

This commit is contained in:
mutouyun 2018-12-20 11:38:40 +08:00
parent 238166bdc1
commit 70e1ac6865
6 changed files with 69 additions and 15 deletions

View File

@ -90,7 +90,7 @@ public:
} }
template <typename P> template <typename P>
auto push(P&& param) // disable this if P is the same as T auto push(P&& param) // disable this if P is as same as T
-> Requires<!std::is_same<std::remove_reference_t<P>, T>::value, bool> { -> Requires<!std::is_same<std::remove_reference_t<P>, T>::value, bool> {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
@ -100,7 +100,7 @@ public:
} }
template <typename... P> template <typename... P>
auto push(P&&... params) // some old compilers are not support this well auto push(P&&... params) // some compilers are not support this well
-> Requires<(sizeof...(P) != 1), bool> { -> Requires<(sizeof...(P) != 1), bool> {
if (elems_ == nullptr) return false; if (elems_ == nullptr) return false;
auto ptr = elems_->acquire(); auto ptr = elems_->acquire();
@ -114,7 +114,7 @@ public:
while (1) { while (1) {
auto [ques, size] = upd(); auto [ques, size] = upd();
if (size == 0) throw std::invalid_argument { "Invalid size." }; if (size == 0) throw std::invalid_argument { "Invalid size." };
for (std::size_t i = 0; i < size; ++i) { for (std::size_t i = 0; i < static_cast<std::size_t>(size); ++i) {
queue* que = ques[i]; queue* que = ques[i];
if (que->elems_ == nullptr) throw std::logic_error { if (que->elems_ == nullptr) throw std::logic_error {
"This queue hasn't attached any elem_array." "This queue hasn't attached any elem_array."

View File

@ -140,7 +140,7 @@ bool send(handle_t h, void const * data, std::size_t size) {
} }
template <typename F> template <typename F>
buff_t updating_recv(F&& upd) { buff_t multi_recv(F&& upd) {
auto& rc = recv_cache(); auto& rc = recv_cache();
while(1) { while(1) {
// pop a new message // pop a new message
@ -186,7 +186,7 @@ buff_t recv(handle_t const * hs, std::size_t size) {
if (q_arr.empty()) { if (q_arr.empty()) {
return {}; return {};
} }
return updating_recv([&] { return multi_recv([&] {
return std::make_tuple(q_arr.data(), q_arr.size()); return std::make_tuple(q_arr.data(), q_arr.size());
}); });
} }

View File

@ -6,6 +6,29 @@
#include <unistd.h> #include <unistd.h>
#include <fcntl.h> #include <fcntl.h>
#include <unordered_map>
#include <atomic>
#include <string>
namespace {
using acc_t = std::atomic_size_t;
constexpr acc_t* acc_of(void* mem) {
return static_cast<acc_t*>(mem);
}
constexpr void* mem_of(void* mem) {
return static_cast<acc_t*>(mem) - 1;
}
inline auto& m2h() {
thread_local std::unordered_map<void*, std::string> cache;
return cache;
}
} // internal-linkage
namespace ipc { namespace ipc {
namespace shm { namespace shm {
@ -20,23 +43,41 @@ void* acquire(char const * name, std::size_t size) {
if (fd == -1) { if (fd == -1) {
return nullptr; return nullptr;
} }
size += sizeof(acc_t);
if (::ftruncate(fd, static_cast<off_t>(size)) != 0) { if (::ftruncate(fd, static_cast<off_t>(size)) != 0) {
::close(fd); ::close(fd);
::shm_unlink(name);
return nullptr; return nullptr;
} }
void* mem = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); void* mem = ::mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
::close(fd); ::close(fd);
if (mem == MAP_FAILED) { if (mem == MAP_FAILED) {
::shm_unlink(name);
return nullptr; return nullptr;
} }
return mem; auto acc = acc_of(mem);
acc->fetch_add(1, std::memory_order_release);
m2h().emplace(++acc, name);
return acc;
} }
void release(void* mem, std::size_t size) { void release(void* mem, std::size_t size) {
if (mem == nullptr) { if (mem == nullptr) {
return; return;
} }
auto& cc = m2h();
auto it = cc.find(mem);
if (it == cc.end()) {
return;
}
mem = mem_of(mem);
size += sizeof(acc_t);
if (acc_of(mem)->fetch_sub(1, std::memory_order_acquire) == 1) {
::munmap(mem, size); ::munmap(mem, size);
::shm_unlink(it->second.c_str());
}
else ::munmap(mem, size);
cc.erase(it);
} }
} // namespace shm } // namespace shm

View File

@ -26,7 +26,10 @@ constexpr auto to_tchar(std::string && str) -> IsSame<T, std::wstring> {
return std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(std::move(str)); return std::wstring_convert<std::codecvt_utf8_utf16<wchar_t>>{}.from_bytes(std::move(str));
} }
std::unordered_map<void*, HANDLE> m2h__; inline auto& m2h() {
thread_local std::unordered_map<void*, HANDLE> cache;
return cache;
}
} // internal-linkage } // internal-linkage
@ -49,7 +52,7 @@ void* acquire(char const * name, std::size_t size) {
::CloseHandle(h); ::CloseHandle(h);
return nullptr; return nullptr;
} }
m2h__.emplace(mem, h); m2h().emplace(mem, h);
return mem; return mem;
} }
@ -57,13 +60,14 @@ void release(void* mem, std::size_t /*size*/) {
if (mem == nullptr) { if (mem == nullptr) {
return; return;
} }
auto it = m2h__.find(mem); auto& cc = m2h();
if (it == m2h__.end()) { auto it = cc.find(mem);
if (it == cc.end()) {
return; return;
} }
::UnmapViewOfFile(mem); ::UnmapViewOfFile(mem);
::CloseHandle(it->second); ::CloseHandle(it->second);
m2h__.erase(it); cc.erase(it);
} }
} // namespace shm } // namespace shm

View File

@ -69,7 +69,7 @@ std::size_t route::recv_count() const {
return ipc::recv_count(impl(p_)->h_); return ipc::recv_count(impl(p_)->h_);
} }
bool route::send(void const *data, std::size_t size) { bool route::send(void const * data, std::size_t size) {
return ipc::send(impl(p_)->h_, data, size); return ipc::send(impl(p_)->h_, data, size);
} }

View File

@ -58,8 +58,7 @@ void Unit::test_get() {
QVERIFY(mem != nullptr); QVERIFY(mem != nullptr);
QVERIFY(mem == shm_hd__.get()); QVERIFY(mem == shm_hd__.get());
std::uint8_t buf[1024]; std::uint8_t buf[1024] = {};
memset(buf, 0, sizeof(buf));
QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0); QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0);
handle shm_other(shm_hd__.name(), shm_hd__.size()); handle shm_other(shm_hd__.name(), shm_hd__.size());
@ -73,6 +72,16 @@ void Unit::test_hello() {
constexpr char hello[] = "hello!"; constexpr char hello[] = "hello!";
std::memcpy(mem, hello, sizeof(hello)); std::memcpy(mem, hello, sizeof(hello));
QCOMPARE((char*)shm_hd__.get(), hello); QCOMPARE((char*)shm_hd__.get(), hello);
shm_hd__.release();
QVERIFY(shm_hd__.get() == nullptr);
QVERIFY(shm_hd__.acquire("my-test", 1024));
std::uint8_t buf[1024] = {};
QVERIFY(memcmp(mem, buf, sizeof(buf)) == 0);
std::memcpy(mem, hello, sizeof(hello));
QCOMPARE((char*)shm_hd__.get(), hello);
} }
void Unit::test_mt() { void Unit::test_mt() {