support timeout

This commit is contained in:
zhangyi 2019-03-26 14:08:08 +08:00
parent 6730fa578d
commit 785abd1845
8 changed files with 90 additions and 60 deletions

View File

@ -29,7 +29,8 @@ using uint_t = typename uint<N>::type;
enum : std::size_t {
invalid_value = (std::numeric_limits<std::size_t>::max)(),
data_length = 64,
name_length = 64
name_length = 64,
send_wait_for = 100 // ms
};
enum class relat { // multiplicity of the relationship

View File

@ -19,10 +19,10 @@ struct IPC_EXPORT chan_impl {
static void disconnect(handle_t h);
static std::size_t recv_count(handle_t h);
static bool wait_for_recv(handle_t h, std::size_t r_count);
static bool wait_for_recv(handle_t h, std::size_t r_count, std::size_t tm);
static bool send(handle_t h, void const * data, std::size_t size);
static buff_t recv(handle_t h);
static buff_t recv(handle_t h, std::size_t tm);
};
template <typename Flag>
@ -92,12 +92,12 @@ public:
return detail_t::recv_count(h_);
}
bool wait_for_recv(std::size_t r_count) const {
return detail_t::wait_for_recv(h_, r_count);
bool wait_for_recv(std::size_t r_count, std::size_t tm = invalid_value) const {
return detail_t::wait_for_recv(h_, r_count, tm);
}
static bool wait_for_recv(char const * name, std::size_t r_count) {
return chan_wrapper(name).wait_for_recv(r_count);
static bool wait_for_recv(char const * name, std::size_t r_count, std::size_t tm = invalid_value) {
return chan_wrapper(name).wait_for_recv(r_count, tm);
}
bool send(void const * data, std::size_t size) {
@ -112,8 +112,8 @@ public:
return this->send(str.c_str(), str.size() + 1);
}
buff_t recv() {
return detail_t::recv(h_);
buff_t recv(std::size_t tm = invalid_value) {
return detail_t::recv(h_, tm);
}
};

View File

@ -1,6 +1,7 @@
#pragma once
#include "export.h"
#include "def.h"
namespace ipc {
@ -22,7 +23,7 @@ public:
bool open (char const * name);
void close();
bool lock();
bool lock ();
bool unlock();
private:
@ -49,7 +50,7 @@ public:
bool open (char const * name, long count = 0);
void close();
bool wait();
bool wait(std::size_t tm = invalid_value);
bool post(long count = 1);
private:
@ -74,7 +75,7 @@ public:
bool open (char const * name);
void close();
bool wait(mutex&);
bool wait(mutex&, std::size_t tm = invalid_value);
bool notify();
bool broadcast();

View File

@ -76,16 +76,19 @@ struct cache_t {
};
template <typename W, typename F>
void wait_for(W& waiter, F&& pred) {
bool wait_for(W& waiter, F&& pred, std::size_t tm) {
for (unsigned k = 0; pred();) {
bool ret = true;
ipc::sleep(k, [&ret, &waiter, &pred] {
return waiter.wait_if([&ret, &pred] {
return ret = pred();
});
bool loop = true, ret = true;
ipc::sleep(k, [&loop, &ret, &waiter, &pred, tm] {
ret = waiter.wait_if([&loop, &ret, &pred] {
return loop = pred();
}, tm);
return true;
});
if (!ret) break;
if (!ret ) return false; // timeout or fail
if (!loop) break;
}
return true;
}
template <typename Policy>
@ -156,15 +159,14 @@ static std::size_t recv_count(ipc::handle_t h) {
return que->conn_count();
}
static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) {
static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) {
auto que = queue_of(h);
if (que == nullptr) {
return false;
}
wait_for(info_of(h)->cc_waiter_, [que, r_count] {
return wait_for(info_of(h)->cc_waiter_, [que, r_count] {
return que->conn_count() < r_count;
});
return true;
}, tm);
}
static bool send(ipc::handle_t h, void const * data, std::size_t size) {
@ -201,7 +203,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) {
return true;
}
static buff_t recv(ipc::handle_t h) {
static buff_t recv(ipc::handle_t h, std::size_t tm) {
auto que = queue_of(h);
if (que == nullptr) return {};
if (que->connect()) { // wouldn't connect twice
@ -211,9 +213,10 @@ static buff_t recv(ipc::handle_t h) {
while (1) {
// pop a new message
typename queue_t::value_t msg;
wait_for(info_of(h)->rd_waiter_, [que, &msg] {
return !que->pop(msg);
});
if (!wait_for(info_of(h)->rd_waiter_,
[que, &msg] { return !que->pop(msg); }, tm)) {
return {};
}
if (msg.head_.que_ == nullptr) return {};
if (msg.head_.que_ == que) continue; // pop next
// msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length
@ -279,8 +282,8 @@ std::size_t chan_impl<Flag>::recv_count(ipc::handle_t h) {
}
template <typename Flag>
bool chan_impl<Flag>::wait_for_recv(ipc::handle_t h, std::size_t r_count) {
return detail_impl<policy_t<Flag>>::wait_for_recv(h, r_count);
bool chan_impl<Flag>::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) {
return detail_impl<policy_t<Flag>>::wait_for_recv(h, r_count, tm);
}
template <typename Flag>
@ -289,8 +292,8 @@ bool chan_impl<Flag>::send(ipc::handle_t h, void const * data, std::size_t size)
}
template <typename Flag>
buff_t chan_impl<Flag>::recv(ipc::handle_t h) {
return detail_impl<policy_t<Flag>>::recv(h);
buff_t chan_impl<Flag>::recv(ipc::handle_t h, std::size_t tm) {
return detail_impl<policy_t<Flag>>::recv(h, tm);
}
template struct chan_impl<ipc::wr<relat::single, relat::single, trans::unicast >>;

View File

@ -5,6 +5,7 @@
#include <sys/stat.h>
#include <semaphore.h>
#include <errno.h>
#include <time.h>
#include <atomic>
#include <tuple>
@ -17,6 +18,12 @@
namespace ipc {
namespace detail {
inline static void calc_wait_time(timespec& ts, std::size_t tm) {
::clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += (tm / 1000); // seconds
ts.tv_nsec += (tm % 1000) * 1000000; // nanoseconds
}
#pragma push_macro("IPC_PTHREAD_FUNC_")
#undef IPC_PTHREAD_FUNC_
#define IPC_PTHREAD_FUNC_(CALL, ...) \
@ -96,8 +103,15 @@ public:
IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_);
}
bool wait(mutex& mtx) {
IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native());
bool wait(mutex& mtx, std::size_t tm = invalid_value) {
if (tm == invalid_value) {
IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native());
}
else {
timespec ts;
calc_wait_time(ts, tm);
IPC_PTHREAD_FUNC_(pthread_cond_timedwait, &cond_, &mtx.native(), &ts);
}
}
bool notify() {
@ -130,8 +144,8 @@ public:
#pragma push_macro("IPC_SEMAPHORE_FUNC_")
#undef IPC_SEMAPHORE_FUNC_
#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \
if (::CALL(PAR) != 0) { \
#define IPC_SEMAPHORE_FUNC_(CALL, ...) \
if (::CALL(__VA_ARGS__) != 0) { \
ipc::error("fail " #CALL "[%d]\n", errno); \
return false; \
} \
@ -151,9 +165,16 @@ public:
IPC_SEMAPHORE_FUNC_(sem_post, h);
}
static bool wait(handle_t h) {
static bool wait(handle_t h, std::size_t tm = invalid_value) {
if (h == invalid()) return false;
IPC_SEMAPHORE_FUNC_(sem_wait, h);
if (tm == invalid_value) {
IPC_SEMAPHORE_FUNC_(sem_wait, h);
}
else {
timespec ts;
calc_wait_time(ts, tm);
IPC_SEMAPHORE_FUNC_(sem_timedwait, h, &ts);
}
}
#pragma pop_macro("IPC_SEMAPHORE_FUNC_")
@ -204,14 +225,14 @@ public:
}
template <typename F>
bool wait_if(handle_t const & h, F&& pred) {
bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) {
waiting_.fetch_add(1, std::memory_order_release);
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
if (!std::forward<F>(pred)()) return true;
++ counter_;
}
bool ret = sem_helper::wait(std::get<1>(h));
bool ret = sem_helper::wait(std::get<1>(h), tm);
waiting_.fetch_sub(1, std::memory_order_release);
ret = sem_helper::post(std::get<2>(h)) && ret;
return ret;
@ -283,9 +304,9 @@ public:
}
template <typename F>
bool wait_if(handle_t h, F&& pred) {
bool wait_if(handle_t h, F&& pred, std::size_t tm = invalid_value) {
if (h == invalid()) return false;
return helper_.wait_if(h, std::forward<F>(pred));
return helper_.wait_if(h, std::forward<F>(pred), tm);
}
void notify(handle_t h) {

View File

@ -33,11 +33,15 @@ public:
::CloseHandle(h_);
}
bool wait() {
DWORD ret;
if ((ret = ::WaitForSingleObject(h_, INFINITE)) == WAIT_OBJECT_0) {
bool wait(std::size_t tm = invalid_value) {
DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast<DWORD>(tm);
if ((ret = ::WaitForSingleObject(h_, ms)) == WAIT_OBJECT_0) {
return true;
}
if (ret == WAIT_TIMEOUT) {
ipc::log("WaitForSingleObject is timeout.\n");
return false;
}
ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret);
return false;
}
@ -98,7 +102,7 @@ public:
}
template <typename Mutex, typename F>
bool wait_if(Mutex& mtx, F&& pred) {
bool wait_if(Mutex& mtx, F&& pred, std::size_t tm = invalid_value) {
waiting_->fetch_add(1, std::memory_order_release);
{
IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_);
@ -106,7 +110,7 @@ public:
++ *counter_;
}
mtx.unlock();
bool ret = sema_.wait();
bool ret = sema_.wait(tm);
waiting_->fetch_sub(1, std::memory_order_release);
ret = handshake_.post() && ret;
mtx.lock();
@ -175,7 +179,7 @@ public:
}
template <typename F>
bool wait_if(handle_t& h, F&& pred) {
bool wait_if(handle_t& h, F&& pred, std::size_t tm = invalid_value) {
if (h == invalid()) return false;
class non_mutex {
@ -184,7 +188,7 @@ public:
void unlock() noexcept {}
} nm;
return h.wait_if(nm, std::forward<F>(pred));
return h.wait_if(nm, std::forward<F>(pred), tm);
}
void notify(handle_t& h) {

View File

@ -40,8 +40,8 @@ public:
cnt_h_ .release();
}
bool wait(mutex_impl& mtx) {
return ipc::detail::condition::wait_if(mtx, [] { return true; });
bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) {
return ipc::detail::condition::wait_if(mtx, [] { return true; }, tm);
}
};
@ -100,8 +100,8 @@ public:
class condition_impl : public object_impl<ipc::detail::condition> {
public:
bool wait(mutex_impl& mtx) {
return object().wait(mtx.object());
bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) {
return object().wait(mtx.object(), tm);
}
bool notify () { return object().notify (); }
@ -140,9 +140,9 @@ public:
opened_.release();
}
bool wait() {
bool wait(std::size_t tm = invalid_value) {
if (h_ == sem_helper::invalid()) return false;
return sem_helper::wait(h_);
return sem_helper::wait(h_, tm);
}
bool post(long count) {
@ -205,9 +205,9 @@ public:
}
template <typename F>
bool wait_if(F&& pred) {
bool wait_if(F&& pred, std::size_t tm = invalid_value) {
if (!valid()) return false;
return w_->wait_if(h_, std::forward<F>(pred));
return w_->wait_if(h_, std::forward<F>(pred), tm);
}
bool notify() {

View File

@ -43,8 +43,8 @@ bool mutex::unlock() {
#include "waiter_template.inc"
bool semaphore::wait() {
return impl(p_)->h_.wait();
bool semaphore::wait(std::size_t tm) {
return impl(p_)->h_.wait(tm);
}
bool semaphore::post(long count) {
@ -61,8 +61,8 @@ bool semaphore::post(long count) {
#include "waiter_template.inc"
bool condition::wait(mutex& mtx) {
return impl(p_)->h_.wait(impl(mtx.p_)->h_);
bool condition::wait(mutex& mtx, std::size_t tm) {
return impl(p_)->h_.wait(impl(mtx.p_)->h_, tm);
}
bool condition::notify() {