#include #include #include #include #include #include "gtest/gtest.h" #include "libconcur/circular_queue.h" #include "libimp/log.h" #include "libimp/nameof.h" using namespace concur; TEST(circular_queue, construct) { using queue_t = circular_queue; queue_t q1; EXPECT_TRUE(q1.valid()); EXPECT_TRUE(q1.empty()); EXPECT_EQ(q1.approx_size(), 0); } namespace { template void test_queue_basic() { using queue_t = circular_queue; queue_t q1; EXPECT_TRUE(q1.valid()); EXPECT_TRUE(q1.empty()); EXPECT_EQ(q1.approx_size(), 0); EXPECT_TRUE(q1.push(1)); EXPECT_FALSE(q1.empty()); EXPECT_EQ(q1.approx_size(), 1); int value; EXPECT_TRUE(q1.pop(value)); EXPECT_EQ(value, 1); EXPECT_TRUE(q1.empty()); EXPECT_EQ(q1.approx_size(), 0); int count = 0; auto push = [&q1, &count](int i) { ASSERT_TRUE(q1.push(i)); ASSERT_FALSE(q1.empty()); ++count; ASSERT_EQ(q1.approx_size(), count); }; auto pop = [&q1, &count](int i) { int value; ASSERT_TRUE(q1.pop(value)); ASSERT_EQ(value, i); --count; ASSERT_EQ(q1.approx_size(), count); }; for (int i = 0; i < 1000; ++i) { push(i); } for (int i = 0; i < 1000; ++i) { pop(i); } for (int i = 0; i < default_circle_buffer_size; ++i) { push(i); } ASSERT_FALSE(q1.push(65536)); for (int i = 0; i < default_circle_buffer_size; ++i) { pop(i); } } } // namespace TEST(circular_queue, push_pop) { test_queue_basic(); test_queue_basic(); test_queue_basic(); } namespace { template void test_queue(std::size_t np, std::size_t nc) { LIBIMP_LOG_(); log.info("\n\tStart with: [", imp::nameof(), " - ", imp::nameof(), "]\n\t\t", np, " producers, ", nc, " consumers..."); struct Data { std::uint64_t n; std::uint64_t i; }; circular_queue que; constexpr static std::uint32_t loop_size = 10'0000; std::atomic sum {0}; std::atomic running {np}; auto prod_call = [&](std::size_t n) { for (std::uint32_t i = 1; i <= loop_size; ++i) { std::this_thread::yield(); for (std::uint32_t k = 1; !que.push(Data{n, i}); ++k) { std::this_thread::yield(); if (k % (loop_size / 10) == 0) { log.info("[", n, "] put count: ", i, ", retry: ", k); std::this_thread::sleep_for(std::chrono::milliseconds(1)); } } if (i % (loop_size / 10) == 0) { log.info("[", n, "] put count: ", i); } } --running; }; auto cons_call = [&] { for (;;) { std::this_thread::yield(); Data v; while (!que.pop(v)) { if (running == 0) return; std::this_thread::yield(); } sum += v.i; } }; std::vector prods(np); for (std::size_t n = 0; n < np; ++n) prods[n] = std::thread(prod_call, n); std::vector conss(nc); for (auto &c : conss) c = std::thread(cons_call); for (auto &p : prods) p.join(); for (auto &c : conss) c.join(); EXPECT_EQ(sum, np * (loop_size * std::uint64_t(loop_size + 1)) / 2); } } // namespace TEST(circular_queue, multi_thread) { using namespace concur; /// \brief 1-1 test_queue(1, 1); test_queue(1, 1); test_queue(1, 1); test_queue(1, 1); /// \brief 8-1 test_queue(8, 1); test_queue(8, 1); /// \brief 1-8 test_queue(1, 8); test_queue(1, 8); /// \brief 8-8 test_queue(8, 8); }