From 89099e3103be068fb7cc6238132d5c85a666f3f6 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Mon, 4 Sep 2017 17:55:35 +0200 Subject: [PATCH] src: major refactor --- src/afd.c | 2 +- src/epoll.c | 847 ++++++++++++++++++++++++------------------ src/error.c | 4 - src/error.h | 16 +- src/queue.h | 99 +++++ test/test-udp-pings.c | 22 +- 6 files changed, 616 insertions(+), 374 deletions(-) create mode 100644 src/queue.h diff --git a/src/afd.c b/src/afd.c index 82c6572..0d53174 100644 --- a/src/afd.c +++ b/src/afd.c @@ -73,7 +73,7 @@ int afd_poll(SOCKET socket, AFD_POLL_INFO* info, OVERLAPPED* overlapped) { } if (status == STATUS_SUCCESS) - return_success(0); + return 0; else if (status == STATUS_PENDING) return_error(-1, ERROR_IO_PENDING); else diff --git a/src/epoll.c b/src/epoll.c index 69a7f84..0c08e31 100644 --- a/src/epoll.c +++ b/src/epoll.c @@ -1,16 +1,21 @@ #include #include +#include #include #include +#include #include "afd.h" #include "epoll.h" #include "error.h" #include "nt.h" +#include "queue.h" #include "tree.h" #include "util.h" #include "win.h" +#define _EP_COMPLETION_LIST_LENGTH 64 + #ifndef SIO_BASE_HANDLE #define SIO_BASE_HANDLE 0x48000022 #endif @@ -20,26 +25,21 @@ #define _EP_SOCK_LISTED 0x1 #define _EP_SOCK_DELETED 0x2 -#define ATTN_LIST_ADD(p, s) \ - do { \ - (s)->attn_list_prev = NULL; \ - (s)->attn_list_next = (p)->attn_list; \ - if ((p)->attn_list) { \ - (p)->attn_list->attn_list_prev = (s); \ - } \ - (p)->attn_list = (s); \ - (s)->flags |= _EP_SOCK_LISTED; \ +#define ATTN_LIST_ADD(p, s) \ + do { \ + QUEUE_INSERT_TAIL(&(p)->update_queue, &(s)->queue_entry); \ + (s)->flags |= _EP_SOCK_LISTED; \ } while (0) typedef struct _ep_port_data _ep_port_data_t; -typedef struct _ep_io_req _ep_io_req_t; +typedef struct _ep_poll_req _ep_poll_req_t; typedef struct _ep_sock_data _ep_sock_data_t; static int _ep_initialize(void); -static SOCKET _ep_get_peer_socket(_ep_port_data_t* port_data, - WSAPROTOCOL_INFOW* protocol_info); -static SOCKET 
_ep_create_peer_socket(HANDLE iocp, - WSAPROTOCOL_INFOW* protocol_info); +static SOCKET _ep_get_driver_socket(_ep_port_data_t* port_data, + WSAPROTOCOL_INFOW* protocol_info); +static SOCKET _ep_create_driver_socket(HANDLE iocp, + WSAPROTOCOL_INFOW* protocol_info); static int _ep_compare_sock_data(_ep_sock_data_t* a, _ep_sock_data_t* b); static int _ep_submit_poll_req(_ep_port_data_t* port_data, _ep_sock_data_t* sock_data); @@ -49,60 +49,87 @@ static int _ep_initialized = 0; /* State associated with a epoll handle. */ typedef struct _ep_port_data { HANDLE iocp; - SOCKET peer_sockets[array_count(AFD_PROVIDER_GUID_LIST)]; + SOCKET driver_sockets[array_count(AFD_PROVIDER_GUID_LIST)]; RB_HEAD(_ep_sock_data_tree, _ep_sock_data) sock_data_tree; - _ep_sock_data_t* attn_list; - size_t pending_reqs_count; + QUEUE update_queue; + size_t poll_req_count; } _ep_port_data_t; /* State associated with a socket that is registered to the epoll port. */ typedef struct _ep_sock_data { - SOCKET sock; - SOCKET base_sock; - SOCKET peer_sock; + SOCKET socket; + SOCKET afd_socket; + SOCKET driver_socket; RB_ENTRY(_ep_sock_data) tree_entry; - _ep_sock_data_t* attn_list_prev; - _ep_sock_data_t* attn_list_next; + QUEUE queue_entry; epoll_data_t user_data; - uint32_t registered_events; - uint32_t submitted_events; + _ep_poll_req_t* latest_poll_req; + uint32_t user_events; + uint32_t latest_poll_req_events; + uint32_t poll_req_count; uint32_t flags; - uint32_t io_req_generation; } _ep_sock_data_t; /* State associated with a AFD_POLL request. 
*/ -typedef struct _ep_io_req { +typedef struct _ep_poll_req { OVERLAPPED overlapped; AFD_POLL_INFO poll_info; - uint32_t generation; _ep_sock_data_t* sock_data; -} _ep_io_req_t; +} _ep_poll_req_t; + +static int _ep_sock_data_delete(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data); RB_GENERATE_STATIC(_ep_sock_data_tree, _ep_sock_data, tree_entry, _ep_compare_sock_data) -static inline _ep_io_req_t* _ep_io_req_alloc(void) { - _ep_io_req_t* io_req = malloc(sizeof *io_req); - if (io_req == NULL) +static inline _ep_poll_req_t* _ep_poll_req_alloc(void) { + _ep_poll_req_t* poll_req = malloc(sizeof *poll_req); + if (poll_req == NULL) return_error(NULL, ERROR_NOT_ENOUGH_MEMORY); - return io_req; + return poll_req; } -static inline void _ep_io_req_free(_ep_io_req_t* io_req) { - free(io_req); +static inline _ep_poll_req_t* _ep_poll_req_free(_ep_poll_req_t* poll_req) { + free(poll_req); + return NULL; } +static _ep_poll_req_t* _ep_poll_req_new(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data) { + _ep_poll_req_t* poll_req = _ep_poll_req_alloc(); + if (poll_req == NULL) + return NULL; + memset(poll_req, 0, sizeof *poll_req); + port_data->poll_req_count++; + sock_data->poll_req_count++; + return poll_req; +} + +static void _ep_poll_req_delete(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data, + _ep_poll_req_t* poll_req) { + assert(poll_req != NULL); + + port_data->poll_req_count--; + sock_data->poll_req_count--; + + _ep_poll_req_free(poll_req); + + if ((sock_data->flags & _EP_SOCK_DELETED) && sock_data->poll_req_count == 0) + _ep_sock_data_delete(port_data, sock_data); +} static int _ep_get_related_sockets(_ep_port_data_t* port_data, - SOCKET sock, - SOCKET* base_sock_out, - SOCKET* peer_sock_out) { - SOCKET base_sock, peer_sock; + SOCKET socket, + SOCKET* afd_socket_out, + SOCKET* driver_socket_out) { + SOCKET afd_socket, driver_socket; WSAPROTOCOL_INFOW protocol_info; DWORD bytes; int len; @@ -113,151 +140,413 @@ static int 
_ep_get_related_sockets(_ep_port_data_t* port_data, * windows XP/2k3 this will always fail since they don't support the * SIO_BASE_HANDLE ioctl. */ - base_sock = sock; - WSAIoctl(sock, + afd_socket = socket; + WSAIoctl(socket, SIO_BASE_HANDLE, NULL, 0, - &base_sock, - sizeof base_sock, + &afd_socket, + sizeof afd_socket, &bytes, NULL, NULL); /* Obtain protocol information about the socket. */ len = sizeof protocol_info; - if (getsockopt(base_sock, + if (getsockopt(afd_socket, SOL_SOCKET, SO_PROTOCOL_INFOW, (char*) &protocol_info, &len) != 0) return_error(-1); - peer_sock = _ep_get_peer_socket(port_data, &protocol_info); - if (peer_sock == INVALID_SOCKET) + driver_socket = _ep_get_driver_socket(port_data, &protocol_info); + if (driver_socket == INVALID_SOCKET) return -1; - *base_sock_out = base_sock; - *peer_sock_out = peer_sock; + *afd_socket_out = afd_socket; + *driver_socket_out = driver_socket; + + return 0; +} + +static inline _ep_sock_data_t* _ep_sock_data_alloc(void) { + _ep_sock_data_t* sock_data = malloc(sizeof *sock_data); + if (sock_data == NULL) + return_error(NULL, ERROR_NOT_ENOUGH_MEMORY); + return sock_data; +} + +static inline void _ep_sock_data_free(_ep_sock_data_t* sock_data) { + free(sock_data); +} + +static _ep_sock_data_t* _ep_sock_data_new(_ep_port_data_t* port_data) { + _ep_sock_data_t* sock_data = _ep_sock_data_alloc(); + if (sock_data == NULL) + return NULL; + + (void) port_data; + + memset(sock_data, 0, sizeof *sock_data); + QUEUE_INIT(&sock_data->queue_entry); + + return sock_data; +} + +static int _ep_sock_data_set_socket(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data, + SOCKET socket) { + if (socket == 0 || socket == INVALID_SOCKET) + return_error(-1, ERROR_INVALID_HANDLE); + + if (sock_data->socket != 0) + return_error(-1, ERROR_INVALID_PARAMETER); + + if (_ep_get_related_sockets(port_data, + socket, + &sock_data->afd_socket, + &sock_data->driver_socket) < 0) + return -1; + + sock_data->socket = socket; + + if 
(RB_INSERT(_ep_sock_data_tree, &port_data->sock_data_tree, sock_data) != + NULL) + return_error(-1, ERROR_ALREADY_EXISTS); /* Socket already in epoll set. */ + + return 0; +} + +static int _ep_sock_data_delete(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data) { + /* Remove from lookup tree. */ + if (sock_data->socket != 0 && !(sock_data->flags & _EP_SOCK_DELETED)) { + RB_REMOVE(_ep_sock_data_tree, &port_data->sock_data_tree, sock_data); + sock_data->flags |= _EP_SOCK_DELETED; + } + + /* Remove from attention list. */ + if (sock_data->flags & _EP_SOCK_LISTED) { + QUEUE_REMOVE(&sock_data->queue_entry); + sock_data->flags &= ~_EP_SOCK_LISTED; + } + + /* The socket may still have pending overlapped requests that have yet to be + * reported by the completion port. If that's the case the structure can't be + * free'd yet; epoll_wait() will do it as it dequeues those requests. + */ + if (sock_data->poll_req_count == 0) + _ep_sock_data_free(sock_data); + + return 0; +} + +static bool _ep_sock_data_marked_for_deletion(_ep_sock_data_t* sock_data) { + return sock_data->flags & _EP_SOCK_DELETED; +} + +static int _ep_sock_data_set_event(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data, + const struct epoll_event* ev) { + /* EPOLLERR and EPOLLHUP are always reported, even when no sollicited. */ + uint32_t events = ev->events | EPOLLERR | EPOLLHUP; + + sock_data->user_events = events; + sock_data->user_data = ev->data; + + if (events & _EP_EVENT_MASK & ~(sock_data->latest_poll_req_events)) { + /* Add to attention list, if not already added. 
*/ + if (!(sock_data->flags & _EP_SOCK_LISTED)) { + QUEUE_INSERT_TAIL(&port_data->update_queue, &sock_data->queue_entry); + sock_data->flags |= _EP_SOCK_LISTED; + } + } return 0; } static int _ep_ctl_add(_ep_port_data_t* port_data, - uintptr_t sock, - struct epoll_event* ev) { - _ep_sock_data_t* sock_data; - - sock_data = malloc(sizeof *sock_data); + uintptr_t socket, + struct epoll_event* ev) { + _ep_sock_data_t* sock_data = _ep_sock_data_new(port_data); if (sock_data == NULL) - return_error(-1, ERROR_NOT_ENOUGH_MEMORY); + return -1; - sock_data->sock = sock; - sock_data->io_req_generation = 0; - sock_data->submitted_events = 0; - sock_data->registered_events = ev->events | EPOLLERR | EPOLLHUP; - sock_data->user_data = ev->data; - sock_data->flags = 0; - - if (_ep_get_related_sockets( - port_data, sock, &sock_data->base_sock, &sock_data->peer_sock) < 0) { - free(sock_data); + if (_ep_sock_data_set_socket(port_data, sock_data, socket) < 0 || + _ep_sock_data_set_event(port_data, sock_data, ev) < 0) { + _ep_sock_data_delete(port_data, sock_data); return -1; } - if (RB_INSERT(_ep_sock_data_tree, &port_data->sock_data_tree, sock_data) != - NULL) { - free(sock_data); - return_error(-1, ERROR_ALREADY_EXISTS); /* Socket already in epoll set. */ - } - - /* Add to attention list */ - ATTN_LIST_ADD(port_data, sock_data); - - return_success(0); + return 0; } static int _ep_ctl_mod(_ep_port_data_t* port_data, - uintptr_t sock, + uintptr_t socket, struct epoll_event* ev) { _ep_sock_data_t lookup; _ep_sock_data_t* sock_data; - lookup.sock = sock; + lookup.socket = socket; sock_data = RB_FIND(_ep_sock_data_tree, &port_data->sock_data_tree, &lookup); if (sock_data == NULL) - return_error(-1, ERROR_NOT_FOUND); /* Socket not in epoll set. */ + return_error(-1, ERROR_BAD_NETPATH); /* Socket not in epoll set. */ - if (ev->events & _EP_EVENT_MASK & ~sock_data->submitted_events) { - /* Add to attention list, if not already added. 
*/ - if (!(sock_data->flags & _EP_SOCK_LISTED)) - ATTN_LIST_ADD(port_data, sock_data); - } + if (_ep_sock_data_set_event(port_data, sock_data, ev) < 0) + return -1; - sock_data->registered_events = ev->events | EPOLLERR | EPOLLHUP; - sock_data->user_data = ev->data; - - return_success(0); + return 0; } -static int _ep_ctl_del(_ep_port_data_t* port_data, uintptr_t sock) { +static int _ep_ctl_del(_ep_port_data_t* port_data, uintptr_t socket) { _ep_sock_data_t lookup; _ep_sock_data_t* sock_data; - lookup.sock = sock; + lookup.socket = socket; sock_data = RB_FIND(_ep_sock_data_tree, &port_data->sock_data_tree, &lookup); if (sock_data == NULL) - /* Socket has not been registered with epoll instance. */ return_error(-1, ERROR_NOT_FOUND); /* Socket not in epoll set. */ - RB_REMOVE(_ep_sock_data_tree, &port_data->sock_data_tree, sock_data); + if (_ep_sock_data_delete(port_data, sock_data) < 0) + return -1; - sock_data->flags |= _EP_SOCK_DELETED; - - /* Remove from attention list. */ - if (sock_data->flags & _EP_SOCK_LISTED) { - if (sock_data->attn_list_prev != NULL) - sock_data->attn_list_prev->attn_list_next = sock_data->attn_list_next; - if (sock_data->attn_list_next != NULL) - sock_data->attn_list_next->attn_list_prev = sock_data->attn_list_prev; - if (port_data->attn_list == sock_data) - port_data->attn_list = sock_data->attn_list_next; - sock_data->attn_list_prev = NULL; - sock_data->attn_list_next = NULL; - sock_data->flags &= ~_EP_SOCK_LISTED; - } - - if (sock_data->submitted_events == 0) { - assert(sock_data->io_req_generation == 0); - free(sock_data); - } else { - /* There are still one or more io requests pending. Wait for - * all pending requests to return before freeing. 
- */ - assert(sock_data->io_req_generation > 0); - } - - return_success(0); + return 0; } int epoll_ctl(epoll_t port_handle, int op, - uintptr_t sock, + uintptr_t socket, struct epoll_event* ev) { _ep_port_data_t* port_data = (_ep_port_data_t*) port_handle; switch (op) { case EPOLL_CTL_ADD: - return _ep_ctl_add(port_data, sock, ev); + return _ep_ctl_add(port_data, socket, ev); case EPOLL_CTL_MOD: - return _ep_ctl_mod(port_data, sock, ev); + return _ep_ctl_mod(port_data, socket, ev); case EPOLL_CTL_DEL: - return _ep_ctl_del(port_data, sock); + return _ep_ctl_del(port_data, socket); } return_error(-1, ERROR_INVALID_PARAMETER); } +static int _ep_sock_update(_ep_port_data_t* port_data, + _ep_sock_data_t* sock_data) { + bool broken = false; + + assert(sock_data->flags & _EP_SOCK_LISTED); + + /* Check if there are events registered that are not yet submitted. In + * that case we need to submit another req. + */ + if ((sock_data->user_events & _EP_EVENT_MASK & + ~sock_data->latest_poll_req_events) == 0) + /* All the events the user is interested in are already being monitored + * by the latest poll request. */ + goto done; + + if (_ep_submit_poll_req(port_data, sock_data) < 0) { + if (GetLastError() == ERROR_INVALID_HANDLE) + /* The socket is broken. It will be dropped from the epoll set. */ + broken = true; + else + /* Another error occurred, which is propagated to the caller. */ + return -1; + } + +done: + /* Remove the socket from the update queue. */ + QUEUE_REMOVE(&sock_data->queue_entry); + sock_data->flags &= ~_EP_SOCK_LISTED; + + /* If we saw an ERROR_INVALID_HANDLE error, drop the socket.. */ + if (broken) + _ep_sock_data_delete(port_data, sock_data); + + return 0; +} + +static int _ep_port_update_events(_ep_port_data_t* port_data) { + QUEUE* update_queue = &port_data->update_queue; + + /* Walk the queue, submitting new poll requests for every socket that needs + * it. 
*/ + while (!QUEUE_EMPTY(update_queue)) { + QUEUE* queue_entry = QUEUE_HEAD(update_queue); + _ep_sock_data_t* sock_data = + QUEUE_DATA(queue_entry, _ep_sock_data_t, queue_entry); + + if (_ep_sock_update(port_data, sock_data) < 0) + return -1; + + /* _ep_sock_update() removes the socket from the update list if + * appropriate. */ + } + + return 0; +} + +static uint32_t _afd_events_to_epoll_events(DWORD afd_events) { + uint32_t epoll_events = 0; + + if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) + epoll_events |= EPOLLIN | EPOLLRDNORM; + if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) + epoll_events |= EPOLLPRI | EPOLLRDBAND; + if (afd_events & AFD_POLL_SEND) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) + epoll_events |= EPOLLIN | EPOLLRDHUP; + if (afd_events & AFD_POLL_ABORT) + epoll_events |= EPOLLHUP; + if (afd_events & AFD_POLL_CONNECT) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if (afd_events & AFD_POLL_CONNECT_FAIL) + epoll_events |= EPOLLERR; + + return epoll_events; +} + +static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { + DWORD afd_events; + + /* These events should always be monitored. 
*/ + assert(epoll_events & EPOLLERR); + assert(epoll_events & EPOLLHUP); + afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; + + if (epoll_events & (EPOLLIN | EPOLLRDNORM)) + afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; + if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) + afd_events |= AFD_POLL_RECEIVE_EXPEDITED; + if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) + afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; + + return afd_events; +} + +static inline bool _ep_poll_req_is_most_recent(_ep_sock_data_t* sock_data, + _ep_poll_req_t* poll_req) { + return poll_req == sock_data->latest_poll_req; +} + +static inline void _ep_poll_req_clear_recent(_ep_sock_data_t* sock_data) { + sock_data->latest_poll_req = NULL; + sock_data->latest_poll_req_events = 0; +} + +static inline void _ep_poll_req_set_recent(_ep_sock_data_t* sock_data, + _ep_poll_req_t* poll_req, + uint32_t epoll_events) { + sock_data->latest_poll_req = poll_req; + sock_data->latest_poll_req_events = epoll_events; +} + +static inline void _ep_poll_req_parse_completion( + const _ep_poll_req_t* poll_req, + uint32_t* epoll_events_out, + bool* socket_closed_out) { + const OVERLAPPED* overlapped = &poll_req->overlapped; + + uint32_t epoll_events = 0; + bool socket_closed = false; + + if (!NT_SUCCESS(overlapped->Internal)) { + /* The overlapped request itself failed, there are no events to consider. + */ + epoll_events = EPOLLERR; + } else if (poll_req->poll_info.NumberOfHandles < 1) { + /* This overlapped request succeeded but didn't report any events. */ + } else { + /* Events related to our socket were reported. */ + DWORD afd_events = poll_req->poll_info.Handles[0].Events; + assert(poll_req->poll_info.Handles[0].Handle == + (HANDLE) sock_data->afd_socket); + + if (afd_events & AFD_POLL_LOCAL_CLOSE) + socket_closed = true; /* Socket closed locally be silently dropped. 
*/ + else + epoll_events = _afd_events_to_epoll_events(afd_events); + } + + *epoll_events_out = epoll_events; + *socket_closed_out = socket_closed; +} + +static size_t _ep_port_feed_event(_ep_port_data_t* port_data, + _ep_poll_req_t* poll_req, + struct epoll_event* ev) { + _ep_sock_data_t* sock_data = poll_req->sock_data; + + uint32_t epoll_events; + bool drop_socket; + size_t ev_count = 0; + + if (_ep_sock_data_marked_for_deletion(sock_data) || + !_ep_poll_req_is_most_recent(sock_data, poll_req)) { + /* Ignore completion for overlapped poll operation if it isn't + * the the most recently posted one, or if the socket has been + * deleted. */ + _ep_poll_req_delete(port_data, sock_data, poll_req); + return 0; + } + + _ep_poll_req_clear_recent(sock_data); + + _ep_poll_req_parse_completion(poll_req, &epoll_events, &drop_socket); + + /* Filter events that the user didn't ask for. */ + epoll_events &= sock_data->user_events; + + /* Drop the socket if the EPOLLONESHOT flag is set and there are any events + * to report. */ + if (epoll_events != 0 && (sock_data->user_events & EPOLLONESHOT)) + drop_socket = true; + + /* Fill the ev structure if there are any events to report. */ + if (epoll_events != 0) { + ev->data = sock_data->user_data; + ev->events = epoll_events; + ev_count = 1; + } + + _ep_poll_req_delete(port_data, sock_data, poll_req); + + if (drop_socket) + /* Drop the socket from the epoll set. */ + _ep_sock_data_delete(port_data, sock_data); + else + /* Put the socket back onto the attention list so a new poll request will + * be submitted. 
*/ + ATTN_LIST_ADD(port_data, sock_data); + + return ev_count; +} + +static size_t _ep_port_feed_events(_ep_port_data_t* port_data, + OVERLAPPED_ENTRY* completion_list, + size_t completion_count, + struct epoll_event* event_list, + size_t max_event_count) { + if (completion_count > max_event_count) + abort(); + + size_t event_count = 0; + + for (size_t i = 0; i < completion_count; i++) { + OVERLAPPED* overlapped = completion_list[i].lpOverlapped; + _ep_poll_req_t* poll_req = + container_of(overlapped, _ep_poll_req_t, overlapped); + struct epoll_event* ev = &event_list[event_count]; + + event_count += _ep_port_feed_event(port_data, poll_req, ev); + } + + return event_count; +} + int epoll_wait(epoll_t port_handle, struct epoll_event* events, int maxevents, @@ -280,177 +569,38 @@ int epoll_wait(epoll_t port_handle, gqcs_timeout = INFINITE; } + /* Compute how much overlapped entries can be dequeued at most. */ + if ((size_t) maxevents > _EP_COMPLETION_LIST_LENGTH) + maxevents = _EP_COMPLETION_LIST_LENGTH; + /* Dequeue completion packets until either at least one interesting event * has been discovered, or the timeout is reached. */ do { - DWORD result, max_entries; - ULONG count, i; - OVERLAPPED_ENTRY entries[64]; - int num_events = 0; + OVERLAPPED_ENTRY completion_list[_EP_COMPLETION_LIST_LENGTH]; + ULONG completion_count; + ssize_t event_count; - /* Create overlapped poll requests for all sockets on the attention list. - */ - while (port_data->attn_list != NULL) { - _ep_sock_data_t* sock_data = port_data->attn_list; - assert(sock_data->flags & _EP_SOCK_LISTED); + if (_ep_port_update_events(port_data) < 0) + return -1; - /* Check if there are events registered that are not yet submitted. In - * that case we need to submit another req. 
- */ - if (sock_data->registered_events & _EP_EVENT_MASK & - ~sock_data->submitted_events) { - int r = _ep_submit_poll_req(port_data, sock_data); - - if (r) { - /* If submitting the poll request fails then most likely the socket - * is invalid. In this case we silently remove the socket from the - * epoll port. Other errors make epoll_wait() fail. - */ - if (GetLastError() == ERROR_INVALID_HANDLE) - return -1; - - /* Skip to the next attention list item already, because we're about - * to delete the currently selected socket. - */ - port_data->attn_list = sock_data->attn_list_next; - sock_data->flags &= ~_EP_SOCK_LISTED; - - /* Delete it. */ - r = epoll_ctl(port_handle, EPOLL_CTL_DEL, sock_data->sock, NULL); - assert(r == 0); - - continue; - } - } - - /* Remove from attention list */ - port_data->attn_list = sock_data->attn_list_next; - sock_data->attn_list_prev = sock_data->attn_list_next = NULL; - sock_data->flags &= ~_EP_SOCK_LISTED; - } - - /* Compute how much overlapped entries can be dequeued at most. */ - max_entries = array_count(entries); - if ((int) max_entries > maxevents) - max_entries = maxevents; - - result = GetQueuedCompletionStatusEx( - port_data->iocp, entries, max_entries, &count, gqcs_timeout, FALSE); - - if (!result) { - DWORD error = GetLastError(); - if (error == WAIT_TIMEOUT) - return_success(0); + BOOL r = GetQueuedCompletionStatusEx(port_data->iocp, + completion_list, + maxevents, + &completion_count, + gqcs_timeout, + FALSE); + if (!r) { + if (GetLastError() == WAIT_TIMEOUT) + return 0; else - return_error(-1, error); + return_error(-1); } - port_data->pending_reqs_count -= count; - - /* Successfully dequeued overlappeds. 
*/ - for (i = 0; i < count; i++) { - OVERLAPPED* overlapped; - _ep_io_req_t* io_req; - _ep_sock_data_t* sock_data; - DWORD afd_events; - int registered_events, reported_events; - - overlapped = entries[i].lpOverlapped; - io_req = container_of(overlapped, _ep_io_req_t, overlapped); - sock_data = io_req->sock_data; - - if (io_req->generation < sock_data->io_req_generation) { - /* This io request has been superseded. Free and ignore it. */ - free(io_req); - continue; - } - - /* Dequeued the most recent request. Reset generation and - * submitted_events. - */ - sock_data->io_req_generation = 0; - sock_data->submitted_events = 0; - _ep_io_req_free(io_req); - - registered_events = sock_data->registered_events; - reported_events = 0; - - /* Check if this io request was associated with a socket that was removed - * with EPOLL_CTL_DEL. - */ - if (sock_data->flags & _EP_SOCK_DELETED) { - free(io_req); - free(sock_data); - continue; - } - - /* Check for error. */ - if (!NT_SUCCESS(overlapped->Internal)) { - struct epoll_event* ev = events + (num_events++); - ev->data = sock_data->user_data; - ev->events = EPOLLERR; - continue; - } - - if (io_req->poll_info.NumberOfHandles == 0) { - /* NumberOfHandles can be zero if this poll request was canceled - * due to a more recent exclusive poll request. - */ - afd_events = 0; - } else { - afd_events = io_req->poll_info.Handles[0].Events; - } - - /* Check for a closed socket. */ - if (afd_events & AFD_POLL_LOCAL_CLOSE) { - RB_REMOVE(_ep_sock_data_tree, &port_data->sock_data_tree, sock_data); - free(io_req); - free(sock_data); - continue; - } - - /* Convert afd events to epoll events. 
*/ - if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) - reported_events |= (EPOLLIN | EPOLLRDNORM); - if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) - reported_events |= (EPOLLIN | EPOLLRDBAND); - if (afd_events & AFD_POLL_SEND) - reported_events |= (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND); - if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) - reported_events |= (EPOLLRDHUP | EPOLLIN | EPOLLRDNORM | EPOLLRDBAND); - if (afd_events & AFD_POLL_ABORT) - reported_events |= EPOLLHUP | EPOLLERR; - if (afd_events & AFD_POLL_CONNECT) - reported_events |= (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND); - if (afd_events & AFD_POLL_CONNECT_FAIL) - reported_events |= EPOLLERR; - - /* Don't report events that the user didn't specify. */ - reported_events &= registered_events; - - if (reported_events == 0) { - assert(!(sock_data->flags & _EP_SOCK_LISTED)); - ATTN_LIST_ADD(port_data, sock_data); - } else { - /* Unless EPOLLONESHOT is used add the socket back to the attention - * list. - */ - if (!(registered_events & EPOLLONESHOT)) { - assert(!(sock_data->flags & _EP_SOCK_LISTED)); - ATTN_LIST_ADD(port_data, sock_data); - } - } - - if (reported_events) { - struct epoll_event* ev = &events[num_events++]; - ev->data = sock_data->user_data; - ev->events = reported_events; - } - } - - if (num_events > 0) - return_success(num_events); + event_count = _ep_port_feed_events( + port_data, completion_list, completion_count, events, maxevents); + if (event_count > 0) + return (int) event_count; /* Events were dequeued, but none were relevant. Recompute timeout. */ if (timeout > 0) { @@ -458,7 +608,7 @@ int epoll_wait(epoll_t port_handle, } } while (timeout > 0); - return_success(0); + return 0; } epoll_t epoll_create(void) { @@ -466,8 +616,8 @@ epoll_t epoll_create(void) { HANDLE iocp; /* If necessary, do global initialization first. This is totally not - * thread-safe at the moment. - */ + * thread-safe at the moment. 
+ */ if (!_ep_initialized) { if (_ep_initialize() < 0) return NULL; @@ -485,13 +635,14 @@ epoll_t epoll_create(void) { } port_data->iocp = iocp; - port_data->attn_list = NULL; - port_data->pending_reqs_count = 0; + port_data->poll_req_count = 0; - memset(&port_data->peer_sockets, 0, sizeof port_data->peer_sockets); + QUEUE_INIT(&port_data->update_queue); + + memset(&port_data->driver_sockets, 0, sizeof port_data->driver_sockets); RB_INIT(&port_data->sock_data_tree); - return (epoll_t)port_data; + return (epoll_t) port_data; } int epoll_close(epoll_t port_handle) { @@ -501,13 +652,13 @@ int epoll_close(epoll_t port_handle) { port_data = (_ep_port_data_t*) port_handle; /* Close all peer sockets. This will make all pending io requests return. */ - for (size_t i = 0; i < array_count(port_data->peer_sockets); i++) { - SOCKET peer_sock = port_data->peer_sockets[i]; - if (peer_sock != 0 && peer_sock != INVALID_SOCKET) { - if (closesocket(peer_sock) != 0) + for (size_t i = 0; i < array_count(port_data->driver_sockets); i++) { + SOCKET driver_socket = port_data->driver_sockets[i]; + if (driver_socket != 0 && driver_socket != INVALID_SOCKET) { + if (closesocket(driver_socket) != 0) return_error(-1); - port_data->peer_sockets[i] = 0; + port_data->driver_sockets[i] = 0; } } @@ -516,7 +667,7 @@ int epoll_close(epoll_t port_handle) { * the overlapped status contained in them. But since we are sure that * all requests will soon return, just await them all. 
*/ - while (port_data->pending_reqs_count > 0) { + while (port_data->poll_req_count > 0) { OVERLAPPED_ENTRY entries[64]; DWORD result; ULONG count, i; @@ -531,12 +682,12 @@ int epoll_close(epoll_t port_handle) { if (!result) return_error(-1); - port_data->pending_reqs_count -= count; + port_data->poll_req_count -= count; for (i = 0; i < count; i++) { - _ep_io_req_t* io_req = - container_of(entries[i].lpOverlapped, _ep_io_req_t, overlapped); - free(io_req); + _ep_poll_req_t* poll_req = + container_of(entries[i].lpOverlapped, _ep_poll_req_t, overlapped); + free(poll_req); } } @@ -553,7 +704,7 @@ int epoll_close(epoll_t port_handle) { /* Finally, remove the port data. */ free(port_data); - return_success(0); + return 0; } static int _ep_initialize(void) { @@ -567,14 +718,14 @@ static int _ep_initialize(void) { if (nt_initialize() < 0) return -1; - return_success(0); + return 0; } -static SOCKET _ep_get_peer_socket(_ep_port_data_t* port_data, - WSAPROTOCOL_INFOW* protocol_info) { +static SOCKET _ep_get_driver_socket(_ep_port_data_t* port_data, + WSAPROTOCOL_INFOW* protocol_info) { ssize_t index; size_t i; - SOCKET peer_socket; + SOCKET driver_socket; index = -1; for (i = 0; i < array_count(AFD_PROVIDER_GUID_LIST); i++) { @@ -593,90 +744,76 @@ static SOCKET _ep_get_peer_socket(_ep_port_data_t* port_data, * try again if the peer socket creation failed earlier for the same * protocol. 
*/ - peer_socket = port_data->peer_sockets[index]; - if (peer_socket == 0) { - peer_socket = _ep_create_peer_socket(port_data->iocp, protocol_info); - port_data->peer_sockets[index] = peer_socket; + driver_socket = port_data->driver_sockets[index]; + if (driver_socket == 0) { + driver_socket = _ep_create_driver_socket(port_data->iocp, protocol_info); + port_data->driver_sockets[index] = driver_socket; } - return peer_socket; + return driver_socket; } -static SOCKET _ep_create_peer_socket(HANDLE iocp, - WSAPROTOCOL_INFOW* protocol_info) { - SOCKET sock = 0; +static SOCKET _ep_create_driver_socket(HANDLE iocp, + WSAPROTOCOL_INFOW* protocol_info) { + SOCKET socket = 0; - sock = WSASocketW(protocol_info->iAddressFamily, - protocol_info->iSocketType, - protocol_info->iProtocol, - protocol_info, - 0, - WSA_FLAG_OVERLAPPED); - if (sock == INVALID_SOCKET) + socket = WSASocketW(protocol_info->iAddressFamily, + protocol_info->iSocketType, + protocol_info->iProtocol, + protocol_info, + 0, + WSA_FLAG_OVERLAPPED); + if (socket == INVALID_SOCKET) return_error(INVALID_SOCKET); - if (!SetHandleInformation((HANDLE) sock, HANDLE_FLAG_INHERIT, 0)) + if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0)) goto error; - if (CreateIoCompletionPort((HANDLE) sock, iocp, 0, 0) == NULL) + if (CreateIoCompletionPort((HANDLE) socket, iocp, 0, 0) == NULL) goto error; - return sock; + return socket; error:; DWORD error = GetLastError(); - closesocket(sock); + closesocket(socket); return_error(INVALID_SOCKET, error); } int _ep_compare_sock_data(_ep_sock_data_t* a, _ep_sock_data_t* b) { - return a->sock - b->sock; + return a->socket - b->socket; } static int _ep_submit_poll_req(_ep_port_data_t* port_data, _ep_sock_data_t* sock_data) { - _ep_io_req_t* io_req; - int registered_events; + _ep_poll_req_t* poll_req; + int user_events; DWORD result, afd_events; - io_req = _ep_io_req_alloc(); - if (io_req == NULL) + poll_req = _ep_poll_req_new(port_data, sock_data); + if (poll_req == NULL) 
return -1; - registered_events = sock_data->registered_events; + user_events = sock_data->user_events; + afd_events = _epoll_events_to_afd_events(user_events); - /* These events should always be registered. */ - assert(registered_events & EPOLLERR); - assert(registered_events & EPOLLHUP); - afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; + poll_req->sock_data = sock_data; - if (registered_events & (EPOLLIN | EPOLLRDNORM)) - afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; - if (registered_events & (EPOLLIN | EPOLLRDBAND)) - afd_events |= AFD_POLL_RECEIVE_EXPEDITED; - if (registered_events & (EPOLLOUT | EPOLLWRNORM | EPOLLRDBAND)) - afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; + memset(&poll_req->overlapped, 0, sizeof poll_req->overlapped); - io_req->generation = sock_data->io_req_generation + 1; - io_req->sock_data = sock_data; + poll_req->poll_info.Exclusive = TRUE; + poll_req->poll_info.NumberOfHandles = 1; + poll_req->poll_info.Timeout.QuadPart = INT64_MAX; + poll_req->poll_info.Handles[0].Handle = (HANDLE) sock_data->afd_socket; + poll_req->poll_info.Handles[0].Status = 0; + poll_req->poll_info.Handles[0].Events = afd_events; - memset(&io_req->overlapped, 0, sizeof io_req->overlapped); - - io_req->poll_info.Exclusive = TRUE; - io_req->poll_info.NumberOfHandles = 1; - io_req->poll_info.Timeout.QuadPart = INT64_MAX; - io_req->poll_info.Handles[0].Handle = (HANDLE) sock_data->base_sock; - io_req->poll_info.Handles[0].Status = 0; - io_req->poll_info.Handles[0].Events = afd_events; - - result = - afd_poll(sock_data->peer_sock, &io_req->poll_info, &io_req->overlapped); + result = afd_poll( + sock_data->driver_socket, &poll_req->poll_info, &poll_req->overlapped); if (result != 0 && GetLastError() != ERROR_IO_PENDING) return_error(-1); - sock_data->submitted_events = registered_events; - sock_data->io_req_generation = io_req->generation; - port_data->pending_reqs_count++; + _ep_poll_req_set_recent(sock_data, poll_req, user_events); - 
return_success(0); + return 0; } diff --git a/src/error.c b/src/error.c index d5b4612..d600d35 100644 --- a/src/error.c +++ b/src/error.c @@ -49,7 +49,3 @@ void we_set_win_error(DWORD error) { SetLastError(error); errno = we_map_win_error_to_errno(error); } - -void we_clear_win_error(void) { - SetLastError(ERROR_SUCCESS); -} diff --git a/src/error.h b/src/error.h index 03683ea..c18c8f9 100644 --- a/src/error.h +++ b/src/error.h @@ -10,20 +10,14 @@ DWORD we_map_ntstatus_to_ws_error(NTSTATUS ntstatus); errno_t we_map_win_error_to_errno(DWORD error); void we_set_win_error(DWORD error); -void we_clear_win_error(void); -#define _return_error_helper(error, value) \ - do { \ - we_set_win_error(error); \ - return (value); \ +#define _return_error_helper(error, value) \ + do { \ + we_set_win_error(error); \ + /* { printf("%d\n", error); DebugBreak(); } */ \ + return (value); \ } while (0) #define return_error(value, ...) _return_error_helper(__VA_ARGS__ + 0, value) -#define return_success(value) \ - do { \ - we_clear_win_error(); \ - return (value); \ - } while (0) - #endif /* ERROR_H_ */ diff --git a/src/queue.h b/src/queue.h new file mode 100644 index 0000000..7a4e0cc --- /dev/null +++ b/src/queue.h @@ -0,0 +1,99 @@ +/* Copyright (c) 2013, Ben Noordhuis + * + * Permission to use, copy, modify, and/or distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#ifndef QUEUE_H_
+#define QUEUE_H_
+
+#include <stddef.h>
+
+typedef void* QUEUE[2];
+
+/* Private macros. */
+#define QUEUE_NEXT(q) (*(QUEUE**) &((*(q))[0]))
+#define QUEUE_PREV(q) (*(QUEUE**) &((*(q))[1]))
+#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
+#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
+
+/* Public macros. */
+#define QUEUE_DATA(ptr, type, field) \
+  ((type*) ((char*) (ptr) -offsetof(type, field)))
+
+/* Important note: mutating the list while QUEUE_FOREACH is
+ * iterating over its elements results in undefined behavior.
+ */
+#define QUEUE_FOREACH(q, h) \
+  for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q))
+
+#define QUEUE_EMPTY(q) ((const QUEUE*) (q) == (const QUEUE*) QUEUE_NEXT(q))
+
+#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
+
+#define QUEUE_INIT(q) \
+  do { \
+    QUEUE_NEXT(q) = (q); \
+    QUEUE_PREV(q) = (q); \
+  } while (0)
+
+#define QUEUE_ADD(h, n) \
+  do { \
+    QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \
+    QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \
+    QUEUE_PREV(h) = QUEUE_PREV(n); \
+    QUEUE_PREV_NEXT(h) = (h); \
+  } while (0)
+
+#define QUEUE_SPLIT(h, q, n) \
+  do { \
+    QUEUE_PREV(n) = QUEUE_PREV(h); \
+    QUEUE_PREV_NEXT(n) = (n); \
+    QUEUE_NEXT(n) = (q); \
+    QUEUE_PREV(h) = QUEUE_PREV(q); \
+    QUEUE_PREV_NEXT(h) = (h); \
+    QUEUE_PREV(q) = (n); \
+  } while (0)
+
+#define QUEUE_MOVE(h, n) \
+  do { \
+    if (QUEUE_EMPTY(h)) \
+      QUEUE_INIT(n); \
+    else { \
+      QUEUE* q = QUEUE_HEAD(h); \
+      QUEUE_SPLIT(h, q, n); \
+    } \
+  } while (0)
+
+#define QUEUE_INSERT_HEAD(h, q) \
+  do { \
+    QUEUE_NEXT(q) = QUEUE_NEXT(h); \
+    QUEUE_PREV(q) = (h); \
+    QUEUE_NEXT_PREV(q) = (q); \
+    QUEUE_NEXT(h) = (q); \
+  } while (0)
+
+#define 
QUEUE_INSERT_TAIL(h, q) \ + do { \ + QUEUE_NEXT(q) = (h); \ + QUEUE_PREV(q) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(q) = (q); \ + QUEUE_PREV(h) = (q); \ + } while (0) + +#define QUEUE_REMOVE(q) \ + do { \ + QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ + QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ + } while (0) + +#endif /* QUEUE_H_ */ diff --git a/test/test-udp-pings.c b/test/test-udp-pings.c index 7136e58..2f798c6 100644 --- a/test/test-udp-pings.c +++ b/test/test-udp-pings.c @@ -1,12 +1,14 @@ #include #include +#include #include "win.h" +#include "error.h" #include "epoll.h" static const char PING[] = "PING"; -static const int NUM_PINGERS = 1000; -static const DWORD RUN_TIME = 10000; +static const int NUM_PINGERS = 10000; +static const DWORD RUN_TIME = 40000; int main(int argc, char* argv[]) { epoll_t epoll_hnd; @@ -61,13 +63,17 @@ int main(int argc, char* argv[]) { r = ioctlsocket(sock, FIONBIO, &one); assert(r == 0); + int one = 1; + r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &one, sizeof one); + assert(r == 0); + r = connect(sock, (struct sockaddr*) &addr, sizeof addr); /* Unlike unix, windows sets the error to EWOULDBLOCK when the connection * is being established in the background. */ assert(r == 0 || WSAGetLastError() == WSAEWOULDBLOCK); - ev.events = EPOLLOUT | EPOLLERR; + ev.events = EPOLLOUT | EPOLLERR | EPOLLONESHOT; ev.data.sock = sock; r = epoll_ctl(epoll_hnd, EPOLL_CTL_ADD, sock, &ev); assert(r == 0); @@ -147,10 +153,20 @@ int main(int argc, char* argv[]) { wsa_buf.buf = (char*) PING; wsa_buf.len = sizeof PING; r = WSASend(sock, &wsa_buf, 1, &bytes, 0, NULL, NULL); + we_set_win_error(0); + if (r < 0) perror("send"); assert(r >= 0); assert(bytes == sizeof PING); pings_sent++; + + uint32_t rev = rand() & 0xff | EPOLLOUT | EPOLLONESHOT; + struct epoll_event e; + e.data.sock = sock; + e.events = rev; + if (epoll_ctl(epoll_hnd, EPOLL_CTL_ADD, sock, &e) < 0) + abort(); + continue; }