diff --git a/test/test-multi-poll.c b/test/test-multi-poll.c index 2a55590..9f300fd 100644 --- a/test/test-multi-poll.c +++ b/test/test-multi-poll.c @@ -2,6 +2,7 @@ #include #include #include +#include #include "init.h" #include "util.h" @@ -10,9 +11,18 @@ #include -#define POLL_THREAD_COUNT 20 +#define PORT_COUNT 10 +#define THREADS_PER_PORT 10 + #define LISTEN_PORT 12345 +typedef struct test_context { + SOCKET sock; + HANDLE port; + uint64_t data; + HANDLE thread; +} test_context_t; + static SOCKET create_socket(unsigned short port) { SOCKET sock; struct sockaddr_in address; @@ -62,62 +72,91 @@ static void send_message(SOCKET sock, unsigned short port) { } static unsigned int __stdcall poll_thread(void* arg) { - HANDLE epfd; - SOCKET sock; - struct epoll_event ev_in; + test_context_t* context = arg; struct epoll_event ev_out; int r; - sock = (SOCKET) arg; - - epfd = epoll_create1(0); - assert(epfd != NULL); - - ev_in.events = EPOLLIN; - ev_in.data.u64 = 42; - r = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev_in); - assert(r == 0); - memset(&ev_out, 0, sizeof ev_out); - r = epoll_wait(epfd, &ev_out, 1, -1); + r = epoll_wait(context->port, &ev_out, 1, -1); assert(r == 1); - assert(ev_out.data.u64 == ev_in.data.u64); - assert(ev_out.events == ev_in.events); + assert(ev_out.events == EPOLLIN); + assert(ev_out.data.u64 == context->data); - puts("Got event."); - - r = epoll_close(epfd); - assert(r == 0); + printf("Got event (port %p, thread %p)\n", context->port, context->thread); return 0; } int main(void) { - HANDLE threads[POLL_THREAD_COUNT]; + HANDLE ports[PORT_COUNT]; + test_context_t contexts[PORT_COUNT][THREADS_PER_PORT]; + WSADATA wsa_data; + int r; - int r = init(); + /* Initialize winsock. */ + r = WSAStartup(MAKEWORD(2, 2), &wsa_data); assert(r == 0); SOCKET send_sock = create_socket(0); SOCKET recv_sock = create_socket(LISTEN_PORT); - for (size_t i = 0; i < array_count(threads); i++) { - HANDLE thread = (HANDLE) _beginthreadex( - NULL, 0, poll_thread, (void*) recv_sock, 0, NULL); - assert(thread != INVALID_HANDLE_VALUE); - threads[i] = thread; + /* Create PORT_COUNT epoll ports which, will be polled by THREADS_PER_PORT + * threads. */ + for (size_t i = 0; i < array_count(contexts); i++) { + HANDLE port; + struct epoll_event ev; + + /* Create epoll port. */ + port = epoll_create1(0); + assert(port != INVALID_HANDLE_VALUE); + ports[i] = port; + + /* Register recv_sock with the epoll port. */ + ev.events = EPOLLIN; + ev.data.u64 = rand(); + r = epoll_ctl(port, EPOLL_CTL_ADD, recv_sock, &ev); + assert(r == 0); + + /* Start THREADS_PER_PORT threads which will all poll the port. */ + for (size_t j = 0; j < array_count(contexts[i]); j++) { + test_context_t* context = &contexts[i][j]; + HANDLE thread; + + /* Prepare context information for the polling thread. */ + context->port = port; + context->sock = recv_sock; + context->data = ev.data.u64; + + /* Start thread. */ + thread = (HANDLE) _beginthreadex( + NULL, 0, poll_thread, (void*) context, 0, NULL); + assert(thread != INVALID_HANDLE_VALUE); + context->thread = thread; + } } + /* Sleep for a while to give all threads a chance to finish initializing. */ Sleep(500); + /* Send a message to the receiving socket. */ send_message(send_sock, LISTEN_PORT); - for (size_t i = 0; i < array_count(threads); i++) { - HANDLE thread = threads[i]; - DWORD wr = WaitForSingleObject(thread, INFINITE); - assert(wr == WAIT_OBJECT_0); - CloseHandle(thread); + /* Wait for all threads to exit and clean up after them. */ + for (size_t i = 0; i < array_count(contexts); i++) { + for (size_t j = 0; j < array_count(contexts[i]); j++) { + HANDLE thread = contexts[i][j].thread; + DWORD wr = WaitForSingleObject(thread, INFINITE); + assert(wr == WAIT_OBJECT_0); + CloseHandle(thread); + } + } + + /* Close all epoll ports. */ + for (size_t i = 0; i < array_count(ports); i++) { + HANDLE port = ports[i]; + r = epoll_close(port); + assert(r == 0); } return 0;