give up multi-wait temporarily

This commit is contained in:
mutouyun 2019-02-04 10:41:39 +08:00
parent e94318c9a6
commit f7f06ab052
5 changed files with 35 additions and 122 deletions

View File

@ -1,9 +1,6 @@
#pragma once
#include <fcntl.h>
#include <sys/stat.h>
#include <semaphore.h>
#include <errno.h>
#include <pthread.h>
#include <cstring>
#include <atomic>
@ -23,6 +20,16 @@
namespace ipc {
namespace detail {
#pragma push_macro("IPC_PTHREAD_FUNC_")
#undef IPC_PTHREAD_FUNC_
#define IPC_PTHREAD_FUNC_(CALL, ...) \
int eno; \
if ((eno = ::CALL(__VA_ARGS__)) != 0) { \
ipc::error("fail " #CALL "[%d]\n", eno); \
return false; \
} \
return true
class mutex {
pthread_mutex_t mutex_ = PTHREAD_MUTEX_INITIALIZER;
@ -52,30 +59,15 @@ public:
}
bool close() {
int eno;
if ((eno = ::pthread_mutex_destroy(&mutex_)) != 0) {
ipc::error("fail pthread_mutex_destroy[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_mutex_destroy, &mutex_);
}
bool lock() {
int eno;
if ((eno = ::pthread_mutex_lock(&mutex_)) != 0) {
ipc::error("fail pthread_mutex_lock[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_mutex_lock, &mutex_);
}
bool unlock() {
int eno;
if ((eno = ::pthread_mutex_unlock(&mutex_)) != 0) {
ipc::error("fail pthread_mutex_unlock[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_mutex_unlock, &mutex_);
}
};
@ -104,42 +96,24 @@ public:
}
bool close() {
int eno;
if ((eno = ::pthread_cond_destroy(&cond_)) != 0) {
ipc::error("fail pthread_cond_destroy[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_);
}
bool wait(mutex& mtx) {
int eno;
if ((eno = ::pthread_cond_wait(&cond_, &mtx.native())) != 0) {
ipc::error("fail pthread_cond_wait[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native());
}
bool notify() {
int eno;
if ((eno = ::pthread_cond_signal(&cond_)) != 0) {
ipc::error("fail pthread_cond_signal[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_cond_signal, &cond_);
}
bool broadcast() {
int eno;
if ((eno = ::pthread_cond_broadcast(&cond_)) != 0) {
ipc::error("fail pthread_cond_broadcast[%d]\n", eno);
return false;
}
return true;
IPC_PTHREAD_FUNC_(pthread_cond_broadcast, &cond_);
}
};
#pragma pop_macro("IPC_PTHREAD_FUNC_")
class semaphore {
mutex lock_;
condition cond_;
@ -156,12 +130,14 @@ public:
}
template <typename F>
void wait_if(F&& check) {
bool wait_if(F&& check) {
bool ret = true;
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
while ((counter_ <= 0) && std::forward<F>(check)()) {
cond_.wait(lock_);
}
while ((counter_ <= 0) &&
std::forward<F>(check)() &&
(ret = cond_.wait(lock_))) ;
-- counter_;
return ret;
}
template <typename F>
@ -204,21 +180,13 @@ public:
}
template <typename F>
static bool multi_wait_if(std::tuple<waiter*, handle_t> const * all, std::size_t size, F&& /*check*/) {
if (all == nullptr || size == 0) {
return false;
}
return true;
}
template <typename F>
bool wait_if(handle_t h, F&& check) {
bool wait_if(handle_t h, F&& pred) {
if (h == invalid()) return false;
counter_.fetch_add(1, std::memory_order_release);
IPC_UNUSED_ auto guard = unique_ptr(&counter_, [](decltype(counter_)* c) {
c->fetch_sub(1, std::memory_order_release);
});
sem_.wait_if(std::forward<F>(check));
sem_.wait_if(std::forward<F>(pred));
return true;
}

View File

@ -38,11 +38,11 @@ public:
}
template <typename F>
static bool multi_wait_if(std::tuple<waiter*, handle_t> const * all, std::size_t size, F&& check) {
static bool multi_wait_if(std::tuple<waiter*, handle_t> const * all, std::size_t size, F&& pred) {
if (all == nullptr || size == 0) {
return false;
}
if (!std::forward<F>(check)()) return true;
if (!std::forward<F>(pred)()) return true;
auto hs = static_cast<handle_t*>(mem::alloc(sizeof(handle_t) * size));
IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(handle_t) * size); });
std::size_t i = 0;
@ -58,9 +58,9 @@ public:
}
template <typename F>
bool wait_if(handle_t h, F&& check) {
bool wait_if(handle_t h, F&& pred) {
if (h == invalid()) return false;
if (!std::forward<F>(check)()) return true;
if (!std::forward<F>(pred)()) return true;
counter_.fetch_add(1, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release);
return ::WaitForSingleObject(h, INFINITE) == WAIT_OBJECT_0;

View File

@ -63,28 +63,10 @@ public:
h_ = waiter_t::invalid();
}
template <typename F>
bool multi_wait_if(waiter_wrapper * all, std::size_t size, F&& check) {
if (all == nullptr || size == 0) {
return false;
}
using tp_t = decltype(std::declval<waiter_wrapper>().to_w_info());
auto hs = static_cast<tp_t*>(mem::alloc(sizeof(tp_t) * size));
IPC_UNUSED_ auto guard = unique_ptr(hs, [size](void* p) { mem::free(p, sizeof(tp_t) * size); });
std::size_t i = 0;
for (; i < size; ++i) {
auto& w = all[i];
if (!w.valid()) continue;
hs[i] = w.to_w_info();
}
return waiter_t::multi_wait_if(hs, i, std::forward<F>(check));
}
template <typename F>
bool wait_if(F&& check) {
bool wait_if(F&& pred) {
if (!valid()) return false;
return w_->wait_if(h_, std::forward<F>(check));
return w_->wait_if(h_, std::forward<F>(pred));
}
bool notify() {

View File

@ -30,7 +30,7 @@ std::vector<ipc::buff_t> datas__;
constexpr int DataMin = 2;
constexpr int DataMax = 256;
constexpr int LoopCount = 100000;
//constexpr int LoopCount = 10000;
//constexpr int LoopCount = 1000;
} // internal-linkage

View File

@ -15,7 +15,6 @@ class Unit : public TestSuite {
private slots:
void test_broadcast();
void test_multiwait();
} unit__;
#include "test_waiter.moc"
@ -42,40 +41,4 @@ void Unit::test_broadcast() {
for (auto& t : ts) t.join();
}
void Unit::test_multiwait() {
ipc::detail::waiter w, mw;
std::thread ts[10];
ipc::detail::waiter_wrapper ws[10];
std::size_t i = 0;
for (auto& t : ts) {
t = std::thread([&w, &mw, &ws, i] {
ipc::detail::waiter_wrapper wp { &w };
ws[i].attach(&mw);
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(ws[i].open("test-ipc-multiwait"));
QVERIFY(wp.wait_if([] { return true; }));
if (i == 3) {
std::cout << "waiting for notify...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
QVERIFY(ws[i].broadcast());
}
});
++i;
}
ipc::detail::waiter_wrapper wp { &w }, wm { &mw };
QVERIFY(wp.open("test-ipc-waiter"));
QVERIFY(wm.open("test-ipc-multiwait"));
std::cout << "waiting for broadcast...\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
QVERIFY(wp.broadcast());
QVERIFY(wm.multi_wait_if(ws, 10, [] { return true; }));
for (auto& t : ts) t.join();
}
} // internal-linkage