mirror of
https://github.com/Naios/continuable.git
synced 2026-01-01 03:12:12 +08:00
Fix "all" compositions with error handling
This commit is contained in:
parent
268245b5e4
commit
22c9ee01d4
@ -41,6 +41,7 @@
|
||||
#include <continuable/detail/api.hpp>
|
||||
#include <continuable/detail/base.hpp>
|
||||
#include <continuable/detail/traits.hpp>
|
||||
#include <continuable/detail/types.hpp>
|
||||
|
||||
namespace cti {
|
||||
namespace detail {
|
||||
@ -111,6 +112,7 @@ constexpr void assign(traits::size_constant<Pos> /*pos*/, T& /*storage*/) {
|
||||
template <std::size_t Pos, typename T, typename Current, typename... Args>
|
||||
void assign(traits::size_constant<Pos> pos, T& storage, Current&& current,
|
||||
Args&&... args) {
|
||||
// TODO Improve this -> linear instantiation
|
||||
std::get<Pos>(storage) = std::forward<Current>(current);
|
||||
assign(pos + traits::size_constant_of<1>(), storage,
|
||||
std::forward<Args>(args)...);
|
||||
@ -125,6 +127,7 @@ class all_result_submitter : public std::enable_shared_from_this<
|
||||
|
||||
T callback_;
|
||||
std::atomic<std::size_t> left_;
|
||||
std::once_flag flag_;
|
||||
std::tuple<Args...> result_;
|
||||
|
||||
public:
|
||||
@ -134,30 +137,50 @@ public:
|
||||
|
||||
/// Creates a submitter which submits it's result into the tuple
|
||||
template <std::size_t From, std::size_t To>
|
||||
auto create_callback(traits::size_constant<From> from,
|
||||
auto create_callback(traits::size_constant<From> /*from*/,
|
||||
traits::size_constant<To> /*to*/) {
|
||||
|
||||
return [ me = this->shared_from_this(), from ](auto&&... args) {
|
||||
static_assert(sizeof...(args) == (To - From),
|
||||
"Submission called with the wrong amount of arguments!");
|
||||
|
||||
// Assign the values from the result to it's correct positions of the
|
||||
// tuple. Maybe think about the thread safety again...:
|
||||
// http://stackoverflow.com/questions/40845699
|
||||
assign(from, me->result_, std::forward<decltype(args)>(args)...);
|
||||
|
||||
// Complete the current result
|
||||
me->complete_one();
|
||||
return [me = this->shared_from_this()](auto&&... args) {
|
||||
// Resolve the and composition with the given arguments at the
|
||||
// stored position
|
||||
me->resolve(traits::size_constant<From>{}, traits::size_constant<To>{},
|
||||
std::forward<decltype(args)>(args)...);
|
||||
};
|
||||
}
|
||||
|
||||
private:
|
||||
template <std::size_t From, std::size_t To, typename... PartialArgs>
|
||||
void resolve(traits::size_constant<From> from, traits::size_constant<To>,
|
||||
PartialArgs&&... args) {
|
||||
|
||||
static_assert(sizeof...(args) == (To - From),
|
||||
"Submission called with the wrong amount of arguments!");
|
||||
|
||||
// Assign the values from the result to it's correct positions of the
|
||||
// tuple. Maybe think about the thread safety again...:
|
||||
// http://stackoverflow.com/questions/40845699
|
||||
assign(from, result_, std::forward<PartialArgs>(args)...);
|
||||
|
||||
// Complete the current result
|
||||
complete_one();
|
||||
}
|
||||
|
||||
template <std::size_t From, std::size_t To>
|
||||
void resolve(traits::size_constant<From>, traits::size_constant<To>,
|
||||
types::dispatch_error_tag tag, types::error_type error) {
|
||||
|
||||
// We never complete the composition, but we forward the first error
|
||||
// which was raised.
|
||||
std::call_once(flag_, std::move(callback_), tag, std::move(error));
|
||||
}
|
||||
|
||||
// Invokes the callback with the cached result
|
||||
void invoke() {
|
||||
assert((left_ == 0U) && "Expected that the submitter is finished!");
|
||||
std::atomic_thread_fence(std::memory_order_acquire);
|
||||
traits::unpack(std::move(result_), [&](auto&&... args) {
|
||||
std::move(callback_)(std::forward<decltype(args)>(args)...);
|
||||
std::call_once(flag_, std::move(callback_),
|
||||
std::forward<decltype(args)>(args)...);
|
||||
});
|
||||
}
|
||||
// Completes one result
|
||||
|
||||
@ -69,7 +69,7 @@ int main(int, char**) {
|
||||
.fail([](std::error_condition) {
|
||||
// ...
|
||||
})
|
||||
.then([](int) {
|
||||
.then([] {
|
||||
// ...
|
||||
});
|
||||
|
||||
@ -78,10 +78,26 @@ int main(int, char**) {
|
||||
// ...
|
||||
return 0;
|
||||
})
|
||||
.fail([](std::error_condition) {
|
||||
.then([](int) {
|
||||
// ...
|
||||
})
|
||||
.then([](int) {
|
||||
.fail([](std::error_condition) {
|
||||
// ...
|
||||
});
|
||||
|
||||
(http_request("github.com") && http_request("github.com"))
|
||||
.then([](std::string, std::string) {
|
||||
// ...
|
||||
})
|
||||
.fail([](std::error_condition) {
|
||||
// ...
|
||||
});
|
||||
|
||||
(http_request("github.com") || http_request("github.com"))
|
||||
.then([](std::string) {
|
||||
// ...
|
||||
})
|
||||
.fail([](std::error_condition) {
|
||||
// ...
|
||||
});
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user