From cafefc14cbc4b06172e445666f01116e993e3391 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Thu, 14 Sep 2017 04:02:58 +0200 Subject: [PATCH] all-in-one: rebuild --- allinone/epoll-all-in-one.c | 510 ++++++++++++++---------------------- 1 file changed, 190 insertions(+), 320 deletions(-) diff --git a/allinone/epoll-all-in-one.c b/allinone/epoll-all-in-one.c index 2cfb9fc..d694773 100644 --- a/allinone/epoll-all-in-one.c +++ b/allinone/epoll-all-in-one.c @@ -26,7 +26,6 @@ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * */ #define EPOLL_INTERNAL static @@ -984,6 +983,7 @@ EPOLL_INTERNAL void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info); EPOLL_INTERNAL ep_sock_t* ep_sock_find(tree_t* tree, SOCKET socket); +EPOLL_INTERNAL ep_sock_t* ep_sock_from_overlapped(OVERLAPPED* overlapped); EPOLL_INTERNAL int ep_sock_set_event(ep_port_t* port_info, ep_sock_t* sock_info, @@ -991,14 +991,9 @@ EPOLL_INTERNAL int ep_sock_set_event(ep_port_t* port_info, EPOLL_INTERNAL int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info); EPOLL_INTERNAL int ep_sock_feed_event(ep_port_t* port_info, - poll_req_t* poll_req, + ep_sock_t* sock_info, struct epoll_event* ev); -EPOLL_INTERNAL void ep_sock_register_poll_req(ep_port_t* port_info, - ep_sock_t* sock_info); -EPOLL_INTERNAL void ep_sock_unregister_poll_req(ep_port_t* port_info, - ep_sock_t* sock_info); - typedef struct ep_port ep_port_t; typedef struct poll_group_allocator poll_group_allocator_t; typedef struct poll_group poll_group_t; @@ -1021,7 +1016,6 @@ typedef struct ep_port { poll_group_allocators[array_count(AFD_PROVIDER_GUID_LIST)]; tree_t sock_tree; queue_t update_queue; - size_t poll_req_count; } ep_port_t; EPOLL_INTERNAL ep_port_t* ep_port_new(HANDLE iocp); @@ -1037,9 +1031,6 @@ EPOLL_INTERNAL int ep_port_add_socket(ep_port_t* port_info, EPOLL_INTERNAL int ep_port_del_socket(ep_port_t* port_info, tree_node_t* tree_node); -EPOLL_INTERNAL void ep_port_add_req(ep_port_t* port_info); -EPOLL_INTERNAL void ep_port_del_req(ep_port_t* port_info); - EPOLL_INTERNAL void ep_port_request_socket_update(ep_port_t* port_info, ep_sock_t* sock_info); EPOLL_INTERNAL void ep_port_clear_socket_update(ep_port_t* port_info, @@ -1080,60 +1071,144 @@ int epoll_close(epoll_t port_handle) { #include #include -typedef struct ep_port ep_port_t; -typedef struct ep_sock ep_sock_t; -typedef struct poll_req poll_req_t; - -EPOLL_INTERNAL poll_req_t* poll_req_new(ep_port_t* port_info, - ep_sock_t* sock_info); - -EPOLL_INTERNAL void poll_req_delete(ep_port_t* port_info, - ep_sock_t* sock_info, - poll_req_t* poll_req); - -EPOLL_INTERNAL poll_req_t* overlapped_to_poll_req(OVERLAPPED* overlapped); - -EPOLL_INTERNAL ep_sock_t* poll_req_get_sock_data(const poll_req_t* poll_req); - -EPOLL_INTERNAL int poll_req_submit(poll_req_t* poll_req, - uint32_t epoll_events, - SOCKET socket, - SOCKET driver_socket); - -EPOLL_INTERNAL int poll_req_cancel(poll_req_t* poll_req, SOCKET group_socket); -EPOLL_INTERNAL void poll_req_complete(const poll_req_t* poll_req, - uint32_t* epoll_events_out, - bool* socket_closed_out); - #ifndef SIO_BASE_HANDLE #define SIO_BASE_HANDLE 0x48000022 #endif #define _EP_EVENT_MASK 0xffff -#define _EP_SOCK_DELETED 0x2 +typedef struct _poll_req { + OVERLAPPED overlapped; + AFD_POLL_INFO poll_info; +} _poll_req_t; + +typedef enum _poll_status { + _POLL_IDLE = 0, + _POLL_PENDING, + _POLL_CANCELLED +} _poll_status_t; typedef struct _ep_sock_private { ep_sock_t pub; - SOCKET afd_socket; + _poll_req_t poll_req; poll_group_t* poll_group; + SOCKET afd_socket; epoll_data_t user_data; - poll_req_t* latest_poll_req; uint32_t user_events; - uint32_t latest_poll_req_events; - uint32_t poll_req_count; - uint32_t flags; - bool poll_req_active; + uint32_t pending_events; + _poll_status_t poll_status; + unsigned deleted : 1; } _ep_sock_private_t; +static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { + DWORD afd_events; + + /* These events should always be monitored. */ + assert(epoll_events & EPOLLERR); + assert(epoll_events & EPOLLHUP); + afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; + + if (epoll_events & (EPOLLIN | EPOLLRDNORM)) + afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; + if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) + afd_events |= AFD_POLL_RECEIVE_EXPEDITED; + if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) + afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; + + return afd_events; +} + +static uint32_t _afd_events_to_epoll_events(DWORD afd_events) { + uint32_t epoll_events = 0; + + if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) + epoll_events |= EPOLLIN | EPOLLRDNORM; + if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) + epoll_events |= EPOLLPRI | EPOLLRDBAND; + if (afd_events & AFD_POLL_SEND) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) + epoll_events |= EPOLLIN | EPOLLRDHUP; + if (afd_events & AFD_POLL_ABORT) + epoll_events |= EPOLLHUP; + if (afd_events & AFD_POLL_CONNECT) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if (afd_events & AFD_POLL_CONNECT_FAIL) + epoll_events |= EPOLLERR; + + return epoll_events; +} + +static int _poll_req_submit(_poll_req_t* poll_req, + uint32_t epoll_events, + SOCKET socket, + SOCKET driver_socket) { + int r; + + memset(&poll_req->overlapped, 0, sizeof poll_req->overlapped); + + poll_req->poll_info.Exclusive = FALSE; + poll_req->poll_info.NumberOfHandles = 1; + poll_req->poll_info.Timeout.QuadPart = INT64_MAX; + poll_req->poll_info.Handles[0].Handle = (HANDLE) socket; + poll_req->poll_info.Handles[0].Status = 0; + poll_req->poll_info.Handles[0].Events = + _epoll_events_to_afd_events(epoll_events); + + r = afd_poll(driver_socket, &poll_req->poll_info, &poll_req->overlapped); + if (r != 0 && GetLastError() != ERROR_IO_PENDING) + return_error(-1); + + return 0; +} + +static int _poll_req_cancel(_poll_req_t* poll_req, SOCKET driver_socket) { + OVERLAPPED* overlapped = &poll_req->overlapped; + + if (!CancelIoEx((HANDLE) driver_socket, overlapped)) { + DWORD error = GetLastError(); + if (error == ERROR_NOT_FOUND) + return 0; /* Already completed or canceled. */ + else + return_error(-1); + } + + return 0; +} + +static void _poll_req_complete(const _poll_req_t* poll_req, + uint32_t* epoll_events_out, + bool* socket_closed_out) { + const OVERLAPPED* overlapped = &poll_req->overlapped; + + uint32_t epoll_events = 0; + bool socket_closed = false; + + if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { + /* The poll request was cancelled by CancelIoEx. */ + } else if (!NT_SUCCESS(overlapped->Internal)) { + /* The overlapped request itself failed in an unexpected way. */ + epoll_events = EPOLLERR; + } else if (poll_req->poll_info.NumberOfHandles < 1) { + /* This overlapped request succeeded but didn't report any events. */ + } else { + /* Events related to our socket were reported. */ + DWORD afd_events = poll_req->poll_info.Handles[0].Events; + + if (afd_events & AFD_POLL_LOCAL_CLOSE) + socket_closed = true; /* Socket closed locally be silently dropped. */ + else + epoll_events = _afd_events_to_epoll_events(afd_events); + } + + *epoll_events_out = epoll_events; + *socket_closed_out = socket_closed; +} + static inline _ep_sock_private_t* _ep_sock_private(ep_sock_t* sock_info) { return container_of(sock_info, _ep_sock_private_t, pub); } -static inline bool _ep_sock_is_deleted(_ep_sock_private_t* sock_private) { - return sock_private->flags & _EP_SOCK_DELETED; -} - static inline _ep_sock_private_t* _ep_sock_alloc(void) { _ep_sock_private_t* sock_private = malloc(sizeof *sock_private); if (sock_private == NULL) @@ -1142,6 +1217,7 @@ static inline _ep_sock_private_t* _ep_sock_alloc(void) { } static inline void _ep_sock_free(_ep_sock_private_t* sock_private) { + assert(sock_private->poll_status == _POLL_IDLE); free(sock_private); } @@ -1205,8 +1281,6 @@ ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) { if (sock_private == NULL) return NULL; - unused(port_info); - memset(sock_private, 0, sizeof *sock_private); tree_node_init(&sock_private->pub.tree_node); queue_node_init(&sock_private->pub.queue_node); @@ -1219,35 +1293,33 @@ ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) { return &sock_private->pub; } -void _ep_sock_maybe_free(_ep_sock_private_t* sock_private) { - /* The socket may still have pending overlapped requests that have yet to be - * reported by the completion port. If that's the case the memory can't be - * released yet. It'll be released later as ep_sock_unregister_poll_req() - * calls this function. - */ - if (_ep_sock_is_deleted(sock_private) && sock_private->poll_req_count == 0) - _ep_sock_free(sock_private); -} - void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) { _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - assert(!_ep_sock_is_deleted(sock_private)); - sock_private->flags |= _EP_SOCK_DELETED; + assert(!sock_private->deleted); + sock_private->deleted = true; + + if (sock_private->poll_status == _POLL_PENDING) { + _poll_req_cancel(&sock_private->poll_req, + poll_group_get_socket(sock_private->poll_group)); + sock_private->poll_status = _POLL_CANCELLED; + sock_private->pending_events = 0; + } ep_port_del_socket(port_info, &sock_info->tree_node); ep_port_clear_socket_update(port_info, sock_info); ep_port_release_poll_group(sock_private->poll_group); sock_private->poll_group = NULL; - _ep_sock_maybe_free(sock_private); + /* If the poll request still needs to complete, the ep_sock object can't + * be free()d yet. `ep_sock_feed_event` will take care of this later. */ + if (sock_private->poll_status == _POLL_IDLE) + _ep_sock_free(sock_private); } void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info) { _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - if (sock_private->latest_poll_req != NULL) - poll_req_delete(port_info, sock_info, sock_private->latest_poll_req); - assert(sock_private->poll_req_count == 0); + sock_private->poll_status = _POLL_IDLE; ep_sock_delete(port_info, sock_info); } @@ -1259,24 +1331,10 @@ ep_sock_t* ep_sock_find(tree_t* tree, SOCKET socket) { return container_of(tree_node, ep_sock_t, tree_node); } -void ep_sock_register_poll_req(ep_port_t* port_info, ep_sock_t* sock_info) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - - assert(!_ep_sock_is_deleted(sock_private)); - - ep_port_add_req(port_info); - sock_private->poll_req_count++; - assert(sock_private->poll_req_count == 1); -} - -void ep_sock_unregister_poll_req(ep_port_t* port_info, ep_sock_t* sock_info) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - - ep_port_del_req(port_info); - sock_private->poll_req_count--; - assert(sock_private->poll_req_count == 0); - - _ep_sock_maybe_free(sock_private); +ep_sock_t* ep_sock_from_overlapped(OVERLAPPED* overlapped) { + _ep_sock_private_t* sock_private = + container_of(overlapped, _ep_sock_private_t, poll_req.overlapped); + return &sock_private->pub; } int ep_sock_set_event(ep_port_t* port_info, @@ -1290,83 +1348,61 @@ int ep_sock_set_event(ep_port_t* port_info, sock_private->user_events = events; sock_private->user_data = ev->data; - if ((events & _EP_EVENT_MASK & ~(sock_private->latest_poll_req_events)) != 0) + if ((events & _EP_EVENT_MASK & ~(sock_private->pending_events)) != 0) ep_port_request_socket_update(port_info, sock_info); return 0; } -static inline bool _is_latest_poll_req(_ep_sock_private_t* sock_private, - poll_req_t* poll_req) { - assert(sock_private->latest_poll_req == poll_req || - sock_private->latest_poll_req == NULL); - return poll_req == sock_private->latest_poll_req; -} - -static inline void _clear_latest_poll_req(_ep_sock_private_t* sock_private) { - sock_private->latest_poll_req = NULL; - sock_private->latest_poll_req_events = 0; - sock_private->poll_req_active = false; -} - -static inline void _set_latest_poll_req(_ep_sock_private_t* sock_private, - poll_req_t* poll_req, - uint32_t epoll_events) { - sock_private->latest_poll_req = poll_req; - sock_private->latest_poll_req_events = epoll_events; - sock_private->poll_req_active = true; -} - int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); + SOCKET driver_socket = poll_group_get_socket(sock_private->poll_group); bool broken = false; - SOCKET driver_socket; assert(ep_port_is_socket_update_pending(port_info, sock_info)); - driver_socket = poll_group_get_socket(sock_private->poll_group); - - /* Check if there are events registered that are not yet submitted. In - * that case we need to submit another req. - */ - if ((sock_private->user_events & _EP_EVENT_MASK & - ~sock_private->latest_poll_req_events) == 0) { + if (sock_private->poll_status == _POLL_PENDING && + (sock_private->user_events & _EP_EVENT_MASK & + ~sock_private->pending_events) == 0) { /* All the events the user is interested in are already being monitored - * by the latest poll request. It might spuriously complete because of an + * by the pending poll request. It might spuriously complete because of an * event that we're no longer interested in; if that happens we just - * submit another poll request with the right event mask. - */ - assert(sock_private->latest_poll_req != NULL); + * submit another poll request with the right event mask. */ - } else if (sock_private->latest_poll_req != NULL) { - /* A poll request is already pending. Cancel the old one first; when it - * completes, we'll submit the new one. */ - if (sock_private->poll_req_active) { - poll_req_cancel(sock_private->latest_poll_req, driver_socket); - sock_private->poll_req_active = false; - } - - } else { - poll_req_t* poll_req = poll_req_new(port_info, &sock_private->pub); - if (poll_req == NULL) + } else if (sock_private->poll_status == _POLL_PENDING) { + /* A poll request is already pending, but it's not monitoring for all the + * events that the user is interested in. Cancel the pending poll request; + * when it completes it will be submitted again with the correct event + * mask. */ + if (_poll_req_cancel(&sock_private->poll_req, driver_socket) < 0) return -1; + sock_private->poll_status = _POLL_CANCELLED; + sock_private->pending_events = 0; - if (poll_req_submit(poll_req, - sock_private->user_events, - sock_private->afd_socket, - driver_socket) < 0) { - poll_req_delete(port_info, &sock_private->pub, poll_req); + } else if (sock_private->poll_status == _POLL_CANCELLED) { + /* The poll request has already been cancelled, we're still waiting for it + * to return. For now, there's nothing that needs to be done. */ + } else if (sock_private->poll_status == _POLL_IDLE) { + if (_poll_req_submit(&sock_private->poll_req, + sock_private->user_events, + sock_private->afd_socket, + driver_socket) < 0) { if (GetLastError() == ERROR_INVALID_HANDLE) /* The socket is broken. It will be dropped from the epoll set. */ broken = true; else /* Another error occurred, which is propagated to the caller. */ return -1; - } - if (!broken) - _set_latest_poll_req(sock_private, poll_req, sock_private->user_events); + } else { + /* The poll request was successfully submitted. */ + sock_private->poll_status = _POLL_PENDING; + sock_private->pending_events = sock_private->user_events; + } + } else { + /* Unreachable. */ + assert(false); } ep_port_clear_socket_update(port_info, sock_info); @@ -1379,27 +1415,25 @@ int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { } int ep_sock_feed_event(ep_port_t* port_info, - poll_req_t* poll_req, + ep_sock_t* sock_info, struct epoll_event* ev) { - ep_sock_t* sock_info = poll_req_get_sock_data(poll_req); _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); uint32_t epoll_events; bool drop_socket; int ev_count = 0; - if (_ep_sock_is_deleted(sock_private) || - !_is_latest_poll_req(sock_private, poll_req)) { - /* Ignore completion for overlapped poll operation if it isn't - * the the most recently posted one, or if the socket has been - * deleted. */ - poll_req_delete(port_info, sock_info, poll_req); + sock_private->poll_status = _POLL_IDLE; + sock_private->pending_events = 0; + + if (sock_private->deleted) { + /* Ignore completion for overlapped poll operation if the socket has been + * deleted; instead, free the socket. */ + _ep_sock_free(sock_private); return 0; } - _clear_latest_poll_req(sock_private); - - poll_req_complete(poll_req, &epoll_events, &drop_socket); + _poll_req_complete(&sock_private->poll_req, &epoll_events, &drop_socket); /* Filter events that the user didn't ask for. */ epoll_events &= sock_private->user_events; @@ -1416,8 +1450,6 @@ int ep_sock_feed_event(ep_port_t* port_info, ev_count = 1; } - poll_req_delete(port_info, sock_info, poll_req); - if (drop_socket) /* Drop the socket from the epoll set. */ ep_sock_delete(port_info, sock_info); @@ -1525,10 +1557,10 @@ static size_t _ep_port_feed_events(ep_port_t* port_info, for (size_t i = 0; i < completion_count; i++) { OVERLAPPED* overlapped = completion_list[i].lpOverlapped; - poll_req_t* poll_req = overlapped_to_poll_req(overlapped); + ep_sock_t* sock_info = ep_sock_from_overlapped(overlapped); struct epoll_event* ev = &event_list[event_count]; - event_count += ep_sock_feed_event(port_info, poll_req, ev); + event_count += ep_sock_feed_event(port_info, sock_info, ev); } return event_count; @@ -1664,14 +1696,6 @@ int ep_port_del_socket(ep_port_t* port_info, tree_node_t* tree_node) { return tree_del(&port_info->sock_tree, tree_node); } -void ep_port_add_req(ep_port_t* port_info) { - port_info->poll_req_count++; -} - -void ep_port_del_req(ep_port_t* port_info) { - port_info->poll_req_count--; -} - poll_group_allocator_t* _get_poll_group_allocator( ep_port_t* port_info, size_t index, @@ -2434,7 +2458,7 @@ int nt_init(void) { return 0; } -static const size_t _DS_MAX_USERS = 32; +static const size_t _POLL_GROUP_MAX_SIZE = 32; typedef struct poll_group_allocator { ep_port_t* port_info; @@ -2446,7 +2470,7 @@ typedef struct poll_group { poll_group_allocator_t* allocator; queue_node_t queue_node; SOCKET socket; - size_t user_count; + size_t group_size; } poll_group_t; static int _poll_group_create_socket(poll_group_t* poll_group, @@ -2500,7 +2524,7 @@ static poll_group_t* _poll_group_new(poll_group_allocator_t* pga) { } static void _poll_group_delete(poll_group_t* poll_group) { - assert(poll_group->user_count == 0); + assert(poll_group->group_size == 0); closesocket(poll_group->socket); queue_remove(&poll_group->queue_node); free(poll_group); @@ -2543,12 +2567,12 @@ poll_group_t* poll_group_acquire(poll_group_allocator_t* pga) { ? container_of(queue_last(queue), poll_group_t, queue_node) : NULL; - if (poll_group == NULL || poll_group->user_count >= _DS_MAX_USERS) + if (poll_group == NULL || poll_group->group_size >= _POLL_GROUP_MAX_SIZE) poll_group = _poll_group_new(pga); if (poll_group == NULL) return NULL; - if (++poll_group->user_count == _DS_MAX_USERS) { + if (++poll_group->group_size == _POLL_GROUP_MAX_SIZE) { /* Move to the front of the queue. */ queue_remove(&poll_group->queue_node); queue_prepend(&pga->poll_group_queue, &poll_group->queue_node); @@ -2560,8 +2584,8 @@ poll_group_t* poll_group_acquire(poll_group_allocator_t* pga) { void poll_group_release(poll_group_t* poll_group) { poll_group_allocator_t* pga = poll_group->allocator; - poll_group->user_count--; - assert(poll_group->user_count < _DS_MAX_USERS); + poll_group->group_size--; + assert(poll_group->group_size < _POLL_GROUP_MAX_SIZE); /* Move to the back of the queue. */ queue_remove(&poll_group->queue_node); @@ -2570,160 +2594,6 @@ void poll_group_release(poll_group_t* poll_group) { /* TODO: free the poll_group_t* item at some point. */ } -typedef struct poll_req { - OVERLAPPED overlapped; - AFD_POLL_INFO poll_info; - ep_sock_t* sock_info; -} poll_req_t; - -static inline poll_req_t* _poll_req_alloc(void) { - poll_req_t* poll_req = malloc(sizeof *poll_req); - if (poll_req == NULL) - return_error(NULL, ERROR_NOT_ENOUGH_MEMORY); - return poll_req; -} - -static inline poll_req_t* _poll_req_free(poll_req_t* poll_req) { - free(poll_req); - return NULL; -} - -poll_req_t* poll_req_new(ep_port_t* port_info, ep_sock_t* sock_info) { - poll_req_t* poll_req = _poll_req_alloc(); - if (poll_req == NULL) - return NULL; - - memset(poll_req, 0, sizeof *poll_req); - poll_req->sock_info = sock_info; - - ep_sock_register_poll_req(port_info, sock_info); - - return poll_req; -} - -void poll_req_delete(ep_port_t* port_info, - ep_sock_t* sock_info, - poll_req_t* poll_req) { - assert(poll_req != NULL); - - ep_sock_unregister_poll_req(port_info, sock_info); - - _poll_req_free(poll_req); -} - -poll_req_t* overlapped_to_poll_req(OVERLAPPED* overlapped) { - return container_of(overlapped, poll_req_t, overlapped); -} - -ep_sock_t* poll_req_get_sock_data(const poll_req_t* poll_req) { - return poll_req->sock_info; -} - -static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { - DWORD afd_events; - - /* These events should always be monitored. */ - assert(epoll_events & EPOLLERR); - assert(epoll_events & EPOLLHUP); - afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; - - if (epoll_events & (EPOLLIN | EPOLLRDNORM)) - afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; - if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) - afd_events |= AFD_POLL_RECEIVE_EXPEDITED; - if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) - afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; - - return afd_events; -} - -static uint32_t _afd_events_to_epoll_events(DWORD afd_events) { - uint32_t epoll_events = 0; - - if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) - epoll_events |= EPOLLIN | EPOLLRDNORM; - if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) - epoll_events |= EPOLLPRI | EPOLLRDBAND; - if (afd_events & AFD_POLL_SEND) - epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; - if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) - epoll_events |= EPOLLIN | EPOLLRDHUP; - if (afd_events & AFD_POLL_ABORT) - epoll_events |= EPOLLHUP; - if (afd_events & AFD_POLL_CONNECT) - epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; - if (afd_events & AFD_POLL_CONNECT_FAIL) - epoll_events |= EPOLLERR; - - return epoll_events; -} - -int poll_req_submit(poll_req_t* poll_req, - uint32_t epoll_events, - SOCKET socket, - SOCKET driver_socket) { - int r; - - memset(&poll_req->overlapped, 0, sizeof poll_req->overlapped); - - poll_req->poll_info.Exclusive = FALSE; - poll_req->poll_info.NumberOfHandles = 1; - poll_req->poll_info.Timeout.QuadPart = INT64_MAX; - poll_req->poll_info.Handles[0].Handle = (HANDLE) socket; - poll_req->poll_info.Handles[0].Status = 0; - poll_req->poll_info.Handles[0].Events = - _epoll_events_to_afd_events(epoll_events); - - r = afd_poll(driver_socket, &poll_req->poll_info, &poll_req->overlapped); - if (r != 0 && GetLastError() != ERROR_IO_PENDING) - return_error(-1); - - return 0; -} - -int poll_req_cancel(poll_req_t* poll_req, SOCKET driver_socket) { - OVERLAPPED* overlapped = &poll_req->overlapped; - - if (CancelIoEx((HANDLE) driver_socket, overlapped)) { - DWORD error = GetLastError(); - if (error == ERROR_NOT_FOUND) - return 0; /* Already completed or canceled. */ - else - return_error(-1); - } - - return 0; -} - -void poll_req_complete(const poll_req_t* poll_req, - uint32_t* epoll_events_out, - bool* socket_closed_out) { - const OVERLAPPED* overlapped = &poll_req->overlapped; - - uint32_t epoll_events = 0; - bool socket_closed = false; - - if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { - /* The poll request was cancelled by CancelIoEx. */ - } else if (!NT_SUCCESS(overlapped->Internal)) { - /* The overlapped request itself failed in an unexpected way. */ - epoll_events = EPOLLERR; - } else if (poll_req->poll_info.NumberOfHandles < 1) { - /* This overlapped request succeeded but didn't report any events. */ - } else { - /* Events related to our socket were reported. */ - DWORD afd_events = poll_req->poll_info.Handles[0].Events; - - if (afd_events & AFD_POLL_LOCAL_CLOSE) - socket_closed = true; /* Socket closed locally be silently dropped. */ - else - epoll_events = _afd_events_to_epoll_events(afd_events); - } - - *epoll_events_out = epoll_events; - *socket_closed_out = socket_closed; -} - static inline int _tree_compare(tree_node_t* a, tree_node_t* b) { if (a->key < b->key) return -1;