From 9402a42a4d97d801f12def7c69dd5a4d3afd6175 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Tue, 26 Sep 2017 02:15:31 +0200 Subject: [PATCH] all-in-one: rebuild --- allinone/epoll-all-in-one.c | 1578 ++++++++++++++++++++++------------- 1 file changed, 1015 insertions(+), 563 deletions(-) diff --git a/allinone/epoll-all-in-one.c b/allinone/epoll-all-in-one.c index 0a569da..70ea1eb 100644 --- a/allinone/epoll-all-in-one.c +++ b/allinone/epoll-all-in-one.c @@ -68,15 +68,16 @@ enum EPOLL_EVENTS { #define EPOLL_CTL_MOD 2 #define EPOLL_CTL_DEL 3 -typedef void* epoll_t; +typedef void* HANDLE; +typedef uintptr_t SOCKET; typedef union epoll_data { void* ptr; int fd; uint32_t u32; uint64_t u64; - /* SOCKET */ uintptr_t sock; - /* HANDLE */ void* hnd; + SOCKET sock; + HANDLE hnd; } epoll_data_t; struct epoll_event { @@ -88,16 +89,17 @@ struct epoll_event { extern "C" { #endif -EPOLL_EXTERN epoll_t epoll_create(void); +EPOLL_EXTERN HANDLE epoll_create(int size); +EPOLL_EXTERN HANDLE epoll_create1(int flags); -EPOLL_EXTERN int epoll_close(epoll_t epoll_hnd); +EPOLL_EXTERN int epoll_close(HANDLE ephnd); -EPOLL_EXTERN int epoll_ctl(epoll_t epoll_hnd, +EPOLL_EXTERN int epoll_ctl(HANDLE ephnd, int op, - /* SOCKET */ uintptr_t sock, + SOCKET sock, struct epoll_event* event); -EPOLL_EXTERN int epoll_wait(epoll_t epoll_hnd, +EPOLL_EXTERN int epoll_wait(HANDLE ephnd, struct epoll_event* events, int maxevents, int timeout); @@ -232,8 +234,19 @@ typedef intptr_t ssize_t; #define container_of(ptr, type, member) \ ((type*) ((char*) (ptr) -offsetof(type, member))) +#define safe_container_of(ptr, type, member) \ + ((type*) _util_safe_container_of_helper((ptr), offsetof(type, member))) + #define unused(v) ((void) (v)) +#ifdef __clang__ +/* Polyfill static_assert() because clang doesn't support it. */ +#define static_assert(condition, message) typedef __attribute__( \ + (unused)) int __static_assert_##__LINE__[(condition) ? 1 : -1]; +#endif + +EPOLL_INTERNAL void* _util_safe_container_of_helper(void* ptr, size_t offset); + /* clang-format off */ #define AFD_NO_FAST_IO 0x00000001 @@ -323,7 +336,7 @@ EPOLL_INTERNAL void we_set_win_error(DWORD error); #define return_error(value, ...) 
_return_error_helper(__VA_ARGS__ + 0, value) -EPOLL_INTERNAL int nt_init(void); +EPOLL_INTERNAL int nt_global_init(void); typedef struct _IO_STATUS_BLOCK { union { @@ -337,21 +350,55 @@ typedef VOID(NTAPI* PIO_APC_ROUTINE)(PVOID ApcContext, PIO_STATUS_BLOCK IoStatusBlock, ULONG Reserved); -#define NTDLL_IMPORT_LIST(X) \ - X(NTSTATUS, \ - NTAPI, \ - NtDeviceIoControlFile, \ - (HANDLE FileHandle, \ - HANDLE Event, \ - PIO_APC_ROUTINE ApcRoutine, \ - PVOID ApcContext, \ - PIO_STATUS_BLOCK IoStatusBlock, \ - ULONG IoControlCode, \ - PVOID InputBuffer, \ - ULONG InputBufferLength, \ - PVOID OutputBuffer, \ - ULONG OutputBufferLength)) \ - X(ULONG, WINAPI, RtlNtStatusToDosError, (NTSTATUS Status)) +typedef struct _LSA_UNICODE_STRING { + USHORT Length; + USHORT MaximumLength; + PWSTR Buffer; +} LSA_UNICODE_STRING, *PLSA_UNICODE_STRING, UNICODE_STRING, *PUNICODE_STRING; + +typedef struct _OBJECT_ATTRIBUTES { + ULONG Length; + HANDLE RootDirectory; + PUNICODE_STRING ObjectName; + ULONG Attributes; + PVOID SecurityDescriptor; + PVOID SecurityQualityOfService; +} OBJECT_ATTRIBUTES, *POBJECT_ATTRIBUTES; + +#define NTDLL_IMPORT_LIST(X) \ + X(NTSTATUS, \ + NTAPI, \ + NtDeviceIoControlFile, \ + (HANDLE FileHandle, \ + HANDLE Event, \ + PIO_APC_ROUTINE ApcRoutine, \ + PVOID ApcContext, \ + PIO_STATUS_BLOCK IoStatusBlock, \ + ULONG IoControlCode, \ + PVOID InputBuffer, \ + ULONG InputBufferLength, \ + PVOID OutputBuffer, \ + ULONG OutputBufferLength)) \ + \ + X(ULONG, WINAPI, RtlNtStatusToDosError, (NTSTATUS Status)) \ + \ + X(NTSTATUS, \ + NTAPI, \ + NtCreateKeyedEvent, \ + (PHANDLE handle, \ + ACCESS_MASK access, \ + POBJECT_ATTRIBUTES attr, \ + ULONG flags)) \ + \ + X(NTSTATUS, \ + NTAPI, \ + NtWaitForKeyedEvent, \ + (HANDLE handle, PVOID key, BOOLEAN alertable, PLARGE_INTEGER mstimeout)) \ + \ + X(NTSTATUS, \ + NTAPI, \ + NtReleaseKeyedEvent, \ + (HANDLE handle, PVOID key, BOOLEAN alertable, PLARGE_INTEGER mstimeout)) #define X(return_type, declarators, name, parameters) \ EPOLL_INTERNAL_EXTERN return_type(declarators* name) parameters; @@ -522,8 +569,13 @@ EPOLL_INTERNAL ssize_t afd_get_protocol(SOCKET socket, *afd_socket_out = afd_socket; return id; } -#include -#include + +#include + +EPOLL_INTERNAL int api_global_init(void); + +EPOLL_INTERNAL int init(void); + #include typedef struct queue_node queue_node_t; @@ -552,6 +604,19 @@ EPOLL_INTERNAL void queue_remove(queue_node_t* node); EPOLL_INTERNAL bool queue_empty(const queue_t* queue); EPOLL_INTERNAL bool queue_enqueued(const queue_node_t* node); +typedef struct ep_port ep_port_t; +typedef struct poll_group_allocator poll_group_allocator_t; +typedef struct poll_group poll_group_t; + +EPOLL_INTERNAL poll_group_allocator_t* poll_group_allocator_new( + ep_port_t* port_info, const WSAPROTOCOL_INFOW* protocol_info); +EPOLL_INTERNAL void poll_group_allocator_delete(poll_group_allocator_t* pga); + +EPOLL_INTERNAL poll_group_t* poll_group_acquire(poll_group_allocator_t* pga); +EPOLL_INTERNAL void poll_group_release(poll_group_t* ds); + +EPOLL_INTERNAL SOCKET poll_group_get_socket(poll_group_t* poll_group); + #ifdef __clang__ #define RB_UNUSED __attribute__((__unused__)) #else @@ -1012,6 +1077,40 @@ name##_RB_MINMAX(struct name *head, int val) \ /* clang-format on */ +/* The reflock is a special kind of lock that normally prevents a chunk of + * memory from being freed, but does allow the chunk of memory to eventually be + * released in a coordinated fashion. 
+ * + * Under normal operation, threads increase and decrease the reference count, + * which are wait-free operations. + * + * Exactly once during the reflock's lifecycle, a thread holding a reference to + * the lock may "destroy" the lock; this operation blocks until all other + * threads holding a reference to the lock have dereferenced it. After + * "destroy" returns, the calling thread may assume that no other threads have + * a reference to the lock. + * + * Attemmpting to lock or destroy a lock after reflock_unref_and_destroy() has + * been called is invalid and results in undefined behavior. Therefore the user + * should use another lock to guarantee that this can't happen. + */ + +typedef struct reflock { + uint32_t state; +} reflock_t; + +EPOLL_INTERNAL int reflock_global_init(void); + +EPOLL_INTERNAL void reflock_init(reflock_t* reflock); +EPOLL_INTERNAL void reflock_ref(reflock_t* reflock); +EPOLL_INTERNAL void reflock_unref(reflock_t* reflock); +EPOLL_INTERNAL void reflock_unref_and_destroy(reflock_t* reflock); + +/* NB: the tree functions do not set errno or LastError when they fail. Each of + * the API functions has at most one failure mode. It is up to the caller to + * set an appropriate error code when necessary. + */ + typedef RB_HEAD(tree, tree_node) tree_t; typedef struct tree_node { @@ -1028,6 +1127,32 @@ EPOLL_INTERNAL int tree_del(tree_t* tree, tree_node_t* node); EPOLL_INTERNAL tree_node_t* tree_find(tree_t* tree, uintptr_t key); EPOLL_INTERNAL tree_node_t* tree_root(tree_t* tree); +typedef struct reflock_tree { + tree_t tree; + SRWLOCK lock; +} reflock_tree_t; + +typedef struct reflock_tree_node { + tree_node_t tree_node; + reflock_t reflock; +} reflock_tree_node_t; + +EPOLL_INTERNAL void reflock_tree_init(reflock_tree_t* rtl); +EPOLL_INTERNAL void reflock_tree_node_init(reflock_tree_node_t* node); + +EPOLL_INTERNAL int reflock_tree_add(reflock_tree_t* rlt, + reflock_tree_node_t* node, + uintptr_t key); + +EPOLL_INTERNAL reflock_tree_node_t* reflock_tree_del_and_ref( + reflock_tree_t* rlt, uintptr_t key); +EPOLL_INTERNAL reflock_tree_node_t* reflock_tree_find_and_ref( + reflock_tree_t* rlt, uintptr_t key); + +EPOLL_INTERNAL void reflock_tree_node_unref(reflock_tree_node_t* node); +EPOLL_INTERNAL void reflock_tree_node_unref_and_destroy( + reflock_tree_node_t* node); + typedef struct ep_port ep_port_t; typedef struct poll_req poll_req_t; @@ -1041,7 +1166,6 @@ EPOLL_INTERNAL void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info); EPOLL_INTERNAL void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info); -EPOLL_INTERNAL ep_sock_t* ep_sock_find_in_tree(tree_t* tree, SOCKET socket); EPOLL_INTERNAL ep_sock_t* ep_sock_from_overlapped(OVERLAPPED* overlapped); EPOLL_INTERNAL int ep_sock_set_event(ep_port_t* port_info, @@ -1053,19 +1177,6 @@ EPOLL_INTERNAL int ep_sock_feed_event(ep_port_t* port_info, ep_sock_t* sock_info, struct epoll_event* ev); -typedef struct ep_port ep_port_t; -typedef struct poll_group_allocator poll_group_allocator_t; -typedef struct poll_group poll_group_t; - -EPOLL_INTERNAL poll_group_allocator_t* poll_group_allocator_new( - ep_port_t* port_info, const WSAPROTOCOL_INFOW* protocol_info); -EPOLL_INTERNAL void poll_group_allocator_delete(poll_group_allocator_t* pga); - -EPOLL_INTERNAL poll_group_t* poll_group_acquire(poll_group_allocator_t* pga); -EPOLL_INTERNAL void poll_group_release(poll_group_t* ds); - -EPOLL_INTERNAL SOCKET poll_group_get_socket(poll_group_t* poll_group); - typedef struct ep_port ep_port_t; typedef struct 
ep_sock ep_sock_t; @@ -1075,17 +1186,24 @@ typedef struct ep_port { poll_group_allocators[array_count(AFD_PROVIDER_GUID_LIST)]; tree_t sock_tree; queue_t update_queue; + reflock_tree_node_t handle_tree_node; + CRITICAL_SECTION lock; + size_t active_poll_count; } ep_port_t; -EPOLL_INTERNAL ep_port_t* ep_port_new(HANDLE iocp); +EPOLL_INTERNAL ep_port_t* ep_port_new(HANDLE* iocp_out); +EPOLL_INTERNAL int ep_port_close(ep_port_t* port_info); EPOLL_INTERNAL int ep_port_delete(ep_port_t* port_info); -EPOLL_INTERNAL int ep_port_update_events(ep_port_t* port_info); -EPOLL_INTERNAL size_t ep_port_feed_events(ep_port_t* port_info, - OVERLAPPED_ENTRY* completion_list, - size_t completion_count, - struct epoll_event* event_list, - size_t max_event_count); +EPOLL_INTERNAL int ep_port_wait(ep_port_t* port_info, + struct epoll_event* events, + int maxevents, + int timeout); + +EPOLL_INTERNAL int ep_port_ctl(ep_port_t* port_info, + int op, + SOCKET sock, + struct epoll_event* ev); EPOLL_INTERNAL poll_group_t* ep_port_acquire_poll_group( ep_port_t* port_info, @@ -1108,526 +1226,115 @@ EPOLL_INTERNAL void ep_port_clear_socket_update(ep_port_t* port_info, EPOLL_INTERNAL bool ep_port_is_socket_update_pending(ep_port_t* port_info, ep_sock_t* sock_info); -#define _EP_EVENT_MASK 0xffff +static reflock_tree_t _epoll_handle_tree; -typedef struct _poll_req { - OVERLAPPED overlapped; - AFD_POLL_INFO poll_info; -} _poll_req_t; - -typedef enum _poll_status { - _POLL_IDLE = 0, - _POLL_PENDING, - _POLL_CANCELLED -} _poll_status_t; - -typedef struct _ep_sock_private { - ep_sock_t pub; - _poll_req_t poll_req; - poll_group_t* poll_group; - SOCKET afd_socket; - epoll_data_t user_data; - uint32_t user_events; - uint32_t pending_events; - _poll_status_t poll_status; - unsigned deleted : 1; -} _ep_sock_private_t; - -static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { - DWORD afd_events; - - /* These events should always be monitored. 
*/ - assert(epoll_events & EPOLLERR); - assert(epoll_events & EPOLLHUP); - afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; - - if (epoll_events & (EPOLLIN | EPOLLRDNORM)) - afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; - if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) - afd_events |= AFD_POLL_RECEIVE_EXPEDITED; - if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) - afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; - - return afd_events; +static inline ep_port_t* _handle_tree_node_to_port( + reflock_tree_node_t* tree_node) { + return container_of(tree_node, ep_port_t, handle_tree_node); } -static uint32_t _afd_events_to_epoll_events(DWORD afd_events) { - uint32_t epoll_events = 0; - - if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) - epoll_events |= EPOLLIN | EPOLLRDNORM; - if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) - epoll_events |= EPOLLPRI | EPOLLRDBAND; - if (afd_events & AFD_POLL_SEND) - epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; - if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) - epoll_events |= EPOLLIN | EPOLLRDHUP; - if (afd_events & AFD_POLL_ABORT) - epoll_events |= EPOLLHUP; - if (afd_events & AFD_POLL_CONNECT) - epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; - if (afd_events & AFD_POLL_CONNECT_FAIL) - epoll_events |= EPOLLERR; - - return epoll_events; -} - -static int _poll_req_submit(_poll_req_t* poll_req, - uint32_t epoll_events, - SOCKET socket, - SOCKET driver_socket) { - int r; - - memset(&poll_req->overlapped, 0, sizeof poll_req->overlapped); - - poll_req->poll_info.Exclusive = FALSE; - poll_req->poll_info.NumberOfHandles = 1; - poll_req->poll_info.Timeout.QuadPart = INT64_MAX; - poll_req->poll_info.Handles[0].Handle = (HANDLE) socket; - poll_req->poll_info.Handles[0].Status = 0; - poll_req->poll_info.Handles[0].Events = - _epoll_events_to_afd_events(epoll_events); - - r = afd_poll(driver_socket, &poll_req->poll_info, &poll_req->overlapped); - if (r != 0 && GetLastError() != ERROR_IO_PENDING) - return_error(-1); - +int api_global_init(void) { + reflock_tree_init(&_epoll_handle_tree); return 0; } -static int _poll_req_cancel(_poll_req_t* poll_req, SOCKET driver_socket) { - OVERLAPPED* overlapped = &poll_req->overlapped; - - if (!CancelIoEx((HANDLE) driver_socket, overlapped)) { - DWORD error = GetLastError(); - if (error == ERROR_NOT_FOUND) - return 0; /* Already completed or canceled. */ - else - return_error(-1); - } - - return 0; -} - -static void _poll_req_complete(const _poll_req_t* poll_req, - uint32_t* epoll_events_out, - bool* socket_closed_out) { - const OVERLAPPED* overlapped = &poll_req->overlapped; - - uint32_t epoll_events = 0; - bool socket_closed = false; - - if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { - /* The poll request was cancelled by CancelIoEx. */ - } else if (!NT_SUCCESS(overlapped->Internal)) { - /* The overlapped request itself failed in an unexpected way. */ - epoll_events = EPOLLERR; - } else if (poll_req->poll_info.NumberOfHandles < 1) { - /* This overlapped request succeeded but didn't report any events. */ - } else { - /* Events related to our socket were reported. */ - DWORD afd_events = poll_req->poll_info.Handles[0].Events; - - if (afd_events & AFD_POLL_LOCAL_CLOSE) - socket_closed = true; /* Socket closed locally be silently dropped. 
*/ - else - epoll_events = _afd_events_to_epoll_events(afd_events); - } - - *epoll_events_out = epoll_events; - *socket_closed_out = socket_closed; -} - -static inline _ep_sock_private_t* _ep_sock_private(ep_sock_t* sock_info) { - return container_of(sock_info, _ep_sock_private_t, pub); -} - -static inline _ep_sock_private_t* _ep_sock_alloc(void) { - _ep_sock_private_t* sock_private = malloc(sizeof *sock_private); - if (sock_private == NULL) - return_error(NULL, ERROR_NOT_ENOUGH_MEMORY); - return sock_private; -} - -static inline void _ep_sock_free(_ep_sock_private_t* sock_private) { - assert(sock_private->poll_status == _POLL_IDLE); - free(sock_private); -} - -ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) { - SOCKET afd_socket; - ssize_t protocol_id; - WSAPROTOCOL_INFOW protocol_info; - poll_group_t* poll_group; - _ep_sock_private_t* sock_private; - - if (socket == 0 || socket == INVALID_SOCKET) - return_error(NULL, ERROR_INVALID_HANDLE); - - protocol_id = afd_get_protocol(socket, &afd_socket, &protocol_info); - if (protocol_id < 0) - return NULL; - - poll_group = - ep_port_acquire_poll_group(port_info, protocol_id, &protocol_info); - if (poll_group == NULL) - return NULL; - - sock_private = _ep_sock_alloc(); - if (sock_private == NULL) - goto err1; - - memset(sock_private, 0, sizeof *sock_private); - - sock_private->afd_socket = afd_socket; - sock_private->poll_group = poll_group; - - tree_node_init(&sock_private->pub.tree_node); - queue_node_init(&sock_private->pub.queue_node); - - if (ep_port_add_socket(port_info, &sock_private->pub, socket) < 0) - goto err2; - - return &sock_private->pub; - -err2: - _ep_sock_free(sock_private); -err1: - ep_port_release_poll_group(poll_group); - - return NULL; -} - -void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - - assert(!sock_private->deleted); - sock_private->deleted = true; - - if (sock_private->poll_status == _POLL_PENDING) { - _poll_req_cancel(&sock_private->poll_req, - poll_group_get_socket(sock_private->poll_group)); - sock_private->poll_status = _POLL_CANCELLED; - sock_private->pending_events = 0; - } - - ep_port_del_socket(port_info, sock_info); - ep_port_clear_socket_update(port_info, sock_info); - ep_port_release_poll_group(sock_private->poll_group); - sock_private->poll_group = NULL; - - /* If the poll request still needs to complete, the ep_sock object can't - * be free()d yet. `ep_sock_feed_event` will take care of this later. */ - if (sock_private->poll_status == _POLL_IDLE) - _ep_sock_free(sock_private); -} - -void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - sock_private->poll_status = _POLL_IDLE; - ep_sock_delete(port_info, sock_info); -} - -ep_sock_t* ep_sock_find_in_tree(tree_t* tree, SOCKET socket) { - tree_node_t* tree_node = tree_find(tree, socket); - if (tree_node == NULL) - return NULL; - - return container_of(tree_node, ep_sock_t, tree_node); -} - -ep_sock_t* ep_sock_from_overlapped(OVERLAPPED* overlapped) { - _ep_sock_private_t* sock_private = - container_of(overlapped, _ep_sock_private_t, poll_req.overlapped); - return &sock_private->pub; -} - -int ep_sock_set_event(ep_port_t* port_info, - ep_sock_t* sock_info, - const struct epoll_event* ev) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - - /* EPOLLERR and EPOLLHUP are always reported, even when no sollicited. 
*/ - uint32_t events = ev->events | EPOLLERR | EPOLLHUP; - - sock_private->user_events = events; - sock_private->user_data = ev->data; - - if ((events & _EP_EVENT_MASK & ~(sock_private->pending_events)) != 0) - ep_port_request_socket_update(port_info, sock_info); - - return 0; -} - -int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - SOCKET driver_socket = poll_group_get_socket(sock_private->poll_group); - bool broken = false; - - assert(ep_port_is_socket_update_pending(port_info, sock_info)); - - if (sock_private->poll_status == _POLL_PENDING && - (sock_private->user_events & _EP_EVENT_MASK & - ~sock_private->pending_events) == 0) { - /* All the events the user is interested in are already being monitored - * by the pending poll request. It might spuriously complete because of an - * event that we're no longer interested in; if that happens we just - * submit another poll request with the right event mask. */ - - } else if (sock_private->poll_status == _POLL_PENDING) { - /* A poll request is already pending, but it's not monitoring for all the - * events that the user is interested in. Cancel the pending poll request; - * when it completes it will be submitted again with the correct event - * mask. */ - if (_poll_req_cancel(&sock_private->poll_req, driver_socket) < 0) - return -1; - sock_private->poll_status = _POLL_CANCELLED; - sock_private->pending_events = 0; - - } else if (sock_private->poll_status == _POLL_CANCELLED) { - /* The poll request has already been cancelled, we're still waiting for it - * to return. For now, there's nothing that needs to be done. */ - - } else if (sock_private->poll_status == _POLL_IDLE) { - if (_poll_req_submit(&sock_private->poll_req, - sock_private->user_events, - sock_private->afd_socket, - driver_socket) < 0) { - if (GetLastError() == ERROR_INVALID_HANDLE) - /* The socket is broken. It will be dropped from the epoll set. */ - broken = true; - else - /* Another error occurred, which is propagated to the caller. */ - return -1; - - } else { - /* The poll request was successfully submitted. */ - sock_private->poll_status = _POLL_PENDING; - sock_private->pending_events = sock_private->user_events; - } - } else { - /* Unreachable. */ - assert(false); - } - - ep_port_clear_socket_update(port_info, sock_info); - - /* If we saw an ERROR_INVALID_HANDLE error, drop the socket. */ - if (broken) - ep_sock_delete(port_info, sock_info); - - return 0; -} - -int ep_sock_feed_event(ep_port_t* port_info, - ep_sock_t* sock_info, - struct epoll_event* ev) { - _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); - - uint32_t epoll_events; - bool drop_socket; - int ev_count = 0; - - sock_private->poll_status = _POLL_IDLE; - sock_private->pending_events = 0; - - if (sock_private->deleted) { - /* Ignore completion for overlapped poll operation if the socket has been - * deleted; instead, free the socket. */ - _ep_sock_free(sock_private); - return 0; - } - - _poll_req_complete(&sock_private->poll_req, &epoll_events, &drop_socket); - - /* Filter events that the user didn't ask for. */ - epoll_events &= sock_private->user_events; - - /* Clear the event mask if EPOLLONESHOT is set and there are any events - * to report. */ - if (epoll_events != 0 && (sock_private->user_events & EPOLLONESHOT)) - sock_private->user_events = EPOLLERR | EPOLLHUP; - - /* Fill the ev structure if there are any events to report. 
*/ - if (epoll_events != 0) { - ev->data = sock_private->user_data; - ev->events = epoll_events; - ev_count = 1; - } - - if (drop_socket) - /* Drop the socket from the epoll set. */ - ep_sock_delete(port_info, sock_info); - else - /* Put the socket back onto the attention list so a new poll request will - * be submitted. */ - ep_port_request_socket_update(port_info, sock_info); - - return ev_count; -} - -#include - -EPOLL_INTERNAL int init(void); - -#define _EPOLL_MAX_COMPLETION_COUNT 64 - -epoll_t epoll_create(void) { +static HANDLE _epoll_create(void) { ep_port_t* port_info; - HANDLE iocp; + HANDLE ephnd; if (init() < 0) return NULL; - iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - if (iocp == INVALID_HANDLE_VALUE) - return_error(NULL); - - port_info = ep_port_new(iocp); - if (port_info == NULL) { - CloseHandle(iocp); + port_info = ep_port_new(&ephnd); + if (port_info == NULL) return NULL; + + if (reflock_tree_add(&_epoll_handle_tree, + &port_info->handle_tree_node, + (uintptr_t) ephnd) < 0) { + ep_port_delete(port_info); + return_error(INVALID_HANDLE_VALUE, ERROR_ALREADY_EXISTS); } - return (epoll_t) port_info; + return ephnd; } -int epoll_close(epoll_t port_handle) { +HANDLE epoll_create(int size) { + if (size <= 0) + return_error(INVALID_HANDLE_VALUE, ERROR_INVALID_PARAMETER); + + return _epoll_create(); +} + +HANDLE epoll_create1(int flags) { + if (flags != 0) + return_error(INVALID_HANDLE_VALUE, ERROR_INVALID_PARAMETER); + + return _epoll_create(); +} + +int epoll_close(HANDLE ephnd) { + reflock_tree_node_t* tree_node; ep_port_t* port_info; if (init() < 0) return -1; - port_info = (ep_port_t*) port_handle; + tree_node = reflock_tree_del_and_ref(&_epoll_handle_tree, (uintptr_t) ephnd); + if (tree_node == NULL) + return_error(-1, ERROR_INVALID_HANDLE); + port_info = _handle_tree_node_to_port(tree_node); + + ep_port_close(port_info); + + reflock_tree_node_unref_and_destroy(tree_node); return ep_port_delete(port_info); } -static int _ep_ctl_add(ep_port_t* port_info, - uintptr_t socket, - struct epoll_event* ev) { - ep_sock_t* sock_info = ep_sock_new(port_info, socket); - if (sock_info == NULL) - return -1; - - if (ep_sock_set_event(port_info, sock_info, ev) < 0) { - ep_sock_delete(port_info, sock_info); - return -1; - } - - return 0; -} - -static int _ep_ctl_mod(ep_port_t* port_info, - uintptr_t socket, - struct epoll_event* ev) { - ep_sock_t* sock_info = ep_port_find_socket(port_info, socket); - if (sock_info == NULL) - return -1; - - if (ep_sock_set_event(port_info, sock_info, ev) < 0) - return -1; - - return 0; -} - -static int _ep_ctl_del(ep_port_t* port_info, uintptr_t socket) { - ep_sock_t* sock_info = ep_port_find_socket(port_info, socket); - if (sock_info == NULL) - return -1; - - ep_sock_delete(port_info, sock_info); - - return 0; -} - -int epoll_ctl(epoll_t port_handle, - int op, - uintptr_t socket, - struct epoll_event* ev) { - ep_port_t* port_info = (ep_port_t*) port_handle; +int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) { + reflock_tree_node_t* tree_node; + ep_port_t* port_info; + int result; if (init() < 0) return -1; - switch (op) { - case EPOLL_CTL_ADD: - return _ep_ctl_add(port_info, socket, ev); - case EPOLL_CTL_MOD: - return _ep_ctl_mod(port_info, socket, ev); - case EPOLL_CTL_DEL: - return _ep_ctl_del(port_info, socket); - } + tree_node = + reflock_tree_find_and_ref(&_epoll_handle_tree, (uintptr_t) ephnd); + if (tree_node == NULL) + return_error(-1, ERROR_INVALID_HANDLE); + port_info = 
_handle_tree_node_to_port(tree_node); - return_error(-1, ERROR_INVALID_PARAMETER); + result = ep_port_ctl(port_info, op, sock, ev); + + reflock_tree_node_unref(tree_node); + + return result; } -int epoll_wait(epoll_t port_handle, +int epoll_wait(HANDLE ephnd, struct epoll_event* events, int maxevents, int timeout) { + reflock_tree_node_t* tree_node; ep_port_t* port_info; - ULONGLONG due = 0; - DWORD gqcs_timeout; + int result; if (init() < 0) return -1; - port_info = (ep_port_t*) port_handle; + tree_node = + reflock_tree_find_and_ref(&_epoll_handle_tree, (uintptr_t) ephnd); + if (tree_node == NULL) + return_error(-1, ERROR_INVALID_HANDLE); + port_info = _handle_tree_node_to_port(tree_node); - /* Compute the timeout for GetQueuedCompletionStatus, and the wait end - * time, if the user specified a timeout other than zero or infinite. - */ - if (timeout > 0) { - due = GetTickCount64() + timeout; - gqcs_timeout = (DWORD) timeout; - } else if (timeout == 0) { - gqcs_timeout = 0; - } else { - gqcs_timeout = INFINITE; - } + result = ep_port_wait(port_info, events, maxevents, timeout); - /* Compute how much overlapped entries can be dequeued at most. */ - if ((size_t) maxevents > _EPOLL_MAX_COMPLETION_COUNT) - maxevents = _EPOLL_MAX_COMPLETION_COUNT; + reflock_tree_node_unref(tree_node); - /* Dequeue completion packets until either at least one interesting event - * has been discovered, or the timeout is reached. - */ - do { - OVERLAPPED_ENTRY completion_list[_EPOLL_MAX_COMPLETION_COUNT]; - ULONG completion_count; - ssize_t event_count; - - if (ep_port_update_events(port_info) < 0) - return -1; - - BOOL r = GetQueuedCompletionStatusEx(port_info->iocp, - completion_list, - maxevents, - &completion_count, - gqcs_timeout, - FALSE); - if (!r) { - if (GetLastError() == WAIT_TIMEOUT) - return 0; - else - return_error(-1); - } - - event_count = ep_port_feed_events( - port_info, completion_list, completion_count, events, maxevents); - if (event_count > 0) - return (int) event_count; - - /* Events were dequeued, but none were relevant. Recompute timeout. */ - if (timeout > 0) { - ULONGLONG now = GetTickCount64(); - gqcs_timeout = (now < due) ? (DWORD)(due - now) : 0; - } - } while (gqcs_timeout > 0); - - return 0; + return result; } /* clang-format off */ @@ -2272,8 +1979,9 @@ void we_set_win_error(DWORD error) { } static bool _initialized = false; +static INIT_ONCE _once = INIT_ONCE_STATIC_INIT; -static int _init_winsock(void) { +static int _winsock_global_init(void) { int r; WSADATA wsa_data; @@ -2284,19 +1992,27 @@ static int _init_winsock(void) { return 0; } -static int _init_once(void) { - if (_init_winsock() < 0 || nt_init() < 0) - return -1; +static BOOL CALLBACK _init_once_callback(INIT_ONCE* once, + void* parameter, + void** context) { + unused(once); + unused(parameter); + unused(context); + + if (_winsock_global_init() < 0 || nt_global_init() < 0 || + reflock_global_init() < 0 || api_global_init() < 0) + return FALSE; _initialized = true; - return 0; + return TRUE; } int init(void) { - if (_initialized) - return 0; + if (!_initialized && + !InitOnceExecuteOnce(&_once, _init_once_callback, NULL, NULL)) + return -1; /* LastError and errno aren't touched InitOnceExecuteOnce. 
*/ - return _init_once(); + return 0; } #define X(return_type, declarators, name, parameters) \ @@ -2304,7 +2020,7 @@ int init(void) { NTDLL_IMPORT_LIST(X) #undef X -int nt_init(void) { +int nt_global_init(void) { HMODULE ntdll; ntdll = GetModuleHandleW(L"ntdll.dll"); @@ -2320,6 +2036,8 @@ int nt_init(void) { return 0; } +#include +#include static const size_t _POLL_GROUP_MAX_SIZE = 32; @@ -2452,6 +2170,8 @@ void poll_group_release(poll_group_t* poll_group) { /* TODO: free the poll_group_t* item at some point. */ } +#define _PORT_MAX_ON_STACK_COMPLETIONS 256 + static ep_port_t* _ep_port_alloc(void) { ep_port_t* port_info = malloc(sizeof *port_info); if (port_info == NULL) @@ -2465,28 +2185,62 @@ static void _ep_port_free(ep_port_t* port) { free(port); } -ep_port_t* ep_port_new(HANDLE iocp) { +ep_port_t* ep_port_new(HANDLE* iocp_out) { ep_port_t* port_info; + HANDLE iocp; port_info = _ep_port_alloc(); if (port_info == NULL) - return NULL; + goto err1; + + iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (iocp == INVALID_HANDLE_VALUE) + goto err2; memset(port_info, 0, sizeof *port_info); port_info->iocp = iocp; queue_init(&port_info->update_queue); tree_init(&port_info->sock_tree); + reflock_tree_node_init(&port_info->handle_tree_node); + InitializeCriticalSection(&port_info->lock); + *iocp_out = iocp; return port_info; + +err2: + _ep_port_free(port_info); +err1: + return NULL; +} + +static int _ep_port_close_iocp(ep_port_t* port_info) { + HANDLE iocp = port_info->iocp; + port_info->iocp = NULL; + + if (!CloseHandle(iocp)) + return_error(-1); + + return 0; +} + +int ep_port_close(ep_port_t* port_info) { + int result; + + EnterCriticalSection(&port_info->lock); + result = _ep_port_close_iocp(port_info); + LeaveCriticalSection(&port_info->lock); + + return result; } int ep_port_delete(ep_port_t* port_info) { tree_node_t* tree_node; - if (!CloseHandle(port_info->iocp)) - return_error(-1); - port_info->iocp = NULL; + EnterCriticalSection(&port_info->lock); + + if (port_info->iocp != NULL) + _ep_port_close_iocp(port_info); while ((tree_node = tree_root(&port_info->sock_tree)) != NULL) { ep_sock_t* sock_info = container_of(tree_node, ep_sock_t, tree_node); @@ -2499,12 +2253,16 @@ int ep_port_delete(ep_port_t* port_info) { poll_group_allocator_delete(pga); } + LeaveCriticalSection(&port_info->lock); + + DeleteCriticalSection(&port_info->lock); + _ep_port_free(port_info); return 0; } -int ep_port_update_events(ep_port_t* port_info) { +static int _ep_port_update_events(ep_port_t* port_info) { queue_t* update_queue = &port_info->update_queue; /* Walk the queue, submitting new poll requests for every socket that needs @@ -2523,39 +2281,226 @@ int ep_port_update_events(ep_port_t* port_info) { return 0; } -size_t ep_port_feed_events(ep_port_t* port_info, - OVERLAPPED_ENTRY* completion_list, - size_t completion_count, - struct epoll_event* event_list, - size_t max_event_count) { - if (completion_count > max_event_count) - abort(); +static void _ep_port_update_events_if_polling(ep_port_t* port_info) { + if (port_info->active_poll_count > 0) + _ep_port_update_events(port_info); +} - size_t event_count = 0; +static int _ep_port_feed_events(ep_port_t* port_info, + struct epoll_event* epoll_events, + OVERLAPPED_ENTRY* iocp_events, + int iocp_event_count) { + int epoll_event_count = 0; - for (size_t i = 0; i < completion_count; i++) { - OVERLAPPED* overlapped = completion_list[i].lpOverlapped; + for (int i = 0; i < iocp_event_count; i++) { + OVERLAPPED* overlapped = 
iocp_events[i].lpOverlapped; ep_sock_t* sock_info = ep_sock_from_overlapped(overlapped); - struct epoll_event* ev = &event_list[event_count]; + struct epoll_event* ev = &epoll_events[epoll_event_count]; - event_count += ep_sock_feed_event(port_info, sock_info, ev); + epoll_event_count += ep_sock_feed_event(port_info, sock_info, ev); } - return event_count; + return epoll_event_count; +} + +static int _ep_port_poll(ep_port_t* port_info, + struct epoll_event* epoll_events, + OVERLAPPED_ENTRY* iocp_events, + int maxevents, + DWORD timeout) { + ULONG completion_count; + + if (_ep_port_update_events(port_info) < 0) + return -1; + + port_info->active_poll_count++; + + LeaveCriticalSection(&port_info->lock); + + BOOL r = GetQueuedCompletionStatusEx(port_info->iocp, + iocp_events, + maxevents, + &completion_count, + timeout, + FALSE); + + EnterCriticalSection(&port_info->lock); + + port_info->active_poll_count--; + + if (!r) + return_error(-1); + + return _ep_port_feed_events( + port_info, epoll_events, iocp_events, completion_count); +} + +int ep_port_wait(ep_port_t* port_info, + struct epoll_event* events, + int maxevents, + int timeout) { + OVERLAPPED_ENTRY stack_iocp_events[_PORT_MAX_ON_STACK_COMPLETIONS]; + OVERLAPPED_ENTRY* iocp_events; + ULONGLONG due = 0; + DWORD gqcs_timeout; + int result; + + /* Check whether `maxevents` is in range. */ + if (maxevents <= 0) + return_error(-1, ERROR_INVALID_PARAMETER); + + /* Decide whether the IOCP completion list can live on the stack, or allocate + * memory for it on the heap. */ + if ((size_t) maxevents <= array_count(stack_iocp_events)) { + iocp_events = stack_iocp_events; + } else if ((iocp_events = malloc(maxevents * sizeof *iocp_events)) == NULL) { + iocp_events = stack_iocp_events; + maxevents = array_count(stack_iocp_events); + } + + /* Compute the timeout for GetQueuedCompletionStatus, and the wait end + * time, if the user specified a timeout other than zero or infinite. + */ + if (timeout > 0) { + due = GetTickCount64() + timeout; + gqcs_timeout = (DWORD) timeout; + } else if (timeout == 0) { + gqcs_timeout = 0; + } else { + gqcs_timeout = INFINITE; + } + + EnterCriticalSection(&port_info->lock); + + /* Dequeue completion packets until either at least one interesting event + * has been discovered, or the timeout is reached. + */ + do { + ULONGLONG now; + + result = + _ep_port_poll(port_info, events, iocp_events, maxevents, gqcs_timeout); + if (result < 0 || result > 0) + break; /* Result, error, or time-out. */ + + if (timeout < 0) + continue; /* _ep_port_wait() never times out. */ + + /* Check for time-out. */ + now = GetTickCount64(); + if (now >= due) + break; + + /* Recompute timeout. 
*/ + gqcs_timeout = (DWORD)(due - now); + } while (gqcs_timeout > 0); + + _ep_port_update_events_if_polling(port_info); + + LeaveCriticalSection(&port_info->lock); + + if (iocp_events != stack_iocp_events) + free(iocp_events); + + if (result >= 0) + return result; + else if (GetLastError() == WAIT_TIMEOUT) + return 0; + else + return -1; +} + +static int _ep_port_ctl_add(ep_port_t* port_info, + SOCKET sock, + struct epoll_event* ev) { + ep_sock_t* sock_info = ep_sock_new(port_info, sock); + if (sock_info == NULL) + return -1; + + if (ep_sock_set_event(port_info, sock_info, ev) < 0) { + ep_sock_delete(port_info, sock_info); + return -1; + } + + _ep_port_update_events_if_polling(port_info); + + return 0; +} + +static int _ep_port_ctl_mod(ep_port_t* port_info, + SOCKET sock, + struct epoll_event* ev) { + ep_sock_t* sock_info = ep_port_find_socket(port_info, sock); + if (sock_info == NULL) + return -1; + + if (ep_sock_set_event(port_info, sock_info, ev) < 0) + return -1; + + _ep_port_update_events_if_polling(port_info); + + return 0; +} + +static int _ep_port_ctl_del(ep_port_t* port_info, SOCKET sock) { + ep_sock_t* sock_info = ep_port_find_socket(port_info, sock); + if (sock_info == NULL) + return -1; + + ep_sock_delete(port_info, sock_info); + + return 0; +} + +static int _ep_port_ctl_op(ep_port_t* port_info, + int op, + SOCKET sock, + struct epoll_event* ev) { + switch (op) { + case EPOLL_CTL_ADD: + return _ep_port_ctl_add(port_info, sock, ev); + case EPOLL_CTL_MOD: + return _ep_port_ctl_mod(port_info, sock, ev); + case EPOLL_CTL_DEL: + return _ep_port_ctl_del(port_info, sock); + default: + return_error(-1, ERROR_INVALID_PARAMETER); + } +} + +int ep_port_ctl(ep_port_t* port_info, + int op, + SOCKET sock, + struct epoll_event* ev) { + int result; + + EnterCriticalSection(&port_info->lock); + result = _ep_port_ctl_op(port_info, op, sock, ev); + LeaveCriticalSection(&port_info->lock); + + return result; } int ep_port_add_socket(ep_port_t* port_info, ep_sock_t* sock_info, SOCKET socket) { - return tree_add(&port_info->sock_tree, &sock_info->tree_node, socket); + if (tree_add(&port_info->sock_tree, &sock_info->tree_node, socket) < 0) + return_error(-1, ERROR_ALREADY_EXISTS); + return 0; } int ep_port_del_socket(ep_port_t* port_info, ep_sock_t* sock_info) { - return tree_del(&port_info->sock_tree, &sock_info->tree_node); + if (tree_del(&port_info->sock_tree, &sock_info->tree_node) < 0) + return_error(-1, ERROR_NOT_FOUND); + return 0; } ep_sock_t* ep_port_find_socket(ep_port_t* port_info, SOCKET socket) { - return ep_sock_find_in_tree(&port_info->sock_tree, socket); + ep_sock_t* sock_info = safe_container_of( + tree_find(&port_info->sock_tree, socket), ep_sock_t, tree_node); + if (sock_info == NULL) + return_error(NULL, ERROR_NOT_FOUND); + return sock_info; } static poll_group_allocator_t* _ep_port_get_poll_group_allocator( @@ -2665,6 +2610,511 @@ bool queue_enqueued(const queue_node_t* node) { return node->prev != node; } +void reflock_tree_init(reflock_tree_t* rlt) { + tree_init(&rlt->tree); + InitializeSRWLock(&rlt->lock); +} + +void reflock_tree_node_init(reflock_tree_node_t* node) { + tree_node_init(&node->tree_node); + reflock_init(&node->reflock); +} + +int reflock_tree_add(reflock_tree_t* rlt, + reflock_tree_node_t* node, + uintptr_t key) { + int r; + + AcquireSRWLockExclusive(&rlt->lock); + r = tree_add(&rlt->tree, &node->tree_node, key); + ReleaseSRWLockExclusive(&rlt->lock); + + return r; +} + +reflock_tree_node_t* reflock_tree_del_and_ref(reflock_tree_t* rlt, + uintptr_t key) { + 
tree_node_t* tree_node; + reflock_tree_node_t* rlt_node; + + AcquireSRWLockExclusive(&rlt->lock); + + tree_node = tree_find(&rlt->tree, (uintptr_t) key); + rlt_node = safe_container_of(tree_node, reflock_tree_node_t, tree_node); + + if (rlt_node != NULL) { + tree_del(&rlt->tree, tree_node); + reflock_ref(&rlt_node->reflock); + } + + ReleaseSRWLockExclusive(&rlt->lock); + + return rlt_node; +} + +reflock_tree_node_t* reflock_tree_find_and_ref(reflock_tree_t* rlt, + uintptr_t key) { + tree_node_t* tree_node; + reflock_tree_node_t* rlt_node; + + AcquireSRWLockShared(&rlt->lock); + + tree_node = tree_find(&rlt->tree, (uintptr_t) key); + rlt_node = safe_container_of(tree_node, reflock_tree_node_t, tree_node); + if (rlt_node != NULL) + reflock_ref(&rlt_node->reflock); + + ReleaseSRWLockShared(&rlt->lock); + + return rlt_node; +} + +void reflock_tree_node_unref(reflock_tree_node_t* node) { + reflock_unref(&node->reflock); +} + +void reflock_tree_node_unref_and_destroy(reflock_tree_node_t* node) { + reflock_unref_and_destroy(&node->reflock); +} + +/* clang-format off */ +static const uint32_t _REF = 0x00000001; +static const uint32_t _REF_MASK = 0x0fffffff; +static const uint32_t _DESTROY = 0x10000000; +static const uint32_t _DESTROY_MASK = 0xf0000000; +static const uint32_t _POISON = 0x300DEAD0; +/* clang-format on */ + +static HANDLE _keyed_event = NULL; + +int reflock_global_init(void) { + NTSTATUS status = + NtCreateKeyedEvent(&_keyed_event, ~(ACCESS_MASK) 0, NULL, 0); + if (status != STATUS_SUCCESS) + return_error(-1, RtlNtStatusToDosError(status)); + return 0; +} + +void reflock_init(reflock_t* reflock) { + reflock->state = 0; +} + +static void _signal_event(const void* address) { + NTSTATUS status = + NtReleaseKeyedEvent(_keyed_event, (PVOID) address, FALSE, NULL); + if (status != STATUS_SUCCESS) + abort(); +} + +static void _await_event(const void* address) { + NTSTATUS status = + NtWaitForKeyedEvent(_keyed_event, (PVOID) address, FALSE, NULL); + if (status != STATUS_SUCCESS) + abort(); +} + +static inline uint32_t _sync_add_and_fetch(volatile uint32_t* target, + uint32_t value) { + static_assert(sizeof(*target) == sizeof(long), ""); + return InterlockedAdd((volatile long*) target, value); +} + +static inline uint32_t _sync_sub_and_fetch(volatile uint32_t* target, + uint32_t value) { + uint32_t add_value = -(int32_t) value; + return _sync_add_and_fetch(target, add_value); +} + +static inline uint32_t _sync_fetch_and_set(volatile uint32_t* target, + uint32_t value) { + static_assert(sizeof(*target) == sizeof(long), ""); + return InterlockedExchange((volatile long*) target, value); +} + +void reflock_ref(reflock_t* reflock) { + uint32_t state = _sync_add_and_fetch(&reflock->state, _REF); + unused(state); + assert((state & _DESTROY_MASK) == 0); /* Overflow or destroyed. */ +} + +void reflock_unref(reflock_t* reflock) { + uint32_t state = _sync_sub_and_fetch(&reflock->state, _REF); + uint32_t ref_count = state & _REF_MASK; + uint32_t destroy = state & _DESTROY_MASK; + + unused(ref_count); + unused(destroy); + + if (state == _DESTROY) + _signal_event(reflock); + else + assert(destroy == 0 || ref_count > 0); +} + +void reflock_unref_and_destroy(reflock_t* reflock) { + uint32_t state = _sync_add_and_fetch(&reflock->state, _DESTROY - _REF); + uint32_t ref_count = state & _REF_MASK; + + assert((state & _DESTROY_MASK) == + _DESTROY); /* Underflow or already destroyed. 
*/ + + if (ref_count != 0) + _await_event(reflock); + + state = _sync_fetch_and_set(&reflock->state, _POISON); + assert(state == _DESTROY); +} + +#define _EP_EVENT_MASK 0xffff + +typedef struct _poll_req { + OVERLAPPED overlapped; + AFD_POLL_INFO poll_info; +} _poll_req_t; + +typedef enum _poll_status { + _POLL_IDLE = 0, + _POLL_PENDING, + _POLL_CANCELLED +} _poll_status_t; + +typedef struct _ep_sock_private { + ep_sock_t pub; + _poll_req_t poll_req; + poll_group_t* poll_group; + SOCKET afd_socket; + epoll_data_t user_data; + uint32_t user_events; + uint32_t pending_events; + _poll_status_t poll_status; + unsigned deleted : 1; +} _ep_sock_private_t; + +static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { + DWORD afd_events; + + /* These events should always be monitored. */ + assert(epoll_events & EPOLLERR); + assert(epoll_events & EPOLLHUP); + afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; + + if (epoll_events & (EPOLLIN | EPOLLRDNORM)) + afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT; + if (epoll_events & (EPOLLPRI | EPOLLRDBAND)) + afd_events |= AFD_POLL_RECEIVE_EXPEDITED; + if (epoll_events & (EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND)) + afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; + + return afd_events; +} + +static uint32_t _afd_events_to_epoll_events(DWORD afd_events) { + uint32_t epoll_events = 0; + + if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) + epoll_events |= EPOLLIN | EPOLLRDNORM; + if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) + epoll_events |= EPOLLPRI | EPOLLRDBAND; + if (afd_events & AFD_POLL_SEND) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) + epoll_events |= EPOLLIN | EPOLLRDHUP; + if (afd_events & AFD_POLL_ABORT) + epoll_events |= EPOLLHUP; + if (afd_events & AFD_POLL_CONNECT) + epoll_events |= EPOLLOUT | EPOLLWRNORM | EPOLLWRBAND; + if (afd_events & AFD_POLL_CONNECT_FAIL) + epoll_events |= EPOLLERR; + + return epoll_events; +} + +static int _poll_req_submit(_poll_req_t* poll_req, + uint32_t epoll_events, + SOCKET socket, + SOCKET driver_socket) { + int r; + + memset(&poll_req->overlapped, 0, sizeof poll_req->overlapped); + + poll_req->poll_info.Exclusive = FALSE; + poll_req->poll_info.NumberOfHandles = 1; + poll_req->poll_info.Timeout.QuadPart = INT64_MAX; + poll_req->poll_info.Handles[0].Handle = (HANDLE) socket; + poll_req->poll_info.Handles[0].Status = 0; + poll_req->poll_info.Handles[0].Events = + _epoll_events_to_afd_events(epoll_events); + + r = afd_poll(driver_socket, &poll_req->poll_info, &poll_req->overlapped); + if (r != 0 && GetLastError() != ERROR_IO_PENDING) + return_error(-1); + + return 0; +} + +static int _poll_req_cancel(_poll_req_t* poll_req, SOCKET driver_socket) { + OVERLAPPED* overlapped = &poll_req->overlapped; + + if (!CancelIoEx((HANDLE) driver_socket, overlapped)) { + DWORD error = GetLastError(); + if (error == ERROR_NOT_FOUND) + return 0; /* Already completed or canceled. */ + else + return_error(-1); + } + + return 0; +} + +static void _poll_req_complete(const _poll_req_t* poll_req, + uint32_t* epoll_events_out, + bool* socket_closed_out) { + const OVERLAPPED* overlapped = &poll_req->overlapped; + + uint32_t epoll_events = 0; + bool socket_closed = false; + + if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { + /* The poll request was cancelled by CancelIoEx. */ + } else if (!NT_SUCCESS(overlapped->Internal)) { + /* The overlapped request itself failed in an unexpected way. 
*/ + epoll_events = EPOLLERR; + } else if (poll_req->poll_info.NumberOfHandles < 1) { + /* This overlapped request succeeded but didn't report any events. */ + } else { + /* Events related to our socket were reported. */ + DWORD afd_events = poll_req->poll_info.Handles[0].Events; + + if (afd_events & AFD_POLL_LOCAL_CLOSE) + socket_closed = true; /* Socket closed locally be silently dropped. */ + else + epoll_events = _afd_events_to_epoll_events(afd_events); + } + + *epoll_events_out = epoll_events; + *socket_closed_out = socket_closed; +} + +static inline _ep_sock_private_t* _ep_sock_private(ep_sock_t* sock_info) { + return container_of(sock_info, _ep_sock_private_t, pub); +} + +static inline _ep_sock_private_t* _ep_sock_alloc(void) { + _ep_sock_private_t* sock_private = malloc(sizeof *sock_private); + if (sock_private == NULL) + return_error(NULL, ERROR_NOT_ENOUGH_MEMORY); + return sock_private; +} + +static inline void _ep_sock_free(_ep_sock_private_t* sock_private) { + assert(sock_private->poll_status == _POLL_IDLE); + free(sock_private); +} + +ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) { + SOCKET afd_socket; + ssize_t protocol_id; + WSAPROTOCOL_INFOW protocol_info; + poll_group_t* poll_group; + _ep_sock_private_t* sock_private; + + if (socket == 0 || socket == INVALID_SOCKET) + return_error(NULL, ERROR_INVALID_HANDLE); + + protocol_id = afd_get_protocol(socket, &afd_socket, &protocol_info); + if (protocol_id < 0) + return NULL; + + poll_group = + ep_port_acquire_poll_group(port_info, protocol_id, &protocol_info); + if (poll_group == NULL) + return NULL; + + sock_private = _ep_sock_alloc(); + if (sock_private == NULL) + goto err1; + + memset(sock_private, 0, sizeof *sock_private); + + sock_private->afd_socket = afd_socket; + sock_private->poll_group = poll_group; + + tree_node_init(&sock_private->pub.tree_node); + queue_node_init(&sock_private->pub.queue_node); + + if (ep_port_add_socket(port_info, &sock_private->pub, socket) < 0) + goto err2; + + return &sock_private->pub; + +err2: + _ep_sock_free(sock_private); +err1: + ep_port_release_poll_group(poll_group); + + return NULL; +} + +void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) { + _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); + + assert(!sock_private->deleted); + sock_private->deleted = true; + + if (sock_private->poll_status == _POLL_PENDING) { + _poll_req_cancel(&sock_private->poll_req, + poll_group_get_socket(sock_private->poll_group)); + sock_private->poll_status = _POLL_CANCELLED; + sock_private->pending_events = 0; + } + + ep_port_del_socket(port_info, sock_info); + ep_port_clear_socket_update(port_info, sock_info); + ep_port_release_poll_group(sock_private->poll_group); + sock_private->poll_group = NULL; + + /* If the poll request still needs to complete, the ep_sock object can't + * be free()d yet. `ep_sock_feed_event` will take care of this later. 
*/ + if (sock_private->poll_status == _POLL_IDLE) + _ep_sock_free(sock_private); +} + +void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info) { + _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); + sock_private->poll_status = _POLL_IDLE; + ep_sock_delete(port_info, sock_info); +} + +ep_sock_t* ep_sock_from_overlapped(OVERLAPPED* overlapped) { + _ep_sock_private_t* sock_private = + container_of(overlapped, _ep_sock_private_t, poll_req.overlapped); + return &sock_private->pub; +} + +int ep_sock_set_event(ep_port_t* port_info, + ep_sock_t* sock_info, + const struct epoll_event* ev) { + _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); + + /* EPOLLERR and EPOLLHUP are always reported, even when no sollicited. */ + uint32_t events = ev->events | EPOLLERR | EPOLLHUP; + + sock_private->user_events = events; + sock_private->user_data = ev->data; + + if ((events & _EP_EVENT_MASK & ~(sock_private->pending_events)) != 0) + ep_port_request_socket_update(port_info, sock_info); + + return 0; +} + +int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { + _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); + SOCKET driver_socket = poll_group_get_socket(sock_private->poll_group); + bool broken = false; + + assert(ep_port_is_socket_update_pending(port_info, sock_info)); + + if (sock_private->poll_status == _POLL_PENDING && + (sock_private->user_events & _EP_EVENT_MASK & + ~sock_private->pending_events) == 0) { + /* All the events the user is interested in are already being monitored + * by the pending poll request. It might spuriously complete because of an + * event that we're no longer interested in; if that happens we just + * submit another poll request with the right event mask. */ + + } else if (sock_private->poll_status == _POLL_PENDING) { + /* A poll request is already pending, but it's not monitoring for all the + * events that the user is interested in. Cancel the pending poll request; + * when it completes it will be submitted again with the correct event + * mask. */ + if (_poll_req_cancel(&sock_private->poll_req, driver_socket) < 0) + return -1; + sock_private->poll_status = _POLL_CANCELLED; + sock_private->pending_events = 0; + + } else if (sock_private->poll_status == _POLL_CANCELLED) { + /* The poll request has already been cancelled, we're still waiting for it + * to return. For now, there's nothing that needs to be done. */ + + } else if (sock_private->poll_status == _POLL_IDLE) { + if (_poll_req_submit(&sock_private->poll_req, + sock_private->user_events, + sock_private->afd_socket, + driver_socket) < 0) { + if (GetLastError() == ERROR_INVALID_HANDLE) + /* The socket is broken. It will be dropped from the epoll set. */ + broken = true; + else + /* Another error occurred, which is propagated to the caller. */ + return -1; + + } else { + /* The poll request was successfully submitted. */ + sock_private->poll_status = _POLL_PENDING; + sock_private->pending_events = sock_private->user_events; + } + } else { + /* Unreachable. */ + assert(false); + } + + ep_port_clear_socket_update(port_info, sock_info); + + /* If we saw an ERROR_INVALID_HANDLE error, drop the socket. 
*/ + if (broken) + ep_sock_delete(port_info, sock_info); + + return 0; +} + +int ep_sock_feed_event(ep_port_t* port_info, + ep_sock_t* sock_info, + struct epoll_event* ev) { + _ep_sock_private_t* sock_private = _ep_sock_private(sock_info); + + uint32_t epoll_events; + bool drop_socket; + int ev_count = 0; + + sock_private->poll_status = _POLL_IDLE; + sock_private->pending_events = 0; + + if (sock_private->deleted) { + /* Ignore completion for overlapped poll operation if the socket has been + * deleted; instead, free the socket. */ + _ep_sock_free(sock_private); + return 0; + } + + _poll_req_complete(&sock_private->poll_req, &epoll_events, &drop_socket); + + /* Filter events that the user didn't ask for. */ + epoll_events &= sock_private->user_events; + + /* Clear the event mask if EPOLLONESHOT is set and there are any events + * to report. */ + if (epoll_events != 0 && (sock_private->user_events & EPOLLONESHOT)) + sock_private->user_events = EPOLLERR | EPOLLHUP; + + /* Fill the ev structure if there are any events to report. */ + if (epoll_events != 0) { + ev->data = sock_private->user_data; + ev->events = epoll_events; + ev_count = 1; + } + + if (drop_socket) + /* Drop the socket from the epoll set. */ + ep_sock_delete(port_info, sock_info); + else + /* Put the socket back onto the attention list so a new poll request will + * be submitted. */ + ep_port_request_socket_update(port_info, sock_info); + + return ev_count; +} + static inline int _tree_compare(tree_node_t* a, tree_node_t* b) { if (a->key < b->key) return -1; @@ -2691,7 +3141,7 @@ int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key) { existing_node = RB_INSERT(tree, tree, node); if (existing_node != NULL) - return_error(-1, ERROR_ALREADY_EXISTS); + return -1; return 0; } @@ -2702,7 +3152,7 @@ int tree_del(tree_t* tree, tree_node_t* node) { removed_node = RB_REMOVE(tree, tree, node); if (removed_node == NULL) - return_error(-1, ERROR_NOT_FOUND); + return -1; else assert(removed_node == node); @@ -2710,19 +3160,21 @@ int tree_del(tree_t* tree, tree_node_t* node) { } tree_node_t* tree_find(tree_t* tree, uintptr_t key) { - tree_node_t* node; tree_node_t lookup; memset(&lookup, 0, sizeof lookup); lookup.key = key; - node = RB_FIND(tree, tree, &lookup); - if (node == NULL) - return_error(NULL, ERROR_NOT_FOUND); - - return node; + return RB_FIND(tree, tree, &lookup); } tree_node_t* tree_root(tree_t* tree) { return RB_ROOT(tree); } + +void* _util_safe_container_of_helper(void* ptr, size_t offset) { + if (ptr == NULL) + return NULL; + else + return (char*) ptr - offset; +}
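
For context, the public surface after this rebuild is the Linux-style set epoll_create1() / epoll_ctl() / epoll_wait() / epoll_close(), keyed on a HANDLE for the port and a SOCKET for the monitored handle rather than the old epoll_t / uintptr_t typedefs. A minimal consumer of that surface might look like the sketch below; it is illustration only and not part of the patch: the "epoll.h" include name is a stand-in (the all-in-one file's own #include targets are elided above), the program links against ws2_32 as usual, and the dual NULL / INVALID_HANDLE_VALUE failure check simply mirrors the two error paths visible in _epoll_create().

#include <winsock2.h>
#include <stdio.h>

#include "epoll.h" /* Stand-in name; the real public header is not shown in this patch. */

int main(void) {
  WSADATA wsa_data;
  SOCKET sock;
  HANDLE ephnd;
  struct epoll_event ev, out[8];
  int n;

  /* The library initializes Winsock lazily through init(), but an
   * application normally performs its own WSAStartup() as well. */
  if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0)
    return 1;

  sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  if (sock == INVALID_SOCKET)
    return 1;

  /* epoll_create1(0) replaces the old zero-argument epoll_create(). */
  ephnd = epoll_create1(0);
  if (ephnd == NULL || ephnd == INVALID_HANDLE_VALUE)
    return 1;

  ev.events = EPOLLIN;  /* EPOLLERR and EPOLLHUP are always reported regardless. */
  ev.data.sock = sock;  /* epoll_data_t now has typed SOCKET/HANDLE members. */
  if (epoll_ctl(ephnd, EPOLL_CTL_ADD, sock, &ev) < 0)
    return 1;

  /* Wait up to one second; 0 means time-out, -1 means error. */
  n = epoll_wait(ephnd, out, 8, 1000);
  if (n > 0)
    printf("%d event(s); first mask = 0x%x\n", n, (unsigned) out[0].events);

  epoll_ctl(ephnd, EPOLL_CTL_DEL, sock, NULL);
  closesocket(sock);
  epoll_close(ephnd);
  WSACleanup();
  return 0;
}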