port,sock: drop 'ep_' prefix, rename types to sock_state_t/port_state_t

This commit is contained in:
Bert Belder 2018-05-17 00:59:26 -07:00
parent 613a821a30
commit 64b714ab36
No known key found for this signature in database
GPG Key ID: 7A77887B2E2ED461
7 changed files with 329 additions and 319 deletions

View File

@ -11,8 +11,9 @@
static ts_tree_t _epoll_handle_tree;
static inline ep_port_t* _handle_tree_node_to_port(ts_tree_node_t* tree_node) {
return container_of(tree_node, ep_port_t, handle_tree_node);
static inline port_state_t* _handle_tree_node_to_port(
ts_tree_node_t* tree_node) {
return container_of(tree_node, port_state_t, handle_tree_node);
}
int api_global_init(void) {
@ -21,21 +22,21 @@ int api_global_init(void) {
}
static HANDLE _epoll_create(void) {
ep_port_t* port_info;
port_state_t* port_state;
HANDLE ephnd;
if (init() < 0)
return NULL;
port_info = ep_port_new(&ephnd);
if (port_info == NULL)
port_state = port_new(&ephnd);
if (port_state == NULL)
return NULL;
if (ts_tree_add(&_epoll_handle_tree,
&port_info->handle_tree_node,
&port_state->handle_tree_node,
(uintptr_t) ephnd) < 0) {
/* This should never happen. */
ep_port_delete(port_info);
port_delete(port_state);
return_set_error(NULL, ERROR_ALREADY_EXISTS);
}
@ -58,7 +59,7 @@ HANDLE epoll_create1(int flags) {
int epoll_close(HANDLE ephnd) {
ts_tree_node_t* tree_node;
ep_port_t* port_info;
port_state_t* port_state;
if (init() < 0)
return -1;
@ -69,12 +70,12 @@ int epoll_close(HANDLE ephnd) {
goto err;
}
port_info = _handle_tree_node_to_port(tree_node);
ep_port_close(port_info);
port_state = _handle_tree_node_to_port(tree_node);
port_close(port_state);
ts_tree_node_unref_and_destroy(tree_node);
return ep_port_delete(port_info);
return port_delete(port_state);
err:
err_check_handle(ephnd);
@ -83,7 +84,7 @@ err:
int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) {
ts_tree_node_t* tree_node;
ep_port_t* port_info;
port_state_t* port_state;
int r;
if (init() < 0)
@ -95,8 +96,8 @@ int epoll_ctl(HANDLE ephnd, int op, SOCKET sock, struct epoll_event* ev) {
goto err;
}
port_info = _handle_tree_node_to_port(tree_node);
r = ep_port_ctl(port_info, op, sock, ev);
port_state = _handle_tree_node_to_port(tree_node);
r = port_ctl(port_state, op, sock, ev);
ts_tree_node_unref(tree_node);
@ -118,7 +119,7 @@ int epoll_wait(HANDLE ephnd,
int maxevents,
int timeout) {
ts_tree_node_t* tree_node;
ep_port_t* port_info;
port_state_t* port_state;
int num_events;
if (maxevents <= 0)
@ -133,8 +134,8 @@ int epoll_wait(HANDLE ephnd,
goto err;
}
port_info = _handle_tree_node_to_port(tree_node);
num_events = ep_port_wait(port_info, events, maxevents, timeout);
port_state = _handle_tree_node_to_port(tree_node);
num_events = port_wait(port_state, events, maxevents, timeout);
ts_tree_node_unref(tree_node);

View File

@ -11,13 +11,13 @@
static const size_t _POLL_GROUP_MAX_GROUP_SIZE = 32;
typedef struct poll_group {
ep_port_t* port_info;
port_state_t* port_state;
queue_node_t queue_node;
SOCKET socket;
size_t group_size;
} poll_group_t;
static poll_group_t* _poll_group_new(ep_port_t* port_info) {
static poll_group_t* _poll_group_new(port_state_t* port_state) {
poll_group_t* poll_group = malloc(sizeof *poll_group);
if (poll_group == NULL)
return_set_error(NULL, ERROR_NOT_ENOUGH_MEMORY);
@ -25,14 +25,14 @@ static poll_group_t* _poll_group_new(ep_port_t* port_info) {
memset(poll_group, 0, sizeof *poll_group);
queue_node_init(&poll_group->queue_node);
poll_group->port_info = port_info;
poll_group->port_state = port_state;
if (afd_create_driver_socket(port_info->iocp, &poll_group->socket) < 0) {
if (afd_create_driver_socket(port_state->iocp, &poll_group->socket) < 0) {
free(poll_group);
return NULL;
}
queue_append(&port_info->poll_group_queue, &poll_group->queue_node);
queue_append(&port_state->poll_group_queue, &poll_group->queue_node);
return poll_group;
}
@ -52,8 +52,8 @@ SOCKET poll_group_get_socket(poll_group_t* poll_group) {
return poll_group->socket;
}
poll_group_t* poll_group_acquire(ep_port_t* port_info) {
queue_t* queue = &port_info->poll_group_queue;
poll_group_t* poll_group_acquire(port_state_t* port_state) {
queue_t* queue = &port_state->poll_group_queue;
poll_group_t* poll_group =
!queue_empty(queue)
? container_of(queue_last(queue), poll_group_t, queue_node)
@ -61,23 +61,23 @@ poll_group_t* poll_group_acquire(ep_port_t* port_info) {
if (poll_group == NULL ||
poll_group->group_size >= _POLL_GROUP_MAX_GROUP_SIZE)
poll_group = _poll_group_new(port_info);
poll_group = _poll_group_new(port_state);
if (poll_group == NULL)
return NULL;
if (++poll_group->group_size == _POLL_GROUP_MAX_GROUP_SIZE)
queue_move_first(&port_info->poll_group_queue, &poll_group->queue_node);
queue_move_first(&port_state->poll_group_queue, &poll_group->queue_node);
return poll_group;
}
void poll_group_release(poll_group_t* poll_group) {
ep_port_t* port_info = poll_group->port_info;
port_state_t* port_state = poll_group->port_state;
poll_group->group_size--;
assert(poll_group->group_size < _POLL_GROUP_MAX_GROUP_SIZE);
queue_move_last(&port_info->poll_group_queue, &poll_group->queue_node);
queue_move_last(&port_state->poll_group_queue, &poll_group->queue_node);
/* Poll groups are currently only freed when the epoll port is closed. */
}

View File

@ -6,10 +6,10 @@
#include "queue.h"
#include "win.h"
typedef struct ep_port ep_port_t;
typedef struct port_state port_state_t;
typedef struct poll_group poll_group_t;
WEPOLL_INTERNAL poll_group_t* poll_group_acquire(ep_port_t* port);
WEPOLL_INTERNAL poll_group_t* poll_group_acquire(port_state_t* port);
WEPOLL_INTERNAL void poll_group_release(poll_group_t* poll_group);
WEPOLL_INTERNAL void poll_group_delete(poll_group_t* poll_group);

View File

@ -14,22 +14,22 @@
#include "wepoll.h"
#include "win.h"
#define EP_PORT__MAX_ON_STACK_COMPLETIONS 256
#define PORT__MAX_ON_STACK_COMPLETIONS 256
static ep_port_t* _ep_port_alloc(void) {
ep_port_t* port_info = malloc(sizeof *port_info);
if (port_info == NULL)
static port_state_t* _port_alloc(void) {
port_state_t* port_state = malloc(sizeof *port_state);
if (port_state == NULL)
return_set_error(NULL, ERROR_NOT_ENOUGH_MEMORY);
return port_info;
return port_state;
}
static void _ep_port_free(ep_port_t* port) {
static void _port_free(port_state_t* port) {
assert(port != NULL);
free(port);
}
static HANDLE _ep_port_create_iocp(void) {
static HANDLE _port_create_iocp(void) {
HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
if (iocp == NULL)
return_map_error(NULL);
@ -37,40 +37,40 @@ static HANDLE _ep_port_create_iocp(void) {
return iocp;
}
ep_port_t* ep_port_new(HANDLE* iocp_out) {
ep_port_t* port_info;
port_state_t* port_new(HANDLE* iocp_out) {
port_state_t* port_state;
HANDLE iocp;
port_info = _ep_port_alloc();
if (port_info == NULL)
port_state = _port_alloc();
if (port_state == NULL)
goto err1;
iocp = _ep_port_create_iocp();
iocp = _port_create_iocp();
if (iocp == NULL)
goto err2;
memset(port_info, 0, sizeof *port_info);
memset(port_state, 0, sizeof *port_state);
port_info->iocp = iocp;
tree_init(&port_info->sock_tree);
queue_init(&port_info->sock_update_queue);
queue_init(&port_info->sock_deleted_queue);
queue_init(&port_info->poll_group_queue);
ts_tree_node_init(&port_info->handle_tree_node);
InitializeCriticalSection(&port_info->lock);
port_state->iocp = iocp;
tree_init(&port_state->sock_tree);
queue_init(&port_state->sock_update_queue);
queue_init(&port_state->sock_deleted_queue);
queue_init(&port_state->poll_group_queue);
ts_tree_node_init(&port_state->handle_tree_node);
InitializeCriticalSection(&port_state->lock);
*iocp_out = iocp;
return port_info;
return port_state;
err2:
_ep_port_free(port_info);
_port_free(port_state);
err1:
return NULL;
}
static int _ep_port_close_iocp(ep_port_t* port_info) {
HANDLE iocp = port_info->iocp;
port_info->iocp = NULL;
static int _port_close_iocp(port_state_t* port_state) {
HANDLE iocp = port_state->iocp;
port_state->iocp = NULL;
if (!CloseHandle(iocp))
return_map_error(-1);
@ -78,74 +78,74 @@ static int _ep_port_close_iocp(ep_port_t* port_info) {
return 0;
}
int ep_port_close(ep_port_t* port_info) {
int port_close(port_state_t* port_state) {
int result;
EnterCriticalSection(&port_info->lock);
result = _ep_port_close_iocp(port_info);
LeaveCriticalSection(&port_info->lock);
EnterCriticalSection(&port_state->lock);
result = _port_close_iocp(port_state);
LeaveCriticalSection(&port_state->lock);
return result;
}
int ep_port_delete(ep_port_t* port_info) {
int port_delete(port_state_t* port_state) {
tree_node_t* tree_node;
queue_node_t* queue_node;
/* At this point the IOCP port should have been closed. */
assert(port_info->iocp == NULL);
assert(port_state->iocp == NULL);
while ((tree_node = tree_root(&port_info->sock_tree)) != NULL) {
ep_sock_t* sock_info = ep_sock_from_tree_node(tree_node);
ep_sock_force_delete(port_info, sock_info);
while ((tree_node = tree_root(&port_state->sock_tree)) != NULL) {
sock_state_t* sock_state = sock_state_from_tree_node(tree_node);
sock_force_delete(port_state, sock_state);
}
while ((queue_node = queue_first(&port_info->sock_deleted_queue)) != NULL) {
ep_sock_t* sock_info = ep_sock_from_queue_node(queue_node);
ep_sock_force_delete(port_info, sock_info);
while ((queue_node = queue_first(&port_state->sock_deleted_queue)) != NULL) {
sock_state_t* sock_state = sock_state_from_queue_node(queue_node);
sock_force_delete(port_state, sock_state);
}
while ((queue_node = queue_first(&port_info->poll_group_queue)) != NULL) {
while ((queue_node = queue_first(&port_state->poll_group_queue)) != NULL) {
poll_group_t* poll_group = poll_group_from_queue_node(queue_node);
poll_group_delete(poll_group);
}
assert(queue_empty(&port_info->sock_update_queue));
assert(queue_empty(&port_state->sock_update_queue));
DeleteCriticalSection(&port_info->lock);
DeleteCriticalSection(&port_state->lock);
_ep_port_free(port_info);
_port_free(port_state);
return 0;
}
static int _ep_port_update_events(ep_port_t* port_info) {
queue_t* sock_update_queue = &port_info->sock_update_queue;
static int _port_update_events(port_state_t* port_state) {
queue_t* sock_update_queue = &port_state->sock_update_queue;
/* Walk the queue, submitting new poll requests for every socket that needs
* it. */
while (!queue_empty(sock_update_queue)) {
queue_node_t* queue_node = queue_first(sock_update_queue);
ep_sock_t* sock_info = ep_sock_from_queue_node(queue_node);
sock_state_t* sock_state = sock_state_from_queue_node(queue_node);
if (ep_sock_update(port_info, sock_info) < 0)
if (sock_update(port_state, sock_state) < 0)
return -1;
/* ep_sock_update() removes the socket from the update queue. */
/* sock_update() removes the socket from the update queue. */
}
return 0;
}
static void _ep_port_update_events_if_polling(ep_port_t* port_info) {
if (port_info->active_poll_count > 0)
_ep_port_update_events(port_info);
static void _port_update_events_if_polling(port_state_t* port_state) {
if (port_state->active_poll_count > 0)
_port_update_events(port_state);
}
static int _ep_port_feed_events(ep_port_t* port_info,
struct epoll_event* epoll_events,
OVERLAPPED_ENTRY* iocp_events,
DWORD iocp_event_count) {
static int _port_feed_events(port_state_t* port_state,
struct epoll_event* epoll_events,
OVERLAPPED_ENTRY* iocp_events,
DWORD iocp_event_count) {
int epoll_event_count = 0;
DWORD i;
@ -153,49 +153,49 @@ static int _ep_port_feed_events(ep_port_t* port_info,
OVERLAPPED* overlapped = iocp_events[i].lpOverlapped;
struct epoll_event* ev = &epoll_events[epoll_event_count];
epoll_event_count += ep_sock_feed_event(port_info, overlapped, ev);
epoll_event_count += sock_feed_event(port_state, overlapped, ev);
}
return epoll_event_count;
}
static int _ep_port_poll(ep_port_t* port_info,
struct epoll_event* epoll_events,
OVERLAPPED_ENTRY* iocp_events,
DWORD maxevents,
DWORD timeout) {
static int _port_poll(port_state_t* port_state,
struct epoll_event* epoll_events,
OVERLAPPED_ENTRY* iocp_events,
DWORD maxevents,
DWORD timeout) {
DWORD completion_count;
if (_ep_port_update_events(port_info) < 0)
if (_port_update_events(port_state) < 0)
return -1;
port_info->active_poll_count++;
port_state->active_poll_count++;
LeaveCriticalSection(&port_info->lock);
LeaveCriticalSection(&port_state->lock);
BOOL r = GetQueuedCompletionStatusEx(port_info->iocp,
BOOL r = GetQueuedCompletionStatusEx(port_state->iocp,
iocp_events,
maxevents,
&completion_count,
timeout,
FALSE);
EnterCriticalSection(&port_info->lock);
EnterCriticalSection(&port_state->lock);
port_info->active_poll_count--;
port_state->active_poll_count--;
if (!r)
return_map_error(-1);
return _ep_port_feed_events(
port_info, epoll_events, iocp_events, completion_count);
return _port_feed_events(
port_state, epoll_events, iocp_events, completion_count);
}
int ep_port_wait(ep_port_t* port_info,
struct epoll_event* events,
int maxevents,
int timeout) {
OVERLAPPED_ENTRY stack_iocp_events[EP_PORT__MAX_ON_STACK_COMPLETIONS];
int port_wait(port_state_t* port_state,
struct epoll_event* events,
int maxevents,
int timeout) {
OVERLAPPED_ENTRY stack_iocp_events[PORT__MAX_ON_STACK_COMPLETIONS];
OVERLAPPED_ENTRY* iocp_events;
uint64_t due = 0;
DWORD gqcs_timeout;
@ -226,15 +226,15 @@ int ep_port_wait(ep_port_t* port_info,
gqcs_timeout = INFINITE;
}
EnterCriticalSection(&port_info->lock);
EnterCriticalSection(&port_state->lock);
/* Dequeue completion packets until either at least one interesting event
* has been discovered, or the timeout is reached. */
for (;;) {
uint64_t now;
result = _ep_port_poll(
port_info, events, iocp_events, (DWORD) maxevents, gqcs_timeout);
result = _port_poll(
port_state, events, iocp_events, (DWORD) maxevents, gqcs_timeout);
if (result < 0 || result > 0)
break; /* Result, error, or time-out. */
@ -254,9 +254,9 @@ int ep_port_wait(ep_port_t* port_info,
gqcs_timeout = (DWORD)(due - now);
}
_ep_port_update_events_if_polling(port_info);
_port_update_events_if_polling(port_state);
LeaveCriticalSection(&port_info->lock);
LeaveCriticalSection(&port_state->lock);
if (iocp_events != stack_iocp_events)
free(iocp_events);
@ -269,124 +269,127 @@ int ep_port_wait(ep_port_t* port_info,
return -1;
}
static int _ep_port_ctl_add(ep_port_t* port_info,
SOCKET sock,
struct epoll_event* ev) {
ep_sock_t* sock_info = ep_sock_new(port_info, sock);
if (sock_info == NULL)
static int _port_ctl_add(port_state_t* port_state,
SOCKET sock,
struct epoll_event* ev) {
sock_state_t* sock_state = sock_new(port_state, sock);
if (sock_state == NULL)
return -1;
if (ep_sock_set_event(port_info, sock_info, ev) < 0) {
ep_sock_delete(port_info, sock_info);
if (sock_set_event(port_state, sock_state, ev) < 0) {
sock_delete(port_state, sock_state);
return -1;
}
_ep_port_update_events_if_polling(port_info);
_port_update_events_if_polling(port_state);
return 0;
}
static int _ep_port_ctl_mod(ep_port_t* port_info,
SOCKET sock,
struct epoll_event* ev) {
ep_sock_t* sock_info = ep_port_find_socket(port_info, sock);
if (sock_info == NULL)
static int _port_ctl_mod(port_state_t* port_state,
SOCKET sock,
struct epoll_event* ev) {
sock_state_t* sock_state = port_find_socket(port_state, sock);
if (sock_state == NULL)
return -1;
if (ep_sock_set_event(port_info, sock_info, ev) < 0)
if (sock_set_event(port_state, sock_state, ev) < 0)
return -1;
_ep_port_update_events_if_polling(port_info);
_port_update_events_if_polling(port_state);
return 0;
}
static int _ep_port_ctl_del(ep_port_t* port_info, SOCKET sock) {
ep_sock_t* sock_info = ep_port_find_socket(port_info, sock);
if (sock_info == NULL)
static int _port_ctl_del(port_state_t* port_state, SOCKET sock) {
sock_state_t* sock_state = port_find_socket(port_state, sock);
if (sock_state == NULL)
return -1;
ep_sock_delete(port_info, sock_info);
sock_delete(port_state, sock_state);
return 0;
}
static int _ep_port_ctl_op(ep_port_t* port_info,
int op,
SOCKET sock,
struct epoll_event* ev) {
static int _port_ctl_op(port_state_t* port_state,
int op,
SOCKET sock,
struct epoll_event* ev) {
switch (op) {
case EPOLL_CTL_ADD:
return _ep_port_ctl_add(port_info, sock, ev);
return _port_ctl_add(port_state, sock, ev);
case EPOLL_CTL_MOD:
return _ep_port_ctl_mod(port_info, sock, ev);
return _port_ctl_mod(port_state, sock, ev);
case EPOLL_CTL_DEL:
return _ep_port_ctl_del(port_info, sock);
return _port_ctl_del(port_state, sock);
default:
return_set_error(-1, ERROR_INVALID_PARAMETER);
}
}
int ep_port_ctl(ep_port_t* port_info,
int op,
SOCKET sock,
struct epoll_event* ev) {
int port_ctl(port_state_t* port_state,
int op,
SOCKET sock,
struct epoll_event* ev) {
int result;
EnterCriticalSection(&port_info->lock);
result = _ep_port_ctl_op(port_info, op, sock, ev);
LeaveCriticalSection(&port_info->lock);
EnterCriticalSection(&port_state->lock);
result = _port_ctl_op(port_state, op, sock, ev);
LeaveCriticalSection(&port_state->lock);
return result;
}
int ep_port_register_socket_handle(ep_port_t* port_info,
ep_sock_t* sock_info,
SOCKET socket) {
if (tree_add(
&port_info->sock_tree, ep_sock_to_tree_node(sock_info), socket) < 0)
int port_register_socket_handle(port_state_t* port_state,
sock_state_t* sock_state,
SOCKET socket) {
if (tree_add(&port_state->sock_tree,
sock_state_to_tree_node(sock_state),
socket) < 0)
return_set_error(-1, ERROR_ALREADY_EXISTS);
return 0;
}
void ep_port_unregister_socket_handle(ep_port_t* port_info,
ep_sock_t* sock_info) {
tree_del(&port_info->sock_tree, ep_sock_to_tree_node(sock_info));
void port_unregister_socket_handle(port_state_t* port_state,
sock_state_t* sock_state) {
tree_del(&port_state->sock_tree, sock_state_to_tree_node(sock_state));
}
ep_sock_t* ep_port_find_socket(ep_port_t* port_info, SOCKET socket) {
tree_node_t* tree_node = tree_find(&port_info->sock_tree, socket);
sock_state_t* port_find_socket(port_state_t* port_state, SOCKET socket) {
tree_node_t* tree_node = tree_find(&port_state->sock_tree, socket);
if (tree_node == NULL)
return_set_error(NULL, ERROR_NOT_FOUND);
return ep_sock_from_tree_node(tree_node);
return sock_state_from_tree_node(tree_node);
}
void ep_port_request_socket_update(ep_port_t* port_info,
ep_sock_t* sock_info) {
if (queue_enqueued(ep_sock_to_queue_node(sock_info)))
void port_request_socket_update(port_state_t* port_state,
sock_state_t* sock_state) {
if (queue_enqueued(sock_state_to_queue_node(sock_state)))
return;
queue_append(&port_info->sock_update_queue,
ep_sock_to_queue_node(sock_info));
queue_append(&port_state->sock_update_queue,
sock_state_to_queue_node(sock_state));
}
void ep_port_cancel_socket_update(ep_port_t* port_info, ep_sock_t* sock_info) {
unused_var(port_info);
if (!queue_enqueued(ep_sock_to_queue_node(sock_info)))
void port_cancel_socket_update(port_state_t* port_state,
sock_state_t* sock_state) {
unused_var(port_state);
if (!queue_enqueued(sock_state_to_queue_node(sock_state)))
return;
queue_remove(ep_sock_to_queue_node(sock_info));
queue_remove(sock_state_to_queue_node(sock_state));
}
void ep_port_add_deleted_socket(ep_port_t* port_info, ep_sock_t* sock_info) {
if (queue_enqueued(ep_sock_to_queue_node(sock_info)))
void port_add_deleted_socket(port_state_t* port_state,
sock_state_t* sock_state) {
if (queue_enqueued(sock_state_to_queue_node(sock_state)))
return;
queue_append(&port_info->sock_deleted_queue,
ep_sock_to_queue_node(sock_info));
queue_append(&port_state->sock_deleted_queue,
sock_state_to_queue_node(sock_state));
}
void ep_port_remove_deleted_socket(ep_port_t* port_info,
ep_sock_t* sock_info) {
unused_var(port_info);
if (!queue_enqueued(ep_sock_to_queue_node(sock_info)))
void port_remove_deleted_socket(port_state_t* port_state,
sock_state_t* sock_state) {
unused_var(port_state);
if (!queue_enqueued(sock_state_to_queue_node(sock_state)))
return;
queue_remove(ep_sock_to_queue_node(sock_info));
queue_remove(sock_state_to_queue_node(sock_state));
}

View File

@ -11,10 +11,10 @@
#include "util.h"
#include "win.h"
typedef struct ep_port ep_port_t;
typedef struct ep_sock ep_sock_t;
typedef struct port_state port_state_t;
typedef struct sock_state sock_state_t;
typedef struct ep_port {
typedef struct port_state {
HANDLE iocp;
tree_t sock_tree;
queue_t sock_update_queue;
@ -23,38 +23,38 @@ typedef struct ep_port {
ts_tree_node_t handle_tree_node;
CRITICAL_SECTION lock;
size_t active_poll_count;
} ep_port_t;
} port_state_t;
WEPOLL_INTERNAL ep_port_t* ep_port_new(HANDLE* iocp_out);
WEPOLL_INTERNAL int ep_port_close(ep_port_t* port_info);
WEPOLL_INTERNAL int ep_port_delete(ep_port_t* port_info);
WEPOLL_INTERNAL port_state_t* port_new(HANDLE* iocp_out);
WEPOLL_INTERNAL int port_close(port_state_t* port_state);
WEPOLL_INTERNAL int port_delete(port_state_t* port_state);
WEPOLL_INTERNAL int ep_port_wait(ep_port_t* port_info,
struct epoll_event* events,
int maxevents,
int timeout);
WEPOLL_INTERNAL int port_wait(port_state_t* port_state,
struct epoll_event* events,
int maxevents,
int timeout);
WEPOLL_INTERNAL int ep_port_ctl(ep_port_t* port_info,
int op,
SOCKET sock,
struct epoll_event* ev);
WEPOLL_INTERNAL int port_ctl(port_state_t* port_state,
int op,
SOCKET sock,
struct epoll_event* ev);
WEPOLL_INTERNAL int ep_port_register_socket_handle(ep_port_t* port_info,
ep_sock_t* sock_info,
SOCKET socket);
WEPOLL_INTERNAL void ep_port_unregister_socket_handle(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL ep_sock_t* ep_port_find_socket(ep_port_t* port_info,
WEPOLL_INTERNAL int port_register_socket_handle(port_state_t* port_state,
sock_state_t* sock_state,
SOCKET socket);
WEPOLL_INTERNAL void port_unregister_socket_handle(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL sock_state_t* port_find_socket(port_state_t* port_state,
SOCKET socket);
WEPOLL_INTERNAL void ep_port_request_socket_update(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL void ep_port_cancel_socket_update(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL void port_request_socket_update(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL void port_cancel_socket_update(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL void ep_port_add_deleted_socket(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL void ep_port_remove_deleted_socket(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL void port_add_deleted_socket(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL void port_remove_deleted_socket(port_state_t* port_state,
sock_state_t* sock_state);
#endif /* WEPOLL_PORT_H_ */

View File

@ -11,7 +11,7 @@
#include "sock.h"
#include "ws.h"
static const uint32_t _EP_SOCK_KNOWN_EPOLL_EVENTS =
static const uint32_t _SOCK_KNOWN_EPOLL_EVENTS =
EPOLLIN | EPOLLPRI | EPOLLOUT | EPOLLERR | EPOLLHUP | EPOLLRDNORM |
EPOLLRDBAND | EPOLLWRNORM | EPOLLWRBAND | EPOLLMSG | EPOLLRDHUP;
@ -21,7 +21,7 @@ typedef enum _poll_status {
_POLL_CANCELLED
} _poll_status_t;
typedef struct ep_sock {
typedef struct sock_state {
OVERLAPPED overlapped;
AFD_POLL_INFO poll_info;
queue_node_t queue_node;
@ -33,39 +33,39 @@ typedef struct ep_sock {
uint32_t pending_events;
_poll_status_t poll_status;
bool delete_pending;
} ep_sock_t;
} sock_state_t;
static inline ep_sock_t* _ep_sock_alloc(void) {
ep_sock_t* sock_info = malloc(sizeof *sock_info);
if (sock_info == NULL)
static inline sock_state_t* _sock_alloc(void) {
sock_state_t* sock_state = malloc(sizeof *sock_state);
if (sock_state == NULL)
return_set_error(NULL, ERROR_NOT_ENOUGH_MEMORY);
return sock_info;
return sock_state;
}
static inline void _ep_sock_free(ep_sock_t* sock_info) {
free(sock_info);
static inline void _sock_free(sock_state_t* sock_state) {
free(sock_state);
}
static int _ep_sock_cancel_poll(ep_sock_t* sock_info) {
static int _sock_cancel_poll(sock_state_t* sock_state) {
HANDLE driver_handle =
(HANDLE)(uintptr_t) poll_group_get_socket(sock_info->poll_group);
assert(sock_info->poll_status == _POLL_PENDING);
(HANDLE)(uintptr_t) poll_group_get_socket(sock_state->poll_group);
assert(sock_state->poll_status == _POLL_PENDING);
/* CancelIoEx() may fail with ERROR_NOT_FOUND if the overlapped operation has
* already completed. This is not a problem and we proceed normally. */
if (!CancelIoEx(driver_handle, &sock_info->overlapped) &&
if (!CancelIoEx(driver_handle, &sock_state->overlapped) &&
GetLastError() != ERROR_NOT_FOUND)
return_map_error(-1);
sock_info->poll_status = _POLL_CANCELLED;
sock_info->pending_events = 0;
sock_state->poll_status = _POLL_CANCELLED;
sock_state->pending_events = 0;
return 0;
}
ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) {
sock_state_t* sock_new(port_state_t* port_state, SOCKET socket) {
SOCKET base_socket;
poll_group_t* poll_group;
ep_sock_t* sock_info;
sock_state_t* sock_state;
if (socket == 0 || socket == INVALID_SOCKET)
return_set_error(NULL, ERROR_INVALID_HANDLE);
@ -74,85 +74,85 @@ ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket) {
if (base_socket == INVALID_SOCKET)
return NULL;
poll_group = poll_group_acquire(port_info);
poll_group = poll_group_acquire(port_state);
if (poll_group == NULL)
return NULL;
sock_info = _ep_sock_alloc();
if (sock_info == NULL)
sock_state = _sock_alloc();
if (sock_state == NULL)
goto err1;
memset(sock_info, 0, sizeof *sock_info);
memset(sock_state, 0, sizeof *sock_state);
sock_info->base_socket = base_socket;
sock_info->poll_group = poll_group;
sock_state->base_socket = base_socket;
sock_state->poll_group = poll_group;
tree_node_init(&sock_info->tree_node);
queue_node_init(&sock_info->queue_node);
tree_node_init(&sock_state->tree_node);
queue_node_init(&sock_state->queue_node);
if (ep_port_register_socket_handle(port_info, sock_info, socket) < 0)
if (port_register_socket_handle(port_state, sock_state, socket) < 0)
goto err2;
return sock_info;
return sock_state;
err2:
_ep_sock_free(sock_info);
_sock_free(sock_state);
err1:
poll_group_release(poll_group);
return NULL;
}
static int _ep_sock_delete(ep_port_t* port_info,
ep_sock_t* sock_info,
bool force) {
if (!sock_info->delete_pending) {
if (sock_info->poll_status == _POLL_PENDING)
_ep_sock_cancel_poll(sock_info);
static int _sock_delete(port_state_t* port_state,
sock_state_t* sock_state,
bool force) {
if (!sock_state->delete_pending) {
if (sock_state->poll_status == _POLL_PENDING)
_sock_cancel_poll(sock_state);
ep_port_cancel_socket_update(port_info, sock_info);
ep_port_unregister_socket_handle(port_info, sock_info);
port_cancel_socket_update(port_state, sock_state);
port_unregister_socket_handle(port_state, sock_state);
sock_info->delete_pending = true;
sock_state->delete_pending = true;
}
/* If the poll request still needs to complete, the ep_sock object can't
* be free()d yet. `ep_sock_feed_event()` or `ep_port_close()` will take care
/* If the poll request still needs to complete, the sock_state object can't
* be free()d yet. `sock_feed_event()` or `port_close()` will take care
* of this later. */
if (force || sock_info->poll_status == _POLL_IDLE) {
/* Free the sock_info now. */
ep_port_remove_deleted_socket(port_info, sock_info);
poll_group_release(sock_info->poll_group);
_ep_sock_free(sock_info);
if (force || sock_state->poll_status == _POLL_IDLE) {
/* Free the sock_state now. */
port_remove_deleted_socket(port_state, sock_state);
poll_group_release(sock_state->poll_group);
_sock_free(sock_state);
} else {
/* Free the socket later. */
ep_port_add_deleted_socket(port_info, sock_info);
port_add_deleted_socket(port_state, sock_state);
}
return 0;
}
void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) {
_ep_sock_delete(port_info, sock_info, false);
void sock_delete(port_state_t* port_state, sock_state_t* sock_state) {
_sock_delete(port_state, sock_state, false);
}
void ep_sock_force_delete(ep_port_t* port_info, ep_sock_t* sock_info) {
_ep_sock_delete(port_info, sock_info, true);
void sock_force_delete(port_state_t* port_state, sock_state_t* sock_state) {
_sock_delete(port_state, sock_state, true);
}
int ep_sock_set_event(ep_port_t* port_info,
ep_sock_t* sock_info,
const struct epoll_event* ev) {
int sock_set_event(port_state_t* port_state,
sock_state_t* sock_state,
const struct epoll_event* ev) {
/* EPOLLERR and EPOLLHUP are always reported, even when not requested by the
* caller. However they are disabled after a event has been reported for a
* socket for which the EPOLLONESHOT flag as set. */
uint32_t events = ev->events | EPOLLERR | EPOLLHUP;
sock_info->user_events = events;
sock_info->user_data = ev->data;
sock_state->user_events = events;
sock_state->user_data = ev->data;
if ((events & _EP_SOCK_KNOWN_EPOLL_EVENTS & ~sock_info->pending_events) != 0)
ep_port_request_socket_update(port_info, sock_info);
if ((events & _SOCK_KNOWN_EPOLL_EVENTS & ~sock_state->pending_events) != 0)
port_request_socket_update(port_state, sock_state);
return 0;
}
@ -197,51 +197,51 @@ static inline uint32_t _afd_events_to_epoll_events(DWORD afd_events) {
return epoll_events;
}
int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) {
assert(!sock_info->delete_pending);
int sock_update(port_state_t* port_state, sock_state_t* sock_state) {
assert(!sock_state->delete_pending);
if ((sock_info->poll_status == _POLL_PENDING) &&
(sock_info->user_events & _EP_SOCK_KNOWN_EPOLL_EVENTS &
~sock_info->pending_events) == 0) {
if ((sock_state->poll_status == _POLL_PENDING) &&
(sock_state->user_events & _SOCK_KNOWN_EPOLL_EVENTS &
~sock_state->pending_events) == 0) {
/* All the events the user is interested in are already being monitored by
* the pending poll operation. It might spuriously complete because of an
* event that we're no longer interested in; when that happens we'll submit
* a new poll operation with the updated event mask. */
} else if (sock_info->poll_status == _POLL_PENDING) {
} else if (sock_state->poll_status == _POLL_PENDING) {
/* A poll operation is already pending, but it's not monitoring for all the
* events that the user is interested in. Therefore, cancel the pending
* poll operation; when we receive it's completion package, a new poll
* operation will be submitted with the correct event mask. */
if (_ep_sock_cancel_poll(sock_info) < 0)
if (_sock_cancel_poll(sock_state) < 0)
return -1;
} else if (sock_info->poll_status == _POLL_CANCELLED) {
} else if (sock_state->poll_status == _POLL_CANCELLED) {
/* The poll operation has already been cancelled, we're still waiting for
* it to return. For now, there's nothing that needs to be done. */
} else if (sock_info->poll_status == _POLL_IDLE) {
} else if (sock_state->poll_status == _POLL_IDLE) {
/* No poll operation is pending; start one. */
sock_info->poll_info.Exclusive = FALSE;
sock_info->poll_info.NumberOfHandles = 1;
sock_info->poll_info.Timeout.QuadPart = INT64_MAX;
sock_info->poll_info.Handles[0].Handle = (HANDLE) sock_info->base_socket;
sock_info->poll_info.Handles[0].Status = 0;
sock_info->poll_info.Handles[0].Events =
_epoll_events_to_afd_events(sock_info->user_events);
sock_state->poll_info.Exclusive = FALSE;
sock_state->poll_info.NumberOfHandles = 1;
sock_state->poll_info.Timeout.QuadPart = INT64_MAX;
sock_state->poll_info.Handles[0].Handle = (HANDLE) sock_state->base_socket;
sock_state->poll_info.Handles[0].Status = 0;
sock_state->poll_info.Handles[0].Events =
_epoll_events_to_afd_events(sock_state->user_events);
memset(&sock_info->overlapped, 0, sizeof sock_info->overlapped);
memset(&sock_state->overlapped, 0, sizeof sock_state->overlapped);
if (afd_poll(poll_group_get_socket(sock_info->poll_group),
&sock_info->poll_info,
&sock_info->overlapped) < 0) {
if (afd_poll(poll_group_get_socket(sock_state->poll_group),
&sock_state->poll_info,
&sock_state->overlapped) < 0) {
switch (GetLastError()) {
case ERROR_IO_PENDING:
/* Overlapped poll operation in progress; this is expected. */
break;
case ERROR_INVALID_HANDLE:
/* Socket closed; it'll be dropped from the epoll set. */
return _ep_sock_delete(port_info, sock_info, false);
return _sock_delete(port_state, sock_state, false);
default:
/* Other errors are propagated to the caller. */
return_map_error(-1);
@ -249,31 +249,32 @@ int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) {
}
/* The poll request was successfully submitted. */
sock_info->poll_status = _POLL_PENDING;
sock_info->pending_events = sock_info->user_events;
sock_state->poll_status = _POLL_PENDING;
sock_state->pending_events = sock_state->user_events;
} else {
/* Unreachable. */
assert(false);
}
ep_port_cancel_socket_update(port_info, sock_info);
port_cancel_socket_update(port_state, sock_state);
return 0;
}
int ep_sock_feed_event(ep_port_t* port_info,
OVERLAPPED* overlapped,
struct epoll_event* ev) {
ep_sock_t* sock_info = container_of(overlapped, ep_sock_t, overlapped);
AFD_POLL_INFO* poll_info = &sock_info->poll_info;
int sock_feed_event(port_state_t* port_state,
OVERLAPPED* overlapped,
struct epoll_event* ev) {
sock_state_t* sock_state =
container_of(overlapped, sock_state_t, overlapped);
AFD_POLL_INFO* poll_info = &sock_state->poll_info;
uint32_t epoll_events = 0;
sock_info->poll_status = _POLL_IDLE;
sock_info->pending_events = 0;
sock_state->poll_status = _POLL_IDLE;
sock_state->pending_events = 0;
if (sock_info->delete_pending) {
if (sock_state->delete_pending) {
/* Socket has been deleted earlier and can now be freed. */
return _ep_sock_delete(port_info, sock_info, false);
return _sock_delete(port_state, sock_state, false);
} else if ((NTSTATUS) overlapped->Internal == STATUS_CANCELLED) {
/* The poll request was cancelled by CancelIoEx. */
@ -287,7 +288,7 @@ int ep_sock_feed_event(ep_port_t* port_info,
} else if (poll_info->Handles[0].Events & AFD_POLL_LOCAL_CLOSE) {
/* The poll operation reported that the socket was closed. */
return _ep_sock_delete(port_info, sock_info, false);
return _sock_delete(port_state, sock_state, false);
} else {
/* Events related to our socket were reported. */
@ -295,10 +296,10 @@ int ep_sock_feed_event(ep_port_t* port_info,
}
/* Requeue the socket so a new poll request will be submitted. */
ep_port_request_socket_update(port_info, sock_info);
port_request_socket_update(port_state, sock_state);
/* Filter out events that the user didn't ask for. */
epoll_events &= sock_info->user_events;
epoll_events &= sock_state->user_events;
/* Return if there are no epoll events to report. */
if (epoll_events == 0)
@ -306,26 +307,26 @@ int ep_sock_feed_event(ep_port_t* port_info,
/* If the the socket has the EPOLLONESHOT flag set, unmonitor all events,
* even EPOLLERR and EPOLLHUP. But always keep looking for closed sockets. */
if (sock_info->user_events & EPOLLONESHOT)
sock_info->user_events = 0;
if (sock_state->user_events & EPOLLONESHOT)
sock_state->user_events = 0;
ev->data = sock_info->user_data;
ev->data = sock_state->user_data;
ev->events = epoll_events;
return 1;
}
queue_node_t* ep_sock_to_queue_node(ep_sock_t* sock_info) {
return &sock_info->queue_node;
queue_node_t* sock_state_to_queue_node(sock_state_t* sock_state) {
return &sock_state->queue_node;
}
ep_sock_t* ep_sock_from_tree_node(tree_node_t* tree_node) {
return container_of(tree_node, ep_sock_t, tree_node);
sock_state_t* sock_state_from_tree_node(tree_node_t* tree_node) {
return container_of(tree_node, sock_state_t, tree_node);
}
tree_node_t* ep_sock_to_tree_node(ep_sock_t* sock_info) {
return &sock_info->tree_node;
tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state) {
return &sock_state->tree_node;
}
ep_sock_t* ep_sock_from_queue_node(queue_node_t* queue_node) {
return container_of(queue_node, ep_sock_t, queue_node);
sock_state_t* sock_state_from_queue_node(queue_node_t* queue_node) {
return container_of(queue_node, sock_state_t, queue_node);
}

View File

@ -10,27 +10,32 @@
#include "wepoll.h"
#include "win.h"
typedef struct ep_port ep_port_t;
typedef struct ep_sock ep_sock_t;
typedef struct port_state port_state_t;
typedef struct sock_state sock_state_t;
WEPOLL_INTERNAL ep_sock_t* ep_sock_new(ep_port_t* port_info, SOCKET socket);
WEPOLL_INTERNAL void ep_sock_delete(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL void ep_sock_force_delete(ep_port_t* port_info,
ep_sock_t* sock_info);
WEPOLL_INTERNAL sock_state_t* sock_new(port_state_t* port_state,
SOCKET socket);
WEPOLL_INTERNAL void sock_delete(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL void sock_force_delete(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL int ep_sock_set_event(ep_port_t* port_info,
ep_sock_t* sock_info,
const struct epoll_event* ev);
WEPOLL_INTERNAL int sock_set_event(port_state_t* port_state,
sock_state_t* sock_state,
const struct epoll_event* ev);
WEPOLL_INTERNAL int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info);
WEPOLL_INTERNAL int ep_sock_feed_event(ep_port_t* port_info,
OVERLAPPED* overlapped,
struct epoll_event* ev);
WEPOLL_INTERNAL int sock_update(port_state_t* port_state,
sock_state_t* sock_state);
WEPOLL_INTERNAL int sock_feed_event(port_state_t* port_state,
OVERLAPPED* overlapped,
struct epoll_event* ev);
WEPOLL_INTERNAL ep_sock_t* ep_sock_from_queue_node(queue_node_t* queue_node);
WEPOLL_INTERNAL queue_node_t* ep_sock_to_queue_node(ep_sock_t* sock_info);
WEPOLL_INTERNAL ep_sock_t* ep_sock_from_tree_node(tree_node_t* tree_node);
WEPOLL_INTERNAL tree_node_t* ep_sock_to_tree_node(ep_sock_t* sock_info);
WEPOLL_INTERNAL sock_state_t* sock_state_from_queue_node(
queue_node_t* queue_node);
WEPOLL_INTERNAL queue_node_t* sock_state_to_queue_node(
sock_state_t* sock_state);
WEPOLL_INTERNAL sock_state_t* sock_state_from_tree_node(
tree_node_t* tree_node);
WEPOLL_INTERNAL tree_node_t* sock_state_to_tree_node(sock_state_t* sock_state);
#endif /* WEPOLL_SOCK_H_ */