diff --git a/include/continuable/detail/composition-all.hpp b/include/continuable/detail/composition-all.hpp index 8eecfee..cd4c232 100644 --- a/include/continuable/detail/composition-all.hpp +++ b/include/continuable/detail/composition-all.hpp @@ -89,9 +89,9 @@ constexpr auto deduce_hint(Composition && /*composition*/) /// Caches the partial results and invokes the callback when all results /// are arrived. This class is thread safe. template -class all_result_submitter : public std::enable_shared_from_this< - all_result_submitter>, - public util::non_movable { +class result_submitter + : public std::enable_shared_from_this>, + public util::non_movable { Callback callback_; Result result_; @@ -99,36 +99,16 @@ class all_result_submitter : public std::enable_shared_from_this< std::atomic left_; std::once_flag flag_; - template - void resolve(traits::size_constant from, traits::size_constant, - PartialArgs&&... args) { - - static_assert(sizeof...(args) == (To - From), - "Submission called with the wrong amount of arguments!"); - - // Assign the values from the result to it's correct positions of the - // tuple. Maybe think about the thread safety again...: - // http://stackoverflow.com/questions/40845699 - assign(from, result_, std::forward(args)...); - - // Complete the current result - complete_one(); - } - - template - void resolve(traits::size_constant, traits::size_constant, - types::dispatch_error_tag tag, types::error_type error) { - - // We never complete the composition, but we forward the first error - // which was raised. - std::call_once(flag_, std::move(callback_), tag, std::move(error)); - } - // Invokes the callback with the cached result void invoke() { assert((left_ == 0U) && "Expected that the submitter is finished!"); std::atomic_thread_fence(std::memory_order_acquire); - traits::unpack(std::move(result_), [&](auto&&... args) { + + auto cleaned = + map_pack(remapping::unpack_result_guards{}, std::move(result_)); + + // Call the final callback with the cleaned result + traits::unpack(std::move(cleaned), [&](auto&&... args) { std::call_once(flag_, std::move(callback_), std::forward(args)...); }); @@ -145,20 +125,30 @@ class all_result_submitter : public std::enable_shared_from_this< template struct partial_all_callback { - Target* target_; - std::shared_ptr me_; + Target* target; + std::shared_ptr me; - template - void operator()(PartialArgs&&... args) && { + template + void operator()(Args&&... args) && { + + // Assign the result to the target + *target = remapping::wrap(std::forward(args)...); + + // Complete one result + me->complete_one(); } template - void operator()(types::dispatch_error_tag, types::error_type) && { + void operator()(types::dispatch_error_tag tag, types::error_type error) && { + // We never complete the composition, but we forward the first error + // which was raised. + std::call_once(me->flag_, std::move(me->callback_), tag, + std::move(error)); } }; public: - explicit all_result_submitter(Callback callback, Result&& result) + explicit result_submitter(Callback callback, Result&& result) : callback_(std::move(callback)), result_(std::move(result)), left_(1) { } @@ -173,15 +163,25 @@ public: /// Initially the counter is created with an initial count of 1 in order /// to prevent that the composition is finished before all callbacks /// were registered. - void start_accept() { + void accept() { + complete_one(); + } + + constexpr Result* result_ptr() noexcept { + return &result_; } }; +template struct continuable_dispatcher { + std::shared_ptr& submitter; + template >::value>* = nullptr> - auto operator()(Index* index, Target* target) const noexcept { + void operator()(Index* index, Target* target) const { + // Retrieve a callback from the submitter and attach it to the continuable + std::move(*index).next(submitter->create_callback(target)).done(); } }; } // namespace all @@ -197,11 +197,29 @@ struct composition_finalizer { /// Finalizes the all logic of a given composition template static auto finalize(Composition&& composition) { - return [composition = std::forward(composition)]( - auto&& callback) mutable { + return [composition = std::forward(composition)] // ... + (auto&& callback) mutable { - auto result = - remapping::create_result_pack(std::forward(composition)); + // Create the target result from the composition + auto result = remapping::create_result_pack(std::move(composition)); + + using submitter_t = + all::result_submitter, + std::decay_t>; + + // Create the shared state which holds the result and the final callback + auto state = std::make_shared( + std::forward(callback), std::move(result)); + + // Dispatch the continuables and store its partial result + // in the whole result + // TODO Fix use after move here + remapping::relocate_index_pack( + all::continuable_dispatcher{state}, &composition, + state->result_ptr()); + + // Finalize the composition if all results arrived in-place + state->accept(); }; } }; diff --git a/include/continuable/detail/composition-seq.hpp b/include/continuable/detail/composition-seq.hpp index c50ad4d..fb95337 100644 --- a/include/continuable/detail/composition-seq.hpp +++ b/include/continuable/detail/composition-seq.hpp @@ -117,7 +117,7 @@ struct index_relocator { template >::value>* = nullptr> - auto operator()(Index* index, Target* target) const noexcept { + void operator()(Index* index, Target* target) const noexcept { // Assign the address of the target to the indexed continuable index->target = target; } @@ -217,12 +217,11 @@ struct composition_finalizer { /// Finalizes the all logic of a given composition template static auto finalize(Composition&& composition) { - return [composition = std::forward(composition)]( - auto&& callback) mutable { + return [composition = std::forward(composition)] // ... + (auto&& callback) mutable { auto index = seq::create_index_pack(composition); - auto result = - remapping::create_result_pack(std::forward(composition)); + auto result = remapping::create_result_pack(std::move(composition)); // The data from which the visitor is constructed in-place using data_t = diff --git a/test/playground/test-playground.cpp b/test/playground/test-playground.cpp index 656599a..a82ba84 100644 --- a/test/playground/test-playground.cpp +++ b/test/playground/test-playground.cpp @@ -158,7 +158,7 @@ int main(int, char**) { // ... }); - /*composition::apply_composition( + composition::apply_composition( composition::composition_strategy_all_tag{}, cti::make_ready_continuable(0, 1), 2, //< See this plain value std::vector>{cti::make_ready_continuable(3), @@ -168,5 +168,5 @@ int main(int, char**) { std::tuple> r5) { // ... util::unused(r0, r1, r2, r34, r5); - });*/ + }); }