diff --git a/include/def.h b/include/def.h index 6764d31..945c0ef 100644 --- a/include/def.h +++ b/include/def.h @@ -29,7 +29,8 @@ using uint_t = typename uint::type; enum : std::size_t { invalid_value = (std::numeric_limits::max)(), data_length = 64, - name_length = 64 + name_length = 64, + send_wait_for = 100 // ms }; enum class relat { // multiplicity of the relationship diff --git a/include/ipc.h b/include/ipc.h index 80ca8a2..b851d70 100644 --- a/include/ipc.h +++ b/include/ipc.h @@ -19,10 +19,10 @@ struct IPC_EXPORT chan_impl { static void disconnect(handle_t h); static std::size_t recv_count(handle_t h); - static bool wait_for_recv(handle_t h, std::size_t r_count); + static bool wait_for_recv(handle_t h, std::size_t r_count, std::size_t tm); static bool send(handle_t h, void const * data, std::size_t size); - static buff_t recv(handle_t h); + static buff_t recv(handle_t h, std::size_t tm); }; template @@ -92,12 +92,12 @@ public: return detail_t::recv_count(h_); } - bool wait_for_recv(std::size_t r_count) const { - return detail_t::wait_for_recv(h_, r_count); + bool wait_for_recv(std::size_t r_count, std::size_t tm = invalid_value) const { + return detail_t::wait_for_recv(h_, r_count, tm); } - static bool wait_for_recv(char const * name, std::size_t r_count) { - return chan_wrapper(name).wait_for_recv(r_count); + static bool wait_for_recv(char const * name, std::size_t r_count, std::size_t tm = invalid_value) { + return chan_wrapper(name).wait_for_recv(r_count, tm); } bool send(void const * data, std::size_t size) { @@ -112,8 +112,8 @@ public: return this->send(str.c_str(), str.size() + 1); } - buff_t recv() { - return detail_t::recv(h_); + buff_t recv(std::size_t tm = invalid_value) { + return detail_t::recv(h_, tm); } }; diff --git a/include/waiter.h b/include/waiter.h index cc3b10d..91bea3c 100644 --- a/include/waiter.h +++ b/include/waiter.h @@ -1,6 +1,7 @@ #pragma once #include "export.h" +#include "def.h" namespace ipc { @@ -22,7 +23,7 @@ public: bool open (char const * name); void close(); - bool lock(); + bool lock (); bool unlock(); private: @@ -49,7 +50,7 @@ public: bool open (char const * name, long count = 0); void close(); - bool wait(); + bool wait(std::size_t tm = invalid_value); bool post(long count = 1); private: @@ -74,7 +75,7 @@ public: bool open (char const * name); void close(); - bool wait(mutex&); + bool wait(mutex&, std::size_t tm = invalid_value); bool notify(); bool broadcast(); diff --git a/src/ipc.cpp b/src/ipc.cpp index 9bd6b67..2d8cf2b 100644 --- a/src/ipc.cpp +++ b/src/ipc.cpp @@ -76,16 +76,19 @@ struct cache_t { }; template -void wait_for(W& waiter, F&& pred) { +bool wait_for(W& waiter, F&& pred, std::size_t tm) { for (unsigned k = 0; pred();) { - bool ret = true; - ipc::sleep(k, [&ret, &waiter, &pred] { - return waiter.wait_if([&ret, &pred] { - return ret = pred(); - }); + bool loop = true, ret = true; + ipc::sleep(k, [&loop, &ret, &waiter, &pred, tm] { + ret = waiter.wait_if([&loop, &ret, &pred] { + return loop = pred(); + }, tm); + return true; }); - if (!ret) break; + if (!ret ) return false; // timeout or fail + if (!loop) break; } + return true; } template @@ -156,15 +159,14 @@ static std::size_t recv_count(ipc::handle_t h) { return que->conn_count(); } -static bool wait_for_recv(ipc::handle_t h, std::size_t r_count) { +static bool wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) { auto que = queue_of(h); if (que == nullptr) { return false; } - wait_for(info_of(h)->cc_waiter_, [que, r_count] { + return wait_for(info_of(h)->cc_waiter_, [que, r_count] { return que->conn_count() < r_count; - }); - return true; + }, tm); } static bool send(ipc::handle_t h, void const * data, std::size_t size) { @@ -201,7 +203,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size) { return true; } -static buff_t recv(ipc::handle_t h) { +static buff_t recv(ipc::handle_t h, std::size_t tm) { auto que = queue_of(h); if (que == nullptr) return {}; if (que->connect()) { // wouldn't connect twice @@ -211,9 +213,10 @@ static buff_t recv(ipc::handle_t h) { while (1) { // pop a new message typename queue_t::value_t msg; - wait_for(info_of(h)->rd_waiter_, [que, &msg] { - return !que->pop(msg); - }); + if (!wait_for(info_of(h)->rd_waiter_, + [que, &msg] { return !que->pop(msg); }, tm)) { + return {}; + } if (msg.head_.que_ == nullptr) return {}; if (msg.head_.que_ == que) continue; // pop next // msg.head_.remain_ may minus & abs(msg.head_.remain_) < data_length @@ -279,8 +282,8 @@ std::size_t chan_impl::recv_count(ipc::handle_t h) { } template -bool chan_impl::wait_for_recv(ipc::handle_t h, std::size_t r_count) { - return detail_impl>::wait_for_recv(h, r_count); +bool chan_impl::wait_for_recv(ipc::handle_t h, std::size_t r_count, std::size_t tm) { + return detail_impl>::wait_for_recv(h, r_count, tm); } template @@ -289,8 +292,8 @@ bool chan_impl::send(ipc::handle_t h, void const * data, std::size_t size) } template -buff_t chan_impl::recv(ipc::handle_t h) { - return detail_impl>::recv(h); +buff_t chan_impl::recv(ipc::handle_t h, std::size_t tm) { + return detail_impl>::recv(h, tm); } template struct chan_impl>; diff --git a/src/platform/waiter_linux.h b/src/platform/waiter_linux.h index 9503596..516d9fa 100644 --- a/src/platform/waiter_linux.h +++ b/src/platform/waiter_linux.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -17,6 +18,12 @@ namespace ipc { namespace detail { +inline static void calc_wait_time(timespec& ts, std::size_t tm) { + ::clock_gettime(CLOCK_REALTIME, &ts); + ts.tv_sec += (tm / 1000); // seconds + ts.tv_nsec += (tm % 1000) * 1000000; // nanoseconds +} + #pragma push_macro("IPC_PTHREAD_FUNC_") #undef IPC_PTHREAD_FUNC_ #define IPC_PTHREAD_FUNC_(CALL, ...) \ @@ -96,8 +103,15 @@ public: IPC_PTHREAD_FUNC_(pthread_cond_destroy, &cond_); } - bool wait(mutex& mtx) { - IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); + bool wait(mutex& mtx, std::size_t tm = invalid_value) { + if (tm == invalid_value) { + IPC_PTHREAD_FUNC_(pthread_cond_wait, &cond_, &mtx.native()); + } + else { + timespec ts; + calc_wait_time(ts, tm); + IPC_PTHREAD_FUNC_(pthread_cond_timedwait, &cond_, &mtx.native(), &ts); + } } bool notify() { @@ -130,8 +144,8 @@ public: #pragma push_macro("IPC_SEMAPHORE_FUNC_") #undef IPC_SEMAPHORE_FUNC_ -#define IPC_SEMAPHORE_FUNC_(CALL, PAR) \ - if (::CALL(PAR) != 0) { \ +#define IPC_SEMAPHORE_FUNC_(CALL, ...) \ + if (::CALL(__VA_ARGS__) != 0) { \ ipc::error("fail " #CALL "[%d]\n", errno); \ return false; \ } \ @@ -151,9 +165,16 @@ public: IPC_SEMAPHORE_FUNC_(sem_post, h); } - static bool wait(handle_t h) { + static bool wait(handle_t h, std::size_t tm = invalid_value) { if (h == invalid()) return false; - IPC_SEMAPHORE_FUNC_(sem_wait, h); + if (tm == invalid_value) { + IPC_SEMAPHORE_FUNC_(sem_wait, h); + } + else { + timespec ts; + calc_wait_time(ts, tm); + IPC_SEMAPHORE_FUNC_(sem_timedwait, h, &ts); + } } #pragma pop_macro("IPC_SEMAPHORE_FUNC_") @@ -204,14 +225,14 @@ public: } template - bool wait_if(handle_t const & h, F&& pred) { + bool wait_if(handle_t const & h, F&& pred, std::size_t tm = invalid_value) { waiting_.fetch_add(1, std::memory_order_release); { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); if (!std::forward(pred)()) return true; ++ counter_; } - bool ret = sem_helper::wait(std::get<1>(h)); + bool ret = sem_helper::wait(std::get<1>(h), tm); waiting_.fetch_sub(1, std::memory_order_release); ret = sem_helper::post(std::get<2>(h)) && ret; return ret; @@ -283,9 +304,9 @@ public: } template - bool wait_if(handle_t h, F&& pred) { + bool wait_if(handle_t h, F&& pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; - return helper_.wait_if(h, std::forward(pred)); + return helper_.wait_if(h, std::forward(pred), tm); } void notify(handle_t h) { diff --git a/src/platform/waiter_win.h b/src/platform/waiter_win.h index fbe08de..bcee368 100644 --- a/src/platform/waiter_win.h +++ b/src/platform/waiter_win.h @@ -33,11 +33,15 @@ public: ::CloseHandle(h_); } - bool wait() { - DWORD ret; - if ((ret = ::WaitForSingleObject(h_, INFINITE)) == WAIT_OBJECT_0) { + bool wait(std::size_t tm = invalid_value) { + DWORD ret, ms = (tm == invalid_value) ? INFINITE : static_cast(tm); + if ((ret = ::WaitForSingleObject(h_, ms)) == WAIT_OBJECT_0) { return true; } + if (ret == WAIT_TIMEOUT) { + ipc::log("WaitForSingleObject is timeout.\n"); + return false; + } ipc::error("fail WaitForSingleObject[%lu]: 0x%08X\n", ::GetLastError(), ret); return false; } @@ -98,7 +102,7 @@ public: } template - bool wait_if(Mutex& mtx, F&& pred) { + bool wait_if(Mutex& mtx, F&& pred, std::size_t tm = invalid_value) { waiting_->fetch_add(1, std::memory_order_release); { IPC_UNUSED_ auto guard = ipc::detail::unique_lock(lock_); @@ -106,7 +110,7 @@ public: ++ *counter_; } mtx.unlock(); - bool ret = sema_.wait(); + bool ret = sema_.wait(tm); waiting_->fetch_sub(1, std::memory_order_release); ret = handshake_.post() && ret; mtx.lock(); @@ -175,7 +179,7 @@ public: } template - bool wait_if(handle_t& h, F&& pred) { + bool wait_if(handle_t& h, F&& pred, std::size_t tm = invalid_value) { if (h == invalid()) return false; class non_mutex { @@ -184,7 +188,7 @@ public: void unlock() noexcept {} } nm; - return h.wait_if(nm, std::forward(pred)); + return h.wait_if(nm, std::forward(pred), tm); } void notify(handle_t& h) { diff --git a/src/platform/waiter_wrapper.h b/src/platform/waiter_wrapper.h index 0941676..6bcb6d3 100644 --- a/src/platform/waiter_wrapper.h +++ b/src/platform/waiter_wrapper.h @@ -40,8 +40,8 @@ public: cnt_h_ .release(); } - bool wait(mutex_impl& mtx) { - return ipc::detail::condition::wait_if(mtx, [] { return true; }); + bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { + return ipc::detail::condition::wait_if(mtx, [] { return true; }, tm); } }; @@ -100,8 +100,8 @@ public: class condition_impl : public object_impl { public: - bool wait(mutex_impl& mtx) { - return object().wait(mtx.object()); + bool wait(mutex_impl& mtx, std::size_t tm = invalid_value) { + return object().wait(mtx.object(), tm); } bool notify () { return object().notify (); } @@ -140,9 +140,9 @@ public: opened_.release(); } - bool wait() { + bool wait(std::size_t tm = invalid_value) { if (h_ == sem_helper::invalid()) return false; - return sem_helper::wait(h_); + return sem_helper::wait(h_, tm); } bool post(long count) { @@ -205,9 +205,9 @@ public: } template - bool wait_if(F&& pred) { + bool wait_if(F&& pred, std::size_t tm = invalid_value) { if (!valid()) return false; - return w_->wait_if(h_, std::forward(pred)); + return w_->wait_if(h_, std::forward(pred), tm); } bool notify() { diff --git a/src/waiter.cpp b/src/waiter.cpp index 5ff6cd2..54e8840 100644 --- a/src/waiter.cpp +++ b/src/waiter.cpp @@ -43,8 +43,8 @@ bool mutex::unlock() { #include "waiter_template.inc" -bool semaphore::wait() { - return impl(p_)->h_.wait(); +bool semaphore::wait(std::size_t tm) { + return impl(p_)->h_.wait(tm); } bool semaphore::post(long count) { @@ -61,8 +61,8 @@ bool semaphore::post(long count) { #include "waiter_template.inc" -bool condition::wait(mutex& mtx) { - return impl(p_)->h_.wait(impl(mtx.p_)->h_); +bool condition::wait(mutex& mtx, std::size_t tm) { + return impl(p_)->h_.wait(impl(mtx.p_)->h_, tm); } bool condition::notify() {