try semaphore

This commit is contained in:
mutouyun 2019-01-27 20:43:23 +08:00
parent 0d948b9dfd
commit 018bea223e

View File

@ -1,6 +1,10 @@
#pragma once
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <errno.h>
#include <cstring>
#include <atomic>
@ -20,6 +24,54 @@
namespace ipc {
namespace detail {
class semaphore {
public:
using handle_t = sem_t*;
constexpr static handle_t invalid() {
return SEM_FAILED;
}
static handle_t open(char const* name) {
handle_t sem = ::sem_open(name, O_CREAT, 0666, 0);
if (sem == SEM_FAILED) {
ipc::error("fail sem_open[%d]: %s\n", errno, name);
return invalid();
}
return sem;
}
#pragma push_macro("IPC_SEMAPHORE_FUNC_")
#undef IPC_SEMAPHORE_FUNC_
#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \
if (::CALL(PAR) != 0) { \
ipc::error("fail " #CALL "[%d]\n", errno); \
return false; \
} \
return true
static bool close(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_close, h);
}
static bool destroy(char const* name) {
IPC_SEMAPHORE_FUNC_(sem_unlink, name);
}
static bool post(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_post, h);
}
static bool wait(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_wait, h);
}
#pragma pop_macro("IPC_SEMAPHORE_FUNC_")
};
class mutex {
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
@ -138,14 +190,8 @@ public:
};
class event {
using cnt_t = std::atomic<std::size_t>;
struct info_t {
cnt_t cnt_;
mutex mutex_;
condition cond_;
} * info_ = nullptr;
std::atomic<std::size_t>* cnt_ = nullptr;
semaphore::handle_t sem_ = semaphore::invalid();
uint_t<16> wait_id_;
std::string name() const {
@ -154,25 +200,23 @@ class event {
void open() {
auto n = name();
info_ = static_cast<info_t*>(shm::acquire(n.c_str(), sizeof(info_t)));
if (info_ == nullptr) {
cnt_ = static_cast<std::atomic<std::size_t>*>(
shm::acquire(n.c_str(), sizeof(std::atomic<std::size_t>)));
if (cnt_ == nullptr) {
ipc::error("fail shm::acquire: %s\n", n.c_str());
return;
}
if (info_->cnt_.fetch_add(1, std::memory_order_acq_rel) == 0) {
if (!info_->mutex_.open()) return;
if (!info_->cond_.open()) return;
if (cnt_->fetch_add(1, std::memory_order_acq_rel) == 0) {
sem_ = semaphore::open(n.c_str());
}
info_->mutex_.lock();
}
void close() {
info_->mutex_.unlock();
if (info_->cnt_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
info_->cond_.close();
info_->mutex_.close();
semaphore::close(sem_);
if (cnt_->fetch_sub(1, std::memory_order_acq_rel) == 1) {
semaphore::destroy(name().c_str());
}
shm::release(info_, sizeof(info_t));
shm::release(cnt_, sizeof(std::atomic<std::size_t>));
}
public:
@ -190,13 +234,13 @@ public:
}
bool wait() {
if (info_ == nullptr) return false;
return info_->cond_.wait(info_->mutex_);
if (sem_ == semaphore::invalid()) return false;
return semaphore::wait(sem_);
}
bool notify() {
if (info_ == nullptr) return false;
return info_->cond_.notify();
if (sem_ == semaphore::invalid()) return false;
return semaphore::post(sem_);
}
};
@ -278,7 +322,7 @@ public:
evt_ids_.for_acquired([this](auto id) {
event evt { *static_cast<evt_id_t*>(evt_ids_.at(id)) };
evt.notify();
return false;
return false; // return first
});
}
@ -288,7 +332,7 @@ public:
evt_ids_.for_acquired([this](auto id) {
event evt { *static_cast<evt_id_t*>(evt_ids_.at(id)) };
evt.notify();
return true;
return true; // return after all
});
}
};