add: (linux) some performance tests for the system ipc

This commit is contained in:
mutouyun 2023-06-04 17:10:50 +08:00
parent 8a4e6be9da
commit 2df08237a4
4 changed files with 381 additions and 2 deletions

View File

@ -10,6 +10,29 @@
namespace { namespace {
void concur_queue_rtt(benchmark::State &state) {
using namespace concur;
queue<std::int64_t, relation::single, relation::single> que [2];
std::atomic_bool stop = false;
auto producer = std::async(std::launch::async, [&stop, &que] {
for (std::int64_t i = 0; !stop.load(std::memory_order_relaxed); ++i) {
std::int64_t n {};
while (!que[0].pop(n)) ;
(void)que[1].push(i);
}
});
for (auto _ : state) {
(void)que[0].push(0);
std::int64_t n {};
while (!que[1].pop(n)) ;
}
stop = true;
(void)que[0].push(0);
producer.wait();
}
void concur_queue_1v1(benchmark::State &state) { void concur_queue_1v1(benchmark::State &state) {
using namespace concur; using namespace concur;
queue<std::int64_t, relation::single, relation::single> que; queue<std::int64_t, relation::single, relation::single> que;
@ -62,5 +85,6 @@ void concur_queue_NvN(benchmark::State &state) {
} // namespace } // namespace
BENCHMARK(concur_queue_rtt);
BENCHMARK(concur_queue_1v1); BENCHMARK(concur_queue_1v1);
BENCHMARK(concur_queue_NvN)->ThreadRange(1, 16); BENCHMARK(concur_queue_NvN)->ThreadRange(1, 16);

263
benchmark/benchmark_ipc.cpp Normal file
View File

@ -0,0 +1,263 @@
#include <sys/eventfd.h>
#include <sys/syscall.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <mqueue.h>
#include <atomic>
#include "benchmark/benchmark.h"
#include "test_util.h"
#include "libipc/shm.h"
namespace {
using flag_t = std::atomic_bool;
struct eventfd_reader {
pid_t pid_ {-1};
int rfd_ {eventfd(0, 0/*EFD_CLOEXEC*/)};
int wfd_ {eventfd(0, 0/*EFD_CLOEXEC*/)};
eventfd_reader() {
auto shm = ipc::shared_memory("shm-eventfd_reader", sizeof(flag_t));
shm.as<flag_t>()->store(false, std::memory_order_relaxed);
pid_ = test::subproc([this] {
auto shm = ipc::shared_memory("shm-eventfd_reader", sizeof(flag_t));
auto flag = shm.as<flag_t>();
printf("eventfd_reader rfd = %d, wfd = %d\n", rfd_, wfd_);
while (!flag->load(std::memory_order_relaxed)) {
uint64_t n = 0;
read(wfd_, &n, sizeof(uint64_t));
n = 1;
write(rfd_, &n, sizeof(uint64_t));
}
printf("eventfd_reader exit.\n");
});
}
~eventfd_reader() {
auto shm = ipc::shared_memory("shm-eventfd_reader", sizeof(flag_t));
shm.as<flag_t>()->store(true, std::memory_order_seq_cst);
uint64_t n = 1;
write(wfd_, &n, sizeof(uint64_t));
test::join_subproc(pid_);
close(rfd_);
close(wfd_);
}
} evtfd_r__;
struct mqueue_reader {
pid_t pid_ {-1};
mqueue_reader() {
auto shm = ipc::shared_memory("shm-mqueue_reader", sizeof(flag_t));
shm.as<flag_t>()->store(false, std::memory_order_relaxed);
pid_ = test::subproc([this] {
auto shm = ipc::shared_memory("shm-mqueue_reader", sizeof(flag_t));
auto flag = shm.as<flag_t>();
printf("mqueue_reader start.\n");
mqd_t wfd = mq_open("/mqueue-wfd", O_RDONLY);
mqd_t rfd = mq_open("/mqueue-rfd", O_WRONLY);
while (!flag->load(std::memory_order_relaxed)) {
char n {};
// read
mq_receive(wfd, &n, sizeof(n), nullptr);
// write
mq_send(rfd, &n, sizeof(n), 0);
}
printf("mqueue_reader exit.\n");
mq_close(wfd);
mq_close(rfd);
});
}
~mqueue_reader() {
auto shm = ipc::shared_memory("shm-mqueue_reader", sizeof(flag_t));
shm.as<flag_t>()->store(true, std::memory_order_seq_cst);
{
mqd_t wfd = mq_open("/mqueue-wfd", O_WRONLY);
char n {};
mq_send(wfd, &n, sizeof(n), 0);
mq_close(wfd);
}
test::join_subproc(pid_);
}
} mq_r__;
struct sock_reader {
pid_t pid_ {-1};
sock_reader() {
auto shm = ipc::shared_memory("shm-sock_reader", sizeof(flag_t));
shm.as<flag_t>()->store(false, std::memory_order_relaxed);
pid_ = test::subproc([this] {
auto shm = ipc::shared_memory("shm-sock_reader", sizeof(flag_t));
auto flag = shm.as<flag_t>();
printf("sock_reader start.\n");
int lfd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un serun {};
serun.sun_family = AF_UNIX;
strcpy(serun.sun_path, "shm-sock.ser");
unlink(serun.sun_path);
bind(lfd, (struct sockaddr *)&serun, offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path));
listen(lfd, 16);
while (!flag->load(std::memory_order_relaxed)) {
struct sockaddr_un cliun {};
socklen_t cliun_len = sizeof(cliun);
int cfd = accept(lfd, (struct sockaddr *)&cliun, &cliun_len);
if (cfd < 0) {
printf("accept error.\n");
continue;
}
while (!flag->load(std::memory_order_relaxed)) {
char c {};
auto r = read(cfd, &c, sizeof(c));
if (r <= 0) break;
write(cfd, &c, sizeof(c));
}
close(cfd);
}
printf("sock_reader exit.\n");
});
}
~sock_reader() {
auto shm = ipc::shared_memory("shm-sock_reader", sizeof(flag_t));
shm.as<flag_t>()->store(true, std::memory_order_seq_cst);
{
auto sfd = start_client();
char c {};
write(sfd, &c, sizeof(c));
close(sfd);
}
test::join_subproc(pid_);
}
int start_client() {
int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
struct sockaddr_un cliun {};
cliun.sun_family = AF_UNIX;
strcpy(cliun.sun_path, "shm-sock.cli");
unlink(cliun.sun_path);
bind(sfd, (struct sockaddr *)&cliun, offsetof(struct sockaddr_un, sun_path) + strlen(cliun.sun_path));
struct sockaddr_un serun {};
serun.sun_family = AF_UNIX;
strcpy(serun.sun_path, "shm-sock.ser");
connect(sfd, (struct sockaddr *)&serun, offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path));
return sfd;
}
} sock_r__;
struct udp_reader {
pid_t pid_ {-1};
udp_reader() {
auto shm = ipc::shared_memory("shm-udp_reader", sizeof(flag_t));
shm.as<flag_t>()->store(false, std::memory_order_relaxed);
pid_ = test::subproc([this] {
auto shm = ipc::shared_memory("shm-udp_reader", sizeof(flag_t));
auto flag = shm.as<flag_t>();
printf("udp_reader start.\n");
int lfd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in ser {};
ser.sin_family = AF_INET;
ser.sin_addr.s_addr = htonl(INADDR_ANY);
ser.sin_port = htons(8888);
bind(lfd, (struct sockaddr *)&ser, sizeof(ser));
while (!flag->load(std::memory_order_relaxed)) {
struct sockaddr_in cli {};
socklen_t cli_len = sizeof(cli);
char c {};
recvfrom(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, &cli_len);
sendto(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, cli_len);
}
printf("udp_reader exit.\n");
});
}
~udp_reader() {
auto shm = ipc::shared_memory("shm-udp_reader", sizeof(flag_t));
shm.as<flag_t>()->store(true, std::memory_order_seq_cst);
{
auto sfd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in ser {};
ser.sin_family = AF_INET;
ser.sin_addr.s_addr = htonl(INADDR_ANY);
ser.sin_port = htons(8888);
char c {};
sendto(sfd, &c, sizeof(c), 0, (struct sockaddr *)&ser, sizeof(ser));
close(sfd);
}
test::join_subproc(pid_);
}
} udp_r__;
void ipc_eventfd_rtt(benchmark::State &state) {
for (auto _ : state) {
uint64_t n = 1;
write(evtfd_r__.wfd_, &n, sizeof(n));
read (evtfd_r__.rfd_, &n, sizeof(n));
}
}
void ipc_mqueue_rtt(benchmark::State &state) {
mqd_t wfd = mq_open("/mqueue-wfd", O_WRONLY);
mqd_t rfd = mq_open("/mqueue-rfd", O_RDONLY);
for (auto _ : state) {
char n {};
// write
mq_send(wfd, &n, sizeof(n), 0);
// read
mq_receive(rfd, &n, sizeof(n), nullptr);
}
mq_close(wfd);
mq_close(rfd);
}
void ipc_sock_rtt(benchmark::State &state) {
auto sfd = sock_r__.start_client();
for (auto _ : state) {
char n {};
write(sfd, &n, sizeof(n));
read (sfd, &n, sizeof(n));
}
close(sfd);
}
void ipc_udp_rtt(benchmark::State &state) {
auto sfd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in ser {};
ser.sin_family = AF_INET;
ser.sin_addr.s_addr = htonl(INADDR_ANY);
ser.sin_port = htons(8888);
for (auto _ : state) {
char n {'A'};
// write
sendto(sfd, &n, sizeof(n), 0, (struct sockaddr *)&ser, sizeof(ser));
// read
struct sockaddr_in cli {};
socklen_t len = sizeof(cli);
recvfrom(sfd, &n, sizeof(n), 0, (struct sockaddr *)&cli, &len);
}
close(sfd);
}
} // namespace
BENCHMARK(ipc_eventfd_rtt);
BENCHMARK(ipc_mqueue_rtt);
BENCHMARK(ipc_sock_rtt);
BENCHMARK(ipc_udp_rtt);

View File

@ -75,3 +75,61 @@ TEST(shm, shared_memory) {
EXPECT_TRUE(ipc::shm_close(*shm_r)); EXPECT_TRUE(ipc::shm_close(*shm_r));
} }
#if 0
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <string.h>
#include "test_util.h"
TEST(shm, sock) {
auto reader = test::subproc([] {
int lfd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in ser {};
ser.sin_family = AF_INET;
ser.sin_addr.s_addr = htonl(INADDR_ANY);
ser.sin_port = htons(8888);
bind(lfd, (struct sockaddr *)&ser, sizeof(ser));
printf("reader prepared...\n");
struct sockaddr_in cli {};
socklen_t cli_len = sizeof(cli);
char c {'\0'};
printf("reading...\n");
for (;;) {
recvfrom(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, &cli_len);
printf("read %c\n", c);
sendto(lfd, &c, sizeof(c), 0, (struct sockaddr *)&cli, cli_len);
if (c == 'Z') break;
}
close(lfd);
});
auto writer = test::subproc([] {
int sfd = socket(AF_INET, SOCK_DGRAM, 0);
struct sockaddr_in ser {};
ser.sin_family = AF_INET;
ser.sin_addr.s_addr = htonl(INADDR_ANY);
ser.sin_port = htons(8888);
printf("writer prepared...\n");
sleep(1);
for (char c = 'A'; c <= 'Z'; ++c) {
printf("write %c\n", c);
sendto(sfd, &c, sizeof(c), 0, (struct sockaddr *)&ser, sizeof(ser));
struct sockaddr_in cli {};
socklen_t len = sizeof(cli);
char n {};
recvfrom(sfd, &n, sizeof(n), 0, (struct sockaddr *)&cli, &len);
printf("echo %c\n", c);
}
close(sfd);
});
test::join_subproc(writer);
test::join_subproc(reader);
}
#endif

34
test/test_util.h Normal file
View File

@ -0,0 +1,34 @@
#pragma once
#include <sys/wait.h>
#include <unistd.h>
namespace test {
template <typename Fn>
pid_t subproc(Fn&& fn) {
pid_t pid = fork();
if (pid == -1) {
return pid;
}
if (!pid) {
// Unhook doctest from the subprocess.
// Otherwise, we would see a test-failure printout after the crash.
signal(SIGABRT, SIG_DFL);
signal(SIGSEGV, SIG_DFL);
fn();
exit(0);
}
return pid;
}
inline void join_subproc(pid_t pid) {
int ret_code;
waitpid(pid, &ret_code, 0);
}
} // namespace test
#define REQUIRE_EXIT(...) \
test::join_subproc(test::subproc([&]() __VA_ARGS__))