diff --git a/benchmark/benchmark_concur.cpp b/benchmark/benchmark_concur.cpp index 712db08..24c6331 100644 --- a/benchmark/benchmark_concur.cpp +++ b/benchmark/benchmark_concur.cpp @@ -10,6 +10,29 @@ namespace { +void concur_queue_rtt(benchmark::State &state) { + using namespace concur; + queue que [2]; + std::atomic_bool stop = false; + auto producer = std::async(std::launch::async, [&stop, &que] { + for (std::int64_t i = 0; !stop.load(std::memory_order_relaxed); ++i) { + std::int64_t n {}; + while (!que[0].pop(n)) ; + (void)que[1].push(i); + } + }); + + for (auto _ : state) { + (void)que[0].push(0); + std::int64_t n {}; + while (!que[1].pop(n)) ; + } + + stop = true; + (void)que[0].push(0); + producer.wait(); +} + void concur_queue_1v1(benchmark::State &state) { using namespace concur; queue que; @@ -62,5 +85,6 @@ void concur_queue_NvN(benchmark::State &state) { } // namespace +BENCHMARK(concur_queue_rtt); BENCHMARK(concur_queue_1v1); -BENCHMARK(concur_queue_NvN)->ThreadRange(1, 16); \ No newline at end of file +BENCHMARK(concur_queue_NvN)->ThreadRange(1, 16); diff --git a/benchmark/benchmark_ipc.cpp b/benchmark/benchmark_ipc.cpp new file mode 100644 index 0000000..5d7a698 --- /dev/null +++ b/benchmark/benchmark_ipc.cpp @@ -0,0 +1,263 @@ + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +#include "benchmark/benchmark.h" + +#include "test_util.h" + +#include "libipc/shm.h" + +namespace { + +using flag_t = std::atomic_bool; + +struct eventfd_reader { + pid_t pid_ {-1}; + int rfd_ {eventfd(0, 0/*EFD_CLOEXEC*/)}; + int wfd_ {eventfd(0, 0/*EFD_CLOEXEC*/)}; + + eventfd_reader() { + auto shm = ipc::shared_memory("shm-eventfd_reader", sizeof(flag_t)); + shm.as()->store(false, std::memory_order_relaxed); + pid_ = test::subproc([this] { + auto shm = ipc::shared_memory("shm-eventfd_reader", sizeof(flag_t)); + auto flag = shm.as(); + printf("eventfd_reader rfd = %d, wfd = %d\n", rfd_, wfd_); + while (!flag->load(std::memory_order_relaxed)) { + uint64_t n = 0; + read(wfd_, &n, sizeof(uint64_t)); + n = 1; + write(rfd_, &n, sizeof(uint64_t)); + } + printf("eventfd_reader exit.\n"); + }); + } + + ~eventfd_reader() { + auto shm = ipc::shared_memory("shm-eventfd_reader", sizeof(flag_t)); + shm.as()->store(true, std::memory_order_seq_cst); + uint64_t n = 1; + write(wfd_, &n, sizeof(uint64_t)); + test::join_subproc(pid_); + close(rfd_); + close(wfd_); + } +} evtfd_r__; + +struct mqueue_reader { + pid_t pid_ {-1}; + + mqueue_reader() { + auto shm = ipc::shared_memory("shm-mqueue_reader", sizeof(flag_t)); + shm.as()->store(false, std::memory_order_relaxed); + + pid_ = test::subproc([this] { + auto shm = ipc::shared_memory("shm-mqueue_reader", sizeof(flag_t)); + auto flag = shm.as(); + printf("mqueue_reader start.\n"); + mqd_t wfd = mq_open("/mqueue-wfd", O_RDONLY); + mqd_t rfd = mq_open("/mqueue-rfd", O_WRONLY); + while (!flag->load(std::memory_order_relaxed)) { + char n {}; + // read + mq_receive(wfd, &n, sizeof(n), nullptr); + // write + mq_send(rfd, &n, sizeof(n), 0); + } + printf("mqueue_reader exit.\n"); + mq_close(wfd); + mq_close(rfd); + }); + } + + ~mqueue_reader() { + auto shm = ipc::shared_memory("shm-mqueue_reader", sizeof(flag_t)); + shm.as()->store(true, std::memory_order_seq_cst); + { + mqd_t wfd = mq_open("/mqueue-wfd", O_WRONLY); + char n {}; + mq_send(wfd, &n, sizeof(n), 0); + mq_close(wfd); + } + test::join_subproc(pid_); + } +} mq_r__; + +struct sock_reader { + pid_t pid_ {-1}; + + sock_reader() { + auto shm = ipc::shared_memory("shm-sock_reader", sizeof(flag_t)); + shm.as()->store(false, std::memory_order_relaxed); + + pid_ = test::subproc([this] { + auto shm = ipc::shared_memory("shm-sock_reader", sizeof(flag_t)); + auto flag = shm.as(); + printf("sock_reader start.\n"); + int lfd = socket(AF_UNIX, SOCK_STREAM, 0); + struct sockaddr_un serun {}; + serun.sun_family = AF_UNIX; + strcpy(serun.sun_path, "shm-sock.ser"); + unlink(serun.sun_path); + bind(lfd, (struct sockaddr *)&serun, offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path)); + listen(lfd, 16); + while (!flag->load(std::memory_order_relaxed)) { + struct sockaddr_un cliun {}; + socklen_t cliun_len = sizeof(cliun); + int cfd = accept(lfd, (struct sockaddr *)&cliun, &cliun_len); + if (cfd < 0) { + printf("accept error.\n"); + continue; + } + while (!flag->load(std::memory_order_relaxed)) { + char c {}; + auto r = read(cfd, &c, sizeof(c)); + if (r <= 0) break; + write(cfd, &c, sizeof(c)); + } + close(cfd); + } + printf("sock_reader exit.\n"); + }); + } + + ~sock_reader() { + auto shm = ipc::shared_memory("shm-sock_reader", sizeof(flag_t)); + shm.as()->store(true, std::memory_order_seq_cst); + { + auto sfd = start_client(); + char c {}; + write(sfd, &c, sizeof(c)); + close(sfd); + } + test::join_subproc(pid_); + } + + int start_client() { + int sfd = socket(AF_UNIX, SOCK_STREAM, 0); + struct sockaddr_un cliun {}; + cliun.sun_family = AF_UNIX; + strcpy(cliun.sun_path, "shm-sock.cli"); + unlink(cliun.sun_path); + bind(sfd, (struct sockaddr *)&cliun, offsetof(struct sockaddr_un, sun_path) + strlen(cliun.sun_path)); + struct sockaddr_un serun {}; + serun.sun_family = AF_UNIX; + strcpy(serun.sun_path, "shm-sock.ser"); + connect(sfd, (struct sockaddr *)&serun, offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path)); + return sfd; + } +} sock_r__; + +struct udp_reader { + pid_t pid_ {-1}; + + udp_reader() { + auto shm = ipc::shared_memory("shm-udp_reader", sizeof(flag_t)); + shm.as()->store(false, std::memory_order_relaxed); + + pid_ = test::subproc([this] { + auto shm = ipc::shared_memory("shm-udp_reader", sizeof(flag_t)); + auto flag = shm.as(); + printf("udp_reader start.\n"); + int lfd = socket(AF_INET, SOCK_DGRAM, 0); + struct sockaddr_in ser {}; + ser.sin_family = AF_INET; + ser.sin_addr.s_addr = htonl(INADDR_ANY); + ser.sin_port = htons(8888); + bind(lfd, (struct sockaddr *)&ser, sizeof(ser)); + while (!flag->load(std::memory_order_relaxed)) { + struct sockaddr_in cli {}; + socklen_t cli_len = sizeof(cli); + char c {}; + recvfrom(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, &cli_len); + sendto(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, cli_len); + } + printf("udp_reader exit.\n"); + }); + } + + ~udp_reader() { + auto shm = ipc::shared_memory("shm-udp_reader", sizeof(flag_t)); + shm.as()->store(true, std::memory_order_seq_cst); + { + auto sfd = socket(AF_INET, SOCK_DGRAM, 0); + struct sockaddr_in ser {}; + ser.sin_family = AF_INET; + ser.sin_addr.s_addr = htonl(INADDR_ANY); + ser.sin_port = htons(8888); + char c {}; + sendto(sfd, &c, sizeof(c), 0, (struct sockaddr *)&ser, sizeof(ser)); + close(sfd); + } + test::join_subproc(pid_); + } +} udp_r__; + +void ipc_eventfd_rtt(benchmark::State &state) { + for (auto _ : state) { + uint64_t n = 1; + write(evtfd_r__.wfd_, &n, sizeof(n)); + read (evtfd_r__.rfd_, &n, sizeof(n)); + } +} + +void ipc_mqueue_rtt(benchmark::State &state) { + mqd_t wfd = mq_open("/mqueue-wfd", O_WRONLY); + mqd_t rfd = mq_open("/mqueue-rfd", O_RDONLY); + for (auto _ : state) { + char n {}; + // write + mq_send(wfd, &n, sizeof(n), 0); + // read + mq_receive(rfd, &n, sizeof(n), nullptr); + } + mq_close(wfd); + mq_close(rfd); +} + +void ipc_sock_rtt(benchmark::State &state) { + auto sfd = sock_r__.start_client(); + for (auto _ : state) { + char n {}; + write(sfd, &n, sizeof(n)); + read (sfd, &n, sizeof(n)); + } + close(sfd); +} + +void ipc_udp_rtt(benchmark::State &state) { + auto sfd = socket(AF_INET, SOCK_DGRAM, 0); + struct sockaddr_in ser {}; + ser.sin_family = AF_INET; + ser.sin_addr.s_addr = htonl(INADDR_ANY); + ser.sin_port = htons(8888); + for (auto _ : state) { + char n {'A'}; + // write + sendto(sfd, &n, sizeof(n), 0, (struct sockaddr *)&ser, sizeof(ser)); + // read + struct sockaddr_in cli {}; + socklen_t len = sizeof(cli); + recvfrom(sfd, &n, sizeof(n), 0, (struct sockaddr *)&cli, &len); + } + close(sfd); +} + +} // namespace + +BENCHMARK(ipc_eventfd_rtt); +BENCHMARK(ipc_mqueue_rtt); +BENCHMARK(ipc_sock_rtt); +BENCHMARK(ipc_udp_rtt); diff --git a/test/ipc/test_ipc_shm.cpp b/test/ipc/test_ipc_shm.cpp index 99f7aac..de8cb5d 100644 --- a/test/ipc/test_ipc_shm.cpp +++ b/test/ipc/test_ipc_shm.cpp @@ -74,4 +74,62 @@ TEST(shm, shared_memory) { EXPECT_EQ(*shm.as(), 4444); EXPECT_TRUE(ipc::shm_close(*shm_r)); -} \ No newline at end of file +} + +#if 0 +#include +#include +#include +#include +#include + +#include "test_util.h" + +TEST(shm, sock) { + auto reader = test::subproc([] { + int lfd = socket(AF_INET, SOCK_DGRAM, 0); + struct sockaddr_in ser {}; + ser.sin_family = AF_INET; + ser.sin_addr.s_addr = htonl(INADDR_ANY); + ser.sin_port = htons(8888); + bind(lfd, (struct sockaddr *)&ser, sizeof(ser)); + printf("reader prepared...\n"); + struct sockaddr_in cli {}; + socklen_t cli_len = sizeof(cli); + char c {'\0'}; + printf("reading...\n"); + for (;;) { + recvfrom(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, &cli_len); + printf("read %c\n", c); + sendto(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, cli_len); + if (c == 'Z') break; + } + close(lfd); + }); + + auto writer = test::subproc([] { + int sfd = socket(AF_INET, SOCK_DGRAM, 0); + struct sockaddr_in ser {}; + ser.sin_family = AF_INET; + ser.sin_addr.s_addr = htonl(INADDR_ANY); + ser.sin_port = htons(8888); + + printf("writer prepared...\n"); + sleep(1); + + for (char c = 'A'; c <= 'Z'; ++c) { + printf("write %c\n", c); + sendto(sfd, &c, sizeof(c), 0, (struct sockaddr *)&ser, sizeof(ser)); + struct sockaddr_in cli {}; + socklen_t len = sizeof(cli); + char n {}; + recvfrom(sfd, &n, sizeof(n), 0, (struct sockaddr *)&cli, &len); + printf("echo %c\n", c); + } + close(sfd); + }); + + test::join_subproc(writer); + test::join_subproc(reader); +} +#endif \ No newline at end of file diff --git a/test/test_util.h b/test/test_util.h new file mode 100644 index 0000000..a55a128 --- /dev/null +++ b/test/test_util.h @@ -0,0 +1,34 @@ + +#pragma once + +#include +#include + +namespace test { + +template +pid_t subproc(Fn&& fn) { + pid_t pid = fork(); + if (pid == -1) { + return pid; + } + if (!pid) { + // Unhook doctest from the subprocess. + // Otherwise, we would see a test-failure printout after the crash. + signal(SIGABRT, SIG_DFL); + signal(SIGSEGV, SIG_DFL); + fn(); + exit(0); + } + return pid; +} + +inline void join_subproc(pid_t pid) { + int ret_code; + waitpid(pid, &ret_code, 0); +} + +} // namespace test + +#define REQUIRE_EXIT(...) \ + test::join_subproc(test::subproc([&]() __VA_ARGS__))