diff --git a/include/continuable/detail/transforms/wait.hpp b/include/continuable/detail/transforms/wait.hpp index 1235c04..0ab4db3 100644 --- a/include/continuable/detail/transforms/wait.hpp +++ b/include/continuable/detail/transforms/wait.hpp @@ -33,15 +33,15 @@ #include #include -#include #include +#include #include #include +#include #include #include #include #include -#include #if defined(CONTINUABLE_HAS_EXCEPTIONS) # include @@ -50,37 +50,6 @@ namespace cti { namespace detail { namespace transforms { - -/* -template -struct sync_trait { - /// The promise type used to create the future - using promise_t = std::tuple; - /// Boxes the argument pack into a tuple - static void resolve(promise_t& promise, Args... args) { - promise.set_value(std::make_tuple(std::move(args)...)); - } -}; -template <> -struct sync_trait<> { - /// The promise type used to create the future - using promise_t = result; - /// Boxes the argument pack into void - static void resolve(promise_t& promise) { - promise.set_value(); - } -}; -template -struct sync_trait { - /// The promise type used to create the future - using promise_t = std::promise; - /// Boxes the argument pack into nothing - static void resolve(promise_t& promise, First first) { - promise.set_value(std::move(first)); - } -}; -*/ - template struct sync_trait; template @@ -88,15 +57,23 @@ struct sync_trait> { using result_t = result; }; -/// Transforms the continuation to sync execution +using lock_t = std::unique_lock; +using condition_variable_t = std::condition_variable; + template -auto wait(continuable_base&& continuable) { +auto wait_relaxed(continuable_base&& continuable) { + + // Do an immediate unpack if the continuable is ready + if (continuable.is_ready()) { + return std::move(continuable).unpack(); + } + constexpr auto hint = base::annotation_of(identify{}); using result_t = typename sync_trait>::result_t; (void)hint; - std::recursive_mutex mutex; - std::condition_variable_any cv; + std::mutex cv_mutex; + condition_variable_t cv; std::atomic_bool ready{false}; result_t sync_result; @@ -110,12 +87,22 @@ auto wait(continuable_base&& continuable) { .done(); if (!ready.load(std::memory_order_acquire)) { - std::unique_lock lock(mutex); + lock_t lock(cv_mutex); cv.wait(lock, [&] { return ready.load(std::memory_order_acquire); }); } + return sync_result; +} + +/// Transforms the continuation to sync execution and unpacks the result the if +/// possible +template +auto wait_and_unpack(continuable_base&& continuable) { + + auto sync_result = wait_relaxed(std::move(continuable)); + #if defined(CONTINUABLE_HAS_EXCEPTIONS) if (sync_result.is_value()) { return std::move(sync_result).get_value(); @@ -127,6 +114,60 @@ auto wait(continuable_base&& continuable) { return sync_result; #endif // CONTINUABLE_HAS_EXCEPTIONS } + +template +struct wait_frame { + std::mutex cv_mutex; + std::mutex rw_mutex; + condition_variable_t cv; + std::atomic_bool ready{false}; + Result sync_result; +}; + +template +auto wait_unsafe(continuable_base&& continuable, + Waiter&& waiter) { + + // Do an immediate unpack if the continuable is ready + if (continuable.is_ready()) { + return std::move(continuable).unpack(); + } + + constexpr auto hint = base::annotation_of(identify{}); + using result_t = typename sync_trait>::result_t; + (void)hint; + using frame_t = wait_frame; + + auto frame = std::make_shared(); + + std::move(continuable) + .next([frame = std::weak_ptr(frame)](auto&&... args) { + if (auto locked = frame.lock()) { + { + std::lock_guard rw_lock(locked->rw_mutex); + locked->sync_result = result_t::from( + std::forward(args)...); + } + + locked->ready.store(true, std::memory_order_release); + locked->cv.notify_all(); + } + }) + .done(); + + if (!frame->ready.load(std::memory_order_acquire)) { + lock_t lock(frame->cv_mutex); + std::forward(waiter)(frame->cv, lock, [&] { + return frame->ready.load(std::memory_order_acquire); + }); + } + + return ([&] { + std::lock_guard rw_lock(frame->rw_mutex); + result_t cached = std::move(frame->sync_result); + return cached; + })(); +} } // namespace transforms } // namespace detail } // namespace cti diff --git a/include/continuable/transforms/wait.hpp b/include/continuable/transforms/wait.hpp index 4ae8914..08f1e4c 100644 --- a/include/continuable/transforms/wait.hpp +++ b/include/continuable/transforms/wait.hpp @@ -31,6 +31,8 @@ #ifndef CONTINUABLE_TRANSFORMS_WAIT_HPP_INCLUDED #define CONTINUABLE_TRANSFORMS_WAIT_HPP_INCLUDED +#include +#include #include #include @@ -57,10 +59,52 @@ namespace transforms { /// \since 4.0.0 inline auto wait() { return [](auto&& continuable) { - return detail::transforms::wait( + return detail::transforms::wait_and_unpack( std::forward(continuable)); }; } + +/// \copybrief wait +/// +/// \returns Returns a result that is available immediately. +/// The signature of the future depends on the result type: +/// | Continuation type | Return type | +/// | : ------------------------------- | : -------------------------------- | +/// | `continuable_base with <>` | `result<>` | +/// | `continuable_base with ` | `result` | +/// | `continuable_base with ` | `result` | +/// +/// \attention Thrown exceptions returned through the result, also +/// make sure to check for a valid result value in case the +/// underlying time constraint timed out. +/// +/// \since 4.0.0 +template +auto wait_for(std::chrono::duration duration) { + return [duration](auto&& continuable) { + return detail::transforms::wait_unsafe( + std::forward(continuable), + [duration](detail::transforms::condition_variable_t& cv, + detail::transforms::lock_t& lock, auto&& predicate) { + cv.wait_for(lock, duration, + std::forward(predicate)); + }); + }; +} + +/// \copydoc wait_for +template +auto wait_until(std::chrono::time_point time_point) { + return [time_point](auto&& continuable) { + return detail::transforms::wait_unsafe( + std::forward(continuable), + [time_point](detail::transforms::condition_variable_t& cv, + detail::transforms::lock_t& lock, auto&& predicate) { + cv.wait_until(lock, time_point, + std::forward(predicate)); + }); + }; +} } // namespace transforms /// \} } // namespace cti diff --git a/test/playground/test-playground.cpp b/test/playground/test-playground.cpp index a413533..803077d 100644 --- a/test/playground/test-playground.cpp +++ b/test/playground/test-playground.cpp @@ -31,26 +31,28 @@ using namespace cti; using namespace std::chrono_literals; int main(int, char**) { - asio::io_context ioc(1); - asio::steady_timer t(ioc); - auto work = std::make_shared(ioc); + asio::io_context context(1); + asio::steady_timer timer(context); + auto work = std::make_shared(context); - t.expires_after(1s); + timer.expires_after(5s); - std::thread th([&] { - ioc.run(); + std::thread thread([&] { + context.run(); puts("io_context finished"); }); - int res = t.async_wait(cti::use_continuable) - .then([] { - return 1; - }) - .apply(transforms::wait()); + result res = timer.async_wait(cti::use_continuable) + .then([] { + return 1; + }) + .apply(transforms::wait_for(1s)); + assert(res.is_empty()); puts("async_wait finished"); work.reset(); + timer.cancel(); - th.join(); + thread.join(); return 0; } diff --git a/test/unit-test/CMakeLists.txt b/test/unit-test/CMakeLists.txt index 5247732..053a6e3 100644 --- a/test/unit-test/CMakeLists.txt +++ b/test/unit-test/CMakeLists.txt @@ -10,6 +10,7 @@ target_link_libraries(test-continuable-base PUBLIC gtest gtest-main + asio continuable continuable-features-flags continuable-features-warnings diff --git a/test/unit-test/multi/test-continuable-transforms.cpp b/test/unit-test/multi/test-continuable-transforms.cpp index de323da..1e5fe8a 100644 --- a/test/unit-test/multi/test-continuable-transforms.cpp +++ b/test/unit-test/multi/test-continuable-transforms.cpp @@ -23,14 +23,15 @@ #include #include +#include #include - #include - +#include +#include +#include #include using namespace cti; -using namespace cti::detail; using namespace std::chrono_literals; template @@ -73,7 +74,57 @@ TYPED_TEST(single_dimension_tests, to_future_test) { } } -TYPED_TEST(single_dimension_tests, to_wait_test) { +class async_test_helper { +public: + async_test_helper() + : context_(1) + , timer_(context_) + , work_(std::make_shared(context_)) + , thread_([&] { + context_.run(); + }) {} + + ~async_test_helper() { + assert(work_); + } + + void stop() { + assert(work_); + work_.reset(); + thread_.join(); + } + + auto wait_for(asio::steady_timer::duration duration) { + timer_.expires_after(std::chrono::seconds(1)); + return timer_.async_wait(use_continuable); + } + +private: + asio::io_context context_; + asio::steady_timer timer_; + std::shared_ptr work_; + std::thread thread_; +}; + +TYPED_TEST(single_dimension_tests, to_wait_test_sync) { + { + this->supply().apply(cti::transforms::wait()); // + } +} + +TYPED_TEST(single_dimension_tests, to_wait_test_async) { + { + this->supply().apply(cti::transforms::wait()); // + } +} + +TYPED_TEST(single_dimension_tests, to_wait_test_ready) { + { + this->supply().apply(cti::transforms::wait()); // + } +} + +TYPED_TEST(single_dimension_tests, to_wait_test_exception) { { this->supply().apply(cti::transforms::wait()); // }