diff --git a/test/concur/test_concur_concurrent.cpp b/test/concur/test_concur_concurrent.cpp index 80c6210..91f16b7 100644 --- a/test/concur/test_concur_concurrent.cpp +++ b/test/concur/test_concur_concurrent.cpp @@ -122,12 +122,12 @@ void test_unicast(std::size_t np, std::size_t nc, std::size_t k) { typename concur::traits::context ctx {}; for (;;) { std::this_thread::yield(); - std::uint64_t i; - while (!pc.dequeue(imp::make_span(circ), hdr, ctx, i)) { + std::uint64_t v; + while (!pc.dequeue(imp::make_span(circ), hdr, ctx, v)) { if (running == 0) return; std::this_thread::yield(); } - sum += i; + sum += v; } }; @@ -170,55 +170,120 @@ namespace { template void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { LIBIMP_LOG_(); + { + concur::element circ[32] {}; + PC pc; + typename concur::traits::header hdr {imp::make_span(circ)}; + ASSERT_TRUE(hdr.valid()); - concur::element circ[32] {}; - PC pc; - typename concur::traits::header hdr {imp::make_span(circ)}; - ASSERT_TRUE(hdr.valid()); + auto push_one = [&, ctx = typename concur::traits::context{}](std::uint32_t i) mutable { + return pc.enqueue(imp::make_span(circ), hdr, ctx, i); + }; + auto pop_one = [&, ctx = typename concur::traits::context{}]() mutable { + std::uint64_t i; + if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) { + return i; + } + return (std::uint64_t)-1; + }; + auto pop_one_2 = pop_one; - auto push_one = [&, ctx = typename concur::traits::context{}](std::uint32_t i) mutable { - return pc.enqueue(imp::make_span(circ), hdr, ctx, i); - }; - auto pop_one = [&, ctx = typename concur::traits::context{}]() mutable { - std::uint64_t i; - if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) { - return i; + // empty queue pop + ASSERT_EQ(pop_one(), (std::uint64_t)-1); + + // test one push & pop + for (int i = 0; i < 32; ++i) { + ASSERT_TRUE(push_one(i)); + ASSERT_EQ(pop_one(), i); } - return (std::uint64_t)-1; - }; - auto pop_one_2 = pop_one; + for (int i = 0; i < 100; ++i) { + ASSERT_TRUE(push_one(i)); + ASSERT_EQ(pop_one(), i); + } + ASSERT_EQ(pop_one(), (std::uint64_t)-1); - // empty queue pop - ASSERT_EQ(pop_one(), (std::uint64_t)-1); + // test loop push & pop + for (int i = 0; i < 10; ++i) ASSERT_TRUE(push_one(i)); + for (int i = 0; i < 10; ++i) ASSERT_EQ(pop_one(), i); + ASSERT_EQ(pop_one(), (std::uint64_t)-1); - // test one push & pop - for (int i = 0; i < 32; ++i) { - ASSERT_TRUE(push_one(i)); - ASSERT_EQ(pop_one(), i); + // other loop pop + for (int i = 0; i < 32; ++i) ASSERT_TRUE(push_one(i)); + for (int i = 0; i < 32; ++i) ASSERT_EQ(pop_one_2(), i); + ASSERT_EQ(pop_one_2(), (std::uint64_t)-1); + + // overwrite + ASSERT_TRUE(push_one(123)); + for (int i = 1; i < 32; ++i) { + ASSERT_EQ(pop_one(), i); + } + ASSERT_EQ(pop_one(), 123); + ASSERT_EQ(pop_one(), (std::uint64_t)-1); } - for (int i = 0; i < 100; ++i) { - ASSERT_TRUE(push_one(i)); - ASSERT_EQ(pop_one(), i); + + log.info("\n\tStart with: ", imp::nameof(), ", ", np, " producers, ", nc, " consumers..."); + { + struct Data { + std::uint64_t n; + std::uint64_t i; + }; + concur::element circ[32] {}; + PC pc; + typename concur::traits::header hdr {imp::make_span(circ)}; + ASSERT_TRUE(hdr.valid()); + + constexpr static std::uint32_t loop_size = 100'0000; + + std::atomic sum {0}; + std::atomic running {np}; + std::array, 64> counters {}; + + auto prod_call = [&](std::size_t n) { + typename concur::traits::context ctx {}; + for (std::uint32_t i = 1; i <= loop_size; ++i) { + std::this_thread::yield(); + counters[n] = 0; + do { + ASSERT_TRUE(pc.enqueue(imp::make_span(circ), hdr, ctx, Data{n, i})); + std::this_thread::yield(); + // We need to wait for the consumer to consume the data. + } while (counters[n] < nc); + if (i % (loop_size / 10) == 0) { + log.info("[", n, "] put count: ", i); + } + } + --running; + }; + auto cons_call = [&] { + typename concur::traits::context ctx {}; + std::array last_i {}; + for (;;) { + std::this_thread::yield(); + Data v; + while (!pc.dequeue(imp::make_span(circ), hdr, ctx, v)) { + if (running == 0) return; + std::this_thread::yield(); + } + // The v.i variable always increases. + if (last_i[v.n] >= v.i) { + continue; + } + last_i[v.n] = v.i; + sum += v.i; + ++counters[v.n]; + } + }; + + std::vector prods(np); + for (std::size_t n = 0; n < np; ++n) prods[n] = std::thread(prod_call, n); + std::vector conss(nc); + for (auto &c : conss) c = std::thread(cons_call); + + for (auto &p : prods) p.join(); + for (auto &c : conss) c.join(); + + EXPECT_EQ(sum, k * np * (loop_size * std::uint64_t(loop_size + 1)) / 2); } - ASSERT_EQ(pop_one(), (std::uint64_t)-1); - - // test loop push & pop - for (int i = 0; i < 10; ++i) ASSERT_TRUE(push_one(i)); - for (int i = 0; i < 10; ++i) ASSERT_EQ(pop_one(), i); - ASSERT_EQ(pop_one(), (std::uint64_t)-1); - - // other loop pop - for (int i = 0; i < 32; ++i) ASSERT_TRUE(push_one(i)); - for (int i = 0; i < 32; ++i) ASSERT_EQ(pop_one_2(), i); - ASSERT_EQ(pop_one_2(), (std::uint64_t)-1); - - // overwrite - ASSERT_TRUE(push_one(123)); - for (int i = 1; i < 32; ++i) { - ASSERT_EQ(pop_one(), i); - } - ASSERT_EQ(pop_one(), 123); - ASSERT_EQ(pop_one(), (std::uint64_t)-1); } } // namespace