more work

This commit is contained in:
Naios 2015-08-09 16:19:43 +02:00
parent 88ba45dd06
commit 83742f86ba
10 changed files with 10196 additions and 5774 deletions

View File

@ -54,9 +54,9 @@ endif()
file(GLOB_RECURSE LIB_SOURCES include/*.cpp include/*.hpp include/*.h) file(GLOB_RECURSE LIB_SOURCES include/*.cpp include/*.hpp include/*.h)
add_library(Continuable STATIC ${LIB_SOURCES}) add_library(Continuable STATIC ${LIB_SOURCES})
include_directories(include dep/concurrentqueue) include_directories(include testing)
set(TEST_SOURCES test.cpp mockup.cpp) file(GLOB TEST_SOURCES *.cpp *.hpp *.h)
add_executable(ContinuableTest ${TEST_SOURCES}) add_executable(ContinuableTest ${TEST_SOURCES})

710
Incubator.cpp Normal file
View File

@ -0,0 +1,710 @@
/*
* Copyright (C) 2015 Naios <naios-dev@live.de>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <utility>
#include <memory>
#include "Callback.h"
#include "WeakCallbackContainer.h"
#include "Continuable.h"
#include <iostream>
#include <exception>
#include <type_traits>
#include <string>
#include <vector>
#include <typeinfo>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <atomic>
#include <random>
// #include "concurrentqueue.h"
#include <boost/optional.hpp>
enum SpellCastResult
{
SPELL_FAILED_SUCCESS = 0,
SPELL_FAILED_AFFECTING_COMBAT = 1,
SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
SPELL_FAILED_ALREADY_BEING_TAMED = 5
};
template<typename T>
using Optional = boost::optional<T>;
Continuable<> Log(std::string const& message)
{
return make_continuable([=](Callback<>&& callback)
{
std::cout << message << std::endl;
callback();
});
}
struct ResultSet
{
ResultSet(std::size_t affected_) :
affected(affected_) { };
std::size_t affected;
};
Continuable<ResultSet> AsyncQuery(std::string const& query)
{
return make_continuable([=](Callback<ResultSet>&& callback)
{
std::cout << query << std::endl;
callback(ResultSet(2));
});
}
// Original method taking an optional callback.
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
{
std::cout << "Casting " << id << std::endl;
// on success call the callback with SPELL_FAILED_SUCCESS
if (callback)
(*callback)(SPELL_FAILED_SUCCESS);
}
// Promise wrapped callback decorator.
Continuable<SpellCastResult> CastSpellPromise(int id)
{
return make_continuable([=](Callback<SpellCastResult>&& callback)
{
CastSpell(id, callback);
});
}
// Void instant returning continuable promise for testing purposes
Continuable<> TrivialPromise(std::string const& msg = "")
{
return Log(msg);
}
Continuable<bool> Validate()
{
return make_continuable([=](Callback<bool>&& callback)
{
std::cout << "Validate " << std::endl;
callback(true);
});
}
Continuable<std::unique_ptr<int>&&> MoveTest()
{
return make_continuable([=](Callback<std::unique_ptr<int>&&>&& callback)
{
// Move the unique ptr out to test moveability
std::unique_ptr<int> ptr(new int(5));
callback(std::move(ptr));
});
}
typedef std::unique_ptr<int> Moveable;
void testMoveAbleNormal(std::function<void(std::unique_ptr<int>&&)> callback)
{
std::unique_ptr<int> ptr(new int(5));
callback(std::move(ptr));
}
template <typename... T>
void test_unwrap(std::string const& msg)
{
std::cout << msg << " is unwrappable: " << (fu::is_unwrappable<T...>::value ? "true" : "false") << std::endl;
}
#include <iostream>
#include <atomic>
#include <random>
// static std::atomic_size_t move_tracer_index = 0;
/// Class to trace construct, destruct, copy and move operations.
/*
class CopyMoveTracer
{
std::size_t id;
std::size_t flags;
std::size_t copied;
std::size_t moved;
bool IsEnabled(std::size_t mask) const
{
// msvc warning silencer
return (flags & mask) != 0;
}
void Log(std::string const& msg) const
{
}
public:
enum Flags : std::size_t
{
CAPTURE_NONE = 0x1,
CAPTURE_CONSTRUCT = 0x1,
CAPTURE_DESTRUCT = 0x2,
CAPTURE_COPY = 0x4,
CAPTURE_MOVE = 0x8,
CAPTURE_ALL = CAPTURE_CONSTRUCT | CAPTURE_DESTRUCT | CAPTURE_COPY | CAPTURE_MOVE
};
// Empty construct
CopyMoveTracer() : id(++move_tracer_index), flags(CAPTURE_ALL), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_CONSTRUCT))
Log("Tracer constructed");
}
// Construct with flags
CopyMoveTracer(std::size_t flags_) : id(++move_tracer_index), flags(flags_), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_CONSTRUCT))
Log("Tracer constructed");
}
// Copy construct
CopyMoveTracer(CopyMoveTracer const& right) : id(move_tracer_index++), flags(right.flags), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_COPY))
Log("Tracer copy constructed");
}
// Copy construct
CopyMoveTracer(CopyMoveTracer&& right) : id(right.id), flags(right.flags), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_COPY))
Log("Tracer copy constructed");
}
};
*/
/*
namespace detail
{
template<typename, typename>
struct function_matches_to_args;
template<typename LeftReturn, typename... LeftArgs,
typename RightReturn, typename... RightArgs>
struct function_matches_to_args<
std::function<LeftReturn(LeftArgs...)>,
std::function<RightReturn(RightArgs...)>>
{
};
}
*/
/*
class DispatcherPool
{
enum TerminationMode
{
NONE,
TERMINATE,
AWAIT
};
typedef std::function<void()> Callable;
std::vector<std::thread> _pool;
std::atomic<TerminationMode> _shutdown;
std::mutex _mutex;
std::condition_variable _condition;
// moodycamel::ConcurrentQueue<Callable> _queue;
public:
DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }
DispatcherPool(unsigned int const threads) : _shutdown(NONE)
{
for (unsigned int i = 0; i < threads; ++i)
{
_pool.emplace_back([&, i]
{
// Reserve the consumer token
// moodycamel::ConsumerToken token(_queue);
Callable callable;
while (_shutdown != TERMINATE)
{
if (_queue.try_dequeue(token, callable))
{
std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
// std::cout << msg;
callable();
}
else
{
if (_shutdown == AWAIT)
break;
{
std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
// std::cout << msg;
}
std::unique_lock<std::mutex> lock(_mutex);
// Lock until new tasks are added
_condition.wait(lock);
{
std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
// std::cout << msg;
}
}
}
});
}
}
~DispatcherPool()
{
Shutdown();
}
template<typename Functional>
void Dispatch(Functional&& functional)
{
_queue.enqueue(std::forward<Functional>(functional));
std::unique_lock<std::mutex> lock(_mutex);
_condition.notify_one();
}
void Shutdown()
{
_Shutdown(TERMINATE);
}
void Await()
{
_Shutdown(AWAIT);
}
void _Shutdown(TerminationMode const mode)
{
_shutdown = mode;
_condition.notify_all();
for (auto&& thread : _pool)
if (thread.joinable())
thread.join();
}
};
*/
void some_examples()
{
// CopyMoveTracer tracer;
/*
CastSpellPromise(1)
.then([](SpellCastResult)
{
return CastSpellPromise(2);
})
.then([](SpellCastResult)
{
std::cout << "Pause a callback (void test) " << std::endl;
})
.then(Validate())
.then(AsyncQuery("SELECT * FROM `users`")
.then([](ResultSet result)
{
// Evaluate result
std::size_t const affected = result.affected;
return Log(std::to_string(affected) + " rows affected\n");
})
)
.then(TrivialPromise("huhu"))
.then(CastSpellPromise(3))
.then(CastSpellPromise(4)
.then(CastSpellPromise(5))
)
.then(CastSpellPromise(6))
.then([](SpellCastResult)
{
return Validate();
});
MoveTest()
.then([](std::unique_ptr<int>&& ptr)
{
static_assert(std::is_rvalue_reference<decltype(ptr)>::value, "no rvalue");
// Error here
std::unique_ptr<int> other = std::move(ptr);
});
// Mockup of aggregate methods
make_continuable()
.all(
[] { return TrivialPromise(); },
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.some(2, // Only 2 of 3 must complete
[] { return TrivialPromise(); },
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.any( // Any of 2.
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.then([]
{
std::cout << "Finished" << std::endl;
});
*/
//Continuable<bool> cb = make_continuable([](Callback<bool>&& callback)
//{
// callback(true);
//});
//test_unwrap<void()>("void()");
//test_unwrap<std::function<void()>>("std::function<void()>");
//test_unwrap<std::vector<std::string>>("std::vector<std::string>");
//make_continuable([=](Callback<>&&)
//{
//});
//int i = 0;
//++i;
//auto lam = [=](Callback<SpellCastResult>&&)
//{
// // on success call the callback with SPELL_FAILED_SUCCESS
// // callback(SPELL_FAILED_SUCCESS);
//};
//fu::function_type_of_t<decltype(lam)> fun1;
//fun1 = lam;
//fun1(Callback<SpellCastResult>());
//fu::function_type_of_t<Callback<int>> fun2;
//
//shared_callback_of_t<std::function<void(int)>> sc1;
//weak_callback_of_t<Callback<int>> sc2;
//
//make_weak_wrapped_callback(sc1);
//make_weak_wrapped_callback(sc2);
//WeakCallbackContainer callback;
//
//auto weakCallback = callback([]
//{
//});
//typedef Continuable<bool> cont123;
//typedef Continuable<bool> myty1;
//typedef Continuable<bool, float> myty2;
//// Convertible test
//
//// Continuable<Callback<SpellCastResult>> spell
//{
// auto stack =
// int iii = 0;
// iii = 1;
//}
//std::vector<int> myvec;
//typedef fu::requires_functional_constructible<std::function<void()>>::type test_assert1;
//// typedef fu::requires_functional_constructible<std::vector<int>>::type test_assert2;
//// Brainstorming: this shows an example callback chain
//// Given by continuable
//std::function<void(Callback<SpellCastResult>&&)> continuable_1 = [](Callback<SpellCastResult>&& callback)
//{
// callback(SPELL_FAILED_AFFECTING_COMBAT);
//};
//// Implemented by user
//std::function<std::function<void(Callback<bool>&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult)
//{
// // Given by continuable
// // Fn2
// return [](Callback<bool>&& callback)
// {
// callback(true);
// };
//};
//// Implemented by user
//std::function<std::function<void(Callback<>&&)>(bool)> cn2 = [](bool val)
//{
// // Finished
// std::cout << "Callback chain finished! -> " << val << std::endl;
// // Given by continuable (auto end)
// return [](Callback<>&&)
// {
// // Empty callback
// };
//};
//// Entry point
//std::function<void(Callback<bool>&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/]
// (std::function<void(Callback<bool>&&)>)
//{
// // Call with auto created wrapper by the continuable
// continuable_1([&](SpellCastResult result /*forward args*/)
// {
// // Wrapper functional to process unary or multiple promised callbacks
// // Returned from the user
// std::function<void(Callback<bool>&&)> fn2 = callback_by_user_1(/*forward args*/ result);
// return std::move(fn2);
// });
//};
//// Here we go
//entry();
detail::unary_chainer_t<
std::function<Continuable<bool>()>
>::callback_arguments_t args213987;
typedef detail::functional_traits<>::result_maker_of_t<
std::function<Continuable<bool>()>,
decltype(CastSpellPromise(2)),
decltype(TrivialPromise()),
std::function<Continuable<float, double>()>,
std::function<Continuable<>()>,
std::function<Continuable<bool>()>
> maker;
maker::arguments_t test282_args;
maker::partial_results_t test282_pack;
auto test282_size = maker::size;
// static_assert(std::is_same<>::value,
detail::concat_identities<fu::identity<int, bool, char>, fu::identity<float, double>>::type myt;
// fu::identity<detail::functional_traits<>::position<1>> i;
std::tuple<int, std::vector<int>> tup;
Moveable moveable(new int(7));
auto myargs = std::make_tuple(7, std::vector<int>({ 1, 2, 3 }), std::move(moveable));
std::function<int(int, std::vector<int>, Moveable&&)> lam = [](int given_i, std::vector<int> given_vec, Moveable&& moveable)
{
Moveable other = std::move(moveable);
++given_i;
return 1;
};
fu::invoke_from_tuple(lam, std::move(myargs));
fu::sequence_generator<2>::type seqtype;
fu::sequence_generator<1>::type zero_seqtype;
detail::multiple_when_all_chainer_t<
fu::identity<>,
fu::identity<
std::function<Continuable<>()>,
std::function<Continuable<std::string>()>
>
>::result_maker::partial_results_t myres123345;
/*
auto firstType = detail::multiple_when_all_chainer_t<
fu::identity<>,
fu::identity<
std::function<Continuable<SpellCastResult>()>,
std::function<Continuable<>()>,
std::function<Continuable<SpellCastResult>()>
>
>::make_when_all(
[]
{
// void
return CastSpellPromise(10);
},
[]
{
return make_continuable();
},
[]
{
return CastSpellPromise(20);
})
.then([](SpellCastResult, SpellCastResult)
{
})
.then([]
{
});
*/
make_continuable()
.all(
CastSpellPromise(10)
.then(CastSpellPromise(15)),
CastSpellPromise(20),
make_continuable([](Callback<bool, bool, double , std::unique_ptr<std::string>>&& callback)
{
callback(true, false, 0.3f, std::unique_ptr<std::string>(new std::string("oh, all work is done!")));
}),
TrivialPromise())
.then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr<std::string> message)
{
return TrivialPromise("Lets see... ").then(Log(*message));
})
.then([]
{
return Log("ok, now its really finished!").then(CastSpellPromise(2));
});
// DispatcherPool countPool(1);
// DispatcherPool pool;
/*
auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();
std::mt19937 rng(static_cast<unsigned int>(seed));
std::uniform_int_distribution<int> gen(10, 150);
std::vector<int> container;
unsigned int counter = 0;
for (unsigned int run = 0; run < 15; ++run)
{
for (unsigned int i = 0; i < 20; ++i)
{
unsigned int wait = gen(rng);
++counter;
pool.Dispatch([&countPool, &container, i, run, wait]
{
// std::this_thread::sleep_for(std::chrono::milliseconds(wait));
std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n";
// std::cout << str;
countPool.Dispatch([&container, wait]
{
container.emplace_back(wait);
});
});
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
// std::cout << "Awaiting termination...\n";
// std::this_thread::sleep_for(std::chrono::seconds(5));
// std::this_thread::sleep_for(std::chrono::seconds(5));
pool.Await();
countPool.Await();
std::cout << container.size() << " == " << counter;
*/
}
template<typename T, typename V>
using cross_forward_t =
typename std::conditional<
std::is_rvalue_reference<T&&>::value,
typename std::decay<V>::type&&,
typename std::conditional<
std::is_lvalue_reference<T&&>::value,
typename std::decay<V>::type&,
typename std::decay<V>::type
>::type
>::type;
template<typename T, typename V>
cross_forward_t<T, V> cross_forward(V&& var)
{
return static_cast<cross_forward_t<T, V>&&>(var);
}
struct TestContainer
{
std::shared_ptr<int> ptr;
};
template<typename T>
TestContainer extract(T&& c)
{
return TestContainer{cross_forward<T>(c.ptr)};
}
void test_cross_forward()
{
TestContainer con;
con.ptr = std::make_shared<int>(5);
static_assert(
std::is_same<cross_forward_t<TestContainer&&, std::unique_ptr<int>&>,
std::unique_ptr<int>&&>::value,
"oO");
static_assert(
std::is_same<cross_forward_t<TestContainer&, std::unique_ptr<int>&>,
std::unique_ptr<int>&>::value,
"oO");
extract(con);
extract(std::move(con));
int i = 0;
++i;
}
void test_incubator()
{
test_cross_forward();
}

