// test_ipc: Qt-Test style suite (Q_OBJECT / private slots / QCOMPARE / QVERIFY,
// includes "test_ipc.moc") covering the ipc send/recv calls, ipc::channel, and a
// reader/writer lock benchmark.
//
// NOTE(review): this text looks damaged by markup stripping - confirm against the
// upstream file. Every angle-bracket span is missing (the bare `#include`
// directives, `template` without a parameter list, `static_cast(0)`,
// `std::vector datas__`, `std::extent::value`, `benchmark_lc();`), and the file
// is collapsed onto a few physical lines, so each `//` comment lexically swallows
// the remainder of its line. The blank lines after `test_performance` are
// presumably where an argument list was mistaken for a paragraph tag. The code
// below is kept byte-for-byte; restore it from the original source before
// building instead of guessing the missing arguments.
//
// Contents, in order of appearance:
//
//  * datas__, DataMin (2), DataMax (256), LoopCount (100000)
//      File-local payload list and sizing constants. Unit::initTestCase() appends
//      LoopCount buffers to datas__, sizing each one from rdm() and filling every
//      element from bit(); Unit::cleanupTestCase() clears the list.
//
//  * test_cq specialization (template argument stripped; cn_t is ipc::channel)
//      Adapter around a channel named "test-ipc-channel".
//      - constructor: connects once and QCOMPAREs recv_count() with 0.
//      - connect(): builds a channel from conn_name_.
//      - wait_start(M): yields until a freshly connected channel reports
//        recv_count() == M.
//      - recv(cn, proc): passes each received message to proc and returns at the
//        first message whose size() is below 2.
//      - send(info): uses a thread_local connection; sends a one-element '\0'
//        buffer when info[1] < 0, otherwise datas__[info[1]].
//
//  * test_verify specialization (template argument stripped)
//      - push_data(cid, msg): appends msg to list_[cid]. msg is a const
//        reference, so the std::move presumably still results in a copy.
//      - verify(): QCOMPAREs datas__ with list_[m] for every m in [0, lcount_).
//
//  * acc(b, e): returns (e + b) * (e - b + 1) / 2, i.e. the sum b + ... + e.
//
//  * lc_wrapper: derives from Mutex and adds lock_shared()/unlock_shared() that
//      forward to Mutex::lock()/Mutex::unlock().
//
//  * benchmark_lc (template parameters stripped; body uses Lc, W, R, Loops)
//      Prints the lock type name (demangled through abi::__cxa_demangle when
//      __GNUC__ is defined, with the result released by ::free via a unique_ptr
//      guard; falls back to typeid(Lc).name() when demangling returns null).
//      Readers: each loops taking a std::shared_lock on lc, reads datas[cnt] when
//      cnt < datas.size(), stops on the value 0, otherwise records the value and
//      advances cnt; afterwards it sums what it recorded and QCOMPAREs the sum
//      with acc(1, Loops) times a std::extent value (argument stripped -
//      presumably the writer count). The reader whose ++fini equals a
//      std::extent value (argument stripped) calls sw.print_elapsed(W, R, Loops).
//      Writers: each calls sw.start(), then for i = 1..Loops pushes i onto datas
//      while holding a std::unique_lock on lc, yielding between iterations.
//      After joining the writers, the calling thread pushes 0 under
//      lc.lock()/lc.unlock() as the quit marker and joins the readers.
//
//  * test_lock_performance / Unit::test_rw_lock
//      test_lock_performance prints "test_lock_performance: [W:R]" and calls
//      benchmark_lc four times (the lock types were stripped). test_rw_lock runs
//      it for the pairs 1:1, 4:4, 1:8 and 8:1.
//
//  * Unit::test_send_recv
//      Connects to "my-ipc", sends the array "hello ipc!" with sizeof(data)
//      (terminator included), receives one message, compares it with the sent
//      data and disconnects.
//
//  * Unit::test_channel
//      wait_for_handshake(id) opens "my-ipc-channel" and sends "copy:" + id every
//      100 ms while `unmatched` is set; on a failed send it logs, clears
//      `unmatched` and stops sending. Its helper thread `re` receives messages:
//      it skips ones equal to its own "copy:" + id, answers any other message
//      that starts with "copy:" by sending "re-" + payload, and clears
//      `unmatched` when it receives its own "re-copy:" + id. The helper loop ends
//      once it has replied at least once and `unmatched` is clear.
//      Thread t1 then receives datas.size() messages - while `unchecked` is true
//      (first message only) it keeps receiving past leftover "copy:"/"re-copy:"
//      handshake messages - and checks each one's size against strlen + 1 and its
//      bytes with memcmp. Thread t2 sends every string in datas.
//
//  * test_prod_cons / test_performance / Unit::test_channel_performance
//      test_prod_cons forwards to benchmark_prod_cons with a null ipc::channel
//      pointer (template arguments stripped; benchmark_prod_cons is not defined
//      in this file - presumably in "test.h"). test_performance::start() recurses
//      into another test_performance instantiation and then calls
//      test_prod_cons; the <1, 1> specialization ends the recursion.
//      test_channel_performance runs test_prod_cons<1, 1>() followed by
//      test_performance for 1:10, 10:1 and 10:10.
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #if defined(__GNUC__) # include // abi::__cxa_demangle #endif/*__GNUC__*/ #include "stopwatch.hpp" #include "spin_lock.hpp" #include "random.hpp" #include "ipc.h" #include "rw_lock.h" #include "test.h" namespace { std::vector datas__; constexpr int DataMin = 2; constexpr int DataMax = 256; constexpr int LoopCount = 100000; } // internal-linkage template <> struct test_cq { using cn_t = ipc::channel; std::string conn_name_; test_cq(void*) : conn_name_("test-ipc-channel") { auto watcher = connect(); QCOMPARE(watcher.recv_count(), static_cast(0)); } cn_t connect() { return { conn_name_.c_str() }; } void disconnect(cn_t& cn) { cn.disconnect(); } void wait_start(int M) { auto watcher = connect(); while (watcher.recv_count() != static_cast(M)) { std::this_thread::yield(); } } template void recv(cn_t& cn, F&& proc) { do { auto msg = cn.recv(); if (msg.size() < 2) return; proc(msg); } while(1); } void send(const std::array& info) { thread_local auto cn = connect(); int n = info[1]; if (n < 0) { cn.send(ipc::buff_t { '\0' }); } else cn.send(datas__[static_cast(n)]); } }; template <> struct test_verify { std::unordered_map> list_; int lcount_; test_verify(int M) : lcount_{ M } {} void prepare(void* pt) { std::cout << "start consumer: " << pt << std::endl; } void push_data(int cid, ipc::buff_t const & msg) { list_[cid].emplace_back(std::move(msg)); } void verify(int /*N*/, int /*Loops*/) { std::cout << "verifying..." 
<< std::endl; for (int m = 0; m < lcount_; ++m) { QCOMPARE(datas__, list_[m]); } } }; namespace { class Unit : public TestSuite { Q_OBJECT const char* name() const { return "test_ipc"; } private slots: void initTestCase(); void cleanupTestCase(); void test_rw_lock(); void test_send_recv(); void test_channel(); void test_channel_performance(); } unit__; #include "test_ipc.moc" void Unit::initTestCase() { TestSuite::initTestCase(); capo::random<> rdm { DataMin, DataMax }; capo::random<> bit { 0, (std::numeric_limits::max)() }; for (int i = 0; i < LoopCount; ++i) { auto n = rdm(); ipc::buff_t buff(static_cast(n)); for (std::size_t k = 0; k < buff.size(); ++k) { buff[k] = static_cast(bit()); } datas__.emplace_back(std::move(buff)); } } void Unit::cleanupTestCase() { datas__.clear(); } template constexpr T acc(T b, T e) { return (e + b) * (e - b + 1) / 2; } template struct lc_wrapper : Mutex { void lock_shared () { Mutex::lock (); } void unlock_shared() { Mutex::unlock(); } }; template void benchmark_lc() { std::thread w_trd[W]; std::thread r_trd[R]; std::atomic_int fini { 0 }; // std::atomic_bool wf { false }; std::vector datas; Lc lc; test_stopwatch sw; #if defined(__GNUC__) { const char* typeid_name = typeid(Lc).name(); const char* real_name = abi::__cxa_demangle(typeid_name, nullptr, nullptr, nullptr); std::unique_ptr guard { (void*)real_name, ::free }; if (real_name == nullptr) real_name = typeid_name; std::cout << std::endl << real_name << std::endl; } #else std::cout << std::endl << typeid(Lc).name() << std::endl; #endif/*__GNUC__*/ for (auto& t : r_trd) { t = std::thread([&] { std::vector seq; std::size_t cnt = 0; while (1) { int x = -1; { std::shared_lock guard { lc }; // QVERIFY(!wf); if (cnt < datas.size()) { x = datas[cnt]; } // std::this_thread::sleep_for(std::chrono::milliseconds(1)); if (x == 0) break; // quit if (x != -1) { seq.push_back(x); ++cnt; } } std::this_thread::yield(); } if (++fini == std::extent::value) { sw.print_elapsed(W, R, Loops); } 
std::uint64_t sum = 0; for (int i : seq) sum += static_cast(i); QCOMPARE(sum, acc(1, Loops) * std::extent::value); }); } for (auto& t : w_trd) { t = std::thread([&] { sw.start(); for (int i = 1; i <= Loops; ++i) { { std::unique_lock guard { lc }; // wf = true; datas.push_back(i); // std::this_thread::sleep_for(std::chrono::milliseconds(1)); // wf = false; } std::this_thread::yield(); } }); } for (auto& t : w_trd) t.join(); lc.lock(); datas.push_back(0); lc.unlock(); for (auto& t : r_trd) t.join(); } template void test_lock_performance() { std::cout << std::endl << "test_lock_performance: [" << W << ":" << R << "]" << std::endl; benchmark_lc(); benchmark_lc, W, R>(); benchmark_lc , W, R>(); benchmark_lc(); } void Unit::test_rw_lock() { test_lock_performance<1, 1>(); test_lock_performance<4, 4>(); test_lock_performance<1, 8>(); test_lock_performance<8, 1>(); } void Unit::test_send_recv() { auto h = ipc::connect("my-ipc"); QVERIFY(h != nullptr); char data[] = "hello ipc!"; QVERIFY(ipc::send(h, data, sizeof(data))); auto got = ipc::recv(h); QCOMPARE((char*)got.data(), data); ipc::disconnect(h); } void Unit::test_channel() { auto wait_for_handshake = [](int id) { ipc::channel cc { "my-ipc-channel" }; std::string cfm = "copy:" + std::to_string(id), ack = "re-" + cfm; std::atomic_bool unmatched { true }; std::thread re {[&] { bool has_re = false; do { auto dd = cc.recv(); QVERIFY(!dd.empty()); std::string got { reinterpret_cast(dd.data()), dd.size() - 1 }; if (cfm == got) continue; std::cout << id << "-recv: " << got << "[" << dd.size() << "]" << std::endl; if (ack != got) { char const cp[] = "copy:"; // check header if (std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) { std::cout << id << "-re: " << got << std::endl; QVERIFY(has_re = cc.send( std::string{ "re-" }.append( reinterpret_cast(dd.data()), dd.size() - 1))); } } else if (unmatched.load(std::memory_order_relaxed)) { unmatched.store(false, std::memory_order_release); std::cout << id << "-matched!" 
<< std::endl; } } while (!has_re || unmatched.load(std::memory_order_relaxed)); }}; while (unmatched.load(std::memory_order_acquire)) { if (!cc.send(cfm)) { std::cout << id << "-send failed!" << std::endl; unmatched = false; break; } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } re.join(); std::cout << id << "-fini handshake!" << std::endl; return cc; }; std::vector const datas = { "hello!", "foo", "bar", "ISO/IEC", "14882:2011", "ISO/IEC 14882:2017 Information technology - Programming languages - C++", "ISO/IEC 14882:2020", "Modern C++ Design: Generic Programming and Design Patterns Applied" }; std::thread t1 {[&] { auto cc = wait_for_handshake(1); const char cp[] = "copy:", re[] = "re-copy:"; bool unchecked = true; for (std::size_t i = 0; i < datas.size(); ++i, unchecked = false) { ipc::buff_t dd; do { dd = cc.recv(); } while (unchecked && (((dd.size() > sizeof(cp)) && std::memcmp(dd.data(), cp, sizeof(cp) - 1) == 0) || ((dd.size() > sizeof(re)) && std::memcmp(dd.data(), re, sizeof(re) - 1) == 0))); QCOMPARE(dd.size(), std::strlen(datas[i]) + 1); QVERIFY(std::memcmp(dd.data(), datas[i], dd.size()) == 0); } }}; std::thread t2 {[&] { auto cc = wait_for_handshake(2); for (std::size_t i = 0; i < datas.size(); ++i) { std::cout << "sending: " << datas[i] << std::endl; cc.send(datas[i]); } }}; t1.join(); t2.join(); } template void test_prod_cons() { benchmark_prod_cons>((ipc::channel*)nullptr); } template struct test_performance { static void start() { test_performance

::start(); test_prod_cons(); } }; template struct test_performance<1, C> { static void start() { test_performance<1, C - 1>::start(); test_prod_cons<1, C, false>(); } }; template struct test_performance { static void start() { test_performance

::start(); test_prod_cons(); } }; template <> struct test_performance<1, 1> { static void start() { test_prod_cons<1, 1, false>(); } }; void Unit::test_channel_performance() { test_prod_cons<1, 1>(); test_performance<1 , 10>::start(); test_performance<10, 1 >::start(); test_performance<10, 10>::start(); } } // internal-linkage