diff --git a/LICENSE b/LICENSE index 6c8b1c8..d7fc4b1 100644 --- a/LICENSE +++ b/LICENSE @@ -1,7 +1,7 @@ wepoll - epoll for Windows https://github.com/piscisaureus/wepoll -Copyright 2012-2019, Bert Belder +Copyright 2012-2020, Bert Belder All rights reserved. Redistribution and use in source and binary forms, with or without diff --git a/wepoll.c b/wepoll.c index 651673a..a9c59bb 100644 --- a/wepoll.c +++ b/wepoll.c @@ -2,7 +2,7 @@ * wepoll - epoll for Windows * https://github.com/piscisaureus/wepoll * - * Copyright 2012-2019, Bert Belder + * Copyright 2012-2020, Bert Belder * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -29,9 +29,7 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#ifndef WEPOLL_EXPORT #define WEPOLL_EXPORT -#endif #include @@ -107,7 +105,8 @@ WEPOLL_EXPORT int epoll_wait(HANDLE ephnd, } /* extern "C" */ #endif -#include +#include + #include #define WEPOLL_INTERNAL static @@ -165,6 +164,10 @@ typedef NTSTATUS* PNTSTATUS; #define STATUS_CANCELLED ((NTSTATUS) 0xC0000120L) #endif +#ifndef STATUS_NOT_FOUND +#define STATUS_NOT_FOUND ((NTSTATUS) 0xC0000225L) +#endif + typedef struct _IO_STATUS_BLOCK { NTSTATUS Status; ULONG_PTR Information; @@ -205,6 +208,13 @@ typedef struct _OBJECT_ATTRIBUTES { (STANDARD_RIGHTS_REQUIRED | KEYEDEVENT_WAIT | KEYEDEVENT_WAKE) #define NT_NTDLL_IMPORT_LIST(X) \ + X(NTSTATUS, \ + NTAPI, \ + NtCancelIoFileEx, \ + (HANDLE FileHandle, \ + PIO_STATUS_BLOCK IoRequestToCancel, \ + PIO_STATUS_BLOCK IoStatusBlock)) \ + \ X(NTSTATUS, \ NTAPI, \ NtCreateFile, \ @@ -265,25 +275,6 @@ typedef struct _OBJECT_ATTRIBUTES { NT_NTDLL_IMPORT_LIST(X) #undef X -#include -#include - -#ifndef _SSIZE_T_DEFINED -typedef intptr_t ssize_t; -#endif - -#define array_count(a) (sizeof(a) / (sizeof((a)[0]))) - -#define container_of(ptr, type, member) \ - ((type*) ((uintptr_t) (ptr) - offsetof(type, member))) - -#define unused_var(v) ((void) (v)) - -/* Polyfill `inline` for older 
versions of msvc (up to Visual Studio 2013) */ -#if defined(_MSC_VER) && _MSC_VER < 1900 -#define inline __inline -#endif - #define AFD_POLL_RECEIVE 0x0001 #define AFD_POLL_RECEIVE_EXPEDITED 0x0002 #define AFD_POLL_SEND 0x0004 @@ -306,12 +297,14 @@ typedef struct _AFD_POLL_INFO { AFD_POLL_HANDLE_INFO Handles[1]; } AFD_POLL_INFO, *PAFD_POLL_INFO; -WEPOLL_INTERNAL int afd_create_helper_handle(HANDLE iocp, +WEPOLL_INTERNAL int afd_create_helper_handle(HANDLE iocp_handle, HANDLE* afd_helper_handle_out); WEPOLL_INTERNAL int afd_poll(HANDLE afd_helper_handle, AFD_POLL_INFO* poll_info, - OVERLAPPED* overlapped); + IO_STATUS_BLOCK* io_status_block); +WEPOLL_INTERNAL int afd_cancel_poll(HANDLE afd_helper_handle, + IO_STATUS_BLOCK* io_status_block); #define return_map_error(value) \ do { \ @@ -329,9 +322,6 @@ WEPOLL_INTERNAL void err_map_win_error(void); WEPOLL_INTERNAL void err_set_win_error(DWORD error); WEPOLL_INTERNAL int err_check_handle(HANDLE handle); -WEPOLL_INTERNAL int ws_global_init(void); -WEPOLL_INTERNAL SOCKET ws_get_base_socket(SOCKET socket); - #define IOCTL_AFD_POLL 0x00012024 static UNICODE_STRING afd__helper_name = @@ -340,7 +330,8 @@ static UNICODE_STRING afd__helper_name = static OBJECT_ATTRIBUTES afd__helper_attributes = RTL_CONSTANT_OBJECT_ATTRIBUTES(&afd__helper_name, 0); -int afd_create_helper_handle(HANDLE iocp, HANDLE* afd_helper_handle_out) { +int afd_create_helper_handle(HANDLE iocp_handle, + HANDLE* afd_helper_handle_out) { HANDLE afd_helper_handle; IO_STATUS_BLOCK iosb; NTSTATUS status; @@ -362,7 +353,7 @@ int afd_create_helper_handle(HANDLE iocp, HANDLE* afd_helper_handle_out) { if (status != STATUS_SUCCESS) return_set_error(-1, RtlNtStatusToDosError(status)); - if (CreateIoCompletionPort(afd_helper_handle, iocp, 0, 0) == NULL) + if (CreateIoCompletionPort(afd_helper_handle, iocp_handle, 0, 0) == NULL) goto error; if (!SetFileCompletionNotificationModes(afd_helper_handle, @@ -379,33 +370,18 @@ error: int afd_poll(HANDLE afd_helper_handle, 
AFD_POLL_INFO* poll_info, - OVERLAPPED* overlapped) { - IO_STATUS_BLOCK* iosb; - HANDLE event; - void* apc_context; + IO_STATUS_BLOCK* io_status_block) { NTSTATUS status; /* Blocking operation is not supported. */ - assert(overlapped != NULL); + assert(io_status_block != NULL); - iosb = (IO_STATUS_BLOCK*) &overlapped->Internal; - event = overlapped->hEvent; - - /* Do what other windows APIs would do: if hEvent has it's lowest bit set, - * don't post a completion to the completion port. */ - if ((uintptr_t) event & 1) { - event = (HANDLE)((uintptr_t) event & ~(uintptr_t) 1); - apc_context = NULL; - } else { - apc_context = overlapped; - } - - iosb->Status = STATUS_PENDING; + io_status_block->Status = STATUS_PENDING; status = NtDeviceIoControlFile(afd_helper_handle, - event, NULL, - apc_context, - iosb, + NULL, + io_status_block, + io_status_block, IOCTL_AFD_POLL, poll_info, sizeof *poll_info, @@ -420,176 +396,37 @@ int afd_poll(HANDLE afd_helper_handle, return_set_error(-1, RtlNtStatusToDosError(status)); } +int afd_cancel_poll(HANDLE afd_helper_handle, + IO_STATUS_BLOCK* io_status_block) { + NTSTATUS cancel_status; + IO_STATUS_BLOCK cancel_iosb; + + /* If the poll operation has already completed or has been cancelled earlier, + * there's nothing left for us to do. */ + if (io_status_block->Status != STATUS_PENDING) + return 0; + + cancel_status = + NtCancelIoFileEx(afd_helper_handle, io_status_block, &cancel_iosb); + + /* NtCancelIoFileEx() may return STATUS_NOT_FOUND if the operation completed + * just before calling NtCancelIoFileEx(). This is not an error. 
*/ + if (cancel_status == STATUS_SUCCESS || cancel_status == STATUS_NOT_FOUND) + return 0; + else + return_set_error(-1, RtlNtStatusToDosError(cancel_status)); +} + WEPOLL_INTERNAL int epoll_global_init(void); WEPOLL_INTERNAL int init(void); -#include - -typedef struct queue_node queue_node_t; - -typedef struct queue_node { - queue_node_t* prev; - queue_node_t* next; -} queue_node_t; - -typedef struct queue { - queue_node_t head; -} queue_t; - -WEPOLL_INTERNAL void queue_init(queue_t* queue); -WEPOLL_INTERNAL void queue_node_init(queue_node_t* node); - -WEPOLL_INTERNAL queue_node_t* queue_first(const queue_t* queue); -WEPOLL_INTERNAL queue_node_t* queue_last(const queue_t* queue); - -WEPOLL_INTERNAL void queue_prepend(queue_t* queue, queue_node_t* node); -WEPOLL_INTERNAL void queue_append(queue_t* queue, queue_node_t* node); -WEPOLL_INTERNAL void queue_move_first(queue_t* queue, queue_node_t* node); -WEPOLL_INTERNAL void queue_move_last(queue_t* queue, queue_node_t* node); -WEPOLL_INTERNAL void queue_remove(queue_node_t* node); - -WEPOLL_INTERNAL bool queue_empty(const queue_t* queue); -WEPOLL_INTERNAL bool queue_enqueued(const queue_node_t* node); - -typedef struct port_state port_state_t; -typedef struct poll_group poll_group_t; - -WEPOLL_INTERNAL poll_group_t* poll_group_acquire(port_state_t* port); -WEPOLL_INTERNAL void poll_group_release(poll_group_t* poll_group); - -WEPOLL_INTERNAL void poll_group_delete(poll_group_t* poll_group); - -WEPOLL_INTERNAL poll_group_t* poll_group_from_queue_node( - queue_node_t* queue_node); -WEPOLL_INTERNAL HANDLE - poll_group_get_afd_helper_handle(poll_group_t* poll_group); - -/* N.b.: the tree functions do not set errno or LastError when they fail. Each - * of the API functions has at most one failure mode. It is up to the caller to - * set an appropriate error code when necessary. 
*/ - -typedef struct tree tree_t; -typedef struct tree_node tree_node_t; - -typedef struct tree { - tree_node_t* root; -} tree_t; - -typedef struct tree_node { - tree_node_t* left; - tree_node_t* right; - tree_node_t* parent; - uintptr_t key; - bool red; -} tree_node_t; - -WEPOLL_INTERNAL void tree_init(tree_t* tree); -WEPOLL_INTERNAL void tree_node_init(tree_node_t* node); - -WEPOLL_INTERNAL int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key); -WEPOLL_INTERNAL void tree_del(tree_t* tree, tree_node_t* node); - -WEPOLL_INTERNAL tree_node_t* tree_find(const tree_t* tree, uintptr_t key); -WEPOLL_INTERNAL tree_node_t* tree_root(const tree_t* tree); - typedef struct port_state port_state_t; +typedef struct queue queue_t; typedef struct sock_state sock_state_t; +typedef struct ts_tree_node ts_tree_node_t; -WEPOLL_INTERNAL sock_state_t* sock_new(port_state_t* port_state, - SOCKET socket); -WEPOLL_INTERNAL void sock_delete(port_state_t* port_state, - sock_state_t* sock_state); -WEPOLL_INTERNAL void sock_force_delete(port_state_t* port_state, - sock_state_t* sock_state); - -WEPOLL_INTERNAL int sock_set_event(port_state_t* port_state, - sock_state_t* sock_state, - const struct epoll_event* ev); - -WEPOLL_INTERNAL int sock_update(port_state_t* port_state, - sock_state_t* sock_state); -WEPOLL_INTERNAL int sock_feed_event(port_state_t* port_state, - OVERLAPPED* overlapped, - struct epoll_event* ev); - -WEPOLL_INTERNAL sock_state_t* sock_state_from_queue_node( - queue_node_t* queue_node); -WEPOLL_INTERNAL queue_node_t* sock_state_to_queue_node( - sock_state_t* sock_state); -WEPOLL_INTERNAL sock_state_t* sock_state_from_tree_node( - tree_node_t* tree_node); -WEPOLL_INTERNAL tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state); - -/* The reflock is a special kind of lock that normally prevents a chunk of - * memory from being freed, but does allow the chunk of memory to eventually be - * released in a coordinated fashion. 
- * - * Under normal operation, threads increase and decrease the reference count, - * which are wait-free operations. - * - * Exactly once during the reflock's lifecycle, a thread holding a reference to - * the lock may "destroy" the lock; this operation blocks until all other - * threads holding a reference to the lock have dereferenced it. After - * "destroy" returns, the calling thread may assume that no other threads have - * a reference to the lock. - * - * Attemmpting to lock or destroy a lock after reflock_unref_and_destroy() has - * been called is invalid and results in undefined behavior. Therefore the user - * should use another lock to guarantee that this can't happen. - */ - -typedef struct reflock { - volatile long state; /* 32-bit Interlocked APIs operate on `long` values. */ -} reflock_t; - -WEPOLL_INTERNAL int reflock_global_init(void); - -WEPOLL_INTERNAL void reflock_init(reflock_t* reflock); -WEPOLL_INTERNAL void reflock_ref(reflock_t* reflock); -WEPOLL_INTERNAL void reflock_unref(reflock_t* reflock); -WEPOLL_INTERNAL void reflock_unref_and_destroy(reflock_t* reflock); - -typedef struct ts_tree { - tree_t tree; - SRWLOCK lock; -} ts_tree_t; - -typedef struct ts_tree_node { - tree_node_t tree_node; - reflock_t reflock; -} ts_tree_node_t; - -WEPOLL_INTERNAL void ts_tree_init(ts_tree_t* rtl); -WEPOLL_INTERNAL void ts_tree_node_init(ts_tree_node_t* node); - -WEPOLL_INTERNAL int ts_tree_add(ts_tree_t* ts_tree, - ts_tree_node_t* node, - uintptr_t key); - -WEPOLL_INTERNAL ts_tree_node_t* ts_tree_del_and_ref(ts_tree_t* ts_tree, - uintptr_t key); -WEPOLL_INTERNAL ts_tree_node_t* ts_tree_find_and_ref(ts_tree_t* ts_tree, - uintptr_t key); - -WEPOLL_INTERNAL void ts_tree_node_unref(ts_tree_node_t* node); -WEPOLL_INTERNAL void ts_tree_node_unref_and_destroy(ts_tree_node_t* node); - -typedef struct port_state port_state_t; -typedef struct sock_state sock_state_t; - -typedef struct port_state { - HANDLE iocp; - tree_t sock_tree; - queue_t sock_update_queue; - 
queue_t sock_deleted_queue; - queue_t poll_group_queue; - ts_tree_node_t handle_tree_node; - CRITICAL_SECTION lock; - size_t active_poll_count; -} port_state_t; - -WEPOLL_INTERNAL port_state_t* port_new(HANDLE* iocp_out); +WEPOLL_INTERNAL port_state_t* port_new(HANDLE* iocp_handle_out); WEPOLL_INTERNAL int port_close(port_state_t* port_state); WEPOLL_INTERNAL int port_delete(port_state_t* port_state); @@ -621,12 +458,99 @@ WEPOLL_INTERNAL void port_add_deleted_socket(port_state_t* port_state, WEPOLL_INTERNAL void port_remove_deleted_socket(port_state_t* port_state, sock_state_t* sock_state); -static ts_tree_t epoll__handle_tree; +WEPOLL_INTERNAL HANDLE port_get_iocp_handle(port_state_t* port_state); +WEPOLL_INTERNAL queue_t* port_get_poll_group_queue(port_state_t* port_state); -static inline port_state_t* epoll__handle_tree_node_to_port( - ts_tree_node_t* tree_node) { - return container_of(tree_node, port_state_t, handle_tree_node); -} +WEPOLL_INTERNAL port_state_t* port_state_from_handle_tree_node( + ts_tree_node_t* tree_node); +WEPOLL_INTERNAL ts_tree_node_t* port_state_to_handle_tree_node( + port_state_t* port_state); + +/* The reflock is a special kind of lock that normally prevents a chunk of + * memory from being freed, but does allow the chunk of memory to eventually be + * released in a coordinated fashion. + * + * Under normal operation, threads increase and decrease the reference count, + * which are wait-free operations. + * + * Exactly once during the reflock's lifecycle, a thread holding a reference to + * the lock may "destroy" the lock; this operation blocks until all other + * threads holding a reference to the lock have dereferenced it. After + * "destroy" returns, the calling thread may assume that no other threads have + * a reference to the lock. + * + * Attempting to lock or destroy a lock after reflock_unref_and_destroy() has + * been called is invalid and results in undefined behavior.
Therefore the user + * should use another lock to guarantee that this can't happen. + */ + +typedef struct reflock { + volatile long state; /* 32-bit Interlocked APIs operate on `long` values. */ +} reflock_t; + +WEPOLL_INTERNAL int reflock_global_init(void); + +WEPOLL_INTERNAL void reflock_init(reflock_t* reflock); +WEPOLL_INTERNAL void reflock_ref(reflock_t* reflock); +WEPOLL_INTERNAL void reflock_unref(reflock_t* reflock); +WEPOLL_INTERNAL void reflock_unref_and_destroy(reflock_t* reflock); + +#include + +/* N.b.: the tree functions do not set errno or LastError when they fail. Each + * of the API functions has at most one failure mode. It is up to the caller to + * set an appropriate error code when necessary. */ + +typedef struct tree tree_t; +typedef struct tree_node tree_node_t; + +typedef struct tree { + tree_node_t* root; +} tree_t; + +typedef struct tree_node { + tree_node_t* left; + tree_node_t* right; + tree_node_t* parent; + uintptr_t key; + bool red; +} tree_node_t; + +WEPOLL_INTERNAL void tree_init(tree_t* tree); +WEPOLL_INTERNAL void tree_node_init(tree_node_t* node); + +WEPOLL_INTERNAL int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key); +WEPOLL_INTERNAL void tree_del(tree_t* tree, tree_node_t* node); + +WEPOLL_INTERNAL tree_node_t* tree_find(const tree_t* tree, uintptr_t key); +WEPOLL_INTERNAL tree_node_t* tree_root(const tree_t* tree); + +typedef struct ts_tree { + tree_t tree; + SRWLOCK lock; +} ts_tree_t; + +typedef struct ts_tree_node { + tree_node_t tree_node; + reflock_t reflock; +} ts_tree_node_t; + +WEPOLL_INTERNAL void ts_tree_init(ts_tree_t* rtl); +WEPOLL_INTERNAL void ts_tree_node_init(ts_tree_node_t* node); + +WEPOLL_INTERNAL int ts_tree_add(ts_tree_t* ts_tree, + ts_tree_node_t* node, + uintptr_t key); + +WEPOLL_INTERNAL ts_tree_node_t* ts_tree_del_and_ref(ts_tree_t* ts_tree, + uintptr_t key); +WEPOLL_INTERNAL ts_tree_node_t* ts_tree_find_and_ref(ts_tree_t* ts_tree, + uintptr_t key); + +WEPOLL_INTERNAL void 
ts_tree_node_unref(ts_tree_node_t* node); +WEPOLL_INTERNAL void ts_tree_node_unref_and_destroy(ts_tree_node_t* node); + +static ts_tree_t epoll__handle_tree; int epoll_global_init(void) { ts_tree_init(&epoll__handle_tree); @@ -636,6 +560,7 @@ int epoll_global_init(void) { static HANDLE epoll__create(void) { port_state_t* port_state; HANDLE ephnd; + ts_tree_node_t* tree_node; if (init() < 0) return NULL; @@ -644,9 +569,8 @@ static HANDLE epoll__create(void) { if (port_state == NULL) return NULL; - if (ts_tree_add(&epoll__handle_tree, - &port_state->handle_tree_node, - (uintptr_t) ephnd) < 0) { + tree_node = port_state_to_handle_tree_node(port_state); + if (ts_tree_add(&epoll__handle_tree, tree_node, (uintptr_t) ephnd) < 0) { /* This should never happen. */ port_delete(port_state); return_set_error(NULL, ERROR_ALREADY_EXISTS); @@ -682,7 +606,7 @@ int epoll_close(HANDLE ephnd) { goto err; } - port_state = epoll__handle_tree_node_to_port(tree_node); + port_state = port_state_from_handle_tree_node(tree_node); port_close(port_state); ts_tree_node_unref_and_destroy(tree_node); @@ -708,7 +632,7 @@ int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) { goto err; } - port_state = epoll__handle_tree_node_to_port(tree_node); + port_state = port_state_from_handle_tree_node(tree_node); r = port_ctl(port_state, op, sock, ev); ts_tree_node_unref(tree_node); @@ -719,7 +643,7 @@ int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) { return 0; err: - /* On Linux, in the case of epoll_ctl_mod(), EBADF takes priority over other + /* On Linux, in the case of epoll_ctl(), EBADF takes priority over other * errors. Wepoll mimics this behavior. 
*/ err_check_handle(ephnd); err_check_handle((HANDLE) sock); @@ -746,7 +670,7 @@ int epoll_wait(HANDLE ephnd, goto err; } - port_state = epoll__handle_tree_node_to_port(tree_node); + port_state = port_state_from_handle_tree_node(tree_node); num_events = port_wait(port_state, events, maxevents, timeout); ts_tree_node_unref(tree_node); @@ -900,6 +824,23 @@ int err_check_handle(HANDLE handle) { return 0; } +#include + +#define array_count(a) (sizeof(a) / (sizeof((a)[0]))) + +#define container_of(ptr, type, member) \ + ((type*) ((uintptr_t) (ptr) - offsetof(type, member))) + +#define unused_var(v) ((void) (v)) + +/* Polyfill `inline` for older versions of msvc (up to Visual Studio 2013) */ +#if defined(_MSC_VER) && _MSC_VER < 1900 +#define inline __inline +#endif + +WEPOLL_INTERNAL int ws_global_init(void); +WEPOLL_INTERNAL SOCKET ws_get_base_socket(SOCKET socket); + static bool init__done = false; static INIT_ONCE init__once = INIT_ONCE_STATIC_INIT; @@ -969,6 +910,44 @@ int nt_global_init(void) { #include +typedef struct poll_group poll_group_t; + +typedef struct queue_node queue_node_t; + +WEPOLL_INTERNAL poll_group_t* poll_group_acquire(port_state_t* port); +WEPOLL_INTERNAL void poll_group_release(poll_group_t* poll_group); + +WEPOLL_INTERNAL void poll_group_delete(poll_group_t* poll_group); + +WEPOLL_INTERNAL poll_group_t* poll_group_from_queue_node( + queue_node_t* queue_node); +WEPOLL_INTERNAL HANDLE + poll_group_get_afd_helper_handle(poll_group_t* poll_group); + +typedef struct queue_node { + queue_node_t* prev; + queue_node_t* next; +} queue_node_t; + +typedef struct queue { + queue_node_t head; +} queue_t; + +WEPOLL_INTERNAL void queue_init(queue_t* queue); +WEPOLL_INTERNAL void queue_node_init(queue_node_t* node); + +WEPOLL_INTERNAL queue_node_t* queue_first(const queue_t* queue); +WEPOLL_INTERNAL queue_node_t* queue_last(const queue_t* queue); + +WEPOLL_INTERNAL void queue_prepend(queue_t* queue, queue_node_t* node); +WEPOLL_INTERNAL void 
queue_append(queue_t* queue, queue_node_t* node); +WEPOLL_INTERNAL void queue_move_first(queue_t* queue, queue_node_t* node); +WEPOLL_INTERNAL void queue_move_last(queue_t* queue, queue_node_t* node); +WEPOLL_INTERNAL void queue_remove(queue_node_t* node); + +WEPOLL_INTERNAL bool queue_empty(const queue_t* queue); +WEPOLL_INTERNAL bool queue_enqueued(const queue_node_t* node); + static const size_t POLL_GROUP__MAX_GROUP_SIZE = 32; typedef struct poll_group { @@ -979,6 +958,9 @@ typedef struct poll_group { } poll_group_t; static poll_group_t* poll_group__new(port_state_t* port_state) { + HANDLE iocp_handle = port_get_iocp_handle(port_state); + queue_t* poll_group_queue = port_get_poll_group_queue(port_state); + poll_group_t* poll_group = malloc(sizeof *poll_group); if (poll_group == NULL) return_set_error(NULL, ERROR_NOT_ENOUGH_MEMORY); @@ -988,13 +970,13 @@ static poll_group_t* poll_group__new(port_state_t* port_state) { queue_node_init(&poll_group->queue_node); poll_group->port_state = port_state; - if (afd_create_helper_handle(port_state->iocp, - &poll_group->afd_helper_handle) < 0) { + if (afd_create_helper_handle(iocp_handle, &poll_group->afd_helper_handle) < + 0) { free(poll_group); return NULL; } - queue_append(&port_state->poll_group_queue, &poll_group->queue_node); + queue_append(poll_group_queue, &poll_group->queue_node); return poll_group; } @@ -1015,10 +997,11 @@ HANDLE poll_group_get_afd_helper_handle(poll_group_t* poll_group) { } poll_group_t* poll_group_acquire(port_state_t* port_state) { - queue_t* queue = &port_state->poll_group_queue; + queue_t* poll_group_queue = port_get_poll_group_queue(port_state); poll_group_t* poll_group = - !queue_empty(queue) - ? container_of(queue_last(queue), poll_group_t, queue_node) + !queue_empty(poll_group_queue) + ? 
container_of( + queue_last(poll_group_queue), poll_group_t, queue_node) : NULL; if (poll_group == NULL || @@ -1028,24 +1011,61 @@ poll_group_t* poll_group_acquire(port_state_t* port_state) { return NULL; if (++poll_group->group_size == POLL_GROUP__MAX_GROUP_SIZE) - queue_move_first(&port_state->poll_group_queue, &poll_group->queue_node); + queue_move_first(poll_group_queue, &poll_group->queue_node); return poll_group; } void poll_group_release(poll_group_t* poll_group) { port_state_t* port_state = poll_group->port_state; + queue_t* poll_group_queue = port_get_poll_group_queue(port_state); poll_group->group_size--; assert(poll_group->group_size < POLL_GROUP__MAX_GROUP_SIZE); - queue_move_last(&port_state->poll_group_queue, &poll_group->queue_node); + queue_move_last(poll_group_queue, &poll_group->queue_node); /* Poll groups are currently only freed when the epoll port is closed. */ } +WEPOLL_INTERNAL sock_state_t* sock_new(port_state_t* port_state, + SOCKET socket); +WEPOLL_INTERNAL void sock_delete(port_state_t* port_state, + sock_state_t* sock_state); +WEPOLL_INTERNAL void sock_force_delete(port_state_t* port_state, + sock_state_t* sock_state); + +WEPOLL_INTERNAL int sock_set_event(port_state_t* port_state, + sock_state_t* sock_state, + const struct epoll_event* ev); + +WEPOLL_INTERNAL int sock_update(port_state_t* port_state, + sock_state_t* sock_state); +WEPOLL_INTERNAL int sock_feed_event(port_state_t* port_state, + IO_STATUS_BLOCK* io_status_block, + struct epoll_event* ev); + +WEPOLL_INTERNAL sock_state_t* sock_state_from_queue_node( + queue_node_t* queue_node); +WEPOLL_INTERNAL queue_node_t* sock_state_to_queue_node( + sock_state_t* sock_state); +WEPOLL_INTERNAL sock_state_t* sock_state_from_tree_node( + tree_node_t* tree_node); +WEPOLL_INTERNAL tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state); + #define PORT__MAX_ON_STACK_COMPLETIONS 256 +typedef struct port_state { + HANDLE iocp_handle; + tree_t sock_tree; + queue_t sock_update_queue; + 
queue_t sock_deleted_queue; + queue_t poll_group_queue; + ts_tree_node_t handle_tree_node; + CRITICAL_SECTION lock; + size_t active_poll_count; +} port_state_t; + static port_state_t* port__alloc(void) { port_state_t* port_state = malloc(sizeof *port_state); if (port_state == NULL) @@ -1060,28 +1080,29 @@ static void port__free(port_state_t* port) { } static HANDLE port__create_iocp(void) { - HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - if (iocp == NULL) + HANDLE iocp_handle = + CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (iocp_handle == NULL) return_map_error(NULL); - return iocp; + return iocp_handle; } -port_state_t* port_new(HANDLE* iocp_out) { +port_state_t* port_new(HANDLE* iocp_handle_out) { port_state_t* port_state; - HANDLE iocp; + HANDLE iocp_handle; port_state = port__alloc(); if (port_state == NULL) goto err1; - iocp = port__create_iocp(); - if (iocp == NULL) + iocp_handle = port__create_iocp(); + if (iocp_handle == NULL) goto err2; memset(port_state, 0, sizeof *port_state); - port_state->iocp = iocp; + port_state->iocp_handle = iocp_handle; tree_init(&port_state->sock_tree); queue_init(&port_state->sock_update_queue); queue_init(&port_state->sock_deleted_queue); @@ -1089,7 +1110,7 @@ port_state_t* port_new(HANDLE* iocp_out) { ts_tree_node_init(&port_state->handle_tree_node); InitializeCriticalSection(&port_state->lock); - *iocp_out = iocp; + *iocp_handle_out = iocp_handle; return port_state; err2: @@ -1099,10 +1120,10 @@ err1: } static int port__close_iocp(port_state_t* port_state) { - HANDLE iocp = port_state->iocp; - port_state->iocp = NULL; + HANDLE iocp_handle = port_state->iocp_handle; + port_state->iocp_handle = NULL; - if (!CloseHandle(iocp)) + if (!CloseHandle(iocp_handle)) return_map_error(-1); return 0; @@ -1123,7 +1144,7 @@ int port_delete(port_state_t* port_state) { queue_node_t* queue_node; /* At this point the IOCP port should have been closed. 
*/ - assert(port_state->iocp == NULL); + assert(port_state->iocp_handle == NULL); while ((tree_node = tree_root(&port_state->sock_tree)) != NULL) { sock_state_t* sock_state = sock_state_from_tree_node(tree_node); @@ -1180,10 +1201,11 @@ static int port__feed_events(port_state_t* port_state, DWORD i; for (i = 0; i < iocp_event_count; i++) { - OVERLAPPED* overlapped = iocp_events[i].lpOverlapped; + IO_STATUS_BLOCK* io_status_block = + (IO_STATUS_BLOCK*) iocp_events[i].lpOverlapped; struct epoll_event* ev = &epoll_events[epoll_event_count]; - epoll_event_count += sock_feed_event(port_state, overlapped, ev); + epoll_event_count += sock_feed_event(port_state, io_status_block, ev); } return epoll_event_count; @@ -1203,7 +1225,7 @@ static int port__poll(port_state_t* port_state, LeaveCriticalSection(&port_state->lock); - BOOL r = GetQueuedCompletionStatusEx(port_state->iocp, + BOOL r = GetQueuedCompletionStatusEx(port_state->iocp_handle, iocp_events, maxevents, &completion_count, @@ -1424,6 +1446,23 @@ void port_remove_deleted_socket(port_state_t* port_state, queue_remove(sock_state_to_queue_node(sock_state)); } +HANDLE port_get_iocp_handle(port_state_t* port_state) { + assert(port_state->iocp_handle != NULL); + return port_state->iocp_handle; +} + +queue_t* port_get_poll_group_queue(port_state_t* port_state) { + return &port_state->poll_group_queue; +} + +port_state_t* port_state_from_handle_tree_node(ts_tree_node_t* tree_node) { + return container_of(tree_node, port_state_t, handle_tree_node); +} + +ts_tree_node_t* port_state_to_handle_tree_node(port_state_t* port_state) { + return &port_state->handle_tree_node; +} + void queue_init(queue_t* queue) { queue_node_init(&queue->head); } @@ -1492,8 +1531,8 @@ static const long REFLOCK__POISON = (long) 0x300dead0; static HANDLE reflock__keyed_event = NULL; int reflock_global_init(void) { - NTSTATUS status = - NtCreateKeyedEvent(&reflock__keyed_event, KEYEDEVENT_ALL_ACCESS, NULL, 0); + NTSTATUS status = NtCreateKeyedEvent( + 
&reflock__keyed_event, KEYEDEVENT_ALL_ACCESS, NULL, 0); if (status != STATUS_SUCCESS) return_set_error(-1, RtlNtStatusToDosError(status)); return 0; @@ -1561,7 +1600,7 @@ typedef enum sock__poll_status { } sock__poll_status_t; typedef struct sock_state { - OVERLAPPED overlapped; + IO_STATUS_BLOCK io_status_block; AFD_POLL_INFO poll_info; queue_node_t queue_node; tree_node_t tree_node; @@ -1586,16 +1625,11 @@ static inline void sock__free(sock_state_t* sock_state) { } static int sock__cancel_poll(sock_state_t* sock_state) { - HANDLE afd_helper_handle = - poll_group_get_afd_helper_handle(sock_state->poll_group); assert(sock_state->poll_status == SOCK__POLL_PENDING); - /* CancelIoEx() may fail with ERROR_NOT_FOUND if the overlapped operation has - * already completed. This is not a problem and we proceed normally. */ - if (!HasOverlappedIoCompleted(&sock_state->overlapped) && - !CancelIoEx(afd_helper_handle, &sock_state->overlapped) && - GetLastError() != ERROR_NOT_FOUND) - return_map_error(-1); + if (afd_cancel_poll(poll_group_get_afd_helper_handle(sock_state->poll_group), + &sock_state->io_status_block) < 0) + return -1; sock_state->poll_status = SOCK__POLL_CANCELLED; sock_state->pending_events = 0; @@ -1772,11 +1806,9 @@ int sock_update(port_state_t* port_state, sock_state_t* sock_state) { sock_state->poll_info.Handles[0].Events = sock__epoll_events_to_afd_events(sock_state->user_events); - memset(&sock_state->overlapped, 0, sizeof sock_state->overlapped); - if (afd_poll(poll_group_get_afd_helper_handle(sock_state->poll_group), &sock_state->poll_info, - &sock_state->overlapped) < 0) { + &sock_state->io_status_block) < 0) { switch (GetLastError()) { case ERROR_IO_PENDING: /* Overlapped poll operation in progress; this is expected. 
*/ @@ -1804,10 +1836,10 @@ int sock_update(port_state_t* port_state, sock_state_t* sock_state) { } int sock_feed_event(port_state_t* port_state, - OVERLAPPED* overlapped, + IO_STATUS_BLOCK* io_status_block, struct epoll_event* ev) { sock_state_t* sock_state = - container_of(overlapped, sock_state_t, overlapped); + container_of(io_status_block, sock_state_t, io_status_block); AFD_POLL_INFO* poll_info = &sock_state->poll_info; uint32_t epoll_events = 0; @@ -1818,10 +1850,10 @@ int sock_feed_event(port_state_t* port_state, /* Socket has been deleted earlier and can now be freed. */ return sock__delete(port_state, sock_state, false); - } else if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) { + } else if (io_status_block->Status == STATUS_CANCELLED) { /* The poll request was cancelled by CancelIoEx. */ - } else if (!NT_SUCCESS(overlapped->Internal)) { + } else if (!NT_SUCCESS(io_status_block->Status)) { /* The overlapped request itself failed in an unexpected way. */ epoll_events = EPOLLERR; @@ -1858,6 +1890,10 @@ int sock_feed_event(port_state_t* port_state, return 1; } +sock_state_t* sock_state_from_queue_node(queue_node_t* queue_node) { + return container_of(queue_node, sock_state_t, queue_node); +} + queue_node_t* sock_state_to_queue_node(sock_state_t* sock_state) { return &sock_state->queue_node; } @@ -1870,10 +1906,6 @@ tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state) { return &sock_state->tree_node; } -sock_state_t* sock_state_from_queue_node(queue_node_t* queue_node) { - return container_of(queue_node, sock_state_t, queue_node); -} - void ts_tree_init(ts_tree_t* ts_tree) { tree_init(&ts_tree->tree); InitializeSRWLock(&ts_tree->lock); @@ -1986,23 +2018,23 @@ static inline void tree__rotate_right(tree_t* tree, tree_node_t* node) { break; \ } -#define TREE__FIXUP_AFTER_INSERT(cis, trans) \ - tree_node_t* grandparent = parent->parent; \ - tree_node_t* uncle = grandparent->trans; \ - \ - if (uncle && uncle->red) { \ - parent->red = uncle->red = 
false; \ - grandparent->red = true; \ - node = grandparent; \ - } else { \ - if (node == parent->trans) { \ - tree__rotate_##cis(tree, parent); \ - node = parent; \ - parent = node->parent; \ - } \ - parent->red = false; \ - grandparent->red = true; \ - tree__rotate_##trans(tree, grandparent); \ +#define TREE__REBALANCE_AFTER_INSERT(cis, trans) \ + tree_node_t* grandparent = parent->parent; \ + tree_node_t* uncle = grandparent->trans; \ + \ + if (uncle && uncle->red) { \ + parent->red = uncle->red = false; \ + grandparent->red = true; \ + node = grandparent; \ + } else { \ + if (node == parent->trans) { \ + tree__rotate_##cis(tree, parent); \ + node = parent; \ + parent = node->parent; \ + } \ + parent->red = false; \ + grandparent->red = true; \ + tree__rotate_##trans(tree, grandparent); \ } int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key) { @@ -2030,9 +2062,9 @@ int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key) { for (; parent && parent->red; parent = node->parent) { if (parent == parent->parent->left) { - TREE__FIXUP_AFTER_INSERT(left, right) + TREE__REBALANCE_AFTER_INSERT(left, right) } else { - TREE__FIXUP_AFTER_INSERT(right, left) + TREE__REBALANCE_AFTER_INSERT(right, left) } } tree->root->red = false; @@ -2040,7 +2072,7 @@ int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key) { return 0; } -#define TREE__FIXUP_AFTER_REMOVE(cis, trans) \ +#define TREE__REBALANCE_AFTER_REMOVE(cis, trans) \ tree_node_t* sibling = parent->trans; \ \ if (sibling->red) { \ @@ -2126,9 +2158,9 @@ void tree_del(tree_t* tree, tree_node_t* node) { if (node == tree->root) break; if (node == parent->left) { - TREE__FIXUP_AFTER_REMOVE(left, right) + TREE__REBALANCE_AFTER_REMOVE(left, right) } else { - TREE__FIXUP_AFTER_REMOVE(right, left) + TREE__REBALANCE_AFTER_REMOVE(right, left) } node = parent; parent = parent->parent; diff --git a/wepoll.h b/wepoll.h index eebde21..e9f4a5d 100644 --- a/wepoll.h +++ b/wepoll.h @@ -2,7 +2,7 @@ * wepoll - epoll for Windows 
* https://github.com/piscisaureus/wepoll * - * Copyright 2012-2019, Bert Belder + * Copyright 2012-2020, Bert Belder * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -32,9 +32,7 @@ #ifndef WEPOLL_H_ #define WEPOLL_H_ -#ifndef WEPOLL_EXPORT #define WEPOLL_EXPORT -#endif #include