Implement wait, wait_for and wait_until transforms properly

* wait is implemented by a atomic and default condition variable.
* wait_for and wait_until are expensive since we can't assume
  anything about the environment thus we have to allocate
  a persistent frame.
This commit is contained in:
Denis Blank 2020-04-04 01:11:27 +02:00
parent df4d6ed971
commit ab9669fa2a
5 changed files with 194 additions and 55 deletions

View File

@ -33,15 +33,15 @@
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <continuable/continuable-primitives.hpp>
#include <continuable/continuable-result.hpp>
#include <continuable/detail/core/annotation.hpp>
#include <continuable/detail/core/base.hpp>
#include <continuable/detail/core/types.hpp>
#include <continuable/detail/features.hpp>
#include <continuable/detail/utility/util.hpp>
#if defined(CONTINUABLE_HAS_EXCEPTIONS)
# include <exception>
@ -50,37 +50,6 @@
namespace cti {
namespace detail {
namespace transforms {
/*
template <typename... Args>
struct sync_trait {
/// The promise type used to create the future
using promise_t = std::tuple<Args...>;
/// Boxes the argument pack into a tuple
static void resolve(promise_t& promise, Args... args) {
promise.set_value(std::make_tuple(std::move(args)...));
}
};
template <>
struct sync_trait<> {
/// The promise type used to create the future
using promise_t = result<Args...>;
/// Boxes the argument pack into void
static void resolve(promise_t& promise) {
promise.set_value();
}
};
template <typename First>
struct sync_trait<First> {
/// The promise type used to create the future
using promise_t = std::promise<First>;
/// Boxes the argument pack into nothing
static void resolve(promise_t& promise, First first) {
promise.set_value(std::move(first));
}
};
*/
template <typename Hint>
struct sync_trait;
template <typename... Args>
@ -88,15 +57,23 @@ struct sync_trait<identity<Args...>> {
using result_t = result<Args...>;
};
/// Transforms the continuation to sync execution
using lock_t = std::unique_lock<std::mutex>;
using condition_variable_t = std::condition_variable;
template <typename Data, typename Annotation>
auto wait(continuable_base<Data, Annotation>&& continuable) {
auto wait_relaxed(continuable_base<Data, Annotation>&& continuable) {
// Do an immediate unpack if the continuable is ready
if (continuable.is_ready()) {
return std::move(continuable).unpack();
}
constexpr auto hint = base::annotation_of(identify<decltype(continuable)>{});
using result_t = typename sync_trait<std::decay_t<decltype(hint)>>::result_t;
(void)hint;
std::recursive_mutex mutex;
std::condition_variable_any cv;
std::mutex cv_mutex;
condition_variable_t cv;
std::atomic_bool ready{false};
result_t sync_result;
@ -110,12 +87,22 @@ auto wait(continuable_base<Data, Annotation>&& continuable) {
.done();
if (!ready.load(std::memory_order_acquire)) {
std::unique_lock<std::recursive_mutex> lock(mutex);
lock_t lock(cv_mutex);
cv.wait(lock, [&] {
return ready.load(std::memory_order_acquire);
});
}
return sync_result;
}
/// Transforms the continuation to sync execution and unpacks the result the if
/// possible
template <typename Data, typename Annotation>
auto wait_and_unpack(continuable_base<Data, Annotation>&& continuable) {
auto sync_result = wait_relaxed(std::move(continuable));
#if defined(CONTINUABLE_HAS_EXCEPTIONS)
if (sync_result.is_value()) {
return std::move(sync_result).get_value();
@ -127,6 +114,60 @@ auto wait(continuable_base<Data, Annotation>&& continuable) {
return sync_result;
#endif // CONTINUABLE_HAS_EXCEPTIONS
}
template <typename Result>
struct wait_frame {
std::mutex cv_mutex;
std::mutex rw_mutex;
condition_variable_t cv;
std::atomic_bool ready{false};
Result sync_result;
};
template <typename Data, typename Annotation, typename Waiter>
auto wait_unsafe(continuable_base<Data, Annotation>&& continuable,
Waiter&& waiter) {
// Do an immediate unpack if the continuable is ready
if (continuable.is_ready()) {
return std::move(continuable).unpack();
}
constexpr auto hint = base::annotation_of(identify<decltype(continuable)>{});
using result_t = typename sync_trait<std::decay_t<decltype(hint)>>::result_t;
(void)hint;
using frame_t = wait_frame<result_t>;
auto frame = std::make_shared<frame_t>();
std::move(continuable)
.next([frame = std::weak_ptr<frame_t>(frame)](auto&&... args) {
if (auto locked = frame.lock()) {
{
std::lock_guard<std::mutex> rw_lock(locked->rw_mutex);
locked->sync_result = result_t::from(
std::forward<decltype(args)>(args)...);
}
locked->ready.store(true, std::memory_order_release);
locked->cv.notify_all();
}
})
.done();
if (!frame->ready.load(std::memory_order_acquire)) {
lock_t lock(frame->cv_mutex);
std::forward<Waiter>(waiter)(frame->cv, lock, [&] {
return frame->ready.load(std::memory_order_acquire);
});
}
return ([&] {
std::lock_guard<std::mutex> rw_lock(frame->rw_mutex);
result_t cached = std::move(frame->sync_result);
return cached;
})();
}
} // namespace transforms
} // namespace detail
} // namespace cti

View File

@ -31,6 +31,8 @@
#ifndef CONTINUABLE_TRANSFORMS_WAIT_HPP_INCLUDED
#define CONTINUABLE_TRANSFORMS_WAIT_HPP_INCLUDED
#include <chrono>
#include <condition_variable>
#include <utility>
#include <continuable/detail/transforms/wait.hpp>
@ -57,10 +59,52 @@ namespace transforms {
/// \since 4.0.0
inline auto wait() {
return [](auto&& continuable) {
return detail::transforms::wait(
return detail::transforms::wait_and_unpack(
std::forward<decltype(continuable)>(continuable));
};
}
/// \copybrief wait
///
/// \returns Returns a result that is available immediately.
/// The signature of the future depends on the result type:
/// | Continuation type | Return type |
/// | : ------------------------------- | : -------------------------------- |
/// | `continuable_base with <>` | `result<>` |
/// | `continuable_base with <Arg>` | `result<Arg>` |
/// | `continuable_base with <Args...>` | `result<Args...>` |
///
/// \attention Thrown exceptions returned through the result, also
/// make sure to check for a valid result value in case the
/// underlying time constraint timed out.
///
/// \since 4.0.0
template <typename Rep, typename Period>
auto wait_for(std::chrono::duration<Rep, Period> duration) {
return [duration](auto&& continuable) {
return detail::transforms::wait_unsafe(
std::forward<decltype(continuable)>(continuable),
[duration](detail::transforms::condition_variable_t& cv,
detail::transforms::lock_t& lock, auto&& predicate) {
cv.wait_for(lock, duration,
std::forward<decltype(predicate)>(predicate));
});
};
}
/// \copydoc wait_for
template <typename Clock, typename Duration>
auto wait_until(std::chrono::time_point<Clock, Duration> time_point) {
return [time_point](auto&& continuable) {
return detail::transforms::wait_unsafe(
std::forward<decltype(continuable)>(continuable),
[time_point](detail::transforms::condition_variable_t& cv,
detail::transforms::lock_t& lock, auto&& predicate) {
cv.wait_until(lock, time_point,
std::forward<decltype(predicate)>(predicate));
});
};
}
} // namespace transforms
/// \}
} // namespace cti

View File

@ -31,26 +31,28 @@ using namespace cti;
using namespace std::chrono_literals;
int main(int, char**) {
asio::io_context ioc(1);
asio::steady_timer t(ioc);
auto work = std::make_shared<asio::io_context::work>(ioc);
asio::io_context context(1);
asio::steady_timer timer(context);
auto work = std::make_shared<asio::io_context::work>(context);
t.expires_after(1s);
timer.expires_after(5s);
std::thread th([&] {
ioc.run();
std::thread thread([&] {
context.run();
puts("io_context finished");
});
int res = t.async_wait(cti::use_continuable)
result<int> res = timer.async_wait(cti::use_continuable)
.then([] {
return 1;
})
.apply(transforms::wait());
.apply(transforms::wait_for(1s));
assert(res.is_empty());
puts("async_wait finished");
work.reset();
timer.cancel();
th.join();
thread.join();
return 0;
}

View File

@ -10,6 +10,7 @@ target_link_libraries(test-continuable-base
PUBLIC
gtest
gtest-main
asio
continuable
continuable-features-flags
continuable-features-warnings

View File

@ -23,14 +23,15 @@
#include <chrono>
#include <future>
#include <memory>
#include <thread>
#include <continuable/continuable-transforms.hpp>
#include <continuable/continuable.hpp>
#include <continuable/external/asio.hpp>
#include <asio.hpp>
#include <test-continuable.hpp>
using namespace cti;
using namespace cti::detail;
using namespace std::chrono_literals;
template <typename T>
@ -73,7 +74,57 @@ TYPED_TEST(single_dimension_tests, to_future_test) {
}
}
TYPED_TEST(single_dimension_tests, to_wait_test) {
class async_test_helper {
public:
async_test_helper()
: context_(1)
, timer_(context_)
, work_(std::make_shared<asio::io_context::work>(context_))
, thread_([&] {
context_.run();
}) {}
~async_test_helper() {
assert(work_);
}
void stop() {
assert(work_);
work_.reset();
thread_.join();
}
auto wait_for(asio::steady_timer::duration duration) {
timer_.expires_after(std::chrono::seconds(1));
return timer_.async_wait(use_continuable);
}
private:
asio::io_context context_;
asio::steady_timer timer_;
std::shared_ptr<asio::io_context::work> work_;
std::thread thread_;
};
TYPED_TEST(single_dimension_tests, to_wait_test_sync) {
{
this->supply().apply(cti::transforms::wait()); //
}
}
TYPED_TEST(single_dimension_tests, to_wait_test_async) {
{
this->supply().apply(cti::transforms::wait()); //
}
}
TYPED_TEST(single_dimension_tests, to_wait_test_ready) {
{
this->supply().apply(cti::transforms::wait()); //
}
}
TYPED_TEST(single_dimension_tests, to_wait_test_exception) {
{
this->supply().apply(cti::transforms::wait()); //
}