mirror of
https://github.com/Naios/continuable.git
synced 2025-12-06 08:46:44 +08:00
more work
This commit is contained in:
parent
5636e67a3a
commit
8ccf651149
@ -54,9 +54,9 @@ endif()
|
||||
file(GLOB_RECURSE LIB_SOURCES include/*.cpp include/*.hpp include/*.h)
|
||||
add_library(Continuable STATIC ${LIB_SOURCES})
|
||||
|
||||
include_directories(include dep/concurrentqueue)
|
||||
include_directories(include testing)
|
||||
|
||||
set(TEST_SOURCES test.cpp mockup.cpp)
|
||||
file(GLOB TEST_SOURCES *.cpp *.hpp *.h)
|
||||
|
||||
add_executable(ContinuableTest ${TEST_SOURCES})
|
||||
|
||||
|
||||
710
Incubator.cpp
Normal file
710
Incubator.cpp
Normal file
@ -0,0 +1,710 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) 2015 Naios <naios-dev@live.de>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <utility>
|
||||
#include <memory>
|
||||
|
||||
#include "Callback.h"
|
||||
#include "WeakCallbackContainer.h"
|
||||
#include "Continuable.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
#include <type_traits>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <typeinfo>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <random>
|
||||
|
||||
// #include "concurrentqueue.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
enum SpellCastResult
|
||||
{
|
||||
SPELL_FAILED_SUCCESS = 0,
|
||||
SPELL_FAILED_AFFECTING_COMBAT = 1,
|
||||
SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
|
||||
SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
|
||||
SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
|
||||
SPELL_FAILED_ALREADY_BEING_TAMED = 5
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
using Optional = boost::optional<T>;
|
||||
|
||||
Continuable<> Log(std::string const& message)
|
||||
{
|
||||
return make_continuable([=](Callback<>&& callback)
|
||||
{
|
||||
std::cout << message << std::endl;
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
||||
struct ResultSet
|
||||
{
|
||||
ResultSet(std::size_t affected_) :
|
||||
affected(affected_) { };
|
||||
|
||||
std::size_t affected;
|
||||
};
|
||||
|
||||
Continuable<ResultSet> AsyncQuery(std::string const& query)
|
||||
{
|
||||
return make_continuable([=](Callback<ResultSet>&& callback)
|
||||
{
|
||||
std::cout << query << std::endl;
|
||||
callback(ResultSet(2));
|
||||
});
|
||||
}
|
||||
|
||||
// Original method taking an optional callback.
|
||||
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
|
||||
{
|
||||
std::cout << "Casting " << id << std::endl;
|
||||
|
||||
// on success call the callback with SPELL_FAILED_SUCCESS
|
||||
if (callback)
|
||||
(*callback)(SPELL_FAILED_SUCCESS);
|
||||
}
|
||||
|
||||
// Promise wrapped callback decorator.
|
||||
Continuable<SpellCastResult> CastSpellPromise(int id)
|
||||
{
|
||||
return make_continuable([=](Callback<SpellCastResult>&& callback)
|
||||
{
|
||||
CastSpell(id, callback);
|
||||
});
|
||||
}
|
||||
|
||||
// Void instant returning continuable promise for testing purposes
|
||||
Continuable<> TrivialPromise(std::string const& msg = "")
|
||||
{
|
||||
return Log(msg);
|
||||
}
|
||||
|
||||
Continuable<bool> Validate()
|
||||
{
|
||||
return make_continuable([=](Callback<bool>&& callback)
|
||||
{
|
||||
std::cout << "Validate " << std::endl;
|
||||
|
||||
callback(true);
|
||||
});
|
||||
}
|
||||
|
||||
Continuable<std::unique_ptr<int>&&> MoveTest()
|
||||
{
|
||||
return make_continuable([=](Callback<std::unique_ptr<int>&&>&& callback)
|
||||
{
|
||||
// Move the unique ptr out to test moveability
|
||||
std::unique_ptr<int> ptr(new int(5));
|
||||
callback(std::move(ptr));
|
||||
});
|
||||
}
|
||||
|
||||
typedef std::unique_ptr<int> Moveable;
|
||||
|
||||
void testMoveAbleNormal(std::function<void(std::unique_ptr<int>&&)> callback)
|
||||
{
|
||||
std::unique_ptr<int> ptr(new int(5));
|
||||
callback(std::move(ptr));
|
||||
}
|
||||
|
||||
template <typename... T>
|
||||
void test_unwrap(std::string const& msg)
|
||||
{
|
||||
std::cout << msg << " is unwrappable: " << (fu::is_unwrappable<T...>::value ? "true" : "false") << std::endl;
|
||||
}
|
||||
|
||||
#include <iostream>
|
||||
#include <atomic>
|
||||
#include <random>
|
||||
|
||||
// static std::atomic_size_t move_tracer_index = 0;
|
||||
|
||||
/// Class to trace construct, destruct, copy and move operations.
|
||||
/*
|
||||
class CopyMoveTracer
|
||||
{
|
||||
std::size_t id;
|
||||
std::size_t flags;
|
||||
std::size_t copied;
|
||||
std::size_t moved;
|
||||
|
||||
bool IsEnabled(std::size_t mask) const
|
||||
{
|
||||
// msvc warning silencer
|
||||
return (flags & mask) != 0;
|
||||
}
|
||||
|
||||
void Log(std::string const& msg) const
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public:
|
||||
enum Flags : std::size_t
|
||||
{
|
||||
CAPTURE_NONE = 0x1,
|
||||
CAPTURE_CONSTRUCT = 0x1,
|
||||
CAPTURE_DESTRUCT = 0x2,
|
||||
CAPTURE_COPY = 0x4,
|
||||
CAPTURE_MOVE = 0x8,
|
||||
CAPTURE_ALL = CAPTURE_CONSTRUCT | CAPTURE_DESTRUCT | CAPTURE_COPY | CAPTURE_MOVE
|
||||
};
|
||||
|
||||
// Empty construct
|
||||
CopyMoveTracer() : id(++move_tracer_index), flags(CAPTURE_ALL), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_CONSTRUCT))
|
||||
Log("Tracer constructed");
|
||||
}
|
||||
|
||||
// Construct with flags
|
||||
CopyMoveTracer(std::size_t flags_) : id(++move_tracer_index), flags(flags_), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_CONSTRUCT))
|
||||
Log("Tracer constructed");
|
||||
}
|
||||
|
||||
// Copy construct
|
||||
CopyMoveTracer(CopyMoveTracer const& right) : id(move_tracer_index++), flags(right.flags), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_COPY))
|
||||
Log("Tracer copy constructed");
|
||||
}
|
||||
|
||||
// Copy construct
|
||||
CopyMoveTracer(CopyMoveTracer&& right) : id(right.id), flags(right.flags), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_COPY))
|
||||
Log("Tracer copy constructed");
|
||||
}
|
||||
};
|
||||
*/
|
||||
/*
|
||||
namespace detail
|
||||
{
|
||||
template<typename, typename>
|
||||
struct function_matches_to_args;
|
||||
|
||||
template<typename LeftReturn, typename... LeftArgs,
|
||||
typename RightReturn, typename... RightArgs>
|
||||
struct function_matches_to_args<
|
||||
std::function<LeftReturn(LeftArgs...)>,
|
||||
std::function<RightReturn(RightArgs...)>>
|
||||
{
|
||||
|
||||
};
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
class DispatcherPool
|
||||
{
|
||||
enum TerminationMode
|
||||
{
|
||||
NONE,
|
||||
TERMINATE,
|
||||
AWAIT
|
||||
};
|
||||
|
||||
typedef std::function<void()> Callable;
|
||||
|
||||
std::vector<std::thread> _pool;
|
||||
|
||||
std::atomic<TerminationMode> _shutdown;
|
||||
|
||||
std::mutex _mutex;
|
||||
|
||||
std::condition_variable _condition;
|
||||
|
||||
// moodycamel::ConcurrentQueue<Callable> _queue;
|
||||
|
||||
public:
|
||||
DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }
|
||||
|
||||
DispatcherPool(unsigned int const threads) : _shutdown(NONE)
|
||||
{
|
||||
for (unsigned int i = 0; i < threads; ++i)
|
||||
{
|
||||
_pool.emplace_back([&, i]
|
||||
{
|
||||
// Reserve the consumer token
|
||||
// moodycamel::ConsumerToken token(_queue);
|
||||
|
||||
Callable callable;
|
||||
while (_shutdown != TERMINATE)
|
||||
{
|
||||
if (_queue.try_dequeue(token, callable))
|
||||
{
|
||||
std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
|
||||
// std::cout << msg;
|
||||
callable();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_shutdown == AWAIT)
|
||||
break;
|
||||
|
||||
{
|
||||
std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
|
||||
// std::cout << msg;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
|
||||
// Lock until new tasks are added
|
||||
_condition.wait(lock);
|
||||
|
||||
{
|
||||
std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
|
||||
// std::cout << msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
~DispatcherPool()
|
||||
{
|
||||
Shutdown();
|
||||
}
|
||||
|
||||
template<typename Functional>
|
||||
void Dispatch(Functional&& functional)
|
||||
{
|
||||
_queue.enqueue(std::forward<Functional>(functional));
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
_condition.notify_one();
|
||||
}
|
||||
|
||||
void Shutdown()
|
||||
{
|
||||
_Shutdown(TERMINATE);
|
||||
}
|
||||
|
||||
void Await()
|
||||
{
|
||||
_Shutdown(AWAIT);
|
||||
}
|
||||
|
||||
void _Shutdown(TerminationMode const mode)
|
||||
{
|
||||
_shutdown = mode;
|
||||
_condition.notify_all();
|
||||
for (auto&& thread : _pool)
|
||||
if (thread.joinable())
|
||||
thread.join();
|
||||
}
|
||||
};
|
||||
*/
|
||||
|
||||
void some_examples()
|
||||
{
|
||||
// CopyMoveTracer tracer;
|
||||
|
||||
/*
|
||||
CastSpellPromise(1)
|
||||
.then([](SpellCastResult)
|
||||
{
|
||||
return CastSpellPromise(2);
|
||||
})
|
||||
.then([](SpellCastResult)
|
||||
{
|
||||
std::cout << "Pause a callback (void test) " << std::endl;
|
||||
})
|
||||
.then(Validate())
|
||||
.then(AsyncQuery("SELECT * FROM `users`")
|
||||
.then([](ResultSet result)
|
||||
{
|
||||
// Evaluate result
|
||||
std::size_t const affected = result.affected;
|
||||
return Log(std::to_string(affected) + " rows affected\n");
|
||||
})
|
||||
)
|
||||
.then(TrivialPromise("huhu"))
|
||||
.then(CastSpellPromise(3))
|
||||
.then(CastSpellPromise(4)
|
||||
.then(CastSpellPromise(5))
|
||||
)
|
||||
.then(CastSpellPromise(6))
|
||||
.then([](SpellCastResult)
|
||||
{
|
||||
return Validate();
|
||||
});
|
||||
|
||||
MoveTest()
|
||||
.then([](std::unique_ptr<int>&& ptr)
|
||||
{
|
||||
static_assert(std::is_rvalue_reference<decltype(ptr)>::value, "no rvalue");
|
||||
|
||||
// Error here
|
||||
std::unique_ptr<int> other = std::move(ptr);
|
||||
});
|
||||
|
||||
// Mockup of aggregate methods
|
||||
make_continuable()
|
||||
.all(
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); }
|
||||
)
|
||||
.some(2, // Only 2 of 3 must complete
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); }
|
||||
)
|
||||
.any( // Any of 2.
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); }
|
||||
)
|
||||
.then([]
|
||||
{
|
||||
std::cout << "Finished" << std::endl;
|
||||
});
|
||||
*/
|
||||
//Continuable<bool> cb = make_continuable([](Callback<bool>&& callback)
|
||||
//{
|
||||
|
||||
// callback(true);
|
||||
//});
|
||||
|
||||
//test_unwrap<void()>("void()");
|
||||
//test_unwrap<std::function<void()>>("std::function<void()>");
|
||||
//test_unwrap<std::vector<std::string>>("std::vector<std::string>");
|
||||
|
||||
//make_continuable([=](Callback<>&&)
|
||||
//{
|
||||
|
||||
//});
|
||||
|
||||
//int i = 0;
|
||||
//++i;
|
||||
|
||||
//auto lam = [=](Callback<SpellCastResult>&&)
|
||||
//{
|
||||
// // on success call the callback with SPELL_FAILED_SUCCESS
|
||||
// // callback(SPELL_FAILED_SUCCESS);
|
||||
//};
|
||||
|
||||
//fu::function_type_of_t<decltype(lam)> fun1;
|
||||
//fun1 = lam;
|
||||
//fun1(Callback<SpellCastResult>());
|
||||
|
||||
//fu::function_type_of_t<Callback<int>> fun2;
|
||||
//
|
||||
//shared_callback_of_t<std::function<void(int)>> sc1;
|
||||
//weak_callback_of_t<Callback<int>> sc2;
|
||||
//
|
||||
//make_weak_wrapped_callback(sc1);
|
||||
//make_weak_wrapped_callback(sc2);
|
||||
|
||||
//WeakCallbackContainer callback;
|
||||
//
|
||||
//auto weakCallback = callback([]
|
||||
//{
|
||||
//});
|
||||
|
||||
//typedef Continuable<bool> cont123;
|
||||
|
||||
//typedef Continuable<bool> myty1;
|
||||
//typedef Continuable<bool, float> myty2;
|
||||
|
||||
//// Convertible test
|
||||
//
|
||||
//// Continuable<Callback<SpellCastResult>> spell
|
||||
//{
|
||||
// auto stack =
|
||||
|
||||
// int iii = 0;
|
||||
// iii = 1;
|
||||
//}
|
||||
|
||||
//std::vector<int> myvec;
|
||||
|
||||
//typedef fu::requires_functional_constructible<std::function<void()>>::type test_assert1;
|
||||
//// typedef fu::requires_functional_constructible<std::vector<int>>::type test_assert2;
|
||||
|
||||
//// Brainstorming: this shows an example callback chain
|
||||
//// Given by continuable
|
||||
//std::function<void(Callback<SpellCastResult>&&)> continuable_1 = [](Callback<SpellCastResult>&& callback)
|
||||
//{
|
||||
// callback(SPELL_FAILED_AFFECTING_COMBAT);
|
||||
//};
|
||||
|
||||
//// Implemented by user
|
||||
//std::function<std::function<void(Callback<bool>&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult)
|
||||
//{
|
||||
// // Given by continuable
|
||||
// // Fn2
|
||||
// return [](Callback<bool>&& callback)
|
||||
// {
|
||||
// callback(true);
|
||||
// };
|
||||
//};
|
||||
|
||||
//// Implemented by user
|
||||
//std::function<std::function<void(Callback<>&&)>(bool)> cn2 = [](bool val)
|
||||
//{
|
||||
// // Finished
|
||||
// std::cout << "Callback chain finished! -> " << val << std::endl;
|
||||
|
||||
// // Given by continuable (auto end)
|
||||
// return [](Callback<>&&)
|
||||
// {
|
||||
// // Empty callback
|
||||
// };
|
||||
//};
|
||||
|
||||
//// Entry point
|
||||
//std::function<void(Callback<bool>&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/]
|
||||
// (std::function<void(Callback<bool>&&)>)
|
||||
//{
|
||||
// // Call with auto created wrapper by the continuable
|
||||
// continuable_1([&](SpellCastResult result /*forward args*/)
|
||||
// {
|
||||
// // Wrapper functional to process unary or multiple promised callbacks
|
||||
// // Returned from the user
|
||||
// std::function<void(Callback<bool>&&)> fn2 = callback_by_user_1(/*forward args*/ result);
|
||||
// return std::move(fn2);
|
||||
// });
|
||||
//};
|
||||
|
||||
//// Here we go
|
||||
//entry();
|
||||
|
||||
detail::unary_chainer_t<
|
||||
std::function<Continuable<bool>()>
|
||||
>::callback_arguments_t args213987;
|
||||
|
||||
typedef detail::functional_traits<>::result_maker_of_t<
|
||||
|
||||
std::function<Continuable<bool>()>,
|
||||
decltype(CastSpellPromise(2)),
|
||||
decltype(TrivialPromise()),
|
||||
std::function<Continuable<float, double>()>,
|
||||
std::function<Continuable<>()>,
|
||||
std::function<Continuable<bool>()>
|
||||
|
||||
> maker;
|
||||
|
||||
maker::arguments_t test282_args;
|
||||
maker::partial_results_t test282_pack;
|
||||
auto test282_size = maker::size;
|
||||
|
||||
// static_assert(std::is_same<>::value,
|
||||
|
||||
detail::concat_identities<fu::identity<int, bool, char>, fu::identity<float, double>>::type myt;
|
||||
|
||||
// fu::identity<detail::functional_traits<>::position<1>> i;
|
||||
|
||||
std::tuple<int, std::vector<int>> tup;
|
||||
|
||||
Moveable moveable(new int(7));
|
||||
|
||||
auto myargs = std::make_tuple(7, std::vector<int>({ 1, 2, 3 }), std::move(moveable));
|
||||
|
||||
std::function<int(int, std::vector<int>, Moveable&&)> lam = [](int given_i, std::vector<int> given_vec, Moveable&& moveable)
|
||||
{
|
||||
Moveable other = std::move(moveable);
|
||||
|
||||
++given_i;
|
||||
return 1;
|
||||
};
|
||||
|
||||
fu::invoke_from_tuple(lam, std::move(myargs));
|
||||
|
||||
fu::sequence_generator<2>::type seqtype;
|
||||
fu::sequence_generator<1>::type zero_seqtype;
|
||||
|
||||
detail::multiple_when_all_chainer_t<
|
||||
fu::identity<>,
|
||||
fu::identity<
|
||||
std::function<Continuable<>()>,
|
||||
std::function<Continuable<std::string>()>
|
||||
>
|
||||
>::result_maker::partial_results_t myres123345;
|
||||
|
||||
/*
|
||||
auto firstType = detail::multiple_when_all_chainer_t<
|
||||
fu::identity<>,
|
||||
fu::identity<
|
||||
std::function<Continuable<SpellCastResult>()>,
|
||||
std::function<Continuable<>()>,
|
||||
std::function<Continuable<SpellCastResult>()>
|
||||
>
|
||||
>::make_when_all(
|
||||
[]
|
||||
{
|
||||
// void
|
||||
return CastSpellPromise(10);
|
||||
},
|
||||
[]
|
||||
{
|
||||
return make_continuable();
|
||||
},
|
||||
[]
|
||||
{
|
||||
return CastSpellPromise(20);
|
||||
})
|
||||
.then([](SpellCastResult, SpellCastResult)
|
||||
{
|
||||
|
||||
})
|
||||
.then([]
|
||||
{
|
||||
|
||||
});
|
||||
*/
|
||||
|
||||
make_continuable()
|
||||
.all(
|
||||
CastSpellPromise(10)
|
||||
.then(CastSpellPromise(15)),
|
||||
CastSpellPromise(20),
|
||||
make_continuable([](Callback<bool, bool, double , std::unique_ptr<std::string>>&& callback)
|
||||
{
|
||||
callback(true, false, 0.3f, std::unique_ptr<std::string>(new std::string("oh, all work is done!")));
|
||||
}),
|
||||
TrivialPromise())
|
||||
.then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr<std::string> message)
|
||||
{
|
||||
return TrivialPromise("Lets see... ").then(Log(*message));
|
||||
})
|
||||
.then([]
|
||||
{
|
||||
return Log("ok, now its really finished!").then(CastSpellPromise(2));
|
||||
});
|
||||
|
||||
// DispatcherPool countPool(1);
|
||||
|
||||
// DispatcherPool pool;
|
||||
|
||||
/*
|
||||
auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();
|
||||
|
||||
std::mt19937 rng(static_cast<unsigned int>(seed));
|
||||
std::uniform_int_distribution<int> gen(10, 150);
|
||||
|
||||
std::vector<int> container;
|
||||
|
||||
unsigned int counter = 0;
|
||||
|
||||
for (unsigned int run = 0; run < 15; ++run)
|
||||
{
|
||||
for (unsigned int i = 0; i < 20; ++i)
|
||||
{
|
||||
unsigned int wait = gen(rng);
|
||||
|
||||
++counter;
|
||||
|
||||
pool.Dispatch([&countPool, &container, i, run, wait]
|
||||
{
|
||||
// std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n";
|
||||
// std::cout << str;
|
||||
|
||||
countPool.Dispatch([&container, wait]
|
||||
{
|
||||
container.emplace_back(wait);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
// std::cout << "Awaiting termination...\n";
|
||||
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
|
||||
pool.Await();
|
||||
countPool.Await();
|
||||
|
||||
std::cout << container.size() << " == " << counter;
|
||||
*/
|
||||
}
|
||||
|
||||
template<typename T, typename V>
|
||||
using cross_forward_t =
|
||||
typename std::conditional<
|
||||
std::is_rvalue_reference<T&&>::value,
|
||||
typename std::decay<V>::type&&,
|
||||
typename std::conditional<
|
||||
std::is_lvalue_reference<T&&>::value,
|
||||
typename std::decay<V>::type&,
|
||||
typename std::decay<V>::type
|
||||
>::type
|
||||
>::type;
|
||||
|
||||
template<typename T, typename V>
|
||||
cross_forward_t<T, V> cross_forward(V&& var)
|
||||
{
|
||||
return static_cast<cross_forward_t<T, V>&&>(var);
|
||||
}
|
||||
|
||||
struct TestContainer
|
||||
{
|
||||
std::shared_ptr<int> ptr;
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
TestContainer extract(T&& c)
|
||||
{
|
||||
return TestContainer{cross_forward<T>(c.ptr)};
|
||||
}
|
||||
|
||||
void test_cross_forward()
|
||||
{
|
||||
TestContainer con;
|
||||
|
||||
con.ptr = std::make_shared<int>(5);
|
||||
|
||||
static_assert(
|
||||
std::is_same<cross_forward_t<TestContainer&&, std::unique_ptr<int>&>,
|
||||
std::unique_ptr<int>&&>::value,
|
||||
"oO");
|
||||
|
||||
static_assert(
|
||||
std::is_same<cross_forward_t<TestContainer&, std::unique_ptr<int>&>,
|
||||
std::unique_ptr<int>&>::value,
|
||||
"oO");
|
||||
|
||||
extract(con);
|
||||
|
||||
extract(std::move(con));
|
||||
|
||||
int i = 0;
|
||||
++i;
|
||||
}
|
||||
|
||||
void test_incubator()
|
||||
{
|
||||
test_cross_forward();
|
||||
}
|
||||
@ -1,31 +0,0 @@
|
||||
This license applies to everything in this repository except that which
|
||||
is explicitly annotated as being written by other authors, i.e. the Boost
|
||||
queue (included in the benchmarks for comparison), Intel's TBB library (ditto),
|
||||
the CDSChecker tool (used for verification), the Relacy model checker (ditto),
|
||||
and Jeff Preshing's semaphore implementation (used in the blocking queue) which
|
||||
has a zlib license (embedded in blockingconcurrentqueue.h).
|
||||
|
||||
|
||||
Simplified BSD License:
|
||||
|
||||
Copyright (c) 2013-2015, Cameron Desrochers.
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without modification,
|
||||
are permitted provided that the following conditions are met:
|
||||
|
||||
- Redistributions of source code must retain the above copyright notice, this list of
|
||||
conditions and the following disclaimer.
|
||||
- Redistributions in binary form must reproduce the above copyright notice, this list of
|
||||
conditions and the following disclaimer in the documentation and/or other materials
|
||||
provided with the distribution.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
|
||||
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
|
||||
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
|
||||
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
||||
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
|
||||
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
|
||||
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
@ -1,449 +0,0 @@
|
||||
# moodycamel::ConcurrentQueue<T>
|
||||
|
||||
An industrial-strength lock-free queue for C++.
|
||||
|
||||
Note: If all you need is a single-producer, single-consumer queue, I have [one of those too][spsc].
|
||||
|
||||
## Features
|
||||
|
||||
- Knock-your-socks-off [blazing fast performance][benchmarks].
|
||||
- Single-header implementation. Just drop it in your project.
|
||||
- Fully thread-safe lock-free queue. Use concurrently from any number of threads.
|
||||
- C++11 implementation -- elements are moved (instead of copied) where possible.
|
||||
- Templated, obviating the need to deal exclusively with pointers -- memory is managed for you.
|
||||
- No artificial limitations on element types or maximum count.
|
||||
- Memory can be allocated once up-front, or dynamically as needed.
|
||||
- Fully portable (no assembly; all is done through standard C++11 primitives).
|
||||
- Supports super-fast bulk operations.
|
||||
- Includes a low-overhead blocking version (BlockingConcurrentQueue).
|
||||
- Exception safe.
|
||||
|
||||
## Reasons to use
|
||||
|
||||
There are not that many full-fledged lock-free queues for C++. Boost has one, but it's limited to objects with trivial
|
||||
assignment operators and trivial destructors, for example. Intel's TBB queue isn't lock-free, and requires trivial constructors too.
|
||||
There's many academic papers that implement lock-free queues in C++, but usable source code is
|
||||
hard to find, and tests even more so.
|
||||
|
||||
This queue not only has less limitations than others (for the most part), but [it's also faster][benchmarks].
|
||||
It's been fairly well-tested, and offers advanced features like **bulk enqueueing/dequeueing**
|
||||
(which, with my new design, is much faster than one element at a time, approaching and even surpassing
|
||||
the speed of a non-concurrent queue even under heavy contention).
|
||||
|
||||
In short, there was a lock-free queue shaped hole in the C++ open-source universe, and I set out
|
||||
to fill it with the fastest, most complete, and well-tested design and implementation I could.
|
||||
The result is `moodycamel::ConcurrentQueue` :-)
|
||||
|
||||
## Reasons *not* to use
|
||||
|
||||
The fastest synchronization of all is the kind that never takes place. Fundamentally,
|
||||
concurrent data structures require some synchronization, and that takes time. Every effort
|
||||
was made, of course, to minimize the overhead, but if you can avoid sharing data between
|
||||
threads, do so!
|
||||
|
||||
Why use concurrent data structures at all, then? Because they're gosh darn convenient! (And, indeed,
|
||||
sometimes sharing data concurrently is unavoidable.)
|
||||
|
||||
My queue is **not linearizable** (see the next section on high-level design). The foundations of
|
||||
its design assume that producers are independent; if this is not the case, and your producers
|
||||
co-ordinate amongst themselves in some fashion, be aware that the elements won't necessarily
|
||||
come out of the queue in the same order they were put in *relative to the ordering formed by that co-ordination*
|
||||
(but they will still come out in the order they were put in by any *individual* producer). If this affects
|
||||
your use case, you may be better off with another implementation; either way, it's an important limitation
|
||||
to be aware of.
|
||||
|
||||
My queue is also **not NUMA aware**, and does a lot of memory re-use internally, meaning it probably doesn't
|
||||
scale particularly well on NUMA architectures; however, I don't know of any other lock-free queue that *is*
|
||||
NUMA aware (except for [SALSA][salsa], which is very cool, but has no publicly available implementation that I know of).
|
||||
|
||||
Finally, the queue is **not sequentially consistent**; there *is* a happens-before relationship between when an element is put
|
||||
in the queue and when it comes out, but other things (such as pumping the queue until it's empty) require more thought
|
||||
to get right in all eventualities, because explicit memory ordering may have to be done to get the desired effect. In other words,
|
||||
it can sometimes be difficult to use the queue correctly. This is why it's a good idea to follow the [samples][samples.md] where possible.
|
||||
On the other hand, the upside of this lack of sequential consistency is better performance.
|
||||
|
||||
## High-level design
|
||||
|
||||
Elements are stored internally using contiguous blocks instead of linked lists for better performance.
|
||||
The queue is made up of a collection of sub-queues, one for each producer. When a consumer
|
||||
wants to dequeue an element, it checks all the sub-queues until it finds one that's not empty.
|
||||
All of this is largely transparent to the user of the queue, however -- it mostly just works<sup>TM</sup>.
|
||||
|
||||
One particular consequence of this design, however, (which seems to be non-intuitive) is that if two producers
|
||||
enqueue at the same time, there is no defined ordering between the elements when they're later dequeued.
|
||||
Normally this is fine, because even with a fully linearizable queue there'd be a race between the producer
|
||||
threads and so you couldn't rely on the ordering anyway. However, if for some reason you do extra explicit synchronization
|
||||
between the two producer threads yourself, thus defining a total order between enqueue operations, you might expect
|
||||
that the elements would come out in the same total order, which is a guarantee my queue does not offer. At that
|
||||
point, though, there semantically aren't really two separate producers, but rather one that happens to be spread
|
||||
across multiple threads. In this case, you can still establish a total ordering with my queue by creating
|
||||
a single producer token, and using that from both threads to enqueue (taking care to synchronize access to the token,
|
||||
of course, but there was already extra synchronization involved anyway).
|
||||
|
||||
I've written a more detailed [overview of the internal design][blog], as well as [the full
|
||||
nitty-gritty details of the design][design], on my blog. Finally, the
|
||||
[source][source] itself is available for perusal for those interested in its implementation.
|
||||
|
||||
## Basic use
|
||||
|
||||
The entire queue's implementation is contained in **one header**, [`concurrentqueue.h`][concurrentqueue.h].
|
||||
Simply download and include that to use the queue. The blocking version is in a separate header,
|
||||
[`blockingconcurrentqueue.h`][blockingconcurrentqueue.h], that depends on the first.
|
||||
The implementation makes use of certain key C++11 features, so it requires a fairly recent compiler
|
||||
(e.g. VS2012+ or g++ 4.8; note that g++ 4.6 has a known bug with `std::atomic` and is thus not supported).
|
||||
The algorithm implementations themselves are platform independent.
|
||||
|
||||
Use it like you would any other templated queue, with the exception that you can use
|
||||
it from many threads at once :-)
|
||||
|
||||
Simple example:
|
||||
|
||||
#include "concurrentqueue.h"
|
||||
|
||||
moodycamel::ConcurrentQueue<int> q;
|
||||
q.enqueue(25);
|
||||
|
||||
int item;
|
||||
bool found = q.try_dequeue(item);
|
||||
assert(found && item == 25);
|
||||
|
||||
Description of basic methods:
|
||||
- `ConcurrentQueue(size_t initialSizeEstimate)`
|
||||
Constructor which optionally accepts an estimate of the number of elements the queue will hold
|
||||
- `enqueue(T&& item)`
|
||||
Enqueues one item, allocating extra space if necessary
|
||||
- `try_enqueue(T&& item)`
|
||||
Enqueues one item, but only if enough memory is already allocated
|
||||
- `try_dequeue(T& item)`
|
||||
Dequeues one item, returning true if an item was found or false if the queue appeared empty
|
||||
|
||||
Note that it is up to the user to ensure that the queue object is completely constructed before
|
||||
being used by any other threads (this includes making the memory effects of construction
|
||||
visible, possibly via a memory barrier). Similarly, it's important that all threads have
|
||||
finished using the queue (and the memory effects have fully propagated) before it is
|
||||
destructed.
|
||||
|
||||
There's usually two versions of each method, one "explicit" version that takes a user-allocated per-producer or
|
||||
per-consumer token, and one "implicit" version that works without tokens. Using the explicit methods is almost
|
||||
always faster (though not necessarily by a huge factor). Apart from performance, the primary distinction between them
|
||||
is their sub-queue allocation behaviour for enqueue operations: Using the implicit enqueue methods causes an
|
||||
automatically-allocated thread-local producer sub-queue to be allocated (it is marked for reuse once the thread exits).
|
||||
Explicit producers, on the other hand, are tied directly to their tokens' lifetimes (and are also recycled as needed).
|
||||
|
||||
Full API (pseudocode):
|
||||
|
||||
# Allocates more memory if necessary
|
||||
enqueue(item) : bool
|
||||
enqueue(prod_token, item) : bool
|
||||
enqueue_bulk(item_first, count) : bool
|
||||
enqueue_bulk(prod_token, item_first, count) : bool
|
||||
|
||||
# Fails if not enough memory to enqueue
|
||||
try_enqueue(item) : bool
|
||||
try_enqueue(prod_token, item) : bool
|
||||
try_enqueue_bulk(item_first, count) : bool
|
||||
try_enqueue_bulk(prod_token, item_first, count) : bool
|
||||
|
||||
# Attempts to dequeue from the queue (never allocates)
|
||||
try_dequeue(item&) : bool
|
||||
try_dequeue(cons_token, item&) : bool
|
||||
try_dequeue_bulk(item_first, max) : size_t
|
||||
try_dequeue_bulk(cons_token, item_first, max) : size_t
|
||||
|
||||
# If you happen to know which producer you want to dequeue from
|
||||
try_dequeue_from_producer(prod_token, item&) : bool
|
||||
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t
|
||||
|
||||
# A not-necessarily-accurate count of the total number of elements
|
||||
size_approx() : size_t
|
||||
|
||||
## Blocking version
|
||||
|
||||
As mentioned above, a full blocking wrapper of the queue is provided that adds
|
||||
`wait_dequeue` and `wait_dequeue_bulk` methods in addition to the regular interface.
|
||||
This wrapper is extremely low-overhead, but slightly less fast than the non-blocking
|
||||
queue (due to the necessary bookkeeping involving a lightweight semaphore).
|
||||
|
||||
The only major caveat with the blocking version is that you must be careful not to
|
||||
destroy the queue while somebody is waiting on it. This generally means you need to
|
||||
know for certain that another element is going to come along before you call one of
|
||||
the blocking methods.
|
||||
|
||||
Blocking example:
|
||||
|
||||
#include "blockingconcurrentqueue.h"
|
||||
|
||||
moodycamel::BlockingConcurrentQueue<int> q;
|
||||
std::thread producer([&]() {
|
||||
for (int i = 0; i != 100; ++i) {
|
||||
q.enqueue(i);
|
||||
}
|
||||
});
|
||||
std::thread consumer([&]() {
|
||||
for (int i = 0; i != 100; ++i) {
|
||||
int item;
|
||||
q.wait_dequeue(item);
|
||||
assert(item == i);
|
||||
}
|
||||
});
|
||||
producer.join();
|
||||
consumer.join();
|
||||
|
||||
assert(q.size_approx() == 0);
|
||||
|
||||
## Advanced features
|
||||
|
||||
#### Tokens
|
||||
|
||||
The queue can take advantage of extra per-producer and per-consumer storage if
|
||||
it's available to speed up its operations. This takes the form of "tokens":
|
||||
You can create a consumer token and/or a producer token for each thread or task
|
||||
(tokens themselves are not thread-safe), and use the methods that accept a token
|
||||
as their first parameter:
|
||||
|
||||
moodycamel::ConcurrentQueue<int> q;
|
||||
|
||||
moodycamel::ProducerToken ptok(q);
|
||||
q.enqueue(ptok, 17);
|
||||
|
||||
moodycamel::ConsumerToken ctok(q);
|
||||
int item;
|
||||
q.try_dequeue(ctok, item);
|
||||
assert(item == 17);
|
||||
|
||||
If you happen to know which producer you want to consume from (e.g. in
|
||||
a single-producer, multi-consumer scenario), you can use the `try_dequeue_from_producer`
|
||||
methods, which accept a producer token instead of a consumer token, and cut some overhead.
|
||||
|
||||
Note that tokens work with the blocking version of the queue too.
|
||||
|
||||
When producing or consuming many elements, the most efficient way is to:
|
||||
|
||||
1. Use the bulk methods of the queue with tokens
|
||||
2. Failing that, use the bulk methods without tokens
|
||||
3. Failing that, use the single-item methods with tokens
|
||||
4. Failing that, use the single-item methods without tokens
|
||||
|
||||
Having said that, don't create tokens willy-nilly -- ideally there would be
|
||||
one token (of each kind) per thread. The queue will work with what it is
|
||||
given, but it performs best when used with tokens.
|
||||
|
||||
Note that tokens aren't actually tied to any given thread; it's not technically
|
||||
required that they be local to the thread, only that they be used by a single
|
||||
producer/consumer at a time.
|
||||
|
||||
#### Bulk operations
|
||||
|
||||
Thanks to the [novel design][blog] of the queue, it's just as easy to enqueue/dequeue multiple
|
||||
items as it is to do one at a time. This means that overhead can be cut drastically for
|
||||
bulk operations. Example syntax:
|
||||
|
||||
moodycamel::ConcurrentQueue<int> q;
|
||||
|
||||
int items[] = { 1, 2, 3, 4, 5 };
|
||||
q.enqueue_bulk(items, 5);
|
||||
|
||||
int results[5]; // Could also be any iterator
|
||||
size_t count = q.try_dequeue_bulk(results, 5);
|
||||
for (size_t i = 0; i != count; ++i) {
|
||||
assert(results[i] == items[i]);
|
||||
}
|
||||
|
||||
#### Preallocation (correctly using `try_enqueue`)
|
||||
|
||||
`try_enqueue`, unlike just plain `enqueue`, will never allocate memory. If there's not enough room in the
|
||||
queue, it simply returns false. The key to using this method properly, then, is to ensure enough space is
|
||||
pre-allocated for your desired maximum element count.
|
||||
|
||||
The constructor accepts a count of the number of elements that it should reserve space for. Because the
|
||||
queue works with blocks of elements, however, and not individual elements themselves, the value to pass
|
||||
in order to obtain an effective number of pre-allocated element slots is non-obvious.
|
||||
|
||||
First, be aware that the count passed is rounded up to the next multiple of the block size. Note that the
|
||||
default block size is 32 (this can be changed via the traits). Second, once a slot in a block has been
|
||||
enqueued to, that slot cannot be re-used until the rest of the block has completely been completely filled
|
||||
up and then completely emptied. This affects the number of blocks you need in order to account for the
|
||||
overhead of partially-filled blocks. Third, each producer (whether implicit or explicit) claims and recycles
|
||||
blocks in a different manner, which again affects the number of blocks you need to account for a desired number of
|
||||
usable slots.
|
||||
|
||||
Suppose you want the queue to be able to hold at least `N` elements at any given time. Without delving too
|
||||
deep into the rather arcane implementation details, here are some simple formulas for the number of elements
|
||||
to request for pre-allocation in such a case. Note the division is intended to be arithmetic division and not
|
||||
integer division (in order for `ceil()` to work).
|
||||
|
||||
For explicit producers (using tokens to enqueue):
|
||||
|
||||
(ceil(N / BLOCK_SIZE) + 1) * MAX_NUM_PRODUCERS * BLOCK_SIZ
|
||||
|
||||
For implicit producers (no tokens):
|
||||
|
||||
(ceil(N / BLOCK_SIZE) - 1 + 2 * MAX_NUM_PRODUCERS) * BLOCK_SIZE
|
||||
|
||||
When using mixed producer types:
|
||||
|
||||
((ceil(N / BLOCK_SIZE) - 1) * (MAX_EXPLICIT_PRODUCERS + 1) + 2 * (MAX_IMPLICIT_PRODUCERS + MAX_EXPLICIT_PRODUCERS)) * BLOCK_SIZE
|
||||
|
||||
If these formulas seem rather inconvenient, you can use the constructor overload that accepts the minimum
|
||||
number of elements (`N`) and the maximum number of producers and consumers directly, and let it do the
|
||||
computation for you.
|
||||
|
||||
Finally, it's important to note that because the queue is only eventually consistent and takes advantage of
|
||||
weak memory ordering for speed, there's always a possibility that under contention `try_enqueue` will fail
|
||||
even if the queue is correctly pre-sized for the desired number of elements. (e.g. A given thread may think that
|
||||
the queue's full even when that's no longer the case.) So no matter what, you still need to handle the failure
|
||||
case (perhaps looping until it succeeds), unless you don't mind dropping elements.
|
||||
|
||||
#### Exception safety
|
||||
|
||||
The queue is exception safe, and will never become corrupted if used with a type that may throw exceptions.
|
||||
The queue itself never throws any exceptions (operations fail gracefully (return false) if memory allocation
|
||||
fails instead of throwing `std::bad_alloc`).
|
||||
|
||||
It is important to note that the guarantees of exception safety only hold if the element type never throws
|
||||
from its destructor, and that any iterators passed into the queue (for bulk operations) never throw either.
|
||||
Note that in particular this means `std::back_inserter` iterators must be used with care, since the vector
|
||||
being inserted into may need to allocate and throw a `std::bad_alloc` exception from inside the iterator;
|
||||
so be sure to reserve enough capacity in the target container first if you do this.
|
||||
|
||||
The guarantees are presently as follows:
|
||||
- Enqueue operations are rolled back completely if an exception is thrown from an element's constructor.
|
||||
For bulk enqueue operations, this means that elements are copied instead of moved (in order to avoid
|
||||
having only some of the objects be moved in the event of an exception). Non-bulk enqueues always use
|
||||
the move constructor if one is available.
|
||||
- If the assignment operator throws during a dequeue operation (both single and bulk), the element(s) are
|
||||
considered dequeued regardless. In such a case, the dequeued elements are all properly destructed before
|
||||
the exception is propagated, but there's no way to get the elements themselves back.
|
||||
- Any exception that is thrown is propagated up the call stack, at which point the queue is in a consistent
|
||||
state.
|
||||
|
||||
Note: If any of your type's copy constructors/move constructors/assignment operators don't throw, be sure
|
||||
to annotate them with `noexcept`; this will avoid the exception-checking overhead in the queue where possible
|
||||
(even with zero-cost exceptions, there's still a code size impact that has to be taken into account).
|
||||
|
||||
#### Traits
|
||||
|
||||
The queue also supports a traits template argument which defines various types, constants,
|
||||
and the memory allocation and deallocation functions that are to be used by the queue. The typical pattern
|
||||
to providing your own traits is to create a class that inherits from the default traits
|
||||
and override only the values you wish to change. Example:
|
||||
|
||||
struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
|
||||
{
|
||||
static const size_t BLOCK_SIZE = 256; // Use bigger blocks
|
||||
};
|
||||
|
||||
moodycamel::ConcurrentQueue<int, MyTraits> q;
|
||||
|
||||
#### How to dequeue types without calling the constructor
|
||||
|
||||
The normal way to dequeue an item is to pass in an existing object by reference, which
|
||||
is then assigned to internally by the queue (using the move-assignment operator if possible).
|
||||
This can pose a problem for types that are
|
||||
expensive to construct or don't have a default constructor; fortunately, there is a simple
|
||||
workaround: Create a wrapper class that copies the memory contents of the object when it
|
||||
is assigned by the queue (a poor man's move, essentially). Note that this only works if
|
||||
the object contains no internal pointers. Example:
|
||||
|
||||
struct MyObjectMover
|
||||
{
|
||||
inline void operator=(MyObject&& obj)
|
||||
{
|
||||
std::memcpy(data, &obj, sizeof(MyObject));
|
||||
|
||||
// TODO: Cleanup obj so that when it's destructed by the queue
|
||||
// it doesn't corrupt the data of the object we just moved it into
|
||||
}
|
||||
|
||||
inline MyObject& obj() { return *reinterpret_cast<MyObject*>(data); }
|
||||
|
||||
private:
|
||||
align(alignof(MyObject)) char data[sizeof(MyObject)];
|
||||
};
|
||||
|
||||
## Samples
|
||||
|
||||
There are some more detailed samples [here][samples.md]. The source of
|
||||
the [unit tests][unittest-src] and [benchmarks][benchmark-src] are available for reference as well.
|
||||
|
||||
## Benchmarks
|
||||
|
||||
See my blog post for some [benchmark results][benchmarks] (including versus `boost::lockfree::queue` and `tbb::concurrent_queue`),
|
||||
or run the benchmarks yourself (requires MinGW and certain GnuWin32 utilities to build on Windows, or a recent
|
||||
g++ on Linux):
|
||||
|
||||
cd build
|
||||
make benchmarks
|
||||
bin/benchmarks
|
||||
|
||||
The short version of the benchmarks is that it's so fast (especially the bulk methods), that if you're actually
|
||||
using the queue to *do* anything, the queue won't be your bottleneck.
|
||||
|
||||
## Tests (and bugs)
|
||||
|
||||
I've written quite a few unit tests as well as a randomized long-running fuzz tester. I also ran the
|
||||
core queue algorithm through the [CDSChecker][cdschecker] C++11 memory model model checker. Some of the
|
||||
inner algorithms were tested separately using the [Relacy][relacy] model checker, and full integration
|
||||
tests were also performed with Relacy.
|
||||
I've tested
|
||||
on Linux (Fedora 19) and Windows (7), but only on x86 processors so far (Intel and AMD). The code was
|
||||
written to be platform-independent, however, and should work across all processors and OSes.
|
||||
|
||||
Due to the complexity of the implementation and the difficult-to-test nature of lock-free code in general,
|
||||
there may still be bugs. If anyone is seeing buggy behaviour, I'd like to hear about it! (Especially if
|
||||
a unit test for it can be cooked up.) Just open an issue on GitHub.
|
||||
|
||||
## License
|
||||
|
||||
I'm releasing the source of this repository (with the exception of third-party code, i.e. the Boost queue
|
||||
(used in the benchmarks for comparison), Intel's TBB library (ditto), CDSChecker, Relacy, and Jeff Preshing's
|
||||
cross-platform semaphore, which all have their own licenses)
|
||||
under a [simplified BSD license][license].
|
||||
|
||||
Note that lock-free programming is a patent minefield, and this code may very
|
||||
well violate a pending patent (I haven't looked), though it does not to my present knowledge.
|
||||
I did design and implement this queue from scratch.
|
||||
|
||||
## Diving into the code
|
||||
|
||||
If you're interested in the source code itself, it helps to have a rough idea of how it's laid out. This
|
||||
section attempts to describe that.
|
||||
|
||||
The queue is formed of several basic parts (listed here in roughly the order they appear in the source). There's the
|
||||
helper functions (e.g. for rounding to a power of 2). There's the default traits of the queue, which contain the
|
||||
constants and malloc/free functions used by the queue. There's the producer and consumer tokens. Then there's the queue's
|
||||
public API itself, starting with the constructor, destructor, and swap/assignment methods. There's the public enqueue methods,
|
||||
which are all wrappers around a small set of private enqueue methods found later on. There's the dequeue methods, which are
|
||||
defined inline and are relatively straightforward.
|
||||
|
||||
Then there's all the main internal data structures. First, there's a lock-free free list, used for recycling spent blocks (elements
|
||||
are enqueued to blocks internally). Then there's the block structure itself, which has two different ways of tracking whether
|
||||
it's fully emptied or not (remember, given two parallel consumers, there's no way to know which one will finish first) depending on where it's used.
|
||||
Then there's a small base class for the two types of internal SPMC producer queues (one for explicit producers that holds onto memory
|
||||
but attempts to be faster, and one for implicit ones which attempt to recycle more memory back into the parent but is a little slower).
|
||||
The explicit producer is defined first, then the implicit one. They both contain the same general four methods: One to enqueue, one to
|
||||
dequeue, one to enqueue in bulk, and one to dequeue in bulk. (Obviously they have constructors and destructors too, and helper methods.)
|
||||
The main difference between them is how the block handling is done (they both use the same blocks, but in different ways, and map indices
|
||||
to them in different ways).
|
||||
|
||||
Finally, there's the miscellaneous internal methods: There's the ones that handle the initial block pool (populated when the queue is constructed),
|
||||
and an abstract block pool that comprises the initial pool and any blocks on the free list. There's ones that handle the producer list
|
||||
(a lock-free add-only linked list of all the producers in the system). There's ones that handle the implicit producer lookup table (which
|
||||
is really a sort of specialized TLS lookup). And then there's some helper methods for allocating and freeing objects, and the data members
|
||||
of the queue itself, followed lastly by the free-standing swap functions.
|
||||
|
||||
|
||||
[blog]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
|
||||
[design]: http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
|
||||
[samples.md]: https://github.com/cameron314/concurrentqueue/blob/master/samples.md
|
||||
[source]: https://github.com/cameron314/concurrentqueue
|
||||
[concurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/concurrentqueue.h
|
||||
[blockingconcurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/blockingconcurrentqueue.h
|
||||
[unittest-src]: https://github.com/cameron314/concurrentqueue/tree/master/tests/unittests
|
||||
[benchmarks]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++#benchmarks
|
||||
[benchmark-src]: https://github.com/cameron314/concurrentqueue/tree/master/benchmarks
|
||||
[license]: https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md
|
||||
[cdschecker]: http://demsky.eecs.uci.edu/c11modelchecker.html
|
||||
[relacy]: http://www.1024cores.net/home/relacy-race-detector
|
||||
[spsc]: https://github.com/cameron314/readerwriterqueue
|
||||
[salsa]: http://webee.technion.ac.il/~idish/ftp/spaa049-gidron.pdf
|
||||
@ -1,760 +0,0 @@
|
||||
// Provides an efficient blocking version of moodycamel::ConcurrentQueue.
|
||||
// ©2015 Cameron Desrochers. Distributed under the terms of the simplified
|
||||
// BSD license, available at the top of concurrentqueue.h.
|
||||
// Uses Jeff Preshing's semaphore implementation (under the terms of its
|
||||
// separate zlib license, embedded below).
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "concurrentqueue.h"
|
||||
#include <type_traits>
|
||||
#include <memory>
|
||||
|
||||
#if defined(_WIN32)
|
||||
// Avoid including windows.h in a header; we only need a handful of
|
||||
// items, so we'll redeclare them here (this is relatively safe since
|
||||
// the API generally has to remain stable between Windows versions).
|
||||
// I know this is an ugly hack but it still beats polluting the global
|
||||
// namespace with thousands of generic names or adding a .cpp for nothing.
|
||||
extern "C" {
|
||||
struct _SECURITY_ATTRIBUTES;
|
||||
__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
|
||||
__declspec(dllimport) int __stdcall CloseHandle(void* hObject);
|
||||
__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
|
||||
__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
|
||||
}
|
||||
#elif defined(__MACH__)
|
||||
#include <mach/mach.h>
|
||||
#elif defined(__unix__)
|
||||
#include <semaphore.h>
|
||||
#endif
|
||||
|
||||
namespace moodycamel
|
||||
{
|
||||
namespace details
|
||||
{
|
||||
// Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
|
||||
// portable + lightweight semaphore implementations, originally from
|
||||
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
|
||||
// LICENSE:
|
||||
// Copyright (c) 2015 Jeff Preshing
|
||||
//
|
||||
// This software is provided 'as-is', without any express or implied
|
||||
// warranty. In no event will the authors be held liable for any damages
|
||||
// arising from the use of this software.
|
||||
//
|
||||
// Permission is granted to anyone to use this software for any purpose,
|
||||
// including commercial applications, and to alter it and redistribute it
|
||||
// freely, subject to the following restrictions:
|
||||
//
|
||||
// 1. The origin of this software must not be misrepresented; you must not
|
||||
// claim that you wrote the original software. If you use this software
|
||||
// in a product, an acknowledgement in the product documentation would be
|
||||
// appreciated but is not required.
|
||||
// 2. Altered source versions must be plainly marked as such, and must not be
|
||||
// misrepresented as being the original software.
|
||||
// 3. This notice may not be removed or altered from any source distribution.
|
||||
namespace mpmc_sema
|
||||
{
|
||||
#if defined(_WIN32)
|
||||
class Semaphore
|
||||
{
|
||||
private:
|
||||
void* m_hSema;
|
||||
|
||||
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
|
||||
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
|
||||
|
||||
public:
|
||||
Semaphore(int initialCount = 0)
|
||||
{
|
||||
assert(initialCount >= 0);
|
||||
const long maxLong = 0x7fffffff;
|
||||
m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
|
||||
}
|
||||
|
||||
~Semaphore()
|
||||
{
|
||||
CloseHandle(m_hSema);
|
||||
}
|
||||
|
||||
void wait()
|
||||
{
|
||||
const unsigned long infinite = 0xffffffff;
|
||||
WaitForSingleObject(m_hSema, infinite);
|
||||
}
|
||||
|
||||
void signal(int count = 1)
|
||||
{
|
||||
ReleaseSemaphore(m_hSema, count, nullptr);
|
||||
}
|
||||
};
|
||||
#elif defined(__MACH__)
|
||||
//---------------------------------------------------------
|
||||
// Semaphore (Apple iOS and OSX)
|
||||
// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
|
||||
//---------------------------------------------------------
|
||||
class Semaphore
|
||||
{
|
||||
private:
|
||||
semaphore_t m_sema;
|
||||
|
||||
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
|
||||
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
|
||||
|
||||
public:
|
||||
Semaphore(int initialCount = 0)
|
||||
{
|
||||
assert(initialCount >= 0);
|
||||
semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
|
||||
}
|
||||
|
||||
~Semaphore()
|
||||
{
|
||||
semaphore_destroy(mach_task_self(), m_sema);
|
||||
}
|
||||
|
||||
void wait()
|
||||
{
|
||||
semaphore_wait(m_sema);
|
||||
}
|
||||
|
||||
void signal()
|
||||
{
|
||||
semaphore_signal(m_sema);
|
||||
}
|
||||
|
||||
void signal(int count)
|
||||
{
|
||||
while (count-- > 0)
|
||||
{
|
||||
semaphore_signal(m_sema);
|
||||
}
|
||||
}
|
||||
};
|
||||
#elif defined(__unix__)
|
||||
//---------------------------------------------------------
|
||||
// Semaphore (POSIX, Linux)
|
||||
//---------------------------------------------------------
|
||||
class Semaphore
|
||||
{
|
||||
private:
|
||||
sem_t m_sema;
|
||||
|
||||
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
|
||||
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
|
||||
|
||||
public:
|
||||
Semaphore(int initialCount = 0)
|
||||
{
|
||||
assert(initialCount >= 0);
|
||||
sem_init(&m_sema, 0, initialCount);
|
||||
}
|
||||
|
||||
~Semaphore()
|
||||
{
|
||||
sem_destroy(&m_sema);
|
||||
}
|
||||
|
||||
void wait()
|
||||
{
|
||||
// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
|
||||
int rc;
|
||||
do
|
||||
{
|
||||
rc = sem_wait(&m_sema);
|
||||
}
|
||||
while (rc == -1 && errno == EINTR);
|
||||
}
|
||||
|
||||
void signal()
|
||||
{
|
||||
sem_post(&m_sema);
|
||||
}
|
||||
|
||||
void signal(int count)
|
||||
{
|
||||
while (count-- > 0)
|
||||
{
|
||||
sem_post(&m_sema);
|
||||
}
|
||||
}
|
||||
};
|
||||
#else
|
||||
#error Unsupported platform! (No semaphore wrapper available)
|
||||
#endif
|
||||
|
||||
//---------------------------------------------------------
|
||||
// LightweightSemaphore
|
||||
//---------------------------------------------------------
|
||||
class LightweightSemaphore
|
||||
{
|
||||
public:
|
||||
typedef std::make_signed<std::size_t>::type ssize_t;
|
||||
|
||||
private:
|
||||
std::atomic<ssize_t> m_count;
|
||||
Semaphore m_sema;
|
||||
|
||||
void waitWithPartialSpinning()
|
||||
{
|
||||
ssize_t oldCount;
|
||||
// Is there a better way to set the initial spin count?
|
||||
// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
|
||||
// as threads start hitting the kernel semaphore.
|
||||
int spin = 10000;
|
||||
while (--spin >= 0)
|
||||
{
|
||||
oldCount = m_count.load(std::memory_order_relaxed);
|
||||
if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
|
||||
return;
|
||||
std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
|
||||
}
|
||||
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
|
||||
if (oldCount <= 0)
|
||||
{
|
||||
m_sema.wait();
|
||||
}
|
||||
}
|
||||
|
||||
ssize_t waitManyWithPartialSpinning(ssize_t max)
|
||||
{
|
||||
assert(max > 0);
|
||||
ssize_t oldCount;
|
||||
int spin = 10000;
|
||||
while (--spin >= 0)
|
||||
{
|
||||
oldCount = m_count.load(std::memory_order_relaxed);
|
||||
if (oldCount > 0)
|
||||
{
|
||||
ssize_t newCount = oldCount > max ? oldCount - max : 0;
|
||||
if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
|
||||
return oldCount - newCount;
|
||||
}
|
||||
std::atomic_signal_fence(std::memory_order_acquire);
|
||||
}
|
||||
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
|
||||
if (oldCount <= 0)
|
||||
m_sema.wait();
|
||||
if (max > 1)
|
||||
return 1 + tryWaitMany(max - 1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
public:
|
||||
LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
|
||||
{
|
||||
assert(initialCount >= 0);
|
||||
}
|
||||
|
||||
bool tryWait()
|
||||
{
|
||||
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
|
||||
while (oldCount > 0)
|
||||
{
|
||||
if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void wait()
|
||||
{
|
||||
if (!tryWait())
|
||||
waitWithPartialSpinning();
|
||||
}
|
||||
|
||||
// Acquires between 0 and (greedily) max, inclusive
|
||||
ssize_t tryWaitMany(ssize_t max)
|
||||
{
|
||||
assert(max >= 0);
|
||||
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
|
||||
while (oldCount > 0)
|
||||
{
|
||||
ssize_t newCount = oldCount > max ? oldCount - max : 0;
|
||||
if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
|
||||
return oldCount - newCount;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Acquires at least one, and (greedily) at most max
|
||||
ssize_t waitMany(ssize_t max)
|
||||
{
|
||||
assert(max >= 0);
|
||||
ssize_t result = tryWaitMany(max);
|
||||
if (result == 0 && max > 0)
|
||||
result = waitManyWithPartialSpinning(max);
|
||||
return result;
|
||||
}
|
||||
|
||||
void signal(ssize_t count = 1)
|
||||
{
|
||||
assert(count >= 0);
|
||||
ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
|
||||
ssize_t toRelease = -oldCount < count ? -oldCount : count;
|
||||
if (toRelease > 0)
|
||||
{
|
||||
m_sema.signal((int)toRelease);
|
||||
}
|
||||
}
|
||||
|
||||
ssize_t availableApprox() const
|
||||
{
|
||||
ssize_t count = m_count.load(std::memory_order_relaxed);
|
||||
return count > 0 ? count : 0;
|
||||
}
|
||||
};
|
||||
} // end namespace mpmc_sema
|
||||
} // end namespace details
|
||||
|
||||
|
||||
// This is a blocking version of the queue. It has an almost identical interface to
|
||||
// the normal non-blocking version, with the addition of various wait_dequeue() methods
|
||||
// and the removal of producer-specific dequeue methods.
|
||||
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
|
||||
class BlockingConcurrentQueue
|
||||
{
|
||||
private:
|
||||
typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
|
||||
typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;
|
||||
|
||||
public:
|
||||
typedef typename ConcurrentQueue::producer_token_t producer_token_t;
|
||||
typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
|
||||
|
||||
typedef typename ConcurrentQueue::index_t index_t;
|
||||
typedef typename ConcurrentQueue::size_t size_t;
|
||||
typedef typename std::make_signed<size_t>::type ssize_t;
|
||||
|
||||
static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
|
||||
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
|
||||
static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
|
||||
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
|
||||
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
|
||||
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
|
||||
static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
|
||||
|
||||
public:
|
||||
// Creates a queue with at least `capacity` element slots; note that the
|
||||
// actual number of elements that can be inserted without additional memory
|
||||
// allocation depends on the number of producers and the block size (e.g. if
|
||||
// the block size is equal to `capacity`, only a single block will be allocated
|
||||
// up-front, which means only a single producer will be able to enqueue elements
|
||||
// without an extra allocation -- blocks aren't shared between producers).
|
||||
// This method is not thread safe -- it is up to the user to ensure that the
|
||||
// queue is fully constructed before it starts being used by other threads (this
|
||||
// includes making the memory effects of construction visible, possibly with a
|
||||
// memory barrier).
|
||||
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
|
||||
: inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
|
||||
{
|
||||
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
|
||||
if (!sema) {
|
||||
MOODYCAMEL_THROW(std::bad_alloc());
|
||||
}
|
||||
}
|
||||
|
||||
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
|
||||
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
|
||||
{
|
||||
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
|
||||
if (!sema) {
|
||||
MOODYCAMEL_THROW(std::bad_alloc());
|
||||
}
|
||||
}
|
||||
|
||||
// Disable copying and copy assignment
|
||||
BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
|
||||
BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
|
||||
|
||||
// Moving is supported, but note that it is *not* a thread-safe operation.
|
||||
// Nobody can use the queue while it's being moved, and the memory effects
|
||||
// of that move must be propagated to other threads before they can use it.
|
||||
// Note: When a queue is moved, its tokens are still valid but can only be
|
||||
// used with the destination queue (i.e. semantically they are moved along
|
||||
// with the queue itself).
|
||||
BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
|
||||
: inner(std::move(other.inner)), sema(std::move(other.sema))
|
||||
{ }
|
||||
|
||||
inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
|
||||
{
|
||||
return swap_internal(other);
|
||||
}
|
||||
|
||||
// Swaps this queue's state with the other's. Not thread-safe.
|
||||
// Swapping two queues does not invalidate their tokens, however
|
||||
// the tokens that were created for one queue must be used with
|
||||
// only the swapped queue (i.e. the tokens are tied to the
|
||||
// queue's movable state, not the object itself).
|
||||
inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
|
||||
{
|
||||
swap_internal(other);
|
||||
}
|
||||
|
||||
private:
|
||||
BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
|
||||
{
|
||||
if (this == &other) {
|
||||
return *this;
|
||||
}
|
||||
|
||||
inner.swap(other.inner);
|
||||
sema.swap(other.sema);
|
||||
return *this;
|
||||
}
|
||||
|
||||
public:
|
||||
// Enqueues a single item (by copying it).
|
||||
// Allocates memory if required. Only fails if memory allocation fails (or implicit
|
||||
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
|
||||
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
|
||||
// Thread-safe.
|
||||
inline bool enqueue(T const& item)
|
||||
{
|
||||
if (details::likely(inner.enqueue(item))) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by moving it, if possible).
|
||||
// Allocates memory if required. Only fails if memory allocation fails (or implicit
|
||||
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
|
||||
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
|
||||
// Thread-safe.
|
||||
inline bool enqueue(T&& item)
|
||||
{
|
||||
if (details::likely(inner.enqueue(std::move(item)))) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by copying it) using an explicit producer token.
|
||||
// Allocates memory if required. Only fails if memory allocation fails (or
|
||||
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
|
||||
// Thread-safe.
|
||||
inline bool enqueue(producer_token_t const& token, T const& item)
|
||||
{
|
||||
if (details::likely(inner.enqueue(token, item))) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
|
||||
// Allocates memory if required. Only fails if memory allocation fails (or
|
||||
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
|
||||
// Thread-safe.
|
||||
inline bool enqueue(producer_token_t const& token, T&& item)
|
||||
{
|
||||
if (details::likely(inner.enqueue(token, std::move(item)))) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues several items.
|
||||
// Allocates memory if required. Only fails if memory allocation fails (or
|
||||
// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
|
||||
// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
|
||||
// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
|
||||
// Thread-safe.
|
||||
template<typename It>
|
||||
inline bool enqueue_bulk(It itemFirst, size_t count)
|
||||
{
|
||||
if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
|
||||
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues several items using an explicit producer token.
|
||||
// Allocates memory if required. Only fails if memory allocation fails
|
||||
// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
|
||||
// Note: Use std::make_move_iterator if the elements should be moved
|
||||
// instead of copied.
|
||||
// Thread-safe.
|
||||
template<typename It>
|
||||
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
|
||||
{
|
||||
if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
|
||||
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by copying it).
|
||||
// Does not allocate memory. Fails if not enough room to enqueue (or implicit
|
||||
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
|
||||
// is 0).
|
||||
// Thread-safe.
|
||||
inline bool try_enqueue(T const& item)
|
||||
{
|
||||
if (inner.try_enqueue(item)) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by moving it, if possible).
|
||||
// Does not allocate memory (except for one-time implicit producer).
|
||||
// Fails if not enough room to enqueue (or implicit production is
|
||||
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
|
||||
// Thread-safe.
|
||||
inline bool try_enqueue(T&& item)
|
||||
{
|
||||
if (inner.try_enqueue(std::move(item))) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by copying it) using an explicit producer token.
|
||||
// Does not allocate memory. Fails if not enough room to enqueue.
|
||||
// Thread-safe.
|
||||
inline bool try_enqueue(producer_token_t const& token, T const& item)
|
||||
{
|
||||
if (inner.try_enqueue(token, item)) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
|
||||
// Does not allocate memory. Fails if not enough room to enqueue.
|
||||
// Thread-safe.
|
||||
inline bool try_enqueue(producer_token_t const& token, T&& item)
|
||||
{
|
||||
if (inner.try_enqueue(token, std::move(item))) {
|
||||
sema->signal();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues several items.
|
||||
// Does not allocate memory (except for one-time implicit producer).
|
||||
// Fails if not enough room to enqueue (or implicit production is
|
||||
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
|
||||
// Note: Use std::make_move_iterator if the elements should be moved
|
||||
// instead of copied.
|
||||
// Thread-safe.
|
||||
template<typename It>
|
||||
inline bool try_enqueue_bulk(It itemFirst, size_t count)
|
||||
{
|
||||
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
|
||||
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Enqueues several items using an explicit producer token.
|
||||
// Does not allocate memory. Fails if not enough room to enqueue.
|
||||
// Note: Use std::make_move_iterator if the elements should be moved
|
||||
// instead of copied.
|
||||
// Thread-safe.
|
||||
template<typename It>
|
||||
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
|
||||
{
|
||||
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
|
||||
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
// Attempts to dequeue from the queue.
|
||||
// Returns false if all producer streams appeared empty at the time they
|
||||
// were checked (so, the queue is likely but not guaranteed to be empty).
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename U>
|
||||
inline bool try_dequeue(U& item)
|
||||
{
|
||||
if (sema->tryWait()) {
|
||||
while (!inner.try_dequeue(item)) {
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Attempts to dequeue from the queue using an explicit consumer token.
|
||||
// Returns false if all producer streams appeared empty at the time they
|
||||
// were checked (so, the queue is likely but not guaranteed to be empty).
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename U>
|
||||
inline bool try_dequeue(consumer_token_t& token, U& item)
|
||||
{
|
||||
if (sema->tryWait()) {
|
||||
while (!inner.try_dequeue(token, item)) {
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// Attempts to dequeue several elements from the queue.
|
||||
// Returns the number of items actually dequeued.
|
||||
// Returns 0 if all producer streams appeared empty at the time they
|
||||
// were checked (so, the queue is likely but not guaranteed to be empty).
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename It>
|
||||
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
|
||||
{
|
||||
size_t count = 0;
|
||||
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
|
||||
while (count != max) {
|
||||
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
// Attempts to dequeue several elements from the queue using an explicit consumer token.
|
||||
// Returns the number of items actually dequeued.
|
||||
// Returns 0 if all producer streams appeared empty at the time they
|
||||
// were checked (so, the queue is likely but not guaranteed to be empty).
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename It>
|
||||
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
|
||||
{
|
||||
size_t count = 0;
|
||||
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
|
||||
while (count != max) {
|
||||
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
|
||||
// Blocks the current thread until there's something to dequeue, then
|
||||
// dequeues it.
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename U>
|
||||
inline void wait_dequeue(U& item)
|
||||
{
|
||||
sema->wait();
|
||||
while (!inner.try_dequeue(item)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Blocks the current thread until there's something to dequeue, then
|
||||
// dequeues it using an explicit consumer token.
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename U>
|
||||
inline void wait_dequeue(consumer_token_t& token, U& item)
|
||||
{
|
||||
sema->wait();
|
||||
while (!inner.try_dequeue(token, item)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Attempts to dequeue several elements from the queue.
|
||||
// Returns the number of items actually dequeued, which will
|
||||
// always be at least one (this method blocks until the queue
|
||||
// is non-empty) and at most max.
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename It>
|
||||
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
|
||||
{
|
||||
size_t count = 0;
|
||||
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
|
||||
while (count != max) {
|
||||
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
// Attempts to dequeue several elements from the queue using an explicit consumer token.
|
||||
// Returns the number of items actually dequeued, which will
|
||||
// always be at least one (this method blocks until the queue
|
||||
// is non-empty) and at most max.
|
||||
// Never allocates. Thread-safe.
|
||||
template<typename It>
|
||||
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
|
||||
{
|
||||
size_t count = 0;
|
||||
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
|
||||
while (count != max) {
|
||||
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
// Returns an estimate of the total number of elements currently in the queue. This
|
||||
// estimate is only accurate if the queue has completely stabilized before it is called
|
||||
// (i.e. all enqueue and dequeue operations have completed and their memory effects are
|
||||
// visible on the calling thread, and no further operations start while this method is
|
||||
// being called).
|
||||
// Thread-safe.
|
||||
inline size_t size_approx() const
|
||||
{
|
||||
return (size_t)sema->availableApprox();
|
||||
}
|
||||
|
||||
|
||||
// Returns true if the underlying atomic variables used by
|
||||
// the queue are lock-free (they should be on most platforms).
|
||||
// Thread-safe.
|
||||
static bool is_lock_free()
|
||||
{
|
||||
return ConcurrentQueue::is_lock_free();
|
||||
}
|
||||
|
||||
|
||||
private:
|
||||
template<typename U>
|
||||
static inline U* create()
|
||||
{
|
||||
auto p = Traits::malloc(sizeof(U));
|
||||
return p != nullptr ? new (p) U : nullptr;
|
||||
}
|
||||
|
||||
template<typename U, typename A1>
|
||||
static inline U* create(A1&& a1)
|
||||
{
|
||||
auto p = Traits::malloc(sizeof(U));
|
||||
return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
|
||||
}
|
||||
|
||||
template<typename U>
|
||||
static inline void destroy(U* p)
|
||||
{
|
||||
if (p != nullptr) {
|
||||
p->~U();
|
||||
}
|
||||
Traits::free(p);
|
||||
}
|
||||
|
||||
private:
|
||||
ConcurrentQueue inner;
|
||||
std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
|
||||
};
|
||||
|
||||
|
||||
template<typename T, typename Traits>
|
||||
inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
|
||||
{
|
||||
a.swap(b);
|
||||
}
|
||||
|
||||
} // end namespace moodycamel
|
||||
File diff suppressed because it is too large
Load Diff
@ -1,375 +0,0 @@
|
||||
# Samples for moodycamel::ConcurrentQueue
|
||||
|
||||
Here are some example usage scenarios with sample code. Note that most
|
||||
use the simplest version of each available method for demonstration purposes,
|
||||
but they can all be adapted to use tokens and/or the corresponding bulk methods for
|
||||
extra speed.
|
||||
|
||||
|
||||
## Hello queue
|
||||
|
||||
ConcurrentQueue<int> q;
|
||||
|
||||
for (int i = 0; i != 123; ++i)
|
||||
q.enqueue(i);
|
||||
|
||||
int item;
|
||||
for (int i = 0; i != 123; ++i) {
|
||||
q.try_dequeue(item);
|
||||
assert(item == i);
|
||||
}
|
||||
|
||||
|
||||
## Hello concurrency
|
||||
|
||||
Basic example of how to use the queue from multiple threads, with no
|
||||
particular goal (i.e. it does nothing, but in an instructive way).
|
||||
|
||||
ConcurrentQueue<int> q;
|
||||
int dequeued[100] = { 0 };
|
||||
std::thread threads[20];
|
||||
|
||||
// Producers
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
threads[i] = std::thread([&](int i) {
|
||||
for (int j = 0; j != 10; ++j) {
|
||||
q.enqueue(i * 10 + j);
|
||||
}
|
||||
}, i);
|
||||
}
|
||||
|
||||
// Consumers
|
||||
for (int i = 10; i != 20; ++i) {
|
||||
threads[i] = std::thread([&]() {
|
||||
int item;
|
||||
for (int j = 0; j != 20; ++j) {
|
||||
if (q.try_dequeue(item)) {
|
||||
++dequeued[item];
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for all threads
|
||||
for (int i = 0; i != 20; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
// Collect any leftovers (could be some if e.g. consumers finish before producers)
|
||||
int item;
|
||||
while (q.try_dequeue(item)) {
|
||||
++dequeued[item];
|
||||
}
|
||||
|
||||
// Make sure everything went in and came back out!
|
||||
for (int i = 0; i != 100; ++i) {
|
||||
assert(dequeued[i] == 1);
|
||||
}
|
||||
|
||||
|
||||
## Bulk up
|
||||
|
||||
Same as previous example, but runs faster.
|
||||
|
||||
ConcurrentQueue<int> q;
|
||||
int dequeued[100] = { 0 };
|
||||
std::thread threads[20];
|
||||
|
||||
// Producers
|
||||
for (int i = 0; i != 10; ++i) {
|
||||
threads[i] = std::thread([&](int i) {
|
||||
int items[10];
|
||||
for (int j = 0; j != 10; ++j) {
|
||||
items[j] = i * 10 + j;
|
||||
}
|
||||
q.enqueue_bulk(items, 10);
|
||||
}, i);
|
||||
}
|
||||
|
||||
// Consumers
|
||||
for (int i = 10; i != 20; ++i) {
|
||||
threads[i] = std::thread([&]() {
|
||||
int items[20];
|
||||
for (std::size_t count = q.try_dequeue_bulk(items, 20); count != 0; --count) {
|
||||
++dequeued[items[count - 1]];
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for all threads
|
||||
for (int i = 0; i != 20; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
// Collect any leftovers (could be some if e.g. consumers finish before producers)
|
||||
int items[10];
|
||||
std::size_t count;
|
||||
while ((count = q.try_dequeue_bulk(items, 10)) != 0) {
|
||||
for (std::size_t i = 0; i != count; ++i) {
|
||||
++dequeued[items[i]];
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure everything went in and came back out!
|
||||
for (int i = 0; i != 100; ++i) {
|
||||
assert(dequeued[i] == 1);
|
||||
}
|
||||
|
||||
|
||||
## Producer/consumer model (simultaneous)
|
||||
|
||||
In this model, one set of threads is producing items,
|
||||
and the other is consuming them concurrently until all of
|
||||
them have been consumed. The counters are required to
|
||||
ensure that all items eventually get consumed.
|
||||
|
||||
ConcurrentQueue<Item> q;
|
||||
const int ProducerCount = 8;
|
||||
const int ConsumerCount = 8;
|
||||
std::thread producers[ProducerCount];
|
||||
std::thread consumers[ConsumerCount];
|
||||
std::atomic<int> doneProducers(0);
|
||||
std::atomic<int> doneConsumers(0);
|
||||
for (int i = 0; i != ProducerCount; ++i) {
|
||||
producers[i] = std::thread([&]() {
|
||||
while (produce) {
|
||||
q.enqueue(produceItem());
|
||||
}
|
||||
doneProducers.fetch_add(1, std::memory_order_release);
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != ConsumerCount; ++i) {
|
||||
consumers[i] = std::thread([&]() {
|
||||
Item item;
|
||||
bool itemsLeft;
|
||||
do {
|
||||
// It's important to fence (if the producers have finished) *before* dequeueing
|
||||
itemsLeft = doneProducers.load(std::memory_order_acquire) != ProducerCount;
|
||||
while (q.try_dequeue(item)) {
|
||||
itemsLeft = true;
|
||||
consumeItem(item);
|
||||
}
|
||||
} while (itemsLeft || doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == ConsumerCount);
|
||||
// The condition above is a bit tricky, but it's necessary to ensure that the
|
||||
// last consumer sees the memory effects of all the other consumers before it
|
||||
// calls try_dequeue for the last time
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != ProducerCount; ++i) {
|
||||
producers[i].join();
|
||||
}
|
||||
for (int i = 0; i != ConsumerCount; ++i) {
|
||||
consumers[i].join();
|
||||
}
|
||||
|
||||
## Producer/consumer model (simultaneous, blocking)
|
||||
|
||||
The blocking version is different, since either the number of elements being produced needs
|
||||
to be known ahead of time, or some other coordination is required to tell the consumers when
|
||||
to stop calling wait_dequeue (not shown here). This is necessary because otherwise a consumer
|
||||
could end up blocking forever -- and destroying a queue while a consumer is blocking on it leads
|
||||
to undefined behaviour.
|
||||
|
||||
BlockingConcurrentQueue<Item> q;
|
||||
const int ProducerCount = 8;
|
||||
const int ConsumerCount = 8;
|
||||
std::thread producers[ProducerCount];
|
||||
std::thread consumers[ConsumerCount];
|
||||
std::atomic<int> promisedElementsRemaining(ProducerCount * 1000);
|
||||
for (int i = 0; i != ProducerCount; ++i) {
|
||||
producers[i] = std::thread([&]() {
|
||||
for (int j = 0; j != 1000; ++j) {
|
||||
q.enqueue(produceItem());
|
||||
}
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != ConsumerCount; ++i) {
|
||||
consumers[i] = std::thread([&]() {
|
||||
Item item;
|
||||
while (promisedElementsRemaining.fetch_sub(1, std::memory_order_relaxed)) {
|
||||
q.wait_dequeue(item);
|
||||
consumeItem(item);
|
||||
}
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != ProducerCount; ++i) {
|
||||
producers[i].join();
|
||||
}
|
||||
for (int i = 0; i != ConsumerCount; ++i) {
|
||||
consumers[i].join();
|
||||
}
|
||||
|
||||
|
||||
## Producer/consumer model (separate stages)
|
||||
|
||||
ConcurrentQueue<Item> q;
|
||||
|
||||
// Production stage
|
||||
std::thread threads[8];
|
||||
for (int i = 0; i != 8; ++i) {
|
||||
threads[i] = std::thread([&]() {
|
||||
while (produce) {
|
||||
q.enqueue(produceItem());
|
||||
}
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != 8; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
// Consumption stage
|
||||
std::atomic<int> doneConsumers(0);
|
||||
for (int i = 0; i != 8; ++i) {
|
||||
threads[i] = std::thread([&]() {
|
||||
Item item;
|
||||
do {
|
||||
while (q.try_dequeue(item)) {
|
||||
consumeItem(item);
|
||||
}
|
||||
// Loop again one last time if we're the last producer (with the acquired
|
||||
// memory effects of the other producers):
|
||||
} while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != 8; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
Note that there's no point trying to use the blocking queue with this model, since
|
||||
there's no need to use the `wait` methods (all the elements are produced before any
|
||||
are consumed), and hence the complexity would be the same but with additional overhead.
|
||||
|
||||
|
||||
## Object pool
|
||||
|
||||
If you don't know what threads will be using the queue in advance,
|
||||
you can't really declare any long-term tokens. The obvious solution
|
||||
is to use the implicit methods (that don't take any tokens):
|
||||
|
||||
// A pool of 'Something' objects that can be safely accessed
|
||||
// from any thread
|
||||
class SomethingPool
|
||||
{
|
||||
public:
|
||||
Something getSomething()
|
||||
{
|
||||
Something obj;
|
||||
queue.try_dequeue(obj);
|
||||
|
||||
// If the dequeue succeeded, obj will be an object from the
|
||||
// thread pool, otherwise it will be the default-constructed
|
||||
// object as declared above
|
||||
return obj;
|
||||
}
|
||||
|
||||
void recycleSomething(Something&& obj)
|
||||
{
|
||||
queue.enqueue(std::move(obj));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
## Threadpool task queue
|
||||
|
||||
BlockingConcurrentQueue<Task> q;
|
||||
|
||||
// To create a task from any thread:
|
||||
q.enqueue(...);
|
||||
|
||||
// On threadpool threads:
|
||||
Task task;
|
||||
while (true) {
|
||||
q.wait_dequeue(task);
|
||||
|
||||
// Process task...
|
||||
}
|
||||
|
||||
|
||||
## Multithreaded game loop
|
||||
|
||||
BlockingConcurrentQueue<Task> q;
|
||||
std::atomic<int> pendingTasks(0);
|
||||
|
||||
// On threadpool threads:
|
||||
Task task;
|
||||
while (true) {
|
||||
q.wait_dequeue(task);
|
||||
|
||||
// Process task...
|
||||
|
||||
pendingTasks.fetch_add(-1, std::memory_order_release);
|
||||
}
|
||||
|
||||
// Whenever a new task needs to be processed for the frame:
|
||||
pendingTasks.fetch_add(1, std::memory_order_release);
|
||||
q.enqueue(...);
|
||||
|
||||
// To wait for all the frame's tasks to complete before rendering:
|
||||
while (pendingTasks.load(std::memory_order_acquire) != 0)
|
||||
continue;
|
||||
|
||||
// Alternatively you could help out the thread pool while waiting:
|
||||
while (pendingTasks.load(std::memory_order_acquire) != 0) {
|
||||
if (!q.try_dequeue(task)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Process task...
|
||||
|
||||
pendingTasks.fetch_add(-1, std::memory_order_release);
|
||||
}
|
||||
|
||||
|
||||
## Pump until empty
|
||||
|
||||
This might be useful if, for example, you want to process any remaining items
|
||||
in the queue before it's destroyed. Note that it is your responsibility
|
||||
to ensure that the memory effects of any enqueue operations you wish to see on
|
||||
the dequeue thread are visible (i.e. if you're waiting for a certain set of elements,
|
||||
you need to use memory fences to ensure that those elements are visible to the dequeue
|
||||
thread after they've been enqueued).
|
||||
|
||||
ConcurrentQueue<Item> q;
|
||||
|
||||
// Single-threaded pumping:
|
||||
Item item;
|
||||
while (q.try_dequeue(item)) {
|
||||
// Process item...
|
||||
}
|
||||
// q is guaranteed to be empty here, unless there is another thread enqueueing still or
|
||||
// there was another thread dequeueing at one point and its memory effects have not
|
||||
// yet been propagated to this thread.
|
||||
|
||||
// Multi-threaded pumping:
|
||||
std::thread threads[8];
|
||||
std::atomic<int> doneConsumers(0);
|
||||
for (int i = 0; i != 8; ++i) {
|
||||
threads[i] = std::thread([&]() {
|
||||
Item item;
|
||||
do {
|
||||
while (q.try_dequeue(item)) {
|
||||
// Process item...
|
||||
}
|
||||
} while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
|
||||
// If there are still enqueue operations happening on other threads,
|
||||
// then the queue may not be empty at this point. However, if all enqueue
|
||||
// operations completed before we finished pumping (and the propagation of
|
||||
// their memory effects too), and all dequeue operations apart from those
|
||||
// our threads did above completed before we finished pumping (and the
|
||||
// propagation of their memory effects too), then the queue is guaranteed
|
||||
// to be empty at this point.
|
||||
});
|
||||
}
|
||||
for (int i = 0; i != 8; ++i) {
|
||||
threads[i].join();
|
||||
}
|
||||
|
||||
|
||||
## Wait for a queue to become empty (without dequeueing)
|
||||
|
||||
You can't (robustly) :-) However, you can set up your own atomic counter and
|
||||
poll that instead (see the game loop example). If you're satisfied with merely an estimate, you can use
|
||||
`size_approx()`. Note that `size_approx()` may return 0 even if the queue is
|
||||
not completely empty, unless the queue has already stabilized first (no threads
|
||||
are enqueueing or dequeueing, and all memory effects of any previous operations
|
||||
have been propagated to the thread before it calls `size_approx()`).
|
||||
@ -60,8 +60,6 @@ Continuable<> make_continuable(Args&&...)
|
||||
return Continuable<>();
|
||||
}
|
||||
|
||||
|
||||
|
||||
template <typename... LeftArgs, typename... RightArgs>
|
||||
Continuable<> operator&& (Continuable<LeftArgs...>&&, Continuable<RightArgs...>&&)
|
||||
{
|
||||
|
||||
682
test.cpp
682
test.cpp
@ -1,638 +1,94 @@
|
||||
|
||||
#include "Callback.h"
|
||||
#include "WeakCallbackContainer.h"
|
||||
#include "Continuable.h"
|
||||
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
#include <type_traits>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <typeinfo>
|
||||
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
#include <random>
|
||||
|
||||
// #include "concurrentqueue.h"
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
enum SpellCastResult
|
||||
{
|
||||
SPELL_FAILED_SUCCESS = 0,
|
||||
SPELL_FAILED_AFFECTING_COMBAT = 1,
|
||||
SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
|
||||
SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
|
||||
SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
|
||||
SPELL_FAILED_ALREADY_BEING_TAMED = 5
|
||||
};
|
||||
|
||||
template<typename T>
|
||||
using Optional = boost::optional<T>;
|
||||
|
||||
Continuable<> Log(std::string const& message)
|
||||
{
|
||||
return make_continuable([=](Callback<>&& callback)
|
||||
{
|
||||
std::cout << message << std::endl;
|
||||
callback();
|
||||
});
|
||||
}
|
||||
|
||||
struct ResultSet
|
||||
{
|
||||
ResultSet(std::size_t affected_) :
|
||||
affected(affected_) { };
|
||||
|
||||
std::size_t affected;
|
||||
};
|
||||
|
||||
Continuable<ResultSet> AsyncQuery(std::string const& query)
|
||||
{
|
||||
return make_continuable([=](Callback<ResultSet>&& callback)
|
||||
{
|
||||
std::cout << query << std::endl;
|
||||
callback(ResultSet(2));
|
||||
});
|
||||
}
|
||||
|
||||
// Original method taking an optional callback.
|
||||
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
|
||||
{
|
||||
std::cout << "Casting " << id << std::endl;
|
||||
|
||||
// on success call the callback with SPELL_FAILED_SUCCESS
|
||||
if (callback)
|
||||
(*callback)(SPELL_FAILED_SUCCESS);
|
||||
}
|
||||
|
||||
// Promise wrapped callback decorator.
|
||||
Continuable<SpellCastResult> CastSpellPromise(int id)
|
||||
{
|
||||
return make_continuable([=](Callback<SpellCastResult>&& callback)
|
||||
{
|
||||
CastSpell(id, callback);
|
||||
});
|
||||
}
|
||||
|
||||
// Void instant returning continuable promise for testing purposes
|
||||
Continuable<> TrivialPromise(std::string const& msg = "")
|
||||
{
|
||||
return Log(msg);
|
||||
}
|
||||
|
||||
Continuable<bool> Validate()
|
||||
{
|
||||
return make_continuable([=](Callback<bool>&& callback)
|
||||
{
|
||||
std::cout << "Validate " << std::endl;
|
||||
|
||||
callback(true);
|
||||
});
|
||||
}
|
||||
|
||||
Continuable<std::unique_ptr<int>&&> MoveTest()
|
||||
{
|
||||
return make_continuable([=](Callback<std::unique_ptr<int>&&>&& callback)
|
||||
{
|
||||
// Move the unique ptr out to test moveability
|
||||
std::unique_ptr<int> ptr(new int(5));
|
||||
callback(std::move(ptr));
|
||||
});
|
||||
}
|
||||
|
||||
typedef std::unique_ptr<int> Moveable;
|
||||
|
||||
void testMoveAbleNormal(std::function<void(std::unique_ptr<int>&&)> callback)
|
||||
{
|
||||
std::unique_ptr<int> ptr(new int(5));
|
||||
callback(std::move(ptr));
|
||||
}
|
||||
|
||||
template <typename... T>
|
||||
void test_unwrap(std::string const& msg)
|
||||
{
|
||||
std::cout << msg << " is unwrappable: " << (fu::is_unwrappable<T...>::value ? "true" : "false") << std::endl;
|
||||
}
|
||||
|
||||
#include <iostream>
|
||||
#include <atomic>
|
||||
#include <random>
|
||||
|
||||
// static std::atomic_size_t move_tracer_index = 0;
|
||||
|
||||
/// Class to trace construct, destruct, copy and move operations.
|
||||
/*
|
||||
class CopyMoveTracer
|
||||
{
|
||||
std::size_t id;
|
||||
std::size_t flags;
|
||||
std::size_t copied;
|
||||
std::size_t moved;
|
||||
|
||||
bool IsEnabled(std::size_t mask) const
|
||||
{
|
||||
// msvc warning silencer
|
||||
return (flags & mask) != 0;
|
||||
}
|
||||
|
||||
void Log(std::string const& msg) const
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
public:
|
||||
enum Flags : std::size_t
|
||||
{
|
||||
CAPTURE_NONE = 0x1,
|
||||
CAPTURE_CONSTRUCT = 0x1,
|
||||
CAPTURE_DESTRUCT = 0x2,
|
||||
CAPTURE_COPY = 0x4,
|
||||
CAPTURE_MOVE = 0x8,
|
||||
CAPTURE_ALL = CAPTURE_CONSTRUCT | CAPTURE_DESTRUCT | CAPTURE_COPY | CAPTURE_MOVE
|
||||
};
|
||||
|
||||
// Empty construct
|
||||
CopyMoveTracer() : id(++move_tracer_index), flags(CAPTURE_ALL), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_CONSTRUCT))
|
||||
Log("Tracer constructed");
|
||||
}
|
||||
|
||||
// Construct with flags
|
||||
CopyMoveTracer(std::size_t flags_) : id(++move_tracer_index), flags(flags_), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_CONSTRUCT))
|
||||
Log("Tracer constructed");
|
||||
}
|
||||
|
||||
// Copy construct
|
||||
CopyMoveTracer(CopyMoveTracer const& right) : id(move_tracer_index++), flags(right.flags), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_COPY))
|
||||
Log("Tracer copy constructed");
|
||||
}
|
||||
|
||||
// Copy construct
|
||||
CopyMoveTracer(CopyMoveTracer&& right) : id(right.id), flags(right.flags), copied(0), moved(0)
|
||||
{
|
||||
if (IsEnabled(CAPTURE_COPY))
|
||||
Log("Tracer copy constructed");
|
||||
}
|
||||
};
|
||||
*/
|
||||
/*
|
||||
namespace detail
|
||||
{
|
||||
template<typename, typename>
|
||||
struct function_matches_to_args;
|
||||
|
||||
template<typename LeftReturn, typename... LeftArgs,
|
||||
typename RightReturn, typename... RightArgs>
|
||||
struct function_matches_to_args<
|
||||
std::function<LeftReturn(LeftArgs...)>,
|
||||
std::function<RightReturn(RightArgs...)>>
|
||||
{
|
||||
|
||||
};
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
class DispatcherPool
|
||||
{
|
||||
enum TerminationMode
|
||||
{
|
||||
NONE,
|
||||
TERMINATE,
|
||||
AWAIT
|
||||
};
|
||||
* Copyright (C) 2015 Naios <naios-dev@live.de>
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
typedef std::function<void()> Callable;
|
||||
#define CATCH_CONFIG_RUNNER
|
||||
|
||||
std::vector<std::thread> _pool;
|
||||
|
||||
std::atomic<TerminationMode> _shutdown;
|
||||
|
||||
std::mutex _mutex;
|
||||
|
||||
std::condition_variable _condition;
|
||||
|
||||
// moodycamel::ConcurrentQueue<Callable> _queue;
|
||||
|
||||
public:
|
||||
DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }
|
||||
|
||||
DispatcherPool(unsigned int const threads) : _shutdown(NONE)
|
||||
{
|
||||
for (unsigned int i = 0; i < threads; ++i)
|
||||
{
|
||||
_pool.emplace_back([&, i]
|
||||
{
|
||||
// Reserve the consumer token
|
||||
// moodycamel::ConsumerToken token(_queue);
|
||||
|
||||
Callable callable;
|
||||
while (_shutdown != TERMINATE)
|
||||
{
|
||||
if (_queue.try_dequeue(token, callable))
|
||||
{
|
||||
std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
|
||||
// std::cout << msg;
|
||||
callable();
|
||||
}
|
||||
else
|
||||
{
|
||||
if (_shutdown == AWAIT)
|
||||
break;
|
||||
|
||||
{
|
||||
std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
|
||||
// std::cout << msg;
|
||||
}
|
||||
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
|
||||
// Lock until new tasks are added
|
||||
_condition.wait(lock);
|
||||
|
||||
{
|
||||
std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
|
||||
// std::cout << msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
~DispatcherPool()
|
||||
{
|
||||
Shutdown();
|
||||
}
|
||||
|
||||
template<typename Functional>
|
||||
void Dispatch(Functional&& functional)
|
||||
{
|
||||
_queue.enqueue(std::forward<Functional>(functional));
|
||||
std::unique_lock<std::mutex> lock(_mutex);
|
||||
_condition.notify_one();
|
||||
}
|
||||
|
||||
void Shutdown()
|
||||
{
|
||||
_Shutdown(TERMINATE);
|
||||
}
|
||||
|
||||
void Await()
|
||||
{
|
||||
_Shutdown(AWAIT);
|
||||
}
|
||||
|
||||
void _Shutdown(TerminationMode const mode)
|
||||
{
|
||||
_shutdown = mode;
|
||||
_condition.notify_all();
|
||||
for (auto&& thread : _pool)
|
||||
if (thread.joinable())
|
||||
thread.join();
|
||||
}
|
||||
};
|
||||
*/
|
||||
#include "catch.hpp"
|
||||
|
||||
void test_mockup();
|
||||
void test_incubator();
|
||||
|
||||
int main(int /*argc*/, char** /*argv*/)
|
||||
int main(int argc, char* const argv[])
|
||||
{
|
||||
test_mockup();
|
||||
|
||||
// CopyMoveTracer tracer;
|
||||
test_incubator();
|
||||
|
||||
/*
|
||||
CastSpellPromise(1)
|
||||
.then([](SpellCastResult)
|
||||
{
|
||||
return CastSpellPromise(2);
|
||||
})
|
||||
.then([](SpellCastResult)
|
||||
{
|
||||
std::cout << "Pause a callback (void test) " << std::endl;
|
||||
})
|
||||
.then(Validate())
|
||||
.then(AsyncQuery("SELECT * FROM `users`")
|
||||
.then([](ResultSet result)
|
||||
{
|
||||
// Evaluate result
|
||||
std::size_t const affected = result.affected;
|
||||
return Log(std::to_string(affected) + " rows affected\n");
|
||||
})
|
||||
)
|
||||
.then(TrivialPromise("huhu"))
|
||||
.then(CastSpellPromise(3))
|
||||
.then(CastSpellPromise(4)
|
||||
.then(CastSpellPromise(5))
|
||||
)
|
||||
.then(CastSpellPromise(6))
|
||||
.then([](SpellCastResult)
|
||||
{
|
||||
return Validate();
|
||||
});
|
||||
int const result = Catch::Session().run(argc, argv);
|
||||
|
||||
MoveTest()
|
||||
.then([](std::unique_ptr<int>&& ptr)
|
||||
{
|
||||
static_assert(std::is_rvalue_reference<decltype(ptr)>::value, "no rvalue");
|
||||
// Attach breakpoint here ,-)
|
||||
return result;
|
||||
}
|
||||
|
||||
// Error here
|
||||
std::unique_ptr<int> other = std::move(ptr);
|
||||
});
|
||||
template<typename T, typename V>
|
||||
using cross_forward_t =
|
||||
typename std::conditional<
|
||||
std::is_rvalue_reference<T&&>::value,
|
||||
typename std::decay<V>::type&&,
|
||||
typename std::conditional<
|
||||
std::is_lvalue_reference<T&&>::value,
|
||||
typename std::decay<V>::type&,
|
||||
typename std::decay<V>::type
|
||||
>::type
|
||||
>::type;
|
||||
|
||||
// Mockup of aggregate methods
|
||||
make_continuable()
|
||||
.all(
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); }
|
||||
)
|
||||
.some(2, // Only 2 of 3 must complete
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); }
|
||||
)
|
||||
.any( // Any of 2.
|
||||
[] { return TrivialPromise(); },
|
||||
[] { return TrivialPromise(); }
|
||||
)
|
||||
.then([]
|
||||
{
|
||||
std::cout << "Finished" << std::endl;
|
||||
});
|
||||
*/
|
||||
//Continuable<bool> cb = make_continuable([](Callback<bool>&& callback)
|
||||
//{
|
||||
template<typename T, typename V>
|
||||
cross_forward_t<T, V> cross_forward(V&& var)
|
||||
{
|
||||
return static_cast<cross_forward_t<T, V>&&>(var);
|
||||
}
|
||||
|
||||
// callback(true);
|
||||
//});
|
||||
struct TestContainer
|
||||
{
|
||||
std::shared_ptr<int> ptr;
|
||||
};
|
||||
|
||||
//test_unwrap<void()>("void()");
|
||||
//test_unwrap<std::function<void()>>("std::function<void()>");
|
||||
//test_unwrap<std::vector<std::string>>("std::vector<std::string>");
|
||||
template<typename T>
|
||||
TestContainer extract(T&& c)
|
||||
{
|
||||
return TestContainer{cross_forward<T>(c.ptr)};
|
||||
}
|
||||
|
||||
//make_continuable([=](Callback<>&&)
|
||||
//{
|
||||
TEST_CASE("CrossForward tests", "[CrossForward]")
|
||||
{
|
||||
TestContainer con;
|
||||
|
||||
//});
|
||||
con.ptr = std::make_shared<int>(0);
|
||||
|
||||
//int i = 0;
|
||||
//++i;
|
||||
static_assert(
|
||||
std::is_same<cross_forward_t<TestContainer&&, std::unique_ptr<int>&>,
|
||||
std::unique_ptr<int>&&>::value,
|
||||
"cross_forward returns wrong type!");
|
||||
|
||||
//auto lam = [=](Callback<SpellCastResult>&&)
|
||||
//{
|
||||
// // on success call the callback with SPELL_FAILED_SUCCESS
|
||||
// // callback(SPELL_FAILED_SUCCESS);
|
||||
//};
|
||||
static_assert(
|
||||
std::is_same<cross_forward_t<TestContainer&, std::unique_ptr<int>&>,
|
||||
std::unique_ptr<int>&>::value,
|
||||
"cross_forward returns wrong type!");
|
||||
|
||||
//fu::function_type_of_t<decltype(lam)> fun1;
|
||||
//fun1 = lam;
|
||||
//fun1(Callback<SpellCastResult>());
|
||||
|
||||
//fu::function_type_of_t<Callback<int>> fun2;
|
||||
//
|
||||
//shared_callback_of_t<std::function<void(int)>> sc1;
|
||||
//weak_callback_of_t<Callback<int>> sc2;
|
||||
//
|
||||
//make_weak_wrapped_callback(sc1);
|
||||
//make_weak_wrapped_callback(sc2);
|
||||
|
||||
//WeakCallbackContainer callback;
|
||||
//
|
||||
//auto weakCallback = callback([]
|
||||
//{
|
||||
//});
|
||||
|
||||
//typedef Continuable<bool> cont123;
|
||||
|
||||
//typedef Continuable<bool> myty1;
|
||||
//typedef Continuable<bool, float> myty2;
|
||||
|
||||
//// Convertible test
|
||||
//
|
||||
//// Continuable<Callback<SpellCastResult>> spell
|
||||
//{
|
||||
// auto stack =
|
||||
|
||||
// int iii = 0;
|
||||
// iii = 1;
|
||||
//}
|
||||
|
||||
//std::vector<int> myvec;
|
||||
|
||||
//typedef fu::requires_functional_constructible<std::function<void()>>::type test_assert1;
|
||||
//// typedef fu::requires_functional_constructible<std::vector<int>>::type test_assert2;
|
||||
|
||||
//// Brainstorming: this shows an example callback chain
|
||||
//// Given by continuable
|
||||
//std::function<void(Callback<SpellCastResult>&&)> continuable_1 = [](Callback<SpellCastResult>&& callback)
|
||||
//{
|
||||
// callback(SPELL_FAILED_AFFECTING_COMBAT);
|
||||
//};
|
||||
|
||||
//// Implemented by user
|
||||
//std::function<std::function<void(Callback<bool>&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult)
|
||||
//{
|
||||
// // Given by continuable
|
||||
// // Fn2
|
||||
// return [](Callback<bool>&& callback)
|
||||
// {
|
||||
// callback(true);
|
||||
// };
|
||||
//};
|
||||
|
||||
//// Implemented by user
|
||||
//std::function<std::function<void(Callback<>&&)>(bool)> cn2 = [](bool val)
|
||||
//{
|
||||
// // Finished
|
||||
// std::cout << "Callback chain finished! -> " << val << std::endl;
|
||||
|
||||
// // Given by continuable (auto end)
|
||||
// return [](Callback<>&&)
|
||||
// {
|
||||
// // Empty callback
|
||||
// };
|
||||
//};
|
||||
|
||||
//// Entry point
|
||||
//std::function<void(Callback<bool>&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/]
|
||||
// (std::function<void(Callback<bool>&&)>)
|
||||
//{
|
||||
// // Call with auto created wrapper by the continuable
|
||||
// continuable_1([&](SpellCastResult result /*forward args*/)
|
||||
// {
|
||||
// // Wrapper functional to process unary or multiple promised callbacks
|
||||
// // Returned from the user
|
||||
// std::function<void(Callback<bool>&&)> fn2 = callback_by_user_1(/*forward args*/ result);
|
||||
// return std::move(fn2);
|
||||
// });
|
||||
//};
|
||||
|
||||
//// Here we go
|
||||
//entry();
|
||||
|
||||
detail::unary_chainer_t<
|
||||
std::function<Continuable<bool>()>
|
||||
>::callback_arguments_t args213987;
|
||||
|
||||
typedef detail::functional_traits<>::result_maker_of_t<
|
||||
|
||||
std::function<Continuable<bool>()>,
|
||||
decltype(CastSpellPromise(2)),
|
||||
decltype(TrivialPromise()),
|
||||
std::function<Continuable<float, double>()>,
|
||||
std::function<Continuable<>()>,
|
||||
std::function<Continuable<bool>()>
|
||||
|
||||
> maker;
|
||||
|
||||
maker::arguments_t test282_args;
|
||||
maker::partial_results_t test282_pack;
|
||||
auto test282_size = maker::size;
|
||||
|
||||
// static_assert(std::is_same<>::value,
|
||||
|
||||
detail::concat_identities<fu::identity<int, bool, char>, fu::identity<float, double>>::type myt;
|
||||
|
||||
// fu::identity<detail::functional_traits<>::position<1>> i;
|
||||
|
||||
std::tuple<int, std::vector<int>> tup;
|
||||
|
||||
Moveable moveable(new int(7));
|
||||
|
||||
auto myargs = std::make_tuple(7, std::vector<int>({ 1, 2, 3 }), std::move(moveable));
|
||||
|
||||
std::function<int(int, std::vector<int>, Moveable&&)> lam = [](int given_i, std::vector<int> given_vec, Moveable&& moveable)
|
||||
SECTION("CrossForward - forward l-value references")
|
||||
{
|
||||
Moveable other = std::move(moveable);
|
||||
|
||||
++given_i;
|
||||
return 1;
|
||||
};
|
||||
|
||||
fu::invoke_from_tuple(lam, std::move(myargs));
|
||||
|
||||
fu::sequence_generator<2>::type seqtype;
|
||||
fu::sequence_generator<1>::type zero_seqtype;
|
||||
|
||||
detail::multiple_when_all_chainer_t<
|
||||
fu::identity<>,
|
||||
fu::identity<
|
||||
std::function<Continuable<>()>,
|
||||
std::function<Continuable<std::string>()>
|
||||
>
|
||||
>::result_maker::partial_results_t myres123345;
|
||||
|
||||
/*
|
||||
auto firstType = detail::multiple_when_all_chainer_t<
|
||||
fu::identity<>,
|
||||
fu::identity<
|
||||
std::function<Continuable<SpellCastResult>()>,
|
||||
std::function<Continuable<>()>,
|
||||
std::function<Continuable<SpellCastResult>()>
|
||||
>
|
||||
>::make_when_all(
|
||||
[]
|
||||
{
|
||||
// void
|
||||
return CastSpellPromise(10);
|
||||
},
|
||||
[]
|
||||
{
|
||||
return make_continuable();
|
||||
},
|
||||
[]
|
||||
{
|
||||
return CastSpellPromise(20);
|
||||
})
|
||||
.then([](SpellCastResult, SpellCastResult)
|
||||
{
|
||||
|
||||
})
|
||||
.then([]
|
||||
{
|
||||
|
||||
});
|
||||
*/
|
||||
|
||||
make_continuable()
|
||||
.all(
|
||||
CastSpellPromise(10)
|
||||
.then(CastSpellPromise(15)),
|
||||
CastSpellPromise(20),
|
||||
make_continuable([](Callback<bool, bool, double , std::unique_ptr<std::string>>&& callback)
|
||||
{
|
||||
callback(true, false, 0.3f, std::unique_ptr<std::string>(new std::string("oh, all work is done!")));
|
||||
}),
|
||||
TrivialPromise())
|
||||
.then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr<std::string> message)
|
||||
{
|
||||
return TrivialPromise("Lets see... ").then(Log(*message));
|
||||
})
|
||||
.then([]
|
||||
{
|
||||
return Log("ok, now its really finished!").then(CastSpellPromise(2));
|
||||
});
|
||||
|
||||
// DispatcherPool countPool(1);
|
||||
|
||||
// DispatcherPool pool;
|
||||
|
||||
/*
|
||||
auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();
|
||||
|
||||
std::mt19937 rng(static_cast<unsigned int>(seed));
|
||||
std::uniform_int_distribution<int> gen(10, 150);
|
||||
|
||||
std::vector<int> container;
|
||||
|
||||
unsigned int counter = 0;
|
||||
|
||||
for (unsigned int run = 0; run < 15; ++run)
|
||||
{
|
||||
for (unsigned int i = 0; i < 20; ++i)
|
||||
{
|
||||
unsigned int wait = gen(rng);
|
||||
|
||||
++counter;
|
||||
|
||||
pool.Dispatch([&countPool, &container, i, run, wait]
|
||||
{
|
||||
// std::this_thread::sleep_for(std::chrono::milliseconds(wait));
|
||||
std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n";
|
||||
// std::cout << str;
|
||||
|
||||
countPool.Dispatch([&container, wait]
|
||||
{
|
||||
container.emplace_back(wait);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
extract(con);
|
||||
REQUIRE(con.ptr.get());
|
||||
}
|
||||
|
||||
// std::cout << "Awaiting termination...\n";
|
||||
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(5));
|
||||
|
||||
pool.Await();
|
||||
countPool.Await();
|
||||
|
||||
std::cout << container.size() << " == " << counter;
|
||||
*/
|
||||
|
||||
return 0;
|
||||
SECTION("CrossForward - forward r-value references")
|
||||
{
|
||||
extract(std::move(con));
|
||||
REQUIRE_FALSE(con.ptr.get());
|
||||
}
|
||||
}
|
||||
|
||||
9415
testing/catch.hpp
Normal file
9415
testing/catch.hpp
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user