diff --git a/include/wpoll.h b/include/wpoll.h index 05eeec3..5a3ccd4 100644 --- a/include/wpoll.h +++ b/include/wpoll.h @@ -49,10 +49,13 @@ struct wpoll_event { wpoll_t wpoll_create(); -int wpoll_close(); +int wpoll_close(wpoll_t wpoll_hnd); int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, struct wpoll_event* event); -int epoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout); +int wpoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout); + +int afd_init(); + #endif /* WPOLL_H_ */ \ No newline at end of file diff --git a/src/msafd.h b/src/msafd.h index 4b0ff49..680df03 100644 --- a/src/msafd.h +++ b/src/msafd.h @@ -69,7 +69,7 @@ static const GUID AFD_PROVIDER_IDS[] = { }; -static int afd_init(); +int afd_init(); int WSAAPI afd_poll(SOCKET socket, AFD_POLL_INFO* info, OVERLAPPED* overlapped); diff --git a/src/test.c b/src/test.c index 5cc0ada..3bc094c 100644 --- a/src/test.c +++ b/src/test.c @@ -3,51 +3,190 @@ #include #include -#include s +#include +#include #include +static const char PING[] = "PING"; +static const int NUM_PINGERS = 10000; +static const int RUN_TIME = 10000; + int main(int argc, char* argv[]) { wpoll_t wpoll_hnd; - SOCKET sock; WSADATA wsa_data; int r; u_long one = 1; struct addrinfo hints; struct addrinfo* addrinfo; - struct sockaddr_in addr; - struct wpoll_event event; + struct sockaddr_in addr, bind_addr; + DWORD ticks_start, ticks_last; + long long pings = 0, pings_sent = 0; + int i; r = WSAStartup(MAKEWORD(2, 2), &wsa_data); assert(r == 0); + afd_init(); + wpoll_hnd = wpoll_create(); assert(wpoll_hnd); - sock = socket(AF_INET, SOCK_STREAM, 0); - - r = ioctlsocket(sock, FIONBIO, &one); - assert(r == 0); - memset(&hints, 0, sizeof hints); hints.ai_family = AF_INET; hints.ai_protocol = IPPROTO_IP; - r = getaddrinfo("www.google.com", NULL, &hints, &addrinfo); + r = getaddrinfo("localhost", NULL, &hints, &addrinfo); assert(r == 0); + assert(addrinfo->ai_addrlen <= sizeof addr); memset(&addr, 0, sizeof addr); - addr.sin_addr = *(IN_ADDR*) &addrinfo->ai_addr; - addr.sin_family = addrinfo->ai_family; - addr.sin_port = htons(80); + memcpy(&addr, addrinfo->ai_addr, addrinfo->ai_addrlen); + addr.sin_port = htons(9123); freeaddrinfo(addrinfo); - r = connect(sock, (sockaddr*) &addr, sizeof addr);\ - assert(r == 0 || WSAGetLastError() == WSAEINPROGRESS); + printf("resolved\n"); - event.events = WPOLLOUT | WPOLLERR; - event.data.sock = sock; - r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &event); + /* + bind_addr.sin_family = AF_INET; + bind_addr.sin_port = htons(0); + bind_addr.sin_addr.S_un.S_addr = htonl(INADDR_ANY); + memset(bind_addr.sin_zero, 0, sizeof bind_addr.sin_zero); + */ - + + for (i = 0; i < NUM_PINGERS; i++) { + SOCKET sock; + struct wpoll_event ev; + + sock = socket(AF_INET, SOCK_STREAM, 0); + + r = ioctlsocket(sock, FIONBIO, &one); + assert(r == 0); + + r = connect(sock, (struct sockaddr*) &addr, sizeof addr); + /* Unlike unix, windows sets the error to EWOULDBLOCK when the connection */ + /* is being established in the background. */ + assert(r == 0 || WSAGetLastError() == WSAEWOULDBLOCK); + + ev.events = WPOLLOUT | WPOLLERR; + ev.data.sock = sock; + r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &ev); + assert(r == 0); + } + + { + SOCKET sock; + struct wpoll_event ev; + + sock = socket(AF_INET, SOCK_STREAM, 0); + + r = ioctlsocket(sock, FIONBIO, &one); + assert(r == 0); + + ev.events = WPOLLOUT | WPOLLERR; + ev.data.sock = sock; + r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &ev); + assert(r == 0); + } + + + ticks_start = GetTickCount(); + ticks_last = ticks_start; + + for (;;) { + int i, count; + struct wpoll_event events[16]; + DWORD ticks; + + ticks = GetTickCount(); + if (ticks >= ticks_last + 1000) { + printf("%lld pings (%f per sec), %lld sent\n", pings, (double) pings / (ticks - ticks_start) * 1000, pings_sent); + ticks_last = ticks; + + if (ticks - ticks_start > RUN_TIME) + break; + } + + count = wpoll_wait(wpoll_hnd, events, 16, 1000); + assert(count >= 0); + + for (i = 0; i < count; i++) { + SOCKET sock = events[i].data.sock; + int revents = events[i].events; + + if (revents & WPOLLERR) { + int r; + int err = -1; + int err_len = sizeof err; + + r = getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &err_len); + assert(r == 0); + fprintf(stderr, "Socket error: %d\n", err); + + r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_DEL, sock, NULL); + assert(r == 0); + continue; + } + + if (revents & WPOLLIN) { + char buf[1024]; + WSABUF wsa_buf; + DWORD flags, bytes; + struct wpoll_event ev; + int r; + + wsa_buf.buf = buf; + wsa_buf.len = sizeof buf; + + flags = 0; + r = WSARecv(sock, &wsa_buf, 1, &bytes, &flags, NULL, NULL); + assert(r >= 0); + + assert(bytes == sizeof PING); + + ev.data.sock = sock; + ev.events = WPOLLOUT; + + r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_DEL, sock, &ev); + assert(r == 0); + r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_ADD, sock, &ev); + assert(r == 0); + + pings++; + + continue; + } + + if (revents & WPOLLOUT) { + WSABUF wsa_buf; + DWORD bytes; + int r; + struct wpoll_event ev; + + wsa_buf.buf = PING; + wsa_buf.len = sizeof PING; + r = WSASend(sock, &wsa_buf, 1, &bytes, 0, NULL, NULL); + assert(r >= 0); + assert(bytes == sizeof PING); + + ev.data.sock = sock; + ev.events = WPOLLIN; + + r = wpoll_ctl(wpoll_hnd, WPOLL_CTL_MOD, sock, &ev); + assert(r == 0); + + pings_sent++; + + continue; + } + + assert(0); + } + } + + r = wpoll_close(wpoll_hnd); + assert(r == 0); } + + diff --git a/src/wpoll.c b/src/wpoll.c index d85f0eb..3a7ba63 100644 --- a/src/wpoll.c +++ b/src/wpoll.c @@ -1,4 +1,5 @@ - + +#include #include #include @@ -9,31 +10,34 @@ #define WPOLL_KEY 0xE9011 +typedef struct wpoll_port_data_s wpoll_port_data_t; typedef struct wpoll_op_s wpoll_op_t; -typedef struct wpoll_socket_state_s wpoll_socket_state_t; +typedef struct wpoll_sock_data_s wpoll_sock_data_t; + /* State associated with a wpoll handle. */ -typedef struct wpoll_handle_state_s { +struct wpoll_port_data_s { HANDLE iocp; SOCKET peer_sockets[ARRAY_COUNT(AFD_PROVIDER_IDS)]; + RB_HEAD(wpoll_sock_data_tree, wpoll_sock_data_s) sock_data_tree; + wpoll_sock_data_t* attn; size_t pending_ops_count; - RB_HEAD(wpoll_socket_state_tree, wpoll_socket_state_s) socket_state_tree; - wpoll_socket_state_t* attn; -} wpoll_handle_state_t; +}; /* State associated with a socket that is registered to the wpoll port. */ -typedef struct wpoll_socket_state_s { +typedef struct wpoll_sock_data_s { SOCKET sock; SOCKET base_sock; SOCKET peer_sock; int op_generation; int submitted_events; int events; + int attn; uint64_t user_data; wpoll_op_t* free_op; - wpoll_socket_state_t* attn_prev; - wpoll_socket_state_t* attn_next; - RB_ENTRY(wpoll_socket_state_s) tree_entry; + wpoll_sock_data_t* attn_prev; + wpoll_sock_data_t* attn_next; + RB_ENTRY(wpoll_sock_data_s) tree_entry; }; /* State associated with a AFD_POLL request. */ @@ -41,23 +45,23 @@ struct wpoll_op_s { OVERLAPPED overlapped; AFD_POLL_INFO poll_info; int generation; - wpoll_socket_state_t* socket_state; + wpoll_sock_data_t* sock_data; }; -int wpoll_socket_compare(wpoll_socket_state_t* a, wpoll_socket_state_t* b) { +int wpoll_socket_compare(wpoll_sock_data_t* a, wpoll_sock_data_t* b) { return a->sock - b->sock; } -RB_GENERATE_STATIC(wpoll_socket_state_tree, wpoll_socket_state_s, tree_entry, wpoll_socket_compare) +RB_GENERATE_STATIC(wpoll_sock_data_tree, wpoll_sock_data_s, tree_entry, wpoll_socket_compare) wpoll_t wpoll_create() { HANDLE iocp; - wpoll_handle_state_t* state = malloc(sizeof *state); - if (state == NULL) { + wpoll_port_data_t* port_data = malloc(sizeof *port_data); + if (port_data == NULL) { SetLastError(ERROR_OUTOFMEMORY); return NULL; } @@ -67,17 +71,18 @@ wpoll_t wpoll_create() { 0, 0); if (iocp == INVALID_HANDLE_VALUE) { - free(state); + free(port_data); return NULL; } - state->iocp = iocp; - state->attn = NULL; - memset(&state->peer_sockets, 0, sizeof state->peer_sockets); - state->pending_ops_count = 0; - RB_INIT(&state->socket_state_tree); + port_data->iocp = iocp; + port_data->attn = NULL; + port_data->pending_ops_count = 0; - return (wpoll_t*) state; + memset(&port_data->peer_sockets, 0, sizeof port_data->peer_sockets); + RB_INIT(&port_data->sock_data_tree); + + return (wpoll_t) port_data; } static SOCKET wpoll__create_peer_socket(HANDLE iocp, @@ -113,7 +118,7 @@ static SOCKET wpoll__create_peer_socket(HANDLE iocp, } -static SOCKET wpoll__get_peer_socket(wpoll_handle_state_t* port_data, +static SOCKET wpoll__get_peer_socket(wpoll_port_data_t* port_data, WSAPROTOCOL_INFOW* protocol_info) { int index, i; SOCKET peer_socket; @@ -146,16 +151,20 @@ static SOCKET wpoll__get_peer_socket(wpoll_handle_state_t* port_data, } -int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t* sock_data) { - wpoll_op_t* op = sock_data->free_op; +int wpoll__submit_poll_op(wpoll_port_data_t* port_data, wpoll_sock_data_t* sock_data) { + wpoll_op_t* op; int events; DWORD result, afd_events; - assert(op != NULL); - + op = sock_data->free_op; events = sock_data->events; - /* Always observe these events. */ + /* wpoll_ctl should ensure that there is a free op struct. */ + assert(op != NULL); + + /* These events should always be registered. */ + assert(events & WPOLLERR); + assert(events & WPOLLHUP); afd_events = AFD_POLL_ABORT | AFD_POLL_CONNECT_FAIL | AFD_POLL_LOCAL_CLOSE; if (events & (WPOLLIN | WPOLLRDNORM)) @@ -163,11 +172,11 @@ int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t* if (events & (WPOLLIN | WPOLLRDBAND)) afd_events |= AFD_POLL_RECEIVE_EXPEDITED; if (events & (WPOLLOUT | WPOLLWRNORM | WPOLLRDBAND)) - afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; + afd_events |= AFD_POLL_SEND | AFD_POLL_CONNECT; op->generation = ++sock_data->op_generation; - op->socket_state = sock_data; - + op->sock_data = sock_data; + memset(&op->overlapped, 0, sizeof op->overlapped); op->poll_info.Exclusive = TRUE; @@ -190,20 +199,21 @@ int wpoll__submit_poll_op(wpoll_handle_state_t* port_data, wpoll_socket_state_t* } sock_data->free_op = NULL; + port_data->pending_ops_count++; return 0; } -int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, +int wpoll_ctl(wpoll_t port_handle, int op, SOCKET sock, struct wpoll_event* event) { - wpoll_handle_state_t* port_data; + wpoll_port_data_t* port_data; - port_data = (wpoll_handle_state_t*) wpoll_hnd; + port_data = (wpoll_port_data_t*) port_handle; switch (op) { case WPOLL_CTL_ADD: { - wpoll_socket_state_t* sock_data; + wpoll_sock_data_t* sock_data; wpoll_op_t* op; SOCKET peer_sock; WSAPROTOCOL_INFOW protocol_info; @@ -218,12 +228,12 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, &len) != 0) { return -1; } - + peer_sock = wpoll__get_peer_socket(port_data, &protocol_info); if (peer_sock == INVALID_SOCKET) { return -1; } - + sock_data = malloc(sizeof *sock_data); if (sock_data == NULL) { SetLastError(ERROR_OUTOFMEMORY); @@ -242,41 +252,43 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, sock_data->base_sock = sock; sock_data->op_generation = 0; sock_data->submitted_events = 0; - sock_data->events = event->events; + sock_data->events = event->events | WPOLLERR | WPOLLHUP; sock_data->user_data = event->data.u64; sock_data->peer_sock = peer_sock; sock_data->free_op = op; - if (RB_INSERT(wpoll_socket_state_tree, &port_data->socket_state_tree, sock_data) != NULL) { - /* Poll entry was already there. */ + if (RB_INSERT(wpoll_sock_data_tree, &port_data->sock_data_tree, sock_data) != NULL) { + /* Socket was already added. */ free(sock_data); free(op); SetLastError(ERROR_ALREADY_EXISTS); return -1; } - + // Add to attention list + sock_data->attn = 1; sock_data->attn_prev = NULL; sock_data->attn_next = port_data->attn; - port_data->attn->attn_next = sock_data; + if (port_data->attn) + port_data->attn->attn_prev = sock_data; port_data->attn = sock_data; return 0; } case WPOLL_CTL_MOD: { - wpoll_socket_state_t lookup; - wpoll_socket_state_t* sock_data; + wpoll_sock_data_t lookup; + wpoll_sock_data_t* sock_data; lookup.sock = sock; - sock_data = RB_FIND(wpoll_socket_state_tree, &port_data->socket_state_tree, &lookup); + sock_data = RB_FIND(wpoll_sock_data_tree, &port_data->sock_data_tree, &lookup); if (sock_data == NULL) { /* Socket has not been registered with wpoll instance. */ SetLastError(ERROR_NOT_FOUND); return -1; } - if (event->events & ~sock_data->events) { + if (event->events & ~sock_data->submitted_events) { if (sock_data->free_op == NULL) { wpoll_op_t* op = malloc(sizeof *op); if (op == NULL) { @@ -287,40 +299,50 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, sock_data->free_op = NULL; } - // Add to attention list - sock_data->attn_prev = NULL; - sock_data->attn_next = port_data->attn; - port_data->attn->attn_next = sock_data; - port_data->attn = sock_data; + // Add to attention list, if not already added. + if (!sock_data->attn) { + sock_data->attn_prev = NULL; + sock_data->attn_next = port_data->attn; + if (port_data->attn) + port_data->attn->attn_prev = sock_data; + port_data->attn = sock_data; + sock_data->attn = 1; + } } + sock_data->events = event->events | WPOLLERR | WPOLLHUP; sock_data->user_data = event->data.u64; return 0; } case WPOLL_CTL_DEL: { - wpoll_socket_state_t lookup; - wpoll_socket_state_t* sock_data; + wpoll_sock_data_t lookup; + wpoll_sock_data_t* sock_data; lookup.sock = sock; - sock_data = RB_REMOVE(wpoll_socket_state_tree, &port_data->socket_state_tree, &lookup); + sock_data = RB_FIND(wpoll_sock_data_tree, &port_data->sock_data_tree, &lookup); if (sock_data == NULL) { /* Socket has not been registered with wpoll instance. */ SetLastError(ERROR_NOT_FOUND); return -1; } + RB_REMOVE(wpoll_sock_data_tree, &port_data->sock_data_tree, sock_data); + free(sock_data->free_op); sock_data->events = -1; /* Remove from attention list. */ - if (sock_data->attn_prev != NULL) - sock_data->attn_prev->attn_next = sock_data->attn_next; - if (sock_data->attn_next != NULL) - sock_data->attn_next->attn_prev = sock_data->attn_prev; - if (port_data->attn == sock_data) - port_data->attn = sock_data->attn_next; - sock_data->attn_prev = sock_data->attn_next = NULL; + if (sock_data->attn) { + if (sock_data->attn_prev != NULL) + sock_data->attn_prev->attn_next = sock_data->attn_next; + if (sock_data->attn_next != NULL) + sock_data->attn_next->attn_prev = sock_data->attn_prev; + if (port_data->attn == sock_data) + port_data->attn = sock_data->attn_next; + sock_data->attn = 0; + sock_data->attn_prev = sock_data->attn_next = NULL; + } if (sock_data->submitted_events == 0) { assert(sock_data->op_generation == 0); @@ -341,135 +363,249 @@ int wpoll_ctl(wpoll_t wpoll_hnd, int op, SOCKET sock, } -int epoll_wait(wpoll_t wpoll_hnd, struct wpoll_event* events, int maxevents, int timeout) { - wpoll_handle_state_t* port_data = (wpoll_handle_state_t*) wpoll_hnd; +int wpoll_wait(wpoll_t port_handle, struct wpoll_event* events, int maxevents, int timeout) { + wpoll_port_data_t* port_data; + DWORD due; + DWORD gqcs_timeout; + port_data = (wpoll_port_data_t*) port_handle; + + /* Create overlapped poll operations for all sockets on the attention list. */ while (port_data->attn != NULL) { - wpoll_socket_state_t* sock_data = port_data->attn; + wpoll_sock_data_t* sock_data = port_data->attn; + assert(sock_data->attn); + /* Check if we need to submit another req. */ if (sock_data->events & WPOLL_EVENT_MASK & ~sock_data->submitted_events) { int r = wpoll__submit_poll_op(port_data, sock_data); - assert(r == 0); + /* TODO: handle error. */ } /* Remove from attention list */ port_data->attn = sock_data->attn_next; sock_data->attn_prev = sock_data->attn_next = NULL; + sock_data->attn = 0; } port_data->attn = NULL; - { - DWORD due = GetTickCount() + timeout; - do { - DWORD result; - ULONG count; - OVERLAPPED_ENTRY entries[64]; - DWORD max_entries; - DWORD i; - int num_events = 0; - - max_entries = ARRAY_COUNT(entries); - if (max_entries > maxevents) - max_entries = maxevents; - - result = GetQueuedCompletionStatusEx(port_data->iocp, - entries, - max_entries, - &count, - timeout, - FALSE); - - if (!result) { - DWORD error = GetLastError(); - if (error == WAIT_TIMEOUT) { - return 0; - } else { - return -1; - } - } - - /* Successfully dequeued overlappeds. */ - for (i = 0; i < count; i++) { - OVERLAPPED* overlapped = entries[i].lpOverlapped; - wpoll_op_t* op = CONTAINING_RECORD(overlapped, wpoll_op_t, overlapped); - wpoll_socket_state_t* sock_data = op->socket_state; - DWORD afd_events; - int reg_events, reported_events; - - if (op->generation < sock_data->op_generation) { - /* This op has been superseded. Free and ignore it. */ - free(op); - continue; - } - - /* Dequeued the most recent op. Reset generation and submitted_events. */ - sock_data->op_generation = 0; - sock_data->submitted_events = 0; - sock_data->free_op = op; - - /* Check for error. */ - if (!NT_SUCCESS(overlapped->Internal)) { - struct wpoll_event* ev = events + (num_events++); - ev->data.u64 = sock_data->user_data; - ev->events = WPOLLERR; - continue; - } - - reg_events = sock_data->events; - afd_events = op->poll_info.Handles[0].Events; - reported_events = 0; - - /* Check for a closed socket, or a socket that has been removed */ - /* with EPOLL_CTL_DEL. */ - if ((afd_events & AFD_POLL_LOCAL_CLOSE) || reg_events == -1) { - free(op); - free(sock_data); - continue; - } - - /* Unless WPOLLONESHOT is used, add the socket back to the attention list. */ - if (!reg_events & WPOLLONESHOT) { - if (port_data->attn == NULL) { - sock_data->attn_next = sock_data->attn_prev = NULL; - port_data->attn = sock_data; - } else { - sock_data->attn_prev = NULL; - sock_data->attn_next = port_data->attn; - port_data->attn->attn_next = sock_data; - port_data->attn = sock_data; - } - } - - /* Convert afd events to epoll events. */ - if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) - reported_events |= (WPOLLIN | WPOLLRDNORM) & reg_events; - if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) - reported_events |= (WPOLLIN | WPOLLRDBAND) & reg_events; - if (afd_events & AFD_POLL_SEND) - reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND) & reg_events; - if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) - reported_events |= (WPOLLRDHUP | WPOLLIN | WPOLLRDNORM | WPOLLRDBAND) & reg_events; - if (afd_events & AFD_POLL_ABORT) - reported_events |= WPOLLHUP | WPOLLERR; - if (afd_events & AFD_POLL_CONNECT) - reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND) & reg_events; - if (afd_events & AFD_POLL_CONNECT_FAIL) - reported_events |= WPOLLERR; - - if (afd_events) { - struct wpoll_event* ev = events + (num_events++); - ev->data.u64 = sock_data->user_data; - ev->events = reported_events; - } - } - - if (num_events > 0) - return num_events; - - /* Events were dequeued, but none were relevant. Recompute timeout. */ - timeout = due - GetTickCount(); - } while (timeout > 0); - - return 0; + /* Compute the timeout for GetQueuedCompletionStatus, and the wait end */ + /* time, if the user specified a timeout other than zero or infinite. */ + if (timeout > 0) { + due = GetTickCount() + timeout; + gqcs_timeout = (DWORD) timeout; + } else if (timeout == 0) { + gqcs_timeout = 0; + } else { + gqcs_timeout = INFINITE; } + + /* Dequeue completion packets until either at least one interesting event */ + /* has been discovered, or the timeout is reached. */ + do { + DWORD result, max_entries; + ULONG count, i; + OVERLAPPED_ENTRY entries[64]; + int num_events = 0; + + /* Compute how much overlapped entries can be dequeued at most. */ + max_entries = ARRAY_COUNT(entries); + if ((int) max_entries > maxevents) + max_entries = maxevents; + + result = GetQueuedCompletionStatusEx(port_data->iocp, + entries, + max_entries, + &count, + gqcs_timeout, + FALSE); + + if (!result) { + DWORD error = GetLastError(); + if (error == WAIT_TIMEOUT) { + return 0; + } else { + return -1; + } + } + + port_data->pending_ops_count -= count; + + /* Successfully dequeued overlappeds. */ + for (i = 0; i < count; i++) { + OVERLAPPED* overlapped; + wpoll_op_t* op; + wpoll_sock_data_t* sock_data; + DWORD afd_events; + int registered_events, reported_events; + + overlapped = entries[i].lpOverlapped; + op = CONTAINING_RECORD(overlapped, wpoll_op_t, overlapped); + sock_data = op->sock_data; + + if (op->generation < sock_data->op_generation) { + /* This op has been superseded. Free and ignore it. */ + free(op); + continue; + } + + /* Dequeued the most recent op. Reset generation and submitted_events. */ + sock_data->op_generation = 0; + sock_data->submitted_events = 0; + sock_data->free_op = op; + + registered_events = sock_data->events; + reported_events = 0; + + /* Check if this op was associated with a socket that was removed */ + /* with EPOLL_CTL_DEL. */ + if (registered_events == -1) { + free(op); + free(sock_data); + continue; + } + + /* Check for error. */ + if (!NT_SUCCESS(overlapped->Internal)) { + struct wpoll_event* ev = events + (num_events++); + ev->data.u64 = sock_data->user_data; + ev->events = WPOLLERR; + continue; + } + + if (op->poll_info.NumberOfHandles == 0) { + /* NumberOfHandles can be zero if this poll operation was canceled */ + /* due to a more recent exclusive poll operation. */ + afd_events = 0; + } else { + afd_events = op->poll_info.Handles[0].Events; + } + + /* Check for a closed socket. */ + if (afd_events & AFD_POLL_LOCAL_CLOSE) { + free(op); + free(sock_data); + continue; + } + + /* Convert afd events to epoll events. */ + if (afd_events & (AFD_POLL_RECEIVE | AFD_POLL_ACCEPT)) + reported_events |= (WPOLLIN | WPOLLRDNORM); + if (afd_events & AFD_POLL_RECEIVE_EXPEDITED) + reported_events |= (WPOLLIN | WPOLLRDBAND); + if (afd_events & AFD_POLL_SEND) + reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND); + if ((afd_events & AFD_POLL_DISCONNECT) && !(afd_events & AFD_POLL_ABORT)) + reported_events |= (WPOLLRDHUP | WPOLLIN | WPOLLRDNORM | WPOLLRDBAND); + if (afd_events & AFD_POLL_ABORT) + reported_events |= WPOLLHUP | WPOLLERR; + if (afd_events & AFD_POLL_CONNECT) + reported_events |= (WPOLLOUT | WPOLLWRNORM | WPOLLWRBAND); + if (afd_events & AFD_POLL_CONNECT_FAIL) + reported_events |= WPOLLERR; + + /* Don't report events that the user didn't specify. */ + reported_events &= registered_events; + + /* Unless WPOLLONESHOT is used or no events were reported that the */ + /* user is interested in, add the socket back to the attention list. */ + if (!registered_events & WPOLLONESHOT || reported_events == 0) { + assert(!sock_data->attn); + if (port_data->attn == NULL) { + sock_data->attn_next = sock_data->attn_prev = NULL; + port_data->attn = sock_data; + } else { + sock_data->attn_prev = NULL; + sock_data->attn_next = port_data->attn; + port_data->attn->attn_next = sock_data; + port_data->attn = sock_data; + } + } + + if (reported_events) { + struct wpoll_event* ev = events + (num_events++); + ev->data.u64 = sock_data->user_data; + ev->events = reported_events; + } + } + + if (num_events > 0) + return num_events; + + /* Events were dequeued, but none were relevant. Recompute timeout. */ + if (timeout > 0) { + gqcs_timeout = due - GetTickCount(); + } + } while (timeout > 0); + + return 0; } + + +int wpoll_close(wpoll_t port_handle) { + wpoll_port_data_t* port_data; + wpoll_sock_data_t* sock_data; + int i; + + port_data = (wpoll_port_data_t*) port_handle; + + /* Close all peer sockets. This will make all pending ops return. */ + for (i = 0; i < ARRAY_COUNT(port_data->peer_sockets); i++) { + SOCKET peer_sock = port_data->peer_sockets[i]; + if (peer_sock != 0 && peer_sock != INVALID_SOCKET) { + if (closesocket(peer_sock) != 0) + return -1; + + port_data->peer_sockets[i] = 0; + } + } + + /* There is no list of wpoll_ops the free. And even if there was, just */ + /* freeing them would be dangerous since the kernel might still alter */ + /* the overlapped status contained in them. But since we are sure that */ + /* all ops will soon return, just await them all. */ + while (port_data->pending_ops_count > 0) { + OVERLAPPED_ENTRY entries[64]; + DWORD result; + ULONG count, i; + + printf("ops: %d\n", port_data->pending_ops_count); + + result = GetQueuedCompletionStatusEx(port_data->iocp, + entries, + ARRAY_COUNT(entries), + &count, + INFINITE, + FALSE); + + if (!result) { + DWORD error = GetLastError(); + return -1; + } + + port_data->pending_ops_count -= count; + + for (i = 0; i < count; i++) { + wpoll_op_t* op = CONTAINING_RECORD(entries[i].lpOverlapped, + wpoll_op_t, + overlapped); + free(op); + } + } + + /* Remove all entries from the socket_state tree. */ + while (sock_data = RB_ROOT(&port_data->sock_data_tree)) { + RB_REMOVE(wpoll_sock_data_tree, &port_data->sock_data_tree, sock_data); + + if (sock_data->free_op != NULL) + free(sock_data->free_op); + free(sock_data); + } + + /* Close the I/O completion port. */ + CloseHandle(port_data->iocp); + + /* Finally, remove the port data. */ + free(port_data); + + return 0; +} \ No newline at end of file diff --git a/test.vcxproj b/test.vcxproj index 25a3a44..ef7426b 100644 --- a/test.vcxproj +++ b/test.vcxproj @@ -1 +1,126 @@ -DebugWin32ReleaseWin32{58F32AFA-FCBB-2A4D-DCB4-242324A2FAC4}Win32ProjtestApplication$(ExecutablePath);$(MSBuildProjectDirectory)\.\bin\;$(MSBuildProjectDirectory)\.\bin\$(Configuration)\obj\$(ProjectName)\falsetrue$(SolutionDir)$(Configuration)\$(ProjectName)$(OutDir)\$(ProjectName).exeinclude;src;%(AdditionalIncludeDirectories)/MP %(AdditionalOptions)EnableFastCheckstrueProgramDatabaseSyncfalsefalseDisabledWIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions)MultiThreadedDebugtruetruefalseLevel3ws2_32.lib;%(AdditionalDependencies)truetruetrue$(OutDir)$(ProjectName).exetrueConsoletrueinclude;src;%(AdditionalIncludeDirectories)WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions)include;src;%(AdditionalIncludeDirectories)/MP %(AdditionalOptions)trueProgramDatabaseSyncSpeedtrueAnySuitabletruetrueFullWIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions)MultiThreadedtruetruefalseLevel3true/LTCG %(AdditionalOptions)ws2_32.lib;%(AdditionalDependencies)truetruetruetrueUseLinkTimeCodeGenerationtrue$(OutDir)$(ProjectName).exetrueConsoletrueinclude;src;%(AdditionalIncludeDirectories)WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions){9221E672-D419-F7D7-9259-A7D2BE1D177A}false \ No newline at end of file + + + + + Debug + Win32 + + + Release + Win32 + + + + {58F32AFA-FCBB-2A4D-DCB4-242324A2FAC4} + Win32Proj + test + + + + Application + + + + + + + + + $(ExecutablePath);$(MSBuildProjectDirectory)\.\bin\;$(MSBuildProjectDirectory)\.\bin\ + $(Configuration)\obj\$(ProjectName)\ + false + true + $(SolutionDir)$(Configuration)\ + $(ProjectName) + $(OutDir)\$(ProjectName).exe + + + + include;src;%(AdditionalIncludeDirectories) + /MP %(AdditionalOptions) + EnableFastChecks + true + ProgramDatabase + false + false + false + Disabled + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions) + MultiThreadedDebug + true + true + false + Level3 + + + ws2_32.lib;%(AdditionalDependencies) + true + true + true + $(OutDir)$(ProjectName).exe + true + Console + true + + + include;src;%(AdditionalIncludeDirectories) + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions) + + + + + include;src;%(AdditionalIncludeDirectories) + /MP %(AdditionalOptions) + true + ProgramDatabase + Sync + Speed + true + AnySuitable + true + true + Full + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions) + MultiThreaded + true + true + false + Level3 + true + + + /LTCG %(AdditionalOptions) + + + ws2_32.lib;%(AdditionalDependencies) + true + true + true + true + UseLinkTimeCodeGeneration + true + $(OutDir)$(ProjectName).exe + true + Console + true + + + include;src;%(AdditionalIncludeDirectories) + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions) + + + + + + + + + + + {9221E672-D419-F7D7-9259-A7D2BE1D177A} + false + + + + + \ No newline at end of file diff --git a/wpoll.vcxproj b/wpoll.vcxproj index bf33713..618df75 100644 --- a/wpoll.vcxproj +++ b/wpoll.vcxproj @@ -1 +1,129 @@ -DebugWin32ReleaseWin32{9221E672-D419-F7D7-9259-A7D2BE1D177A}Win32ProjwpollStaticLibrary$(ExecutablePath);$(MSBuildProjectDirectory)\.\bin\;$(MSBuildProjectDirectory)\.\bin\$(Configuration)\obj\$(ProjectName)\falsetrue$(SolutionDir)$(Configuration)\$(ProjectName)$(OutDir)\lib\$(ProjectName).libinclude;src;%(AdditionalIncludeDirectories)/MP %(AdditionalOptions)EnableFastCheckstrueProgramDatabaseSyncfalsefalseDisabledWIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions)MultiThreadedDebugtruetruefalseLevel3$(OutDir)lib\$(ProjectName).libws2_32.lib;%(AdditionalDependencies)truetruetruetruetrueinclude;src;%(AdditionalIncludeDirectories)WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions)include;src;%(AdditionalIncludeDirectories)/MP %(AdditionalOptions)trueProgramDatabaseSyncSpeedtrueAnySuitabletruetrueFullWIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions)MultiThreadedtruetruefalseLevel3true/LTCG %(AdditionalOptions)$(OutDir)lib\$(ProjectName).libws2_32.lib;%(AdditionalDependencies)truetruetruetrueUseLinkTimeCodeGenerationtruetruetrueinclude;src;%(AdditionalIncludeDirectories)WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions) \ No newline at end of file + + + + + Debug + Win32 + + + Release + Win32 + + + + {9221E672-D419-F7D7-9259-A7D2BE1D177A} + Win32Proj + wpoll + + + + StaticLibrary + + + + + + + + + $(ExecutablePath);$(MSBuildProjectDirectory)\.\bin\;$(MSBuildProjectDirectory)\.\bin\ + $(Configuration)\obj\$(ProjectName)\ + false + true + $(SolutionDir)$(Configuration)\ + $(ProjectName) + $(OutDir)\lib\$(ProjectName).lib + + + + include;src;%(AdditionalIncludeDirectories) + /MP %(AdditionalOptions) + EnableFastChecks + true + ProgramDatabase + false + false + false + Disabled + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions) + MultiThreadedDebug + true + true + false + Level3 + + + $(OutDir)lib\$(ProjectName).lib + + + ws2_32.lib;%(AdditionalDependencies) + true + true + true + true + true + + + include;src;%(AdditionalIncludeDirectories) + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;DEBUG;_DEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions) + + + + + include;src;%(AdditionalIncludeDirectories) + /MP %(AdditionalOptions) + true + ProgramDatabase + Sync + Speed + true + AnySuitable + true + true + Full + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions) + MultiThreaded + true + true + false + Level3 + true + + + /LTCG %(AdditionalOptions) + $(OutDir)lib\$(ProjectName).lib + + + ws2_32.lib;%(AdditionalDependencies) + true + true + true + true + UseLinkTimeCodeGeneration + true + true + true + + + include;src;%(AdditionalIncludeDirectories) + WIN32;_CRT_SECURE_NO_DEPRECATE;_CRT_NONSTDC_NO_DEPRECATE;NDEBUG;%(PreprocessorDefinitions);%(PreprocessorDefinitions) + + + + + + + + + + + + + + + + + + + + \ No newline at end of file