Basic implementation of the all composition

This commit is contained in:
Denis Blank 2018-03-01 03:19:49 +01:00
parent 1853ec8b42
commit 7602dfd50d
3 changed files with 65 additions and 48 deletions

View File

@ -89,9 +89,9 @@ constexpr auto deduce_hint(Composition && /*composition*/)
/// Caches the partial results and invokes the callback when all results /// Caches the partial results and invokes the callback when all results
/// are arrived. This class is thread safe. /// are arrived. This class is thread safe.
template <typename Callback, typename Result> template <typename Callback, typename Result>
class all_result_submitter : public std::enable_shared_from_this< class result_submitter
all_result_submitter<Callback, Result>>, : public std::enable_shared_from_this<result_submitter<Callback, Result>>,
public util::non_movable { public util::non_movable {
Callback callback_; Callback callback_;
Result result_; Result result_;
@ -99,36 +99,16 @@ class all_result_submitter : public std::enable_shared_from_this<
std::atomic<std::size_t> left_; std::atomic<std::size_t> left_;
std::once_flag flag_; std::once_flag flag_;
template <std::size_t From, std::size_t To, typename... PartialArgs>
void resolve(traits::size_constant<From> from, traits::size_constant<To>,
PartialArgs&&... args) {
static_assert(sizeof...(args) == (To - From),
"Submission called with the wrong amount of arguments!");
// Assign the values from the result to it's correct positions of the
// tuple. Maybe think about the thread safety again...:
// http://stackoverflow.com/questions/40845699
assign(from, result_, std::forward<PartialArgs>(args)...);
// Complete the current result
complete_one();
}
template <std::size_t From, std::size_t To>
void resolve(traits::size_constant<From>, traits::size_constant<To>,
types::dispatch_error_tag tag, types::error_type error) {
// We never complete the composition, but we forward the first error
// which was raised.
std::call_once(flag_, std::move(callback_), tag, std::move(error));
}
// Invokes the callback with the cached result // Invokes the callback with the cached result
void invoke() { void invoke() {
assert((left_ == 0U) && "Expected that the submitter is finished!"); assert((left_ == 0U) && "Expected that the submitter is finished!");
std::atomic_thread_fence(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_acquire);
traits::unpack(std::move(result_), [&](auto&&... args) {
auto cleaned =
map_pack(remapping::unpack_result_guards{}, std::move(result_));
// Call the final callback with the cleaned result
traits::unpack(std::move(cleaned), [&](auto&&... args) {
std::call_once(flag_, std::move(callback_), std::call_once(flag_, std::move(callback_),
std::forward<decltype(args)>(args)...); std::forward<decltype(args)>(args)...);
}); });
@ -145,20 +125,30 @@ class all_result_submitter : public std::enable_shared_from_this<
template <typename Target> template <typename Target>
struct partial_all_callback { struct partial_all_callback {
Target* target_; Target* target;
std::shared_ptr<all_result_submitter> me_; std::shared_ptr<result_submitter> me;
template <typename... PartialArgs> template <typename... Args>
void operator()(PartialArgs&&... args) && { void operator()(Args&&... args) && {
// Assign the result to the target
*target = remapping::wrap(std::forward<decltype(args)>(args)...);
// Complete one result
me->complete_one();
} }
template <typename... PartialArgs> template <typename... PartialArgs>
void operator()(types::dispatch_error_tag, types::error_type) && { void operator()(types::dispatch_error_tag tag, types::error_type error) && {
// We never complete the composition, but we forward the first error
// which was raised.
std::call_once(me->flag_, std::move(me->callback_), tag,
std::move(error));
} }
}; };
public: public:
explicit all_result_submitter(Callback callback, Result&& result) explicit result_submitter(Callback callback, Result&& result)
: callback_(std::move(callback)), result_(std::move(result)), left_(1) { : callback_(std::move(callback)), result_(std::move(result)), left_(1) {
} }
@ -173,15 +163,25 @@ public:
/// Initially the counter is created with an initial count of 1 in order /// Initially the counter is created with an initial count of 1 in order
/// to prevent that the composition is finished before all callbacks /// to prevent that the composition is finished before all callbacks
/// were registered. /// were registered.
void start_accept() { void accept() {
complete_one();
}
constexpr Result* result_ptr() noexcept {
return &result_;
} }
}; };
template <typename Submitter>
struct continuable_dispatcher { struct continuable_dispatcher {
std::shared_ptr<Submitter>& submitter;
template <typename Index, typename Target, template <typename Index, typename Target,
std::enable_if_t< std::enable_if_t<
base::is_continuable<std::decay_t<Index>>::value>* = nullptr> base::is_continuable<std::decay_t<Index>>::value>* = nullptr>
auto operator()(Index* index, Target* target) const noexcept { void operator()(Index* index, Target* target) const {
// Retrieve a callback from the submitter and attach it to the continuable
std::move(*index).next(submitter->create_callback(target)).done();
} }
}; };
} // namespace all } // namespace all
@ -197,11 +197,29 @@ struct composition_finalizer<composition_strategy_all_tag> {
/// Finalizes the all logic of a given composition /// Finalizes the all logic of a given composition
template <typename Composition> template <typename Composition>
static auto finalize(Composition&& composition) { static auto finalize(Composition&& composition) {
return [composition = std::forward<Composition>(composition)]( return [composition = std::forward<Composition>(composition)] // ...
auto&& callback) mutable { (auto&& callback) mutable {
auto result = // Create the target result from the composition
remapping::create_result_pack(std::forward<Composition>(composition)); auto result = remapping::create_result_pack(std::move(composition));
using submitter_t =
all::result_submitter<std::decay_t<decltype(callback)>,
std::decay_t<decltype(result)>>;
// Create the shared state which holds the result and the final callback
auto state = std::make_shared<submitter_t>(
std::forward<decltype(callback)>(callback), std::move(result));
// Dispatch the continuables and store its partial result
// in the whole result
// TODO Fix use after move here
remapping::relocate_index_pack(
all::continuable_dispatcher<submitter_t>{state}, &composition,
state->result_ptr());
// Finalize the composition if all results arrived in-place
state->accept();
}; };
} }
}; };

