mirror of
https://github.com/Naios/continuable.git
synced 2025-12-06 16:56:44 +08:00
726 lines
19 KiB
C++
726 lines
19 KiB
C++
|
|
#include "Callback.h"
|
|
#include "WeakCallbackContainer.h"
|
|
#include "Continuable.h"
|
|
|
|
#include <iostream>
|
|
#include <exception>
|
|
#include <type_traits>
|
|
#include <string>
|
|
#include <vector>
|
|
#include <typeinfo>
|
|
|
|
#include <condition_variable>
|
|
#include <mutex>
|
|
#include <thread>
|
|
#include <atomic>
|
|
#include <random>
|
|
|
|
#include "concurrentqueue.h"
|
|
|
|
#include <boost/optional.hpp>
|
|
|
|
enum SpellCastResult
|
|
{
|
|
SPELL_FAILED_SUCCESS = 0,
|
|
SPELL_FAILED_AFFECTING_COMBAT = 1,
|
|
SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
|
|
SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
|
|
SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
|
|
SPELL_FAILED_ALREADY_BEING_TAMED = 5
|
|
};
|
|
|
|
template<typename T>
|
|
using Optional = boost::optional<T>;
|
|
|
|
Continuable<> Log(std::string const& message)
|
|
{
|
|
return make_continuable([=](Callback<>&& callback)
|
|
{
|
|
std::cout << message << std::endl;
|
|
callback();
|
|
});
|
|
}
|
|
|
|
struct ResultSet
|
|
{
|
|
ResultSet(std::size_t affected_) :
|
|
affected(affected_) { };
|
|
|
|
std::size_t affected;
|
|
};
|
|
|
|
Continuable<ResultSet> AsyncQuery(std::string const& query)
|
|
{
|
|
return make_continuable([=](Callback<ResultSet>&& callback)
|
|
{
|
|
std::cout << query << std::endl;
|
|
callback(ResultSet(2));
|
|
});
|
|
}
|
|
|
|
// Original method taking an optional callback.
|
|
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
|
|
{
|
|
std::cout << "Casting " << id << std::endl;
|
|
|
|
// on success call the callback with SPELL_FAILED_SUCCESS
|
|
if (callback)
|
|
(*callback)(SPELL_FAILED_SUCCESS);
|
|
}
|
|
|
|
// Promise wrapped callback decorator.
|
|
Continuable<SpellCastResult> CastSpellPromise(int id)
|
|
{
|
|
return make_continuable([=](Callback<SpellCastResult>&& callback)
|
|
{
|
|
CastSpell(id, callback);
|
|
});
|
|
}
|
|
|
|
// Void instant returning continuable promise for testing purposes
|
|
Continuable<> TrivialPromise(std::string const& msg = "")
|
|
{
|
|
return Log(msg);
|
|
}
|
|
|
|
Continuable<bool> Validate()
|
|
{
|
|
return make_continuable([=](Callback<bool>&& callback)
|
|
{
|
|
std::cout << "Validate " << std::endl;
|
|
|
|
callback(true);
|
|
});
|
|
}
|
|
|
|
Continuable<std::unique_ptr<int>&&> MoveTest()
|
|
{
|
|
return make_continuable([=](Callback<std::unique_ptr<int>&&>&& callback)
|
|
{
|
|
// Move the unique ptr out to test moveability
|
|
std::unique_ptr<int> ptr(new int(5));
|
|
callback(std::move(ptr));
|
|
});
|
|
}
|
|
|
|
typedef std::unique_ptr<int> Moveable;
|
|
|
|
void testMoveAbleNormal(std::function<void(std::unique_ptr<int>&&)> callback)
|
|
{
|
|
std::unique_ptr<int> ptr(new int(5));
|
|
callback(std::move(ptr));
|
|
}
|
|
|
|
template <typename... T>
|
|
void test_unwrap(std::string const& msg)
|
|
{
|
|
std::cout << msg << " is unwrappable: " << (fu::is_unwrappable<T...>::value ? "true" : "false") << std::endl;
|
|
}
|
|
|
|
/*
|
|
namespace detail
|
|
{
|
|
template<typename, typename>
|
|
struct function_matches_to_args;
|
|
|
|
template<typename LeftReturn, typename... LeftArgs,
|
|
typename RightReturn, typename... RightArgs>
|
|
struct function_matches_to_args<
|
|
std::function<LeftReturn(LeftArgs...)>,
|
|
std::function<RightReturn(RightArgs...)>>
|
|
{
|
|
|
|
};
|
|
}
|
|
*/
|
|
|
|
template<typename Capture, typename Args, typename SeqOfCapture, typename SeqOfArgs>
|
|
class MoveCaptureLamda;
|
|
|
|
template<typename... Capture, typename... Args, std::size_t... SeqOfCapture, std::size_t... SeqOfArgs>
|
|
class MoveCaptureLamda
|
|
<
|
|
fu::identity<Capture...>,
|
|
fu::identity<void, Args...>,
|
|
fu::sequence<SeqOfCapture...>,
|
|
fu::sequence<SeqOfArgs...>
|
|
>
|
|
{
|
|
std::function<void(Capture..., Args...)> _functional;
|
|
|
|
std::tuple<Capture...> _capture;
|
|
|
|
public:
|
|
MoveCaptureLamda(std::function<void(Capture..., Args...)>&& functional, Capture&&... capture)
|
|
: _functional(std::move(functional)), _capture(std::make_tuple(std::forward<Capture>(capture)...)) { }
|
|
|
|
void operator() (Args&&... args)
|
|
{
|
|
_functional(std::move(std::get<SeqOfCapture>(_capture))..., std::forward<Args>(args)...);
|
|
}
|
|
};
|
|
|
|
template<typename... Capture, typename ReturnType, typename... Args, std::size_t... SeqOfCapture, std::size_t... SeqOfArgs>
|
|
class MoveCaptureLamda
|
|
<
|
|
fu::identity<Capture...>,
|
|
fu::identity<ReturnType, Args...>,
|
|
fu::sequence<SeqOfCapture...>,
|
|
fu::sequence<SeqOfArgs...>
|
|
>
|
|
{
|
|
std::function<ReturnType(Capture..., Args...)> _functional;
|
|
|
|
std::tuple<Capture...> _capture;
|
|
|
|
public:
|
|
MoveCaptureLamda(std::function<ReturnType(Capture..., Args...)>&& functional, Capture&&... capture)
|
|
: _functional(std::move(functional)), _capture(std::make_tuple(std::forward<Capture>(capture)...)) { }
|
|
|
|
ReturnType operator() (Args&&... args)
|
|
{
|
|
return _functional(std::move(std::get<SeqOfCapture>(_capture))..., std::forward<Args>(args)...);
|
|
}
|
|
};
|
|
|
|
template<typename _CTy, typename... _ATy>
|
|
class continuable_returner
|
|
{
|
|
_CTy returning_continuable;
|
|
|
|
public:
|
|
continuable_returner(continuable_returner&& right)
|
|
{
|
|
returning_continuable = std::move(right.returning_continuable);
|
|
}
|
|
|
|
continuable_returner(_CTy&& returning_continuable_)
|
|
: returning_continuable(std::move(returning_continuable_)) { }
|
|
|
|
continuable_returner& operator= (continuable_returner&& right)
|
|
{
|
|
returning_continuable = std::move(right.returning_continuable);
|
|
return *this;
|
|
}
|
|
|
|
continuable_returner& operator= (continuable_returner& right)
|
|
{
|
|
// returning_continuable = std::move(right.returning_continuable);
|
|
return *this;
|
|
}
|
|
|
|
auto operator()(_ATy&&...)
|
|
-> _CTy
|
|
{
|
|
return std::move(returning_continuable);
|
|
}
|
|
};
|
|
|
|
template<typename T>
|
|
class copymove
|
|
{
|
|
T _content;
|
|
|
|
bool consumed;
|
|
|
|
copymove& move(copymove& right)
|
|
{
|
|
// _content = std::move(right._content);
|
|
return *this;
|
|
}
|
|
|
|
copymove& move(copymove&& right)
|
|
{
|
|
// _content = std::move(right._content);
|
|
return *this;
|
|
}
|
|
|
|
public:
|
|
copymove(copymove const& right) : consumed(false)
|
|
{
|
|
// move(right._content);
|
|
}
|
|
|
|
copymove(T& right) : consumed(false)
|
|
{
|
|
_content = std::move(right);
|
|
}
|
|
|
|
copymove& operator= (copymove& right)
|
|
{
|
|
return move(right);
|
|
}
|
|
|
|
T get()
|
|
{
|
|
assert(!consumed && "already consumed!");
|
|
consumed = true;
|
|
return move(right._content);
|
|
}
|
|
};
|
|
|
|
class DispatcherPool
|
|
{
|
|
enum TerminationMode
|
|
{
|
|
NONE,
|
|
TERMINATE,
|
|
AWAIT
|
|
};
|
|
|
|
typedef std::function<void()> Callable;
|
|
|
|
std::vector<std::thread> _pool;
|
|
|
|
std::atomic<TerminationMode> _shutdown;
|
|
|
|
std::mutex _mutex;
|
|
|
|
std::condition_variable _condition;
|
|
|
|
moodycamel::ConcurrentQueue<Callable> _queue;
|
|
|
|
public:
|
|
DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }
|
|
|
|
DispatcherPool(unsigned int const threads) : _shutdown(NONE)
|
|
{
|
|
for (unsigned int i = 0; i < threads; ++i)
|
|
{
|
|
_pool.emplace_back([&, i]
|
|
{
|
|
// Reserve the consumer token
|
|
moodycamel::ConsumerToken token(_queue);
|
|
|
|
Callable callable;
|
|
while (_shutdown != TERMINATE)
|
|
{
|
|
if (_queue.try_dequeue(token, callable))
|
|
{
|
|
std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
|
|
std::cout << msg;
|
|
callable();
|
|
}
|
|
else
|
|
{
|
|
if (_shutdown == AWAIT)
|
|
break;
|
|
|
|
{
|
|
std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
|
|
std::cout << msg;
|
|
}
|
|
|
|
std::unique_lock<std::mutex> lock(_mutex);
|
|
|
|
// Lock until new tasks are added
|
|
_condition.wait(lock);
|
|
|
|
{
|
|
std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
|
|
std::cout << msg;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
~DispatcherPool()
|
|
{
|
|
Shutdown();
|
|
}
|
|
|
|
template<typename Functional>
|
|
void Dispatch(Functional&& functional)
|
|
{
|
|
_queue.enqueue(std::forward<Functional>(functional));
|
|
std::unique_lock<std::mutex> lock(_mutex);
|
|
_condition.notify_one();
|
|
}
|
|
|
|
void Shutdown()
|
|
{
|
|
_Shutdown(TERMINATE);
|
|
}
|
|
|
|
void Await()
|
|
{
|
|
_Shutdown(AWAIT);
|
|
}
|
|
|
|
void _Shutdown(TerminationMode const mode)
|
|
{
|
|
_shutdown = mode;
|
|
for (auto& thread : _pool)
|
|
if (thread.joinable())
|
|
thread.join();
|
|
}
|
|
};
|
|
|
|
int main(int /*argc*/, char** /*argv*/)
|
|
{
|
|
/*
|
|
CastSpellPromise(1)
|
|
.then([](SpellCastResult)
|
|
{
|
|
return CastSpellPromise(2);
|
|
})
|
|
.then([](SpellCastResult)
|
|
{
|
|
std::cout << "Pause a callback (void test) " << std::endl;
|
|
})
|
|
.then(Validate())
|
|
.then(AsyncQuery("SELECT * FROM `users`")
|
|
.then([](ResultSet result)
|
|
{
|
|
// Evaluate result
|
|
std::size_t const affected = result.affected;
|
|
return Log(std::to_string(affected) + " rows affected\n");
|
|
})
|
|
)
|
|
.then(TrivialPromise("huhu"))
|
|
.then(CastSpellPromise(3))
|
|
.then(CastSpellPromise(4)
|
|
.then(CastSpellPromise(5))
|
|
)
|
|
.then(CastSpellPromise(6))
|
|
.then([](SpellCastResult)
|
|
{
|
|
return Validate();
|
|
});
|
|
|
|
MoveTest()
|
|
.then([](std::unique_ptr<int>&& ptr)
|
|
{
|
|
static_assert(std::is_rvalue_reference<decltype(ptr)>::value, "no rvalue");
|
|
|
|
// Error here
|
|
std::unique_ptr<int> other = std::move(ptr);
|
|
});
|
|
|
|
// Mockup of aggregate methods
|
|
make_continuable()
|
|
.all(
|
|
[] { return TrivialPromise(); },
|
|
[] { return TrivialPromise(); },
|
|
[] { return TrivialPromise(); }
|
|
)
|
|
.some(2, // Only 2 of 3 must complete
|
|
[] { return TrivialPromise(); },
|
|
[] { return TrivialPromise(); },
|
|
[] { return TrivialPromise(); }
|
|
)
|
|
.any( // Any of 2.
|
|
[] { return TrivialPromise(); },
|
|
[] { return TrivialPromise(); }
|
|
)
|
|
.then([]
|
|
{
|
|
std::cout << "Finished" << std::endl;
|
|
});
|
|
*/
|
|
//Continuable<bool> cb = make_continuable([](Callback<bool>&& callback)
|
|
//{
|
|
|
|
// callback(true);
|
|
//});
|
|
|
|
//test_unwrap<void()>("void()");
|
|
//test_unwrap<std::function<void()>>("std::function<void()>");
|
|
//test_unwrap<std::vector<std::string>>("std::vector<std::string>");
|
|
|
|
//make_continuable([=](Callback<>&&)
|
|
//{
|
|
|
|
//});
|
|
|
|
//int i = 0;
|
|
//++i;
|
|
|
|
//auto lam = [=](Callback<SpellCastResult>&&)
|
|
//{
|
|
// // on success call the callback with SPELL_FAILED_SUCCESS
|
|
// // callback(SPELL_FAILED_SUCCESS);
|
|
//};
|
|
|
|
//fu::function_type_of_t<decltype(lam)> fun1;
|
|
//fun1 = lam;
|
|
//fun1(Callback<SpellCastResult>());
|
|
|
|
//fu::function_type_of_t<Callback<int>> fun2;
|
|
//
|
|
//shared_callback_of_t<std::function<void(int)>> sc1;
|
|
//weak_callback_of_t<Callback<int>> sc2;
|
|
//
|
|
//make_weak_wrapped_callback(sc1);
|
|
//make_weak_wrapped_callback(sc2);
|
|
|
|
//WeakCallbackContainer callback;
|
|
//
|
|
//auto weakCallback = callback([]
|
|
//{
|
|
//});
|
|
|
|
//typedef Continuable<bool> cont123;
|
|
|
|
//typedef Continuable<bool> myty1;
|
|
//typedef Continuable<bool, float> myty2;
|
|
|
|
//// Convertible test
|
|
//
|
|
//// Continuable<Callback<SpellCastResult>> spell
|
|
//{
|
|
// auto stack =
|
|
|
|
// int iii = 0;
|
|
// iii = 1;
|
|
//}
|
|
|
|
//std::vector<int> myvec;
|
|
|
|
//typedef fu::requires_functional_constructible<std::function<void()>>::type test_assert1;
|
|
//// typedef fu::requires_functional_constructible<std::vector<int>>::type test_assert2;
|
|
|
|
//// Brainstorming: this shows an example callback chain
|
|
//// Given by continuable
|
|
//std::function<void(Callback<SpellCastResult>&&)> continuable_1 = [](Callback<SpellCastResult>&& callback)
|
|
//{
|
|
// callback(SPELL_FAILED_AFFECTING_COMBAT);
|
|
//};
|
|
|
|
//// Implemented by user
|
|
//std::function<std::function<void(Callback<bool>&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult)
|
|
//{
|
|
// // Given by continuable
|
|
// // Fn2
|
|
// return [](Callback<bool>&& callback)
|
|
// {
|
|
// callback(true);
|
|
// };
|
|
//};
|
|
|
|
//// Implemented by user
|
|
//std::function<std::function<void(Callback<>&&)>(bool)> cn2 = [](bool val)
|
|
//{
|
|
// // Finished
|
|
// std::cout << "Callback chain finished! -> " << val << std::endl;
|
|
|
|
// // Given by continuable (auto end)
|
|
// return [](Callback<>&&)
|
|
// {
|
|
// // Empty callback
|
|
// };
|
|
//};
|
|
|
|
//// Entry point
|
|
//std::function<void(Callback<bool>&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/]
|
|
// (std::function<void(Callback<bool>&&)>)
|
|
//{
|
|
// // Call with auto created wrapper by the continuable
|
|
// continuable_1([&](SpellCastResult result /*forward args*/)
|
|
// {
|
|
// // Wrapper functional to process unary or multiple promised callbacks
|
|
// // Returned from the user
|
|
// std::function<void(Callback<bool>&&)> fn2 = callback_by_user_1(/*forward args*/ result);
|
|
// return std::move(fn2);
|
|
// });
|
|
//};
|
|
|
|
//// Here we go
|
|
//entry();
|
|
|
|
detail::unary_chainer_t<
|
|
std::function<Continuable<bool>()>
|
|
>::callback_arguments_t args213987;
|
|
|
|
typedef detail::functional_traits<>::result_maker_of_t<
|
|
|
|
std::function<Continuable<bool>()>,
|
|
decltype(CastSpellPromise(2)),
|
|
decltype(TrivialPromise()),
|
|
std::function<Continuable<float, double>()>,
|
|
std::function<Continuable<>()>,
|
|
std::function<Continuable<bool>()>
|
|
|
|
> maker;
|
|
|
|
maker::arguments_t test282_args;
|
|
maker::partial_results_t test282_pack;
|
|
auto test282_size = maker::size;
|
|
|
|
// static_assert(std::is_same<>::value,
|
|
|
|
detail::concat_identities<fu::identity<int, bool, char>, fu::identity<float, double>>::type myt;
|
|
|
|
// fu::identity<detail::functional_traits<>::position<1>> i;
|
|
|
|
std::tuple<int, std::vector<int>> tup;
|
|
|
|
Moveable moveable(new int(7));
|
|
|
|
auto myargs = std::make_tuple(7, std::vector<int>({ 1, 2, 3 }), std::move(moveable));
|
|
|
|
std::function<int(int, std::vector<int>, Moveable&&)> lam = [](int given_i, std::vector<int> given_vec, Moveable&& moveable)
|
|
{
|
|
Moveable other = std::move(moveable);
|
|
|
|
++given_i;
|
|
return 1;
|
|
};
|
|
|
|
fu::invoke_from_tuple(lam, std::move(myargs));
|
|
|
|
fu::sequence_generator<2>::type seqtype;
|
|
fu::sequence_generator<1>::type zero_seqtype;
|
|
|
|
detail::multiple_when_all_chainer_t<
|
|
fu::identity<>,
|
|
fu::identity<
|
|
std::function<Continuable<>()>,
|
|
std::function<Continuable<std::string>()>
|
|
>
|
|
>::result_maker::partial_results_t myres123345;
|
|
|
|
/*
|
|
auto firstType = detail::multiple_when_all_chainer_t<
|
|
fu::identity<>,
|
|
fu::identity<
|
|
std::function<Continuable<SpellCastResult>()>,
|
|
std::function<Continuable<>()>,
|
|
std::function<Continuable<SpellCastResult>()>
|
|
>
|
|
>::make_when_all(
|
|
[]
|
|
{
|
|
// void
|
|
return CastSpellPromise(10);
|
|
},
|
|
[]
|
|
{
|
|
return make_continuable();
|
|
},
|
|
[]
|
|
{
|
|
return CastSpellPromise(20);
|
|
})
|
|
.then([](SpellCastResult, SpellCastResult)
|
|
{
|
|
|
|
})
|
|
.then([]
|
|
{
|
|
|
|
});
|
|
*/
|
|
|
|
make_continuable()
|
|
.all(
|
|
CastSpellPromise(10)
|
|
.then(CastSpellPromise(15)),
|
|
CastSpellPromise(20),
|
|
make_continuable([](Callback<bool, bool, double , std::unique_ptr<std::string>>&& callback)
|
|
{
|
|
callback(true, false, 0.3f, std::unique_ptr<std::string>(new std::string("oh, all work is done!")));
|
|
}),
|
|
TrivialPromise())
|
|
.then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr<std::string> message)
|
|
{
|
|
return TrivialPromise("Lets see... ").then(Log(*message));
|
|
})
|
|
.then([]
|
|
{
|
|
|
|
});
|
|
|
|
std::function<void()> callable = []
|
|
{
|
|
std::cout << "ok" << std::endl;
|
|
};
|
|
|
|
auto conv_test_1 = std::bind(callable);
|
|
|
|
conv_test_1(1, 1);
|
|
|
|
|
|
/*
|
|
continuable_returner<std::unique_ptr<int>> test26151_start(std::unique_ptr<int>(new int(5)));
|
|
|
|
continuable_returner<std::unique_ptr<int>> test26151_moved = std::move(test26151_start);
|
|
|
|
std::function<std::unique_ptr<int>()> test26151_fn = std::move(test26151_moved);
|
|
*/
|
|
|
|
// static_assert(fu::is_unwrappable<TestFunctor>::value, "not unwrappable!");
|
|
|
|
// std::function<void(int)> fntest = std::move(fn);
|
|
|
|
MoveCaptureLamda<fu::identity<std::unique_ptr<int>>, fu::identity<void, int>, fu::sequence_of_t<1>, fu::sequence_of_t<0>> capture(
|
|
[](std::unique_ptr<int> capture, int arg)
|
|
{
|
|
|
|
int i = 0;
|
|
|
|
++i;
|
|
|
|
}, std::unique_ptr<int>(new int(90)));
|
|
|
|
capture(1);
|
|
|
|
capture(2);
|
|
|
|
std::unique_ptr<int> uptr(new int(90));
|
|
copymove<std::unique_ptr<int>> mycapture(std::move(uptr));
|
|
auto capt = [mycapture]
|
|
{
|
|
|
|
};
|
|
|
|
DispatcherPool countPool(1);
|
|
|
|
DispatcherPool pool(3);
|
|
|
|
auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();
|
|
|
|
std::mt19937 rng(seed);
|
|
std::uniform_int_distribution<int> gen(10, 150);
|
|
|
|
std::vector<int> container;
|
|
|
|
unsigned int counter = 0;
|
|
|
|
for (unsigned int run = 0; run < 2; ++run)
|
|
{
|
|
for (unsigned int i = 0; i < 20; ++i)
|
|
{
|
|
unsigned int wait = gen(rng);
|
|
|
|
++counter;
|
|
|
|
pool.Dispatch([&countPool, &container, i, run, wait]
|
|
{
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
|
std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n";
|
|
std::cout << str;
|
|
|
|
countPool.Dispatch([&container, wait]
|
|
{
|
|
container.emplace_back(wait);
|
|
});
|
|
});
|
|
}
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
|
}
|
|
|
|
std::cout << "Awaiting termination...\n";
|
|
|
|
// std::this_thread::sleep_for(std::chrono::seconds(5));
|
|
pool.Await();
|
|
countPool.Await();
|
|
|
|
std::cout << container.size() << " == " << counter;
|
|
return 0;
|
|
}
|