From b51be39e7126c397cec9d13fefdfa3cbbe8291f0 Mon Sep 17 00:00:00 2001 From: Denis Blank Date: Thu, 2 Jun 2022 01:01:26 +0200 Subject: [PATCH] Fix Stopping a continuable in a failure handler makes wait() hang forever * Closes #46 * Closes #48 --- .../continuable/detail/transforms/wait.hpp | 120 ++++++++++++++---- .../async/test-continuable-async.cpp | 39 ++++++ 2 files changed, 132 insertions(+), 27 deletions(-) diff --git a/include/continuable/detail/transforms/wait.hpp b/include/continuable/detail/transforms/wait.hpp index f9a888f..c605f10 100644 --- a/include/continuable/detail/transforms/wait.hpp +++ b/include/continuable/detail/transforms/wait.hpp @@ -72,6 +72,45 @@ struct sync_trait> { using lock_t = std::unique_lock; using condition_variable_t = std::condition_variable; +template +struct unsafe_unlocker { + unsafe_unlocker(unsafe_unlocker const&) = delete; + unsafe_unlocker(unsafe_unlocker&&) = default; + unsafe_unlocker& operator=(unsafe_unlocker const&) = delete; + unsafe_unlocker& operator=(unsafe_unlocker&&) = default; + + ~unsafe_unlocker() { + unlock(Result::empty()); + } + + template + void operator()(Args&&... args) { + unlock(Result::from(std::forward(args)...)); + } + + void unlock(Result&& result) { + if (!ownership_.is_acquired()) { + return; + } + ownership_.release(); + + lock_t lock(*mutex_); + + *result_ = std::move(result); + + assert(!ready_->load(std::memory_order_acquire)); + ready_->store(true, std::memory_order_release); + + cv_->notify_all(); + } + + std::atomic_bool* ready_; + condition_variable_t* cv_; + std::mutex* mutex_; + Result* result_; + util::ownership ownership_; +}; + template ::result_t> Result wait_relaxed(continuable_base&& continuable) { @@ -84,23 +123,22 @@ Result wait_relaxed(continuable_base&& continuable) { condition_variable_t cv; std::mutex cv_mutex; - bool ready{false}; + std::atomic_bool ready{false}; Result sync_result; std::move(continuable) - .next([&](auto&&... args) { - sync_result = Result::from(std::forward(args)...); - - lock_t lock(cv_mutex); - ready = true; - cv.notify_all(); + .next(unsafe_unlocker{ + &ready, + &cv, + &cv_mutex, + &sync_result, }) .done(); lock_t lock(cv_mutex); - if (!ready) { + if (!ready.load(std::memory_order_acquire)) { cv.wait(lock, [&] { - return ready; + return ready.load(std::memory_order_acquire); }); } @@ -117,14 +155,15 @@ auto wait_and_unpack(continuable_base&& continuable) { #if defined(CONTINUABLE_HAS_EXCEPTIONS) if (sync_result.is_value()) { return std::move(sync_result).get_value(); - } else { - assert(sync_result.is_exception()); - if (exception_t e = sync_result.get_exception()) { - std::rethrow_exception(e); - } else { - throw wait_transform_canceled_exception(); + } else if (sync_result.is_exception()) { + if (sync_result.is_exception()) { + if (exception_t e = sync_result.get_exception()) { + std::rethrow_exception(e); + } } } + + throw wait_transform_canceled_exception(); #else return sync_result; #endif // CONTINUABLE_HAS_EXCEPTIONS @@ -139,6 +178,44 @@ struct wait_frame { Result sync_result; }; +template +struct unlocker { + unlocker(unlocker const&) = delete; + unlocker(unlocker&&) = default; + unlocker& operator=(unlocker const&) = delete; + unlocker& operator=(unlocker&&) = default; + + ~unlocker() { + unlock(Result::empty()); + } + + template + void operator()(Args&&... args) { + unlock(Result::from(std::forward(args)...)); + } + + void unlock(Result&& result) { + if (!ownership_.is_acquired()) { + return; + } + ownership_.release(); + + if (auto locked = frame_.lock()) { + { + std::lock_guard rw_lock(locked->rw_mutex); + assert(!locked->ready.load(std::memory_order_acquire)); + locked->sync_result = std::move(result); + } + + locked->ready.store(true, std::memory_order_release); + locked->cv.notify_all(); + } + } + + std::weak_ptr> frame_; + util::ownership ownership_; +}; + template ::result_t> Result wait_unsafe(continuable_base&& continuable, @@ -154,18 +231,7 @@ Result wait_unsafe(continuable_base&& continuable, auto frame = std::make_shared(); std::move(continuable) - .next([frame = std::weak_ptr(frame)](auto&&... args) { - if (auto locked = frame.lock()) { - { - std::lock_guard rw_lock(locked->rw_mutex); - locked->sync_result = Result::from( - std::forward(args)...); - } - - locked->ready.store(true, std::memory_order_release); - locked->cv.notify_all(); - } - }) + .next(unlocker{std::weak_ptr(frame)}) .done(); if (!frame->ready.load(std::memory_order_acquire)) { diff --git a/test/unit-test/async/test-continuable-async.cpp b/test/unit-test/async/test-continuable-async.cpp index 84a7c48..382ace3 100644 --- a/test/unit-test/async/test-continuable-async.cpp +++ b/test/unit-test/async/test-continuable-async.cpp @@ -115,11 +115,50 @@ TYPED_TEST(single_dimension_tests, wait_test_exception) { test_exception); } +TYPED_TEST(single_dimension_tests, wait_test_unlocked) { + make_continuable([&](promise<> p) { + p.set_value(); + }).apply(transforms::wait()); + + ASSERT_TRUE(true); +} + TYPED_TEST(single_dimension_tests, wait_test_cancellation) { ASSERT_THROW(make_cancelling_continuable().apply( cti::transforms::wait()), transforms::wait_transform_canceled_exception); } + +TYPED_TEST(single_dimension_tests, + wait_test_exception_unlocked_void_failure_handle) { + ASSERT_THROW(make_exceptional_continuable(supply_test_exception()) + .fail([](exception_t) {}) + .apply(transforms::wait()), + transforms::wait_transform_canceled_exception); +} + +TYPED_TEST(single_dimension_tests, wait_test_unlocked_empty_result) { + ASSERT_THROW(async([]() -> result<> { + return empty_result(); + }).apply(transforms::wait()), + transforms::wait_transform_canceled_exception); +} + +TYPED_TEST(single_dimension_tests, + wait_for_test_exception_unlocked_void_failure_handle) { + ASSERT_TRUE(make_exceptional_continuable(supply_test_exception()) + .fail([](exception_t) {}) + .apply(transforms::wait_for(24h)) + .is_empty()); +} + +TYPED_TEST(single_dimension_tests, wait_for_test_unlocked_empty_result) { + ASSERT_TRUE(async([]() -> result<> { + return empty_result(); + }) + .apply(transforms::wait_for(24h)) + .is_empty()); +} #endif // CONTINUABLE_HAS_EXCEPTIONS TYPED_TEST(single_dimension_tests, wait_for_test_sync) {