diff --git a/src/sock.c b/src/sock.c index bb4c1a9..0f1ab8b 100644 --- a/src/sock.c +++ b/src/sock.c @@ -15,11 +15,6 @@ (EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDNORM | \ EPOLLRDBAND | EPOLLWRNORM | EPOLLWRBAND | EPOLLRDHUP) -typedef struct _poll_req { - OVERLAPPED overlapped; - AFD_POLL_INFO poll_info; -} _poll_req_t; - typedef enum _poll_status { _POLL_IDLE = 0, _POLL_PENDING, @@ -27,7 +22,8 @@ typedef enum _poll_status { } _poll_status_t; typedef struct ep_sock { - _poll_req_t poll_req; + OVERLAPPED overlapped; + AFD_POLL_INFO poll_info; queue_node_t queue_node; tree_node_t tree_node; poll_group_t* poll_group; @@ -39,112 +35,6 @@ typedef struct ep_sock { bool delete_pending; } ep_sock_t; -static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { - /* Always monitor for AFD_POLL_LOCAL_CLOSE, which is triggered when the - * socket is closed with closesocket() or CloseHandle(). */ - DWORD afd_events = AFD_POLL_LOCAL_CLOSE; - - if (epoll_events & (EPOLLIN | EPOLLRDNORM)) - afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; - if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) - afd_events |= AFD_POLL_RECEIVE_EXPEDITED; - if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) - afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; - if (epoll_events & (EPOLLIN | EPOLLRDNORM | EPOLLRDHUP)) - afd_events |= AFD_POLL_DISCONNECT; - if (epoll_events & EPOLLHUP) - afd_events |= AFD_POLL_ABORT; - if (epoll_events & EPOLLERR) - afd_events |= AFD_POLL_CONNECT_FAIL; - - return afd_events; -} - -static uint32_t _afd_events_to_epoll_events(DWORD afd_events) { - uint32_t epoll_events = 0; - - if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) - epoll_events |= EPOLLIN | EPOLLRDNORM; - if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) - epoll_events |= EPOLLPRI | EPOLLRDBAND; - if (afd_events & (AFD_POLL_SEND | AFD_POLL_CONNECT)) - epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; - if (afd_events & AFD_POLL_DISCONNECT) - epoll_events |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP; - if (afd_events & AFD_POLL_ABORT) - epoll_events |= EPOLLHUP; - if (afd_events & AFD_POLL_CONNECT_FAIL) - epoll_events |= EPOLLERR; - - return epoll_events; -} - -static int _poll_req_submit(_poll_req_t* poll_req, - uint32_t epoll_events, - SOCKET socket, - SOCKET driver_socket) { - int r; - - memset(&poll_req->overlapped, 0, sizeof poll_req->overlapped); - - poll_req->poll_info.Exclusive = FALSE; - poll_req->poll_info.NumberOfHandles = 1; - poll_req->poll_info.Timeout.QuadPart = INT64_MAX; - poll_req->poll_info.Handles[0].Handle = (HANDLE) socket; - poll_req->poll_info.Handles[0].Status = 0; - poll_req->poll_info.Handles[0].Events = - _epoll_events_to_afd_events(epoll_events); - - r = afd_poll(driver_socket, &poll_req->poll_info, &poll_req->overlapped); - if (r != 0 && GetLastError() != ERROR_IO_PENDING) - return_error(-1); - - return 0; -} - -static int _poll_req_cancel(_poll_req_t* poll_req, SOCKET driver_socket) { - OVERLAPPED* overlapped = &poll_req->overlapped; - - if (!CancelIoEx((HANDLE) driver_socket, overlapped)) { - DWORD error = GetLastError(); - if (error == ERROR_NOT_FOUND) - return 0; /* Already completed or canceled. */ - else - return_error(-1); - } - - return 0; -} - -static void _poll_req_complete(const _poll_req_t* poll_req, - uint32_t* epoll_events_out, - bool* socket_closed_out) { - const OVERLAPPED* overlapped = &poll_req->overlapped; - - uint32_t epoll_events = 0; - bool socket_closed = false; - - if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { - /* The poll request was cancelled by CancelIoEx. */ - } else if (!NT_SUCCESS(overlapped->Internal)) { - /* The overlapped request itself failed in an unexpected way. */ - epoll_events = EPOLLERR; - } else if (poll_req->poll_info.NumberOfHandles < 1) { - /* This overlapped request succeeded but didn't report any events. */ - } else { - /* Events related to our socket were reported. */ - DWORD afd_events = poll_req->poll_info.Handles[0].Events; - - if (afd_events & AFD_POLL_LOCAL_CLOSE) - socket_closed = true; /* Socket closed locally be silently dropped. */ - else - epoll_events = _afd_events_to_epoll_events(afd_events); - } - - *epoll_events_out = epoll_events; - *socket_closed_out = socket_closed; -} - static inline ep_sock_t* _ep_sock_alloc(void) { ep_sock_t* sock_info = malloc(sizeof *sock_info); if (sock_info == NULL) @@ -157,15 +47,17 @@ static inline void _ep_sock_free(ep_sock_t* sock_info) { } static int _ep_sock_cancel_poll(ep_sock_t* sock_info) { + HANDLE driver_handle = (HANDLE) poll_group_get_socket(sock_info->poll_group); assert(sock_info->poll_status == _POLL_PENDING); - if (_poll_req_cancel(&sock_info->poll_req, - poll_group_get_socket(sock_info->poll_group)) < 0) - return -1; + /* CancelIoEx() may fail with ERROR_NOT_FOUND if the overlapped operation has + * already completed. This is not a problem and we proceed normally. */ + if (!CancelIoEx(driver_handle, &sock_info->overlapped) && + GetLastError() != ERROR_NOT_FOUND) + return_error(-1); sock_info->poll_status = _POLL_CANCELLED; sock_info->pending_events = 0; - return 0; } @@ -210,9 +102,9 @@ err1: return NULL; } -static void _ep_sock_delete(ep_port_t* port_info, - ep_sock_t* sock_info, - bool force) { +static int _ep_sock_delete(ep_port_t* port_info, + ep_sock_t* sock_info, + bool force) { if (!sock_info->delete_pending) { if (sock_info->poll_status == _POLL_PENDING) _ep_sock_cancel_poll(sock_info); @@ -235,6 +127,8 @@ static void _ep_sock_delete(ep_port_t* port_info, /* Free the socket later. */ ep_port_add_deleted_socket(port_info, sock_info); } + + return 0; } void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) { @@ -262,108 +156,161 @@ int ep_sock_set_event(ep_port_t* port_info, return 0; } -int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { - bool socket_closed = false; +static inline ULONG _epoll_events_to_afd_events(uint32_t epoll_events) { + /* Always monitor for AFD_POLL_LOCAL_CLOSE, which is triggered when the + * socket is closed with closesocket() or CloseHandle(). */ + DWORD afd_events = AFD_POLL_LOCAL_CLOSE; + if (epoll_events & (EPOLLIN | EPOLLRDNORM)) + afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; + if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) + afd_events |= AFD_POLL_RECEIVE_EXPEDITED; + if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) + afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; + if (epoll_events & (EPOLLIN | EPOLLRDNORM | EPOLLRDHUP)) + afd_events |= AFD_POLL_DISCONNECT; + if (epoll_events & EPOLLHUP) + afd_events |= AFD_POLL_ABORT; + if (epoll_events & EPOLLERR) + afd_events |= AFD_POLL_CONNECT_FAIL; + + return afd_events; +} + +static inline uint32_t _afd_events_to_epoll_events(ULONG afd_events) { + uint32_t epoll_events = 0; + + if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) + epoll_events |= EPOLLIN | EPOLLRDNORM; + if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) + epoll_events |= EPOLLPRI | EPOLLRDBAND; + if (afd_events & (AFD_POLL_SEND | AFD_POLL_CONNECT)) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if (afd_events & AFD_POLL_DISCONNECT) + epoll_events |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP; + if (afd_events & AFD_POLL_ABORT) + epoll_events |= EPOLLHUP; + if (afd_events & AFD_POLL_CONNECT_FAIL) + epoll_events |= EPOLLERR; + + return epoll_events; +} + +int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { assert(!sock_info->delete_pending); + if ((sock_info->poll_status == _POLL_PENDING) && (sock_info->user_events & _KNOWN_EPOLL_EVENTS & ~sock_info->pending_events) == 0) { - /* All the events the user is interested in are already being monitored - * by the pending poll request. It might spuriously complete because of an - * event that we're no longer interested in; if that happens we just - * submit another poll request with the right event mask. */ + /* All the events the user is interested in are already being monitored by + * the pending poll operation. It might spuriously complete because of an + * event that we're no longer interested in; when that happens we'll submit + * a new poll operation with the updated event mask. */ } else if (sock_info->poll_status == _POLL_PENDING) { - /* A poll request is already pending, but it's not monitoring for all the - * events that the user is interested in. Cancel the pending poll request; - * when it completes it will be submitted again with the correct event - * mask. */ + /* A poll operation is already pending, but it's not monitoring for all the + * events that the user is interested in. Therefore, cancel the pending + * poll operation; when we receive it's completion package, a new poll + * operation will be submitted with the correct event mask. */ if (_ep_sock_cancel_poll(sock_info) < 0) return -1; } else if (sock_info->poll_status == _POLL_CANCELLED) { - /* The poll request has already been cancelled, we're still waiting for it - * to return. For now, there's nothing that needs to be done. */ + /* The poll operation has already been cancelled, we're still waiting for + * it to return. For now, there's nothing that needs to be done. */ } else if (sock_info->poll_status == _POLL_IDLE) { - SOCKET driver_socket = poll_group_get_socket(sock_info->poll_group); + /* No poll operation is pending; start one. */ + sock_info->poll_info.Exclusive = FALSE; + sock_info->poll_info.NumberOfHandles = 1; + sock_info->poll_info.Timeout.QuadPart = INT64_MAX; + sock_info->poll_info.Handles[0].Handle = (HANDLE) sock_info->base_socket; + sock_info->poll_info.Handles[0].Status = 0; + sock_info->poll_info.Handles[0].Events = + _epoll_events_to_afd_events(sock_info->user_events); - if (_poll_req_submit(&sock_info->poll_req, - sock_info->user_events, - sock_info->base_socket, - driver_socket) < 0) { - if (GetLastError() == ERROR_INVALID_HANDLE) - /* The socket is broken. It will be dropped from the epoll set. */ - socket_closed = true; - else - /* Another error occurred, which is propagated to the caller. */ - return -1; + memset(&sock_info->overlapped, 0, sizeof sock_info->overlapped); - } else { - /* The poll request was successfully submitted. */ - sock_info->poll_status = _POLL_PENDING; - sock_info->pending_events = sock_info->user_events; + if (afd_poll(poll_group_get_socket(sock_info->poll_group), + &sock_info->poll_info, + &sock_info->overlapped) < 0) { + switch (GetLastError()) { + case ERROR_IO_PENDING: + /* Overlapped poll operation in progress; this is expected. */ + break; + case ERROR_INVALID_HANDLE: + /* Socket closed; it'll be dropped from the epoll set. */ + return _ep_sock_delete(port_info, sock_info, false); + default: + /* Other errors are propagated to the caller. */ + return_error(-1); + } } + + /* The poll request was successfully submitted. */ + sock_info->poll_status = _POLL_PENDING; + sock_info->pending_events = sock_info->user_events; + } else { /* Unreachable. */ assert(false); } ep_port_cancel_socket_update(port_info, sock_info); - - /* If we saw an ERROR_INVALID_HANDLE error, drop the socket. */ - if (socket_closed) - ep_sock_delete(port_info, sock_info); - return 0; } int ep_sock_feed_event(ep_port_t* port_info, OVERLAPPED* overlapped, struct epoll_event* ev) { - ep_sock_t* sock_info = - container_of(overlapped, ep_sock_t, poll_req.overlapped); - uint32_t epoll_events; - bool socket_closed; - int ev_count = 0; + ep_sock_t* sock_info = container_of(overlapped, ep_sock_t, overlapped); + AFD_POLL_INFO* poll_info = &sock_info->poll_info; + uint32_t epoll_events = 0; sock_info->poll_status = _POLL_IDLE; sock_info->pending_events = 0; if (sock_info->delete_pending) { - /* Ignore completion for overlapped poll operation if the socket is pending - * deletion; instead, delete the socket. */ - ep_sock_delete(port_info, sock_info); - return 0; + /* Socket has been deleted earlier and can now be freed. */ + return _ep_sock_delete(port_info, sock_info, false); + + } else if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { + /* The poll request was cancelled by CancelIoEx. */ + + } else if (!NT_SUCCESS(overlapped->Internal)) { + /* The overlapped request itself failed in an unexpected way. */ + epoll_events = EPOLLERR; + + } else if (poll_info->NumberOfHandles < 1) { + /* This poll operation succeeded but didn't report any socket events. */ + + } else if (poll_info->Handles[0].Events & AFD_POLL_LOCAL_CLOSE) { + /* The poll operation reported that the socket was closed. */ + return _ep_sock_delete(port_info, sock_info, false); + + } else { + /* Events related to our socket were reported. */ + epoll_events = _afd_events_to_epoll_events(poll_info->Handles[0].Events); } - _poll_req_complete(&sock_info->poll_req, &epoll_events, &socket_closed); + /* Requeue the socket so a new poll request will be submitted. */ + ep_port_request_socket_update(port_info, sock_info); - /* Filter events that the user didn't ask for. */ + /* Filter out events that the user didn't ask for. */ epoll_events &= sock_info->user_events; - /* Clear the event mask if EPOLLONESHOT is set and there are any events - * to report. */ - if (epoll_events != 0 && (sock_info->user_events & EPOLLONESHOT)) + /* Return if there are no epoll events to report. */ + if (epoll_events == 0) + return 0; + + /* If the the socket has the EPOLLONESHOT flag set, unmonitor all events, + * even EPOLLERR and EPOLLHUP. But always keep looking for closed sockets. */ + if (sock_info->user_events & EPOLLONESHOT) sock_info->user_events = 0; - /* Fill the ev structure if there are any events to report. */ - if (epoll_events != 0) { - ev->data = sock_info->user_data; - ev->events = epoll_events; - ev_count = 1; - } - - if (socket_closed) - /* Drop the socket from the epoll set. */ - ep_sock_delete(port_info, sock_info); - else - /* Put the socket back onto the attention list so a new poll request will - * be submitted. */ - ep_port_request_socket_update(port_info, sock_info); - - return ev_count; + ev->data = sock_info->user_data; + ev->events = epoll_events; + return 1; } queue_node_t* ep_sock_to_queue_node(ep_sock_t* sock_info) {