Fix Stopping a continuable in a failure handler makes wait() hang forever

* Closes #46
* Closes #48
This commit is contained in:
Denis Blank 2022-06-02 01:01:26 +02:00
parent ed8310e345
commit b51be39e71
2 changed files with 132 additions and 27 deletions

View File

@ -72,6 +72,45 @@ struct sync_trait<identity<Args...>> {
using lock_t = std::unique_lock<std::mutex>; using lock_t = std::unique_lock<std::mutex>;
using condition_variable_t = std::condition_variable; using condition_variable_t = std::condition_variable;
template <typename Result>
struct unsafe_unlocker {
unsafe_unlocker(unsafe_unlocker const&) = delete;
unsafe_unlocker(unsafe_unlocker&&) = default;
unsafe_unlocker& operator=(unsafe_unlocker const&) = delete;
unsafe_unlocker& operator=(unsafe_unlocker&&) = default;
~unsafe_unlocker() {
unlock(Result::empty());
}
template <typename... Args>
void operator()(Args&&... args) {
unlock(Result::from(std::forward<Args>(args)...));
}
void unlock(Result&& result) {
if (!ownership_.is_acquired()) {
return;
}
ownership_.release();
lock_t lock(*mutex_);
*result_ = std::move(result);
assert(!ready_->load(std::memory_order_acquire));
ready_->store(true, std::memory_order_release);
cv_->notify_all();
}
std::atomic_bool* ready_;
condition_variable_t* cv_;
std::mutex* mutex_;
Result* result_;
util::ownership ownership_;
};
template <typename Data, typename Annotation, template <typename Data, typename Annotation,
typename Result = typename sync_trait<Annotation>::result_t> typename Result = typename sync_trait<Annotation>::result_t>
Result wait_relaxed(continuable_base<Data, Annotation>&& continuable) { Result wait_relaxed(continuable_base<Data, Annotation>&& continuable) {
@ -84,23 +123,22 @@ Result wait_relaxed(continuable_base<Data, Annotation>&& continuable) {
condition_variable_t cv; condition_variable_t cv;
std::mutex cv_mutex; std::mutex cv_mutex;
bool ready{false}; std::atomic_bool ready{false};
Result sync_result; Result sync_result;
std::move(continuable) std::move(continuable)
.next([&](auto&&... args) { .next(unsafe_unlocker<Result>{
sync_result = Result::from(std::forward<decltype(args)>(args)...); &ready,
&cv,
lock_t lock(cv_mutex); &cv_mutex,
ready = true; &sync_result,
cv.notify_all();
}) })
.done(); .done();
lock_t lock(cv_mutex); lock_t lock(cv_mutex);
if (!ready) { if (!ready.load(std::memory_order_acquire)) {
cv.wait(lock, [&] { cv.wait(lock, [&] {
return ready; return ready.load(std::memory_order_acquire);
}); });
} }
@ -117,14 +155,15 @@ auto wait_and_unpack(continuable_base<Data, Annotation>&& continuable) {
#if defined(CONTINUABLE_HAS_EXCEPTIONS) #if defined(CONTINUABLE_HAS_EXCEPTIONS)
if (sync_result.is_value()) { if (sync_result.is_value()) {
return std::move(sync_result).get_value(); return std::move(sync_result).get_value();
} else { } else if (sync_result.is_exception()) {
assert(sync_result.is_exception()); if (sync_result.is_exception()) {
if (exception_t e = sync_result.get_exception()) { if (exception_t e = sync_result.get_exception()) {
std::rethrow_exception(e); std::rethrow_exception(e);
} else { }
}
}
throw wait_transform_canceled_exception(); throw wait_transform_canceled_exception();
}
}
#else #else
return sync_result; return sync_result;
#endif // CONTINUABLE_HAS_EXCEPTIONS #endif // CONTINUABLE_HAS_EXCEPTIONS
@ -139,6 +178,44 @@ struct wait_frame {
Result sync_result; Result sync_result;
}; };
template <typename Result>
struct unlocker {
unlocker(unlocker const&) = delete;
unlocker(unlocker&&) = default;
unlocker& operator=(unlocker const&) = delete;
unlocker& operator=(unlocker&&) = default;
~unlocker() {
unlock(Result::empty());
}
template <typename... Args>
void operator()(Args&&... args) {
unlock(Result::from(std::forward<decltype(args)>(args)...));
}
void unlock(Result&& result) {
if (!ownership_.is_acquired()) {
return;
}
ownership_.release();
if (auto locked = frame_.lock()) {
{
std::lock_guard<std::mutex> rw_lock(locked->rw_mutex);
assert(!locked->ready.load(std::memory_order_acquire));
locked->sync_result = std::move(result);
}
locked->ready.store(true, std::memory_order_release);
locked->cv.notify_all();
}
}
std::weak_ptr<wait_frame<Result>> frame_;
util::ownership ownership_;
};
template <typename Data, typename Annotation, typename Waiter, template <typename Data, typename Annotation, typename Waiter,
typename Result = typename sync_trait<Annotation>::result_t> typename Result = typename sync_trait<Annotation>::result_t>
Result wait_unsafe(continuable_base<Data, Annotation>&& continuable, Result wait_unsafe(continuable_base<Data, Annotation>&& continuable,
@ -154,18 +231,7 @@ Result wait_unsafe(continuable_base<Data, Annotation>&& continuable,
auto frame = std::make_shared<frame_t>(); auto frame = std::make_shared<frame_t>();
std::move(continuable) std::move(continuable)
.next([frame = std::weak_ptr<frame_t>(frame)](auto&&... args) { .next(unlocker<Result>{std::weak_ptr<frame_t>(frame)})
if (auto locked = frame.lock()) {
{
std::lock_guard<std::mutex> rw_lock(locked->rw_mutex);
locked->sync_result = Result::from(
std::forward<decltype(args)>(args)...);
}
locked->ready.store(true, std::memory_order_release);
locked->cv.notify_all();
}
})
.done(); .done();
if (!frame->ready.load(std::memory_order_acquire)) { if (!frame->ready.load(std::memory_order_acquire)) {

View File

@ -115,11 +115,50 @@ TYPED_TEST(single_dimension_tests, wait_test_exception) {
test_exception); test_exception);
} }
TYPED_TEST(single_dimension_tests, wait_test_unlocked) {
make_continuable<void>([&](promise<> p) {
p.set_value();
}).apply(transforms::wait());
ASSERT_TRUE(true);
}
TYPED_TEST(single_dimension_tests, wait_test_cancellation) { TYPED_TEST(single_dimension_tests, wait_test_cancellation) {
ASSERT_THROW(make_cancelling_continuable<void>().apply( ASSERT_THROW(make_cancelling_continuable<void>().apply(
cti::transforms::wait()), cti::transforms::wait()),
transforms::wait_transform_canceled_exception); transforms::wait_transform_canceled_exception);
} }
TYPED_TEST(single_dimension_tests,
wait_test_exception_unlocked_void_failure_handle) {
ASSERT_THROW(make_exceptional_continuable<void>(supply_test_exception())
.fail([](exception_t) {})
.apply(transforms::wait()),
transforms::wait_transform_canceled_exception);
}
TYPED_TEST(single_dimension_tests, wait_test_unlocked_empty_result) {
ASSERT_THROW(async([]() -> result<> {
return empty_result();
}).apply(transforms::wait()),
transforms::wait_transform_canceled_exception);
}
TYPED_TEST(single_dimension_tests,
wait_for_test_exception_unlocked_void_failure_handle) {
ASSERT_TRUE(make_exceptional_continuable<void>(supply_test_exception())
.fail([](exception_t) {})
.apply(transforms::wait_for(24h))
.is_empty());
}
TYPED_TEST(single_dimension_tests, wait_for_test_unlocked_empty_result) {
ASSERT_TRUE(async([]() -> result<> {
return empty_result();
})
.apply(transforms::wait_for(24h))
.is_empty());
}
#endif // CONTINUABLE_HAS_EXCEPTIONS #endif // CONTINUABLE_HAS_EXCEPTIONS
TYPED_TEST(single_dimension_tests, wait_for_test_sync) { TYPED_TEST(single_dimension_tests, wait_for_test_sync) {