version 1.2.1

This commit is contained in:
Bert Belder 2017-12-07 19:12:52 +01:00
commit af5b83ac9d
2 changed files with 216 additions and 160 deletions

View File

@ -1,5 +1,7 @@
# wepoll - epoll for windows # wepoll - epoll for windows
[![][ci status badge]][ci status link]
This library implements the [epoll][man epoll] API for Windows This library implements the [epoll][man epoll] API for Windows
applications. It attempts to be efficient, and to match the Linux API applications. It attempts to be efficient, and to match the Linux API
as semantics as closely as possible. as semantics as closely as possible.
@ -112,6 +114,9 @@ int epoll_wait(HANDLE ephnd,
* TODO: expand * TODO: expand
* [man page][man epoll_wait] * [man page][man epoll_wait]
[ci status badge]: https://ci.appveyor.com/api/projects/status/github/piscisaureus/wepoll?branch=master&svg=true
[ci status link]: https://ci.appveyor.com/project/piscisaureus/wepoll/branch/master
[dist]: https://github.com/piscisaureus/wepoll/tree/dist [dist]: https://github.com/piscisaureus/wepoll/tree/dist
[man epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html [man epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
[man epoll_create]: http://man7.org/linux/man-pages/man2/epoll_create.2.html [man epoll_create]: http://man7.org/linux/man-pages/man2/epoll_create.2.html

371
wepoll.c
View File

@ -118,9 +118,13 @@ WEPOLL_EXPORT int epoll_wait(HANDLE ephnd,
#define WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN
#endif #endif
#include <WS2tcpip.h> #pragma warning(push, 1)
#include <WinSock2.h>
#include <Windows.h> #include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma warning(pop)
WEPOLL_INTERNAL int nt_global_init(void); WEPOLL_INTERNAL int nt_global_init(void);
@ -277,6 +281,11 @@ typedef intptr_t ssize_t;
#define unused(v) ((void) (v)) #define unused(v) ((void) (v))
#if defined(_MSC_VER) && _MSC_VER < 1900
/* Polyfill `inline` for msvc 12 (Visual Studio 2013) */
#define inline __inline
#endif
#ifdef __clang__ #ifdef __clang__
/* Polyfill static_assert() because clang doesn't support it. */ /* Polyfill static_assert() because clang doesn't support it. */
#define static_assert(condition, message) typedef __attribute__( \ #define static_assert(condition, message) typedef __attribute__( \
@ -491,7 +500,7 @@ static ssize_t _afd_get_protocol_info(SOCKET socket,
SO_PROTOCOL_INFOW, SO_PROTOCOL_INFOW,
(char*) protocol_info, (char*) protocol_info,
&opt_len) != 0) &opt_len) != 0)
return_error(-1); return_handle_error(-1, socket);
id = -1; id = -1;
for (size_t i = 0; i < array_count(AFD_PROVIDER_GUID_LIST); i++) { for (size_t i = 0; i < array_count(AFD_PROVIDER_GUID_LIST); i++) {
@ -505,7 +514,7 @@ static ssize_t _afd_get_protocol_info(SOCKET socket,
/* Check if the protocol uses an msafd socket. */ /* Check if the protocol uses an msafd socket. */
if (id < 0) if (id < 0)
return_error(-1, ERROR_NOT_SUPPORTED); return_error(-1, ERROR_DEVICE_FEATURE_NOT_SUPPORTED);
return id; return id;
} }
@ -525,19 +534,18 @@ WEPOLL_INTERNAL ssize_t afd_get_protocol(SOCKET socket,
if (id < 0) { if (id < 0) {
/* If getting protocol information failed, it might be due to the socket /* If getting protocol information failed, it might be due to the socket
* not being an AFD socket. If so, attempt to fetch the underlying base * not being an AFD socket. If so, attempt to fetch the underlying base
* socket, then try again to obtain protocol information. If that also * socket, then try again to obtain protocol information. */
* fails, return the *original* error. */ DWORD error = GetLastError();
DWORD original_error = GetLastError(); if (error != ERROR_DEVICE_FEATURE_NOT_SUPPORTED)
if (original_error != ERROR_NOT_SUPPORTED) return -1;
return_error(-1);
afd_socket = _afd_get_base_socket(socket); afd_socket = _afd_get_base_socket(socket);
if (afd_socket == INVALID_SOCKET || afd_socket == socket) if (afd_socket == INVALID_SOCKET || afd_socket == socket)
return_error(-1, original_error); return_error(-1, error);
id = _afd_get_protocol_info(afd_socket, protocol_info); id = _afd_get_protocol_info(afd_socket, protocol_info);
if (id < 0) if (id < 0)
return_error(-1, original_error); return -1;
} }
*afd_socket_out = afd_socket; *afd_socket_out = afd_socket;
@ -1158,7 +1166,8 @@ typedef struct ep_port {
poll_group_allocator_t* poll_group_allocator_t*
poll_group_allocators[array_count(AFD_PROVIDER_GUID_LIST)]; poll_group_allocators[array_count(AFD_PROVIDER_GUID_LIST)];
tree_t sock_tree; tree_t sock_tree;
queue_t update_queue; queue_t sock_update_queue;
queue_t sock_deleted_queue;
reflock_tree_node_t handle_tree_node; reflock_tree_node_t handle_tree_node;
CRITICAL_SECTION lock; CRITICAL_SECTION lock;
size_t active_poll_count; size_t active_poll_count;
@ -1182,7 +1191,8 @@ WEPOLL_INTERNAL poll_group_t* ep_port_acquire_poll_group(
ep_port_t* port_info, ep_port_t* port_info,
size_t protocol_id, size_t protocol_id,
const WSAPROTOCOL_INFOW* protocol_info); const WSAPROTOCOL_INFOW* protocol_info);
WEPOLL_INTERNAL void ep_port_release_poll_group(poll_group_t* poll_group); WEPOLL_INTERNAL void ep_port_release_poll_group(ep_port_t* port_info,
poll_group_t* poll_group);
WEPOLL_INTERNAL int ep_port_add_socket(ep_port_t* port_info, WEPOLL_INTERNAL int ep_port_add_socket(ep_port_t* port_info,
ep_sock_t* sock_info, ep_sock_t* sock_info,
@ -1194,10 +1204,13 @@ WEPOLL_INTERNAL ep_sock_t* ep_port_find_socket(ep_port_t* port_info,
WEPOLL_INTERNAL void ep_port_request_socket_update(ep_port_t* port_info, WEPOLL_INTERNAL void ep_port_request_socket_update(ep_port_t* port_info,
ep_sock_t* sock_info); ep_sock_t* sock_info);
WEPOLL_INTERNAL void ep_port_clear_socket_update(ep_port_t* port_info, WEPOLL_INTERNAL void ep_port_cancel_socket_update(ep_port_t* port_info,
ep_sock_t* sock_info); ep_sock_t* sock_info);
WEPOLL_INTERNAL bool ep_port_is_socket_update_pending(ep_port_t* port_info,
ep_sock_t* sock_info); WEPOLL_INTERNAL void ep_port_add_deleted_socket(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL void ep_port_remove_deleted_socket(ep_port_t* port_info,
ep_sock_t* sock_info);
static reflock_tree_t _epoll_handle_tree; static reflock_tree_t _epoll_handle_tree;
@ -1295,6 +1308,9 @@ int epoll_wait(HANDLE ephnd,
ep_port_t* port_info; ep_port_t* port_info;
int result; int result;
if (maxevents <= 0)
return_error(-1, ERROR_INVALID_PARAMETER);
if (init() < 0) if (init() < 0)
return -1; return -1;
@ -1311,102 +1327,103 @@ int epoll_wait(HANDLE ephnd,
return result; return result;
} }
#define _ERROR_ERRNO_MAP(X) \ #define _ERROR_ERRNO_MAP(X) \
X(ERROR_ACCESS_DENIED, EACCES) \ X(ERROR_ACCESS_DENIED, EACCES) \
X(ERROR_ALREADY_EXISTS, EEXIST) \ X(ERROR_ALREADY_EXISTS, EEXIST) \
X(ERROR_BAD_COMMAND, EACCES) \ X(ERROR_BAD_COMMAND, EACCES) \
X(ERROR_BAD_EXE_FORMAT, ENOEXEC) \ X(ERROR_BAD_EXE_FORMAT, ENOEXEC) \
X(ERROR_BAD_LENGTH, EACCES) \ X(ERROR_BAD_LENGTH, EACCES) \
X(ERROR_BAD_NETPATH, ENOENT) \ X(ERROR_BAD_NETPATH, ENOENT) \
X(ERROR_BAD_NET_NAME, ENOENT) \ X(ERROR_BAD_NET_NAME, ENOENT) \
X(ERROR_BAD_NET_RESP, ENETDOWN) \ X(ERROR_BAD_NET_RESP, ENETDOWN) \
X(ERROR_BAD_PATHNAME, ENOENT) \ X(ERROR_BAD_PATHNAME, ENOENT) \
X(ERROR_BROKEN_PIPE, EPIPE) \ X(ERROR_BROKEN_PIPE, EPIPE) \
X(ERROR_CANNOT_MAKE, EACCES) \ X(ERROR_CANNOT_MAKE, EACCES) \
X(ERROR_COMMITMENT_LIMIT, ENOMEM) \ X(ERROR_COMMITMENT_LIMIT, ENOMEM) \
X(ERROR_CONNECTION_ABORTED, ECONNABORTED) \ X(ERROR_CONNECTION_ABORTED, ECONNABORTED) \
X(ERROR_CONNECTION_ACTIVE, EISCONN) \ X(ERROR_CONNECTION_ACTIVE, EISCONN) \
X(ERROR_CONNECTION_REFUSED, ECONNREFUSED) \ X(ERROR_CONNECTION_REFUSED, ECONNREFUSED) \
X(ERROR_CRC, EACCES) \ X(ERROR_CRC, EACCES) \
X(ERROR_DIR_NOT_EMPTY, ENOTEMPTY) \ X(ERROR_DEVICE_FEATURE_NOT_SUPPORTED, EPERM) \
X(ERROR_DISK_FULL, ENOSPC) \ X(ERROR_DIR_NOT_EMPTY, ENOTEMPTY) \
X(ERROR_DUP_NAME, EADDRINUSE) \ X(ERROR_DISK_FULL, ENOSPC) \
X(ERROR_FILENAME_EXCED_RANGE, ENOENT) \ X(ERROR_DUP_NAME, EADDRINUSE) \
X(ERROR_FILE_NOT_FOUND, ENOENT) \ X(ERROR_FILENAME_EXCED_RANGE, ENOENT) \
X(ERROR_GEN_FAILURE, EACCES) \ X(ERROR_FILE_NOT_FOUND, ENOENT) \
X(ERROR_GRACEFUL_DISCONNECT, EPIPE) \ X(ERROR_GEN_FAILURE, EACCES) \
X(ERROR_HOST_DOWN, EHOSTUNREACH) \ X(ERROR_GRACEFUL_DISCONNECT, EPIPE) \
X(ERROR_HOST_UNREACHABLE, EHOSTUNREACH) \ X(ERROR_HOST_DOWN, EHOSTUNREACH) \
X(ERROR_INSUFFICIENT_BUFFER, EFAULT) \ X(ERROR_HOST_UNREACHABLE, EHOSTUNREACH) \
X(ERROR_INVALID_ADDRESS, EADDRNOTAVAIL) \ X(ERROR_INSUFFICIENT_BUFFER, EFAULT) \
X(ERROR_INVALID_FUNCTION, EINVAL) \ X(ERROR_INVALID_ADDRESS, EADDRNOTAVAIL) \
X(ERROR_INVALID_HANDLE, EBADF) \ X(ERROR_INVALID_FUNCTION, EINVAL) \
X(ERROR_INVALID_NETNAME, EADDRNOTAVAIL) \ X(ERROR_INVALID_HANDLE, EBADF) \
X(ERROR_INVALID_PARAMETER, EINVAL) \ X(ERROR_INVALID_NETNAME, EADDRNOTAVAIL) \
X(ERROR_INVALID_USER_BUFFER, EMSGSIZE) \ X(ERROR_INVALID_PARAMETER, EINVAL) \
X(ERROR_IO_PENDING, EINPROGRESS) \ X(ERROR_INVALID_USER_BUFFER, EMSGSIZE) \
X(ERROR_LOCK_VIOLATION, EACCES) \ X(ERROR_IO_PENDING, EINPROGRESS) \
X(ERROR_MORE_DATA, EMSGSIZE) \ X(ERROR_LOCK_VIOLATION, EACCES) \
X(ERROR_NETNAME_DELETED, ECONNABORTED) \ X(ERROR_MORE_DATA, EMSGSIZE) \
X(ERROR_NETWORK_ACCESS_DENIED, EACCES) \ X(ERROR_NETNAME_DELETED, ECONNABORTED) \
X(ERROR_NETWORK_BUSY, ENETDOWN) \ X(ERROR_NETWORK_ACCESS_DENIED, EACCES) \
X(ERROR_NETWORK_UNREACHABLE, ENETUNREACH) \ X(ERROR_NETWORK_BUSY, ENETDOWN) \
X(ERROR_NOACCESS, EFAULT) \ X(ERROR_NETWORK_UNREACHABLE, ENETUNREACH) \
X(ERROR_NONPAGED_SYSTEM_RESOURCES, ENOMEM) \ X(ERROR_NOACCESS, EFAULT) \
X(ERROR_NOT_ENOUGH_MEMORY, ENOMEM) \ X(ERROR_NONPAGED_SYSTEM_RESOURCES, ENOMEM) \
X(ERROR_NOT_ENOUGH_QUOTA, ENOMEM) \ X(ERROR_NOT_ENOUGH_MEMORY, ENOMEM) \
X(ERROR_NOT_FOUND, ENOENT) \ X(ERROR_NOT_ENOUGH_QUOTA, ENOMEM) \
X(ERROR_NOT_LOCKED, EACCES) \ X(ERROR_NOT_FOUND, ENOENT) \
X(ERROR_NOT_READY, EACCES) \ X(ERROR_NOT_LOCKED, EACCES) \
X(ERROR_NOT_SAME_DEVICE, EXDEV) \ X(ERROR_NOT_READY, EACCES) \
X(ERROR_NOT_SUPPORTED, EOPNOTSUPP) \ X(ERROR_NOT_SAME_DEVICE, EXDEV) \
X(ERROR_NO_MORE_FILES, ENOENT) \ X(ERROR_NOT_SUPPORTED, ENOTSUP) \
X(ERROR_NO_SYSTEM_RESOURCES, ENOMEM) \ X(ERROR_NO_MORE_FILES, ENOENT) \
X(ERROR_OPERATION_ABORTED, EINTR) \ X(ERROR_NO_SYSTEM_RESOURCES, ENOMEM) \
X(ERROR_OUT_OF_PAPER, EACCES) \ X(ERROR_OPERATION_ABORTED, EINTR) \
X(ERROR_PAGED_SYSTEM_RESOURCES, ENOMEM) \ X(ERROR_OUT_OF_PAPER, EACCES) \
X(ERROR_PAGEFILE_QUOTA, ENOMEM) \ X(ERROR_PAGED_SYSTEM_RESOURCES, ENOMEM) \
X(ERROR_PATH_NOT_FOUND, ENOENT) \ X(ERROR_PAGEFILE_QUOTA, ENOMEM) \
X(ERROR_PIPE_NOT_CONNECTED, EPIPE) \ X(ERROR_PATH_NOT_FOUND, ENOENT) \
X(ERROR_PORT_UNREACHABLE, ECONNRESET) \ X(ERROR_PIPE_NOT_CONNECTED, EPIPE) \
X(ERROR_PROTOCOL_UNREACHABLE, ENETUNREACH) \ X(ERROR_PORT_UNREACHABLE, ECONNRESET) \
X(ERROR_REM_NOT_LIST, ECONNREFUSED) \ X(ERROR_PROTOCOL_UNREACHABLE, ENETUNREACH) \
X(ERROR_REQUEST_ABORTED, EINTR) \ X(ERROR_REM_NOT_LIST, ECONNREFUSED) \
X(ERROR_REQ_NOT_ACCEP, EWOULDBLOCK) \ X(ERROR_REQUEST_ABORTED, EINTR) \
X(ERROR_SECTOR_NOT_FOUND, EACCES) \ X(ERROR_REQ_NOT_ACCEP, EWOULDBLOCK) \
X(ERROR_SEM_TIMEOUT, ETIMEDOUT) \ X(ERROR_SECTOR_NOT_FOUND, EACCES) \
X(ERROR_SHARING_VIOLATION, EACCES) \ X(ERROR_SEM_TIMEOUT, ETIMEDOUT) \
X(ERROR_TOO_MANY_NAMES, ENOMEM) \ X(ERROR_SHARING_VIOLATION, EACCES) \
X(ERROR_TOO_MANY_OPEN_FILES, EMFILE) \ X(ERROR_TOO_MANY_NAMES, ENOMEM) \
X(ERROR_UNEXP_NET_ERR, ECONNABORTED) \ X(ERROR_TOO_MANY_OPEN_FILES, EMFILE) \
X(ERROR_WAIT_NO_CHILDREN, ECHILD) \ X(ERROR_UNEXP_NET_ERR, ECONNABORTED) \
X(ERROR_WORKING_SET_QUOTA, ENOMEM) \ X(ERROR_WAIT_NO_CHILDREN, ECHILD) \
X(ERROR_WRITE_PROTECT, EACCES) \ X(ERROR_WORKING_SET_QUOTA, ENOMEM) \
X(ERROR_WRONG_DISK, EACCES) \ X(ERROR_WRITE_PROTECT, EACCES) \
X(WSAEACCES, EACCES) \ X(ERROR_WRONG_DISK, EACCES) \
X(WSAEADDRINUSE, EADDRINUSE) \ X(WSAEACCES, EACCES) \
X(WSAEADDRNOTAVAIL, EADDRNOTAVAIL) \ X(WSAEADDRINUSE, EADDRINUSE) \
X(WSAEAFNOSUPPORT, EAFNOSUPPORT) \ X(WSAEADDRNOTAVAIL, EADDRNOTAVAIL) \
X(WSAECONNABORTED, ECONNABORTED) \ X(WSAEAFNOSUPPORT, EAFNOSUPPORT) \
X(WSAECONNREFUSED, ECONNREFUSED) \ X(WSAECONNABORTED, ECONNABORTED) \
X(WSAECONNRESET, ECONNRESET) \ X(WSAECONNREFUSED, ECONNREFUSED) \
X(WSAEDISCON, EPIPE) \ X(WSAECONNRESET, ECONNRESET) \
X(WSAEFAULT, EFAULT) \ X(WSAEDISCON, EPIPE) \
X(WSAEHOSTDOWN, EHOSTUNREACH) \ X(WSAEFAULT, EFAULT) \
X(WSAEHOSTUNREACH, EHOSTUNREACH) \ X(WSAEHOSTDOWN, EHOSTUNREACH) \
X(WSAEINTR, EINTR) \ X(WSAEHOSTUNREACH, EHOSTUNREACH) \
X(WSAEINVAL, EINVAL) \ X(WSAEINTR, EINTR) \
X(WSAEISCONN, EISCONN) \ X(WSAEINVAL, EINVAL) \
X(WSAEMSGSIZE, EMSGSIZE) \ X(WSAEISCONN, EISCONN) \
X(WSAENETDOWN, ENETDOWN) \ X(WSAEMSGSIZE, EMSGSIZE) \
X(WSAENETRESET, EHOSTUNREACH) \ X(WSAENETDOWN, ENETDOWN) \
X(WSAENETUNREACH, ENETUNREACH) \ X(WSAENETRESET, EHOSTUNREACH) \
X(WSAENOBUFS, ENOMEM) \ X(WSAENETUNREACH, ENETUNREACH) \
X(WSAENOTCONN, ENOTCONN) \ X(WSAENOBUFS, ENOMEM) \
X(WSAENOTSOCK, EBADF) \ X(WSAENOTCONN, ENOTCONN) \
X(WSAEOPNOTSUPP, EOPNOTSUPP) \ X(WSAENOTSOCK, ENOTSOCK) \
X(WSAESHUTDOWN, EPIPE) \ X(WSAEOPNOTSUPP, EOPNOTSUPP) \
X(WSAETIMEDOUT, ETIMEDOUT) \ X(WSAESHUTDOWN, EPIPE) \
X(WSAETIMEDOUT, ETIMEDOUT) \
X(WSAEWOULDBLOCK, EWOULDBLOCK) X(WSAEWOULDBLOCK, EWOULDBLOCK)
errno_t err_map_win_error_to_errno(DWORD error) { errno_t err_map_win_error_to_errno(DWORD error) {
@ -1679,7 +1696,8 @@ ep_port_t* ep_port_new(HANDLE* iocp_out) {
memset(port_info, 0, sizeof *port_info); memset(port_info, 0, sizeof *port_info);
port_info->iocp = iocp; port_info->iocp = iocp;
queue_init(&port_info->update_queue); queue_init(&port_info->sock_update_queue);
queue_init(&port_info->sock_deleted_queue);
tree_init(&port_info->sock_tree); tree_init(&port_info->sock_tree);
reflock_tree_node_init(&port_info->handle_tree_node); reflock_tree_node_init(&port_info->handle_tree_node);
InitializeCriticalSection(&port_info->lock); InitializeCriticalSection(&port_info->lock);
@ -1715,6 +1733,7 @@ int ep_port_close(ep_port_t* port_info) {
int ep_port_delete(ep_port_t* port_info) { int ep_port_delete(ep_port_t* port_info) {
tree_node_t* tree_node; tree_node_t* tree_node;
queue_node_t* queue_node;
EnterCriticalSection(&port_info->lock); EnterCriticalSection(&port_info->lock);
@ -1726,6 +1745,11 @@ int ep_port_delete(ep_port_t* port_info) {
ep_sock_force_delete(port_info, sock_info); ep_sock_force_delete(port_info, sock_info);
} }
while ((queue_node = queue_first(&port_info->sock_deleted_queue)) != NULL) {
ep_sock_t* sock_info = container_of(queue_node, ep_sock_t, queue_node);
ep_sock_force_delete(port_info, sock_info);
}
for (size_t i = 0; i < array_count(port_info->poll_group_allocators); i++) { for (size_t i = 0; i < array_count(port_info->poll_group_allocators); i++) {
poll_group_allocator_t* pga = port_info->poll_group_allocators[i]; poll_group_allocator_t* pga = port_info->poll_group_allocators[i];
if (pga != NULL) if (pga != NULL)
@ -1742,12 +1766,12 @@ int ep_port_delete(ep_port_t* port_info) {
} }
static int _ep_port_update_events(ep_port_t* port_info) { static int _ep_port_update_events(ep_port_t* port_info) {
queue_t* update_queue = &port_info->update_queue; queue_t* sock_update_queue = &port_info->sock_update_queue;
/* Walk the queue, submitting new poll requests for every socket that needs /* Walk the queue, submitting new poll requests for every socket that needs
* it. */ * it. */
while (!queue_empty(update_queue)) { while (!queue_empty(sock_update_queue)) {
queue_node_t* queue_node = queue_first(update_queue); queue_node_t* queue_node = queue_first(sock_update_queue);
ep_sock_t* sock_info = container_of(queue_node, ep_sock_t, queue_node); ep_sock_t* sock_info = container_of(queue_node, ep_sock_t, queue_node);
if (ep_sock_update(port_info, sock_info) < 0) if (ep_sock_update(port_info, sock_info) < 0)
@ -1942,7 +1966,7 @@ static int _ep_port_ctl_op(ep_port_t* port_info,
case EPOLL_CTL_DEL: case EPOLL_CTL_DEL:
return _ep_port_ctl_del(port_info, sock); return _ep_port_ctl_del(port_info, sock);
default: default:
return_error(-1, ERROR_INVALID_PARAMETER); return_handle_error(-1, sock, ERROR_INVALID_PARAMETER);
} }
} }
@ -2005,28 +2029,38 @@ poll_group_t* ep_port_acquire_poll_group(
return poll_group_acquire(pga); return poll_group_acquire(pga);
} }
void ep_port_release_poll_group(poll_group_t* poll_group) { void ep_port_release_poll_group(ep_port_t* port_info,
poll_group_t* poll_group) {
unused(port_info);
poll_group_release(poll_group); poll_group_release(poll_group);
} }
void ep_port_request_socket_update(ep_port_t* port_info, void ep_port_request_socket_update(ep_port_t* port_info,
ep_sock_t* sock_info) { ep_sock_t* sock_info) {
if (ep_port_is_socket_update_pending(port_info, sock_info)) if (queue_enqueued(&sock_info->queue_node))
return; return;
queue_append(&port_info->update_queue, &sock_info->queue_node); queue_append(&port_info->sock_update_queue, &sock_info->queue_node);
assert(ep_port_is_socket_update_pending(port_info, sock_info));
} }
void ep_port_clear_socket_update(ep_port_t* port_info, ep_sock_t* sock_info) { void ep_port_cancel_socket_update(ep_port_t* port_info, ep_sock_t* sock_info) {
if (!ep_port_is_socket_update_pending(port_info, sock_info)) unused(port_info);
if (!queue_enqueued(&sock_info->queue_node))
return; return;
queue_remove(&sock_info->queue_node); queue_remove(&sock_info->queue_node);
} }
bool ep_port_is_socket_update_pending(ep_port_t* port_info, void ep_port_add_deleted_socket(ep_port_t* port_info, ep_sock_t* sock_info) {
ep_sock_t* sock_info) { if (queue_enqueued(&sock_info->queue_node))
return;
queue_append(&port_info->sock_deleted_queue, &sock_info->queue_node);
}
void ep_port_remove_deleted_socket(ep_port_t* port_info,
ep_sock_t* sock_info) {
unused(port_info); unused(port_info);
return queue_enqueued(&sock_info->queue_node); if (!queue_enqueued(&sock_info->queue_node))
return;
queue_remove(&sock_info->queue_node);
} }
void queue_init(queue_t* queue) { void queue_init(queue_t* queue) {
@ -2267,7 +2301,7 @@ typedef struct _ep_sock_private {
uint32_t user_events; uint32_t user_events;
uint32_t pending_events; uint32_t pending_events;
_poll_status_t poll_status; _poll_status_t poll_status;
unsigned deleted : 1; bool delete_pending;
} _ep_sock_private_t; } _ep_sock_private_t;
static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) { static DWORD _epoll_events_to_afd_events(uint32_t epoll_events) {
@ -2388,10 +2422,22 @@ static inline _ep_sock_private_t* _ep_sock_alloc(void) {
} }
static inline void _ep_sock_free(_ep_sock_private_t* sock_private) { static inline void _ep_sock_free(_ep_sock_private_t* sock_private) {
assert(sock_private->poll_status == _POLL_IDLE);
free(sock_private); free(sock_private);
} }
static int _ep_sock_cancel_poll(_ep_sock_private_t* sock_private) {
assert(sock_private->poll_status == _POLL_PENDING);
if (_poll_req_cancel(&sock_private->poll_req,
poll_group_get_socket(sock_private->poll_group)) < 0)
return -1;
sock_private->poll_status = _POLL_CANCELLED;
sock_private->pending_events = 0;
return 0;
}
ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) { ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) {
SOCKET afd_socket; SOCKET afd_socket;
ssize_t protocol_id; ssize_t protocol_id;
@ -2431,39 +2477,46 @@ ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) {
err2: err2:
_ep_sock_free(sock_private); _ep_sock_free(sock_private);
err1: err1:
ep_port_release_poll_group(poll_group); ep_port_release_poll_group(port_info, poll_group);
return NULL; return NULL;
} }
void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) { static void _ep_sock_delete(ep_port_t* port_info,
ep_sock_t* sock_info,
bool force) {
_ep_sock_private_t* sock_private = _ep_sock_private(sock_info); _ep_sock_private_t* sock_private = _ep_sock_private(sock_info);
assert(!sock_private->deleted); if (!sock_private->delete_pending) {
sock_private->deleted = true; if (sock_private->poll_status == _POLL_PENDING)
_ep_sock_cancel_poll(sock_private);
if (sock_private->poll_status == _POLL_PENDING) { ep_port_cancel_socket_update(port_info, sock_info);
_poll_req_cancel(&sock_private->poll_req, ep_port_del_socket(port_info, sock_info);
poll_group_get_socket(sock_private->poll_group));
sock_private->poll_status = _POLL_CANCELLED; sock_private->delete_pending = true;
sock_private->pending_events = 0;
} }
ep_port_del_socket(port_info, sock_info);
ep_port_clear_socket_update(port_info, sock_info);
ep_port_release_poll_group(sock_private->poll_group);
sock_private->poll_group = NULL;
/* If the poll request still needs to complete, the ep_sock object can't /* If the poll request still needs to complete, the ep_sock object can't
* be free()d yet. `ep_sock_feed_event` will take care of this later. */ * be free()d yet. `ep_sock_feed_event()` or `ep_port_close()` will take care
if (sock_private->poll_status == _POLL_IDLE) * of this later. */
if (force || sock_private->poll_status == _POLL_IDLE) {
/* Free the sock_info now. */
ep_port_remove_deleted_socket(port_info, sock_info);
ep_port_release_poll_group(port_info, sock_private->poll_group);
_ep_sock_free(sock_private); _ep_sock_free(sock_private);
} else {
/* Free the socket later. */
ep_port_add_deleted_socket(port_info, sock_info);
}
}
void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) {
_ep_sock_delete(port_info, sock_info, false);
} }
void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info) { void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info) {
_ep_sock_private_t* sock_private = _ep_sock_private(sock_info); _ep_sock_delete(port_info, sock_info, true);
sock_private->poll_status = _POLL_IDLE;
ep_sock_delete(port_info, sock_info);
} }
int ep_sock_set_event(ep_port_t* port_info, int ep_sock_set_event(ep_port_t* port_info,
@ -2487,11 +2540,9 @@ int ep_sock_set_event(ep_port_t* port_info,
int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) { int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) {
_ep_sock_private_t* sock_private = _ep_sock_private(sock_info); _ep_sock_private_t* sock_private = _ep_sock_private(sock_info);
SOCKET driver_socket = poll_group_get_socket(sock_private->poll_group);
bool socket_closed = false; bool socket_closed = false;
assert(ep_port_is_socket_update_pending(port_info, sock_info)); assert(!sock_private->delete_pending);
if ((sock_private->poll_status == _POLL_PENDING) && if ((sock_private->poll_status == _POLL_PENDING) &&
(sock_private->user_events & _KNOWN_EPOLL_EVENTS & (sock_private->user_events & _KNOWN_EPOLL_EVENTS &
~sock_private->pending_events) == 0) { ~sock_private->pending_events) == 0) {
@ -2505,16 +2556,16 @@ int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) {
* events that the user is interested in. Cancel the pending poll request; * events that the user is interested in. Cancel the pending poll request;
* when it completes it will be submitted again with the correct event * when it completes it will be submitted again with the correct event
* mask. */ * mask. */
if (_poll_req_cancel(&sock_private->poll_req, driver_socket) < 0) if (_ep_sock_cancel_poll(sock_private) < 0)
return -1; return -1;
sock_private->poll_status = _POLL_CANCELLED;
sock_private->pending_events = 0;
} else if (sock_private->poll_status == _POLL_CANCELLED) { } else if (sock_private->poll_status == _POLL_CANCELLED) {
/* The poll request has already been cancelled, we're still waiting for it /* The poll request has already been cancelled, we're still waiting for it
* to return. For now, there's nothing that needs to be done. */ * to return. For now, there's nothing that needs to be done. */
} else if (sock_private->poll_status == _POLL_IDLE) { } else if (sock_private->poll_status == _POLL_IDLE) {
SOCKET driver_socket = poll_group_get_socket(sock_private->poll_group);
if (_poll_req_submit(&sock_private->poll_req, if (_poll_req_submit(&sock_private->poll_req,
sock_private->user_events, sock_private->user_events,
sock_private->afd_socket, sock_private->afd_socket,
@ -2536,7 +2587,7 @@ int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) {
assert(false); assert(false);
} }
ep_port_clear_socket_update(port_info, sock_info); ep_port_cancel_socket_update(port_info, sock_info);
/* If we saw an ERROR_INVALID_HANDLE error, drop the socket. */ /* If we saw an ERROR_INVALID_HANDLE error, drop the socket. */
if (socket_closed) if (socket_closed)
@ -2558,10 +2609,10 @@ int ep_sock_feed_event(ep_port_t* port_info,
sock_private->poll_status = _POLL_IDLE; sock_private->poll_status = _POLL_IDLE;
sock_private->pending_events = 0; sock_private->pending_events = 0;
if (sock_private->deleted) { if (sock_private->delete_pending) {
/* Ignore completion for overlapped poll operation if the socket has been /* Ignore completion for overlapped poll operation if the socket is pending
* deleted; instead, free the socket. */ * deletion; instead, delete the socket. */
_ep_sock_free(sock_private); ep_sock_delete(port_info, sock_info);
return 0; return 0;
} }