From 29678f1d4152f2b83ced537b26eedad8015fe6c6 Mon Sep 17 00:00:00 2001 From: mutouyun Date: Sun, 17 Nov 2024 17:36:09 +0800 Subject: [PATCH] Added a cleanup interface for queue --- src/libipc/queue.h | 4 + test/test_queue.cpp | 615 ++++++++++++++++++++++---------------------- 2 files changed, 315 insertions(+), 304 deletions(-) diff --git a/src/libipc/queue.h b/src/libipc/queue.h index e38fe77..506b6ae 100755 --- a/src/libipc/queue.h +++ b/src/libipc/queue.h @@ -55,6 +55,10 @@ public: queue_conn(const queue_conn&) = delete; queue_conn& operator=(const queue_conn&) = delete; + void clear() noexcept { + elems_h_.clear(); + } + bool connected() const noexcept { return connected_ != 0; } diff --git a/test/test_queue.cpp b/test/test_queue.cpp index e85d39f..6a83b6a 100755 --- a/test/test_queue.cpp +++ b/test/test_queue.cpp @@ -1,304 +1,311 @@ -#include -#include -#include -#include -#include -#include -#include -#include // CHAR_BIT - -#include "libipc/prod_cons.h" -#include "libipc/policy.h" -#include "libipc/circ/elem_array.h" -#include "libipc/queue.h" - -#include "test.h" - -namespace { - -struct msg_t { - int pid_; - int dat_; - - msg_t() = default; - msg_t(int p, int d) : pid_(p), dat_(d) {} -}; - -template -using queue_t = ipc::queue>>; - -template -struct elems_t : public queue_t::elems_t {}; - -bool operator==(msg_t const & m1, msg_t const & m2) noexcept { - return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_); -} - -bool operator!=(msg_t const & m1, msg_t const & m2) noexcept { - return !(m1 == m2); -} - -constexpr int LoopCount = 1000000; -constexpr int PushRetry = 1000000; -constexpr int ThreadMax = 8; - -template -void push(Que & que, int p, int d) { - for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { - ASSERT_NE(n, PushRetry); - std::this_thread::yield(); - } -} - -template -msg_t pop(Que & que) { - msg_t msg; - while (!que.pop(msg)) { - std::this_thread::yield(); - } - return msg; -} - -template -struct quitter; - -template <> -struct quitter { - template - static void emit(Que && que, int r_cnt) { - for (int k = 0; k < r_cnt; ++k) { - push(que, -1, -1); - } - } -}; - -template <> -struct quitter { - template - static void emit(Que && que, int /*r_cnt*/) { - push(que, -1, -1); - } -}; - -template -void test_sr(elems_t && elems, int s_cnt, int r_cnt, char const * message) { - ipc_ut::sender().start(static_cast(s_cnt)); - ipc_ut::reader().start(static_cast(r_cnt)); - ipc_ut::test_stopwatch sw; - - for (int k = 0; k < s_cnt; ++k) { - ipc_ut::sender() << [&elems, &sw, r_cnt, k] { - queue_t que { &elems }; - while (que.conn_count() != static_cast(r_cnt)) { - std::this_thread::yield(); - } - sw.start(); - for (int i = 0; i < LoopCount; ++i) { - push(que, k, i); - } - }; - } - for (int k = 0; k < r_cnt; ++k) { - ipc_ut::reader() << [&elems, k] { - queue_t que { &elems }; - ASSERT_TRUE(que.connect()); - while (pop(que).pid_ >= 0) ; - ASSERT_TRUE(que.disconnect()); - }; - } - - ipc_ut::sender().wait_for_done(); - quitter::emit(queue_t { &elems }, r_cnt); - ipc_ut::reader().wait_for_done(); - sw.print_elapsed(s_cnt, r_cnt, LoopCount, message); -} - -} // internal-linkage - -TEST(Queue, check_size) { - using el_t = elems_t; - - std::cout << "cq_t::head_size = " << el_t::head_size << std::endl; - std::cout << "cq_t::data_size = " << el_t::data_size << std::endl; - std::cout << "cq_t::elem_size = " << el_t::elem_size << std::endl; - std::cout << "cq_t::block_size = " << el_t::block_size << std::endl; - - EXPECT_EQ(static_cast(el_t::data_size), sizeof(msg_t)); - - std::cout << "sizeof(elems_t) = " << sizeof(el_t) << std::endl; -} - -TEST(Queue, el_connection) { - { - elems_t el; - EXPECT_TRUE(el.connect_sender()); - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_FALSE(el.connect_sender()); - } - el.disconnect_sender(); - EXPECT_TRUE(el.connect_sender()); - } - { - elems_t el; - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_TRUE(el.connect_sender()); - } - } - { - elems_t el; - auto cc = el.connect_receiver(); - EXPECT_NE(cc, 0); - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_EQ(el.connect_receiver(), 0); - } - EXPECT_EQ(el.disconnect_receiver(cc), 0); - EXPECT_EQ(el.connect_receiver(), cc); - } - { - elems_t el; - for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - ASSERT_NE(el.connect_receiver(), 0); - } - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_EQ(el.connect_receiver(), 0); - } - } -} - -TEST(Queue, connection) { - { - elems_t el; - queue_t que{&el}; - // sending - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_TRUE(que.ready_sending()); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_FALSE(que.ready_sending()); - } - for (std::size_t i = 0; i < 10000; ++i) { - que.shut_sending(); - } - { - queue_t que{&el}; - EXPECT_TRUE(que.ready_sending()); - } - // receiving - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_TRUE(que.connect()); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_FALSE(que.connect()); - } - EXPECT_TRUE(que.disconnect()); - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_FALSE(que.disconnect()); - } - { - queue_t que{&el}; - EXPECT_TRUE(que.connect()); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_FALSE(que.connect()); - } - } - { - elems_t el; - queue_t que{&el}; - // sending - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_TRUE(que.ready_sending()); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_TRUE(que.ready_sending()); - } - for (std::size_t i = 0; i < 10000; ++i) { - que.shut_sending(); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_TRUE(que.ready_sending()); - } - // receiving - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_TRUE(que.connect()); - } - for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { - queue_t que{&el}; - ASSERT_TRUE(que.connect()); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_FALSE(que.connect()); - } - ASSERT_TRUE(que.disconnect()); - for (std::size_t i = 0; i < 10000; ++i) { - ASSERT_FALSE(que.disconnect()); - } - { - queue_t que{&el}; - ASSERT_TRUE(que.connect()); - } - for (std::size_t i = 0; i < 10000; ++i) { - queue_t que{&el}; - ASSERT_FALSE(que.connect()); - } - } -} - -TEST(Queue, prod_cons_1v1_unicast) { - test_sr(elems_t{}, 1, 1, "ssu"); - test_sr(elems_t{}, 1, 1, "smu"); - test_sr(elems_t{}, 1, 1, "mmu"); -} - -TEST(Queue, prod_cons_1v1_broadcast) { - test_sr(elems_t{}, 1, 1, "smb"); - test_sr(elems_t{}, 1, 1, "mmb"); -} - -TEST(Queue, prod_cons_1vN_unicast) { - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, 1, i, "smu"); - } - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, 1, i, "mmu"); - } -} - -TEST(Queue, prod_cons_1vN_broadcast) { - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, 1, i, "smb"); - } - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, 1, i, "mmb"); - } -} - -TEST(Queue, prod_cons_NvN_unicast) { - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, 1, i, "mmu"); - } - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, i, 1, "mmu"); - } - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, i, i, "mmu"); - } -} - -TEST(Queue, prod_cons_NvN_broadcast) { - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, 1, i, "mmb"); - } - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, i, 1, "mmb"); - } - for (int i = 1; i <= ThreadMax; ++i) { - test_sr(elems_t{}, i, i, "mmb"); - } -} +#include +#include +#include +#include +#include +#include +#include +#include // CHAR_BIT + +#include "libipc/prod_cons.h" +#include "libipc/policy.h" +#include "libipc/circ/elem_array.h" +#include "libipc/queue.h" + +#include "test.h" + +namespace { + +struct msg_t { + int pid_; + int dat_; + + msg_t() = default; + msg_t(int p, int d) : pid_(p), dat_(d) {} +}; + +template +using queue_t = ipc::queue>>; + +template +struct elems_t : public queue_t::elems_t {}; + +bool operator==(msg_t const & m1, msg_t const & m2) noexcept { + return (m1.pid_ == m2.pid_) && (m1.dat_ == m2.dat_); +} + +bool operator!=(msg_t const & m1, msg_t const & m2) noexcept { + return !(m1 == m2); +} + +constexpr int LoopCount = 1000000; +constexpr int PushRetry = 1000000; +constexpr int ThreadMax = 8; + +template +void push(Que & que, int p, int d) { + for (int n = 0; !que.push([](void*) { return true; }, p, d); ++n) { + ASSERT_NE(n, PushRetry); + std::this_thread::yield(); + } +} + +template +msg_t pop(Que & que) { + msg_t msg; + while (!que.pop(msg)) { + std::this_thread::yield(); + } + return msg; +} + +template +struct quitter; + +template <> +struct quitter { + template + static void emit(Que && que, int r_cnt) { + for (int k = 0; k < r_cnt; ++k) { + push(que, -1, -1); + } + } +}; + +template <> +struct quitter { + template + static void emit(Que && que, int /*r_cnt*/) { + push(que, -1, -1); + } +}; + +template +void test_sr(elems_t && elems, int s_cnt, int r_cnt, char const * message) { + ipc_ut::sender().start(static_cast(s_cnt)); + ipc_ut::reader().start(static_cast(r_cnt)); + ipc_ut::test_stopwatch sw; + + for (int k = 0; k < s_cnt; ++k) { + ipc_ut::sender() << [&elems, &sw, r_cnt, k] { + queue_t que { &elems }; + while (que.conn_count() != static_cast(r_cnt)) { + std::this_thread::yield(); + } + sw.start(); + for (int i = 0; i < LoopCount; ++i) { + push(que, k, i); + } + }; + } + for (int k = 0; k < r_cnt; ++k) { + ipc_ut::reader() << [&elems, k] { + queue_t que { &elems }; + ASSERT_TRUE(que.connect()); + while (pop(que).pid_ >= 0) ; + ASSERT_TRUE(que.disconnect()); + }; + } + + ipc_ut::sender().wait_for_done(); + quitter::emit(queue_t { &elems }, r_cnt); + ipc_ut::reader().wait_for_done(); + sw.print_elapsed(s_cnt, r_cnt, LoopCount, message); +} + +} // internal-linkage + +TEST(Queue, check_size) { + using el_t = elems_t; + + std::cout << "cq_t::head_size = " << el_t::head_size << std::endl; + std::cout << "cq_t::data_size = " << el_t::data_size << std::endl; + std::cout << "cq_t::elem_size = " << el_t::elem_size << std::endl; + std::cout << "cq_t::block_size = " << el_t::block_size << std::endl; + + EXPECT_EQ(static_cast(el_t::data_size), sizeof(msg_t)); + + std::cout << "sizeof(elems_t) = " << sizeof(el_t) << std::endl; +} + +TEST(Queue, el_connection) { + { + elems_t el; + EXPECT_TRUE(el.connect_sender()); + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_FALSE(el.connect_sender()); + } + el.disconnect_sender(); + EXPECT_TRUE(el.connect_sender()); + } + { + elems_t el; + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_TRUE(el.connect_sender()); + } + } + { + elems_t el; + auto cc = el.connect_receiver(); + EXPECT_NE(cc, 0); + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_EQ(el.connect_receiver(), 0); + } + EXPECT_EQ(el.disconnect_receiver(cc), 0); + EXPECT_EQ(el.connect_receiver(), cc); + } + { + elems_t el; + for (std::size_t i = 0; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { + ASSERT_NE(el.connect_receiver(), 0); + } + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_EQ(el.connect_receiver(), 0); + } + } +} + +TEST(Queue, connection) { + { + elems_t el; + queue_t que{&el}; + // sending + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_TRUE(que.ready_sending()); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_FALSE(que.ready_sending()); + } + for (std::size_t i = 0; i < 10000; ++i) { + que.shut_sending(); + } + { + queue_t que{&el}; + EXPECT_TRUE(que.ready_sending()); + } + // receiving + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_TRUE(que.connect()); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_FALSE(que.connect()); + } + EXPECT_TRUE(que.disconnect()); + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_FALSE(que.disconnect()); + } + { + queue_t que{&el}; + EXPECT_TRUE(que.connect()); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_FALSE(que.connect()); + } + } + { + elems_t el; + queue_t que{&el}; + // sending + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_TRUE(que.ready_sending()); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_TRUE(que.ready_sending()); + } + for (std::size_t i = 0; i < 10000; ++i) { + que.shut_sending(); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_TRUE(que.ready_sending()); + } + // receiving + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_TRUE(que.connect()); + } + for (std::size_t i = 1; i < (sizeof(ipc::circ::cc_t) * CHAR_BIT); ++i) { + queue_t que{&el}; + ASSERT_TRUE(que.connect()); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_FALSE(que.connect()); + } + ASSERT_TRUE(que.disconnect()); + for (std::size_t i = 0; i < 10000; ++i) { + ASSERT_FALSE(que.disconnect()); + } + { + queue_t que{&el}; + ASSERT_TRUE(que.connect()); + } + for (std::size_t i = 0; i < 10000; ++i) { + queue_t que{&el}; + ASSERT_FALSE(que.connect()); + } + } +} + +TEST(Queue, prod_cons_1v1_unicast) { + test_sr(elems_t{}, 1, 1, "ssu"); + test_sr(elems_t{}, 1, 1, "smu"); + test_sr(elems_t{}, 1, 1, "mmu"); +} + +TEST(Queue, prod_cons_1v1_broadcast) { + test_sr(elems_t{}, 1, 1, "smb"); + test_sr(elems_t{}, 1, 1, "mmb"); +} + +TEST(Queue, prod_cons_1vN_unicast) { + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, 1, i, "smu"); + } + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, 1, i, "mmu"); + } +} + +TEST(Queue, prod_cons_1vN_broadcast) { + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, 1, i, "smb"); + } + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, 1, i, "mmb"); + } +} + +TEST(Queue, prod_cons_NvN_unicast) { + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, 1, i, "mmu"); + } + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, i, 1, "mmu"); + } + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, i, i, "mmu"); + } +} + +TEST(Queue, prod_cons_NvN_broadcast) { + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, 1, i, "mmb"); + } + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, i, 1, "mmb"); + } + for (int i = 1; i <= ThreadMax; ++i) { + test_sr(elems_t{}, i, i, "mmb"); + } +} + +TEST(Queue, clear) { + queue_t que{"test-queue-clear"}; + EXPECT_TRUE(ipc_ut::expect_exist("test-queue-clear", true)); + que.clear(); + EXPECT_TRUE(ipc_ut::expect_exist("test-queue-clear", false)); +}