View File

@ -1,31 +0,0 @@
This license applies to everything in this repository except that which
is explicitly annotated as being written by other authors, i.e. the Boost
queue (included in the benchmarks for comparison), Intel's TBB library (ditto),
the CDSChecker tool (used for verification), the Relacy model checker (ditto),
and Jeff Preshing's semaphore implementation (used in the blocking queue) which
has a zlib license (embedded in blockingconcurrentqueue.h).
Simplified BSD License:
Copyright (c) 2013-2015, Cameron Desrochers.
All rights reserved.
Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:
- Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,449 +0,0 @@
# moodycamel::ConcurrentQueue<T>
An industrial-strength lock-free queue for C++.
Note: If all you need is a single-producer, single-consumer queue, I have [one of those too][spsc].
## Features
- Knock-your-socks-off [blazing fast performance][benchmarks].
- Single-header implementation. Just drop it in your project.
- Fully thread-safe lock-free queue. Use concurrently from any number of threads.
- C++11 implementation -- elements are moved (instead of copied) where possible.
- Templated, obviating the need to deal exclusively with pointers -- memory is managed for you.
- No artificial limitations on element types or maximum count.
- Memory can be allocated once up-front, or dynamically as needed.
- Fully portable (no assembly; all is done through standard C++11 primitives).
- Supports super-fast bulk operations.
- Includes a low-overhead blocking version (BlockingConcurrentQueue).
- Exception safe.
## Reasons to use
There are not that many full-fledged lock-free queues for C++. Boost has one, but it's limited to objects with trivial
assignment operators and trivial destructors, for example. Intel's TBB queue isn't lock-free, and requires trivial constructors too.
There's many academic papers that implement lock-free queues in C++, but usable source code is
hard to find, and tests even more so.
This queue not only has less limitations than others (for the most part), but [it's also faster][benchmarks].
It's been fairly well-tested, and offers advanced features like **bulk enqueueing/dequeueing**
(which, with my new design, is much faster than one element at a time, approaching and even surpassing
the speed of a non-concurrent queue even under heavy contention).
In short, there was a lock-free queue shaped hole in the C++ open-source universe, and I set out
to fill it with the fastest, most complete, and well-tested design and implementation I could.
The result is `moodycamel::ConcurrentQueue` :-)
## Reasons *not* to use
The fastest synchronization of all is the kind that never takes place. Fundamentally,
concurrent data structures require some synchronization, and that takes time. Every effort
was made, of course, to minimize the overhead, but if you can avoid sharing data between
threads, do so!
Why use concurrent data structures at all, then? Because they're gosh darn convenient! (And, indeed,
sometimes sharing data concurrently is unavoidable.)
My queue is **not linearizable** (see the next section on high-level design). The foundations of
its design assume that producers are independent; if this is not the case, and your producers
co-ordinate amongst themselves in some fashion, be aware that the elements won't necessarily
come out of the queue in the same order they were put in *relative to the ordering formed by that co-ordination*
(but they will still come out in the order they were put in by any *individual* producer). If this affects
your use case, you may be better off with another implementation; either way, it's an important limitation
to be aware of.
My queue is also **not NUMA aware**, and does a lot of memory re-use internally, meaning it probably doesn't
scale particularly well on NUMA architectures; however, I don't know of any other lock-free queue that *is*
NUMA aware (except for [SALSA][salsa], which is very cool, but has no publicly available implementation that I know of).
Finally, the queue is **not sequentially consistent**; there *is* a happens-before relationship between when an element is put
in the queue and when it comes out, but other things (such as pumping the queue until it's empty) require more thought
to get right in all eventualities, because explicit memory ordering may have to be done to get the desired effect. In other words,
it can sometimes be difficult to use the queue correctly. This is why it's a good idea to follow the [samples][samples.md] where possible.
On the other hand, the upside of this lack of sequential consistency is better performance.
## High-level design
Elements are stored internally using contiguous blocks instead of linked lists for better performance.
The queue is made up of a collection of sub-queues, one for each producer. When a consumer
wants to dequeue an element, it checks all the sub-queues until it finds one that's not empty.
All of this is largely transparent to the user of the queue, however -- it mostly just works<sup>TM</sup>.
One particular consequence of this design, however, (which seems to be non-intuitive) is that if two producers
enqueue at the same time, there is no defined ordering between the elements when they're later dequeued.
Normally this is fine, because even with a fully linearizable queue there'd be a race between the producer
threads and so you couldn't rely on the ordering anyway. However, if for some reason you do extra explicit synchronization
between the two producer threads yourself, thus defining a total order between enqueue operations, you might expect
that the elements would come out in the same total order, which is a guarantee my queue does not offer. At that
point, though, there semantically aren't really two separate producers, but rather one that happens to be spread
across multiple threads. In this case, you can still establish a total ordering with my queue by creating
a single producer token, and using that from both threads to enqueue (taking care to synchronize access to the token,
of course, but there was already extra synchronization involved anyway).
I've written a more detailed [overview of the internal design][blog], as well as [the full
nitty-gritty details of the design][design], on my blog. Finally, the
[source][source] itself is available for perusal for those interested in its implementation.
## Basic use
The entire queue's implementation is contained in **one header**, [`concurrentqueue.h`][concurrentqueue.h].
Simply download and include that to use the queue. The blocking version is in a separate header,
[`blockingconcurrentqueue.h`][blockingconcurrentqueue.h], that depends on the first.
The implementation makes use of certain key C++11 features, so it requires a fairly recent compiler
(e.g. VS2012+ or g++ 4.8; note that g++ 4.6 has a known bug with `std::atomic` and is thus not supported).
The algorithm implementations themselves are platform independent.
Use it like you would any other templated queue, with the exception that you can use
it from many threads at once :-)
Simple example:
#include "concurrentqueue.h"
moodycamel::ConcurrentQueue<int> q;
q.enqueue(25);
int item;
bool found = q.try_dequeue(item);
assert(found && item == 25);
Description of basic methods:
- `ConcurrentQueue(size_t initialSizeEstimate)`
Constructor which optionally accepts an estimate of the number of elements the queue will hold
- `enqueue(T&& item)`
Enqueues one item, allocating extra space if necessary
- `try_enqueue(T&& item)`
Enqueues one item, but only if enough memory is already allocated
- `try_dequeue(T& item)`
Dequeues one item, returning true if an item was found or false if the queue appeared empty
Note that it is up to the user to ensure that the queue object is completely constructed before
being used by any other threads (this includes making the memory effects of construction
visible, possibly via a memory barrier). Similarly, it's important that all threads have
finished using the queue (and the memory effects have fully propagated) before it is
destructed.
There's usually two versions of each method, one "explicit" version that takes a user-allocated per-producer or
per-consumer token, and one "implicit" version that works without tokens. Using the explicit methods is almost
always faster (though not necessarily by a huge factor). Apart from performance, the primary distinction between them
is their sub-queue allocation behaviour for enqueue operations: Using the implicit enqueue methods causes an
automatically-allocated thread-local producer sub-queue to be allocated (it is marked for reuse once the thread exits).
Explicit producers, on the other hand, are tied directly to their tokens' lifetimes (and are also recycled as needed).
Full API (pseudocode):
# Allocates more memory if necessary
enqueue(item) : bool
enqueue(prod_token, item) : bool
enqueue_bulk(item_first, count) : bool
enqueue_bulk(prod_token, item_first, count) : bool
# Fails if not enough memory to enqueue
try_enqueue(item) : bool
try_enqueue(prod_token, item) : bool
try_enqueue_bulk(item_first, count) : bool
try_enqueue_bulk(prod_token, item_first, count) : bool
# Attempts to dequeue from the queue (never allocates)
try_dequeue(item&) : bool
try_dequeue(cons_token, item&) : bool
try_dequeue_bulk(item_first, max) : size_t
try_dequeue_bulk(cons_token, item_first, max) : size_t
# If you happen to know which producer you want to dequeue from
try_dequeue_from_producer(prod_token, item&) : bool
try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t
# A not-necessarily-accurate count of the total number of elements
size_approx() : size_t
## Blocking version
As mentioned above, a full blocking wrapper of the queue is provided that adds
`wait_dequeue` and `wait_dequeue_bulk` methods in addition to the regular interface.
This wrapper is extremely low-overhead, but slightly less fast than the non-blocking
queue (due to the necessary bookkeeping involving a lightweight semaphore).
The only major caveat with the blocking version is that you must be careful not to
destroy the queue while somebody is waiting on it. This generally means you need to
know for certain that another element is going to come along before you call one of
the blocking methods.
Blocking example:
#include "blockingconcurrentqueue.h"
moodycamel::BlockingConcurrentQueue<int> q;
std::thread producer([&]() {
for (int i = 0; i != 100; ++i) {
q.enqueue(i);
}
});
std::thread consumer([&]() {
for (int i = 0; i != 100; ++i) {
int item;
q.wait_dequeue(item);
assert(item == i);
}
});
producer.join();
consumer.join();
assert(q.size_approx() == 0);
## Advanced features
#### Tokens
The queue can take advantage of extra per-producer and per-consumer storage if
it's available to speed up its operations. This takes the form of "tokens":
You can create a consumer token and/or a producer token for each thread or task
(tokens themselves are not thread-safe), and use the methods that accept a token
as their first parameter:
moodycamel::ConcurrentQueue<int> q;
moodycamel::ProducerToken ptok(q);
q.enqueue(ptok, 17);
moodycamel::ConsumerToken ctok(q);
int item;
q.try_dequeue(ctok, item);
assert(item == 17);
If you happen to know which producer you want to consume from (e.g. in
a single-producer, multi-consumer scenario), you can use the `try_dequeue_from_producer`
methods, which accept a producer token instead of a consumer token, and cut some overhead.
Note that tokens work with the blocking version of the queue too.
When producing or consuming many elements, the most efficient way is to:
1. Use the bulk methods of the queue with tokens
2. Failing that, use the bulk methods without tokens
3. Failing that, use the single-item methods with tokens
4. Failing that, use the single-item methods without tokens
Having said that, don't create tokens willy-nilly -- ideally there would be
one token (of each kind) per thread. The queue will work with what it is
given, but it performs best when used with tokens.
Note that tokens aren't actually tied to any given thread; it's not technically
required that they be local to the thread, only that they be used by a single
producer/consumer at a time.
#### Bulk operations
Thanks to the [novel design][blog] of the queue, it's just as easy to enqueue/dequeue multiple
items as it is to do one at a time. This means that overhead can be cut drastically for
bulk operations. Example syntax:
moodycamel::ConcurrentQueue<int> q;
int items[] = { 1, 2, 3, 4, 5 };
q.enqueue_bulk(items, 5);
int results[5]; // Could also be any iterator
size_t count = q.try_dequeue_bulk(results, 5);
for (size_t i = 0; i != count; ++i) {
assert(results[i] == items[i]);
}
#### Preallocation (correctly using `try_enqueue`)
`try_enqueue`, unlike just plain `enqueue`, will never allocate memory. If there's not enough room in the
queue, it simply returns false. The key to using this method properly, then, is to ensure enough space is
pre-allocated for your desired maximum element count.
The constructor accepts a count of the number of elements that it should reserve space for. Because the
queue works with blocks of elements, however, and not individual elements themselves, the value to pass
in order to obtain an effective number of pre-allocated element slots is non-obvious.
First, be aware that the count passed is rounded up to the next multiple of the block size. Note that the
default block size is 32 (this can be changed via the traits). Second, once a slot in a block has been
enqueued to, that slot cannot be re-used until the rest of the block has completely been completely filled
up and then completely emptied. This affects the number of blocks you need in order to account for the
overhead of partially-filled blocks. Third, each producer (whether implicit or explicit) claims and recycles
blocks in a different manner, which again affects the number of blocks you need to account for a desired number of
usable slots.
Suppose you want the queue to be able to hold at least `N` elements at any given time. Without delving too
deep into the rather arcane implementation details, here are some simple formulas for the number of elements
to request for pre-allocation in such a case. Note the division is intended to be arithmetic division and not
integer division (in order for `ceil()` to work).
For explicit producers (using tokens to enqueue):
(ceil(N / BLOCK_SIZE) + 1) * MAX_NUM_PRODUCERS * BLOCK_SIZ
For implicit producers (no tokens):
(ceil(N / BLOCK_SIZE) - 1 + 2 * MAX_NUM_PRODUCERS) * BLOCK_SIZE
When using mixed producer types:
((ceil(N / BLOCK_SIZE) - 1) * (MAX_EXPLICIT_PRODUCERS + 1) + 2 * (MAX_IMPLICIT_PRODUCERS + MAX_EXPLICIT_PRODUCERS)) * BLOCK_SIZE
If these formulas seem rather inconvenient, you can use the constructor overload that accepts the minimum
number of elements (`N`) and the maximum number of producers and consumers directly, and let it do the
computation for you.
Finally, it's important to note that because the queue is only eventually consistent and takes advantage of
weak memory ordering for speed, there's always a possibility that under contention `try_enqueue` will fail
even if the queue is correctly pre-sized for the desired number of elements. (e.g. A given thread may think that
the queue's full even when that's no longer the case.) So no matter what, you still need to handle the failure
case (perhaps looping until it succeeds), unless you don't mind dropping elements.
#### Exception safety
The queue is exception safe, and will never become corrupted if used with a type that may throw exceptions.
The queue itself never throws any exceptions (operations fail gracefully (return false) if memory allocation
fails instead of throwing `std::bad_alloc`).
It is important to note that the guarantees of exception safety only hold if the element type never throws
from its destructor, and that any iterators passed into the queue (for bulk operations) never throw either.
Note that in particular this means `std::back_inserter` iterators must be used with care, since the vector
being inserted into may need to allocate and throw a `std::bad_alloc` exception from inside the iterator;
so be sure to reserve enough capacity in the target container first if you do this.
The guarantees are presently as follows:
- Enqueue operations are rolled back completely if an exception is thrown from an element's constructor.
For bulk enqueue operations, this means that elements are copied instead of moved (in order to avoid
having only some of the objects be moved in the event of an exception). Non-bulk enqueues always use
the move constructor if one is available.
- If the assignment operator throws during a dequeue operation (both single and bulk), the element(s) are
considered dequeued regardless. In such a case, the dequeued elements are all properly destructed before
the exception is propagated, but there's no way to get the elements themselves back.
- Any exception that is thrown is propagated up the call stack, at which point the queue is in a consistent
state.
Note: If any of your type's copy constructors/move constructors/assignment operators don't throw, be sure
to annotate them with `noexcept`; this will avoid the exception-checking overhead in the queue where possible
(even with zero-cost exceptions, there's still a code size impact that has to be taken into account).
#### Traits
The queue also supports a traits template argument which defines various types, constants,
and the memory allocation and deallocation functions that are to be used by the queue. The typical pattern
to providing your own traits is to create a class that inherits from the default traits
and override only the values you wish to change. Example:
struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
{
static const size_t BLOCK_SIZE = 256; // Use bigger blocks
};
moodycamel::ConcurrentQueue<int, MyTraits> q;
#### How to dequeue types without calling the constructor
The normal way to dequeue an item is to pass in an existing object by reference, which
is then assigned to internally by the queue (using the move-assignment operator if possible).
This can pose a problem for types that are
expensive to construct or don't have a default constructor; fortunately, there is a simple
workaround: Create a wrapper class that copies the memory contents of the object when it
is assigned by the queue (a poor man's move, essentially). Note that this only works if
the object contains no internal pointers. Example:
struct MyObjectMover
{
inline void operator=(MyObject&& obj)
{
std::memcpy(data, &obj, sizeof(MyObject));
// TODO: Cleanup obj so that when it's destructed by the queue
// it doesn't corrupt the data of the object we just moved it into
}
inline MyObject& obj() { return *reinterpret_cast<MyObject*>(data); }
private:
align(alignof(MyObject)) char data[sizeof(MyObject)];
};
## Samples
There are some more detailed samples [here][samples.md]. The source of
the [unit tests][unittest-src] and [benchmarks][benchmark-src] are available for reference as well.
## Benchmarks
See my blog post for some [benchmark results][benchmarks] (including versus `boost::lockfree::queue` and `tbb::concurrent_queue`),
or run the benchmarks yourself (requires MinGW and certain GnuWin32 utilities to build on Windows, or a recent
g++ on Linux):
cd build
make benchmarks
bin/benchmarks
The short version of the benchmarks is that it's so fast (especially the bulk methods), that if you're actually
using the queue to *do* anything, the queue won't be your bottleneck.
## Tests (and bugs)
I've written quite a few unit tests as well as a randomized long-running fuzz tester. I also ran the
core queue algorithm through the [CDSChecker][cdschecker] C++11 memory model model checker. Some of the
inner algorithms were tested separately using the [Relacy][relacy] model checker, and full integration
tests were also performed with Relacy.
I've tested
on Linux (Fedora 19) and Windows (7), but only on x86 processors so far (Intel and AMD). The code was
written to be platform-independent, however, and should work across all processors and OSes.
Due to the complexity of the implementation and the difficult-to-test nature of lock-free code in general,
there may still be bugs. If anyone is seeing buggy behaviour, I'd like to hear about it! (Especially if
a unit test for it can be cooked up.) Just open an issue on GitHub.
## License
I'm releasing the source of this repository (with the exception of third-party code, i.e. the Boost queue
(used in the benchmarks for comparison), Intel's TBB library (ditto), CDSChecker, Relacy, and Jeff Preshing's
cross-platform semaphore, which all have their own licenses)
under a [simplified BSD license][license].
Note that lock-free programming is a patent minefield, and this code may very
well violate a pending patent (I haven't looked), though it does not to my present knowledge.
I did design and implement this queue from scratch.
## Diving into the code
If you're interested in the source code itself, it helps to have a rough idea of how it's laid out. This
section attempts to describe that.
The queue is formed of several basic parts (listed here in roughly the order they appear in the source). There's the
helper functions (e.g. for rounding to a power of 2). There's the default traits of the queue, which contain the
constants and malloc/free functions used by the queue. There's the producer and consumer tokens. Then there's the queue's
public API itself, starting with the constructor, destructor, and swap/assignment methods. There's the public enqueue methods,
which are all wrappers around a small set of private enqueue methods found later on. There's the dequeue methods, which are
defined inline and are relatively straightforward.
Then there's all the main internal data structures. First, there's a lock-free free list, used for recycling spent blocks (elements
are enqueued to blocks internally). Then there's the block structure itself, which has two different ways of tracking whether
it's fully emptied or not (remember, given two parallel consumers, there's no way to know which one will finish first) depending on where it's used.
Then there's a small base class for the two types of internal SPMC producer queues (one for explicit producers that holds onto memory
but attempts to be faster, and one for implicit ones which attempt to recycle more memory back into the parent but is a little slower).
The explicit producer is defined first, then the implicit one. They both contain the same general four methods: One to enqueue, one to
dequeue, one to enqueue in bulk, and one to dequeue in bulk. (Obviously they have constructors and destructors too, and helper methods.)
The main difference between them is how the block handling is done (they both use the same blocks, but in different ways, and map indices
to them in different ways).
Finally, there's the miscellaneous internal methods: There's the ones that handle the initial block pool (populated when the queue is constructed),
and an abstract block pool that comprises the initial pool and any blocks on the free list. There's ones that handle the producer list
(a lock-free add-only linked list of all the producers in the system). There's ones that handle the implicit producer lookup table (which
is really a sort of specialized TLS lookup). And then there's some helper methods for allocating and freeing objects, and the data members
of the queue itself, followed lastly by the free-standing swap functions.
[blog]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
[design]: http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
[samples.md]: https://github.com/cameron314/concurrentqueue/blob/master/samples.md
[source]: https://github.com/cameron314/concurrentqueue
[concurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/concurrentqueue.h
[blockingconcurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/blockingconcurrentqueue.h
[unittest-src]: https://github.com/cameron314/concurrentqueue/tree/master/tests/unittests
[benchmarks]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++#benchmarks
[benchmark-src]: https://github.com/cameron314/concurrentqueue/tree/master/benchmarks
[license]: https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md
[cdschecker]: http://demsky.eecs.uci.edu/c11modelchecker.html
[relacy]: http://www.1024cores.net/home/relacy-race-detector
[spsc]: https://github.com/cameron314/readerwriterqueue
[salsa]: http://webee.technion.ac.il/~idish/ftp/spaa049-gidron.pdf

View File

@ -1,760 +0,0 @@
// Provides an efficient blocking version of moodycamel::ConcurrentQueue.
// ©2015 Cameron Desrochers. Distributed under the terms of the simplified
// BSD license, available at the top of concurrentqueue.h.
// Uses Jeff Preshing's semaphore implementation (under the terms of its
// separate zlib license, embedded below).
#pragma once
#include "concurrentqueue.h"
#include <type_traits>
#include <memory>
#if defined(_WIN32)
// Avoid including windows.h in a header; we only need a handful of
// items, so we'll redeclare them here (this is relatively safe since
// the API generally has to remain stable between Windows versions).
// I know this is an ugly hack but it still beats polluting the global
// namespace with thousands of generic names or adding a .cpp for nothing.
extern "C" {
struct _SECURITY_ATTRIBUTES;
__declspec(dllimport) void* __stdcall CreateSemaphoreW(_SECURITY_ATTRIBUTES* lpSemaphoreAttributes, long lInitialCount, long lMaximumCount, const wchar_t* lpName);
__declspec(dllimport) int __stdcall CloseHandle(void* hObject);
__declspec(dllimport) unsigned long __stdcall WaitForSingleObject(void* hHandle, unsigned long dwMilliseconds);
__declspec(dllimport) int __stdcall ReleaseSemaphore(void* hSemaphore, long lReleaseCount, long* lpPreviousCount);
}
#elif defined(__MACH__)
#include <mach/mach.h>
#elif defined(__unix__)
#include <semaphore.h>
#endif
namespace moodycamel
{
namespace details
{
// Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
// portable + lightweight semaphore implementations, originally from
// https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
// LICENSE:
// Copyright (c) 2015 Jeff Preshing
//
// This software is provided 'as-is', without any express or implied
// warranty. In no event will the authors be held liable for any damages
// arising from the use of this software.
//
// Permission is granted to anyone to use this software for any purpose,
// including commercial applications, and to alter it and redistribute it
// freely, subject to the following restrictions:
//
// 1. The origin of this software must not be misrepresented; you must not
// claim that you wrote the original software. If you use this software
// in a product, an acknowledgement in the product documentation would be
// appreciated but is not required.
// 2. Altered source versions must be plainly marked as such, and must not be
// misrepresented as being the original software.
// 3. This notice may not be removed or altered from any source distribution.
namespace mpmc_sema
{
#if defined(_WIN32)
class Semaphore
{
private:
void* m_hSema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
const long maxLong = 0x7fffffff;
m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
}
~Semaphore()
{
CloseHandle(m_hSema);
}
void wait()
{
const unsigned long infinite = 0xffffffff;
WaitForSingleObject(m_hSema, infinite);
}
void signal(int count = 1)
{
ReleaseSemaphore(m_hSema, count, nullptr);
}
};
#elif defined(__MACH__)
//---------------------------------------------------------
// Semaphore (Apple iOS and OSX)
// Can't use POSIX semaphores due to http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
//---------------------------------------------------------
class Semaphore
{
private:
semaphore_t m_sema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
}
~Semaphore()
{
semaphore_destroy(mach_task_self(), m_sema);
}
void wait()
{
semaphore_wait(m_sema);
}
void signal()
{
semaphore_signal(m_sema);
}
void signal(int count)
{
while (count-- > 0)
{
semaphore_signal(m_sema);
}
}
};
#elif defined(__unix__)
//---------------------------------------------------------
// Semaphore (POSIX, Linux)
//---------------------------------------------------------
class Semaphore
{
private:
sem_t m_sema;
Semaphore(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
Semaphore& operator=(const Semaphore& other) MOODYCAMEL_DELETE_FUNCTION;
public:
Semaphore(int initialCount = 0)
{
assert(initialCount >= 0);
sem_init(&m_sema, 0, initialCount);
}
~Semaphore()
{
sem_destroy(&m_sema);
}
void wait()
{
// http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
int rc;
do
{
rc = sem_wait(&m_sema);
}
while (rc == -1 && errno == EINTR);
}
void signal()
{
sem_post(&m_sema);
}
void signal(int count)
{
while (count-- > 0)
{
sem_post(&m_sema);
}
}
};
#else
#error Unsupported platform! (No semaphore wrapper available)
#endif
//---------------------------------------------------------
// LightweightSemaphore
//---------------------------------------------------------
class LightweightSemaphore
{
public:
typedef std::make_signed<std::size_t>::type ssize_t;
private:
std::atomic<ssize_t> m_count;
Semaphore m_sema;
void waitWithPartialSpinning()
{
ssize_t oldCount;
// Is there a better way to set the initial spin count?
// If we lower it to 1000, testBenaphore becomes 15x slower on my Core i7-5930K Windows PC,
// as threads start hitting the kernel semaphore.
int spin = 10000;
while (--spin >= 0)
{
oldCount = m_count.load(std::memory_order_relaxed);
if ((oldCount > 0) && m_count.compare_exchange_strong(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
return;
std::atomic_signal_fence(std::memory_order_acquire); // Prevent the compiler from collapsing the loop.
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
{
m_sema.wait();
}
}
ssize_t waitManyWithPartialSpinning(ssize_t max)
{
assert(max > 0);
ssize_t oldCount;
int spin = 10000;
while (--spin >= 0)
{
oldCount = m_count.load(std::memory_order_relaxed);
if (oldCount > 0)
{
ssize_t newCount = oldCount > max ? oldCount - max : 0;
if (m_count.compare_exchange_strong(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
return oldCount - newCount;
}
std::atomic_signal_fence(std::memory_order_acquire);
}
oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
if (oldCount <= 0)
m_sema.wait();
if (max > 1)
return 1 + tryWaitMany(max - 1);
return 1;
}
public:
LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount)
{
assert(initialCount >= 0);
}
bool tryWait()
{
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
while (oldCount > 0)
{
if (m_count.compare_exchange_weak(oldCount, oldCount - 1, std::memory_order_acquire, std::memory_order_relaxed))
return true;
}
return false;
}
void wait()
{
if (!tryWait())
waitWithPartialSpinning();
}
// Acquires between 0 and (greedily) max, inclusive
ssize_t tryWaitMany(ssize_t max)
{
assert(max >= 0);
ssize_t oldCount = m_count.load(std::memory_order_relaxed);
while (oldCount > 0)
{
ssize_t newCount = oldCount > max ? oldCount - max : 0;
if (m_count.compare_exchange_weak(oldCount, newCount, std::memory_order_acquire, std::memory_order_relaxed))
return oldCount - newCount;
}
return 0;
}
// Acquires at least one, and (greedily) at most max
ssize_t waitMany(ssize_t max)
{
assert(max >= 0);
ssize_t result = tryWaitMany(max);
if (result == 0 && max > 0)
result = waitManyWithPartialSpinning(max);
return result;
}
void signal(ssize_t count = 1)
{
assert(count >= 0);
ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
ssize_t toRelease = -oldCount < count ? -oldCount : count;
if (toRelease > 0)
{
m_sema.signal((int)toRelease);
}
}
ssize_t availableApprox() const
{
ssize_t count = m_count.load(std::memory_order_relaxed);
return count > 0 ? count : 0;
}
};
} // end namespace mpmc_sema
} // end namespace details
// This is a blocking version of the queue. It has an almost identical interface to
// the normal non-blocking version, with the addition of various wait_dequeue() methods
// and the removal of producer-specific dequeue methods.
template<typename T, typename Traits = ConcurrentQueueDefaultTraits>
class BlockingConcurrentQueue
{
private:
typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
typedef details::mpmc_sema::LightweightSemaphore LightweightSemaphore;
public:
typedef typename ConcurrentQueue::producer_token_t producer_token_t;
typedef typename ConcurrentQueue::consumer_token_t consumer_token_t;
typedef typename ConcurrentQueue::index_t index_t;
typedef typename ConcurrentQueue::size_t size_t;
typedef typename std::make_signed<size_t>::type ssize_t;
static const size_t BLOCK_SIZE = ConcurrentQueue::BLOCK_SIZE;
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = ConcurrentQueue::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD;
static const size_t EXPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::EXPLICIT_INITIAL_INDEX_SIZE;
static const size_t IMPLICIT_INITIAL_INDEX_SIZE = ConcurrentQueue::IMPLICIT_INITIAL_INDEX_SIZE;
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = ConcurrentQueue::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE = ConcurrentQueue::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE;
static const size_t MAX_SUBQUEUE_SIZE = ConcurrentQueue::MAX_SUBQUEUE_SIZE;
public:
// Creates a queue with at least `capacity` element slots; note that the
// actual number of elements that can be inserted without additional memory
// allocation depends on the number of producers and the block size (e.g. if
// the block size is equal to `capacity`, only a single block will be allocated
// up-front, which means only a single producer will be able to enqueue elements
// without an extra allocation -- blocks aren't shared between producers).
// This method is not thread safe -- it is up to the user to ensure that the
// queue is fully constructed before it starts being used by other threads (this
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
: inner(capacity), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore>(), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}
// Disable copying and copy assignment
BlockingConcurrentQueue(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
BlockingConcurrentQueue& operator=(BlockingConcurrentQueue const&) MOODYCAMEL_DELETE_FUNCTION;
// Moving is supported, but note that it is *not* a thread-safe operation.
// Nobody can use the queue while it's being moved, and the memory effects
// of that move must be propagated to other threads before they can use it.
// Note: When a queue is moved, its tokens are still valid but can only be
// used with the destination queue (i.e. semantically they are moved along
// with the queue itself).
BlockingConcurrentQueue(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
: inner(std::move(other.inner)), sema(std::move(other.sema))
{ }
inline BlockingConcurrentQueue& operator=(BlockingConcurrentQueue&& other) MOODYCAMEL_NOEXCEPT
{
return swap_internal(other);
}
// Swaps this queue's state with the other's. Not thread-safe.
// Swapping two queues does not invalidate their tokens, however
// the tokens that were created for one queue must be used with
// only the swapped queue (i.e. the tokens are tied to the
// queue's movable state, not the object itself).
inline void swap(BlockingConcurrentQueue& other) MOODYCAMEL_NOEXCEPT
{
swap_internal(other);
}
private:
BlockingConcurrentQueue& swap_internal(BlockingConcurrentQueue& other)
{
if (this == &other) {
return *this;
}
inner.swap(other.inner);
sema.swap(other.sema);
return *this;
}
public:
// Enqueues a single item (by copying it).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T const& item)
{
if (details::likely(inner.enqueue(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible).
// Allocates memory if required. Only fails if memory allocation fails (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0,
// or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(T&& item)
{
if (details::likely(inner.enqueue(std::move(item)))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by copying it) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(producer_token_t const& token, T const& item)
{
if (details::likely(inner.enqueue(token, item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails (or
// Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Thread-safe.
inline bool enqueue(producer_token_t const& token, T&& item)
{
if (details::likely(inner.enqueue(token, std::move(item)))) {
sema->signal();
return true;
}
return false;
}
// Enqueues several items.
// Allocates memory if required. Only fails if memory allocation fails (or
// implicit production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0, or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Note: Use std::make_move_iterator if the elements should be moved instead of copied.
// Thread-safe.
template<typename It>
inline bool enqueue_bulk(It itemFirst, size_t count)
{
if (details::likely(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues several items using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails
// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (details::likely(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues a single item (by copying it).
// Does not allocate memory. Fails if not enough room to enqueue (or implicit
// production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
// is 0).
// Thread-safe.
inline bool try_enqueue(T const& item)
{
if (inner.try_enqueue(item)) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible).
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Thread-safe.
inline bool try_enqueue(T&& item)
{
if (inner.try_enqueue(std::move(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by copying it) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
inline bool try_enqueue(producer_token_t const& token, T const& item)
{
if (inner.try_enqueue(token, item)) {
sema->signal();
return true;
}
return false;
}
// Enqueues a single item (by moving it, if possible) using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Thread-safe.
inline bool try_enqueue(producer_token_t const& token, T&& item)
{
if (inner.try_enqueue(token, std::move(item))) {
sema->signal();
return true;
}
return false;
}
// Enqueues several items.
// Does not allocate memory (except for one-time implicit producer).
// Fails if not enough room to enqueue (or implicit production is
// disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool try_enqueue_bulk(It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Enqueues several items using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Note: Use std::make_move_iterator if the elements should be moved
// instead of copied.
// Thread-safe.
template<typename It>
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
return true;
}
return false;
}
// Attempts to dequeue from the queue.
// Returns false if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename U>
inline bool try_dequeue(U& item)
{
if (sema->tryWait()) {
while (!inner.try_dequeue(item)) {
continue;
}
return true;
}
return false;
}
// Attempts to dequeue from the queue using an explicit consumer token.
// Returns false if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename U>
inline bool try_dequeue(consumer_token_t& token, U& item)
{
if (sema->tryWait()) {
while (!inner.try_dequeue(token, item)) {
continue;
}
return true;
}
return false;
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
// were checked (so, the queue is likely but not guaranteed to be empty).
// Never allocates. Thread-safe.
template<typename It>
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Blocks the current thread until there's something to dequeue, then
// dequeues it.
// Never allocates. Thread-safe.
template<typename U>
inline void wait_dequeue(U& item)
{
sema->wait();
while (!inner.try_dequeue(item)) {
continue;
}
}
// Blocks the current thread until there's something to dequeue, then
// dequeues it using an explicit consumer token.
// Never allocates. Thread-safe.
template<typename U>
inline void wait_dequeue(consumer_token_t& token, U& item)
{
sema->wait();
while (!inner.try_dequeue(token, item)) {
continue;
}
}
// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which will
// always be at least one (this method blocks until the queue
// is non-empty) and at most max.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}
// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued, which will
// always be at least one (this method blocks until the queue
// is non-empty) and at most max.
// Never allocates. Thread-safe.
template<typename It>
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
return count;
}
// Returns an estimate of the total number of elements currently in the queue. This
// estimate is only accurate if the queue has completely stabilized before it is called
// (i.e. all enqueue and dequeue operations have completed and their memory effects are
// visible on the calling thread, and no further operations start while this method is
// being called).
// Thread-safe.
inline size_t size_approx() const
{
return (size_t)sema->availableApprox();
}
// Returns true if the underlying atomic variables used by
// the queue are lock-free (they should be on most platforms).
// Thread-safe.
static bool is_lock_free()
{
return ConcurrentQueue::is_lock_free();
}
private:
template<typename U>
static inline U* create()
{
auto p = Traits::malloc(sizeof(U));
return p != nullptr ? new (p) U : nullptr;
}
template<typename U, typename A1>
static inline U* create(A1&& a1)
{
auto p = Traits::malloc(sizeof(U));
return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
}
template<typename U>
static inline void destroy(U* p)
{
if (p != nullptr) {
p->~U();
}
Traits::free(p);
}
private:
ConcurrentQueue inner;
std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
};
template<typename T, typename Traits>
inline void swap(BlockingConcurrentQueue<T, Traits>& a, BlockingConcurrentQueue<T, Traits>& b) MOODYCAMEL_NOEXCEPT
{
a.swap(b);
}
} // end namespace moodycamel

File diff suppressed because it is too large Load Diff

View File

@ -1,375 +0,0 @@
# Samples for moodycamel::ConcurrentQueue
Here are some example usage scenarios with sample code. Note that most
use the simplest version of each available method for demonstration purposes,
but they can all be adapted to use tokens and/or the corresponding bulk methods for
extra speed.
## Hello queue
ConcurrentQueue<int> q;
for (int i = 0; i != 123; ++i)
q.enqueue(i);
int item;
for (int i = 0; i != 123; ++i) {
q.try_dequeue(item);
assert(item == i);
}
## Hello concurrency
Basic example of how to use the queue from multiple threads, with no
particular goal (i.e. it does nothing, but in an instructive way).
ConcurrentQueue<int> q;
int dequeued[100] = { 0 };
std::thread threads[20];
// Producers
for (int i = 0; i != 10; ++i) {
threads[i] = std::thread([&](int i) {
for (int j = 0; j != 10; ++j) {
q.enqueue(i * 10 + j);
}
}, i);
}
// Consumers
for (int i = 10; i != 20; ++i) {
threads[i] = std::thread([&]() {
int item;
for (int j = 0; j != 20; ++j) {
if (q.try_dequeue(item)) {
++dequeued[item];
}
}
});
}
// Wait for all threads
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
// Collect any leftovers (could be some if e.g. consumers finish before producers)
int item;
while (q.try_dequeue(item)) {
++dequeued[item];
}
// Make sure everything went in and came back out!
for (int i = 0; i != 100; ++i) {
assert(dequeued[i] == 1);
}
## Bulk up
Same as previous example, but runs faster.
ConcurrentQueue<int> q;
int dequeued[100] = { 0 };
std::thread threads[20];
// Producers
for (int i = 0; i != 10; ++i) {
threads[i] = std::thread([&](int i) {
int items[10];
for (int j = 0; j != 10; ++j) {
items[j] = i * 10 + j;
}
q.enqueue_bulk(items, 10);
}, i);
}
// Consumers
for (int i = 10; i != 20; ++i) {
threads[i] = std::thread([&]() {
int items[20];
for (std::size_t count = q.try_dequeue_bulk(items, 20); count != 0; --count) {
++dequeued[items[count - 1]];
}
});
}
// Wait for all threads
for (int i = 0; i != 20; ++i) {
threads[i].join();
}
// Collect any leftovers (could be some if e.g. consumers finish before producers)
int items[10];
std::size_t count;
while ((count = q.try_dequeue_bulk(items, 10)) != 0) {
for (std::size_t i = 0; i != count; ++i) {
++dequeued[items[i]];
}
}
// Make sure everything went in and came back out!
for (int i = 0; i != 100; ++i) {
assert(dequeued[i] == 1);
}
## Producer/consumer model (simultaneous)
In this model, one set of threads is producing items,
and the other is consuming them concurrently until all of
them have been consumed. The counters are required to
ensure that all items eventually get consumed.
ConcurrentQueue<Item> q;
const int ProducerCount = 8;
const int ConsumerCount = 8;
std::thread producers[ProducerCount];
std::thread consumers[ConsumerCount];
std::atomic<int> doneProducers(0);
std::atomic<int> doneConsumers(0);
for (int i = 0; i != ProducerCount; ++i) {
producers[i] = std::thread([&]() {
while (produce) {
q.enqueue(produceItem());
}
doneProducers.fetch_add(1, std::memory_order_release);
});
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i] = std::thread([&]() {
Item item;
bool itemsLeft;
do {
// It's important to fence (if the producers have finished) *before* dequeueing
itemsLeft = doneProducers.load(std::memory_order_acquire) != ProducerCount;
while (q.try_dequeue(item)) {
itemsLeft = true;
consumeItem(item);
}
} while (itemsLeft || doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == ConsumerCount);
// The condition above is a bit tricky, but it's necessary to ensure that the
// last consumer sees the memory effects of all the other consumers before it
// calls try_dequeue for the last time
});
}
for (int i = 0; i != ProducerCount; ++i) {
producers[i].join();
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i].join();
}
## Producer/consumer model (simultaneous, blocking)
The blocking version is different, since either the number of elements being produced needs
to be known ahead of time, or some other coordination is required to tell the consumers when
to stop calling wait_dequeue (not shown here). This is necessary because otherwise a consumer
could end up blocking forever -- and destroying a queue while a consumer is blocking on it leads
to undefined behaviour.
BlockingConcurrentQueue<Item> q;
const int ProducerCount = 8;
const int ConsumerCount = 8;
std::thread producers[ProducerCount];
std::thread consumers[ConsumerCount];
std::atomic<int> promisedElementsRemaining(ProducerCount * 1000);
for (int i = 0; i != ProducerCount; ++i) {
producers[i] = std::thread([&]() {
for (int j = 0; j != 1000; ++j) {
q.enqueue(produceItem());
}
});
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i] = std::thread([&]() {
Item item;
while (promisedElementsRemaining.fetch_sub(1, std::memory_order_relaxed)) {
q.wait_dequeue(item);
consumeItem(item);
}
});
}
for (int i = 0; i != ProducerCount; ++i) {
producers[i].join();
}
for (int i = 0; i != ConsumerCount; ++i) {
consumers[i].join();
}
## Producer/consumer model (separate stages)
ConcurrentQueue<Item> q;
// Production stage
std::thread threads[8];
for (int i = 0; i != 8; ++i) {
threads[i] = std::thread([&]() {
while (produce) {
q.enqueue(produceItem());
}
});
}
for (int i = 0; i != 8; ++i) {
threads[i].join();
}
// Consumption stage
std::atomic<int> doneConsumers(0);
for (int i = 0; i != 8; ++i) {
threads[i] = std::thread([&]() {
Item item;
do {
while (q.try_dequeue(item)) {
consumeItem(item);
}
// Loop again one last time if we're the last producer (with the acquired
// memory effects of the other producers):
} while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
});
}
for (int i = 0; i != 8; ++i) {
threads[i].join();
}
Note that there's no point trying to use the blocking queue with this model, since
there's no need to use the `wait` methods (all the elements are produced before any
are consumed), and hence the complexity would be the same but with additional overhead.
## Object pool
If you don't know what threads will be using the queue in advance,
you can't really declare any long-term tokens. The obvious solution
is to use the implicit methods (that don't take any tokens):
// A pool of 'Something' objects that can be safely accessed
// from any thread
class SomethingPool
{
public:
Something getSomething()
{
Something obj;
queue.try_dequeue(obj);
// If the dequeue succeeded, obj will be an object from the
// thread pool, otherwise it will be the default-constructed
// object as declared above
return obj;
}
void recycleSomething(Something&& obj)
{
queue.enqueue(std::move(obj));
}
};
## Threadpool task queue
BlockingConcurrentQueue<Task> q;
// To create a task from any thread:
q.enqueue(...);
// On threadpool threads:
Task task;
while (true) {
q.wait_dequeue(task);
// Process task...
}
## Multithreaded game loop
BlockingConcurrentQueue<Task> q;
std::atomic<int> pendingTasks(0);
// On threadpool threads:
Task task;
while (true) {
q.wait_dequeue(task);
// Process task...
pendingTasks.fetch_add(-1, std::memory_order_release);
}
// Whenever a new task needs to be processed for the frame:
pendingTasks.fetch_add(1, std::memory_order_release);
q.enqueue(...);
// To wait for all the frame's tasks to complete before rendering:
while (pendingTasks.load(std::memory_order_acquire) != 0)
continue;
// Alternatively you could help out the thread pool while waiting:
while (pendingTasks.load(std::memory_order_acquire) != 0) {
if (!q.try_dequeue(task)) {
continue;
}
// Process task...
pendingTasks.fetch_add(-1, std::memory_order_release);
}
## Pump until empty
This might be useful if, for example, you want to process any remaining items
in the queue before it's destroyed. Note that it is your responsibility
to ensure that the memory effects of any enqueue operations you wish to see on
the dequeue thread are visible (i.e. if you're waiting for a certain set of elements,
you need to use memory fences to ensure that those elements are visible to the dequeue
thread after they've been enqueued).
ConcurrentQueue<Item> q;
// Single-threaded pumping:
Item item;
while (q.try_dequeue(item)) {
// Process item...
}
// q is guaranteed to be empty here, unless there is another thread enqueueing still or
// there was another thread dequeueing at one point and its memory effects have not
// yet been propagated to this thread.
// Multi-threaded pumping:
std::thread threads[8];
std::atomic<int> doneConsumers(0);
for (int i = 0; i != 8; ++i) {
threads[i] = std::thread([&]() {
Item item;
do {
while (q.try_dequeue(item)) {
// Process item...
}
} while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
// If there are still enqueue operations happening on other threads,
// then the queue may not be empty at this point. However, if all enqueue
// operations completed before we finished pumping (and the propagation of
// their memory effects too), and all dequeue operations apart from those
// our threads did above completed before we finished pumping (and the
// propagation of their memory effects too), then the queue is guaranteed
// to be empty at this point.
});
}
for (int i = 0; i != 8; ++i) {
threads[i].join();
}
## Wait for a queue to become empty (without dequeueing)
You can't (robustly) :-) However, you can set up your own atomic counter and
poll that instead (see the game loop example). If you're satisfied with merely an estimate, you can use
`size_approx()`. Note that `size_approx()` may return 0 even if the queue is
not completely empty, unless the queue has already stabilized first (no threads
are enqueueing or dequeueing, and all memory effects of any previous operations
have been propagated to the thread before it calls `size_approx()`).

View File

@ -60,8 +60,6 @@ Continuable<> make_continuable(Args&&...)
return Continuable<>(); return Continuable<>();
} }
template <typename... LeftArgs, typename... RightArgs> template <typename... LeftArgs, typename... RightArgs>
Continuable<> operator&& (Continuable<LeftArgs...>&&, Continuable<RightArgs...>&&) Continuable<> operator&& (Continuable<LeftArgs...>&&, Continuable<RightArgs...>&&)
{ {

692
test.cpp
View File

@ -1,638 +1,94 @@
#include "Callback.h"
#include "WeakCallbackContainer.h"
#include "Continuable.h"
#include <iostream>
#include <exception>
#include <type_traits>
#include <string>
#include <vector>
#include <typeinfo>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <atomic>
#include <random>
// #include "concurrentqueue.h"
#include <boost/optional.hpp>
enum SpellCastResult
{
SPELL_FAILED_SUCCESS = 0,
SPELL_FAILED_AFFECTING_COMBAT = 1,
SPELL_FAILED_ALREADY_AT_FULL_HEALTH = 2,
SPELL_FAILED_ALREADY_AT_FULL_MANA = 3,
SPELL_FAILED_ALREADY_AT_FULL_POWER = 4,
SPELL_FAILED_ALREADY_BEING_TAMED = 5
};
template<typename T>
using Optional = boost::optional<T>;
Continuable<> Log(std::string const& message)
{
return make_continuable([=](Callback<>&& callback)
{
std::cout << message << std::endl;
callback();
});
}
struct ResultSet
{
ResultSet(std::size_t affected_) :
affected(affected_) { };
std::size_t affected;
};
Continuable<ResultSet> AsyncQuery(std::string const& query)
{
return make_continuable([=](Callback<ResultSet>&& callback)
{
std::cout << query << std::endl;
callback(ResultSet(2));
});
}
// Original method taking an optional callback.
void CastSpell(int id, Optional<Callback<SpellCastResult>> const& callback = boost::none)
{
std::cout << "Casting " << id << std::endl;
// on success call the callback with SPELL_FAILED_SUCCESS
if (callback)
(*callback)(SPELL_FAILED_SUCCESS);
}
// Promise wrapped callback decorator.
Continuable<SpellCastResult> CastSpellPromise(int id)
{
return make_continuable([=](Callback<SpellCastResult>&& callback)
{
CastSpell(id, callback);
});
}
// Void instant returning continuable promise for testing purposes
Continuable<> TrivialPromise(std::string const& msg = "")
{
return Log(msg);
}
Continuable<bool> Validate()
{
return make_continuable([=](Callback<bool>&& callback)
{
std::cout << "Validate " << std::endl;
callback(true);
});
}
Continuable<std::unique_ptr<int>&&> MoveTest()
{
return make_continuable([=](Callback<std::unique_ptr<int>&&>&& callback)
{
// Move the unique ptr out to test moveability
std::unique_ptr<int> ptr(new int(5));
callback(std::move(ptr));
});
}
typedef std::unique_ptr<int> Moveable;
void testMoveAbleNormal(std::function<void(std::unique_ptr<int>&&)> callback)
{
std::unique_ptr<int> ptr(new int(5));
callback(std::move(ptr));
}
template <typename... T>
void test_unwrap(std::string const& msg)
{
std::cout << msg << " is unwrappable: " << (fu::is_unwrappable<T...>::value ? "true" : "false") << std::endl;
}
#include <iostream>
#include <atomic>
#include <random>
// static std::atomic_size_t move_tracer_index = 0;
/// Class to trace construct, destruct, copy and move operations.
/*
class CopyMoveTracer
{
std::size_t id;
std::size_t flags;
std::size_t copied;
std::size_t moved;
bool IsEnabled(std::size_t mask) const
{
// msvc warning silencer
return (flags & mask) != 0;
}
void Log(std::string const& msg) const
{
}
public:
enum Flags : std::size_t
{
CAPTURE_NONE = 0x1,
CAPTURE_CONSTRUCT = 0x1,
CAPTURE_DESTRUCT = 0x2,
CAPTURE_COPY = 0x4,
CAPTURE_MOVE = 0x8,
CAPTURE_ALL = CAPTURE_CONSTRUCT | CAPTURE_DESTRUCT | CAPTURE_COPY | CAPTURE_MOVE
};
// Empty construct
CopyMoveTracer() : id(++move_tracer_index), flags(CAPTURE_ALL), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_CONSTRUCT))
Log("Tracer constructed");
}
// Construct with flags
CopyMoveTracer(std::size_t flags_) : id(++move_tracer_index), flags(flags_), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_CONSTRUCT))
Log("Tracer constructed");
}
// Copy construct
CopyMoveTracer(CopyMoveTracer const& right) : id(move_tracer_index++), flags(right.flags), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_COPY))
Log("Tracer copy constructed");
}
// Copy construct
CopyMoveTracer(CopyMoveTracer&& right) : id(right.id), flags(right.flags), copied(0), moved(0)
{
if (IsEnabled(CAPTURE_COPY))
Log("Tracer copy constructed");
}
};
*/
/*
namespace detail
{
template<typename, typename>
struct function_matches_to_args;
template<typename LeftReturn, typename... LeftArgs,
typename RightReturn, typename... RightArgs>
struct function_matches_to_args<
std::function<LeftReturn(LeftArgs...)>,
std::function<RightReturn(RightArgs...)>>
{
};
}
*/
/* /*
class DispatcherPool * Copyright (C) 2015 Naios <naios-dev@live.de>
{ *
enum TerminationMode * This program is free software: you can redistribute it and/or modify
{ * it under the terms of the GNU General Public License as published by
NONE, * the Free Software Foundation, either version 3 of the License, or
TERMINATE, * (at your option) any later version.
AWAIT *
}; * This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
typedef std::function<void()> Callable; * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
std::vector<std::thread> _pool; *
* You should have received a copy of the GNU General Public License
std::atomic<TerminationMode> _shutdown; * along with this program. If not, see <http://www.gnu.org/licenses/>.
std::mutex _mutex;
std::condition_variable _condition;
// moodycamel::ConcurrentQueue<Callable> _queue;
public:
DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }
DispatcherPool(unsigned int const threads) : _shutdown(NONE)
{
for (unsigned int i = 0; i < threads; ++i)
{
_pool.emplace_back([&, i]
{
// Reserve the consumer token
// moodycamel::ConsumerToken token(_queue);
Callable callable;
while (_shutdown != TERMINATE)
{
if (_queue.try_dequeue(token, callable))
{
std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
// std::cout << msg;
callable();
}
else
{
if (_shutdown == AWAIT)
break;
{
std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
// std::cout << msg;
}
std::unique_lock<std::mutex> lock(_mutex);
// Lock until new tasks are added
_condition.wait(lock);
{
std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
// std::cout << msg;
}
}
}
});
}
}
~DispatcherPool()
{
Shutdown();
}
template<typename Functional>
void Dispatch(Functional&& functional)
{
_queue.enqueue(std::forward<Functional>(functional));
std::unique_lock<std::mutex> lock(_mutex);
_condition.notify_one();
}
void Shutdown()
{
_Shutdown(TERMINATE);
}
void Await()
{
_Shutdown(AWAIT);
}
void _Shutdown(TerminationMode const mode)
{
_shutdown = mode;
_condition.notify_all();
for (auto&& thread : _pool)
if (thread.joinable())
thread.join();
}
};
*/ */
#define CATCH_CONFIG_RUNNER
#include "catch.hpp"
void test_mockup(); void test_mockup();
void test_incubator();
int main(int /*argc*/, char** /*argv*/) int main(int argc, char* const argv[])
{ {
test_mockup(); test_mockup();
// CopyMoveTracer tracer; test_incubator();
/* int const result = Catch::Session().run(argc, argv);
CastSpellPromise(1)
.then([](SpellCastResult) // Attach breakpoint here ,-)
return result;
}
template<typename T, typename V>
using cross_forward_t =
typename std::conditional<
std::is_rvalue_reference<T&&>::value,
typename std::decay<V>::type&&,
typename std::conditional<
std::is_lvalue_reference<T&&>::value,
typename std::decay<V>::type&,
typename std::decay<V>::type
>::type
>::type;
template<typename T, typename V>
cross_forward_t<T, V> cross_forward(V&& var)
{ {
return CastSpellPromise(2); return static_cast<cross_forward_t<T, V>&&>(var);
}) }
.then([](SpellCastResult)
struct TestContainer
{ {
std::cout << "Pause a callback (void test) " << std::endl; std::shared_ptr<int> ptr;
})
.then(Validate())
.then(AsyncQuery("SELECT * FROM `users`")
.then([](ResultSet result)
{
// Evaluate result
std::size_t const affected = result.affected;
return Log(std::to_string(affected) + " rows affected\n");
})
)
.then(TrivialPromise("huhu"))
.then(CastSpellPromise(3))
.then(CastSpellPromise(4)
.then(CastSpellPromise(5))
)
.then(CastSpellPromise(6))
.then([](SpellCastResult)
{
return Validate();
});
MoveTest()
.then([](std::unique_ptr<int>&& ptr)
{
static_assert(std::is_rvalue_reference<decltype(ptr)>::value, "no rvalue");
// Error here
std::unique_ptr<int> other = std::move(ptr);
});
// Mockup of aggregate methods
make_continuable()
.all(
[] { return TrivialPromise(); },
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.some(2, // Only 2 of 3 must complete
[] { return TrivialPromise(); },
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.any( // Any of 2.
[] { return TrivialPromise(); },
[] { return TrivialPromise(); }
)
.then([]
{
std::cout << "Finished" << std::endl;
});
*/
//Continuable<bool> cb = make_continuable([](Callback<bool>&& callback)
//{
// callback(true);
//});
//test_unwrap<void()>("void()");
//test_unwrap<std::function<void()>>("std::function<void()>");
//test_unwrap<std::vector<std::string>>("std::vector<std::string>");
//make_continuable([=](Callback<>&&)
//{
//});
//int i = 0;
//++i;
//auto lam = [=](Callback<SpellCastResult>&&)
//{
// // on success call the callback with SPELL_FAILED_SUCCESS
// // callback(SPELL_FAILED_SUCCESS);
//};
//fu::function_type_of_t<decltype(lam)> fun1;
//fun1 = lam;
//fun1(Callback<SpellCastResult>());
//fu::function_type_of_t<Callback<int>> fun2;
//
//shared_callback_of_t<std::function<void(int)>> sc1;
//weak_callback_of_t<Callback<int>> sc2;
//
//make_weak_wrapped_callback(sc1);
//make_weak_wrapped_callback(sc2);
//WeakCallbackContainer callback;
//
//auto weakCallback = callback([]
//{
//});
//typedef Continuable<bool> cont123;
//typedef Continuable<bool> myty1;
//typedef Continuable<bool, float> myty2;
//// Convertible test
//
//// Continuable<Callback<SpellCastResult>> spell
//{
// auto stack =
// int iii = 0;
// iii = 1;
//}
//std::vector<int> myvec;
//typedef fu::requires_functional_constructible<std::function<void()>>::type test_assert1;
//// typedef fu::requires_functional_constructible<std::vector<int>>::type test_assert2;
//// Brainstorming: this shows an example callback chain
//// Given by continuable
//std::function<void(Callback<SpellCastResult>&&)> continuable_1 = [](Callback<SpellCastResult>&& callback)
//{
// callback(SPELL_FAILED_AFFECTING_COMBAT);
//};
//// Implemented by user
//std::function<std::function<void(Callback<bool>&&)>(SpellCastResult)> callback_by_user_1 = [](SpellCastResult)
//{
// // Given by continuable
// // Fn2
// return [](Callback<bool>&& callback)
// {
// callback(true);
// };
//};
//// Implemented by user
//std::function<std::function<void(Callback<>&&)>(bool)> cn2 = [](bool val)
//{
// // Finished
// std::cout << "Callback chain finished! -> " << val << std::endl;
// // Given by continuable (auto end)
// return [](Callback<>&&)
// {
// // Empty callback
// };
//};
//// Entry point
//std::function<void(Callback<bool>&&>)> entry = [continuable_1 /*= move*/, callback_by_user_1 /*given by the user (::then(...))*/]
// (std::function<void(Callback<bool>&&)>)
//{
// // Call with auto created wrapper by the continuable
// continuable_1([&](SpellCastResult result /*forward args*/)
// {
// // Wrapper functional to process unary or multiple promised callbacks
// // Returned from the user
// std::function<void(Callback<bool>&&)> fn2 = callback_by_user_1(/*forward args*/ result);
// return std::move(fn2);
// });
//};
//// Here we go
//entry();
detail::unary_chainer_t<
std::function<Continuable<bool>()>
>::callback_arguments_t args213987;
typedef detail::functional_traits<>::result_maker_of_t<
std::function<Continuable<bool>()>,
decltype(CastSpellPromise(2)),
decltype(TrivialPromise()),
std::function<Continuable<float, double>()>,
std::function<Continuable<>()>,
std::function<Continuable<bool>()>
> maker;
maker::arguments_t test282_args;
maker::partial_results_t test282_pack;
auto test282_size = maker::size;
// static_assert(std::is_same<>::value,
detail::concat_identities<fu::identity<int, bool, char>, fu::identity<float, double>>::type myt;
// fu::identity<detail::functional_traits<>::position<1>> i;
std::tuple<int, std::vector<int>> tup;
Moveable moveable(new int(7));
auto myargs = std::make_tuple(7, std::vector<int>({ 1, 2, 3 }), std::move(moveable));
std::function<int(int, std::vector<int>, Moveable&&)> lam = [](int given_i, std::vector<int> given_vec, Moveable&& moveable)
{
Moveable other = std::move(moveable);
++given_i;
return 1;
}; };
fu::invoke_from_tuple(lam, std::move(myargs)); template<typename T>
TestContainer extract(T&& c)
fu::sequence_generator<2>::type seqtype;
fu::sequence_generator<1>::type zero_seqtype;
detail::multiple_when_all_chainer_t<
fu::identity<>,
fu::identity<
std::function<Continuable<>()>,
std::function<Continuable<std::string>()>
>
>::result_maker::partial_results_t myres123345;
/*
auto firstType = detail::multiple_when_all_chainer_t<
fu::identity<>,
fu::identity<
std::function<Continuable<SpellCastResult>()>,
std::function<Continuable<>()>,
std::function<Continuable<SpellCastResult>()>
>
>::make_when_all(
[]
{ {
// void return TestContainer{cross_forward<T>(c.ptr)};
return CastSpellPromise(10);
},
[]
{
return make_continuable();
},
[]
{
return CastSpellPromise(20);
})
.then([](SpellCastResult, SpellCastResult)
{
})
.then([]
{
});
*/
make_continuable()
.all(
CastSpellPromise(10)
.then(CastSpellPromise(15)),
CastSpellPromise(20),
make_continuable([](Callback<bool, bool, double , std::unique_ptr<std::string>>&& callback)
{
callback(true, false, 0.3f, std::unique_ptr<std::string>(new std::string("oh, all work is done!")));
}),
TrivialPromise())
.then([](SpellCastResult r0, SpellCastResult r1, bool r2, bool r3, double r4, std::unique_ptr<std::string> message)
{
return TrivialPromise("Lets see... ").then(Log(*message));
})
.then([]
{
return Log("ok, now its really finished!").then(CastSpellPromise(2));
});
// DispatcherPool countPool(1);
// DispatcherPool pool;
/*
auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();
std::mt19937 rng(static_cast<unsigned int>(seed));
std::uniform_int_distribution<int> gen(10, 150);
std::vector<int> container;
unsigned int counter = 0;
for (unsigned int run = 0; run < 15; ++run)
{
for (unsigned int i = 0; i < 20; ++i)
{
unsigned int wait = gen(rng);
++counter;
pool.Dispatch([&countPool, &container, i, run, wait]
{
// std::this_thread::sleep_for(std::chrono::milliseconds(wait));
std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n";
// std::cout << str;
countPool.Dispatch([&container, wait]
{
container.emplace_back(wait);
});
});
} }
std::this_thread::sleep_for(std::chrono::milliseconds(50)); TEST_CASE("CrossForward tests", "[CrossForward]")
{
TestContainer con;
con.ptr = std::make_shared<int>(0);
static_assert(
std::is_same<cross_forward_t<TestContainer&&, std::unique_ptr<int>&>,
std::unique_ptr<int>&&>::value,
"cross_forward returns wrong type!");
static_assert(
std::is_same<cross_forward_t<TestContainer&, std::unique_ptr<int>&>,
std::unique_ptr<int>&>::value,
"cross_forward returns wrong type!");
SECTION("CrossForward - forward l-value references")
{
extract(con);
REQUIRE(con.ptr.get());
} }
// std::cout << "Awaiting termination...\n"; SECTION("CrossForward - forward r-value references")
{
// std::this_thread::sleep_for(std::chrono::seconds(5)); extract(std::move(con));
REQUIRE_FALSE(con.ptr.get());
// std::this_thread::sleep_for(std::chrono::seconds(5)); }
pool.Await();
countPool.Await();
std::cout << container.size() << " == " << counter;
*/
return 0;
} }

9415
testing/catch.hpp Normal file

File diff suppressed because it is too large Load Diff