Started on simplifying the all and seq composition heavily

This commit is contained in:
Denis Blank 2018-03-02 01:25:59 +01:00
parent a9da11149b
commit 92ba25cd23
3 changed files with 156 additions and 258 deletions

View File

@ -108,15 +108,12 @@ class result_submitter
assert((left_ == 0U) && "Expected that the submitter is finished!"); assert((left_ == 0U) && "Expected that the submitter is finished!");
std::atomic_thread_fence(std::memory_order_acquire); std::atomic_thread_fence(std::memory_order_acquire);
auto cleaned =
map_pack(remapping::unpack_result_guards{}, std::move(result_));
// Call the final callback with the cleaned result // Call the final callback with the cleaned result
traits::unpack(std::move(cleaned), [&](auto&&... args) { std::call_once(flag_, [&](auto&&... args) {
std::call_once(flag_, std::move(callback_), remapping::finalize_data(std::move(callback_), std::move(result_));
std::forward<decltype(args)>(args)...);
}); });
} }
// Completes one result // Completes one result
void complete_one() { void complete_one() {
assert((left_ > 0U) && "Expected that the submitter isn't finished!"); assert((left_ > 0U) && "Expected that the submitter isn't finished!");
@ -127,16 +124,16 @@ class result_submitter
} }
} }
template <typename Target> template <typename Box>
struct partial_all_callback { struct partial_all_callback {
Target* target; Box* box;
std::shared_ptr<result_submitter> me; std::shared_ptr<result_submitter> me;
template <typename... Args> template <typename... Args>
void operator()(Args&&... args) && { void operator()(Args&&... args) && {
// Assign the result to the target // Assign the result to the target
*target = remapping::wrap(std::forward<decltype(args)>(args)...); box->assign(std::forward<decltype(args)>(args)...);
// Complete one result // Complete one result
me->complete_one(); me->complete_one();
@ -157,11 +154,11 @@ public:
} }
/// Creates a submitter which submits it's result into the storage /// Creates a submitter which submits it's result into the storage
template <typename Target> template <typename Box>
auto create_callback(Target* target) { auto create_callback(Box* box) {
left_.fetch_add(1, std::memory_order_seq_cst); left_.fetch_add(1, std::memory_order_seq_cst);
return partial_all_callback<std::decay_t<Target>>{target, return partial_all_callback<std::decay_t<Box>>{box,
this->shared_from_this()}; this->shared_from_this()};
} }
/// Initially the counter is created with an initial count of 1 in order /// Initially the counter is created with an initial count of 1 in order
@ -171,7 +168,7 @@ public:
complete_one(); complete_one();
} }
constexpr Result* result_ptr() noexcept { constexpr auto& head() noexcept {
return &result_; return &result_;
} }
}; };
@ -180,12 +177,11 @@ template <typename Submitter>
struct continuable_dispatcher { struct continuable_dispatcher {
std::shared_ptr<Submitter>& submitter; std::shared_ptr<Submitter>& submitter;
template <typename Index, typename Target, template <typename Box, std::enable_if_t<remapping::is_continuable_box<
std::enable_if_t< std::decay_t<Box>>::value>* = nullptr>
base::is_continuable<std::decay_t<Index>>::value>* = nullptr> void operator()(Box&& box) const {
void operator()(Index* index, Target* target) const {
// Retrieve a callback from the submitter and attach it to the continuable // Retrieve a callback from the submitter and attach it to the continuable
std::move(*index).next(submitter->create_callback(target)).done(); box.fetch().next(submitter->create_callback(std::addressof(box))).done();
} }
}; };
} // namespace all } // namespace all
@ -205,7 +201,7 @@ struct composition_finalizer<composition_strategy_all_tag> {
(auto&& callback) mutable { (auto&& callback) mutable {
// Create the target result from the composition // Create the target result from the composition
auto result = remapping::create_result_pack(std::move(composition)); auto result = remapping::box_continuables(std::move(composition));
using submitter_t = using submitter_t =
all::result_submitter<std::decay_t<decltype(callback)>, all::result_submitter<std::decay_t<decltype(callback)>,
@ -217,10 +213,8 @@ struct composition_finalizer<composition_strategy_all_tag> {
// Dispatch the continuables and store its partial result // Dispatch the continuables and store its partial result
// in the whole result // in the whole result
// TODO Fix use after move here traverse_pack(all::continuable_dispatcher<submitter_t>{state},
remapping::relocate_index_pack( state->head());
all::continuable_dispatcher<submitter_t>{state}, &composition,
state->result_ptr());
// Finalize the composition if all results arrived in-place // Finalize the composition if all results arrived in-place
state->accept(); state->accept();

View File

@ -55,171 +55,154 @@ namespace composition {
/// - multiple async value -> tuple of async values. /// - multiple async value -> tuple of async values.
namespace remapping { namespace remapping {
// Guard object for representing void results // Guard object for representing void results
struct void_result_guard {}; template <typename Continuable>
// Guard object for representing multiple results class continuable_box;
template <typename... Args> template <typename Data>
struct multi_result_guard { class continuable_box<continuable_base<Data, hints::signature_hint_tag<>>> {
std::tuple<Args...> result_;
multi_result_guard& operator=(std::tuple<Args...> result) { continuable_base<Data, hints::signature_hint_tag<>> continuable_;
result_ = std::move(result);
return *this; public:
explicit continuable_box(
continuable_base<Data, hints::signature_hint_tag<>>&& continuable)
: continuable_(std::move(continuable)) {
} }
};
// Callable object that maps void_result_guard zo zero arguments continuable_base<Data, hints::signature_hint_tag<>>&& fetch() {
struct unpack_result_guards { return std::move(continuable_);
auto operator()(void_result_guard) const noexcept { }
void assign() {
}
auto unbox() && {
return spread_this(); return spread_this();
} }
template <typename... Args> };
auto operator()(multi_result_guard<Args...> guard) const noexcept { template <typename Data, typename First>
// Spread the result of the continuable into the current depth. class continuable_box<
return traits::unpack(std::move(guard.result_), [](auto&&... args) { continuable_base<Data, hints::signature_hint_tag<First>>> {
continuable_base<Data, hints::signature_hint_tag<First>> continuable_;
First first_;
public:
explicit continuable_box(
continuable_base<Data, hints::signature_hint_tag<First>>&& continuable)
: continuable_(std::move(continuable)) {
}
continuable_base<Data, hints::signature_hint_tag<First>>&& fetch() {
return std::move(continuable_);
}
void assign(First first) {
first_ = std::move(first);
}
auto unbox() && {
return std::move(first_);
}
};
template <typename Data, typename First, typename Second, typename... Rest>
class continuable_box<
continuable_base<Data, hints::signature_hint_tag<First, Second, Rest...>>> {
continuable_base<Data, hints::signature_hint_tag<First, Second, Rest...>>
continuable_;
std::tuple<First, Second, Rest...> args_;
public:
explicit continuable_box(
continuable_base<Data,
hints::signature_hint_tag<First, Second, Rest...>>&&
continuable)
: continuable_(std::move(continuable)) {
}
continuable_base<Data, hints::signature_hint_tag<First, Second, Rest...>>&&
fetch() {
return std::move(continuable_);
}
void assign(First first, Second second, Rest... rest) {
args_ = std::make_tuple(std::move(first), std::move(second),
std::move(rest)...);
}
auto unbox() && {
return traits::unpack(std::move(args_), [](auto&&... args) {
return spread_this(std::forward<decltype(args)>(args)...); return spread_this(std::forward<decltype(args)>(args)...);
}); });
} }
}; };
constexpr void_result_guard wrap() { template <typename T>
return {}; struct is_continuable_box : std::false_type {};
} template <typename Continuable>
template <typename First> struct is_continuable_box<continuable_box<Continuable>> : std::true_type {};
constexpr decltype(auto) wrap(First&& first) {
return std::forward<First>(first);
}
template <typename First, typename Second, typename... Rest>
constexpr decltype(auto) wrap(First&& first, Second&& second, Rest&&... rest) {
return std::make_tuple(std::forward<First>(first),
std::forward<Second>(second),
std::forward<Rest>(rest)...);
}
namespace detail { namespace detail {
struct result_extractor_mapper { /// Maps a deeply nested pack of continuables to a continuable_box
/// Create slots for a void result which is removed later. struct continuable_box_packer {
/// This is required due to the fact that each continuable has exactly
/// one matching valuen inside the result tuple.
static constexpr auto initialize(hints::signature_hint_tag<>) noexcept {
return void_result_guard{};
}
/// Initialize a single value
template <typename First>
static constexpr auto initialize(hints::signature_hint_tag<First>) {
return First{};
}
/// Initialize a multiple values as tuple
template <typename First, typename Second, typename... Args>
static constexpr auto
initialize(hints::signature_hint_tag<First, Second, Args...>) {
// TODO Fix non default constructible values
return multi_result_guard<First, Second, Args...>{
std::make_tuple(First{}, Second{}, Args{}...)};
}
/// Remap a continuable to its corresponding result values
/// A void result is mapped to a guard type, single values to the value
/// itself and multiple ones to a tuple of values.
template < template <
typename T, typename T,
std::enable_if_t<base::is_continuable<std::decay_t<T>>::value>* = nullptr> std::enable_if_t<base::is_continuable<std::decay_t<T>>::value>* = nullptr>
auto operator()(T&& /*continuable*/) { auto operator()(T&& continuable) {
auto constexpr const hint = hints::hint_of(traits::identify<T>{}); return continuable_box<std::decay_t<T>>{std::forward<T>(continuable)};
return initialize(hint);
} }
}; };
/// Maps a deeply nested pack of continuable_boxes to its result
/// Relocates the target of a deeply nested pack of indexed_continuable objects struct continuable_box_unpacker {
/// to the given target. template <
template <typename Evaluator> typename T,
struct result_relocator_mapper { std::enable_if_t<is_continuable_box<std::decay_t<T>>::value>* = nullptr>
Evaluator evaluator; auto operator()(T&& box) {
return std::forward<T>(box).unpack();
template <typename Index, typename Result>
void traverse_one(std::false_type, Index*, Result*) {
// Don't do anything when dealing with casual objects
}
template <typename Index, typename Result>
void traverse_one(std::true_type, Index* index, Result* result) {
// Call the evaluator with the address of the indexed object and its target
evaluator(index, result);
}
template <typename Index, typename Result>
void traverse(traversal::container_category_tag<false, false>, Index* index,
Result* result) {
traverse_one(traits::is_invocable<Evaluator, Index*, Result*>{}, index,
result);
}
/// Traverse a homogeneous container
template <bool IsTupleLike, typename Index, typename Result>
void traverse(traversal::container_category_tag<true, IsTupleLike>,
Index* index, Result* result) {
auto index_itr = index->begin();
auto const index_end = index->end();
auto result_itr = result->begin();
auto const result_end = result->end();
using element_t = std::decay_t<decltype(*index->begin())>;
traversal::container_category_of_t<element_t> constexpr const tag;
for (; index_itr != index_end; ++index_itr, ++result_itr) {
assert(result_itr != result_end);
traverse(tag, &*index_itr, &*result_itr);
}
}
template <std::size_t... I, typename Index, typename Result>
void traverse_tuple_like(std::integer_sequence<std::size_t, I...>,
Index* index, Result* result) {
(void)std::initializer_list<int>{
((void)traverse(traversal::container_category_of_t<
std::decay_t<decltype(std::get<I>(*index))>>{},
&std::get<I>(*index), &std::get<I>(*result)),
0)...};
(void)index;
(void)result;
}
/// Traverse tuple like container
template <typename Index, typename Result>
void traverse(traversal::container_category_tag<false, true>, Index* index,
Result* result) {
std::make_index_sequence<std::tuple_size<Index>::value> constexpr const i{};
traverse_tuple_like(i, index, result);
} }
}; };
} // namespace detail } // namespace detail
/// Returns the result pack of the given deeply nested pack. /// Returns the boxed pack of the given deeply nested pack.
/// This invalidates all non-continuable values contained inside the pack. /// This transforms all continuables into a continuable_box which is
/// /// capable of caching the result from the corresponding continuable.
/// This consumes all non continuables inside the pack.
template <typename... Args> template <typename... Args>
constexpr auto create_result_pack(Args&&... args) { constexpr auto box_continuables(Args&&... args) {
return cti::map_pack(detail::result_extractor_mapper{}, return cti::map_pack(detail::continuable_box_packer{},
std::forward<Args>(args)...); std::forward<Args>(args)...);
} }
/// Sets the target pointers of indexed_continuable's inside the index pack /// Returns the unboxed pack of the given deeply nested boxed pack.
/// to point to their given counterparts inside the given target. /// This transforms all continuable_boxes into its result.
template <typename Relocator, typename Index, typename Target> template <typename... Args>
constexpr void relocate_index_pack(Relocator&& relocator, Index* index, constexpr auto unbox_continuables(Args&&... args) {
Target* target) { return cti::map_pack(detail::continuable_box_unpacker{},
std::forward<Args>(args)...);
}
constexpr traversal::container_category_of_t<std::decay_t<Index>> const tag; namespace detail {
template <typename T, typename Callback, typename Data>
void finalize_impl(traits::identity<void>, Callback&& callback, Data&&) {
std::forward<Callback>(callback)();
}
template <typename T, typename Callback, typename Data>
void finalize_impl(traits::identity<T>, Callback&& callback, Data&& data) {
// Call the final callback with the cleaned result
traits::unpack(unbox_continuables(std::forward<Data>(data)),
std::forward<Callback>(callback));
}
} // namespace detail
detail::result_relocator_mapper<std::decay_t<Relocator>> mapper{ template <typename Callback, typename Data>
std::forward<Relocator>(relocator)}; void finalize_data(Callback&& callback, Data&& data) {
using result_t =
decltype(traits::unpack(unbox_continuables(std::forward<Data>(data)),
std::forward<Callback>(callback)));
mapper.traverse(tag, index, target); // Guard the final result against void
return detail::finalize_impl(traits::identity<result_t>{},
std::forward<Data>(data),
std::forward<Callback>(callback));
} }
} // namespace remapping } // namespace remapping
} // namespace composition } // namespace composition

View File

@ -69,65 +69,10 @@ auto sequential_connect(Left&& left, Right&& right) {
}); });
} }
/// Contains an continuable together with a location where the template <typename Callback, typename Box>
/// result shall be stored.
template <typename Continuable, typename Target>
struct indexed_continuable {
Continuable continuable;
Target* target;
};
template <typename T>
struct is_indexed_continuable : std::false_type {};
template <typename Continuable, typename Target>
struct is_indexed_continuable<indexed_continuable<Continuable, Target>>
: std::true_type {};
/// Maps a deeply nested pack of continuables to an indexed continuable
struct result_indexer_mapper {
/// Index a given continuable together with its target location
template <
typename T,
std::enable_if_t<base::is_continuable<std::decay_t<T>>::value>* = nullptr>
auto operator()(T&& continuable) {
auto constexpr const hint = hints::hint_of(traits::identify<T>{});
using target =
decltype(remapping::detail::result_extractor_mapper::initialize(hint));
using type = indexed_continuable<std::decay_t<T>, target>;
// We have to pass the continuables as l-value so we can move the whole pack
// afterwards as r-value, thus we move the continuable from a l-value here.
// NOLINTNEXTLINE(misc-move-forwarding-reference)
return type{std::move(continuable), nullptr};
}
};
/// Returns the result pack of the given deeply nested pack.
/// This invalidates all non-continuable values contained inside the pack.
///
/// This consumes all continuables inside the pack.
template <typename... Args>
constexpr auto create_index_pack(Args&&... args) {
return cti::map_pack(result_indexer_mapper{}, std::forward<Args>(args)...);
}
struct index_relocator {
template <typename Index, typename Target,
std::enable_if_t<
is_indexed_continuable<std::decay_t<Index>>::value>* = nullptr>
void operator()(Index* index, Target* target) const noexcept {
// Assign the address of the target to the indexed continuable
index->target = target;
}
};
template <typename Callback, typename Index, typename Result>
struct sequential_dispatch_data { struct sequential_dispatch_data {
Callback callback; Callback callback;
Index index; Box box;
Result result;
}; };
template <typename Data> template <typename Data>
@ -139,37 +84,29 @@ class sequential_dispatch_visitor
public: public:
explicit sequential_dispatch_visitor(Data&& data) : data_(std::move(data)) { explicit sequential_dispatch_visitor(Data&& data) : data_(std::move(data)) {
// Assign the address of each result target to the corresponding
// indexed continuable.
remapping::relocate_index_pack(index_relocator{}, &data_.index,
&data_.result);
} }
virtual ~sequential_dispatch_visitor() = default; virtual ~sequential_dispatch_visitor() = default;
/// Returns the pack that should be traversed /// Returns the pack that should be traversed
auto& head() { auto& head() {
return data_.index; return data_.box;
} }
template <typename Index, std::enable_if_t<is_indexed_continuable< template <typename Box, std::enable_if_t<remapping::is_continuable_box<
std::decay_t<Index>>::value>* = nullptr> std::decay_t<Box>>::value>* = nullptr>
bool operator()(async_traverse_visit_tag, Index&& /*index*/) { bool operator()(async_traverse_visit_tag, Box&& /*box*/) {
return false; return false;
} }
template <typename Index, typename N> template <typename Box, typename N>
void operator()(async_traverse_detach_tag, Index&& index, N&& next) { void operator()(async_traverse_detach_tag, Box&& box, N&& next) {
assert(index.target && "The target should be non null here!" box->fetch()
"Probably this is caused through a bug in " .then([ box = std::addressof(box),
"result_relocator_mapper!");
std::move(index.continuable)
.then([ target = index.target,
next = std::forward<N>(next) ](auto&&... args) mutable { next = std::forward<N>(next) ](auto&&... args) mutable {
// Assign the result to the target // Assign the result to the target
*target = remapping::wrap(std::forward<decltype(args)>(args)...); box->assign(std::forward<decltype(args)>(args)...);
// Continue the asynchronous sequential traversal // Continue the asynchronous sequential traversal
next(); next();
@ -182,24 +119,10 @@ public:
.done(); .done();
} }
void finalize(traits::identity<void>) {
std::move(data_.callback)();
}
template <typename T>
void finalize(traits::identity<T>) {
auto cleaned =
map_pack(remapping::unpack_result_guards{}, std::move(data_.result));
// Call the final callback with the cleaned result
traits::unpack(std::move(cleaned), std::move(data_.callback));
}
template <typename T> template <typename T>
void operator()(async_traverse_complete_tag, T&& /*pack*/) { void operator()(async_traverse_complete_tag, T&& /*pack*/) {
// Guard the final result against void return remapping::finalize_data(std::move(data_.callback),
using result_t = decltype( std::move(data_.box));
map_pack(remapping::unpack_result_guards{}, std::move(data_.result)));
finalize(traits::identity<result_t>{});
} }
}; };
} // namespace seq } // namespace seq
@ -220,21 +143,19 @@ struct composition_finalizer<composition_strategy_seq_tag> {
return [composition = std::forward<Composition>(composition)] // ... return [composition = std::forward<Composition>(composition)] // ...
(auto&& callback) mutable { (auto&& callback) mutable {
auto index = seq::create_index_pack(composition); auto boxed = remapping::box_continuables(std::move(composition));
auto result = remapping::create_result_pack(std::move(composition));
// The data from which the visitor is constructed in-place // The data from which the visitor is constructed in-place
using data_t = using data_t =
seq::sequential_dispatch_data<std::decay_t<decltype(callback)>, seq::sequential_dispatch_data<std::decay_t<decltype(callback)>,
std::decay_t<decltype(index)>, std::decay_t<decltype(boxed)>>;
std::decay_t<decltype(result)>>;
// The visitor type // The visitor type
using visitor_t = seq::sequential_dispatch_visitor<data_t>; using visitor_t = seq::sequential_dispatch_visitor<data_t>;
traverse_pack_async(async_traverse_in_place_tag<visitor_t>{}, traverse_pack_async(
data_t{std::forward<decltype(callback)>(callback), async_traverse_in_place_tag<visitor_t>{},
std::move(index), std::move(result)}); data_t{std::forward<decltype(callback)>(callback), std::move(boxed)});
}; };
} }
}; };