This commit is contained in:
Bert Belder 2012-09-06 02:28:12 +02:00
parent 0d07f24236
commit a1aa2c7d80
6 changed files with 733 additions and 202 deletions

View File

@ -49,10 +49,13 @@ struct wpoll_event {
wpoll_t wpoll_create();
int wpoll_close();
int wpoll_close(wpoll_t wpoll_hnd);
int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, struct wpoll_event* event);
int epoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout);
int wpoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout);
int afd_init();
#endif /* WPOLL_H_ */

View File

@ -69,7 +69,7 @@ static const GUID AFD_PROVIDER_IDS[] = {
};
static int afd_init();
int afd_init();
int WSAAPI afd_poll(SOCKET socket, AFD_POLL_INFO* info,
OVERLAPPED* overlapped);

View File

@ -3,51 +3,190 @@
#include <WS2tcpip.h>
#include <Windows.h>
#include <assert.h>s
#include <assert.h>
#include <stdio.h>
#include <wpoll.h>
static const char PING[] = "PING";
static const int NUM_PINGERS = 10000;
static const int RUN_TIME = 10000;
int main(int argc, char* argv[]) {
wpoll_t wpoll_hnd;
SOCKET sock;
WSADATA wsa_data;
int r;
u_long one = 1;
struct addrinfo hints;
struct addrinfo* addrinfo;
struct sockaddr_in addr;
struct wpoll_event event;
struct sockaddr_in addr, bind_addr;
DWORD ticks_start, ticks_last;
long long pings = 0, pings_sent = 0;
int i;
r = WSAStartup(MAKEWORD(2, 2), &wsa_data);
assert(r == 0);
afd_init();
wpoll_hnd = wpoll_create();
assert(wpoll_hnd);
sock = socket(AF_INET, SOCK_STREAM, 0);
r = ioctlsocket(sock, FIONBIO, &one);
assert(r == 0);
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_INET;
hints.ai_protocol = IPPROTO_IP;
r = getaddrinfo("www.google.com", NULL, &hints, &addrinfo);
r = getaddrinfo("localhost", NULL, &hints, &addrinfo);
assert(r == 0);
assert(addrinfo->ai_addrlen <= sizeof addr);
memset(&addr, 0, sizeof addr);
addr.sin_addr = *(IN_ADDR*) &addrinfo->ai_addr;
addr.sin_family = addrinfo->ai_family;
addr.sin_port = htons(80);
memcpy(&addr, addrinfo->ai_addr, addrinfo->ai_addrlen);
addr.sin_port = htons(9123);
freeaddrinfo(addrinfo);
r = connect(sock, (sockaddr*) &addr, sizeof addr);\
assert(r == 0 || WSAGetLastError() == WSAEINPROGRESS);
printf("resolved\n");
event.events = WPOLLOUT | WPOLLERR;
event.data.sock = sock;
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &event);
/*
bind_addr.sin_family = AF_INET;
bind_addr.sin_port = htons(0);
bind_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
memset(bind_addr.sin_zero, 0, sizeof bind_addr.sin_zero);
*/
for (i = 0; i < NUM_PINGERS; i++) {
SOCKET sock;
struct wpoll_event ev;
sock = socket(AF_INET, SOCK_STREAM, 0);
r = ioctlsocket(sock, FIONBIO, &one);
assert(r == 0);
r = connect(sock, (struct sockaddr*) &addr, sizeof addr);
/* Unlike unix, windows sets the error to EWOULDBLOCK when the connection */
/* is being established in the background. */
assert(r == 0 || WSAGetLastError() == WSAEWOULDBLOCK);
ev.events = WPOLLOUT | WPOLLERR;
ev.data.sock = sock;
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &ev);
assert(r == 0);
}
{
SOCKET sock;
struct wpoll_event ev;
sock = socket(AF_INET, SOCK_STREAM, 0);
r = ioctlsocket(sock, FIONBIO, &one);
assert(r == 0);
ev.events = WPOLLOUT | WPOLLERR;
ev.data.sock = sock;
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &ev);
assert(r == 0);
}
ticks_start = GetTickCount();
ticks_last = ticks_start;
for (;;) {
int i, count;
struct wpoll_event events[16];
DWORD ticks;
ticks = GetTickCount();
if (ticks >= ticks_last + 1000) {
printf("%lld pings (%f per sec), %lld sent\n", pings, (double) pings / (ticks - ticks_start) * 1000, pings_sent);
ticks_last = ticks;
if (ticks - ticks_start > RUN_TIME)
break;
}
count = wpoll_wait(wpoll_hnd, events, 16, 1000);
assert(count >= 0);
for (i = 0; i < count; i++) {
SOCKET sock = events[i].data.sock;
int revents = events[i].events;
if (revents & WPOLLERR) {
int r;
int err = -1;
int err_len = sizeof err;
r = getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_len);
assert(r == 0);
fprintf(stderr, "Socket error: %d\n", err);
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_DEL, sock, NULL);
assert(r == 0);
continue;
}
if (revents & WPOLLIN) {
char buf[1024];
WSABUF wsa_buf;
DWORD flags, bytes;
struct wpoll_event ev;
int r;
wsa_buf.buf = buf;
wsa_buf.len = sizeof buf;
flags = 0;
r = WSARecv(sock, &wsa_buf, 1, &bytes, &flags, NULL, NULL);
assert(r >= 0);
assert(bytes == sizeof PING);
ev.data.sock = sock;
ev.events = WPOLLOUT;
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_DEL, sock, &ev);
assert(r == 0);
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &ev);
assert(r == 0);
pings++;
continue;
}
if (revents & WPOLLOUT) {
WSABUF wsa_buf;
DWORD bytes;
int r;
struct wpoll_event ev;
wsa_buf.buf = PING;
wsa_buf.len = sizeof PING;
r = WSASend(sock, &wsa_buf, 1, &bytes, 0, NULL, NULL);
assert(r >= 0);
assert(bytes == sizeof PING);
ev.data.sock = sock;
ev.events = WPOLLIN;
r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_MOD, sock, &ev);
assert(r == 0);
pings_sent++;
continue;
}
assert(0);
}
}
r = wpoll_close(wpoll_hnd);
assert(r == 0);
}

