#include "Callback.h"
#include "WeakCallbackContainer.h"
#include "Continuable.h"

// NOTE(review): the names of the angle-bracket headers were lost in this copy
// of the file (only bare `#include` directives remained). The standard headers
// below are reconstructed from the names this file uses - confirm them against
// the upstream source.
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <tuple>
#include <type_traits>
#include <vector>

#include "concurrentqueue.h"

// NOTE(review): presumably the boost optional header, since the file uses
// boost::optional and boost::none - confirm.
#include <boost/optional.hpp>

// Result codes handed to spell-cast callbacks.
enum SpellCastResult
{
    SPELL_FAILED_SUCCESS = 0,
    SPELL_FAILED_AFFECTING_COMBAT = 1,
    SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
    SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
    SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
    SPELL_FAILED_ALREADY_BEING_TAMED = 5
};

// Shorthand for boost::optional.
template <typename T>
using Optional = boost::optional<T>;

// Returns a continuable that prints `message` to std::cout and then invokes
// its callback without arguments. The message is copied into the lambda.
Continuable<> Log(std::string const& message)
{
    return make_continuable([=](Callback<>&& callback)
    {
        std::cout << message << std::endl;
        callback();
    });
}

// Minimal stand-in for a query result; only carries a row count.
struct ResultSet
{
    ResultSet(std::size_t affected_) : affected(affected_) { }

    std::size_t affected;
};

// Returns a continuable that prints `query` to std::cout and then invokes its
// callback with a fixed ResultSet(2).
Continuable<ResultSet> AsyncQuery(std::string const& query)
{
    return make_continuable([=](Callback<ResultSet>&& callback)
    {
        std::cout << query << std::endl;
        callback(ResultSet(2));
    });
}

// Original method taking an optional callback.
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
{
    std::cout << "Casting " << id << std::endl;

    // on success call the callback with SPELL_FAILED_SUCCESS
    if (callback)
        (*callback)(SPELL_FAILED_SUCCESS);
}

