optimize & fix bugs

This commit is contained in:
mutouyun 2019-03-22 17:49:20 +08:00
parent b65be99045
commit 93030e1997
4 changed files with 250 additions and 79 deletions

View File

@ -1,6 +1,13 @@
#pragma once
#include <pthread.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <errno.h>
#include <atomic>
#include <tuple>
#include "def.h"
#include "log.h"
@ -104,98 +111,191 @@ public:
#pragma pop_macro("IPC_PTHREAD_FUNC_")
class semaphore {
mutex lock_;
condition cond_;
long counter_;
class sem_helper {
public:
using handle_t = sem_t*;
constexpr static handle_t invalid() noexcept {
return SEM_FAILED;
}
static handle_t open(char const* name, long count) {
handle_t sem = ::sem_open(name, O_CREAT, 0666, count);
if (sem == SEM_FAILED) {
ipc::error("fail sem_open[%d]: %s\n", errno, name);
return invalid();
}
return sem;
}
#pragma push_macro("IPC_SEMAPHORE_FUNC_")
#undef IPC_SEMAPHORE_FUNC_
#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \
if (::CALL(PAR) != 0) { \
ipc::error("fail " #CALL "[%d]\n", errno); \
return false; \
} \
return true
static bool close(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_close, h);
}
static bool destroy(char const* name) {
IPC_SEMAPHORE_FUNC_(sem_unlink, name);
}
static bool post(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_post, h);
}
static bool wait(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_wait, h);
}
#pragma pop_macro("IPC_SEMAPHORE_FUNC_")
};
class waiter_helper {
mutex lock_;
std::atomic<unsigned> waiting_ { 0 };
long counter_ = 0;
public:
bool open(long count = 0) {
if (lock_.open() && cond_.open()) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
counter_ = count;
return true;
using handle_t = std::tuple<std::string, sem_helper::handle_t, sem_helper::handle_t>;
static handle_t invalid() noexcept {
return std::make_tuple(std::string{}, sem_helper::invalid(), sem_helper::invalid());
}
handle_t open_h(std::string && name) {
auto sem = sem_helper::open(("__WAITER_HELPER_SEM__" + name).c_str(), 0);
if (sem == sem_helper::invalid()) {
return invalid();
}
return false;
auto han = sem_helper::open(("__WAITER_HELPER_HAN__" + name).c_str(), 0);
if (han == sem_helper::invalid()) {
return invalid();
}
return std::make_tuple(std::move(name), sem, han);
}
void release_h(handle_t const & h) {
sem_helper::close(std::get<2>(h));
sem_helper::close(std::get<1>(h));
}
void close_h(handle_t const & h) {
auto const & name = std::get<0>(h);
sem_helper::destroy(("__WAITER_HELPER_HAN__" + name).c_str());
sem_helper::destroy(("__WAITER_HELPER_SEM__" + name).c_str());
}
bool open() {
return lock_.open();
}
void close() {
cond_.close();
lock_.close();
}
template <typename F>
bool wait_if(F&& check) {
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
while ((counter_ <= 0) &&
std::forward<F>(check)() &&
(ret = cond_.wait(lock_))) ;
-- counter_;
bool wait_if(handle_t const & h, F&& pred) {
waiting_.fetch_add(1, std::memory_order_release);
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
++ counter_;
}
bool ret = sem_helper::wait(std::get<1>(h));
waiting_.fetch_sub(1, std::memory_order_release);
ret = sem_helper::post(std::get<2>(h)) && ret;
return ret;
}
template <typename F>
bool post(F&& count) {
bool notify(handle_t const & h) {
std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_.load(std::memory_order_relaxed) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
auto c = std::forward<F>(count)();
if (c <= 0) return false;
counter_ += c;
return cond_.broadcast();
if (counter_ > 0) {
ret = sem_helper::post(std::get<1>(h));
-- counter_;
ret = ret && sem_helper::wait(std::get<2>(h));
}
return ret;
}
bool broadcast(handle_t const & h) {
std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_.load(std::memory_order_relaxed) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (counter_ > 0) {
for (long i = 0; i < counter_; ++i) {
ret = ret && sem_helper::post(std::get<1>(h));
}
do {
-- counter_;
ret = ret && sem_helper::wait(std::get<2>(h));
} while (counter_ > 0);
}
return ret;
}
};
class waiter {
semaphore sem_;
std::atomic<long> counter_ { 0 };
std::atomic<unsigned> opened_ { 0 };
waiter_helper helper_;
std::atomic<unsigned> opened_ { 0 };
public:
using handle_t = waiter * ;
using handle_t = waiter_helper::handle_t;
constexpr static handle_t invalid() {
return nullptr;
static handle_t invalid() noexcept {
return waiter_helper::invalid();
}
handle_t open(char const * name) {
if (name == nullptr || name[0] == '\0') {
return invalid();
}
if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !sem_.open()) {
if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !helper_.open()) {
return invalid();
}
return this;
return helper_.open_h(name);
}
void close(handle_t h) {
if (h == invalid()) return;
helper_.release_h(h);
if (opened_.fetch_sub(1, std::memory_order_release) == 1) {
if (h == invalid()) return;
sem_.close();
helper_.close_h(h);
helper_.close();
}
}
template <typename F>
bool wait_if(handle_t h, F&& pred) {
if (h == invalid()) return false;
counter_.fetch_add(1, std::memory_order_release);
IPC_UNUSED_ auto guard = unique_ptr(&counter_, [](decltype(counter_)* c) {
c->fetch_sub(1, std::memory_order_release);
});
return sem_.wait_if(std::forward<F>(pred));
return helper_.wait_if(h, std::forward<F>(pred));
}
void notify(handle_t h) {
if (h == invalid()) return;
sem_.post([this] {
return (0 < counter_.load(std::memory_order_relaxed)) ? 1 : 0;
});
helper_.notify(h);
}
void broadcast(handle_t h) {
if (h == invalid()) return;
sem_.post([this] {
return counter_.load(std::memory_order_relaxed);
});
helper_.broadcast(h);
}
};

View File

@ -8,6 +8,7 @@
#include "rw_lock.h"
#include "pool_alloc.h"
#include "log.h"
#include "shm.h"
#include "platform/to_tchar.h"
#include "platform/detail.h"
@ -37,11 +38,20 @@ public:
}
bool wait() {
return ::WaitForSingleObject(h_, INFINITE) == WAIT_OBJECT_0;
DWORD ret;
if ((ret = ::WaitForSingleObject(h_, INFINITE)) == WAIT_OBJECT_0) {
return true;
}
ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret);
return false;
}
bool post(long count = 1) {
return !!::ReleaseSemaphore(h_, count, NULL);
if (::ReleaseSemaphore(h_, count, NULL)) {
return true;
}
ipc::error("fail ReleaseSemaphore[%lu]\n", ::GetLastError());
return false;
}
};
@ -62,8 +72,13 @@ class condition {
mutex lock_;
semaphore sema_, handshake_;
ipc::shm::handle waiting_; // std::atomic<unsigned>
long * counter_ = nullptr;
auto waiting_cnt() {
return static_cast<std::atomic<unsigned>*>(waiting_.get());
}
public:
friend bool operator==(condition const & c1, condition const & c2) {
return c1.counter_ == c2.counter_;
@ -74,9 +89,10 @@ public:
}
bool open(std::string const & name, long * counter) {
if (lock_ .open(name + "__COND_MTX__") &&
sema_ .open(name + "__COND_SEM__") &&
handshake_.open(name + "__COND_HAN__")) {
if (lock_ .open ("__COND_MTX__" + name) &&
sema_ .open ("__COND_SEM__" + name) &&
handshake_.open ("__COND_HAN__" + name) &&
waiting_ .acquire(("__COND_WAITING_CNT__" + name).c_str(), sizeof(std::atomic<unsigned>))) {
counter_ = counter;
return true;
}
@ -84,6 +100,7 @@ public:
}
void close() {
waiting_ .release();
handshake_.close();
sema_ .close();
lock_ .close();
@ -91,41 +108,57 @@ public:
template <typename Mutex, typename F>
bool wait_if(Mutex& mtx, F&& pred) {
auto cnt = waiting_cnt();
if (cnt != nullptr) {
cnt->fetch_add(1, std::memory_order_release);
}
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
++ *counter_;
}
mtx.unlock();
bool ret_s = sema_.wait();
bool ret_h = handshake_.post();
bool ret = sema_.wait();
if (cnt != nullptr) {
cnt->fetch_sub(1, std::memory_order_release);
}
ret = handshake_.post() && ret;
mtx.lock();
return ret_s && ret_h;
return ret;
}
bool notify() {
bool ret_s = true, ret_h = true;
std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_cnt() != nullptr &&
waiting_cnt()->load(std::memory_order_relaxed) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (*counter_ > 0) {
ret_s = sema_.post();
ret = sema_.post();
-- *counter_;
ret_h = handshake_.wait();
ret = ret && handshake_.wait();
}
return ret_s && ret_h;
return ret;
}
bool broadcast() {
bool ret_s = true, ret_h = true;
std::atomic_thread_fence(std::memory_order_acq_rel);
if (waiting_cnt() != nullptr &&
waiting_cnt()->load(std::memory_order_relaxed) == 0) {
return true;
}
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (*counter_ > 0) {
ret_s = sema_.post(*counter_);
ret = sema_.post(*counter_);
do {
-- *counter_;
bool rh = handshake_.wait();
ret_h = ret_h && rh;
ret = ret && handshake_.wait();
} while (*counter_ > 0);
}
return ret_s && ret_h;
return ret;
}
};

View File

@ -24,7 +24,7 @@ class condition_impl : public ipc::detail::condition {
public:
bool open(std::string const & name) {
if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long volatile))) {
if (h_.acquire((name + "__COND_CNT__").c_str(), sizeof(long))) {
return ipc::detail::condition::open(name, static_cast<long *>(h_.get()));
}
return false;
@ -69,8 +69,9 @@ public:
if (!h_.acquire(name, sizeof(info_t))) {
return false;
}
if ((static_cast<info_t*>(h_.get())->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) &&
!static_cast<info_t*>(h_.get())->object_.open(std::forward<P>(params)...)) {
auto info = static_cast<info_t*>(h_.get());
if ((info->opened_.fetch_add(1, std::memory_order_acq_rel) == 0) &&
!info->object_.open(std::forward<P>(params)...)) {
return false;
}
return true;
@ -78,8 +79,9 @@ public:
void close() {
if (!h_.valid()) return;
if (static_cast<info_t*>(h_.get())->opened_.fetch_sub(1, std::memory_order_release) == 1) {
static_cast<info_t*>(h_.get())->object_.close();
auto info = static_cast<info_t*>(h_.get());
if (info->opened_.fetch_sub(1, std::memory_order_release) == 1) {
info->object_.close();
}
h_.release();
}
@ -91,17 +93,6 @@ public:
bool unlock() { return object().unlock(); }
};
class semaphore_impl : public object_impl<ipc::detail::semaphore> {
public:
bool wait() {
return object().wait_if([] { return true; });
}
bool post(long count) {
return object().post([count] { return count; });
}
};
class condition_impl : public object_impl<ipc::detail::condition> {
public:
bool wait(mutex_impl& mtx) {
@ -112,6 +103,53 @@ public:
bool broadcast() { return object().broadcast(); }
};
class semaphore_impl {
sem_helper::handle_t h_;
ipc::shm::handle opened_; // std::atomic<unsigned>
std::string name_;
auto cnt() {
return static_cast<std::atomic<unsigned>*>(opened_.get());
}
public:
bool open(char const * name, long count) {
name_ = name;
if (!opened_.acquire(("__SEMAPHORE_IMPL_CNT__" + name_).c_str(), sizeof(std::atomic<unsigned>))) {
return false;
}
if ((h_ = sem_helper::open(("__SEMAPHORE_IMPL_SEM__" + name_).c_str(), count)) == sem_helper::invalid()) {
return false;
}
cnt()->fetch_add(1, std::memory_order_acq_rel);
return true;
}
void close() {
if (h_ == sem_helper::invalid()) return;
sem_helper::close(h_);
if (cnt() == nullptr) return;
if (cnt()->fetch_sub(1, std::memory_order_release) == 1) {
sem_helper::destroy(("__SEMAPHORE_IMPL_SEM__" + name_).c_str());
}
opened_.release();
}
bool wait() {
if (h_ == sem_helper::invalid()) return false;
return sem_helper::wait(h_);
}
bool post(long count) {
if (h_ == sem_helper::invalid()) return false;
bool ret = true;
for (long i = 0; i < count; ++i) {
ret = ret && sem_helper::post(h_);
}
return ret;
}
};
} // namespace detail
} // namespace ipc

View File

@ -266,7 +266,7 @@ struct prod_cons_impl<wr<relat::multi , relat::multi, trans::broadcast>> {
ct_.store(nxt_ct = cur_ct + 1, std::memory_order_release);
std::forward<F>(f)(&(el->data_));
// set flag & try update wt
el->f_ct_.store(~static_cast<flag_t>(cur_ct));
el->f_ct_.store(~static_cast<flag_t>(cur_ct), std::memory_order_release);
return true;
}