WIP
This commit is contained in:
parent
c3f7e36acb
commit
39e68401ce
@ -7,9 +7,28 @@
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#define WPOLL_CTL_ADD 0
|
||||
#define WPOLL_CTL_MOD 1
|
||||
#define WPOLL_CTL_DEL 2
|
||||
|
||||
#define WPOLLIN 0x001
|
||||
#define WPOLLPRI 0x002
|
||||
#define WPOLLOUT 0x004
|
||||
#define WPOLLERR 0x008
|
||||
#define WPOLLHUP 0x010
|
||||
#define WPOLLRDNORM 0x040
|
||||
#define WPOLLRDBAND 0x080
|
||||
#define WPOLLWRNORM 0x100
|
||||
#define WPOLLMSG 0x400
|
||||
#define WPOLLWRBAND 0x200
|
||||
#define WPOLLRDHUP 0x2000
|
||||
|
||||
#define WPOLL_EVENT_MASK 0xffff
|
||||
|
||||
/* #define WPOLLET (1 << 30) Not supported */
|
||||
#define WPOLLONESHOT (1 << 31)
|
||||
|
||||
|
||||
#define WPOLL_CTL_ADD 1
|
||||
#define WPOLL_CTL_MOD 2
|
||||
#define WPOLL_CTL_DEL 3
|
||||
|
||||
typedef void* wpoll_t;
|
||||
|
||||
|
||||
@ -41,7 +41,7 @@
|
||||
#define _AFD_CONTROL_CODE(operation, method) \
|
||||
((FSCTL_AFD_BASE) << 12 | (operation << 2) | method)
|
||||
|
||||
#define AFD_POLL 9
|
||||
#define AFD_POLL 9
|
||||
|
||||
#define IOCTL_AFD_POLL \
|
||||
_AFD_CONTROL_CODE(AFD_POLL, METHOD_BUFFERED)
|
||||
@ -68,4 +68,10 @@ static const GUID AFD_PROVIDER_IDS[] = {
|
||||
{0xb7, 0xbd, 0x18, 0x1f, 0x20, 0x89, 0x79, 0x2a}}
|
||||
};
|
||||
|
||||
|
||||
static int afd_init();
|
||||
|
||||
int WSAAPI afd_poll(SOCKET socket, AFD_POLL_INFO* info,
|
||||
OVERLAPPED* overlapped);
|
||||
|
||||
#endif /* WPOLL_MSAFD_H_ */
|
||||
231
src/wpoll.c
231
src/wpoll.c
@ -8,7 +8,9 @@
|
||||
#define ARRAY_COUNT(a) (sizeof(a) / (sizeof((a)[0])))
|
||||
#define WPOLL_KEY 0xE9011
|
||||
|
||||
|
||||
typedef struct wpoll_op_s wpoll_op_t;
|
||||
typedef struct wpoll_socket_state_s wpoll_socket_state_t;
|
||||
|
||||
/* State associated with a wpoll handle. */
|
||||
typedef struct wpoll_handle_state_s {
|
||||
@ -16,6 +18,7 @@ typedef struct wpoll_handle_state_s {
|
||||
SOCKET peer_sockets[ARRAY_COUNT(AFD_PROVIDER_IDS)];
|
||||
size_t pending_ops_count;
|
||||
RB_HEAD(wpoll_socket_state_tree, wpoll_socket_state_s) socket_state_tree;
|
||||
wpoll_socket_state_t* attn;
|
||||
} wpoll_handle_state_t;
|
||||
|
||||
/* State associated with a socket that is registered to the wpoll port. */
|
||||
@ -28,15 +31,17 @@ typedef struct wpoll_socket_state_s {
|
||||
int events;
|
||||
uint64_t user_data;
|
||||
wpoll_op_t* free_op;
|
||||
wpoll_socket_state_t* attn_prev;
|
||||
wpoll_socket_state_t* attn_next;
|
||||
RB_ENTRY(wpoll_socket_state_s) tree_entry;
|
||||
} wpoll_socket_state_t;
|
||||
};
|
||||
|
||||
/* State associated with a AFD_POLL request. */
|
||||
struct wpoll_op_s {
|
||||
IO_STATUS_BLOCK status;
|
||||
OVERLAPPED overlapped;
|
||||
AFD_POLL_INFO poll_info;
|
||||
int generation;
|
||||
struct wpoll_socket_state* socket_state;
|
||||
wpoll_socket_state_t* socket_state;
|
||||
};
|
||||
|
||||
|
||||
@ -67,6 +72,7 @@ wpoll_t wpoll_create() {
|
||||
}
|
||||
|
||||
state->iocp = iocp;
|
||||
state->attn = NULL;
|
||||
memset(&state->peer_sockets, 0, sizeof state->peer_sockets);
|
||||
state->pending_ops_count = 0;
|
||||
RB_INIT(&state->socket_state_tree);
|
||||
@ -142,10 +148,50 @@ static SOCKET wpoll__get_peer_socket(wpoll_handle_state_t* port_data,
|
||||
|
||||
int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t* sock_data) {
|
||||
wpoll_op_t* op = sock_data->free_op;
|
||||
|
||||
int events;
|
||||
DWORD result, afd_events;
|
||||
|
||||
assert(op != NULL);
|
||||
|
||||
events = sock_data->events;
|
||||
|
||||
/* Always observe these events. */
|
||||
afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE;
|
||||
|
||||
if (events & (WPOLLIN | WPOLLRDNORM))
|
||||
afd_events |= AFD_POLL_RECEIVE | AFD_POLL_ACCEPT;
|
||||
if (events & (WPOLLIN | WPOLLRDBAND))
|
||||
afd_events |= AFD_POLL_RECEIVE_EXPEDITED;
|
||||
if (events & (WPOLLOUT | WPOLLWRNORM | WPOLLRDBAND))
|
||||
afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT;
|
||||
|
||||
op->generation = ++sock_data->op_generation;
|
||||
op->socket_state = sock_data;
|
||||
|
||||
memset(&op->overlapped, 0, sizeof op->overlapped);
|
||||
|
||||
op->poll_info.Exclusive = TRUE;
|
||||
op->poll_info.NumberOfHandles = 1;
|
||||
op->poll_info.Timeout.QuadPart = INT64_MAX;
|
||||
op->poll_info.Handles[0].Handle = (HANDLE) sock_data->base_sock;
|
||||
op->poll_info.Handles[0].Status = 0;
|
||||
op->poll_info.Handles[0].Events = afd_events;
|
||||
|
||||
result = afd_poll(sock_data->peer_sock,
|
||||
&op->poll_info,
|
||||
&op->overlapped);
|
||||
if (result != 0) {
|
||||
DWORD error = WSAGetLastError();
|
||||
if (error != WSA_IO_PENDING) {
|
||||
/* If this happens an error happened and no overlapped operation was */
|
||||
/* started. */
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
sock_data->free_op = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -172,10 +218,10 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
|
||||
&len) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
peer_sock = wpoll__get_peer_socket(port_data, &protocol_info);
|
||||
if (peer_sock == INVALID_SOCKET) {
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
sock_data = malloc(sizeof *sock_data);
|
||||
@ -192,6 +238,8 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
|
||||
}
|
||||
|
||||
sock_data->sock = sock;
|
||||
/* TODO: actually get base socket. */
|
||||
sock_data->base_sock = sock;
|
||||
sock_data->op_generation = 0;
|
||||
sock_data->submitted_events = 0;
|
||||
sock_data->events = event->events;
|
||||
@ -207,8 +255,11 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO: Add to attention list
|
||||
|
||||
// Add to attention list
|
||||
sock_data->attn_prev = NULL;
|
||||
sock_data->attn_next = port_data->attn;
|
||||
port_data->attn->attn_next = sock_data;
|
||||
port_data->attn = sock_data;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -236,7 +287,11 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
|
||||
sock_data->free_op = NULL;
|
||||
}
|
||||
|
||||
// TODO: Add to attention list
|
||||
// Add to attention list
|
||||
sock_data->attn_prev = NULL;
|
||||
sock_data->attn_next = port_data->attn;
|
||||
port_data->attn->attn_next = sock_data;
|
||||
port_data->attn = sock_data;
|
||||
}
|
||||
|
||||
sock_data->user_data = event->data.u64;
|
||||
@ -248,25 +303,173 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
|
||||
wpoll_socket_state_t* sock_data;
|
||||
|
||||
lookup.sock = sock;
|
||||
sock_data = RB_FIND(wpoll_socket_state_tree, &port_data->socket_state_tree, &lookup);
|
||||
sock_data = RB_REMOVE(wpoll_socket_state_tree, &port_data->socket_state_tree, &lookup);
|
||||
if (sock_data == NULL) {
|
||||
/* Socket has not been registered with wpoll instance. */
|
||||
SetLastError(ERROR_NOT_FOUND);
|
||||
return -1;
|
||||
}
|
||||
|
||||
free(sock_data->free_op);
|
||||
sock_data->events = -1;
|
||||
|
||||
/* Remove from attention list. */
|
||||
if (sock_data->attn_prev != NULL)
|
||||
sock_data->attn_prev->attn_next = sock_data->attn_next;
|
||||
if (sock_data->attn_next != NULL)
|
||||
sock_data->attn_next->attn_prev = sock_data->attn_prev;
|
||||
if (port_data->attn == sock_data)
|
||||
port_data->attn = sock_data->attn_next;
|
||||
sock_data->attn_prev = sock_data->attn_next = NULL;
|
||||
|
||||
SetLastError(ERROR_NOT_FOUND);
|
||||
break;
|
||||
if (sock_data->submitted_events == 0) {
|
||||
assert(sock_data->op_generation == 0);
|
||||
free(sock_data);
|
||||
} else {
|
||||
/* There are still one or more ops pending. */
|
||||
/* Wait for all pending ops to return before freeing. */
|
||||
assert(sock_data->op_generation > 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
default:
|
||||
WSASetLastError(WSAEINVAL);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int epoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout);
|
||||
|
||||
int epoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout) {
|
||||
wpoll_handle_state_t* port_data = (wpoll_handle_state_t*) wpoll_hnd;
|
||||
|
||||
while (port_data->attn != NULL) {
|
||||
wpoll_socket_state_t* sock_data = port_data->attn;
|
||||
/* Check if we need to submit another req. */
|
||||
if (sock_data->events & WPOLL_EVENT_MASK & ~sock_data->submitted_events) {
|
||||
int r = wpoll__submit_poll_op(port_data, sock_data);
|
||||
assert(r == 0);
|
||||
}
|
||||
|
||||
/* Remove from attention list */
|
||||
port_data->attn = sock_data->attn_next;
|
||||
sock_data->attn_prev = sock_data->attn_next = NULL;
|
||||
}
|
||||
port_data->attn = NULL;
|
||||
|
||||
{
|
||||
DWORD due = GetTickCount() + timeout;
|
||||
do {
|
||||
DWORD result;
|
||||
ULONG count;
|
||||
OVERLAPPED_ENTRY entries[64];
|
||||
DWORD max_entries;
|
||||
DWORD i;
|
||||
int num_events = 0;
|
||||
|
||||
max_entries = ARRAY_COUNT(entries);
|
||||
if (max_entries > maxevents)
|
||||
max_entries = maxevents;
|
||||
|
||||
result = GetQueuedCompletionStatusEx(port_data->iocp,
|
||||
entries,
|
||||
max_entries,
|
||||
&count,
|
||||
timeout,
|
||||
FALSE);
|
||||
|
||||
if (!result) {
|
||||
DWORD error = GetLastError();
|
||||
if (error == WAIT_TIMEOUT) {
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* Successfully dequeued overlappeds. */
|
||||
for (i = 0; i < count; i++) {
|
||||
OVERLAPPED* overlapped = entries[i].lpOverlapped;
|
||||
wpoll_op_t* op = CONTAINING_RECORD(overlapped, wpoll_op_t, overlapped);
|
||||
wpoll_socket_state_t* sock_data = op->socket_state;
|
||||
DWORD afd_events;
|
||||
int reg_events, reported_events;
|
||||
|
||||
if (op->generation < sock_data->op_generation) {
|
||||
/* This op has been superseded. Free and ignore it. */
|
||||
free(op);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Dequeued the most recent op. Reset generation and submitted_events. */
|
||||
sock_data->op_generation = 0;
|
||||
sock_data->submitted_events = 0;
|
||||
sock_data->free_op = op;
|
||||
|
||||
/* Check for error. */
|
||||
if (!NT_SUCCESS(overlapped->Internal)) {
|
||||
struct wpoll_event* ev = events + (num_events++);
|
||||
ev->data.u64 = sock_data->user_data;
|
||||
ev->events = WPOLLERR;
|
||||
continue;
|
||||
}
|
||||
|
||||
reg_events = sock_data->events;
|
||||
afd_events = op->poll_info.Handles[0].Events;
|
||||
reported_events = 0;
|
||||
|
||||
/* Check for a closed socket, or a socket that has been removed */
|
||||
/* with EPOLL_CTL_DEL. */
|
||||
if ((afd_events & AFD_POLL_LOCAL_CLOSE) || reg_events == -1) {
|
||||
free(op);
|
||||
free(sock_data);
|
||||
continue;
|
||||
}
|
||||
|
||||
/* Unless WPOLLONESHOT is used, add the socket back to the attention list. */
|
||||
if (!reg_events & WPOLLONESHOT) {
|
||||
if (port_data->attn == NULL) {
|
||||
sock_data->attn_next = sock_data->attn_prev = NULL;
|
||||
port_data->attn = sock_data;
|
||||
} else {
|
||||
sock_data->attn_prev = NULL;
|
||||
sock_data->attn_next = port_data->attn;
|
||||
port_data->attn->attn_next = sock_data;
|
||||
port_data->attn = sock_data;
|
||||
}
|
||||
}
|
||||
|
||||
/* Convert afd events to epoll events. */
|
||||
if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT))
|
||||
reported_events |= (WPOLLIN | WPOLLRDNORM) & reg_events;
|
||||
if (afd_events & AFD_POLL_RECEIVE_EXPEDITED)
|
||||
reported_events |= (WPOLLIN | WPOLLRDBAND) & reg_events;
|
||||
if (afd_events & AFD_POLL_SEND)
|
||||
reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND) & reg_events;
|
||||
if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT))
|
||||
reported_events |= (WPOLLRDHUP | WPOLLIN | WPOLLRDNORM | WPOLLRDBAND) & reg_events;
|
||||
if (afd_events & AFD_POLL_ABORT)
|
||||
reported_events |= WPOLLHUP | WPOLLERR;
|
||||
if (afd_events & AFD_POLL_CONNECT)
|
||||
reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND) & reg_events;
|
||||
if (afd_events & AFD_POLL_CONNECT_FAIL)
|
||||
reported_events |= WPOLLERR;
|
||||
|
||||
if (afd_events) {
|
||||
struct wpoll_event* ev = events + (num_events++);
|
||||
ev->data.u64 = sock_data->user_data;
|
||||
ev->events = reported_events;
|
||||
}
|
||||
}
|
||||
|
||||
if (num_events > 0)
|
||||
return num_events;
|
||||
|
||||
/* Events were dequeued, but none were relevant. Recompute timeout. */
|
||||
timeout = due - GetTickCount();
|
||||
} while (timeout > 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user