View File

@ -1,4 +1,5 @@
#include <stdio.h>
#include <assert.h>
#include <wpoll.h>
@ -9,31 +10,34 @@
#define WPOLL_KEY 0xE9011
typedef struct wpoll_port_data_s wpoll_port_data_t;
typedef struct wpoll_op_s wpoll_op_t;
typedef struct wpoll_socket_state_s wpoll_socket_state_t;
typedef struct wpoll_sock_data_s wpoll_sock_data_t;
/* State associated with a wpoll handle. */
typedef struct wpoll_handle_state_s {
struct wpoll_port_data_s {
HANDLE iocp;
SOCKET peer_sockets[ARRAY_COUNT(AFD_PROVIDER_IDS)];
RB_HEAD(wpoll_sock_data_tree, wpoll_sock_data_s) sock_data_tree;
wpoll_sock_data_t* attn;
size_t pending_ops_count;
RB_HEAD(wpoll_socket_state_tree, wpoll_socket_state_s) socket_state_tree;
wpoll_socket_state_t* attn;
} wpoll_handle_state_t;
};
/* State associated with a socket that is registered to the wpoll port. */
typedef struct wpoll_socket_state_s {
typedef struct wpoll_sock_data_s {
SOCKET sock;
SOCKET base_sock;
SOCKET peer_sock;
int op_generation;
int submitted_events;
int events;
int attn;
uint64_t user_data;
wpoll_op_t* free_op;
wpoll_socket_state_t* attn_prev;
wpoll_socket_state_t* attn_next;
RB_ENTRY(wpoll_socket_state_s) tree_entry;
wpoll_sock_data_t* attn_prev;
wpoll_sock_data_t* attn_next;
RB_ENTRY(wpoll_sock_data_s) tree_entry;
};
/* State associated with a AFD_POLL request. */
@ -41,23 +45,23 @@ struct wpoll_op_s {
OVERLAPPED overlapped;
AFD_POLL_INFO poll_info;
int generation;
wpoll_socket_state_t* socket_state;
wpoll_sock_data_t* sock_data;
};
int wpoll_socket_compare(wpoll_socket_state_t* a, wpoll_socket_state_t* b) {
int wpoll_socket_compare(wpoll_sock_data_t* a, wpoll_sock_data_t* b) {
return a->sock - b->sock;
}
RB_GENERATE_STATIC(wpoll_socket_state_tree, wpoll_socket_state_s, tree_entry, wpoll_socket_compare)
RB_GENERATE_STATIC(wpoll_sock_data_tree, wpoll_sock_data_s, tree_entry, wpoll_socket_compare)
wpoll_t wpoll_create() {
HANDLE iocp;
wpoll_handle_state_t* state = malloc(sizeof *state);
if (state == NULL) {
wpoll_port_data_t* port_data = malloc(sizeof *port_data);
if (port_data == NULL) {
SetLastError(ERROR_OUTOFMEMORY);
return NULL;
}
@ -67,17 +71,18 @@ wpoll_t wpoll_create() {
0,
0);
if (iocp == INVALID_HANDLE_VALUE) {
free(state);
free(port_data);
return NULL;
}
state->iocp = iocp;
state->attn = NULL;
memset(&state->peer_sockets, 0, sizeof state->peer_sockets);
state->pending_ops_count = 0;
RB_INIT(&state->socket_state_tree);
port_data->iocp = iocp;
port_data->attn = NULL;
port_data->pending_ops_count = 0;
return (wpoll_t*) state;
memset(&port_data->peer_sockets, 0, sizeof port_data->peer_sockets);
RB_INIT(&port_data->sock_data_tree);
return (wpoll_t) port_data;
}
static SOCKET wpoll__create_peer_socket(HANDLE iocp,
@ -113,7 +118,7 @@ static SOCKET wpoll__create_peer_socket(HANDLE iocp,
}
static SOCKET wpoll__get_peer_socket(wpoll_handle_state_t* port_data,
static SOCKET wpoll__get_peer_socket(wpoll_port_data_t* port_data,
WSAPROTOCOL_INFOW* protocol_info) {
int index, i;
SOCKET peer_socket;
@ -146,16 +151,20 @@ static SOCKET wpoll__get_peer_socket(wpoll_handle_state_t* port_data,
}
int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t* sock_data) {
wpoll_op_t* op = sock_data->free_op;
int wpoll__submit_poll_op(wpoll_port_data_t* port_data, wpoll_sock_data_t* sock_data) {
wpoll_op_t* op;
int events;
DWORD result, afd_events;
assert(op != NULL);
op = sock_data->free_op;
events = sock_data->events;
/* Always observe these events. */
/* wpoll_ctl should ensure that there is a free op struct. */
assert(op != NULL);
/* These events should always be registered. */
assert(events & WPOLLERR);
assert(events & WPOLLHUP);
afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE;
if (events & (WPOLLIN | WPOLLRDNORM))
@ -163,11 +172,11 @@ int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t*
if (events & (WPOLLIN | WPOLLRDBAND))
afd_events |= AFD_POLL_RECEIVE_EXPEDITED;
if (events & (WPOLLOUT | WPOLLWRNORM | WPOLLRDBAND))
afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT;
afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT;
op->generation = ++sock_data->op_generation;
op->socket_state = sock_data;
op->sock_data = sock_data;
memset(&op->overlapped, 0, sizeof op->overlapped);
op->poll_info.Exclusive = TRUE;
@ -190,20 +199,21 @@ int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t*
}
sock_data->free_op = NULL;
port_data->pending_ops_count++;
return 0;
}
int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
int wpoll_ctl(wpoll_t port_handle, int op, SOCKET sock,
struct wpoll_event* event) {
wpoll_handle_state_t* port_data;
wpoll_port_data_t* port_data;
port_data = (wpoll_handle_state_t*) wpoll_hnd;
port_data = (wpoll_port_data_t*) port_handle;
switch (op) {
case WPOLL_CTL_ADD: {
wpoll_socket_state_t* sock_data;
wpoll_sock_data_t* sock_data;
wpoll_op_t* op;
SOCKET peer_sock;
WSAPROTOCOL_INFOW protocol_info;
@ -218,12 +228,12 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
&len) != 0) {
return -1;
}
peer_sock = wpoll__get_peer_socket(port_data, &protocol_info);
if (peer_sock == INVALID_SOCKET) {
return -1;
}
sock_data = malloc(sizeof *sock_data);
if (sock_data == NULL) {
SetLastError(ERROR_OUTOFMEMORY);
@ -242,41 +252,43 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
sock_data->base_sock = sock;
sock_data->op_generation = 0;
sock_data->submitted_events = 0;
sock_data->events = event->events;
sock_data->events = event->events | WPOLLERR | WPOLLHUP;
sock_data->user_data = event->data.u64;
sock_data->peer_sock = peer_sock;
sock_data->free_op = op;
if (RB_INSERT(wpoll_socket_state_tree, &port_data->socket_state_tree, sock_data) != NULL) {
/* Poll entry was already there. */
if (RB_INSERT(wpoll_sock_data_tree, &port_data->sock_data_tree, sock_data) != NULL) {
/* Socket was already added. */
free(sock_data);
free(op);
SetLastError(ERROR_ALREADY_EXISTS);
return -1;
}
// Add to attention list
sock_data->attn = 1;
sock_data->attn_prev = NULL;
sock_data->attn_next = port_data->attn;
port_data->attn->attn_next = sock_data;
if (port_data->attn)
port_data->attn->attn_prev = sock_data;
port_data->attn = sock_data;
return 0;
}
case WPOLL_CTL_MOD: {
wpoll_socket_state_t lookup;
wpoll_socket_state_t* sock_data;
wpoll_sock_data_t lookup;
wpoll_sock_data_t* sock_data;
lookup.sock = sock;
sock_data = RB_FIND(wpoll_socket_state_tree, &port_data->socket_state_tree, &lookup);
sock_data = RB_FIND(wpoll_sock_data_tree, &port_data->sock_data_tree, &lookup);
if (sock_data == NULL) {
/* Socket has not been registered with wpoll instance. */
SetLastError(ERROR_NOT_FOUND);
return -1;
}
if (event->events & ~sock_data->events) {
if (event->events & ~sock_data->submitted_events) {
if (sock_data->free_op == NULL) {
wpoll_op_t* op = malloc(sizeof *op);
if (op == NULL) {
@ -287,40 +299,50 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
sock_data->free_op = NULL;
}
// Add to attention list
sock_data->attn_prev = NULL;
sock_data->attn_next = port_data->attn;
port_data->attn->attn_next = sock_data;
port_data->attn = sock_data;
// Add to attention list, if not already added.
if (!sock_data->attn) {
sock_data->attn_prev = NULL;
sock_data->attn_next = port_data->attn;
if (port_data->attn)
port_data->attn->attn_prev = sock_data;
port_data->attn = sock_data;
sock_data->attn = 1;
}
}
sock_data->events = event->events | WPOLLERR | WPOLLHUP;
sock_data->user_data = event->data.u64;
return 0;
}
case WPOLL_CTL_DEL: {
wpoll_socket_state_t lookup;
wpoll_socket_state_t* sock_data;
wpoll_sock_data_t lookup;
wpoll_sock_data_t* sock_data;
lookup.sock = sock;
sock_data = RB_REMOVE(wpoll_socket_state_tree, &port_data->socket_state_tree, &lookup);
sock_data = RB_FIND(wpoll_sock_data_tree, &port_data->sock_data_tree, &lookup);
if (sock_data == NULL) {
/* Socket has not been registered with wpoll instance. */
SetLastError(ERROR_NOT_FOUND);
return -1;
}
RB_REMOVE(wpoll_sock_data_tree, &port_data->sock_data_tree, sock_data);
free(sock_data->free_op);
sock_data->events = -1;
/* Remove from attention list. */
if (sock_data->attn_prev != NULL)
sock_data->attn_prev->attn_next = sock_data->attn_next;
if (sock_data->attn_next != NULL)
sock_data->attn_next->attn_prev = sock_data->attn_prev;
if (port_data->attn == sock_data)
port_data->attn = sock_data->attn_next;
sock_data->attn_prev = sock_data->attn_next = NULL;
if (sock_data->attn) {
if (sock_data->attn_prev != NULL)
sock_data->attn_prev->attn_next = sock_data->attn_next;
if (sock_data->attn_next != NULL)
sock_data->attn_next->attn_prev = sock_data->attn_prev;
if (port_data->attn == sock_data)
port_data->attn = sock_data->attn_next;
sock_data->attn = 0;
sock_data->attn_prev = sock_data->attn_next = NULL;
}
if (sock_data->submitted_events == 0) {
assert(sock_data->op_generation == 0);
@ -341,135 +363,249 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock,
}
int epoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout) {
wpoll_handle_state_t* port_data = (wpoll_handle_state_t*) wpoll_hnd;
int wpoll_wait(wpoll_t port_handle, struct wpoll_event* events, int maxevents, int timeout) {
wpoll_port_data_t* port_data;
DWORD due;
DWORD gqcs_timeout;
port_data = (wpoll_port_data_t*) port_handle;
/* Create overlapped poll operations for all sockets on the attention list. */
while (port_data->attn != NULL) {
wpoll_socket_state_t* sock_data = port_data->attn;
wpoll_sock_data_t* sock_data = port_data->attn;
assert(sock_data->attn);
/* Check if we need to submit another req. */
if (sock_data->events & WPOLL_EVENT_MASK & ~sock_data->submitted_events) {
int r = wpoll__submit_poll_op(port_data, sock_data);
assert(r == 0);
/* TODO: handle error. */
}
/* Remove from attention list */
port_data->attn = sock_data->attn_next;
sock_data->attn_prev = sock_data->attn_next = NULL;
sock_data->attn = 0;
}
port_data->attn = NULL;
{
DWORD due = GetTickCount() + timeout;
do {
DWORD result;
ULONG count;
OVERLAPPED_ENTRY entries[64];
DWORD max_entries;
DWORD i;
int num_events = 0;
max_entries = ARRAY_COUNT(entries);
if (max_entries > maxevents)
max_entries = maxevents;
result = GetQueuedCompletionStatusEx(port_data->iocp,
entries,
max_entries,
&count,
timeout,
FALSE);
if (!result) {
DWORD error = GetLastError();
if (error == WAIT_TIMEOUT) {
return 0;
} else {
return -1;
}
}
/* Successfully dequeued overlappeds. */
for (i = 0; i < count; i++) {
OVERLAPPED* overlapped = entries[i].lpOverlapped;
wpoll_op_t* op = CONTAINING_RECORD(overlapped, wpoll_op_t, overlapped);
wpoll_socket_state_t* sock_data = op->socket_state;
DWORD afd_events;
int reg_events, reported_events;
if (op->generation < sock_data->op_generation) {
/* This op has been superseded. Free and ignore it. */
free(op);
continue;
}
/* Dequeued the most recent op. Reset generation and submitted_events. */
sock_data->op_generation = 0;
sock_data->submitted_events = 0;
sock_data->free_op = op;
/* Check for error. */
if (!NT_SUCCESS(overlapped->Internal)) {
struct wpoll_event* ev = events + (num_events++);
ev->data.u64 = sock_data->user_data;
ev->events = WPOLLERR;
continue;
}
reg_events = sock_data->events;
afd_events = op->poll_info.Handles[0].Events;
reported_events = 0;
/* Check for a closed socket, or a socket that has been removed */
/* with EPOLL_CTL_DEL. */
if ((afd_events & AFD_POLL_LOCAL_CLOSE) || reg_events == -1) {
free(op);
free(sock_data);
continue;
}
/* Unless WPOLLONESHOT is used, add the socket back to the attention list. */
if (!reg_events & WPOLLONESHOT) {
if (port_data->attn == NULL) {
sock_data->attn_next = sock_data->attn_prev = NULL;
port_data->attn = sock_data;
} else {
sock_data->attn_prev = NULL;
sock_data->attn_next = port_data->attn;
port_data->attn->attn_next = sock_data;
port_data->attn = sock_data;
}
}
/* Convert afd events to epoll events. */
if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT))
reported_events |= (WPOLLIN | WPOLLRDNORM) & reg_events;
if (afd_events & AFD_POLL_RECEIVE_EXPEDITED)
reported_events |= (WPOLLIN | WPOLLRDBAND) & reg_events;
if (afd_events & AFD_POLL_SEND)
reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND) & reg_events;
if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT))
reported_events |= (WPOLLRDHUP | WPOLLIN | WPOLLRDNORM | WPOLLRDBAND) & reg_events;
if (afd_events & AFD_POLL_ABORT)
reported_events |= WPOLLHUP | WPOLLERR;
if (afd_events & AFD_POLL_CONNECT)
reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND) & reg_events;
if (afd_events & AFD_POLL_CONNECT_FAIL)
reported_events |= WPOLLERR;
if (afd_events) {
struct wpoll_event* ev = events + (num_events++);
ev->data.u64 = sock_data->user_data;
ev->events = reported_events;
}
}
if (num_events > 0)
return num_events;
/* Events were dequeued, but none were relevant. Recompute timeout. */
timeout = due - GetTickCount();
} while (timeout > 0);
return 0;
/* Compute the timeout for GetQueuedCompletionStatus, and the wait end */
/* time, if the user specified a timeout other than zero or infinite. */
if (timeout > 0) {
due = GetTickCount() + timeout;
gqcs_timeout = (DWORD) timeout;
} else if (timeout == 0) {
gqcs_timeout = 0;
} else {
gqcs_timeout = INFINITE;
}
/* Dequeue completion packets until either at least one interesting event */
/* has been discovered, or the timeout is reached. */
do {
DWORD result, max_entries;
ULONG count, i;
OVERLAPPED_ENTRY entries[64];
int num_events = 0;
/* Compute how much overlapped entries can be dequeued at most. */
max_entries = ARRAY_COUNT(entries);
if ((int) max_entries > maxevents)
max_entries = maxevents;
result = GetQueuedCompletionStatusEx(port_data->iocp,
entries,
max_entries,
&count,
gqcs_timeout,
FALSE);
if (!result) {
DWORD error = GetLastError();
if (error == WAIT_TIMEOUT) {
return 0;
} else {
return -1;
}
}
port_data->pending_ops_count -= count;
/* Successfully dequeued overlappeds. */
for (i = 0; i < count; i++) {
OVERLAPPED* overlapped;
wpoll_op_t* op;
wpoll_sock_data_t* sock_data;
DWORD afd_events;
int registered_events, reported_events;
overlapped = entries[i].lpOverlapped;
op = CONTAINING_RECORD(overlapped, wpoll_op_t, overlapped);
sock_data = op->sock_data;
if (op->generation < sock_data->op_generation) {
/* This op has been superseded. Free and ignore it. */
free(op);
continue;
}
/* Dequeued the most recent op. Reset generation and submitted_events. */
sock_data->op_generation = 0;
sock_data->submitted_events = 0;
sock_data->free_op = op;
registered_events = sock_data->events;
reported_events = 0;
/* Check if this op was associated with a socket that was removed */
/* with EPOLL_CTL_DEL. */
if (registered_events == -1) {
free(op);
free(sock_data);
continue;
}
/* Check for error. */
if (!NT_SUCCESS(overlapped->Internal)) {
struct wpoll_event* ev = events + (num_events++);
ev->data.u64 = sock_data->user_data;
ev->events = WPOLLERR;
continue;
}
if (op->poll_info.NumberOfHandles == 0) {
/* NumberOfHandles can be zero if this poll operation was canceled */
/* due to a more recent exclusive poll operation. */
afd_events = 0;
} else {
afd_events = op->poll_info.Handles[0].Events;
}
/* Check for a closed socket. */
if (afd_events & AFD_POLL_LOCAL_CLOSE) {
free(op);
free(sock_data);
continue;
}
/* Convert afd events to epoll events. */
if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT))
reported_events |= (WPOLLIN | WPOLLRDNORM);
if (afd_events & AFD_POLL_RECEIVE_EXPEDITED)
reported_events |= (WPOLLIN | WPOLLRDBAND);
if (afd_events & AFD_POLL_SEND)
reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND);
if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT))
reported_events |= (WPOLLRDHUP | WPOLLIN | WPOLLRDNORM | WPOLLRDBAND);
if (afd_events & AFD_POLL_ABORT)
reported_events |= WPOLLHUP | WPOLLERR;
if (afd_events & AFD_POLL_CONNECT)
reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND);
if (afd_events & AFD_POLL_CONNECT_FAIL)
reported_events |= WPOLLERR;
/* Don't report events that the user didn't specify. */
reported_events &= registered_events;
/* Unless WPOLLONESHOT is used or no events were reported that the */
/* user is interested in, add the socket back to the attention list. */
if (!registered_events & WPOLLONESHOT || reported_events == 0) {
assert(!sock_data->attn);
if (port_data->attn == NULL) {
sock_data->attn_next = sock_data->attn_prev = NULL;
port_data->attn = sock_data;
} else {
sock_data->attn_prev = NULL;
sock_data->attn_next = port_data->attn;
port_data->attn->attn_next = sock_data;
port_data->attn = sock_data;
}
}
if (reported_events) {
struct wpoll_event* ev = events + (num_events++);
ev->data.u64 = sock_data->user_data;
ev->events = reported_events;
}
}
if (num_events > 0)
return num_events;
/* Events were dequeued, but none were relevant. Recompute timeout. */
if (timeout > 0) {
gqcs_timeout = due - GetTickCount();
}
} while (timeout > 0);
return 0;
}
int wpoll_close(wpoll_t port_handle) {
wpoll_port_data_t* port_data;
wpoll_sock_data_t* sock_data;
int i;
port_data = (wpoll_port_data_t*) port_handle;
/* Close all peer sockets. This will make all pending ops return. */
for (i = 0; i < ARRAY_COUNT(port_data->peer_sockets); i++) {
SOCKET peer_sock = port_data->peer_sockets[i];
if (peer_sock != 0 && peer_sock != INVALID_SOCKET) {
if (closesocket(peer_sock) != 0)
return -1;
port_data->peer_sockets[i] = 0;
}
}
/* There is no list of wpoll_ops the free. And even if there was, just */
/* freeing them would be dangerous since the kernel might still alter */
/* the overlapped status contained in them. But since we are sure that */
/* all ops will soon return, just await them all. */
while (port_data->pending_ops_count > 0) {
OVERLAPPED_ENTRY entries[64];
DWORD result;
ULONG count, i;
printf("ops: %d\n", port_data->pending_ops_count);
result = GetQueuedCompletionStatusEx(port_data->iocp,
entries,
ARRAY_COUNT(entries),
&count,
INFINITE,
FALSE);
if (!result) {
DWORD error = GetLastError();
return -1;
}
port_data->pending_ops_count -= count;
for (i = 0; i < count; i++) {
wpoll_op_t* op = CONTAINING_RECORD(entries[i].lpOverlapped,
wpoll_op_t,
overlapped);
free(op);
}
}
/* Remove all entries from the socket_state tree. */
while (sock_data = RB_ROOT(&port_data->sock_data_tree)) {
RB_REMOVE(wpoll_sock_data_tree, &port_data->sock_data_tree, sock_data);
if (sock_data->free_op != NULL)
free(sock_data->free_op);
free(sock_data);
}
/* Close the I/O completion port. */
CloseHandle(port_data->iocp);
/* Finally, remove the port data. */
free(port_data);
return 0;
}

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long