fix bugs for waiter of linux (still has some bugs in win, multi-wait is TBD)

This commit is contained in:
mutouyun 2019-01-31 14:57:42 +08:00
parent eef3cc01f0
commit e94318c9a6
6 changed files with 230 additions and 150 deletions

View File

@ -23,126 +23,164 @@
namespace ipc {
namespace detail {
class semaphore {
class mutex {
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
public:
using handle_t = sem_t*;
constexpr static handle_t invalid() {
return SEM_FAILED;
pthread_mutex_t& native() {
return mutex_;
}
static handle_t open(char const* name) {
handle_t sem = ::sem_open(name, O_CREAT, 0666, 0);
if (sem == SEM_FAILED) {
ipc::error("fail sem_open[%d]: %s\n", errno, name);
return invalid();
bool open() {
int eno;
// init mutex
pthread_mutexattr_t mutex_attr;
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
ipc::error("fail pthread_mutexattr_init[%d]\n", eno);
return false;
}
return sem;
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
return false;
}
if ((eno = ::pthread_mutex_init(&mutex_, &mutex_attr)) != 0) {
ipc::error("fail pthread_mutex_init[%d]\n", eno);
return false;
}
return true;
}
#pragma push_macro("IPC_SEMAPHORE_FUNC_")
#undef IPC_SEMAPHORE_FUNC_
#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \
if (::CALL(PAR) != 0) { \
ipc::error("fail " #CALL "[%d]\n", errno); \
return false; \
} \
return true
static bool close(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_close, h);
bool close() {
int eno;
if ((eno = ::pthread_mutex_destroy(&mutex_)) != 0) {
ipc::error("fail pthread_mutex_destroy[%d]\n", eno);
return false;
}
return true;
}
static bool destroy(char const* name) {
IPC_SEMAPHORE_FUNC_(sem_unlink, name);
bool lock() {
int eno;
if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) {
ipc::error("fail pthread_mutex_lock[%d]\n", eno);
return false;
}
return true;
}
static bool post(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_post, h);
bool unlock() {
int eno;
if ((eno = ::pthread_mutex_unlock(&mutex_)) != 0) {
ipc::error("fail pthread_mutex_unlock[%d]\n", eno);
return false;
}
return true;
}
static bool wait(handle_t h) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_wait, h);
}
#pragma pop_macro("IPC_SEMAPHORE_FUNC_")
};
class event {
std::atomic<std::size_t>* cnt_ = nullptr;
semaphore::handle_t sem_ = semaphore::invalid();
std::size_t wait_id_;
std::string name() const {
return "__IPC_WAIT__" + std::to_string(wait_id_);
}
class condition {
pthread_cond_t cond_ = PTHREAD_COND_INITIALIZER;
public:
event(std::size_t id)
: wait_id_(static_cast<uint_t<16>>(id)) {
auto n = name();
cnt_ = static_cast<std::atomic<std::size_t>*>(
shm::acquire(n.c_str(), sizeof(std::atomic<std::size_t>)));
if (cnt_ == nullptr) {
ipc::error("fail shm::acquire: %s\n", n.c_str());
return;
bool open() {
int eno;
// init condition
pthread_condattr_t cond_attr;
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
ipc::error("fail pthread_condattr_init[%d]\n", eno);
return false;
}
cnt_->fetch_add(1, std::memory_order_acquire);
sem_ = semaphore::open(n.c_str());
}
~event() {
semaphore::close(sem_);
if (cnt_->fetch_sub(1, std::memory_order_release) == 1) {
semaphore::destroy(name().c_str());
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
return false;
}
shm::release(cnt_, sizeof(std::atomic<std::size_t>));
if ((eno = ::pthread_cond_init(&cond_, &cond_attr)) != 0) {
ipc::error("fail pthread_cond_init[%d]\n", eno);
return false;
}
return true;
}
auto get_id() const noexcept {
return wait_id_;
bool close() {
int eno;
if ((eno = ::pthread_cond_destroy(&cond_)) != 0) {
ipc::error("fail pthread_cond_destroy[%d]\n", eno);
return false;
}
return true;
}
bool wait() {
if (sem_ == semaphore::invalid()) return false;
return semaphore::wait(sem_);
bool wait(mutex& mtx) {
int eno;
if ((eno = ::pthread_cond_wait(&cond_, &mtx.native())) != 0) {
ipc::error("fail pthread_cond_wait[%d]\n", eno);
return false;
}
return true;
}
bool notify() {
if (sem_ == semaphore::invalid()) return false;
return semaphore::post(sem_);
int eno;
if ((eno = ::pthread_cond_signal(&cond_)) != 0) {
ipc::error("fail pthread_cond_signal[%d]\n", eno);
return false;
}
return true;
}
bool broadcast() {
int eno;
if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) {
ipc::error("fail pthread_cond_broadcast[%d]\n", eno);
return false;
}
return true;
}
};
class semaphore {
mutex lock_;
condition cond_;
long counter_ = 0;
public:
bool open() {
return lock_.open() && cond_.open();
}
void close() {
cond_.close();
lock_.close();
}
template <typename F>
void wait_if(F&& check) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
while ((counter_ <= 0) && std::forward<F>(check)()) {
cond_.wait(lock_);
}
-- counter_;
}
template <typename F>
bool post(F&& count) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
auto c = std::forward<F>(count)();
if (c <= 0) return false;
counter_ += c;
return cond_.broadcast();
}
};
class waiter {
using evt_id_t = decltype(std::declval<event>().get_id());
std::atomic<unsigned> counter_ { 0 };
spin_lock evt_lc_;
id_pool<sizeof(evt_id_t)> evt_ids_;
std::size_t push_event(event const & evt) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_);
std::size_t id = evt_ids_.acquire();
if (id == invalid_value) {
ipc::error("fail push_event[has too many waiters]\n");
return id;
}
(*static_cast<evt_id_t*>(evt_ids_.at(id))) = evt.get_id();
evt_ids_.mark_acquired(id);
return id;
}
void pop_event(std::size_t id) {
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_);
evt_ids_.release(id);
}
semaphore sem_;
std::atomic<long> counter_ { 0 };
std::atomic<unsigned> opened_ { 0 };
public:
using handle_t = waiter*;
using handle_t = waiter * ;
constexpr static handle_t invalid() {
return nullptr;
@ -152,60 +190,48 @@ public:
if (name == nullptr || name[0] == '\0') {
return invalid();
}
if (counter_.fetch_add(1, std::memory_order_acq_rel) == 0) {
evt_ids_.init();
if ((opened_.fetch_add(1, std::memory_order_acq_rel) == 0) && !sem_.open()) {
return invalid();
}
return this;
}
void close(handle_t h) {
if (h == invalid()) return;
counter_.fetch_sub(1, std::memory_order_acq_rel);
if (opened_.fetch_sub(1, std::memory_order_release) == 1) {
sem_.close();
}
}
static bool wait_all(std::tuple<waiter*, handle_t> const * all, std::size_t size) {
template <typename F>
static bool multi_wait_if(std::tuple<waiter*, handle_t> const * all, std::size_t size, F&& /*check*/) {
if (all == nullptr || size == 0) {
return false;
}
// calc a new wait-id & construct event object
event evt { ipc::detail::calc_unique_id() };
auto ids = static_cast<std::size_t*>(mem::alloc(sizeof(std::size_t) * size));
for (std::size_t i = 0; i < size; ++i) {
ids[i] = std::get<0>(all[i])->push_event(evt);
}
IPC_UNUSED_ auto guard = unique_ptr(ids, [all, size](std::size_t* ids) {
for (std::size_t i = 0; i < size; ++i) {
std::get<0>(all[i])->pop_event(ids[i]);
}
mem::free(ids, sizeof(std::size_t) * size);
});
// wait for event signal
return evt.wait();
return true;
}
bool wait(handle_t h) {
template <typename F>
bool wait_if(handle_t h, F&& check) {
if (h == invalid()) return false;
auto info = std::make_tuple(this, h);
return wait_all(&info, 1);
counter_.fetch_add(1, std::memory_order_release);
IPC_UNUSED_ auto guard = unique_ptr(&counter_, [](decltype(counter_)* c) {
c->fetch_sub(1, std::memory_order_release);
});
sem_.wait_if(std::forward<F>(check));
return true;
}
void notify(handle_t h) {
if (h == invalid()) return;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_);
evt_ids_.for_acquired([this](auto id) {
event evt { *static_cast<evt_id_t*>(evt_ids_.at(id)) };
return !evt.notify(); // return if succ
sem_.post([this] {
return (0 < counter_.load(std::memory_order_relaxed)) ? 1 : 0;
});
}
void broadcast(handle_t h) {
if (h == invalid()) return;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(evt_lc_);
evt_ids_.for_acquired([this](auto id) {
event evt { *static_cast<evt_id_t*>(evt_ids_.at(id)) };
evt.notify();
return true; // return after all
});
sem_.post([this] { return counter_.load(std::memory_order_relaxed); });
}
};

View File

@ -37,10 +37,12 @@ public:
::CloseHandle(h);
}
static bool wait_all(std::tuple<waiter*, handle_t> const * all, std::size_t size) {
template <typename F>
static bool multi_wait_if(std::tuple<waiter*, handle_t> const * all, std::size_t size, F&& check) {
if (all == nullptr || size == 0) {
return false;
}
if (!std::forward<F>(check)()) return true;
auto hs = static_cast<handle_t*>(mem::alloc(sizeof(handle_t) * size));
IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t) * size); });
std::size_t i = 0;
@ -55,8 +57,10 @@ public:
return ::WaitForMultipleObjects(static_cast<DWORD>(i), hs, FALSE, INFINITE) != WAIT_FAILED;
}
bool wait(handle_t h) {
template <typename F>
bool wait_if(handle_t h, F&& check) {
if (h == invalid()) return false;
if (!std::forward<F>(check)()) return true;
counter_.fetch_add(1, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;

View File

@ -42,6 +42,7 @@ public:
waiter_t const * waiter() const { return w_; }
void attach(waiter_t* w) {
close();
w_ = w;
}
@ -62,7 +63,9 @@ public:
h_ = waiter_t::invalid();
}
bool wait_all(waiter_wrapper * all, std::size_t size) {
template <typename F>
bool multi_wait_if(waiter_wrapper * all, std::size_t size, F&& check) {
if (all == nullptr || size == 0) {
return false;
}
@ -75,12 +78,13 @@ public:
if (!w.valid()) continue;
hs[i] = w.to_w_info();
}
return waiter_t::wait_all(hs, i);
return waiter_t::multi_wait_if(hs, i, std::forward<F>(check));
}
bool wait() {
template <typename F>
bool wait_if(F&& check) {
if (!valid()) return false;
return w_->wait(h_);
return w_->wait_if(h_, std::forward<F>(check));
}
bool notify() {

View File

@ -110,7 +110,11 @@ public:
bool wait_for_connect(Elems* elems, std::size_t count) {
if (elems == nullptr) return false;
for (unsigned k = 0; elems->conn_count() < count;) {
ipc::sleep(k, [this] { return cc_waiter_.wait(); });
ipc::sleep(k, [this, elems, count] {
return cc_waiter_.wait_if([elems, count] {
return elems->conn_count() < count;
});
});
}
return true;
}
@ -214,13 +218,20 @@ public:
return {};
}
T item;
for (unsigned k = 0;;) {
if (elems_->pop(&this->cursor_, [&item](void* p) {
auto pop_item = [this, &item] {
return elems_->pop(&this->cursor_, [&item](void* p) {
::new (&item) T(std::move(*static_cast<T*>(p)));
})) {
return item;
}
ipc::sleep(k, [this] { return this->waiter_.wait(); });
});
};
for (unsigned k = 0;;) {
if (pop_item()) return item;
bool succ = false;
ipc::sleep(k, [this, &succ, &pop_item] {
return this->waiter_.wait_if([&succ, &pop_item] {
return !(succ = pop_item());
});
});
if (succ) return item;
}
}
};

View File

@ -64,7 +64,7 @@ void waiter::close() {
}
bool waiter::wait() {
return impl(p_)->w_.wait();
return impl(p_)->w_.wait_if([] { return true; });
}
bool waiter::notify() {

View File

@ -1,4 +1,5 @@
#include <thread>
#include <iostream>
#include "platform/waiter_wrapper.h"
#include "test.h"
@ -13,34 +14,68 @@ class Unit : public TestSuite {
}
private slots:
void test_wakeup();
void test_broadcast();
void test_multiwait();
} unit__;
#include "test_waiter.moc"
void Unit::test_wakeup() {
void Unit::test_broadcast() {
ipc::detail::waiter w;
std::thread ts[10];
std::thread t1 {[&w] {
ipc::detail::waiter_wrapper wp { &w };
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(wp.wait());
}};
std::thread t2 {[&w] {
ipc::detail::waiter_wrapper wp { &w };
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(wp.wait());
}};
for (auto& t : ts) {
t = std::thread([&w] {
ipc::detail::waiter_wrapper wp { &w };
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(wp.wait_if([] { return true; }));
});
}
ipc::detail::waiter_wrapper wp { &w };
QVERIFY(wp.open("test-ipc-waiter"));
std::cout << "waiting for broadcast...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
QVERIFY(wp.broadcast());
t1.join();
t2.join();
for (auto& t : ts) t.join();
}
void Unit::test_multiwait() {
ipc::detail::waiter w, mw;
std::thread ts[10];
ipc::detail::waiter_wrapper ws[10];
std::size_t i = 0;
for (auto& t : ts) {
t = std::thread([&w, &mw, &ws, i] {
ipc::detail::waiter_wrapper wp { &w };
ws[i].attach(&mw);
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(ws[i].open("test-ipc-multiwait"));
QVERIFY(wp.wait_if([] { return true; }));
if (i == 3) {
std::cout << "waiting for notify...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
QVERIFY(ws[i].broadcast());
}
});
++i;
}
ipc::detail::waiter_wrapper wp { &w }, wm { &mw };
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(wm.open("test-ipc-multiwait"));
std::cout << "waiting for broadcast...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
QVERIFY(wp.broadcast());
QVERIFY(wm.multi_wait_if(ws, 10, [] { return true; }));
for (auto& t : ts) t.join();
}
} // internal-linkage