split condition & mutex from waiter-linux

This commit is contained in:
mutouyun 2019-01-25 22:57:47 +08:00
parent 617d18e1ce
commit 59197f6c68

View File

@ -12,71 +12,57 @@
namespace ipc { namespace ipc {
namespace detail { namespace detail {
class waiter { class condition {
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER; pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
std::atomic<unsigned> counter_ { 0 };
public: public:
using handle_t = bool; bool open(char const * name) {
if (name == nullptr || name[0] == '\0') return false;
public: int eno;
constexpr static handle_t invalid() { // init mutex
return false; pthread_mutexattr_t mutex_attr;
} if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
ipc::log("fail pthread_mutexattr_init[%d]: %s\n", eno, name);
handle_t open(char const * name) { return false;
if (name == nullptr || name[0] == '\0') return invalid();
if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) {
int eno;
// init mutex
pthread_mutexattr_t mutex_attr;
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
ipc::log("fail pthread_mutexattr_init[%d]: %s\n", eno, name);
return invalid();
}
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::log("fail pthread_mutexattr_setpshared[%d]: %s\n", eno, name);
return invalid();
}
if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) {
ipc::log("fail pthread_mutex_init[%d]: %s\n", eno, name);
return invalid();
}
auto guard_mutex = unique_ptr(&mutex_, ::pthread_mutex_destroy);
// init condition
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::log("fail pthread_condattr_init[%d]: %s\n", eno, name);
return invalid();
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::log("fail pthread_condattr_setpshared[%d]: %s\n", eno, name);
return invalid();
}
if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) {
ipc::log("fail pthread_cond_init[%d]: %s\n", eno, name);
return invalid();
}
// no need to guard condition
// release guards
guard_mutex.release();
} }
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::log("fail pthread_mutexattr_setpshared[%d]: %s\n", eno, name);
return false;
}
if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) {
ipc::log("fail pthread_mutex_init[%d]: %s\n", eno, name);
return false;
}
auto guard_mutex = unique_ptr(&mutex_, ::pthread_mutex_destroy);
// init condition
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::log("fail pthread_condattr_init[%d]: %s\n", eno, name);
return false;
}
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::log("fail pthread_condattr_setpshared[%d]: %s\n", eno, name);
return false;
}
if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) {
ipc::log("fail pthread_cond_init[%d]: %s\n", eno, name);
return false;
}
// no need to guard condition
// release guards
guard_mutex.release();
return true; return true;
} }
void close(handle_t h) { void close() {
if (h == invalid()) return; ::pthread_cond_destroy(&cond_);
if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) { ::pthread_mutex_destroy(&mutex_);
::pthread_cond_destroy(&cond_);
::pthread_mutex_destroy(&mutex_);
}
} }
bool wait(handle_t h) { bool wait() {
if (h == invalid()) return false;
int eno; int eno;
if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) { if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) {
ipc::log("fail pthread_mutex_lock[%d]\n", eno); ipc::log("fail pthread_mutex_lock[%d]\n", eno);
@ -90,16 +76,14 @@ public:
return true; return true;
} }
void notify(handle_t h) { void notify() {
if (h == invalid()) return;
int eno; int eno;
if ((eno = ::pthread_cond_signal(&cond_)) != 0) { if ((eno = ::pthread_cond_signal(&cond_)) != 0) {
ipc::log("fail pthread_cond_signal[%d]\n", eno); ipc::log("fail pthread_cond_signal[%d]\n", eno);
} }
} }
void broadcast(handle_t h) { void broadcast() {
if (h == invalid()) return;
int eno; int eno;
if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) { if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) {
ipc::log("fail pthread_cond_broadcast[%d]\n", eno); ipc::log("fail pthread_cond_broadcast[%d]\n", eno);
@ -107,5 +91,48 @@ public:
} }
}; };
class waiter {
ipc::detail::condition cond_;
std::atomic<unsigned> counter_ { 0 };
public:
using handle_t = waiter*;
public:
constexpr static handle_t invalid() {
return nullptr;
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') return invalid();
if ((counter_.fetch_add(1, std::memory_order_acq_rel) == 0) && !cond_.open(name)) {
return invalid();
}
return this;
}
void close(handle_t h) {
if (h == invalid()) return;
if (counter_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
cond_.close();
}
}
bool wait(handle_t h) {
if (h == invalid()) return false;
return cond_.wait();
}
void notify(handle_t h) {
if (h == invalid()) return;
cond_.notify();
}
void broadcast(handle_t h) {
if (h == invalid()) return;
cond_.broadcast();
}
};
} // namespace detail } // namespace detail
} // namespace ipc } // namespace ipc