Mirror of https://github.com/Naios/continuable.git (synced 2025-12-06 16:56:44 +08:00)

Commit 38c6b1643e (parent ef3f494ba2): Some experiments with async threadpool dispatching
@@ -31,7 +31,7 @@ endif()
 file(GLOB_RECURSE LIB_SOURCES include/*.cpp include/*.hpp include/*.h)
 add_library(continue STATIC ${LIB_SOURCES})

-include_directories(include)
+include_directories(include dep/concurrentqueue)

 set(TEST_SOURCES test.cpp)


dep/concurrentqueue/LICENSE.md (new file, 31 lines)
@@ -0,0 +1,31 @@

This license applies to everything in this repository except that which
is explicitly annotated as being written by other authors, i.e. the Boost
queue (included in the benchmarks for comparison), Intel's TBB library (ditto),
the CDSChecker tool (used for verification), the Relacy model checker (ditto),
and Jeff Preshing's semaphore implementation (used in the blocking queue) which
has a zlib license (embedded in blockingconcurrentqueue.h).


Simplified BSD License:

Copyright (c) 2013-2015, Cameron Desrochers.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

- Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

dep/concurrentqueue/README.md (new file, 449 lines)
@@ -0,0 +1,449 @@

# moodycamel::ConcurrentQueue<T>

An industrial-strength lock-free queue for C++.

Note: If all you need is a single-producer, single-consumer queue, I have [one of those too][spsc].

## Features

- Knock-your-socks-off [blazing fast performance][benchmarks].
- Single-header implementation. Just drop it in your project.
- Fully thread-safe lock-free queue. Use concurrently from any number of threads.
- C++11 implementation -- elements are moved (instead of copied) where possible.
- Templated, obviating the need to deal exclusively with pointers -- memory is managed for you.
- No artificial limitations on element types or maximum count.
- Memory can be allocated once up-front, or dynamically as needed.
- Fully portable (no assembly; all is done through standard C++11 primitives).
- Supports super-fast bulk operations.
- Includes a low-overhead blocking version (BlockingConcurrentQueue).
- Exception safe.

## Reasons to use

There are not that many full-fledged lock-free queues for C++. Boost has one, but it's limited to objects with trivial
assignment operators and trivial destructors, for example. Intel's TBB queue isn't lock-free, and requires trivial constructors too.
There are many academic papers that implement lock-free queues in C++, but usable source code is
hard to find, and tests even more so.

This queue not only has fewer limitations than others (for the most part), but [it's also faster][benchmarks].
It's been fairly well-tested, and offers advanced features like **bulk enqueueing/dequeueing**
(which, with my new design, is much faster than one element at a time, approaching and even surpassing
the speed of a non-concurrent queue even under heavy contention).

In short, there was a lock-free queue shaped hole in the C++ open-source universe, and I set out
to fill it with the fastest, most complete, and well-tested design and implementation I could.
The result is `moodycamel::ConcurrentQueue` :-)

## Reasons *not* to use

The fastest synchronization of all is the kind that never takes place. Fundamentally,
concurrent data structures require some synchronization, and that takes time. Every effort
was made, of course, to minimize the overhead, but if you can avoid sharing data between
threads, do so!

Why use concurrent data structures at all, then? Because they're gosh darn convenient! (And, indeed,
sometimes sharing data concurrently is unavoidable.)

My queue is **not linearizable** (see the next section on high-level design). The foundations of
its design assume that producers are independent; if this is not the case, and your producers
co-ordinate amongst themselves in some fashion, be aware that the elements won't necessarily
come out of the queue in the same order they were put in *relative to the ordering formed by that co-ordination*
(but they will still come out in the order they were put in by any *individual* producer). If this affects
your use case, you may be better off with another implementation; either way, it's an important limitation
to be aware of.

My queue is also **not NUMA aware**, and does a lot of memory re-use internally, meaning it probably doesn't
scale particularly well on NUMA architectures; however, I don't know of any other lock-free queue that *is*
NUMA aware (except for [SALSA][salsa], which is very cool, but has no publicly available implementation that I know of).

Finally, the queue is **not sequentially consistent**; there *is* a happens-before relationship between when an element is put
in the queue and when it comes out, but other things (such as pumping the queue until it's empty) require more thought
to get right in all eventualities, because explicit memory ordering may have to be done to get the desired effect. In other words,
it can sometimes be difficult to use the queue correctly. This is why it's a good idea to follow the [samples][samples.md] where possible.
On the other hand, the upside of this lack of sequential consistency is better performance.

## High-level design

Elements are stored internally using contiguous blocks instead of linked lists for better performance.
The queue is made up of a collection of sub-queues, one for each producer. When a consumer
wants to dequeue an element, it checks all the sub-queues until it finds one that's not empty.
All of this is largely transparent to the user of the queue, however -- it mostly just works<sup>TM</sup>.

One particular consequence of this design, however (which seems to be non-intuitive), is that if two producers
enqueue at the same time, there is no defined ordering between the elements when they're later dequeued.
Normally this is fine, because even with a fully linearizable queue there'd be a race between the producer
threads and so you couldn't rely on the ordering anyway. However, if for some reason you do extra explicit synchronization
between the two producer threads yourself, thus defining a total order between enqueue operations, you might expect
that the elements would come out in the same total order, which is a guarantee my queue does not offer. At that
point, though, there semantically aren't really two separate producers, but rather one that happens to be spread
across multiple threads. In this case, you can still establish a total ordering with my queue by creating
a single producer token, and using that from both threads to enqueue (taking care to synchronize access to the token,
of course, but there was already extra synchronization involved anyway).
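
A minimal sketch of that shared-token approach (the `orderedEnqueue` helper and the `std::mutex`
guarding the token are illustrative; any external synchronization between the two threads works):

    #include <mutex>
    #include "concurrentqueue.h"

    moodycamel::ConcurrentQueue<int> q;
    moodycamel::ProducerToken sharedToken(q);   // one logical producer...
    std::mutex tokenMutex;                      // ...serialized across the two threads

    // Called from either thread; the lock defines a total order over the enqueues,
    // and elements are dequeued in that same order
    void orderedEnqueue(int value)
    {
        std::lock_guard<std::mutex> guard(tokenMutex);
        q.enqueue(sharedToken, value);
    }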

I've written a more detailed [overview of the internal design][blog], as well as [the full
nitty-gritty details of the design][design], on my blog. Finally, the
[source][source] itself is available for perusal for those interested in its implementation.

## Basic use

The entire queue's implementation is contained in **one header**, [`concurrentqueue.h`][concurrentqueue.h].
Simply download and include that to use the queue. The blocking version is in a separate header,
[`blockingconcurrentqueue.h`][blockingconcurrentqueue.h], that depends on the first.
The implementation makes use of certain key C++11 features, so it requires a fairly recent compiler
(e.g. VS2012+ or g++ 4.8; note that g++ 4.6 has a known bug with `std::atomic` and is thus not supported).
The algorithm implementations themselves are platform independent.

Use it like you would any other templated queue, with the exception that you can use
it from many threads at once :-)

Simple example:

    #include "concurrentqueue.h"

    moodycamel::ConcurrentQueue<int> q;
    q.enqueue(25);

    int item;
    bool found = q.try_dequeue(item);
    assert(found && item == 25);

Description of basic methods:
- `ConcurrentQueue(size_t initialSizeEstimate)`
Constructor which optionally accepts an estimate of the number of elements the queue will hold
- `enqueue(T&& item)`
Enqueues one item, allocating extra space if necessary
- `try_enqueue(T&& item)`
Enqueues one item, but only if enough memory is already allocated
- `try_dequeue(T& item)`
Dequeues one item, returning true if an item was found or false if the queue appeared empty

Note that it is up to the user to ensure that the queue object is completely constructed before
being used by any other threads (this includes making the memory effects of construction
visible, possibly via a memory barrier). Similarly, it's important that all threads have
finished using the queue (and the memory effects have fully propagated) before it is
destructed.
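
A minimal sketch of the usual pattern that satisfies both requirements (the `example()` wrapper is
just illustrative; the `std::thread` constructor and `join()` provide the needed synchronization):

    #include <thread>
    #include "concurrentqueue.h"

    void example()
    {
        moodycamel::ConcurrentQueue<int> q;   // fully constructed before any thread is started

        std::thread worker([&] {
            q.enqueue(42);                    // thread creation makes the construction visible here
        });

        worker.join();                        // the worker's effects are all visible before...
    }                                         // ...q is destructed at the end of the scope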

There are usually two versions of each method, one "explicit" version that takes a user-allocated per-producer or
per-consumer token, and one "implicit" version that works without tokens. Using the explicit methods is almost
always faster (though not necessarily by a huge factor). Apart from performance, the primary distinction between them
is their sub-queue allocation behaviour for enqueue operations: Using the implicit enqueue methods causes an
automatically-allocated thread-local producer sub-queue to be allocated (it is marked for reuse once the thread exits).
Explicit producers, on the other hand, are tied directly to their tokens' lifetimes (and are also recycled as needed).

Full API (pseudocode):

    # Allocates more memory if necessary
    enqueue(item) : bool
    enqueue(prod_token, item) : bool
    enqueue_bulk(item_first, count) : bool
    enqueue_bulk(prod_token, item_first, count) : bool

    # Fails if not enough memory to enqueue
    try_enqueue(item) : bool
    try_enqueue(prod_token, item) : bool
    try_enqueue_bulk(item_first, count) : bool
    try_enqueue_bulk(prod_token, item_first, count) : bool

    # Attempts to dequeue from the queue (never allocates)
    try_dequeue(item&) : bool
    try_dequeue(cons_token, item&) : bool
    try_dequeue_bulk(item_first, max) : size_t
    try_dequeue_bulk(cons_token, item_first, max) : size_t

    # If you happen to know which producer you want to dequeue from
    try_dequeue_from_producer(prod_token, item&) : bool
    try_dequeue_bulk_from_producer(prod_token, item_first, max) : size_t

    # A not-necessarily-accurate count of the total number of elements
    size_approx() : size_t

## Blocking version

As mentioned above, a full blocking wrapper of the queue is provided that adds
`wait_dequeue` and `wait_dequeue_bulk` methods in addition to the regular interface.
This wrapper is extremely low-overhead, but slightly less fast than the non-blocking
queue (due to the necessary bookkeeping involving a lightweight semaphore).

The only major caveat with the blocking version is that you must be careful not to
destroy the queue while somebody is waiting on it. This generally means you need to
know for certain that another element is going to come along before you call one of
the blocking methods.

Blocking example:

    #include "blockingconcurrentqueue.h"

    moodycamel::BlockingConcurrentQueue<int> q;
    std::thread producer([&]() {
        for (int i = 0; i != 100; ++i) {
            q.enqueue(i);
        }
    });
    std::thread consumer([&]() {
        for (int i = 0; i != 100; ++i) {
            int item;
            q.wait_dequeue(item);
            assert(item == i);
        }
    });
    producer.join();
    consumer.join();

    assert(q.size_approx() == 0);

## Advanced features

#### Tokens

The queue can take advantage of extra per-producer and per-consumer storage if
it's available to speed up its operations. This takes the form of "tokens":
You can create a consumer token and/or a producer token for each thread or task
(tokens themselves are not thread-safe), and use the methods that accept a token
as their first parameter:

    moodycamel::ConcurrentQueue<int> q;

    moodycamel::ProducerToken ptok(q);
    q.enqueue(ptok, 17);

    moodycamel::ConsumerToken ctok(q);
    int item;
    q.try_dequeue(ctok, item);
    assert(item == 17);

If you happen to know which producer you want to consume from (e.g. in
a single-producer, multi-consumer scenario), you can use the `try_dequeue_from_producer`
methods, which accept a producer token instead of a consumer token, and cut some overhead.

Note that tokens work with the blocking version of the queue too.

When producing or consuming many elements, the most efficient way is to:

1. Use the bulk methods of the queue with tokens
2. Failing that, use the bulk methods without tokens
3. Failing that, use the single-item methods with tokens
4. Failing that, use the single-item methods without tokens

Having said that, don't create tokens willy-nilly -- ideally there would be
one token (of each kind) per thread. The queue will work with what it is
given, but it performs best when used with tokens.

Note that tokens aren't actually tied to any given thread; it's not technically
required that they be local to the thread, only that they be used by a single
producer/consumer at a time.

#### Bulk operations

Thanks to the [novel design][blog] of the queue, it's just as easy to enqueue/dequeue multiple
items as it is to do one at a time. This means that overhead can be cut drastically for
bulk operations. Example syntax:

    moodycamel::ConcurrentQueue<int> q;

    int items[] = { 1, 2, 3, 4, 5 };
    q.enqueue_bulk(items, 5);

    int results[5]; // Could also be any iterator
    size_t count = q.try_dequeue_bulk(results, 5);
    for (size_t i = 0; i != count; ++i) {
        assert(results[i] == items[i]);
    }

#### Preallocation (correctly using `try_enqueue`)

`try_enqueue`, unlike just plain `enqueue`, will never allocate memory. If there's not enough room in the
queue, it simply returns false. The key to using this method properly, then, is to ensure enough space is
pre-allocated for your desired maximum element count.

The constructor accepts a count of the number of elements that it should reserve space for. Because the
queue works with blocks of elements, however, and not individual elements themselves, the value to pass
in order to obtain an effective number of pre-allocated element slots is non-obvious.

First, be aware that the count passed is rounded up to the next multiple of the block size. Note that the
default block size is 32 (this can be changed via the traits). Second, once a slot in a block has been
enqueued to, that slot cannot be re-used until the rest of the block has been completely filled
up and then completely emptied. This affects the number of blocks you need in order to account for the
overhead of partially-filled blocks. Third, each producer (whether implicit or explicit) claims and recycles
blocks in a different manner, which again affects the number of blocks you need to account for a desired number of
usable slots.

Suppose you want the queue to be able to hold at least `N` elements at any given time. Without delving too
deep into the rather arcane implementation details, here are some simple formulas for the number of elements
to request for pre-allocation in such a case. Note the division is intended to be arithmetic division and not
integer division (in order for `ceil()` to work).

For explicit producers (using tokens to enqueue):

    (ceil(N / BLOCK_SIZE) + 1) * MAX_NUM_PRODUCERS * BLOCK_SIZE

For implicit producers (no tokens):

    (ceil(N / BLOCK_SIZE) - 1 + 2 * MAX_NUM_PRODUCERS) * BLOCK_SIZE

When using mixed producer types:

    ((ceil(N / BLOCK_SIZE) - 1) * (MAX_EXPLICIT_PRODUCERS + 1) + 2 * (MAX_IMPLICIT_PRODUCERS + MAX_EXPLICIT_PRODUCERS)) * BLOCK_SIZE
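
As a worked example (using the default `BLOCK_SIZE` of 32, and hypothetical values of `N = 1000`
and at most 4 producers, purely to illustrate the arithmetic):

    explicit producers: (ceil(1000 / 32) + 1) * 4 * 32      =  (32 + 1) * 4 * 32  =  4224
    implicit producers: (ceil(1000 / 32) - 1 + 2 * 4) * 32  =  39 * 32            =  1248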

If these formulas seem rather inconvenient, you can use the constructor overload that accepts the minimum
number of elements (`N`) and the maximum number of producers and consumers directly, and let it do the
computation for you.

Finally, it's important to note that because the queue is only eventually consistent and takes advantage of
weak memory ordering for speed, there's always a possibility that under contention `try_enqueue` will fail
even if the queue is correctly pre-sized for the desired number of elements. (e.g. A given thread may think that
the queue's full even when that's no longer the case.) So no matter what, you still need to handle the failure
case (perhaps looping until it succeeds), unless you don't mind dropping elements.
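
A minimal sketch of that retry idiom (the `enqueueOrRetry` helper and the `std::this_thread::yield()`
back-off are illustrative; the pre-allocation size is the explicit-producer value computed above):

    #include <thread>
    #include "concurrentqueue.h"

    moodycamel::ConcurrentQueue<int> q(4224);   // pre-sized; try_enqueue never allocates

    void enqueueOrRetry(int value)
    {
        // Under contention try_enqueue can fail spuriously even on a correctly
        // pre-sized queue, so retry (or drop the element if that's acceptable)
        while (!q.try_enqueue(value)) {
            std::this_thread::yield();
        }
    }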

#### Exception safety

The queue is exception safe, and will never become corrupted if used with a type that may throw exceptions.
The queue itself never throws any exceptions (operations fail gracefully (return false) if memory allocation
fails instead of throwing `std::bad_alloc`).

It is important to note that the guarantees of exception safety only hold if the element type never throws
from its destructor, and that any iterators passed into the queue (for bulk operations) never throw either.
Note that in particular this means `std::back_inserter` iterators must be used with care, since the vector
being inserted into may need to allocate and throw a `std::bad_alloc` exception from inside the iterator;
so be sure to reserve enough capacity in the target container first if you do this.

The guarantees are presently as follows:
- Enqueue operations are rolled back completely if an exception is thrown from an element's constructor.
For bulk enqueue operations, this means that elements are copied instead of moved (in order to avoid
having only some of the objects be moved in the event of an exception). Non-bulk enqueues always use
the move constructor if one is available.
- If the assignment operator throws during a dequeue operation (both single and bulk), the element(s) are
considered dequeued regardless. In such a case, the dequeued elements are all properly destructed before
the exception is propagated, but there's no way to get the elements themselves back.
- Any exception that is thrown is propagated up the call stack, at which point the queue is in a consistent
state.

Note: If any of your type's copy constructors/move constructors/assignment operators don't throw, be sure
to annotate them with `noexcept`; this will avoid the exception-checking overhead in the queue where possible
(even with zero-cost exceptions, there's still a code size impact that has to be taken into account).
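
For example (a minimal sketch; `Widget` is just an illustrative element type):

    struct Widget
    {
        Widget() = default;
        Widget(Widget&& other) noexcept = default;             // noexcept lets the queue move
        Widget& operator=(Widget&& other) noexcept = default;  // elements without exception checks
    };

    moodycamel::ConcurrentQueue<Widget> q;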

#### Traits

The queue also supports a traits template argument which defines various types, constants,
and the memory allocation and deallocation functions that are to be used by the queue. The typical pattern
to providing your own traits is to create a class that inherits from the default traits
and override only the values you wish to change. Example:

    struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits
    {
        static const size_t BLOCK_SIZE = 256; // Use bigger blocks
    };

    moodycamel::ConcurrentQueue<int, MyTraits> q;

#### How to dequeue types without calling the constructor

The normal way to dequeue an item is to pass in an existing object by reference, which
is then assigned to internally by the queue (using the move-assignment operator if possible).
This can pose a problem for types that are
expensive to construct or don't have a default constructor; fortunately, there is a simple
workaround: Create a wrapper class that copies the memory contents of the object when it
is assigned by the queue (a poor man's move, essentially). Note that this only works if
the object contains no internal pointers. Example:

    struct MyObjectMover
    {
        inline void operator=(MyObject&& obj)
        {
            std::memcpy(data, &obj, sizeof(MyObject));

            // TODO: Cleanup obj so that when it's destructed by the queue
            // it doesn't corrupt the data of the object we just moved it into
        }

        inline MyObject& obj() { return *reinterpret_cast<MyObject*>(data); }

    private:
        alignas(alignof(MyObject)) char data[sizeof(MyObject)];
    };

## Samples

There are some more detailed samples [here][samples.md]. The source of
the [unit tests][unittest-src] and [benchmarks][benchmark-src] is available for reference as well.

## Benchmarks

See my blog post for some [benchmark results][benchmarks] (including versus `boost::lockfree::queue` and `tbb::concurrent_queue`),
or run the benchmarks yourself (requires MinGW and certain GnuWin32 utilities to build on Windows, or a recent
g++ on Linux):

    cd build
    make benchmarks
    bin/benchmarks

The short version of the benchmarks is that it's so fast (especially the bulk methods), that if you're actually
using the queue to *do* anything, the queue won't be your bottleneck.

## Tests (and bugs)

I've written quite a few unit tests as well as a randomized long-running fuzz tester. I also ran the
core queue algorithm through the [CDSChecker][cdschecker] C++11 memory-model model checker. Some of the
inner algorithms were tested separately using the [Relacy][relacy] model checker, and full integration
tests were also performed with Relacy.
I've tested
on Linux (Fedora 19) and Windows (7), but only on x86 processors so far (Intel and AMD). The code was
written to be platform-independent, however, and should work across all processors and OSes.

Due to the complexity of the implementation and the difficult-to-test nature of lock-free code in general,
there may still be bugs. If anyone is seeing buggy behaviour, I'd like to hear about it! (Especially if
a unit test for it can be cooked up.) Just open an issue on GitHub.

## License

I'm releasing the source of this repository (with the exception of third-party code, i.e. the Boost queue
(used in the benchmarks for comparison), Intel's TBB library (ditto), CDSChecker, Relacy, and Jeff Preshing's
cross-platform semaphore, which all have their own licenses)
under a [simplified BSD license][license].

Note that lock-free programming is a patent minefield, and this code may very
well violate a pending patent (I haven't looked), though it does not to my present knowledge.
I did design and implement this queue from scratch.

## Diving into the code

If you're interested in the source code itself, it helps to have a rough idea of how it's laid out. This
section attempts to describe that.

The queue is formed of several basic parts (listed here in roughly the order they appear in the source). There's the
helper functions (e.g. for rounding to a power of 2). There's the default traits of the queue, which contain the
constants and malloc/free functions used by the queue. There's the producer and consumer tokens. Then there's the queue's
public API itself, starting with the constructor, destructor, and swap/assignment methods. There's the public enqueue methods,
which are all wrappers around a small set of private enqueue methods found later on. There's the dequeue methods, which are
defined inline and are relatively straightforward.

Then there's all the main internal data structures. First, there's a lock-free free list, used for recycling spent blocks (elements
are enqueued to blocks internally). Then there's the block structure itself, which has two different ways of tracking whether
it's fully emptied or not (remember, given two parallel consumers, there's no way to know which one will finish first) depending on where it's used.
Then there's a small base class for the two types of internal SPMC producer queues (one for explicit producers that holds onto memory
but attempts to be faster, and one for implicit ones which attempt to recycle more memory back into the parent but is a little slower).
The explicit producer is defined first, then the implicit one. They both contain the same general four methods: One to enqueue, one to
dequeue, one to enqueue in bulk, and one to dequeue in bulk. (Obviously they have constructors and destructors too, and helper methods.)
The main difference between them is how the block handling is done (they both use the same blocks, but in different ways, and map indices
to them in different ways).

Finally, there's the miscellaneous internal methods: There's the ones that handle the initial block pool (populated when the queue is constructed),
and an abstract block pool that comprises the initial pool and any blocks on the free list. There's ones that handle the producer list
(a lock-free add-only linked list of all the producers in the system). There's ones that handle the implicit producer lookup table (which
is really a sort of specialized TLS lookup). And then there's some helper methods for allocating and freeing objects, and the data members
of the queue itself, followed lastly by the free-standing swap functions.


[blog]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
[design]: http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
[samples.md]: https://github.com/cameron314/concurrentqueue/blob/master/samples.md
[source]: https://github.com/cameron314/concurrentqueue
[concurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/concurrentqueue.h
[blockingconcurrentqueue.h]: https://github.com/cameron314/concurrentqueue/blob/master/blockingconcurrentqueue.h
[unittest-src]: https://github.com/cameron314/concurrentqueue/tree/master/tests/unittests
[benchmarks]: http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++#benchmarks
[benchmark-src]: https://github.com/cameron314/concurrentqueue/tree/master/benchmarks
[license]: https://github.com/cameron314/concurrentqueue/blob/master/LICENSE.md
[cdschecker]: http://demsky.eecs.uci.edu/c11modelchecker.html
[relacy]: http://www.1024cores.net/home/relacy-race-detector
[spsc]: https://github.com/cameron314/readerwriterqueue
[salsa]: http://webee.technion.ac.il/~idish/ftp/spaa049-gidron.pdf

dep/concurrentqueue/concurrentqueue.h (new file, 3542 lines): file diff suppressed because it is too large

dep/concurrentqueue/samples.md (new file, 375 lines)
@@ -0,0 +1,375 @@

# Samples for moodycamel::ConcurrentQueue

Here are some example usage scenarios with sample code. Note that most
use the simplest version of each available method for demonstration purposes,
but they can all be adapted to use tokens and/or the corresponding bulk methods for
extra speed.


## Hello queue

    ConcurrentQueue<int> q;

    for (int i = 0; i != 123; ++i)
        q.enqueue(i);

    int item;
    for (int i = 0; i != 123; ++i) {
        q.try_dequeue(item);
        assert(item == i);
    }


## Hello concurrency

Basic example of how to use the queue from multiple threads, with no
particular goal (i.e. it does nothing, but in an instructive way).

    ConcurrentQueue<int> q;
    int dequeued[100] = { 0 };
    std::thread threads[20];

    // Producers
    for (int i = 0; i != 10; ++i) {
        threads[i] = std::thread([&](int i) {
            for (int j = 0; j != 10; ++j) {
                q.enqueue(i * 10 + j);
            }
        }, i);
    }

    // Consumers
    for (int i = 10; i != 20; ++i) {
        threads[i] = std::thread([&]() {
            int item;
            for (int j = 0; j != 20; ++j) {
                if (q.try_dequeue(item)) {
                    ++dequeued[item];
                }
            }
        });
    }

    // Wait for all threads
    for (int i = 0; i != 20; ++i) {
        threads[i].join();
    }

    // Collect any leftovers (could be some if e.g. consumers finish before producers)
    int item;
    while (q.try_dequeue(item)) {
        ++dequeued[item];
    }

    // Make sure everything went in and came back out!
    for (int i = 0; i != 100; ++i) {
        assert(dequeued[i] == 1);
    }


## Bulk up

Same as previous example, but runs faster.

    ConcurrentQueue<int> q;
    int dequeued[100] = { 0 };
    std::thread threads[20];

    // Producers
    for (int i = 0; i != 10; ++i) {
        threads[i] = std::thread([&](int i) {
            int items[10];
            for (int j = 0; j != 10; ++j) {
                items[j] = i * 10 + j;
            }
            q.enqueue_bulk(items, 10);
        }, i);
    }

    // Consumers
    for (int i = 10; i != 20; ++i) {
        threads[i] = std::thread([&]() {
            int items[20];
            for (std::size_t count = q.try_dequeue_bulk(items, 20); count != 0; --count) {
                ++dequeued[items[count - 1]];
            }
        });
    }

    // Wait for all threads
    for (int i = 0; i != 20; ++i) {
        threads[i].join();
    }

    // Collect any leftovers (could be some if e.g. consumers finish before producers)
    int items[10];
    std::size_t count;
    while ((count = q.try_dequeue_bulk(items, 10)) != 0) {
        for (std::size_t i = 0; i != count; ++i) {
            ++dequeued[items[i]];
        }
    }

    // Make sure everything went in and came back out!
    for (int i = 0; i != 100; ++i) {
        assert(dequeued[i] == 1);
    }

## Producer/consumer model (simultaneous)

In this model, one set of threads is producing items,
and the other is consuming them concurrently until all of
them have been consumed. The counters are required to
ensure that all items eventually get consumed.

    ConcurrentQueue<Item> q;
    const int ProducerCount = 8;
    const int ConsumerCount = 8;
    std::thread producers[ProducerCount];
    std::thread consumers[ConsumerCount];
    std::atomic<int> doneProducers(0);
    std::atomic<int> doneConsumers(0);
    for (int i = 0; i != ProducerCount; ++i) {
        producers[i] = std::thread([&]() {
            while (produce) {
                q.enqueue(produceItem());
            }
            doneProducers.fetch_add(1, std::memory_order_release);
        });
    }
    for (int i = 0; i != ConsumerCount; ++i) {
        consumers[i] = std::thread([&]() {
            Item item;
            bool itemsLeft;
            do {
                // It's important to fence (if the producers have finished) *before* dequeueing
                itemsLeft = doneProducers.load(std::memory_order_acquire) != ProducerCount;
                while (q.try_dequeue(item)) {
                    itemsLeft = true;
                    consumeItem(item);
                }
            } while (itemsLeft || doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == ConsumerCount);
            // The condition above is a bit tricky, but it's necessary to ensure that the
            // last consumer sees the memory effects of all the other consumers before it
            // calls try_dequeue for the last time
        });
    }
    for (int i = 0; i != ProducerCount; ++i) {
        producers[i].join();
    }
    for (int i = 0; i != ConsumerCount; ++i) {
        consumers[i].join();
    }

## Producer/consumer model (simultaneous, blocking)

The blocking version is different, since either the number of elements being produced needs
to be known ahead of time, or some other coordination is required to tell the consumers when
to stop calling wait_dequeue (not shown here). This is necessary because otherwise a consumer
could end up blocking forever -- and destroying a queue while a consumer is blocking on it leads
to undefined behaviour.

    BlockingConcurrentQueue<Item> q;
    const int ProducerCount = 8;
    const int ConsumerCount = 8;
    std::thread producers[ProducerCount];
    std::thread consumers[ConsumerCount];
    std::atomic<int> promisedElementsRemaining(ProducerCount * 1000);
    for (int i = 0; i != ProducerCount; ++i) {
        producers[i] = std::thread([&]() {
            for (int j = 0; j != 1000; ++j) {
                q.enqueue(produceItem());
            }
        });
    }
    for (int i = 0; i != ConsumerCount; ++i) {
        consumers[i] = std::thread([&]() {
            Item item;
            while (promisedElementsRemaining.fetch_sub(1, std::memory_order_relaxed)) {
                q.wait_dequeue(item);
                consumeItem(item);
            }
        });
    }
    for (int i = 0; i != ProducerCount; ++i) {
        producers[i].join();
    }
    for (int i = 0; i != ConsumerCount; ++i) {
        consumers[i].join();
    }


## Producer/consumer model (separate stages)

    ConcurrentQueue<Item> q;

    // Production stage
    std::thread threads[8];
    for (int i = 0; i != 8; ++i) {
        threads[i] = std::thread([&]() {
            while (produce) {
                q.enqueue(produceItem());
            }
        });
    }
    for (int i = 0; i != 8; ++i) {
        threads[i].join();
    }

    // Consumption stage
    std::atomic<int> doneConsumers(0);
    for (int i = 0; i != 8; ++i) {
        threads[i] = std::thread([&]() {
            Item item;
            do {
                while (q.try_dequeue(item)) {
                    consumeItem(item);
                }
                // Loop again one last time if we're the last consumer (with the acquired
                // memory effects of the other consumers):
            } while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
        });
    }
    for (int i = 0; i != 8; ++i) {
        threads[i].join();
    }

Note that there's no point trying to use the blocking queue with this model, since
there's no need to use the `wait` methods (all the elements are produced before any
are consumed), and hence the complexity would be the same but with additional overhead.


## Object pool

If you don't know what threads will be using the queue in advance,
you can't really declare any long-term tokens. The obvious solution
is to use the implicit methods (that don't take any tokens):

    // A pool of 'Something' objects that can be safely accessed
    // from any thread
    class SomethingPool
    {
    public:
        Something getSomething()
        {
            Something obj;
            queue.try_dequeue(obj);

            // If the dequeue succeeded, obj will be an object from the
            // thread pool, otherwise it will be the default-constructed
            // object as declared above
            return obj;
        }

        void recycleSomething(Something&& obj)
        {
            queue.enqueue(std::move(obj));
        }

    private:
        moodycamel::ConcurrentQueue<Something> queue;   // backing storage for the pool
    };


## Threadpool task queue

    BlockingConcurrentQueue<Task> q;

    // To create a task from any thread:
    q.enqueue(...);

    // On threadpool threads:
    Task task;
    while (true) {
        q.wait_dequeue(task);

        // Process task...
    }


## Multithreaded game loop

    BlockingConcurrentQueue<Task> q;
    std::atomic<int> pendingTasks(0);

    // On threadpool threads:
    Task task;
    while (true) {
        q.wait_dequeue(task);

        // Process task...

        pendingTasks.fetch_add(-1, std::memory_order_release);
    }

    // Whenever a new task needs to be processed for the frame:
    pendingTasks.fetch_add(1, std::memory_order_release);
    q.enqueue(...);

    // To wait for all the frame's tasks to complete before rendering:
    while (pendingTasks.load(std::memory_order_acquire) != 0)
        continue;

    // Alternatively you could help out the thread pool while waiting:
    while (pendingTasks.load(std::memory_order_acquire) != 0) {
        if (!q.try_dequeue(task)) {
            continue;
        }

        // Process task...

        pendingTasks.fetch_add(-1, std::memory_order_release);
    }


## Pump until empty

This might be useful if, for example, you want to process any remaining items
in the queue before it's destroyed. Note that it is your responsibility
to ensure that the memory effects of any enqueue operations you wish to see on
the dequeue thread are visible (i.e. if you're waiting for a certain set of elements,
you need to use memory fences to ensure that those elements are visible to the dequeue
thread after they've been enqueued).

    ConcurrentQueue<Item> q;

    // Single-threaded pumping:
    Item item;
    while (q.try_dequeue(item)) {
        // Process item...
    }
    // q is guaranteed to be empty here, unless there is another thread enqueueing still or
    // there was another thread dequeueing at one point and its memory effects have not
    // yet been propagated to this thread.

    // Multi-threaded pumping:
    std::thread threads[8];
    std::atomic<int> doneConsumers(0);
    for (int i = 0; i != 8; ++i) {
        threads[i] = std::thread([&]() {
            Item item;
            do {
                while (q.try_dequeue(item)) {
                    // Process item...
                }
            } while (doneConsumers.fetch_add(1, std::memory_order_acq_rel) + 1 == 8);
            // If there are still enqueue operations happening on other threads,
            // then the queue may not be empty at this point. However, if all enqueue
            // operations completed before we finished pumping (and the propagation of
            // their memory effects too), and all dequeue operations apart from those
            // our threads did above completed before we finished pumping (and the
            // propagation of their memory effects too), then the queue is guaranteed
            // to be empty at this point.
        });
    }
    for (int i = 0; i != 8; ++i) {
        threads[i].join();
    }


## Wait for a queue to become empty (without dequeueing)

You can't (robustly) :-) However, you can set up your own atomic counter and
poll that instead (see the game loop example). If you're satisfied with merely an estimate, you can use
`size_approx()`. Note that `size_approx()` may return 0 even if the queue is
not completely empty, unless the queue has already stabilized first (no threads
are enqueueing or dequeueing, and all memory effects of any previous operations
have been propagated to the thread before it calls `size_approx()`).

test.cpp
@@ -10,6 +10,14 @@
 #include <vector>
 #include <typeinfo>

+#include <condition_variable>
+#include <mutex>
+#include <thread>
+#include <atomic>
+#include <random>
+
+#include "concurrentqueue.h"
+
 #include <boost/optional.hpp>

 enum SpellCastResult

@@ -252,6 +260,101 @@ public:
    }
};

class DispatcherPool
{
    enum TerminationMode
    {
        NONE,
        TERMINATE,
        AWAIT
    };

    typedef std::function<void()> Callable;

    std::vector<std::thread> _pool;

    std::atomic<TerminationMode> _shutdown;

    std::mutex _mutex;

    std::condition_variable _condition;

    moodycamel::ConcurrentQueue<Callable> _queue;

public:
    DispatcherPool() : DispatcherPool(std::thread::hardware_concurrency()) { }

    DispatcherPool(unsigned int const threads) : _shutdown(NONE)
    {
        for (unsigned int i = 0; i < threads; ++i)
        {
            _pool.emplace_back([&, i]
            {
                Callable callable;
                while (_shutdown != TERMINATE)
                {
                    if (_queue.try_dequeue(callable))
                    {
                        std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n";
                        std::cout << msg;
                        callable();
                    }
                    else
                    {
                        if (_shutdown == AWAIT)
                            break;

                        {
                            std::string msg = "Thread " + std::to_string(i) + " out of work...\n";
                            std::cout << msg;
                        }

                        std::unique_lock<std::mutex> lock(_mutex);

                        // Lock until new tasks are added
                        _condition.wait(lock);

                        {
                            std::string msg = "Thread " + std::to_string(i) + " wakes up...\n";
                            std::cout << msg;
                        }
                    }
                }
            });
        }
    }

    ~DispatcherPool()
    {
        Shutdown();
    }

    template<typename Functional>
    void Dispatch(Functional&& functional)
    {
        _queue.enqueue(std::forward<Functional>(functional));
        std::unique_lock<std::mutex> lock(_mutex);
        _condition.notify_one();
    }

    void Shutdown()
    {
        _Shutdown(TERMINATE);
    }

    void Await()
    {
        _Shutdown(AWAIT);
    }

    void _Shutdown(TerminationMode const mode)
    {
        _shutdown = mode;
        for (auto&& thread : _pool)
            thread.join();
    }
};
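
Two caveats in this experimental pool are worth noting: `_Shutdown` never signals `_condition`, so a
worker already parked in `wait()` is only woken by a later `Dispatch`; a worker whose `try_dequeue`
just failed can park itself after a concurrent `Dispatch` has already enqueued and notified, missing
that wakeup; and the destructor's `Shutdown()` will join threads that `Await()` may already have joined.
A minimal sketch of one way to tighten this up (illustrative only, not part of this commit):

    void _Shutdown(TerminationMode const mode)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _shutdown = mode;
        }
        _condition.notify_all();       // wake every parked worker so it can observe the new mode

        for (auto&& thread : _pool)
            if (thread.joinable())     // tolerate Shutdown() after Await() (e.g. from the destructor)
                thread.join();
    }

    // ...and in the worker loop, wait on a predicate instead of a bare wait(), so the
    // termination mode and pending work are re-checked while holding the lock:
    std::unique_lock<std::mutex> lock(_mutex);
    _condition.wait(lock, [&] {
        return _shutdown != NONE || _queue.size_approx() != 0;
    });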

int main(int /*argc*/, char** /*argv*/)
{
    /*

@@ -570,5 +673,45 @@ int main(int /*argc*/, char** /*argv*/)

    };

    typedef std::function<void()> Callable;

    typedef std::unique_ptr<Callable> CallableHolder;

    moodycamel::ConcurrentQueue<Callable> producerConsumerQueue;

    Callable cal = []
    {
    };

    producerConsumerQueue.enqueue(std::move(cal));

    DispatcherPool pool(3);

    auto const seed = std::chrono::steady_clock::now().time_since_epoch().count();

    std::mt19937 rng(seed);
    std::uniform_int_distribution<int> gen(10, 100);

    for (unsigned int run = 0; run < 4; ++run)
    {
        for (unsigned int i = 0; i < 20; ++i)
        {
            unsigned int wait = gen(rng);

            pool.Dispatch([i, run, wait]
            {
                std::this_thread::sleep_for(std::chrono::milliseconds(wait));
                std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + "\n";
                std::cout << str;
            });
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }

    std::cout << "Awaiting termination\n";

    // std::this_thread::sleep_for(std::chrono::seconds(5));
    pool.Await();
    return 0;
}