version 1.5.6

Bert Belder 2020-01-13 00:26:46 +01:00
commit 8128da416c
3 changed files with 328 additions and 298 deletions


@@ -1,7 +1,7 @@
 wepoll - epoll for Windows
 https://github.com/piscisaureus/wepoll
 
-Copyright 2012-2019, Bert Belder <bertbelder@gmail.com>
+Copyright 2012-2020, Bert Belder <bertbelder@gmail.com>
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without

wepoll.c (588 changed lines)

@@ -2,7 +2,7 @@
  * wepoll - epoll for Windows
  * https://github.com/piscisaureus/wepoll
  *
- * Copyright 2012-2019, Bert Belder <bertbelder@gmail.com>
+ * Copyright 2012-2020, Bert Belder <bertbelder@gmail.com>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -29,9 +29,7 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-#ifndef WEPOLL_EXPORT
 #define WEPOLL_EXPORT
-#endif
 
 #include <stdint.h>
@@ -107,7 +105,8 @@ WEPOLL_EXPORT int epoll_wait(HANDLE ephnd,
 } /* extern "C" */
 #endif
 
-#include <malloc.h>
+#include <assert.h>
 #include <stdlib.h>
 
 #define WEPOLL_INTERNAL static
@@ -165,6 +164,10 @@ typedef NTSTATUS* PNTSTATUS;
 #define STATUS_CANCELLED ((NTSTATUS) 0xC0000120L)
 #endif
 
+#ifndef STATUS_NOT_FOUND
+#define STATUS_NOT_FOUND ((NTSTATUS) 0xC0000225L)
+#endif
+
 typedef struct _IO_STATUS_BLOCK {
   NTSTATUS Status;
   ULONG_PTR Information;
@@ -205,6 +208,13 @@ typedef struct _OBJECT_ATTRIBUTES {
   (STANDARD_RIGHTS_REQUIRED | KEYEDEVENT_WAIT | KEYEDEVENT_WAKE)
 
 #define NT_NTDLL_IMPORT_LIST(X) \
+  X(NTSTATUS, \
+    NTAPI, \
+    NtCancelIoFileEx, \
+    (HANDLE FileHandle, \
+     PIO_STATUS_BLOCK IoRequestToCancel, \
+     PIO_STATUS_BLOCK IoStatusBlock)) \
+  \
   X(NTSTATUS, \
    NTAPI, \
    NtCreateFile, \
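
The import list above uses the X-macro pattern: each ntdll function is described once, and whatever macro is currently bound to `X` decides what every entry expands to. A minimal sketch of how such a list can be consumed — the expansion below is illustrative, not wepoll's exact one, and `nt__load_functions` is a made-up name; wepoll's own expansion sits at the `NT_NTDLL_IMPORT_LIST(X)` / `#undef X` pair in the next hunk and is resolved at runtime in nt_global_init():

/* Illustrative expansion: one typedef plus one function pointer per entry. */
#define X(return_type, attributes, name, parameters) \
  typedef return_type(attributes* name##_t) parameters; \
  static name##_t name;
NT_NTDLL_IMPORT_LIST(X)
#undef X

/* Illustrative loader: resolve every listed function from ntdll.dll. */
static int nt__load_functions(void) {
  HMODULE ntdll = GetModuleHandleW(L"ntdll.dll");
  if (ntdll == NULL)
    return -1;
#define X(return_type, attributes, name, parameters) \
  name = (name##_t) GetProcAddress(ntdll, #name);    \
  if (name == NULL)                                  \
    return -1;
  NT_NTDLL_IMPORT_LIST(X)
#undef X
  return 0;
}
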
@@ -265,25 +275,6 @@ typedef struct _OBJECT_ATTRIBUTES {
 NT_NTDLL_IMPORT_LIST(X)
 #undef X
 
-#include <assert.h>
-#include <stddef.h>
-
-#ifndef _SSIZE_T_DEFINED
-typedef intptr_t ssize_t;
-#endif
-
-#define array_count(a) (sizeof(a) / (sizeof((a)[0])))
-
-#define container_of(ptr, type, member) \
-  ((type*) ((uintptr_t) (ptr) - offsetof(type, member)))
-
-#define unused_var(v) ((void) (v))
-
-/* Polyfill `inline` for older versions of msvc (up to Visual Studio 2013) */
-#if defined(_MSC_VER) && _MSC_VER < 1900
-#define inline __inline
-#endif
-
 #define AFD_POLL_RECEIVE 0x0001
 #define AFD_POLL_RECEIVE_EXPEDITED 0x0002
 #define AFD_POLL_SEND 0x0004
@@ -306,12 +297,14 @@ typedef struct _AFD_POLL_INFO {
   AFD_POLL_HANDLE_INFO Handles[1];
 } AFD_POLL_INFO, *PAFD_POLL_INFO;
 
-WEPOLL_INTERNAL int afd_create_helper_handle(HANDLE iocp,
+WEPOLL_INTERNAL int afd_create_helper_handle(HANDLE iocp_handle,
                                              HANDLE* afd_helper_handle_out);
 
 WEPOLL_INTERNAL int afd_poll(HANDLE afd_helper_handle,
                              AFD_POLL_INFO* poll_info,
-                             OVERLAPPED* overlapped);
+                             IO_STATUS_BLOCK* io_status_block);
+WEPOLL_INTERNAL int afd_cancel_poll(HANDLE afd_helper_handle,
+                                    IO_STATUS_BLOCK* io_status_block);
 
 #define return_map_error(value) \
   do { \
@@ -329,9 +322,6 @@ WEPOLL_INTERNAL void err_map_win_error(void);
 WEPOLL_INTERNAL void err_set_win_error(DWORD error);
 WEPOLL_INTERNAL int err_check_handle(HANDLE handle);
 
-WEPOLL_INTERNAL int ws_global_init(void);
-WEPOLL_INTERNAL SOCKET ws_get_base_socket(SOCKET socket);
-
 #define IOCTL_AFD_POLL 0x00012024
 
 static UNICODE_STRING afd__helper_name =
@@ -340,7 +330,8 @@ static UNICODE_STRING afd__helper_name =
 static OBJECT_ATTRIBUTES afd__helper_attributes =
     RTL_CONSTANT_OBJECT_ATTRIBUTES(&afd__helper_name, 0);
 
-int afd_create_helper_handle(HANDLE iocp, HANDLE* afd_helper_handle_out) {
+int afd_create_helper_handle(HANDLE iocp_handle,
+                             HANDLE* afd_helper_handle_out) {
   HANDLE afd_helper_handle;
   IO_STATUS_BLOCK iosb;
   NTSTATUS status;
@@ -362,7 +353,7 @@ int afd_create_helper_handle(HANDLE iocp, HANDLE* afd_helper_handle_out) {
   if (status != STATUS_SUCCESS)
     return_set_error(-1, RtlNtStatusToDosError(status));
 
-  if (CreateIoCompletionPort(afd_helper_handle, iocp, 0, 0) == NULL)
+  if (CreateIoCompletionPort(afd_helper_handle, iocp_handle, 0, 0) == NULL)
     goto error;
 
   if (!SetFileCompletionNotificationModes(afd_helper_handle,
@@ -379,33 +370,18 @@ error:
 
 int afd_poll(HANDLE afd_helper_handle,
              AFD_POLL_INFO* poll_info,
-             OVERLAPPED* overlapped) {
-  IO_STATUS_BLOCK* iosb;
-  HANDLE event;
-  void* apc_context;
+             IO_STATUS_BLOCK* io_status_block) {
   NTSTATUS status;
 
   /* Blocking operation is not supported. */
-  assert(overlapped != NULL);
+  assert(io_status_block != NULL);
 
-  iosb = (IO_STATUS_BLOCK*) &overlapped->Internal;
-  event = overlapped->hEvent;
-
-  /* Do what other windows APIs would do: if hEvent has it's lowest bit set,
-   * don't post a completion to the completion port. */
-  if ((uintptr_t) event & 1) {
-    event = (HANDLE)((uintptr_t) event & ~(uintptr_t) 1);
-    apc_context = NULL;
-  } else {
-    apc_context = overlapped;
-  }
-
-  iosb->Status = STATUS_PENDING;
+  io_status_block->Status = STATUS_PENDING;
 
   status = NtDeviceIoControlFile(afd_helper_handle,
-                                 event,
                                  NULL,
-                                 apc_context,
-                                 iosb,
+                                 NULL,
+                                 io_status_block,
+                                 io_status_block,
                                  IOCTL_AFD_POLL,
                                  poll_info,
                                  sizeof *poll_info,
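
Worth noting in the rewritten afd_poll(): NtDeviceIoControlFile() now receives the same IO_STATUS_BLOCK pointer both as its fourth argument (ApcContext) and as its fifth (IoStatusBlock). ApcContext is the value the kernel posts to the I/O completion port when the operation finishes, so the dequeueing side can map a completion packet straight back to the affected IO_STATUS_BLOCK — the port__feed_events() hunk further down performs exactly that cast. A minimal sketch of the receiving end (`iocp_handle` stands in for the port created by port__create_iocp()):

/* Sketch: the ApcContext passed to NtDeviceIoControlFile() comes back as
 * OVERLAPPED_ENTRY::lpOverlapped when the completion is dequeued. */
OVERLAPPED_ENTRY entry;
ULONG count;
if (GetQueuedCompletionStatusEx(
        iocp_handle, &entry, 1, &count, INFINITE, FALSE)) {
  /* Not actually an OVERLAPPED here: recover the IO_STATUS_BLOCK pointer. */
  IO_STATUS_BLOCK* iosb = (IO_STATUS_BLOCK*) entry.lpOverlapped;
  /* iosb->Status now holds the operation's final NTSTATUS. */
}
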
@@ -420,176 +396,37 @@ int afd_poll(HANDLE afd_helper_handle,
     return_set_error(-1, RtlNtStatusToDosError(status));
 }
 
+int afd_cancel_poll(HANDLE afd_helper_handle,
+                    IO_STATUS_BLOCK* io_status_block) {
+  NTSTATUS cancel_status;
+  IO_STATUS_BLOCK cancel_iosb;
+
+  /* If the poll operation has already completed or has been cancelled earlier,
+   * there's nothing left for us to do. */
+  if (io_status_block->Status != STATUS_PENDING)
+    return 0;
+
+  cancel_status =
+      NtCancelIoFileEx(afd_helper_handle, io_status_block, &cancel_iosb);
+
+  /* NtCancelIoFileEx() may return STATUS_NOT_FOUND if the operation completed
+   * just before calling NtCancelIoFileEx(). This is not an error. */
+  if (cancel_status == STATUS_SUCCESS || cancel_status == STATUS_NOT_FOUND)
+    return 0;
+  else
+    return_set_error(-1, RtlNtStatusToDosError(cancel_status));
+}
+
 WEPOLL_INTERNAL int epoll_global_init(void);
 WEPOLL_INTERNAL int init(void);
 
-#include <stdbool.h>
-
-typedef struct queue_node queue_node_t;
-
-typedef struct queue_node {
-  queue_node_t* prev;
-  queue_node_t* next;
-} queue_node_t;
-
-typedef struct queue {
-  queue_node_t head;
-} queue_t;
-
-WEPOLL_INTERNAL void queue_init(queue_t* queue);
-WEPOLL_INTERNAL void queue_node_init(queue_node_t* node);
-
-WEPOLL_INTERNAL queue_node_t* queue_first(const queue_t* queue);
-WEPOLL_INTERNAL queue_node_t* queue_last(const queue_t* queue);
-
-WEPOLL_INTERNAL void queue_prepend(queue_t* queue, queue_node_t* node);
-WEPOLL_INTERNAL void queue_append(queue_t* queue, queue_node_t* node);
-WEPOLL_INTERNAL void queue_move_first(queue_t* queue, queue_node_t* node);
-WEPOLL_INTERNAL void queue_move_last(queue_t* queue, queue_node_t* node);
-WEPOLL_INTERNAL void queue_remove(queue_node_t* node);
-
-WEPOLL_INTERNAL bool queue_empty(const queue_t* queue);
-WEPOLL_INTERNAL bool queue_enqueued(const queue_node_t* node);
-
-typedef struct port_state port_state_t;
-typedef struct poll_group poll_group_t;
-
-WEPOLL_INTERNAL poll_group_t* poll_group_acquire(port_state_t* port);
-WEPOLL_INTERNAL void poll_group_release(poll_group_t* poll_group);
-WEPOLL_INTERNAL void poll_group_delete(poll_group_t* poll_group);
-
-WEPOLL_INTERNAL poll_group_t* poll_group_from_queue_node(
-    queue_node_t* queue_node);
-WEPOLL_INTERNAL HANDLE
-    poll_group_get_afd_helper_handle(poll_group_t* poll_group);
-
-/* N.b.: the tree functions do not set errno or LastError when they fail. Each
- * of the API functions has at most one failure mode. It is up to the caller to
- * set an appropriate error code when necessary. */
-
-typedef struct tree tree_t;
-typedef struct tree_node tree_node_t;
-
-typedef struct tree {
-  tree_node_t* root;
-} tree_t;
-
-typedef struct tree_node {
-  tree_node_t* left;
-  tree_node_t* right;
-  tree_node_t* parent;
-  uintptr_t key;
-  bool red;
-} tree_node_t;
-
-WEPOLL_INTERNAL void tree_init(tree_t* tree);
-WEPOLL_INTERNAL void tree_node_init(tree_node_t* node);
-
-WEPOLL_INTERNAL int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key);
-WEPOLL_INTERNAL void tree_del(tree_t* tree, tree_node_t* node);
-
-WEPOLL_INTERNAL tree_node_t* tree_find(const tree_t* tree, uintptr_t key);
-WEPOLL_INTERNAL tree_node_t* tree_root(const tree_t* tree);
-
 typedef struct port_state port_state_t;
+typedef struct queue queue_t;
 typedef struct sock_state sock_state_t;
+typedef struct ts_tree_node ts_tree_node_t;
 
-WEPOLL_INTERNAL sock_state_t* sock_new(port_state_t* port_state,
-                                       SOCKET socket);
-WEPOLL_INTERNAL void sock_delete(port_state_t* port_state,
-                                 sock_state_t* sock_state);
-WEPOLL_INTERNAL void sock_force_delete(port_state_t* port_state,
-                                       sock_state_t* sock_state);
-
-WEPOLL_INTERNAL int sock_set_event(port_state_t* port_state,
-                                   sock_state_t* sock_state,
-                                   const struct epoll_event* ev);
-WEPOLL_INTERNAL int sock_update(port_state_t* port_state,
-                                sock_state_t* sock_state);
-WEPOLL_INTERNAL int sock_feed_event(port_state_t* port_state,
-                                    OVERLAPPED* overlapped,
-                                    struct epoll_event* ev);
-
-WEPOLL_INTERNAL sock_state_t* sock_state_from_queue_node(
-    queue_node_t* queue_node);
-WEPOLL_INTERNAL queue_node_t* sock_state_to_queue_node(
-    sock_state_t* sock_state);
-WEPOLL_INTERNAL sock_state_t* sock_state_from_tree_node(
-    tree_node_t* tree_node);
-WEPOLL_INTERNAL tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state);
-
-/* The reflock is a special kind of lock that normally prevents a chunk of
- * memory from being freed, but does allow the chunk of memory to eventually be
- * released in a coordinated fashion.
- *
- * Under normal operation, threads increase and decrease the reference count,
- * which are wait-free operations.
- *
- * Exactly once during the reflock's lifecycle, a thread holding a reference to
- * the lock may "destroy" the lock; this operation blocks until all other
- * threads holding a reference to the lock have dereferenced it. After
- * "destroy" returns, the calling thread may assume that no other threads have
- * a reference to the lock.
- *
- * Attemmpting to lock or destroy a lock after reflock_unref_and_destroy() has
- * been called is invalid and results in undefined behavior. Therefore the user
- * should use another lock to guarantee that this can't happen.
- */
-
-typedef struct reflock {
-  volatile long state; /* 32-bit Interlocked APIs operate on `long` values. */
-} reflock_t;
-
-WEPOLL_INTERNAL int reflock_global_init(void);
-
-WEPOLL_INTERNAL void reflock_init(reflock_t* reflock);
-WEPOLL_INTERNAL void reflock_ref(reflock_t* reflock);
-WEPOLL_INTERNAL void reflock_unref(reflock_t* reflock);
-WEPOLL_INTERNAL void reflock_unref_and_destroy(reflock_t* reflock);
-
-typedef struct ts_tree {
-  tree_t tree;
-  SRWLOCK lock;
-} ts_tree_t;
-
-typedef struct ts_tree_node {
-  tree_node_t tree_node;
-  reflock_t reflock;
-} ts_tree_node_t;
-
-WEPOLL_INTERNAL void ts_tree_init(ts_tree_t* rtl);
-WEPOLL_INTERNAL void ts_tree_node_init(ts_tree_node_t* node);
-
-WEPOLL_INTERNAL int ts_tree_add(ts_tree_t* ts_tree,
-                                ts_tree_node_t* node,
-                                uintptr_t key);
-
-WEPOLL_INTERNAL ts_tree_node_t* ts_tree_del_and_ref(ts_tree_t* ts_tree,
-                                                    uintptr_t key);
-WEPOLL_INTERNAL ts_tree_node_t* ts_tree_find_and_ref(ts_tree_t* ts_tree,
-                                                     uintptr_t key);
-
-WEPOLL_INTERNAL void ts_tree_node_unref(ts_tree_node_t* node);
-WEPOLL_INTERNAL void ts_tree_node_unref_and_destroy(ts_tree_node_t* node);
-
-typedef struct port_state port_state_t;
-typedef struct sock_state sock_state_t;
-
-typedef struct port_state {
-  HANDLE iocp;
-  tree_t sock_tree;
-  queue_t sock_update_queue;
-  queue_t sock_deleted_queue;
-  queue_t poll_group_queue;
-  ts_tree_node_t handle_tree_node;
-  CRITICAL_SECTION lock;
-  size_t active_poll_count;
-} port_state_t;
-
-WEPOLL_INTERNAL port_state_t* port_new(HANDLE* iocp_out);
+WEPOLL_INTERNAL port_state_t* port_new(HANDLE* iocp_handle_out);
 WEPOLL_INTERNAL int port_close(port_state_t* port_state);
 WEPOLL_INTERNAL int port_delete(port_state_t* port_state);
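
Note that afd_cancel_poll() only requests cancellation: a cancelled AFD poll still completes (with STATUS_CANCELLED) and its packet is still dequeued from the port. A schematic of the life cycle as this commit wires it up, using only functions and fields that appear in this diff (control flow heavily simplified, return values ignored):

/* Schematic only: submit, cancel, and consume one AFD poll operation. */
static void afd_poll_lifecycle_sketch(sock_state_t* sock_state) {
  HANDLE afd = poll_group_get_afd_helper_handle(sock_state->poll_group);

  /* 1. Submit: afd_poll() marks the IO_STATUS_BLOCK STATUS_PENDING and
   *    hands its address to the kernel as the completion key. */
  afd_poll(afd, &sock_state->poll_info, &sock_state->io_status_block);

  /* 2. Cancel: a no-op unless the status is still STATUS_PENDING;
   *    STATUS_NOT_FOUND from the kernel is swallowed as benign. */
  afd_cancel_poll(afd, &sock_state->io_status_block);

  /* 3. Consume: the operation still posts a completion packet, which
   *    port__feed_events() routes to sock_feed_event(); that function
   *    sees STATUS_CANCELLED and emits no epoll events for the socket. */
}
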
@@ -621,12 +458,99 @@ WEPOLL_INTERNAL void port_add_deleted_socket(port_state_t* port_state,
 WEPOLL_INTERNAL void port_remove_deleted_socket(port_state_t* port_state,
                                                 sock_state_t* sock_state);
 
-static ts_tree_t epoll__handle_tree;
-
-static inline port_state_t* epoll__handle_tree_node_to_port(
-    ts_tree_node_t* tree_node) {
-  return container_of(tree_node, port_state_t, handle_tree_node);
-}
+WEPOLL_INTERNAL HANDLE port_get_iocp_handle(port_state_t* port_state);
+WEPOLL_INTERNAL queue_t* port_get_poll_group_queue(port_state_t* port_state);
+
+WEPOLL_INTERNAL port_state_t* port_state_from_handle_tree_node(
+    ts_tree_node_t* tree_node);
+WEPOLL_INTERNAL ts_tree_node_t* port_state_to_handle_tree_node(
+    port_state_t* port_state);
+
+/* The reflock is a special kind of lock that normally prevents a chunk of
+ * memory from being freed, but does allow the chunk of memory to eventually be
+ * released in a coordinated fashion.
+ *
+ * Under normal operation, threads increase and decrease the reference count,
+ * which are wait-free operations.
+ *
+ * Exactly once during the reflock's lifecycle, a thread holding a reference to
+ * the lock may "destroy" the lock; this operation blocks until all other
+ * threads holding a reference to the lock have dereferenced it. After
+ * "destroy" returns, the calling thread may assume that no other threads have
+ * a reference to the lock.
+ *
+ * Attemmpting to lock or destroy a lock after reflock_unref_and_destroy() has
+ * been called is invalid and results in undefined behavior. Therefore the user
+ * should use another lock to guarantee that this can't happen.
+ */
+
+typedef struct reflock {
+  volatile long state; /* 32-bit Interlocked APIs operate on `long` values. */
+} reflock_t;
+
+WEPOLL_INTERNAL int reflock_global_init(void);
+
+WEPOLL_INTERNAL void reflock_init(reflock_t* reflock);
+WEPOLL_INTERNAL void reflock_ref(reflock_t* reflock);
+WEPOLL_INTERNAL void reflock_unref(reflock_t* reflock);
+WEPOLL_INTERNAL void reflock_unref_and_destroy(reflock_t* reflock);
+
+#include <stdbool.h>
+
+/* N.b.: the tree functions do not set errno or LastError when they fail. Each
+ * of the API functions has at most one failure mode. It is up to the caller to
+ * set an appropriate error code when necessary. */
+
+typedef struct tree tree_t;
+typedef struct tree_node tree_node_t;
+
+typedef struct tree {
+  tree_node_t* root;
+} tree_t;
+
+typedef struct tree_node {
+  tree_node_t* left;
+  tree_node_t* right;
+  tree_node_t* parent;
+  uintptr_t key;
+  bool red;
+} tree_node_t;
+
+WEPOLL_INTERNAL void tree_init(tree_t* tree);
+WEPOLL_INTERNAL void tree_node_init(tree_node_t* node);
+
+WEPOLL_INTERNAL int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key);
+WEPOLL_INTERNAL void tree_del(tree_t* tree, tree_node_t* node);
+
+WEPOLL_INTERNAL tree_node_t* tree_find(const tree_t* tree, uintptr_t key);
+WEPOLL_INTERNAL tree_node_t* tree_root(const tree_t* tree);
+
+typedef struct ts_tree {
+  tree_t tree;
+  SRWLOCK lock;
+} ts_tree_t;
+
+typedef struct ts_tree_node {
+  tree_node_t tree_node;
+  reflock_t reflock;
+} ts_tree_node_t;
+
+WEPOLL_INTERNAL void ts_tree_init(ts_tree_t* rtl);
+WEPOLL_INTERNAL void ts_tree_node_init(ts_tree_node_t* node);
+
+WEPOLL_INTERNAL int ts_tree_add(ts_tree_t* ts_tree,
+                                ts_tree_node_t* node,
+                                uintptr_t key);
+
+WEPOLL_INTERNAL ts_tree_node_t* ts_tree_del_and_ref(ts_tree_t* ts_tree,
+                                                    uintptr_t key);
+WEPOLL_INTERNAL ts_tree_node_t* ts_tree_find_and_ref(ts_tree_t* ts_tree,
+                                                     uintptr_t key);
+
+WEPOLL_INTERNAL void ts_tree_node_unref(ts_tree_node_t* node);
+WEPOLL_INTERNAL void ts_tree_node_unref_and_destroy(ts_tree_node_t* node);
+
+static ts_tree_t epoll__handle_tree;
 
 int epoll_global_init(void) {
   ts_tree_init(&epoll__handle_tree);
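
The reflock comment above states its contract only in prose. A hypothetical usage sketch built from the functions declared there (`resource_t` and the thread roles are invented for illustration; inside wepoll the same pattern is packaged by ts_tree_del_and_ref() and ts_tree_node_unref_and_destroy()):

/* Hypothetical illustration of the reflock lifecycle described above. */
typedef struct resource {
  reflock_t reflock;
  /* ... guarded payload ... */
} resource_t;

/* Any thread that found `res` through a shared lookup structure: */
static void use_resource(resource_t* res) {
  reflock_ref(&res->reflock);   /* wait-free; blocks nobody */
  /* ... safe to use res: it cannot be freed while referenced ... */
  reflock_unref(&res->reflock); /* wait-free */
}

/* Exactly one thread, which already holds its own reference and has made
 * `res` unreachable so no new references can be taken: */
static void destroy_resource(resource_t* res) {
  reflock_unref_and_destroy(&res->reflock); /* blocks until refs drain */
  free(res); /* safe: no other thread can still hold a reference */
}
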
@@ -636,6 +560,7 @@ int epoll_global_init(void) {
 static HANDLE epoll__create(void) {
   port_state_t* port_state;
   HANDLE ephnd;
+  ts_tree_node_t* tree_node;
 
   if (init() < 0)
     return NULL;
@@ -644,9 +569,8 @@ static HANDLE epoll__create(void) {
   if (port_state == NULL)
     return NULL;
 
-  if (ts_tree_add(&epoll__handle_tree,
-                  &port_state->handle_tree_node,
-                  (uintptr_t) ephnd) < 0) {
+  tree_node = port_state_to_handle_tree_node(port_state);
+  if (ts_tree_add(&epoll__handle_tree, tree_node, (uintptr_t) ephnd) < 0) {
     /* This should never happen. */
     port_delete(port_state);
     return_set_error(NULL, ERROR_ALREADY_EXISTS);
@@ -682,7 +606,7 @@ int epoll_close(HANDLE ephnd) {
     goto err;
   }
 
-  port_state = epoll__handle_tree_node_to_port(tree_node);
+  port_state = port_state_from_handle_tree_node(tree_node);
   port_close(port_state);
 
   ts_tree_node_unref_and_destroy(tree_node);
@@ -708,7 +632,7 @@ int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) {
     goto err;
   }
 
-  port_state = epoll__handle_tree_node_to_port(tree_node);
+  port_state = port_state_from_handle_tree_node(tree_node);
   r = port_ctl(port_state, op, sock, ev);
 
   ts_tree_node_unref(tree_node);
@@ -719,7 +643,7 @@ int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) {
   return 0;
 
 err:
-  /* On Linux, in the case of epoll_ctl_mod(), EBADF takes priority over other
+  /* On Linux, in the case of epoll_ctl(), EBADF takes priority over other
    * errors. Wepoll mimics this behavior. */
   err_check_handle(ephnd);
   err_check_handle((HANDLE) sock);
@@ -746,7 +670,7 @@ int epoll_wait(HANDLE ephnd,
     goto err;
   }
 
-  port_state = epoll__handle_tree_node_to_port(tree_node);
+  port_state = port_state_from_handle_tree_node(tree_node);
   num_events = port_wait(port_state, events, maxevents, timeout);
 
   ts_tree_node_unref(tree_node);
@@ -900,6 +824,23 @@ int err_check_handle(HANDLE handle) {
   return 0;
 }
 
+#include <stddef.h>
+
+#define array_count(a) (sizeof(a) / (sizeof((a)[0])))
+
+#define container_of(ptr, type, member) \
+  ((type*) ((uintptr_t) (ptr) - offsetof(type, member)))
+
+#define unused_var(v) ((void) (v))
+
+/* Polyfill `inline` for older versions of msvc (up to Visual Studio 2013) */
+#if defined(_MSC_VER) && _MSC_VER < 1900
+#define inline __inline
+#endif
+
+WEPOLL_INTERNAL int ws_global_init(void);
+WEPOLL_INTERNAL SOCKET ws_get_base_socket(SOCKET socket);
+
 static bool init__done = false;
 static INIT_ONCE init__once = INIT_ONCE_STATIC_INIT;
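
container_of() recovers a pointer to an enclosing struct from a pointer to one of its members; it is how this codebase maps intrusive queue and tree nodes back to their owning state objects (see sock_state_from_queue_node() further down). A small self-contained illustration — the struct and variable names here are invented for the example:

#include <stddef.h>
#include <stdint.h>

typedef struct node { struct node* next; } node_t;

typedef struct item {
  int value;
  node_t node; /* intrusive link embedded in the owning struct */
} item_t;

/* Same definition as in the hunk above:
 * #define container_of(ptr, type, member) \
 *   ((type*) ((uintptr_t) (ptr) - offsetof(type, member))) */

static int example(void) {
  item_t item = {42, {NULL}};
  node_t* n = &item.node;
  /* Subtracting offsetof(item_t, node) from n yields &item again. */
  item_t* recovered = container_of(n, item_t, node);
  return recovered->value; /* 42 */
}
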
@@ -969,6 +910,44 @@ int nt_global_init(void) {
 
 #include <string.h>
 
+typedef struct poll_group poll_group_t;
+typedef struct queue_node queue_node_t;
+
+WEPOLL_INTERNAL poll_group_t* poll_group_acquire(port_state_t* port);
+WEPOLL_INTERNAL void poll_group_release(poll_group_t* poll_group);
+WEPOLL_INTERNAL void poll_group_delete(poll_group_t* poll_group);
+
+WEPOLL_INTERNAL poll_group_t* poll_group_from_queue_node(
+    queue_node_t* queue_node);
+WEPOLL_INTERNAL HANDLE
+    poll_group_get_afd_helper_handle(poll_group_t* poll_group);
+
+typedef struct queue_node {
+  queue_node_t* prev;
+  queue_node_t* next;
+} queue_node_t;
+
+typedef struct queue {
+  queue_node_t head;
+} queue_t;
+
+WEPOLL_INTERNAL void queue_init(queue_t* queue);
+WEPOLL_INTERNAL void queue_node_init(queue_node_t* node);
+
+WEPOLL_INTERNAL queue_node_t* queue_first(const queue_t* queue);
+WEPOLL_INTERNAL queue_node_t* queue_last(const queue_t* queue);
+
+WEPOLL_INTERNAL void queue_prepend(queue_t* queue, queue_node_t* node);
+WEPOLL_INTERNAL void queue_append(queue_t* queue, queue_node_t* node);
+WEPOLL_INTERNAL void queue_move_first(queue_t* queue, queue_node_t* node);
+WEPOLL_INTERNAL void queue_move_last(queue_t* queue, queue_node_t* node);
+WEPOLL_INTERNAL void queue_remove(queue_node_t* node);
+
+WEPOLL_INTERNAL bool queue_empty(const queue_t* queue);
+WEPOLL_INTERNAL bool queue_enqueued(const queue_node_t* node);
+
 static const size_t POLL_GROUP__MAX_GROUP_SIZE = 32;
 
 typedef struct poll_group {
@@ -979,6 +958,9 @@ typedef struct poll_group {
 } poll_group_t;
 
 static poll_group_t* poll_group__new(port_state_t* port_state) {
+  HANDLE iocp_handle = port_get_iocp_handle(port_state);
+  queue_t* poll_group_queue = port_get_poll_group_queue(port_state);
+
   poll_group_t* poll_group = malloc(sizeof *poll_group);
   if (poll_group == NULL)
     return_set_error(NULL, ERROR_NOT_ENOUGH_MEMORY);
@@ -988,13 +970,13 @@ static poll_group_t* poll_group__new(port_state_t* port_state) {
   queue_node_init(&poll_group->queue_node);
   poll_group->port_state = port_state;
 
-  if (afd_create_helper_handle(port_state->iocp,
-                               &poll_group->afd_helper_handle) < 0) {
+  if (afd_create_helper_handle(iocp_handle, &poll_group->afd_helper_handle) <
+      0) {
     free(poll_group);
     return NULL;
   }
 
-  queue_append(&port_state->poll_group_queue, &poll_group->queue_node);
+  queue_append(poll_group_queue, &poll_group->queue_node);
 
   return poll_group;
 }
@@ -1015,10 +997,11 @@ HANDLE poll_group_get_afd_helper_handle(poll_group_t* poll_group) {
 }
 
 poll_group_t* poll_group_acquire(port_state_t* port_state) {
-  queue_t* queue = &port_state->poll_group_queue;
+  queue_t* poll_group_queue = port_get_poll_group_queue(port_state);
   poll_group_t* poll_group =
-      !queue_empty(queue)
-          ? container_of(queue_last(queue), poll_group_t, queue_node)
+      !queue_empty(poll_group_queue)
+          ? container_of(
+                queue_last(poll_group_queue), poll_group_t, queue_node)
           : NULL;
 
   if (poll_group == NULL ||
@@ -1028,24 +1011,61 @@ poll_group_t* poll_group_acquire(port_state_t* port_state) {
     return NULL;
 
   if (++poll_group->group_size == POLL_GROUP__MAX_GROUP_SIZE)
-    queue_move_first(&port_state->poll_group_queue, &poll_group->queue_node);
+    queue_move_first(poll_group_queue, &poll_group->queue_node);
 
   return poll_group;
 }
 
 void poll_group_release(poll_group_t* poll_group) {
   port_state_t* port_state = poll_group->port_state;
+  queue_t* poll_group_queue = port_get_poll_group_queue(port_state);
 
   poll_group->group_size--;
   assert(poll_group->group_size < POLL_GROUP__MAX_GROUP_SIZE);
 
-  queue_move_last(&port_state->poll_group_queue, &poll_group->queue_node);
+  queue_move_last(poll_group_queue, &poll_group->queue_node);
 
   /* Poll groups are currently only freed when the epoll port is closed. */
 }
 
+WEPOLL_INTERNAL sock_state_t* sock_new(port_state_t* port_state,
+                                       SOCKET socket);
+WEPOLL_INTERNAL void sock_delete(port_state_t* port_state,
+                                 sock_state_t* sock_state);
+WEPOLL_INTERNAL void sock_force_delete(port_state_t* port_state,
+                                       sock_state_t* sock_state);
+
+WEPOLL_INTERNAL int sock_set_event(port_state_t* port_state,
+                                   sock_state_t* sock_state,
+                                   const struct epoll_event* ev);
+WEPOLL_INTERNAL int sock_update(port_state_t* port_state,
+                                sock_state_t* sock_state);
+WEPOLL_INTERNAL int sock_feed_event(port_state_t* port_state,
+                                    IO_STATUS_BLOCK* io_status_block,
+                                    struct epoll_event* ev);
+
+WEPOLL_INTERNAL sock_state_t* sock_state_from_queue_node(
+    queue_node_t* queue_node);
+WEPOLL_INTERNAL queue_node_t* sock_state_to_queue_node(
+    sock_state_t* sock_state);
+WEPOLL_INTERNAL sock_state_t* sock_state_from_tree_node(
+    tree_node_t* tree_node);
+WEPOLL_INTERNAL tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state);
+
 #define PORT__MAX_ON_STACK_COMPLETIONS 256
 
+typedef struct port_state {
+  HANDLE iocp_handle;
+  tree_t sock_tree;
+  queue_t sock_update_queue;
+  queue_t sock_deleted_queue;
+  queue_t poll_group_queue;
+  ts_tree_node_t handle_tree_node;
+  CRITICAL_SECTION lock;
+  size_t active_poll_count;
+} port_state_t;
+
 static port_state_t* port__alloc(void) {
   port_state_t* port_state = malloc(sizeof *port_state);
   if (port_state == NULL)
@@ -1060,28 +1080,29 @@ static void port__free(port_state_t* port) {
 }
 
 static HANDLE port__create_iocp(void) {
-  HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
-  if (iocp == NULL)
+  HANDLE iocp_handle =
+      CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+  if (iocp_handle == NULL)
     return_map_error(NULL);
 
-  return iocp;
+  return iocp_handle;
 }
 
-port_state_t* port_new(HANDLE* iocp_out) {
+port_state_t* port_new(HANDLE* iocp_handle_out) {
   port_state_t* port_state;
-  HANDLE iocp;
+  HANDLE iocp_handle;
 
   port_state = port__alloc();
   if (port_state == NULL)
     goto err1;
 
-  iocp = port__create_iocp();
-  if (iocp == NULL)
+  iocp_handle = port__create_iocp();
+  if (iocp_handle == NULL)
     goto err2;
 
   memset(port_state, 0, sizeof *port_state);
-  port_state->iocp = iocp;
+  port_state->iocp_handle = iocp_handle;
   tree_init(&port_state->sock_tree);
   queue_init(&port_state->sock_update_queue);
   queue_init(&port_state->sock_deleted_queue);
@@ -1089,7 +1110,7 @@ port_state_t* port_new(HANDLE* iocp_out) {
   ts_tree_node_init(&port_state->handle_tree_node);
   InitializeCriticalSection(&port_state->lock);
 
-  *iocp_out = iocp;
+  *iocp_handle_out = iocp_handle;
   return port_state;
 
 err2:
@@ -1099,10 +1120,10 @@ err1:
 }
 
 static int port__close_iocp(port_state_t* port_state) {
-  HANDLE iocp = port_state->iocp;
-  port_state->iocp = NULL;
+  HANDLE iocp_handle = port_state->iocp_handle;
+  port_state->iocp_handle = NULL;
 
-  if (!CloseHandle(iocp))
+  if (!CloseHandle(iocp_handle))
     return_map_error(-1);
 
   return 0;
@@ -1123,7 +1144,7 @@ int port_delete(port_state_t* port_state) {
   queue_node_t* queue_node;
 
   /* At this point the IOCP port should have been closed. */
-  assert(port_state->iocp == NULL);
+  assert(port_state->iocp_handle == NULL);
 
   while ((tree_node = tree_root(&port_state->sock_tree)) != NULL) {
     sock_state_t* sock_state = sock_state_from_tree_node(tree_node);
@@ -1180,10 +1201,11 @@ static int port__feed_events(port_state_t* port_state,
   DWORD i;
 
   for (i = 0; i < iocp_event_count; i++) {
-    OVERLAPPED* overlapped = iocp_events[i].lpOverlapped;
+    IO_STATUS_BLOCK* io_status_block =
+        (IO_STATUS_BLOCK*) iocp_events[i].lpOverlapped;
     struct epoll_event* ev = &epoll_events[epoll_event_count];
 
-    epoll_event_count += sock_feed_event(port_state, overlapped, ev);
+    epoll_event_count += sock_feed_event(port_state, io_status_block, ev);
   }
 
   return epoll_event_count;
@@ -1203,7 +1225,7 @@ static int port__poll(port_state_t* port_state,
 
   LeaveCriticalSection(&port_state->lock);
 
-  BOOL r = GetQueuedCompletionStatusEx(port_state->iocp,
+  BOOL r = GetQueuedCompletionStatusEx(port_state->iocp_handle,
                                        iocp_events,
                                        maxevents,
                                        &completion_count,
@@ -1424,6 +1446,23 @@ void port_remove_deleted_socket(port_state_t* port_state,
   queue_remove(sock_state_to_queue_node(sock_state));
 }
 
+HANDLE port_get_iocp_handle(port_state_t* port_state) {
+  assert(port_state->iocp_handle != NULL);
+  return port_state->iocp_handle;
+}
+
+queue_t* port_get_poll_group_queue(port_state_t* port_state) {
+  return &port_state->poll_group_queue;
+}
+
+port_state_t* port_state_from_handle_tree_node(ts_tree_node_t* tree_node) {
+  return container_of(tree_node, port_state_t, handle_tree_node);
+}
+
+ts_tree_node_t* port_state_to_handle_tree_node(port_state_t* port_state) {
+  return &port_state->handle_tree_node;
+}
+
 void queue_init(queue_t* queue) {
   queue_node_init(&queue->head);
 }
@@ -1492,8 +1531,8 @@ static const long REFLOCK__POISON = (long) 0x300dead0;
 static HANDLE reflock__keyed_event = NULL;
 
 int reflock_global_init(void) {
-  NTSTATUS status =
-      NtCreateKeyedEvent(&reflock__keyed_event, KEYEDEVENT_ALL_ACCESS, NULL, 0);
+  NTSTATUS status = NtCreateKeyedEvent(
+      &reflock__keyed_event, KEYEDEVENT_ALL_ACCESS, NULL, 0);
   if (status != STATUS_SUCCESS)
     return_set_error(-1, RtlNtStatusToDosError(status));
   return 0;
@@ -1561,7 +1600,7 @@ typedef enum sock__poll_status {
 } sock__poll_status_t;
 
 typedef struct sock_state {
-  OVERLAPPED overlapped;
+  IO_STATUS_BLOCK io_status_block;
   AFD_POLL_INFO poll_info;
   queue_node_t queue_node;
   tree_node_t tree_node;
@@ -1586,16 +1625,11 @@ static inline void sock__free(sock_state_t* sock_state) {
 }
 
 static int sock__cancel_poll(sock_state_t* sock_state) {
-  HANDLE afd_helper_handle =
-      poll_group_get_afd_helper_handle(sock_state->poll_group);
-
   assert(sock_state->poll_status == SOCK__POLL_PENDING);
 
-  /* CancelIoEx() may fail with ERROR_NOT_FOUND if the overlapped operation has
-   * already completed. This is not a problem and we proceed normally. */
-  if (!HasOverlappedIoCompleted(&sock_state->overlapped) &&
-      !CancelIoEx(afd_helper_handle, &sock_state->overlapped) &&
-      GetLastError() != ERROR_NOT_FOUND)
-    return_map_error(-1);
+  if (afd_cancel_poll(poll_group_get_afd_helper_handle(sock_state->poll_group),
+                      &sock_state->io_status_block) < 0)
+    return -1;
 
   sock_state->poll_status = SOCK__POLL_CANCELLED;
   sock_state->pending_events = 0;
@@ -1772,11 +1806,9 @@ int sock_update(port_state_t* port_state, sock_state_t* sock_state) {
     sock_state->poll_info.Handles[0].Events =
         sock__epoll_events_to_afd_events(sock_state->user_events);
 
-    memset(&sock_state->overlapped, 0, sizeof sock_state->overlapped);
-
     if (afd_poll(poll_group_get_afd_helper_handle(sock_state->poll_group),
                  &sock_state->poll_info,
-                 &sock_state->overlapped) < 0) {
+                 &sock_state->io_status_block) < 0) {
       switch (GetLastError()) {
         case ERROR_IO_PENDING:
           /* Overlapped poll operation in progress; this is expected. */
@@ -1804,10 +1836,10 @@ int sock_update(port_state_t* port_state, sock_state_t* sock_state) {
 }
 
 int sock_feed_event(port_state_t* port_state,
-                    OVERLAPPED* overlapped,
+                    IO_STATUS_BLOCK* io_status_block,
                     struct epoll_event* ev) {
   sock_state_t* sock_state =
-      container_of(overlapped, sock_state_t, overlapped);
+      container_of(io_status_block, sock_state_t, io_status_block);
   AFD_POLL_INFO* poll_info = &sock_state->poll_info;
   uint32_t epoll_events = 0;
 
@@ -1818,10 +1850,10 @@ int sock_feed_event(port_state_t* port_state,
     /* Socket has been deleted earlier and can now be freed. */
     return sock__delete(port_state, sock_state, false);
 
-  } else if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) {
+  } else if (io_status_block->Status == STATUS_CANCELLED) {
     /* The poll request was cancelled by CancelIoEx. */
 
-  } else if (!NT_SUCCESS(overlapped->Internal)) {
+  } else if (!NT_SUCCESS(io_status_block->Status)) {
     /* The overlapped request itself failed in an unexpected way. */
     epoll_events = EPOLLERR;
 
@@ -1858,6 +1890,10 @@ int sock_feed_event(port_state_t* port_state,
   return 1;
 }
 
+sock_state_t* sock_state_from_queue_node(queue_node_t* queue_node) {
+  return container_of(queue_node, sock_state_t, queue_node);
+}
+
 queue_node_t* sock_state_to_queue_node(sock_state_t* sock_state) {
   return &sock_state->queue_node;
 }
@@ -1870,10 +1906,6 @@ tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state) {
   return &sock_state->tree_node;
 }
 
-sock_state_t* sock_state_from_queue_node(queue_node_t* queue_node) {
-  return container_of(queue_node, sock_state_t, queue_node);
-}
-
 void ts_tree_init(ts_tree_t* ts_tree) {
   tree_init(&ts_tree->tree);
   InitializeSRWLock(&ts_tree->lock);
@@ -1986,7 +2018,7 @@ static inline void tree__rotate_right(tree_t* tree, tree_node_t* node) {
       break; \
   }
 
-#define TREE__FIXUP_AFTER_INSERT(cis, trans) \
+#define TREE__REBALANCE_AFTER_INSERT(cis, trans) \
   tree_node_t* grandparent = parent->parent; \
   tree_node_t* uncle = grandparent->trans; \
   \
@@ -2030,9 +2062,9 @@ int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key) {
   for (; parent && parent->red; parent = node->parent) {
     if (parent == parent->parent->left) {
-      TREE__FIXUP_AFTER_INSERT(left, right)
+      TREE__REBALANCE_AFTER_INSERT(left, right)
     } else {
-      TREE__FIXUP_AFTER_INSERT(right, left)
+      TREE__REBALANCE_AFTER_INSERT(right, left)
     }
   }
   tree->root->red = false;
@@ -2040,7 +2072,7 @@ int tree_add(tree_t* tree, tree_node_t* node, uintptr_t key) {
   return 0;
 }
 
-#define TREE__FIXUP_AFTER_REMOVE(cis, trans) \
+#define TREE__REBALANCE_AFTER_REMOVE(cis, trans) \
   tree_node_t* sibling = parent->trans; \
   \
   if (sibling->red) { \
@@ -2126,9 +2158,9 @@ void tree_del(tree_t* tree, tree_node_t* node) {
     if (node == tree->root)
       break;
     if (node == parent->left) {
-      TREE__FIXUP_AFTER_REMOVE(left, right)
+      TREE__REBALANCE_AFTER_REMOVE(left, right)
     } else {
-      TREE__FIXUP_AFTER_REMOVE(right, left)
+      TREE__REBALANCE_AFTER_REMOVE(right, left)
    }
    node = parent;
    parent = parent->parent;


@@ -2,7 +2,7 @@
  * wepoll - epoll for Windows
  * https://github.com/piscisaureus/wepoll
  *
- * Copyright 2012-2019, Bert Belder <bertbelder@gmail.com>
+ * Copyright 2012-2020, Bert Belder <bertbelder@gmail.com>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -32,9 +32,7 @@
 #ifndef WEPOLL_H_
 #define WEPOLL_H_
 
-#ifndef WEPOLL_EXPORT
 #define WEPOLL_EXPORT
-#endif
 
 #include <stdint.h>
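
For orientation, the API these files export keeps epoll's shape but uses a HANDLE for the port and a SOCKET in the event payload. A minimal usage sketch (error handling omitted; the UDP socket is just a conveniently always-writable test subject; link against ws2_32):

#include <winsock2.h>
#include <stdio.h>
#include "wepoll.h"

int main(void) {
  WSADATA wsa_data;
  WSAStartup(MAKEWORD(2, 2), &wsa_data);

  SOCKET sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
  HANDLE ephnd = epoll_create1(0); /* returns a HANDLE, not an fd */

  struct epoll_event ev;
  ev.events = EPOLLOUT; /* a fresh UDP socket becomes writable immediately */
  ev.data.sock = sock;
  epoll_ctl(ephnd, EPOLL_CTL_ADD, sock, &ev);

  struct epoll_event events[1];
  int n = epoll_wait(ephnd, events, 1, 1000 /* ms */);
  if (n > 0)
    printf("socket %llu ready\n", (unsigned long long) events[0].data.sock);

  epoll_close(ephnd); /* HANDLE-based, so not close() or CloseHandle() */
  closesocket(sock);
  WSACleanup();
  return 0;
}
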