poll-group: use 1 driver socket per N sockets, vs. 1 for *all* sockets

This commit is contained in:
Bert Belder 2017-09-14 00:40:15 +02:00
parent 1ad0497c9c
commit 45aaa10a62
5 changed files with 211 additions and 64 deletions

View File

@ -7,6 +7,7 @@
#include "epoll-socket.h"
#include "epoll.h"
#include "error.h"
#include "poll-group.h"
#include "poll-request.h"
#include "port.h"
@ -21,7 +22,7 @@
typedef struct _ep_sock_private {
ep_sock_t pub;
SOCKET afd_socket;
SOCKET driver_socket;
poll_group_t* poll_group;
epoll_data_t user_data;
poll_req_t* latest_poll_req;
uint32_t user_events;
@ -53,8 +54,9 @@ static inline void _ep_sock_free(_ep_sock_private_t* sock_private) {
static int _get_related_sockets(ep_port_t* port_info,
SOCKET socket,
SOCKET* afd_socket_out,
SOCKET* driver_socket_out) {
SOCKET afd_socket, driver_socket;
poll_group_t** poll_group_out) {
SOCKET afd_socket;
poll_group_t* poll_group;
DWORD bytes;
/* Try to obtain a base handle for the socket, so we can bypass LSPs
@ -74,12 +76,12 @@ static int _get_related_sockets(ep_port_t* port_info,
NULL,
NULL);
driver_socket = ep_port_get_driver_socket(port_info, afd_socket);
if (driver_socket == INVALID_SOCKET)
poll_group = ep_port_acquire_poll_group(port_info, afd_socket);
if (poll_group == NULL)
return -1;
*afd_socket_out = afd_socket;
*driver_socket_out = driver_socket;
*poll_group_out = poll_group;
return 0;
}
@ -95,7 +97,7 @@ static int _ep_sock_set_socket(ep_port_t* port_info,
if (_get_related_sockets(port_info,
socket,
&sock_private->afd_socket,
&sock_private->driver_socket) < 0)
&sock_private->poll_group) < 0)
return -1;
if (ep_port_add_socket(port_info, &sock_private->pub.tree_node, socket) < 0)
@ -141,6 +143,8 @@ void ep_sock_delete(ep_port_t* port_info, ep_sock_t* sock_info) {
ep_port_del_socket(port_info, &sock_info->tree_node);
ep_port_clear_socket_update(port_info, sock_info);
ep_port_release_poll_group(sock_private->poll_group);
sock_private->poll_group = NULL;
_ep_sock_maybe_free(sock_private);
}
@ -226,7 +230,7 @@ int ep_sock_update(ep_port_t* port_info, ep_sock_t* sock_info) {
assert(ep_port_is_socket_update_pending(port_info, sock_info));
driver_socket = sock_private->driver_socket;
driver_socket = poll_group_get_socket(sock_private->poll_group);
/* Check if there are events registered that are not yet submitted. In
* that case we need to submit another req.

View File

@ -11,6 +11,7 @@
#include "error.h"
#include "init.h"
#include "nt.h"
#include "poll-group.h"
#include "poll-request.h"
#include "port.h"
#include "queue.h"
@ -24,9 +25,6 @@ typedef struct ep_port ep_port_t;
typedef struct poll_req poll_req_t;
typedef struct ep_sock ep_sock_t;
static SOCKET _ep_create_driver_socket(HANDLE iocp,
WSAPROTOCOL_INFOW* protocol_info);
static int _ep_ctl_add(ep_port_t* port_info,
uintptr_t socket,
struct epoll_event* ev) {
@ -226,17 +224,6 @@ ep_port_t* ep_port_new(HANDLE iocp) {
int ep_port_delete(ep_port_t* port_info) {
tree_node_t* tree_node;
/* Close all peer sockets. This will make all pending io requests return. */
for (size_t i = 0; i < array_count(port_info->driver_sockets); i++) {
SOCKET driver_socket = port_info->driver_sockets[i];
if (driver_socket != 0 && driver_socket != INVALID_SOCKET) {
if (closesocket(driver_socket) != 0)
return_error(-1);
port_info->driver_sockets[i] = 0;
}
}
if (!CloseHandle(port_info->iocp))
return_error(-1);
port_info->iocp = NULL;
@ -246,6 +233,12 @@ int ep_port_delete(ep_port_t* port_info) {
ep_sock_force_delete(port_info, sock_info);
}
for (size_t i = 0; i < array_count(port_info->poll_group_allocators); i++) {
poll_group_allocator_t* pga = port_info->poll_group_allocators[i];
if (pga != NULL)
poll_group_allocator_delete(pga);
}
/* Finally, remove the port data. */
free(port_info);
@ -270,12 +263,24 @@ void ep_port_del_req(ep_port_t* port_info) {
port_info->poll_req_count--;
}
SOCKET ep_port_get_driver_socket(ep_port_t* port_info, SOCKET socket) {
poll_group_allocator_t* _get_poll_group_allocator(
ep_port_t* port_info,
size_t index,
const WSAPROTOCOL_INFOW* protocol_info) {
poll_group_allocator_t** pga = &port_info->poll_group_allocators[index];
if (*pga == NULL)
*pga = poll_group_allocator_new(port_info, protocol_info);
return *pga;
}
poll_group_t* ep_port_acquire_poll_group(ep_port_t* port_info, SOCKET socket) {
ssize_t index;
size_t i;
SOCKET driver_socket;
WSAPROTOCOL_INFOW protocol_info;
int len;
poll_group_allocator_t* pga;
/* Obtain protocol information about the socket. */
len = sizeof protocol_info;
@ -284,7 +289,7 @@ SOCKET ep_port_get_driver_socket(ep_port_t* port_info, SOCKET socket) {
SO_PROTOCOL_INFOW,
(char*) &protocol_info,
&len) != 0)
return_error(INVALID_SOCKET);
return_error(NULL);
index = -1;
for (i = 0; i < array_count(AFD_PROVIDER_GUID_LIST); i++) {
@ -298,46 +303,15 @@ SOCKET ep_port_get_driver_socket(ep_port_t* port_info, SOCKET socket) {
/* Check if the protocol uses an msafd socket. */
if (index < 0)
return_error(INVALID_SOCKET, ERROR_NOT_SUPPORTED);
return_error(NULL, ERROR_NOT_SUPPORTED);
/* If we didn't (try) to create a peer socket yet, try to make one. Don't
* try again if the peer socket creation failed earlier for the same
* protocol.
*/
driver_socket = port_info->driver_sockets[index];
if (driver_socket == 0) {
driver_socket = _ep_create_driver_socket(port_info->iocp, &protocol_info);
port_info->driver_sockets[index] = driver_socket;
}
pga = _get_poll_group_allocator(port_info, index, &protocol_info);
return driver_socket;
return poll_group_acquire(pga);
}
static SOCKET _ep_create_driver_socket(HANDLE iocp,
WSAPROTOCOL_INFOW* protocol_info) {
SOCKET socket = 0;
socket = WSASocketW(protocol_info->iAddressFamily,
protocol_info->iSocketType,
protocol_info->iProtocol,
protocol_info,
0,
WSA_FLAG_OVERLAPPED);
if (socket == INVALID_SOCKET)
return_error(INVALID_SOCKET);
if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
goto error;
if (CreateIoCompletionPort((HANDLE) socket, iocp, 0, 0) == NULL)
goto error;
return socket;
error:;
DWORD error = GetLastError();
closesocket(socket);
return_error(INVALID_SOCKET, error);
void ep_port_release_poll_group(poll_group_t* poll_group) {
poll_group_release(poll_group);
}
bool ep_port_is_socket_update_pending(ep_port_t* port_info,

144
src/poll-group.c Normal file
View File

@ -0,0 +1,144 @@
#include <assert.h>
#include <malloc.h>
#include "error.h"
#include "poll-group.h"
#include "port.h"
#include "util.h"
#include "win.h"
static const size_t _DS_MAX_USERS = 32;
typedef struct poll_group_allocator {
ep_port_t* port_info;
queue_t poll_group_queue;
WSAPROTOCOL_INFOW protocol_info;
} poll_group_allocator_t;
typedef struct poll_group {
poll_group_allocator_t* allocator;
queue_node_t queue_node;
SOCKET socket;
size_t user_count;
} poll_group_t;
static int _poll_group_create_socket(poll_group_t* poll_group,
WSAPROTOCOL_INFOW* protocol_info,
HANDLE iocp) {
SOCKET socket;
socket = WSASocketW(protocol_info->iAddressFamily,
protocol_info->iSocketType,
protocol_info->iProtocol,
protocol_info,
0,
WSA_FLAG_OVERLAPPED);
if (socket == INVALID_SOCKET)
return_error(-1);
if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
goto error;
if (CreateIoCompletionPort((HANDLE) socket, iocp, 0, 0) == NULL)
goto error;
poll_group->socket = socket;
return 0;
error:;
DWORD error = GetLastError();
closesocket(socket);
return_error(-1, error);
}
static poll_group_t* _poll_group_new(poll_group_allocator_t* pga) {
poll_group_t* poll_group = malloc(sizeof *poll_group);
if (poll_group == NULL)
return_error(NULL, ERROR_NOT_ENOUGH_MEMORY);
memset(poll_group, 0, sizeof *poll_group);
queue_node_init(&poll_group->queue_node);
poll_group->allocator = pga;
if (_poll_group_create_socket(
poll_group, &pga->protocol_info, pga->port_info->iocp) < 0) {
free(poll_group);
return NULL;
}
queue_append(&pga->poll_group_queue, &poll_group->queue_node);
return poll_group;
}
static void _poll_group_delete(poll_group_t* poll_group) {
assert(poll_group->user_count == 0);
closesocket(poll_group->socket);
queue_remove(&poll_group->queue_node);
free(poll_group);
}
SOCKET poll_group_get_socket(poll_group_t* poll_group) {
return poll_group->socket;
}
poll_group_allocator_t* poll_group_allocator_new(
ep_port_t* port_info, const WSAPROTOCOL_INFOW* protocol_info) {
poll_group_allocator_t* pga = malloc(sizeof *pga);
if (pga == NULL)
return_error(NULL, ERROR_NOT_ENOUGH_MEMORY);
queue_init(&pga->poll_group_queue);
pga->port_info = port_info;
pga->protocol_info = *protocol_info;
return pga;
}
void poll_group_allocator_delete(poll_group_allocator_t* pga) {
queue_t* poll_group_queue = &pga->poll_group_queue;
while (!queue_empty(poll_group_queue)) {
queue_node_t* queue_node = queue_first(poll_group_queue);
poll_group_t* poll_group =
container_of(queue_node, poll_group_t, queue_node);
_poll_group_delete(poll_group);
}
free(pga);
}
poll_group_t* poll_group_acquire(poll_group_allocator_t* pga) {
queue_t* queue = &pga->poll_group_queue;
poll_group_t* poll_group =
!queue_empty(queue)
? container_of(queue_last(queue), poll_group_t, queue_node)
: NULL;
if (poll_group == NULL || poll_group->user_count >= _DS_MAX_USERS)
poll_group = _poll_group_new(pga);
if (poll_group == NULL)
return NULL;
if (++poll_group->user_count == _DS_MAX_USERS) {
/* Move to the front of the queue. */
queue_remove(&poll_group->queue_node);
queue_prepend(&pga->poll_group_queue, &poll_group->queue_node);
}
return poll_group;
}
void poll_group_release(poll_group_t* poll_group) {
poll_group_allocator_t* pga = poll_group->allocator;
poll_group->user_count--;
assert(poll_group->user_count < _DS_MAX_USERS);
/* Move to the back of the queue. */
queue_remove(&poll_group->queue_node);
queue_append(&pga->poll_group_queue, &poll_group->queue_node);
/* TODO: free the poll_group_t* item at some point. */
}

22
src/poll-group.h Normal file
View File

@ -0,0 +1,22 @@
#ifndef EPOLL_POLL_GROUP_H_
#define EPOLL_POLL_GROUP_H_
#include "error.h"
#include "internal.h"
#include "queue.h"
#include "win.h"
typedef struct ep_port ep_port_t;
typedef struct poll_group_allocator poll_group_allocator_t;
typedef struct poll_group poll_group_t;
EPOLL_INTERNAL poll_group_allocator_t* poll_group_allocator_new(
ep_port_t* port_info, const WSAPROTOCOL_INFOW* protocol_info);
EPOLL_INTERNAL void poll_group_allocator_delete(poll_group_allocator_t* pga);
EPOLL_INTERNAL poll_group_t* poll_group_acquire(poll_group_allocator_t* pga);
EPOLL_INTERNAL void poll_group_release(poll_group_t* ds);
EPOLL_INTERNAL SOCKET poll_group_get_socket(poll_group_t* poll_group);
#endif /* EPOLL_POLL_GROUP_H_ */

View File

@ -4,6 +4,7 @@
#include "afd.h"
#include "epoll-socket.h"
#include "internal.h"
#include "poll-group.h"
#include "queue.h"
#include "rb.h"
#include "tree.h"
@ -15,7 +16,8 @@ typedef struct ep_sock ep_sock_t;
typedef struct ep_port {
HANDLE iocp;
SOCKET driver_sockets[array_count(AFD_PROVIDER_GUID_LIST)];
poll_group_allocator_t*
poll_group_allocators[array_count(AFD_PROVIDER_GUID_LIST)];
tree_t sock_tree;
queue_t update_queue;
size_t poll_req_count;
@ -24,8 +26,9 @@ typedef struct ep_port {
EPOLL_INTERNAL ep_port_t* ep_port_new(HANDLE iocp);
EPOLL_INTERNAL int ep_port_delete(ep_port_t* port_info);
EPOLL_INTERNAL SOCKET ep_port_get_driver_socket(ep_port_t* port_info,
SOCKET socket);
EPOLL_INTERNAL poll_group_t* ep_port_acquire_poll_group(ep_port_t* port_info,
SOCKET socket);
EPOLL_INTERNAL void ep_port_release_poll_group(poll_group_t* poll_group);
EPOLL_INTERNAL int ep_port_add_socket(ep_port_t* port_info,
tree_node_t* tree_node,