// Promise wrapped callback decorator.
Continuable CastSpellPromise(int id) { return make_continuable([=](Callback&& callback) { CastSpell(id, callback); }); } // Void instant returning continuable promise for testing purposes Continuable<> TrivialPromise(std::string const& msg = "") { return Log(msg); } Continuable Validate() { return make_continuable([=](Callback&& callback) { std::cout << "Validate " << std::endl; callback(true); }); } Continuable&&> MoveTest() { return make_continuable([=](Callback&&>&& callback) { // Move the unique ptr out to test moveability std::unique_ptr ptr(new int(5)); callback(std::move(ptr)); }); } typedef std::unique_ptr Moveable; void testMoveAbleNormal(std::function&&)> callback) { std::unique_ptr ptr(new int(5)); callback(std::move(ptr)); } template void test_unwrap(std::string const& msg) { std::cout << msg << " is unwrappable: " << (fu::is_unwrappable::value ? "true" : "false") << std::endl; } /* namespace detail { template struct function_matches_to_args; template struct function_matches_to_args< std::function, std::function> { }; } */ template class MoveCaptureLamda; template class MoveCaptureLamda < fu::identity, fu::identity, fu::sequence, fu::sequence > { std::function _functional; std::tuple _capture; public: MoveCaptureLamda(std::function&& functional, Capture&&... capture) : _functional(std::move(functional)), _capture(std::make_tuple(std::forward(capture)...)) { } void operator() (Args&&... args) { _functional(std::move(std::get(_capture))..., std::forward(args)...); } }; template class MoveCaptureLamda < fu::identity, fu::identity, fu::sequence, fu::sequence > { std::function _functional; std::tuple _capture; public: MoveCaptureLamda(std::function&& functional, Capture&&... capture) : _functional(std::move(functional)), _capture(std::make_tuple(std::forward(capture)...)) { } ReturnType operator() (Args&&... 
args) { return _functional(std::move(std::get(_capture))..., std::forward(args)...); } }; template class continuable_returner { _CTy returning_continuable; public: continuable_returner(continuable_returner&& right) { returning_continuable = std::move(right.returning_continuable); } continuable_returner(_CTy&& returning_continuable_) : returning_continuable(std::move(returning_continuable_)) { } continuable_returner& operator= (continuable_returner&& right) { returning_continuable = std::move(right.returning_continuable); return *this; } continuable_returner& operator= (continuable_returner& right) { // returning_continuable = std::move(right.returning_continuable); return *this; } auto operator()(_ATy&&...) -> _CTy { return std::move(returning_continuable); } }; template class copymove { T _content; bool consumed; copymove& move(copymove& right) { // _content = std::move(right._content); return *this; } copymove& move(copymove&& right) { // _content = std::move(right._content); return *this; } public: copymove(copymove const& right) : consumed(false) { // move(right._content); } copymove(T& right) : consumed(false) { _content = std::move(right); } copymove& operator= (copymove& right) { return move(right); } T get() { assert(!consumed && "already consumed!"); consumed = true; return move(right._content); } }; class DispatcherPool { enum TerminationMode { NONE, TERMINATE, AWAIT }; typedef std::function Callable; std::vector _pool; std::atomic _shutdown; std::mutex _mutex; std::condition_variable _condition; moodycamel::ConcurrentQueue _queue; public: DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { } DispatcherPool(unsigned int const threads) : _shutdown(NONE) { for (unsigned int i = 0; i < threads; ++i) { _pool.emplace_back([&, i] { Callable callable; while (_shutdown != TERMINATE) { if (_queue.try_dequeue(callable)) { std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n"; std::cout << msg; callable(); } else { if 
(_shutdown == AWAIT) break; { std::string msg = "Thread " + std::to_string(i) + " out of work...\n"; std::cout << msg; } std::unique_lock lock(_mutex); // Lock until new tasks are added _condition.wait(lock); { std::string msg = "Thread " + std::to_string(i) + " wakes up...\n"; std::cout << msg; } } } }); } } ~DispatcherPool() { Shutdown(); } template void Dispatch(Functional&& functional) { _queue.enqueue(std::forward(functional)); std::unique_lock lock(_mutex); _condition.notify_one(); } void Shutdown() { _Shutdown(TERMINATE); } void Await() { _Shutdown(AWAIT); } void _Shutdown(TerminationMode const mode) { _shutdown = mode; for (auto& thread : _pool) if (thread.joinable()) thread.join(); } }; int main(int /*argc*/, char** /*argv*/) { /* CastSpellPromise(1) .then([](SpellCastResult) { return CastSpellPromise(2); }) .then([](SpellCastResult) { std::cout << "Pause a callback (void test) " << std::endl; }) .then(Validate()) .then(AsyncQuery("SELECT * FROM `users`") .then([](ResultSet result) { // Evaluate result std::size_t const affected = result.affected; return Log(std::to_string(affected) + " rows affected\n"); }) ) .then(TrivialPromise("huhu")) .then(CastSpellPromise(3)) .then(CastSpellPromise(4) .then(CastSpellPromise(5)) ) .then(CastSpellPromise(6)) .then([](SpellCastResult) { return Validate(); }); MoveTest() .then([](std::unique_ptr&& ptr) { static_assert(std::is_rvalue_reference::value, "no rvalue"); // Error here std::unique_ptr other = std::move(ptr); }); // Mockup of aggregate methods make_continuable() .all( [] { return TrivialPromise(); }, [] { return TrivialPromise(); }, [] { return TrivialPromise(); } ) .some(2, // Only 2 of 3 must complete [] { return TrivialPromise(); }, [] { return TrivialPromise(); }, [] { return TrivialPromise(); } ) .any( // Any of 2. 
[] { return TrivialPromise(); }, [] { return TrivialPromise(); } ) .then([] { std::cout << "Finished" << std::endl; }); */ //Continuable cb = make_continuable([](Callback&& callback) //{ // callback(true); //}); //test_unwrap("void()"); //test_unwrap>("std::function"); //test_unwrap>("std::vector"); //make_continuable([=](Callback<>&&) //{ //}); //int i = 0; //++i; //auto lam = [=](Callback&&) //{ // // on success call the callback with SPELL_FAILED_SUCCESS // // callback(SPELL_FAILED_SUCCESS); //}; //fu::function_type_of_t fun1; //fun1 = lam; //fun1(Callback()); //fu::function_type_of_t> fun2; // //shared_callback_of_t> sc1; //weak_callback_of_t> sc2; // //make_weak_wrapped_callback(sc1); //make_weak_wrapped_callback(sc2); //WeakCallbackContainer callback; // //auto weakCallback = callback([] //{ //}); //typedef Continuable cont123; //typedef Continuable myty1; //typedef Continuable myty2; //// Convertible test // //// Continuable> spell //{ // auto stack = // int iii = 0; // iii = 1; //} //std::vector myvec; //typedef fu::requires_functional_constructible>::type test_assert1; //// typedef fu::requires_functional_constructible>::type test_assert2; //// Brainstorming: this shows an example callback chain //// Given by continuable //std::function&&)> continuable_1 = [](Callback&& callback) //{ // callback(SPELL_FAILED_AFFECTING_COMBAT); //}; //// Implemented by user //std::function&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult) //{ // // Given by continuable // // Fn2 // return [](Callback&& callback) // { // callback(true); // }; //}; //// Implemented by user //std::function&&)>(bool)> cn2 = [](bool val) //{ // // Finished // std::cout << "Callback chain finished! 
-> " << val << std::endl; // // Given by continuable (auto end) // return [](Callback<>&&) // { // // Empty callback // }; //}; //// Entry point //std::function&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/] // (std::function&&)>) //{ // // Call with auto created wrapper by the continuable // continuable_1([&](SpellCastResult result /*forward args*/) // { // // Wrapper functional to process unary or multiple promised callbacks // // Returned from the user // std::function&&)> fn2 = callback_by_user_1(/*forward args*/ result); // return std::move(fn2); // }); //}; //// Here we go //entry(); detail::unary_chainer_t< std::function()> >::callback_arguments_t args213987; typedef detail::functional_traits<>::result_maker_of_t< std::function()>, decltype(CastSpellPromise(2)), decltype(TrivialPromise()), std::function()>, std::function()>, std::function()> > maker; maker::arguments_t test282_args; maker::partial_results_t test282_pack; auto test282_size = maker::size; // static_assert(std::is_same<>::value, detail::concat_identities, fu::identity>::type myt; // fu::identity::position<1>> i; std::tuple> tup; Moveable moveable(new int(7)); auto myargs = std::make_tuple(7, std::vector({ 1, 2, 3 }), std::move(moveable)); std::function, Moveable&&)> lam = [](int given_i, std::vector given_vec, Moveable&& moveable) { Moveable other = std::move(moveable); ++given_i; return 1; }; fu::invoke_from_tuple(lam, std::move(myargs)); fu::sequence_generator<2>::type seqtype; fu::sequence_generator<1>::type zero_seqtype; detail::multiple_when_all_chainer_t< fu::identity<>, fu::identity< std::function()>, std::function()> > >::result_maker::partial_results_t myres123345; /* auto firstType = detail::multiple_when_all_chainer_t< fu::identity<>, fu::identity< std::function()>, std::function()>, std::function()> > >::make_when_all( [] { // void return CastSpellPromise(10); }, [] { return make_continuable(); }, [] { return CastSpellPromise(20); }) 
.then([](SpellCastResult, SpellCastResult) { }) .then([] { }); */ make_continuable() .all( CastSpellPromise(10) .then(CastSpellPromise(15)), CastSpellPromise(20), make_continuable([](Callback>&& callback) { callback(true, false, 0.3f, std::unique_ptr(new std::string("oh, all work is done!"))); }), TrivialPromise()) .then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr message) { return TrivialPromise("Lets see... ").then(Log(*message)); }) .then([] { }); std::function callable = [] { std::cout << "ok" << std::endl; }; auto conv_test_1 = std::bind(callable); conv_test_1(1, 1); /* continuable_returner> test26151_start(std::unique_ptr(new int(5))); continuable_returner> test26151_moved = std::move(test26151_start); std::function()> test26151_fn = std::move(test26151_moved); */ // static_assert(fu::is_unwrappable::value, "not unwrappable!"); // std::function fntest = std::move(fn); MoveCaptureLamda>, fu::identity, fu::sequence_of_t<1>, fu::sequence_of_t<0>> capture( [](std::unique_ptr capture, int arg) { int i = 0; ++i; }, std::unique_ptr(new int(90))); capture(1); capture(2); std::unique_ptr uptr(new int(90)); copymove> mycapture(std::move(uptr)); auto capt = [mycapture] { }; typedef std::function Callable; typedef std::unique_ptr CallableHolder; moodycamel::ConcurrentQueue producerConsumerQueue; Callable cal = [] { }; producerConsumerQueue.enqueue(std::move(cal)); DispatcherPool pool(3); auto const seed = std::chrono::steady_clock::now().time_since_epoch().count(); std::mt19937 rng(seed); std::uniform_int_distribution gen(10, 150); for (unsigned int run = 0; run < 2; ++run) { for (unsigned int i = 0; i < 20; ++i) { unsigned int wait = gen(rng); pool.Dispatch([i, run, wait] { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n"; std::cout << str; }); } 
std::this_thread::sleep_for(std::chrono::milliseconds(50)); } std::cout << "Awaiting termination...\n"; // std::this_thread::sleep_for(std::chrono::seconds(5)); pool.Await(); return 0; }