continuable/test.cpp
2015-07-16 23:09:20 +02:00

737 lines
19 KiB
C++

#include "Callback.h"
#include "WeakCallbackContainer.h"
#include "Continuable.h"
#include <iostream>
#include <exception>
#include <type_traits>
#include <string>
#include <vector>
#include <typeinfo>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <atomic>
#include <random>
#include "concurrentqueue.h"
#include "blockingconcurrentqueue.h"
#include <boost/optional.hpp>
enum SpellCastResult
{
SPELL_FAILED_SUCCESS = 0,
SPELL_FAILED_AFFECTING_COMBAT = 1,
SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
SPELL_FAILED_ALREADY_BEING_TAMED = 5
};
template<typename T>
using Optional = boost::optional<T>;
Continuable<> Log(std::string const& message)
{
return make_continuable([=](Callback<>&& callback)
{
std::cout << message << std::endl;
callback();
});
}
struct ResultSet
{
ResultSet(std::size_t affected_) :
affected(affected_) { };
std::size_t affected;
};
Continuable<ResultSet> AsyncQuery(std::string const& query)
{
return make_continuable([=](Callback<ResultSet>&& callback)
{
std::cout << query << std::endl;
callback(ResultSet(2));
});
}
// Original method taking an optional callback.
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
{
std::cout << "Casting " << id << std::endl;
// on success call the callback with SPELL_FAILED_SUCCESS
if (callback)
(*callback)(SPELL_FAILED_SUCCESS);
}
// Promise wrapped callback decorator.
Continuable<SpellCastResult> CastSpellPromise(int id)
{
return make_continuable([=](Callback<SpellCastResult>&& callback)
{
CastSpell(id, callback);
});
}
// Void instant returning continuable promise for testing purposes
Continuable<> TrivialPromise(std::string const& msg = "")
{
return Log(msg);
}
Continuable<bool> Validate()
{
return make_continuable([=](Callback<bool>&& callback)
{
std::cout << "Validate " << std::endl;
callback(true);
});
}
Continuable<std::unique_ptr<int>&&> MoveTest()
{
return make_continuable([=](Callback<std::unique_ptr<int>&&>&& callback)
{
// Move the unique ptr out to test moveability
std::unique_ptr<int> ptr(new int(5));
callback(std::move(ptr));
});
}
typedef std::unique_ptr<int> Moveable;
void testMoveAbleNormal(std::function<void(std::unique_ptr<int>&&)> callback)
{
std::unique_ptr<int> ptr(new int(5));
callback(std::move(ptr));
}
template <typename... T>
void test_unwrap(std::string const& msg)
{
std::cout << msg << " is unwrappable: " << (fu::is_unwrappable<T...>::value ? "true" : "false") << std::endl;
}
/*
namespace detail
{
template<typename, typename>
struct function_matches_to_args;
template<typename LeftReturn, typename... LeftArgs,
typename RightReturn, typename... RightArgs>
struct function_matches_to_args<
std::function<LeftReturn(LeftArgs...)>,
std::function<RightReturn(RightArgs...)>>
{
};
}
*/
template<typename Capture, typename Args, typename SeqOfCapture, typename SeqOfArgs>
class MoveCaptureLamda;
template<typename... Capture, typename... Args, std::size_t... SeqOfCapture, std::size_t... SeqOfArgs>
class MoveCaptureLamda
<
fu::identity<Capture...>,
fu::identity<void, Args...>,
fu::sequence<SeqOfCapture...>,
fu::sequence<SeqOfArgs...>
>
{
std::function<void(Capture..., Args...)> _functional;
std::tuple<Capture...> _capture;
public:
MoveCaptureLamda(std::function<void(Capture..., Args...)>&& functional, Capture&&... capture)
: _functional(std::move(functional)), _capture(std::make_tuple(std::forward<Capture>(capture)...)) { }
void operator() (Args&&... args)
{
_functional(std::move(std::get<SeqOfCapture>(_capture))..., std::forward<Args>(args)...);
}
};
template<typename... Capture, typename ReturnType, typename... Args, std::size_t... SeqOfCapture, std::size_t... SeqOfArgs>
class MoveCaptureLamda
<
fu::identity<Capture...>,
fu::identity<ReturnType, Args...>,
fu::sequence<SeqOfCapture...>,
fu::sequence<SeqOfArgs...>
>
{
std::function<ReturnType(Capture..., Args...)> _functional;
std::tuple<Capture...> _capture;
public:
MoveCaptureLamda(std::function<ReturnType(Capture..., Args...)>&& functional, Capture&&... capture)
: _functional(std::move(functional)), _capture(std::make_tuple(std::forward<Capture>(capture)...)) { }
ReturnType operator() (Args&&... args)
{
return _functional(std::move(std::get<SeqOfCapture>(_capture))..., std::forward<Args>(args)...);
}
};
/*
template<typename _CTy, typename... _ATy>
class continuable_returner
{
_CTy returning_continuable;
public:
continuable_returner(continuable_returner&& right)
{
returning_continuable = std::move(right.returning_continuable);
}
continuable_returner(_CTy&& returning_continuable_)
: returning_continuable(std::move(returning_continuable_)) { }
continuable_returner& operator= (continuable_returner&& right)
{
returning_continuable = std::move(right.returning_continuable);
return *this;
}
continuable_returner& operator= (continuable_returner& right)
{
// returning_continuable = std::move(right.returning_continuable);
return *this;
}
auto operator()(_ATy&&...)
-> _CTy
{
return std::move(returning_continuable);
}
};
*/
/*
template<typename T>
class copymove
{
T _content;
bool consumed;
copymove& move(copymove& right)
{
// _content = std::move(right._content);
return *this;
}
copymove& move(copymove&& right)
{
// _content = std::move(right._content);
return *this;
}
public:
copymove(copymove const& right) : consumed(false)
{
// move(right._content);
}
copymove(T& right) : consumed(false)
{
_content = std::move(right);
}
copymove& operator= (copymove& right)
{
return move(right);
}
T get()
{
assert(!consumed && "already consumed!");
consumed = true;
return move(right._content);
}
};
*/
class DispatcherPool
{
enum TerminationMode
{
NONE,
TERMINATE,
AWAIT
};
typedef std::function<void()> Callable;
std::vector<std::thread> _pool;
std::atomic<TerminationMode> _shutdown;
std::mutex _mutex;
std::condition_variable _condition;
moodycamel::BlockingConcurrentQueue<Callable> _queue;
public:
DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }
DispatcherPool(unsigned int const threads) : _shutdown(NONE)
{
for (unsigned int i = 0; i < threads; ++i)
{
_pool.emplace_back([&, i]
{
// Reserve the consumer token
moodycamel::ConsumerToken token(_queue);
Callable callable;
while (_shutdown != TERMINATE)
{
if (_queue.try_dequeue(token, callable))
{
std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
// std::cout << msg;
callable();
}
else
{
if (_shutdown == AWAIT)
break;
{
std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
// std::cout << msg;
}
std::unique_lock<std::mutex> lock(_mutex);
// Lock until new tasks are added
_condition.wait(lock);
{
std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
// std::cout << msg;
}
}
}
});
}
}
~DispatcherPool()
{
Shutdown();
}
template<typename Functional>
void Dispatch(Functional&& functional)
{
_queue.enqueue(std::forward<Functional>(functional));
std::unique_lock<std::mutex> lock(_mutex);
_condition.notify_one();
}
void Shutdown()
{
_Shutdown(TERMINATE);
}
void Await()
{
_Shutdown(AWAIT);
}
void _Shutdown(TerminationMode const mode)
{
_shutdown = mode;
_condition.notify_all();
for (auto&& thread : _pool)
if (thread.joinable())
thread.join();
}
};
int main(int /*argc*/, char** /*argv*/)
{
/*
CastSpellPromise(1)
.then([](SpellCastResult)
{
return CastSpellPromise(2);
})
.then([](SpellCastResult)
{
std::cout << "Pause a callback (void test) " << std::endl;
})
.then(Validate())
.then(AsyncQuery("SELECT * FROM `users`")
.then([](ResultSet result)
{
// Evaluate result
std::size_t const affected = result.affected;
return Log(std::to_string(affected) + " rows affected\n");
})
)
.then(TrivialPromise("huhu"))
.then(CastSpellPromise(3))
.then(CastSpellPromise(4)
.then(CastSpellPromise(5))
)
.then(CastSpellPromise(6))
.then([](SpellCastResult)
{
return Validate();
});
MoveTest()
.then([](std::unique_ptr<int>&& ptr)
{
static_assert(std::is_rvalue_reference<decltype(ptr)>::value, "no rvalue");
// Error here
std::unique_ptr<int> other = std::move(ptr);
});
// Mockup of aggregate methods
make_continuable()
.all(
[] { return TrivialPromise(); },
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.some(2, // Only 2 of 3 must complete
[] { return TrivialPromise(); },
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.any( // Any of 2.
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.then([]
{
std::cout << "Finished" << std::endl;
});
*/
//Continuable<bool> cb = make_continuable([](Callback<bool>&& callback)
//{
// callback(true);
//});
//test_unwrap<void()>("void()");
//test_unwrap<std::function<void()>>("std::function<void()>");
//test_unwrap<std::vector<std::string>>("std::vector<std::string>");
//make_continuable([=](Callback<>&&)
//{
//});
//int i = 0;
//++i;
//auto lam = [=](Callback<SpellCastResult>&&)
//{
// // on success call the callback with SPELL_FAILED_SUCCESS
// // callback(SPELL_FAILED_SUCCESS);
//};
//fu::function_type_of_t<decltype(lam)> fun1;
//fun1 = lam;
//fun1(Callback<SpellCastResult>());
//fu::function_type_of_t<Callback<int>> fun2;
//
//shared_callback_of_t<std::function<void(int)>> sc1;
//weak_callback_of_t<Callback<int>> sc2;
//
//make_weak_wrapped_callback(sc1);
//make_weak_wrapped_callback(sc2);
//WeakCallbackContainer callback;
//
//auto weakCallback = callback([]
//{
//});
//typedef Continuable<bool> cont123;
//typedef Continuable<bool> myty1;
//typedef Continuable<bool, float> myty2;
//// Convertible test
//
//// Continuable<Callback<SpellCastResult>> spell
//{
// auto stack =
// int iii = 0;
// iii = 1;
//}
//std::vector<int> myvec;
//typedef fu::requires_functional_constructible<std::function<void()>>::type test_assert1;
//// typedef fu::requires_functional_constructible<std::vector<int>>::type test_assert2;
//// Brainstorming: this shows an example callback chain
//// Given by continuable
//std::function<void(Callback<SpellCastResult>&&)> continuable_1 = [](Callback<SpellCastResult>&& callback)
//{
// callback(SPELL_FAILED_AFFECTING_COMBAT);
//};
//// Implemented by user
//std::function<std::function<void(Callback<bool>&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult)
//{
// // Given by continuable
// // Fn2
// return [](Callback<bool>&& callback)
// {
// callback(true);
// };
//};
//// Implemented by user
//std::function<std::function<void(Callback<>&&)>(bool)> cn2 = [](bool val)
//{
// // Finished
// std::cout << "Callback chain finished! -> " << val << std::endl;
// // Given by continuable (auto end)
// return [](Callback<>&&)
// {
// // Empty callback
// };
//};
//// Entry point
//std::function<void(Callback<bool>&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/]
// (std::function<void(Callback<bool>&&)>)
//{
// // Call with auto created wrapper by the continuable
// continuable_1([&](SpellCastResult result /*forward args*/)
// {
// // Wrapper functional to process unary or multiple promised callbacks
// // Returned from the user
// std::function<void(Callback<bool>&&)> fn2 = callback_by_user_1(/*forward args*/ result);
// return std::move(fn2);
// });
//};
//// Here we go
//entry();
detail::unary_chainer_t<
std::function<Continuable<bool>()>
>::callback_arguments_t args213987;
typedef detail::functional_traits<>::result_maker_of_t<
std::function<Continuable<bool>()>,
decltype(CastSpellPromise(2)),
decltype(TrivialPromise()),
std::function<Continuable<float, double>()>,
std::function<Continuable<>()>,
std::function<Continuable<bool>()>
> maker;
maker::arguments_t test282_args;
maker::partial_results_t test282_pack;
auto test282_size = maker::size;
// static_assert(std::is_same<>::value,
detail::concat_identities<fu::identity<int, bool, char>, fu::identity<float, double>>::type myt;
// fu::identity<detail::functional_traits<>::position<1>> i;
std::tuple<int, std::vector<int>> tup;
Moveable moveable(new int(7));
auto myargs = std::make_tuple(7, std::vector<int>({ 1, 2, 3 }), std::move(moveable));
std::function<int(int, std::vector<int>, Moveable&&)> lam = [](int given_i, std::vector<int> given_vec, Moveable&& moveable)
{
Moveable other = std::move(moveable);
++given_i;
return 1;
};
fu::invoke_from_tuple(lam, std::move(myargs));
fu::sequence_generator<2>::type seqtype;
fu::sequence_generator<1>::type zero_seqtype;
detail::multiple_when_all_chainer_t<
fu::identity<>,
fu::identity<
std::function<Continuable<>()>,
std::function<Continuable<std::string>()>
>
>::result_maker::partial_results_t myres123345;
/*
auto firstType = detail::multiple_when_all_chainer_t<
fu::identity<>,
fu::identity<
std::function<Continuable<SpellCastResult>()>,
std::function<Continuable<>()>,
std::function<Continuable<SpellCastResult>()>
>
>::make_when_all(
[]
{
// void
return CastSpellPromise(10);
},
[]
{
return make_continuable();
},
[]
{
return CastSpellPromise(20);
})
.then([](SpellCastResult, SpellCastResult)
{
})
.then([]
{
});
*/
make_continuable()
.all(
CastSpellPromise(10)
.then(CastSpellPromise(15)),
CastSpellPromise(20),
make_continuable([](Callback<bool, bool, double , std::unique_ptr<std::string>>&& callback)
{
callback(true, false, 0.3f, std::unique_ptr<std::string>(new std::string("oh, all work is done!")));
}),
TrivialPromise())
.then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr<std::string> message)
{
return TrivialPromise("Lets see... ").then(Log(*message));
})
.then([]
{
});
std::function<void()> callable = []
{
std::cout << "ok" << std::endl;
};
auto conv_test_1 = std::bind(callable);
conv_test_1(1, 1);
/*
continuable_returner<std::unique_ptr<int>> test26151_start(std::unique_ptr<int>(new int(5)));
continuable_returner<std::unique_ptr<int>> test26151_moved = std::move(test26151_start);
std::function<std::unique_ptr<int>()> test26151_fn = std::move(test26151_moved);
*/
// static_assert(fu::is_unwrappable<TestFunctor>::value, "not unwrappable!");
// std::function<void(int)> fntest = std::move(fn);
MoveCaptureLamda<fu::identity<std::unique_ptr<int>>, fu::identity<void, int>, fu::sequence_of_t<1>, fu::sequence_of_t<0>> capture(
[](std::unique_ptr<int> capture, int arg)
{
int i = 0;
++i;
}, std::unique_ptr<int>(new int(90)));
capture(1);
capture(2);
/*
std::unique_ptr<int> uptr(new int(90));
copymove<std::unique_ptr<int>> mycapture(std::move(uptr));
auto capt = [mycapture]
{
};
*/
DispatcherPool countPool(1);
DispatcherPool pool;
auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();
std::mt19937 rng(static_cast<unsigned int>(seed));
std::uniform_int_distribution<int> gen(10, 150);
std::vector<int> container;
unsigned int counter = 0;
for (unsigned int run = 0; run < 2; ++run)
{
for (unsigned int i = 0; i < 20; ++i)
{
unsigned int wait = gen(rng);
++counter;
pool.Dispatch([&countPool, &container, i, run, wait]
{
// std::this_thread::sleep_for(std::chrono::milliseconds(wait));
std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n";
// std::cout << str;
countPool.Dispatch([&container, wait]
{
container.emplace_back(wait);
});
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
// std::cout << "Awaiting termination...\n";
// std::this_thread::sleep_for(std::chrono::seconds(5));
// std::this_thread::sleep_for(std::chrono::seconds(5));
pool.Await();
countPool.Await();
std::cout << container.size() << " == " << counter;
return 0;
}