diff --git a/test.cpp b/test.cpp index 6752cca..b314734 100644 --- a/test.cpp +++ b/test.cpp @@ -290,10 +290,13 @@ public: { _pool.emplace_back([&, i] { + // Reserve the consumer token + moodycamel::ConsumerToken token(_queue); + Callable callable; while (_shutdown != TERMINATE) { - if (_queue.try_dequeue(callable)) + if (_queue.try_dequeue(token, callable)) { std::string msg = "Thread " + std::to_string(i) + " is dispatching...\n"; std::cout << msg; @@ -674,17 +677,7 @@ int main(int /*argc*/, char** /*argv*/) }; - typedef std::function Callable; - - typedef std::unique_ptr CallableHolder; - - moodycamel::ConcurrentQueue producerConsumerQueue; - - Callable cal = [] - { - }; - - producerConsumerQueue.enqueue(std::move(cal)); + DispatcherPool countPool(1); DispatcherPool pool(3); @@ -693,17 +686,28 @@ int main(int /*argc*/, char** /*argv*/) std::mt19937 rng(seed); std::uniform_int_distribution gen(10, 150); + std::vector container; + + unsigned int counter = 0; + for (unsigned int run = 0; run < 2; ++run) { for (unsigned int i = 0; i < 20; ++i) { unsigned int wait = gen(rng); - pool.Dispatch([i, run, wait] + ++counter; + + pool.Dispatch([&countPool, &container, i, run, wait] { std::this_thread::sleep_for(std::chrono::milliseconds(wait)); std::string str = "Pass " + std::to_string(run) + " dispatching " + std::to_string(i) + " (" + std::to_string(wait) + "ms delay)" + "\n"; std::cout << str; + + countPool.Dispatch([&container, wait] + { + container.emplace_back(wait); + }); }); } @@ -714,5 +718,8 @@ int main(int /*argc*/, char** /*argv*/) // std::this_thread::sleep_for(std::chrono::seconds(5)); pool.Await(); + countPool.Await(); + + std::cout << container.size() << " == " << counter; return 0; }