View File

@ -117,7 +117,7 @@ struct index_relocator {
template <typename Index, typename Target, template <typename Index, typename Target,
std::enable_if_t< std::enable_if_t<
is_indexed_continuable<std::decay_t<Index>>::value>* = nullptr> is_indexed_continuable<std::decay_t<Index>>::value>* = nullptr>
auto operator()(Index* index, Target* target) const noexcept { void operator()(Index* index, Target* target) const noexcept {
// Assign the address of the target to the indexed continuable // Assign the address of the target to the indexed continuable
index->target = target; index->target = target;
} }
@ -217,12 +217,11 @@ struct composition_finalizer<composition_strategy_seq_tag> {
/// Finalizes the all logic of a given composition /// Finalizes the all logic of a given composition
template <typename Composition> template <typename Composition>
static auto finalize(Composition&& composition) { static auto finalize(Composition&& composition) {
return [composition = std::forward<Composition>(composition)]( return [composition = std::forward<Composition>(composition)] // ...
auto&& callback) mutable { (auto&& callback) mutable {
auto index = seq::create_index_pack(composition); auto index = seq::create_index_pack(composition);
auto result = auto result = remapping::create_result_pack(std::move(composition));
remapping::create_result_pack(std::forward<Composition>(composition));
// The data from which the visitor is constructed in-place // The data from which the visitor is constructed in-place
using data_t = using data_t =

View File

@ -158,7 +158,7 @@ int main(int, char**) {
// ... // ...
}); });
/*composition::apply_composition( composition::apply_composition(
composition::composition_strategy_all_tag{}, composition::composition_strategy_all_tag{},
cti::make_ready_continuable(0, 1), 2, //< See this plain value cti::make_ready_continuable(0, 1), 2, //< See this plain value
std::vector<cti::continuable<int>>{cti::make_ready_continuable(3), std::vector<cti::continuable<int>>{cti::make_ready_continuable(3),
@ -168,5 +168,5 @@ int main(int, char**) {
std::tuple<std::tuple<int>> r5) { std::tuple<std::tuple<int>> r5) {
// ... // ...
util::unused(r0, r1, r2, r34, r5); util::unused(r0, r1, r2, r34, r5);
});*/ });
} }