diff --git a/include/libipc/shm.h b/include/libipc/shm.h index 395b6c8..12e3223 100755 --- a/include/libipc/shm.h +++ b/include/libipc/shm.h @@ -15,13 +15,13 @@ enum : unsigned { open = 0x02 }; -IPC_EXPORT id_t acquire(char const * name, std::size_t size, unsigned mode = create | open); -IPC_EXPORT void * get_mem(id_t id, std::size_t * size); -IPC_EXPORT void release(id_t id); -IPC_EXPORT void remove (id_t id); -IPC_EXPORT void remove (char const * name); +IPC_EXPORT id_t acquire(char const * name, std::size_t size, unsigned mode = create | open); +IPC_EXPORT void * get_mem(id_t id, std::size_t * size); +IPC_EXPORT std::int32_t release(id_t id); +IPC_EXPORT void remove (id_t id); +IPC_EXPORT void remove (char const * name); -IPC_EXPORT std::uint32_t get_ref(id_t id); +IPC_EXPORT std::int32_t get_ref(id_t id); IPC_EXPORT void sub_ref(id_t id); class IPC_EXPORT handle { @@ -39,11 +39,11 @@ public: std::size_t size () const noexcept; char const * name () const noexcept; - std::uint32_t ref() const noexcept; + std::int32_t ref() const noexcept; void sub_ref() noexcept; bool acquire(char const * name, std::size_t size, unsigned mode = create | open); - void release(); + std::int32_t release(); void* get() const; diff --git a/src/libipc/platform/condition_linux.h b/src/libipc/platform/condition_linux.h index 2c824a9..d9d4280 100644 --- a/src/libipc/platform/condition_linux.h +++ b/src/libipc/platform/condition_linux.h @@ -50,33 +50,34 @@ public: if ((cond_ = acquire_cond(name)) == nullptr) { return false; } - if (shm_.ref() == 1) { - ::pthread_cond_destroy(cond_); - auto finally = ipc::guard([this] { close(); }); // close when failed - // init condition - int eno; - pthread_condattr_t cond_attr; - if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { - ipc::error("fail pthread_condattr_init[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); - if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); - return false; - } - *cond_ = PTHREAD_COND_INITIALIZER; - if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) { - ipc::error("fail pthread_cond_init[%d]\n", eno); - return false; - } - finally.dismiss(); + if (shm_.ref() > 1) { + return valid(); } + ::pthread_cond_destroy(cond_); + auto finally = ipc::guard([this] { close(); }); // close when failed + // init condition + int eno; + pthread_condattr_t cond_attr; + if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) { + ipc::error("fail pthread_condattr_init[%d]\n", eno); + return false; + } + IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy); + if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_condattr_setpshared[%d]\n", eno); + return false; + } + *cond_ = PTHREAD_COND_INITIALIZER; + if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) { + ipc::error("fail pthread_cond_init[%d]\n", eno); + return false; + } + finally.dismiss(); return valid(); } void close() noexcept { - if (shm_.ref() == 1) { + if ((shm_.ref() <= 1) && cond_ != nullptr) { int eno; if ((eno = ::pthread_cond_destroy(cond_)) != 0) { ipc::error("fail pthread_cond_destroy[%d]\n", eno); @@ -87,9 +88,8 @@ public: } bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept { + if (!valid()) return false; switch (tm) { - case 0: - return true; case invalid_value: { int eno; if ((eno = ::pthread_cond_wait(cond_, static_cast(mtx.native()))) != 0) { @@ -115,6 +115,7 @@ public: } bool notify() noexcept { + if (!valid()) return false; int eno; if ((eno = ::pthread_cond_signal(cond_)) != 0) { ipc::error("fail pthread_cond_signal[%d]\n", eno); @@ -124,6 +125,7 @@ public: } bool broadcast() noexcept { + if (!valid()) return false; int eno; if ((eno = ::pthread_cond_broadcast(cond_)) != 0) { ipc::error("fail pthread_cond_broadcast[%d]\n", eno); diff --git a/src/libipc/platform/mutex_linux.h b/src/libipc/platform/mutex_linux.h index 5c45dfe..9080f58 100644 --- a/src/libipc/platform/mutex_linux.h +++ b/src/libipc/platform/mutex_linux.h @@ -2,8 +2,10 @@ #include #include +#include #include #include +#include #include @@ -19,33 +21,61 @@ namespace detail { namespace sync { class mutex { - ipc::shm::handle shm_; + ipc::shm::handle *shm_ = nullptr; + std::atomic *ref_ = nullptr; pthread_mutex_t *mutex_ = nullptr; - pthread_mutex_t *acquire_mutex(char const *name) { - if (!shm_.acquire(name, sizeof(pthread_mutex_t))) { - ipc::error("[acquire_mutex] fail shm.acquire: %s\n", name); - return nullptr; - } - return static_cast(shm_.get()); - } + struct curr_prog { + struct shm_data { + ipc::shm::handle shm; + std::atomic ref; - pthread_mutex_t *get_mutex(char const *name) { + struct init { + char const *name; + std::size_t size; + }; + shm_data(init arg) + : shm{arg.name, arg.size}, ref{0} {} + }; + ipc::map mutex_handles; + std::mutex lock; + + static curr_prog &get() { + static curr_prog info; + return info; + } + }; + + pthread_mutex_t *acquire_mutex(char const *name) { if (name == nullptr) { return nullptr; } - static ipc::map mutex_handles; - static std::mutex lock; - IPC_UNUSED_ std::lock_guard guard {lock}; - auto it = mutex_handles.find(name); - if (it == mutex_handles.end()) { - auto ptr = acquire_mutex(name); - if (ptr != nullptr) { - mutex_handles.emplace(name, ptr); - } - return ptr; + auto &info = curr_prog::get(); + IPC_UNUSED_ std::lock_guard guard {info.lock}; + auto it = info.mutex_handles.find(name); + if (it == info.mutex_handles.end()) { + it = curr_prog::get().mutex_handles.emplace(name, + curr_prog::shm_data::init{name, sizeof(pthread_mutex_t)}).first; + } + shm_ = &it->second.shm; + ref_ = &it->second.ref; + if (shm_ == nullptr) { + return nullptr; + } + return static_cast(shm_->get()); + } + + template + void release_mutex(ipc::string const &name, F &&clear) { + if (name.empty()) return; + IPC_UNUSED_ std::lock_guard guard {curr_prog::get().lock}; + auto it = curr_prog::get().mutex_handles.find(name); + if (it == curr_prog::get().mutex_handles.end()) { + return; + } + if (clear()) { + curr_prog::get().mutex_handles.erase(it); } - return it->second; } public: @@ -62,56 +92,69 @@ public: bool valid() const noexcept { static const char tmp[sizeof(pthread_mutex_t)] {}; - return (mutex_ != nullptr) + return (shm_ != nullptr) && (ref_ != nullptr) && (mutex_ != nullptr) && (std::memcmp(tmp, mutex_, sizeof(pthread_mutex_t)) != 0); } bool open(char const *name) noexcept { close(); - if ((mutex_ = get_mutex(name)) == nullptr) { + if ((mutex_ = acquire_mutex(name)) == nullptr) { return false; } - if (shm_.ref() == 1) { - ::pthread_mutex_destroy(mutex_); - auto finally = ipc::guard([this] { close(); }); // close when failed - // init mutex - int eno; - pthread_mutexattr_t mutex_attr; - if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { - ipc::error("fail pthread_mutexattr_init[%d]\n", eno); - return false; - } - IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); - if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { - ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); - return false; - } - if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) { - ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno); - return false; - } - *mutex_ = PTHREAD_MUTEX_INITIALIZER; - if ((eno = ::pthread_mutex_init(mutex_, &mutex_attr)) != 0) { - ipc::error("fail pthread_mutex_init[%d]\n", eno); - return false; - } - finally.dismiss(); + auto self_ref = ref_->fetch_add(1, std::memory_order_relaxed); + if (shm_->ref() > 1 || self_ref > 0) { + return valid(); } + ::pthread_mutex_destroy(mutex_); + auto finally = ipc::guard([this] { close(); }); // close when failed + // init mutex + int eno; + pthread_mutexattr_t mutex_attr; + if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) { + ipc::error("fail pthread_mutexattr_init[%d]\n", eno); + return false; + } + IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy); + if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) { + ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno); + return false; + } + if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) { + ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno); + return false; + } + *mutex_ = PTHREAD_MUTEX_INITIALIZER; + if ((eno = ::pthread_mutex_init(mutex_, &mutex_attr)) != 0) { + ipc::error("fail pthread_mutex_init[%d]\n", eno); + return false; + } + finally.dismiss(); return valid(); } void close() noexcept { - if (shm_.ref() == 1) { - int eno; - if ((eno = ::pthread_mutex_destroy(mutex_)) != 0) { - ipc::error("fail pthread_mutex_destroy[%d]\n", eno); - } + if ((ref_ != nullptr) && (shm_ != nullptr) && (mutex_ != nullptr)) { + if (shm_->name() != nullptr) { + release_mutex(shm_->name(), [this] { + auto self_ref = ref_->fetch_sub(1, std::memory_order_relaxed); + if ((shm_->ref() <= 1) && (self_ref <= 1)) { + int eno; + if ((eno = ::pthread_mutex_destroy(mutex_)) != 0) { + ipc::error("fail pthread_mutex_destroy[%d]\n", eno); + } + return true; + } + return false; + }); + } else shm_->release(); } - shm_.release(); + shm_ = nullptr; + ref_ = nullptr; mutex_ = nullptr; } bool lock(std::uint64_t tm) noexcept { + if (!valid()) return false; for (;;) { auto ts = detail::make_timespec(tm); int eno = (tm == invalid_value) @@ -123,8 +166,8 @@ public: case ETIMEDOUT: return false; case EOWNERDEAD: { - if (shm_.ref() > 1) { - shm_.sub_ref(); + if (shm_->ref() > 1) { + shm_->sub_ref(); } int eno2 = ::pthread_mutex_consistent(mutex_); if (eno2 != 0) { @@ -146,6 +189,7 @@ public: } bool try_lock() noexcept(false) { + if (!valid()) return false; auto ts = detail::make_timespec(0); int eno = ::pthread_mutex_timedlock(mutex_, &ts); switch (eno) { @@ -154,8 +198,8 @@ public: case ETIMEDOUT: return false; case EOWNERDEAD: { - if (shm_.ref() > 1) { - shm_.sub_ref(); + if (shm_->ref() > 1) { + shm_->sub_ref(); } int eno2 = ::pthread_mutex_consistent(mutex_); if (eno2 != 0) { @@ -177,6 +221,7 @@ public: } bool unlock() noexcept { + if (!valid()) return false; int eno; if ((eno = ::pthread_mutex_unlock(mutex_)) != 0) { ipc::error("fail pthread_mutex_unlock[%d]\n", eno); diff --git a/src/libipc/platform/semaphore_linux.h b/src/libipc/platform/semaphore_linux.h index 8f6ee40..cbc4973 100644 --- a/src/libipc/platform/semaphore_linux.h +++ b/src/libipc/platform/semaphore_linux.h @@ -50,13 +50,15 @@ public: if (::sem_close(h_) != 0) { ipc::error("fail sem_close[%d]: %s\n", errno); } - if (shm_.ref() == 1) { - if (::sem_unlink(shm_.name()) != 0) { - ipc::error("fail sem_unlink[%d]: %s\n", errno); + h_ = SEM_FAILED; + if (shm_.name() != nullptr) { + std::string name = shm_.name(); + if (shm_.release() <= 1) { + if (::sem_unlink(name.c_str()) != 0) { + ipc::error("fail sem_unlink[%d]: %s, name: %s\n", errno, name.c_str()); + } } } - shm_.release(); - h_ = SEM_FAILED; } bool wait(std::uint64_t tm) noexcept { diff --git a/src/libipc/platform/shm_linux.cpp b/src/libipc/platform/shm_linux.cpp index 8a16a3e..4baf8b5 100755 --- a/src/libipc/platform/shm_linux.cpp +++ b/src/libipc/platform/shm_linux.cpp @@ -22,7 +22,7 @@ namespace { struct info_t { - std::atomic acc_; + std::atomic acc_; }; struct id_info_t { @@ -81,7 +81,7 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { return ii; } -std::uint32_t get_ref(id_t id) { +std::int32_t get_ref(id_t id) { if (id == nullptr) { return 0; } @@ -152,16 +152,17 @@ void * get_mem(id_t id, std::size_t * size) { return mem; } -void release(id_t id) { +std::int32_t release(id_t id) { if (id == nullptr) { ipc::error("fail release: invalid id (null)\n"); - return; + return -1; } + std::int32_t ret = -1; auto ii = static_cast(id); if (ii->mem_ == nullptr || ii->size_ == 0) { ipc::error("fail release: invalid id (mem = %p, size = %zd)\n", ii->mem_, ii->size_); } - else if (acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel) == 1) { + else if ((ret = acc_of(ii->mem_, ii->size_).fetch_sub(1, std::memory_order_acq_rel)) <= 1) { ::munmap(ii->mem_, ii->size_); if (!ii->name_.empty()) { ::shm_unlink(ii->name_.c_str()); @@ -169,6 +170,7 @@ void release(id_t id) { } else ::munmap(ii->mem_, ii->size_); mem::free(ii); + return ret; } void remove(id_t id) { diff --git a/src/libipc/platform/shm_win.cpp b/src/libipc/platform/shm_win.cpp index 858a1e5..ae49268 100755 --- a/src/libipc/platform/shm_win.cpp +++ b/src/libipc/platform/shm_win.cpp @@ -58,7 +58,7 @@ id_t acquire(char const * name, std::size_t size, unsigned mode) { return ii; } -std::uint32_t get_ref(id_t) { +std::int32_t get_ref(id_t) { return 0; } @@ -96,10 +96,10 @@ void * get_mem(id_t id, std::size_t * size) { return static_cast(mem); } -void release(id_t id) { +std::int32_t release(id_t id) { if (id == nullptr) { ipc::error("fail release: invalid id (null)\n"); - return; + return -1; } auto ii = static_cast(id); if (ii->mem_ == nullptr || ii->size_ == 0) { @@ -111,6 +111,7 @@ void release(id_t id) { } else ::CloseHandle(ii->h_); mem::free(ii); + return 0; } void remove(id_t id) { diff --git a/src/libipc/shm.cpp b/src/libipc/shm.cpp index 24cb377..92f96ef 100755 --- a/src/libipc/shm.cpp +++ b/src/libipc/shm.cpp @@ -59,7 +59,7 @@ char const * handle::name() const noexcept { return impl(p_)->n_.c_str(); } -std::uint32_t handle::ref() const noexcept { +std::int32_t handle::ref() const noexcept { return shm::get_ref(impl(p_)->id_); } @@ -74,9 +74,9 @@ bool handle::acquire(char const * name, std::size_t size, unsigned mode) { return valid(); } -void handle::release() { - if (impl(p_)->id_ == nullptr) return; - shm::release(detach()); +std::int32_t handle::release() { + if (impl(p_)->id_ == nullptr) return -1; + return shm::release(detach()); } void* handle::get() const { diff --git a/src/libipc/waiter.h b/src/libipc/waiter.h index 6c2cb45..679409c 100644 --- a/src/libipc/waiter.h +++ b/src/libipc/waiter.h @@ -42,8 +42,8 @@ public: } void close() noexcept { - cond_.close(); - lock_.close(); + // cond_.close(); + // lock_.close(); } template diff --git a/test/test_waiter.cpp b/test/test_waiter.cpp index 0802109..700e751 100755 --- a/test/test_waiter.cpp +++ b/test/test_waiter.cpp @@ -7,28 +7,31 @@ namespace { TEST(Waiter, broadcast) { - ipc::detail::waiter waiter; - std::thread ts[10]; + for (int i = 0; i < 10; ++i) { + ipc::detail::waiter waiter; + std::thread ts[10]; - int k = 0; - for (auto& t : ts) { - t = std::thread([&k] { - ipc::detail::waiter waiter {"test-ipc-waiter"}; - EXPECT_TRUE(waiter.valid()); - for (int i = 0; i < 99; ++i) { - ASSERT_TRUE(waiter.wait_if([&k, &i] { return k == i; })); - } - }); - } + int k = 0; + for (auto& t : ts) { + t = std::thread([&k] { + ipc::detail::waiter waiter {"test-ipc-waiter"}; + EXPECT_TRUE(waiter.valid()); + for (int i = 0; i < 9; ++i) { + while (!waiter.wait_if([&k, &i] { return k == i; })) ; + } + }); + } - EXPECT_TRUE(waiter.open("test-ipc-waiter")); - std::cout << "waiting for broadcast...\n"; - for (k = 1; k < 100; ++k) { - std::cout << "broadcast: " << k << "\n"; - ASSERT_TRUE(waiter.broadcast()); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + EXPECT_TRUE(waiter.open("test-ipc-waiter")); + std::cout << "waiting for broadcast...\n"; + for (k = 1; k < 10; ++k) { + std::cout << "broadcast: " << k << "\n"; + ASSERT_TRUE(waiter.broadcast()); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + for (auto& t : ts) t.join(); + std::cout << "quit... " << i << "\n"; } - for (auto& t : ts) t.join(); } } // internal-linkage