upd: improving the broadcasting ut.

This commit is contained in:
mutouyun 2023-04-15 20:23:05 +08:00
parent 32585a39f3
commit 5578f48595

View File

@ -122,12 +122,12 @@ void test_unicast(std::size_t np, std::size_t nc, std::size_t k) {
typename concur::traits<PC>::context ctx {}; typename concur::traits<PC>::context ctx {};
for (;;) { for (;;) {
std::this_thread::yield(); std::this_thread::yield();
std::uint64_t i; std::uint64_t v;
while (!pc.dequeue(imp::make_span(circ), hdr, ctx, i)) { while (!pc.dequeue(imp::make_span(circ), hdr, ctx, v)) {
if (running == 0) return; if (running == 0) return;
std::this_thread::yield(); std::this_thread::yield();
} }
sum += i; sum += v;
} }
}; };
@ -170,55 +170,120 @@ namespace {
template <typename PC> template <typename PC>
void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) { void test_broadcast(std::size_t np, std::size_t nc, std::size_t k) {
LIBIMP_LOG_(); LIBIMP_LOG_();
{
concur::element<std::uint64_t> circ[32] {};
PC pc;
typename concur::traits<PC>::header hdr {imp::make_span(circ)};
ASSERT_TRUE(hdr.valid());
concur::element<std::uint64_t> circ[32] {}; auto push_one = [&, ctx = typename concur::traits<PC>::context{}](std::uint32_t i) mutable {
PC pc; return pc.enqueue(imp::make_span(circ), hdr, ctx, i);
typename concur::traits<PC>::header hdr {imp::make_span(circ)}; };
ASSERT_TRUE(hdr.valid()); auto pop_one = [&, ctx = typename concur::traits<PC>::context{}]() mutable {
std::uint64_t i;
if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) {
return i;
}
return (std::uint64_t)-1;
};
auto pop_one_2 = pop_one;
auto push_one = [&, ctx = typename concur::traits<PC>::context{}](std::uint32_t i) mutable { // empty queue pop
return pc.enqueue(imp::make_span(circ), hdr, ctx, i); ASSERT_EQ(pop_one(), (std::uint64_t)-1);
};
auto pop_one = [&, ctx = typename concur::traits<PC>::context{}]() mutable { // test one push & pop
std::uint64_t i; for (int i = 0; i < 32; ++i) {
if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) { ASSERT_TRUE(push_one(i));
return i; ASSERT_EQ(pop_one(), i);
} }
return (std::uint64_t)-1; for (int i = 0; i < 100; ++i) {
}; ASSERT_TRUE(push_one(i));
auto pop_one_2 = pop_one; ASSERT_EQ(pop_one(), i);
}
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// empty queue pop // test loop push & pop
ASSERT_EQ(pop_one(), (std::uint64_t)-1); for (int i = 0; i < 10; ++i) ASSERT_TRUE(push_one(i));
for (int i = 0; i < 10; ++i) ASSERT_EQ(pop_one(), i);
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// test one push & pop // other loop pop
for (int i = 0; i < 32; ++i) { for (int i = 0; i < 32; ++i) ASSERT_TRUE(push_one(i));
ASSERT_TRUE(push_one(i)); for (int i = 0; i < 32; ++i) ASSERT_EQ(pop_one_2(), i);
ASSERT_EQ(pop_one(), i); ASSERT_EQ(pop_one_2(), (std::uint64_t)-1);
// overwrite
ASSERT_TRUE(push_one(123));
for (int i = 1; i < 32; ++i) {
ASSERT_EQ(pop_one(), i);
}
ASSERT_EQ(pop_one(), 123);
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
} }
for (int i = 0; i < 100; ++i) {
ASSERT_TRUE(push_one(i)); log.info("\n\tStart with: ", imp::nameof<PC>(), ", ", np, " producers, ", nc, " consumers...");
ASSERT_EQ(pop_one(), i); {
struct Data {
std::uint64_t n;
std::uint64_t i;
};
concur::element<Data> circ[32] {};
PC pc;
typename concur::traits<PC>::header hdr {imp::make_span(circ)};
ASSERT_TRUE(hdr.valid());
constexpr static std::uint32_t loop_size = 100'0000;
std::atomic<std::uint64_t> sum {0};
std::atomic<std::size_t> running {np};
std::array<std::atomic<std::size_t>, 64> counters {};
auto prod_call = [&](std::size_t n) {
typename concur::traits<PC>::context ctx {};
for (std::uint32_t i = 1; i <= loop_size; ++i) {
std::this_thread::yield();
counters[n] = 0;
do {
ASSERT_TRUE(pc.enqueue(imp::make_span(circ), hdr, ctx, Data{n, i}));
std::this_thread::yield();
// We need to wait for the consumer to consume the data.
} while (counters[n] < nc);
if (i % (loop_size / 10) == 0) {
log.info("[", n, "] put count: ", i);
}
}
--running;
};
auto cons_call = [&] {
typename concur::traits<PC>::context ctx {};
std::array<std::uint64_t, 64> last_i {};
for (;;) {
std::this_thread::yield();
Data v;
while (!pc.dequeue(imp::make_span(circ), hdr, ctx, v)) {
if (running == 0) return;
std::this_thread::yield();
}
// The v.i variable always increases.
if (last_i[v.n] >= v.i) {
continue;
}
last_i[v.n] = v.i;
sum += v.i;
++counters[v.n];
}
};
std::vector<std::thread> prods(np);
for (std::size_t n = 0; n < np; ++n) prods[n] = std::thread(prod_call, n);
std::vector<std::thread> conss(nc);
for (auto &c : conss) c = std::thread(cons_call);
for (auto &p : prods) p.join();
for (auto &c : conss) c.join();
EXPECT_EQ(sum, k * np * (loop_size * std::uint64_t(loop_size + 1)) / 2);
} }
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// test loop push & pop
for (int i = 0; i < 10; ++i) ASSERT_TRUE(push_one(i));
for (int i = 0; i < 10; ++i) ASSERT_EQ(pop_one(), i);
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
// other loop pop
for (int i = 0; i < 32; ++i) ASSERT_TRUE(push_one(i));
for (int i = 0; i < 32; ++i) ASSERT_EQ(pop_one_2(), i);
ASSERT_EQ(pop_one_2(), (std::uint64_t)-1);
// overwrite
ASSERT_TRUE(push_one(123));
for (int i = 1; i < 32; ++i) {
ASSERT_EQ(pop_one(), i);
}
ASSERT_EQ(pop_one(), 123);
ASSERT_EQ(pop_one(), (std::uint64_t)-1);
} }
} // namespace } // namespace