Ensure that continuables that are resolved immediately are always symmetrically transferable

This commit is contained in:
Rogiel Sulzbach 2023-09-09 23:30:17 -03:00 committed by Denis Blank
parent c7f5b1cbaf
commit 23a724cf5c
2 changed files with 95 additions and 2 deletions

View File

@ -96,6 +96,16 @@ class awaitable {
/// A cache which is used to pass the result of the continuation /// A cache which is used to pass the result of the continuation
/// to the coroutine. /// to the coroutine.
result_t result_; result_t result_;
/// Enumeration that represents the suspension state of the awaitable.
enum class state : std::uint8_t {
suspended,
pending,
resolved,
};
/// An atomic that specifies whether the awaitable has suspended or not.
/// Allows to perform symmetric transfer on continuables that are
/// immediately resolved.
std::atomic<state> state_{state::pending};
public: public:
explicit constexpr awaitable(Continuable&& continuable) explicit constexpr awaitable(Continuable&& continuable)
@ -117,16 +127,27 @@ public:
/// Suspend the current context /// Suspend the current context
// TODO Convert this to an r-value function once possible // TODO Convert this to an r-value function once possible
void await_suspend(coroutine_handle<> h) { bool await_suspend(coroutine_handle<> h) {
assert(result_.is_empty()); assert(result_.is_empty());
// Forward every result to the current awaitable // Forward every result to the current awaitable
std::move(continuable_) std::move(continuable_)
.next([h, this](auto&&... args) mutable { .next([h, this](auto&&... args) mutable {
assert(result_.is_empty()); assert(result_.is_empty());
result_ = result_t::from(std::forward<decltype(args)>(args)...); result_ = result_t::from(std::forward<decltype(args)>(args)...);
h.resume();
// If true, it means that the promise was suspended (i.e., the
// awaitable await_suspend method has already returned). That
// means we must call the resume coroutine from the continuation
// chain.
if (state_.exchange(state::resolved, std::memory_order_acq_rel) ==
state::suspended) {
return h.resume();
}
}) })
.done(); .done();
return state_.exchange(state::suspended, std::memory_order_acq_rel) !=
state::resolved;
} }
/// Resume the coroutine represented by the handle /// Resume the coroutine represented by the handle

View File

@ -169,6 +169,78 @@ TYPED_TEST(single_dimension_tests, are_awaitable_with_cancellation_from_coro) {
ASSERT_ASYNC_CANCELLATION(resolve_coro_canceled(supply)) ASSERT_ASYNC_CANCELLATION(resolve_coro_canceled(supply))
} }
template <typename S>
cti::continuable<> test_symmetric_transfer(S&& supplier) {
// If symmetric transfer is not working properly, large
// loops will quickly cause stack overflows.
for (size_t index = 0; index < 10000; index++) {
co_await supplier();
}
co_return;
}
TYPED_TEST(single_dimension_tests, are_symmetric_transferable) {
auto const& supply = [&]() {
return cti::make_continuable<int>([](auto&& promise) {
promise.set_value(0);
});
};
ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}
TYPED_TEST(single_dimension_tests, are_symmetric_transferable_type_erased) {
auto const& supply = [&]() -> cti::continuable<int> {
return cti::make_continuable<int>([](auto&& promise) {
promise.set_value(0);
});
};
ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}
TYPED_TEST(single_dimension_tests,
are_symmetric_transferable_using_make_ready) {
auto const& supply = [&]() {
return cti::make_ready_continuable<int>(0);
};
ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}
TYPED_TEST(single_dimension_tests,
are_symmetric_transferable_using_type_erased_make_ready) {
auto const& supply = [&]() -> cti::continuable<int> {
return cti::make_ready_continuable<int>(0);
};
ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}
TYPED_TEST(single_dimension_tests, are_symmetric_transferable_using_type_erased_from_thread) {
auto const& supply = [&]() -> cti::continuable<int> {
return cti::make_continuable<int>([](auto&& promise) {
std::async(std::launch::async, std::forward<decltype(promise)>(promise), 0);
});
};
ASSERT_ASYNC_COMPLETION(test_symmetric_transfer(supply));
}
TYPED_TEST(single_dimension_tests, are_symmetric_transferable_except) {
size_t count = 0;
auto const& supply = [&]() -> cti::continuable<int> {
// NOTE: The symmetric transfer loop does 10000 iterations.
if(++count == 5000) {
return cti::make_exceptional_continuable<int>(
std::make_exception_ptr(std::runtime_error("Failed")));
}
return cti::make_ready_continuable<int>(0);
};
ASSERT_ASYNC_EXCEPTION_COMPLETION(test_symmetric_transfer(supply));
}
# endif // CONTINUABLE_WITH_NO_EXCEPTIONS # endif // CONTINUABLE_WITH_NO_EXCEPTIONS
#endif // CONTINUABLE_HAS_EXPERIMENTAL_COROUTINE #endif // CONTINUABLE_HAS_EXPERIMENTAL_COROUTINE