mirror of
https://github.com/mutouyun/cpp-ipc.git
synced 2025-12-06 16:56:45 +08:00
添加正常退出的机制(win)
This commit is contained in:
parent
523d38d247
commit
bce3894707
@ -16,3 +16,4 @@ add_subdirectory(src)
|
||||
add_subdirectory(3rdparty/gtest)
|
||||
add_subdirectory(test)
|
||||
add_subdirectory(demo/chat)
|
||||
add_subdirectory(demo/msg_que)
|
||||
|
||||
@ -1,3 +1,6 @@
|
||||
|
||||
#include <signal.h>
|
||||
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
@ -8,36 +11,39 @@
|
||||
|
||||
namespace {
|
||||
|
||||
char name__[] = "ipc-chat";
|
||||
char quit__[] = "q";
|
||||
char id__ [] = "c";
|
||||
constexpr char const name__[] = "ipc-chat";
|
||||
constexpr char const quit__[] = "q";
|
||||
constexpr char const id__ [] = "c";
|
||||
|
||||
std::size_t calc_unique_id() {
|
||||
inline std::size_t calc_unique_id() {
|
||||
static ipc::shm::handle g_shm { "__CHAT_ACC_STORAGE__", sizeof(std::atomic<std::size_t>) };
|
||||
return static_cast<std::atomic<std::size_t>*>(g_shm.get())->fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
ipc::channel sender__ { name__, ipc::sender };
|
||||
ipc::channel receiver__ { name__, ipc::receiver };
|
||||
|
||||
} // namespace
|
||||
|
||||
int main() {
|
||||
::signal(SIGINT, [](int) {
|
||||
receiver__.disconnect();
|
||||
});
|
||||
|
||||
std::string buf, id = id__ + std::to_string(calc_unique_id());
|
||||
std::regex reg { "(c\\d+)> (.*)" };
|
||||
|
||||
ipc::channel cc { name__, ipc::sender };
|
||||
|
||||
std::thread r {[&id, ®] {
|
||||
ipc::channel cc { name__, ipc::receiver };
|
||||
std::cout << id << " is ready." << std::endl;
|
||||
while (1) {
|
||||
auto buf = cc.recv();
|
||||
if (buf.empty()) continue;
|
||||
ipc::buff_t buf = receiver__.recv();
|
||||
if (buf.empty()) break; // quit
|
||||
std::string dat { buf.get<char const *>(), buf.size() - 1 };
|
||||
std::smatch mid;
|
||||
if (std::regex_match(dat, mid, reg)) {
|
||||
if (mid.str(1) == id) {
|
||||
if (mid.str(2) == quit__) {
|
||||
std::cout << "receiver quit..." << std::endl;
|
||||
return;
|
||||
break;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
@ -48,11 +54,16 @@ int main() {
|
||||
|
||||
for (/*int i = 1*/;; /*++i*/) {
|
||||
std::cin >> buf;
|
||||
if (buf.empty()) break;
|
||||
// std::cout << "[" << i << "]" << std::endl;
|
||||
cc.send(id + "> " + buf);
|
||||
if (buf == quit__) break;
|
||||
sender__.send(id + "> " + buf);
|
||||
if (buf == quit__) {
|
||||
receiver__.disconnect();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
r.join();
|
||||
std::cout << id << " is quit..." << std::endl;
|
||||
return 0;
|
||||
}
|
||||
|
||||
8
demo/msg_que/CMakeLists.txt
Normal file
8
demo/msg_que/CMakeLists.txt
Normal file
@ -0,0 +1,8 @@
|
||||
project(msg_que)
|
||||
|
||||
file(GLOB SRC_FILES ./*.cpp)
|
||||
file(GLOB HEAD_FILES ./*.h)
|
||||
|
||||
add_executable(${PROJECT_NAME} ${SRC_FILES} ${HEAD_FILES})
|
||||
|
||||
target_link_libraries(${PROJECT_NAME} ipc)
|
||||
19
demo/msg_que/main.cpp
Normal file
19
demo/msg_que/main.cpp
Normal file
@ -0,0 +1,19 @@
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <regex>
|
||||
#include <atomic>
|
||||
|
||||
#include "libipc/ipc.h"
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr char const name__[] = "ipc-msg-que";
|
||||
constexpr char const quit__[] = "q";
|
||||
constexpr char const id__ [] = "c";
|
||||
|
||||
} // namespace
|
||||
|
||||
int main() {
|
||||
return 0;
|
||||
}
|
||||
@ -19,8 +19,9 @@ enum : unsigned {
|
||||
|
||||
template <typename Flag>
|
||||
struct IPC_EXPORT chan_impl {
|
||||
static handle_t connect (char const * name, unsigned mode);
|
||||
static void disconnect(handle_t h);
|
||||
static bool connect (handle_t * ph, char const * name, unsigned mode);
|
||||
static void disconnect(handle_t h);
|
||||
static void destroy (handle_t h);
|
||||
|
||||
static char const * name(handle_t h);
|
||||
|
||||
@ -54,7 +55,7 @@ public:
|
||||
}
|
||||
|
||||
~chan_wrapper() {
|
||||
disconnect();
|
||||
detail_t::destroy(h_);
|
||||
}
|
||||
|
||||
void swap(chan_wrapper& rhs) noexcept {
|
||||
@ -89,14 +90,13 @@ public:
|
||||
bool connect(char const * name, unsigned mode = ipc::sender | ipc::receiver) {
|
||||
if (name == nullptr || name[0] == '\0') return false;
|
||||
this->disconnect();
|
||||
h_ = detail_t::connect(name, mode_ = mode);
|
||||
detail_t::connect(&h_, name, mode_ = mode);
|
||||
return valid();
|
||||
}
|
||||
|
||||
void disconnect() {
|
||||
if (!valid()) return;
|
||||
detail_t::disconnect(h_);
|
||||
h_ = nullptr;
|
||||
}
|
||||
|
||||
std::size_t recv_count() const {
|
||||
@ -111,13 +111,25 @@ public:
|
||||
return chan_wrapper(name).wait_for_recv(r_count, tm);
|
||||
}
|
||||
|
||||
bool send (void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::send(h_, data, size, tm) ; }
|
||||
bool send (buff_t const & buff , std::size_t tm = default_timeout) { return this->send(buff.data(), buff.size(), tm) ; }
|
||||
bool send (std::string const & str , std::size_t tm = default_timeout) { return this->send(str.c_str(), str.size() + 1, tm); }
|
||||
bool send(void const * data, std::size_t size, std::size_t tm = default_timeout) {
|
||||
return detail_t::send(h_, data, size, tm);
|
||||
}
|
||||
bool send(buff_t const & buff, std::size_t tm = default_timeout) {
|
||||
return this->send(buff.data(), buff.size(), tm);
|
||||
}
|
||||
bool send(std::string const & str, std::size_t tm = default_timeout) {
|
||||
return this->send(str.c_str(), str.size() + 1, tm);
|
||||
}
|
||||
|
||||
bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) { return detail_t::try_send(h_, data, size, tm) ; }
|
||||
bool try_send(buff_t const & buff , std::size_t tm = default_timeout) { return this->try_send(buff.data(), buff.size(), tm) ; }
|
||||
bool try_send(std::string const & str , std::size_t tm = default_timeout) { return this->try_send(str.c_str(), str.size() + 1, tm); }
|
||||
bool try_send(void const * data, std::size_t size, std::size_t tm = default_timeout) {
|
||||
return detail_t::try_send(h_, data, size, tm);
|
||||
}
|
||||
bool try_send(buff_t const & buff, std::size_t tm = default_timeout) {
|
||||
return this->try_send(buff.data(), buff.size(), tm);
|
||||
}
|
||||
bool try_send(std::string const & str, std::size_t tm = default_timeout) {
|
||||
return this->try_send(str.c_str(), str.size() + 1, tm);
|
||||
}
|
||||
|
||||
buff_t recv(std::size_t tm = invalid_value) {
|
||||
return detail_t::recv(h_, tm);
|
||||
|
||||
@ -77,11 +77,8 @@ inline void sleep(K& k, F&& f) {
|
||||
if (k < static_cast<K>(N)) {
|
||||
std::this_thread::yield();
|
||||
}
|
||||
else if (std::forward<F>(f)()) {
|
||||
return;
|
||||
}
|
||||
else {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
static_cast<void>(std::forward<F>(f)());
|
||||
return;
|
||||
}
|
||||
++k;
|
||||
@ -89,7 +86,9 @@ inline void sleep(K& k, F&& f) {
|
||||
|
||||
template <std::size_t N = 4096, typename K>
|
||||
inline void sleep(K& k) {
|
||||
sleep<N>(k, [] { return false; });
|
||||
sleep<N>(k, [] {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(1));
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace ipc
|
||||
|
||||
@ -19,6 +19,7 @@ aux_source_directory(${CMAKE_SOURCE_DIR}/src SRC_FILES)
|
||||
|
||||
file(GLOB HEAD_FILES
|
||||
${CMAKE_SOURCE_DIR}/include/libipc/*.h
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/*.h
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/*.inc
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/circ/*.h
|
||||
${CMAKE_SOURCE_DIR}/src/libipc/memory/*.h
|
||||
|
||||
55
src/ipc.cpp
55
src/ipc.cpp
@ -8,6 +8,7 @@
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <array>
|
||||
#include <cassert>
|
||||
|
||||
#include "libipc/ipc.h"
|
||||
#include "libipc/def.h"
|
||||
@ -279,6 +280,12 @@ struct conn_info_head {
|
||||
, acc_h_ (("__AC_CONN__" + name_).c_str(), sizeof(acc_t)) {
|
||||
}
|
||||
|
||||
void enable(bool e) {
|
||||
cc_waiter_.set_enabled(e);
|
||||
wt_waiter_.set_enabled(e);
|
||||
rd_waiter_.set_enabled(e);
|
||||
}
|
||||
|
||||
auto acc() {
|
||||
return static_cast<acc_t*>(acc_h_.get());
|
||||
}
|
||||
@ -295,10 +302,9 @@ bool wait_for(W& waiter, F&& pred, std::size_t tm) {
|
||||
bool loop = true, ret = true;
|
||||
ipc::sleep(k, [&k, &loop, &ret, &waiter, &pred, tm] {
|
||||
ret = waiter.wait_if([&loop, &pred] {
|
||||
return loop = pred();
|
||||
}, tm);
|
||||
return loop = pred();
|
||||
}, tm);
|
||||
k = 0;
|
||||
return true;
|
||||
});
|
||||
if (!ret ) return false; // timeout or fail
|
||||
if (!loop) break;
|
||||
@ -341,18 +347,22 @@ constexpr static queue_t* queue_of(ipc::handle_t h) {
|
||||
|
||||
/* API implementations */
|
||||
|
||||
static ipc::handle_t connect(char const * name, bool start) {
|
||||
auto h = mem::alloc<conn_info_t>(name);
|
||||
auto que = queue_of(h);
|
||||
if (que == nullptr) {
|
||||
return nullptr;
|
||||
static bool connect(handle_t * ph, char const * name, bool start) {
|
||||
assert(ph != nullptr);
|
||||
if (*ph == nullptr) {
|
||||
*ph = mem::alloc<conn_info_t>(name);
|
||||
}
|
||||
auto que = queue_of(*ph);
|
||||
if (que == nullptr) {
|
||||
return false;
|
||||
}
|
||||
info_of(*ph)->enable(true);
|
||||
if (start) {
|
||||
if (que->connect()) { // wouldn't connect twice
|
||||
info_of(h)->cc_waiter_.broadcast();
|
||||
info_of(*ph)->cc_waiter_.broadcast();
|
||||
}
|
||||
}
|
||||
return h;
|
||||
return true;
|
||||
}
|
||||
|
||||
static void disconnect(ipc::handle_t h) {
|
||||
@ -360,9 +370,15 @@ static void disconnect(ipc::handle_t h) {
|
||||
if (que == nullptr) {
|
||||
return;
|
||||
}
|
||||
if (que->disconnect()) {
|
||||
info_of(h)->cc_waiter_.broadcast();
|
||||
bool dis = que->disconnect();
|
||||
info_of(h)->enable(false);
|
||||
if (dis) {
|
||||
info_of(h)->recv_cache().clear();
|
||||
}
|
||||
}
|
||||
|
||||
static void destroy(ipc::handle_t h) {
|
||||
disconnect(h);
|
||||
mem::free(info_of(h));
|
||||
}
|
||||
|
||||
@ -485,14 +501,16 @@ static buff_t recv(ipc::handle_t h, std::size_t tm) {
|
||||
ipc::error("fail: recv, queue_of(h) == nullptr\n");
|
||||
return {};
|
||||
}
|
||||
if (que->connect()) { // wouldn't connect twice
|
||||
info_of(h)->cc_waiter_.broadcast();
|
||||
if (!que->connected()) {
|
||||
// hasn't connected yet, just return.
|
||||
return {};
|
||||
}
|
||||
auto& rc = info_of(h)->recv_cache();
|
||||
while (1) {
|
||||
// pop a new message
|
||||
typename queue_t::value_t msg;
|
||||
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] { return !que->pop(msg); }, tm)) {
|
||||
// pop failed, just return.
|
||||
return {};
|
||||
}
|
||||
info_of(h)->wt_waiter_.broadcast();
|
||||
@ -562,8 +580,8 @@ using policy_t = policy::choose<circ::elem_array, Flag>;
|
||||
namespace ipc {
|
||||
|
||||
template <typename Flag>
|
||||
ipc::handle_t chan_impl<Flag>::connect(char const * name, unsigned mode) {
|
||||
return detail_impl<policy_t<Flag>>::connect(name, mode & receiver);
|
||||
bool chan_impl<Flag>::connect(handle_t * ph, char const * name, unsigned mode) {
|
||||
return detail_impl<policy_t<Flag>>::connect(ph, name, mode & receiver);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
@ -571,6 +589,11 @@ void chan_impl<Flag>::disconnect(ipc::handle_t h) {
|
||||
detail_impl<policy_t<Flag>>::disconnect(h);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
void chan_impl<Flag>::destroy(handle_t h) {
|
||||
detail_impl<policy_t<Flag>>::destroy(h);
|
||||
}
|
||||
|
||||
template <typename Flag>
|
||||
char const * chan_impl<Flag>::name(ipc::handle_t h) {
|
||||
auto info = detail_impl<policy_t<Flag>>::info_of(h);
|
||||
|
||||
@ -9,10 +9,12 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
|
||||
#include "libipc/def.h"
|
||||
|
||||
#include "libipc/utility/log.h"
|
||||
#include "libipc/utility/scope_guard.h"
|
||||
#include "libipc/platform/detail.h"
|
||||
#include "libipc/memory/resource.h"
|
||||
|
||||
@ -280,20 +282,22 @@ public:
|
||||
template <typename F>
|
||||
bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) {
|
||||
waiting_.fetch_add(1, std::memory_order_release);
|
||||
auto finally = ipc::guard([this] {
|
||||
waiting_->fetch_sub(1, std::memory_order_release);
|
||||
});
|
||||
{
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (!std::forward<F>(pred)()) return true;
|
||||
++ counter_;
|
||||
}
|
||||
bool ret = sem_helper::wait(std::get<1>(h), tm);
|
||||
waiting_.fetch_sub(1, std::memory_order_release);
|
||||
finally.do_exit();
|
||||
ret = sem_helper::post(std::get<2>(h)) && ret;
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool notify(handle_t const & h) {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_.load(std::memory_order_relaxed) == 0) {
|
||||
if (waiting_.load(std::memory_order_acquire) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
@ -307,8 +311,7 @@ public:
|
||||
}
|
||||
|
||||
bool broadcast(handle_t const & h) {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_.load(std::memory_order_relaxed) == 0) {
|
||||
if (waiting_.load(std::memory_order_acquire) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
|
||||
@ -3,13 +3,15 @@
|
||||
#include <Windows.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <tuple>
|
||||
#include <utility>
|
||||
#include <limits>
|
||||
|
||||
#include "libipc/rw_lock.h"
|
||||
#include "libipc/pool_alloc.h"
|
||||
#include "libipc/shm.h"
|
||||
|
||||
#include "libipc/utility/log.h"
|
||||
#include "libipc/utility/scope_guard.h"
|
||||
#include "libipc/platform/to_tchar.h"
|
||||
#include "libipc/platform/get_sa.h"
|
||||
#include "libipc/platform/detail.h"
|
||||
@ -79,6 +81,11 @@ class condition {
|
||||
std::atomic<unsigned> * waiting_ = nullptr;
|
||||
long * counter_ = nullptr;
|
||||
|
||||
enum : unsigned {
|
||||
destruct_mask = (std::numeric_limits<unsigned>::max)() >> 1,
|
||||
destruct_flag = ~destruct_mask
|
||||
};
|
||||
|
||||
public:
|
||||
friend bool operator==(condition const & c1, condition const & c2) {
|
||||
return (c1.waiting_ == c2.waiting_) && (c1.counter_ == c2.counter_);
|
||||
@ -112,24 +119,35 @@ public:
|
||||
}
|
||||
|
||||
template <typename Mutex, typename F>
|
||||
bool wait_if(Mutex& mtx, F&& pred, std::size_t tm = invalid_value) {
|
||||
bool wait_if(Mutex & mtx, std::atomic<bool> const & enabled, F && pred, std::size_t tm = invalid_value) {
|
||||
if (!enabled.load(std::memory_order_acquire)) {
|
||||
return false;
|
||||
}
|
||||
waiting_->fetch_add(1, std::memory_order_release);
|
||||
auto finally = ipc::guard([this] {
|
||||
waiting_->fetch_sub(1, std::memory_order_release);
|
||||
});
|
||||
{
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
if (!std::forward<F>(pred)()) return true;
|
||||
++ *counter_;
|
||||
}
|
||||
mtx.unlock();
|
||||
bool ret = sema_.wait(tm);
|
||||
waiting_->fetch_sub(1, std::memory_order_release);
|
||||
bool ret = false;
|
||||
do {
|
||||
if (!enabled.load(std::memory_order_acquire)) {
|
||||
break;
|
||||
}
|
||||
ret = sema_.wait(tm);
|
||||
} while (waiting_->load(std::memory_order_acquire) & destruct_flag);
|
||||
finally.do_exit();
|
||||
ret = handshake_.post() && ret;
|
||||
mtx.lock();
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool notify() {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_->load(std::memory_order_relaxed) == 0) {
|
||||
if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
@ -143,8 +161,7 @@ public:
|
||||
}
|
||||
|
||||
bool broadcast() {
|
||||
std::atomic_thread_fence(std::memory_order_acq_rel);
|
||||
if (waiting_->load(std::memory_order_relaxed) == 0) {
|
||||
if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
@ -158,6 +175,26 @@ public:
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool emit_destruction() {
|
||||
if ((waiting_->load(std::memory_order_acquire) & destruct_mask) == 0) {
|
||||
return true;
|
||||
}
|
||||
bool ret = true;
|
||||
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
|
||||
waiting_->fetch_or(destruct_flag, std::memory_order_relaxed);
|
||||
IPC_UNUSED_ auto finally = ipc::guard([this] {
|
||||
waiting_->fetch_and(destruct_mask, std::memory_order_relaxed);
|
||||
});
|
||||
if (*counter_ > 0) {
|
||||
ret = sema_.post(*counter_);
|
||||
do {
|
||||
-- *counter_;
|
||||
ret = ret && handshake_.wait(default_timeout);
|
||||
} while (*counter_ > 0);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
};
|
||||
|
||||
class waiter {
|
||||
@ -189,7 +226,7 @@ public:
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
bool wait_if(handle_t& h, F&& pred, std::size_t tm = invalid_value) {
|
||||
bool wait_if(handle_t& h, std::atomic<bool> const & enabled, F&& pred, std::size_t tm = invalid_value) {
|
||||
if (h == invalid()) return false;
|
||||
|
||||
class non_mutex {
|
||||
@ -198,17 +235,22 @@ public:
|
||||
void unlock() noexcept {}
|
||||
} nm;
|
||||
|
||||
return h.wait_if(nm, std::forward<F>(pred), tm);
|
||||
return h.wait_if(nm, enabled, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
void notify(handle_t& h) {
|
||||
if (h == invalid()) return;
|
||||
h.notify();
|
||||
bool notify(handle_t& h) {
|
||||
if (h == invalid()) return false;
|
||||
return h.notify();
|
||||
}
|
||||
|
||||
void broadcast(handle_t& h) {
|
||||
if (h == invalid()) return;
|
||||
h.broadcast();
|
||||
bool broadcast(handle_t& h) {
|
||||
if (h == invalid()) return false;
|
||||
return h.broadcast();
|
||||
}
|
||||
|
||||
bool emit_destruction(handle_t& h) {
|
||||
if (h == invalid()) return false;
|
||||
return h.emit_destruction();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@ -23,6 +23,7 @@ using semaphore_impl = ipc::detail::semaphore;
|
||||
class condition_impl : public ipc::detail::condition {
|
||||
|
||||
ipc::shm::handle wait_h_, cnt_h_;
|
||||
std::atomic<bool> enabled_ { false };
|
||||
|
||||
public:
|
||||
static void remove(char const * name) {
|
||||
@ -35,6 +36,7 @@ public:
|
||||
bool open(ipc::string const & name) {
|
||||
if (wait_h_.acquire((name + "__COND_WAIT__").c_str(), sizeof(std::atomic<unsigned>)) &&
|
||||
cnt_h_ .acquire((name + "__COND_CNT__" ).c_str(), sizeof(long))) {
|
||||
enabled_.store(true, std::memory_order_release);
|
||||
return ipc::detail::condition::open(name,
|
||||
static_cast<std::atomic<unsigned> *>(wait_h_.get()),
|
||||
static_cast<long *>(cnt_h_.get()));
|
||||
@ -43,13 +45,15 @@ public:
|
||||
}
|
||||
|
||||
void close() {
|
||||
enabled_.store(false, std::memory_order_release);
|
||||
ipc::detail::condition::emit_destruction();
|
||||
ipc::detail::condition::close();
|
||||
cnt_h_ .release();
|
||||
wait_h_.release();
|
||||
}
|
||||
|
||||
bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) {
|
||||
return ipc::detail::condition::wait_if(mtx, [] { return true; }, tm);
|
||||
return ipc::detail::condition::wait_if(mtx, enabled_, [] { return true; }, tm);
|
||||
}
|
||||
};
|
||||
|
||||
@ -194,6 +198,7 @@ public:
|
||||
private:
|
||||
waiter_t* w_ = nullptr;
|
||||
waiter_t::handle_t h_ = waiter_t::invalid();
|
||||
std::atomic<bool> enabled_ { true };
|
||||
|
||||
public:
|
||||
waiter_wrapper() = default;
|
||||
@ -228,10 +233,17 @@ public:
|
||||
h_ = waiter_t::invalid();
|
||||
}
|
||||
|
||||
void set_enabled(bool e) {
|
||||
if (enabled_.exchange(e, std::memory_order_acq_rel) == e) {
|
||||
return;
|
||||
}
|
||||
if (!e) w_->emit_destruction(h_);
|
||||
}
|
||||
|
||||
template <typename F>
|
||||
bool wait_if(F&& pred, std::size_t tm = invalid_value) {
|
||||
if (!valid()) return false;
|
||||
return w_->wait_if(h_, std::forward<F>(pred), tm);
|
||||
return w_->wait_if(h_, enabled_, std::forward<F>(pred), tm);
|
||||
}
|
||||
|
||||
bool notify() {
|
||||
|
||||
@ -7,8 +7,8 @@ namespace ipc {
|
||||
namespace detail {
|
||||
|
||||
template <typename O>
|
||||
void print(O out, char const * fmt) {
|
||||
std::fprintf(out, "%s", fmt);
|
||||
void print(O out, char const * str) {
|
||||
std::fprintf(out, "%s", str);
|
||||
}
|
||||
|
||||
template <typename O, typename P1, typename... P>
|
||||
@ -27,8 +27,8 @@ void log(char const * fmt, P1&& p1, P&&... params) {
|
||||
ipc::detail::print(stdout, fmt, std::forward<P1>(p1), std::forward<P>(params)...);
|
||||
}
|
||||
|
||||
inline void error(char const * fmt) {
|
||||
ipc::detail::print(stderr, fmt);
|
||||
inline void error(char const * str) {
|
||||
ipc::detail::print(stderr, str);
|
||||
}
|
||||
|
||||
template <typename P1, typename... P>
|
||||
|
||||
64
src/libipc/utility/scope_guard.h
Normal file
64
src/libipc/utility/scope_guard.h
Normal file
@ -0,0 +1,64 @@
|
||||
#pragma once
|
||||
|
||||
#include <utility> // std::forward, std::move
|
||||
#include <algorithm> // std::swap
|
||||
#include <type_traits> // std::decay
|
||||
|
||||
namespace ipc {
|
||||
|
||||
////////////////////////////////////////////////////////////////
|
||||
/// Execute guard function when the enclosing scope exits
|
||||
////////////////////////////////////////////////////////////////
|
||||
|
||||
template <typename F>
|
||||
class scope_guard {
|
||||
F destructor_;
|
||||
mutable bool dismiss_;
|
||||
|
||||
public:
|
||||
template <typename D>
|
||||
scope_guard(D && destructor)
|
||||
: destructor_(std::forward<D>(destructor))
|
||||
, dismiss_(false) {
|
||||
}
|
||||
|
||||
scope_guard(scope_guard&& rhs)
|
||||
: destructor_(std::move(rhs.destructor_))
|
||||
, dismiss_(true) /* dismiss rhs */ {
|
||||
std::swap(dismiss_, rhs.dismiss_);
|
||||
}
|
||||
|
||||
~scope_guard() {
|
||||
try { do_exit(); }
|
||||
/**
|
||||
* In the realm of exceptions, it is fundamental that you can do nothing
|
||||
* if your "undo/recover" action fails.
|
||||
*/
|
||||
catch (...) { /* Do nothing */ }
|
||||
}
|
||||
|
||||
void swap(scope_guard & rhs) {
|
||||
std::swap(destructor_, rhs.destructor_);
|
||||
std::swap(dismiss_ , rhs.dismiss_);
|
||||
}
|
||||
|
||||
void dismiss() const noexcept {
|
||||
dismiss_ = true;
|
||||
}
|
||||
|
||||
void do_exit() {
|
||||
if (!dismiss_) {
|
||||
dismiss_ = true;
|
||||
destructor_();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <typename D>
|
||||
constexpr auto guard(D && destructor) noexcept {
|
||||
return scope_guard<std::decay_t<D>> {
|
||||
std::forward<D>(destructor)
|
||||
};
|
||||
}
|
||||
|
||||
} // namespace ipc
|
||||
Loading…
x
Reference in New Issue
Block a user