test-multi-poll: poll each port from multiple threads

This commit is contained in:
Bert Belder 2017-11-24 16:00:02 +01:00
parent 6c6cca973f
commit a53ad7c753

View File

@ -2,6 +2,7 @@
#include <process.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "init.h"
#include "util.h"
@ -10,9 +11,18 @@
#include <WS2tcpip.h>
#define POLL_THREAD_COUNT 20
#define PORT_COUNT 10
#define THREADS_PER_PORT 10
#define LISTEN_PORT 12345
typedef struct test_context {
SOCKET sock;
HANDLE port;
uint64_t data;
HANDLE thread;
} test_context_t;
static SOCKET create_socket(unsigned short port) {
SOCKET sock;
struct sockaddr_in address;
@ -62,62 +72,91 @@ static void send_message(SOCKET sock, unsigned short port) {
}
static unsigned int __stdcall poll_thread(void* arg) {
HANDLE epfd;
SOCKET sock;
struct epoll_event ev_in;
test_context_t* context = arg;
struct epoll_event ev_out;
int r;
sock = (SOCKET) arg;
epfd = epoll_create1(0);
assert(epfd != NULL);
ev_in.events = EPOLLIN;
ev_in.data.u64 = 42;
r = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev_in);
assert(r == 0);
memset(&ev_out, 0, sizeof ev_out);
r = epoll_wait(epfd, &ev_out, 1, -1);
r = epoll_wait(context->port, &ev_out, 1, -1);
assert(r == 1);
assert(ev_out.data.u64 == ev_in.data.u64);
assert(ev_out.events == ev_in.events);
assert(ev_out.events == EPOLLIN);
assert(ev_out.data.u64 == context->data);
puts("Got event.");
r = epoll_close(epfd);
assert(r == 0);
printf("Got event (port %p, thread %p)\n", context->port, context->thread);
return 0;
}
int main(void) {
HANDLE threads[POLL_THREAD_COUNT];
HANDLE ports[PORT_COUNT];
test_context_t contexts[PORT_COUNT][THREADS_PER_PORT];
WSADATA wsa_data;
int r;
int r = init();
/* Initialize winsock. */
r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
assert(r == 0);
SOCKET send_sock = create_socket(0);
SOCKET recv_sock = create_socket(LISTEN_PORT);
for (size_t i = 0; i < array_count(threads); i++) {
HANDLE thread = (HANDLE) _beginthreadex(
NULL, 0, poll_thread, (void*) recv_sock, 0, NULL);
assert(thread != INVALID_HANDLE_VALUE);
threads[i] = thread;
/* Create PORT_COUNT epoll ports which, will be polled by THREADS_PER_PORT
* threads. */
for (size_t i = 0; i < array_count(contexts); i++) {
HANDLE port;
struct epoll_event ev;
/* Create epoll port. */
port = epoll_create1(0);
assert(port != INVALID_HANDLE_VALUE);
ports[i] = port;
/* Register recv_sock with the epoll port. */
ev.events = EPOLLIN;
ev.data.u64 = rand();
r = epoll_ctl(port, EPOLL_CTL_ADD, recv_sock, &ev);
assert(r == 0);
/* Start THREADS_PER_PORT threads which will all poll the port. */
for (size_t j = 0; j < array_count(contexts[i]); j++) {
test_context_t* context = &contexts[i][j];
HANDLE thread;
/* Prepare context information for the polling thread. */
context->port = port;
context->sock = recv_sock;
context->data = ev.data.u64;
/* Start thread. */
thread = (HANDLE) _beginthreadex(
NULL, 0, poll_thread, (void*) context, 0, NULL);
assert(thread != INVALID_HANDLE_VALUE);
context->thread = thread;
}
}
/* Sleep for a while to give all threads a chance to finish initializing. */
Sleep(500);
/* Send a message to the receiving socket. */
send_message(send_sock, LISTEN_PORT);
for (size_t i = 0; i < array_count(threads); i++) {
HANDLE thread = threads[i];
DWORD wr = WaitForSingleObject(thread, INFINITE);
assert(wr == WAIT_OBJECT_0);
CloseHandle(thread);
/* Wait for all threads to exit and clean up after them. */
for (size_t i = 0; i < array_count(contexts); i++) {
for (size_t j = 0; j < array_count(contexts[i]); j++) {
HANDLE thread = contexts[i][j].thread;
DWORD wr = WaitForSingleObject(thread, INFINITE);
assert(wr == WAIT_OBJECT_0);
CloseHandle(thread);
}
}
/* Close all epoll ports. */
for (size_t i = 0; i < array_count(ports); i++) {
HANDLE port = ports[i];
r = epoll_close(port);
assert(r == 0);
}
return 0;