企业网站建设费用计入什么科目,制作网站心得,河北省建设注册中心网站,网站建设公司销售提成前言
IOCP 全称 Input/Ouput Completion Ports#xff0c;中文中翻译一般为“完成端口”#xff0c;本文中我们使用 IOCP 简写.
IOCP 模型是迄今为止最为复杂的一种 I/O 模型#xff0c;但是同时通过使用 IOCP 我们往往可以达到最佳的系统性能. 当你的网络应用程序需要管理…前言
IOCP 全称 Input/Ouput Completion Ports中文中翻译一般为“完成端口”本文中我们使用 IOCP 简写.
IOCP 模型是迄今为止最为复杂的一种 I/O 模型但是同时通过使用 IOCP 我们往往可以达到最佳的系统性能. 当你的网络应用程序需要管理大量的 Socket I/O 请求时你或许没有其他的选择.
本篇文章我们将通过一个官方的 IOCP demo 程序来介绍如何使用 IOCP. 因为其复杂性这篇文章中我们主要介绍如何使用不深入内部的实现更多的详细信息请参考官方文档.
官方程序的地址 https://github.com/microsoft/Windows-classic-samples/tree/master/Samples/Win7Samples/netds/winsock/iocp/serverex 个人感觉官方的 demo 代码不太好看包括格式和一些额外琐碎的可省略的细节因此文末我会附上自己精简过的代码以便读者阅读. 读者按需自取.
API 基础
关于我们将要使用的数据结构
OVERLAPPED 结构体WSAEventCriticalSectionCreateThread
等相关知识在 WinSocket I/O 模型的相关文章 WinSock I/O 模型 – OVERLAPPED I/O 模型 中均已介绍过这里不在赘述.
CreateIoCompletionPort
CreateIoCompletionPort 方法用于创建一个 IOCP handle 或者将现有的 Socket handle 与已经创建的 IOCP 关联起来.
HANDLE WINAPI CreateIoCompletionPort(_In_ HANDLE FileHandle,_In_opt_ HANDLE ExistingCompletionPort,_In_ ULONG_PTR CompletionKey,_In_ DWORD NumberOfConcurrentThreads
);FileHandle 指定与 ExistingCompletionPort 关联的文件 handle注意不仅仅是 socket handle。 这个 fileHandle 必须支持 overlapped I/O。 对于 Socket handle 来说该 socket 在创建时需要指定 WSA_FLAG_OVERLAPPED 标志. 当我们想要使用这个 API 来创建一个新的 IOCP handle 时 将这个参数设置为 INVALID_HANDLE_VALUE.ExistingCompletionPort: NULL 或者一个已经使用 CreateIoCompletionPort 创建出来的 IOCP 实例. 当我们想要使用这个 API 来创建一个新的 IOCP handle 时 将这个参数设置为 NULL. 此时该方法的返回值是新创建出来的 IOCP 实例。 当我们想要将一个 IOCP 实例与一个 FileHandle 关联以来的时候将这个参数设置为当前已经存在的 IOCP 实例. 此时当此次方法调用成功的时候该方法返回是 ExistingCompletionPort.CompletionKey: 此参数用于指定一个与当前 FileHandle 关联的数据结构这个结构将包含在每个 I/O completion packet (后面我们会解释该packet) 中.NumberOfConcurrentThreads 指定最大允许的线程数 这些线程用于处理 I/O completion packet。 这个参数仅仅在创建新的 IOCP 实例时有用其他情况下会被忽略. 当指定为 0 系统将使用等同与当前系统 CPU 处理器数量的线程.
GetQueuedCompletionStatus
GetQueuedCompletionStatus 方法用于从指定的 IOCP 实例上获取 I/O completion packet.
I/O completion packet通缩来讲当我们创建一个 IOCP实例之后系统内部会给对应的 IOCP 实例分配一个队列这个队列用户保存所有与当前 IOCP 关联起来的 FileHandle 上已经完成的异步任务的信息。我们将这样的保存这个队列中的已完成的异步任务的信息称作 I/O completion packet.
使用这个 API 可以从该队列中取出这些 I/O completion packet. 注意这是一个队列 意味着即使有多个线程同时从一个 IOCP 实例上获取 I/O completion packet 时他们也不会获取到相同的 I/O completion packet
还有一个更高级的方法 GetQueuedCompletionStatusEx这里我们没有使用它暂且不提.
BOOL GetQueuedCompletionStatus(HANDLE CompletionPort,LPDWORD lpNumberOfBytesTransferred,PULONG_PTR lpCompletionKey,LPOVERLAPPED *lpOverlapped,DWORD dwMilliseconds
);CompletionPort: IOCP 实例lpNumberOfBytesTransferred当前已完成的异步任务成功传输的字节数. 如果当前异步任务是一个发送操作这个这个参数返回成功发送的字节数。读操作同理.lpCompletionKey: 我们在将一个 FileHandle 和 IOCP实例关联起来时指定了一个 lpCompletionKey在这个 FileHandle 上有任务完成我们通过GetQueuedCompletionStatus 获取到该任务完成的 I/O completion packet 时这个参数便等于我们指定的那个 lpCompletionKey.lpOverlapped: 提交异步任务给 IOCP 实例时所指定的 OVERLAPPED 结构体. 我们之前说过OVERLAPPED 数据结构就像是一个异步任务的id我们在开始一个异步任务的时候需要指定一个 OVERLAPPED结构体当这个异步任务完成时操作系统便可以通过返回这个 OVERLAPPED 结构体给我们这样我们便能得知是我们提交的哪个异步任务完成了. 关于这个数据结构的使用还有一些技巧我们后边再解释.dwMilliseconds: 指定一个超时时间在指定时间内没有获取到任何 I/O completion packet该方法将会返回, 此时该方法返回 FALSE. 实例中我们将使用 INFINITE 来让这个方法一直阻塞直到有至少一个任务完成.
返回值 当该方法成功的获取到一个 I/O completion packet 时该方法会返回 TRUE。 此时lpNumberOfByteslpOverlapped lpCompletionKey 会被填充上与当前 I/O completion packet 对应的数据结构.
当该方法调用失败时该方法会返回 FALSE。此时 lpNumberOfByteslpOverlapped lpCompletionKey 的可能返回值如下
lpOverlapped 返回参数是 NULL 代表我们没有从 IOCP 实例上获取到任何异步任务的完成信息. lpNumberOfBytes lpCompletionKey 也不包含任何有效信息.lpOverlapped 返回参数不为 NULL 代表我们从 IOCP 实例上获取到了异步任务的信息. 这种情况下该异步任务发生了错误 lpNumberOfByteslpOverlapped lpCompletionKey 返回参数上保存这个失败的任务的信息。 详细的错误信息需要使用 GetLastError.来获取.
当该方法返回 FALSE且 lpOverlapped 是 NULL GetLastError 返回 ERROR_ABANDONED_WAIT_0 代表当前 IOCP 实例被关闭.
HasOverlappedIoCompleted
HasOverlappedIoCompleted 是一个宏这个宏用来查询在当前 IOCP 实例上是否有正在执行的异步任务.
void HasOverlappedIoCompleted(lpOverlapped
);lpOverlapped 返回参数表示当前处于 Pending 状态的异步任务所关联的 OVERLAPPED 结构体. 如果你的异步任务不处于 ERROR_IO_PENDING 在这种情况下不要使用该宏 我们已经直到如何创建一个 IOCP 实例以及如何得到异步任务完成的通知我们接下来看看如何提交一个异步任务。 注意我们将只关注这些 API 与 IOCP 搭配使用不再提及他们支持的其他操作. AcceptEx
AcceptEx 方法用来接收新连接.
BOOL AcceptEx(SOCKET sListenSocket,SOCKET sAcceptSocket,PVOID lpOutputBuffer,DWORD dwReceiveDataLength,DWORD dwLocalAddressLength,DWORD dwRemoteAddressLength,LPDWORD lpdwBytesReceived,LPOVERLAPPED lpOverlapped
);sAcceptSocket: 不同与 accept 方法因为我们异步的接收新连接因此在调用此方法之前我们需要创建一个 Socket Handle 来保存新接收到的 Socket 实例.lpOutputBuffer: 该方法支持在接收连接的同时解析该新socket的本地和远程地址同时接收一块数据。接收到的数据会从该buffer 的开始位置地址相关的数据紧跟这个接收到的数据.dwReceiveDataLength 用于指定我们用来期待接收到的第一块儿数据的长度. 当该参数为 0 时意味着我们不接收数据只接收新的连接. 此时 lpOutputBuffer 仅仅用来保存本地和远程地址。dwLocalAddressLength dwRemoteAddressLength 指定需要为保存本地/远程地址应该在 lpOutputBuffer 中保留的地址。 该参数至少为 16不能为 0.lpdwBytesReceived 返回我们接收到的第一块儿数据的长度. 这个参数仅仅在 AcceptEx 方法立马成功的情况下有效如果当前接收操作返回 ERROR_IO_PENDING 错误该返回值无效.lpOverlapped指定与当前异步接收操作关联的 OVERLAPPED 结构体.
返回值
当该方法调用立马成功时该方法返回 TRUE.当该方法没有立马成功时该方法返回 FALSE。 此时应该使用 WSAGetLastError 获取具体的错误信息. 如果 WSAGetLastError 返回 ERROR_IO_PENDING代表该接收任务已经提交成功当前正在进行中. 值得一提的是 官方文档中明确表明该方法的性能远远高于 accept 方法。 WSARecv
WSARecv 用于从一个处于连接状态的 Socket 上接收数据.
int WSAAPI WSARecv(SOCKET s,LPWSABUF lpBuffers,DWORD dwBufferCount,LPDWORD lpNumberOfBytesRecvd,LPDWORD lpFlags,LPWSAOVERLAPPED lpOverlapped,LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);这里的 lpOverlapped 参数同 AcceptEx 方法中的 lpOverlapped 参数.
dwBuffers 用于指定一个用于保存接收到的数据的 buffer的数组。 dwBufferCount 指定 buffer 数组中的 buffer 数量。 lpNumberOfBytesRecvd如果当前读操作立马完成这个参数用于保存接收到的数据长度. 如果当前任务没有立即完成而是处于 pending状态那个这个参数的值无效. lpCompletionRoutine 本例中我们不适用这个参数因此指定为空。 我们使用 GetQueuedCompletionStatus 方法来异步的获取该接收任务完成的通知.
WSASend
WSASend 用于从一个处于连接状态的 Socket 上发送数据.
int WSAAPI WSASend(SOCKET s,LPWSABUF lpBuffers,DWORD dwBufferCount,LPDWORD lpNumberOfBytesSent,DWORD dwFlags,LPWSAOVERLAPPED lpOverlapped,LPWSAOVERLAPPED_COMPLETION_ROUTINE lpCompletionRoutine
);这个方法几乎和 WSARecv 相同不再赘述。
实现思路
创建一个 socket 作为监听 socket创建 IOCP 实例并将 server socket 和 IOCP 实例关联起来使用 AcceptEx 提交异步 accept 任务。创建多个子线程 在子线程中使用 GetQueuedCompletionStatus 阻塞的等待异步任务完成的通知I/O completion packet。并处理该通知。主线程一直阻塞直到服务器退出, IOCP 实例关闭.
这个流程说起来是非常简单但是简单的流程中隐藏了极多的细节这里我们来详细描述一下我们这个 IOCP服务器的实现思路
首先在我们创建了 server socket 之后我们紧接着就需要创建对应的 IOCP实例使用 CreateIoCompletePort。同时将 server socket 与 IOCP 实例关联起来使用 CreateIoCompletePort。 在关联当前 server socket 实例的同时我们需要指定一个 lpCompletionKey。我们需要在这个 lpCompletionKey 结构中存入足够多的信息以便我们在收到该 server socket 上异步任务完成通知时做出相应操作时有足够的信息.
这里我们看看实例代码中作为 lpCompletionKey 的结构是什么样子的
typedef struct _PER_SOCKET_CONTEXT {SOCKET Socket;LPFN_ACCEPTEX fnAcceptEx;PPER_IO_CONTEXT pIOContext; struct _PER_SOCKET_CONTEXT *pCtxtBack; struct _PER_SOCKET_CONTEXT *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;Socket 字段 当一个任务完成时我们需要直到是哪个 socket 上的任务完成了而 GetQueuedCompletionStatus 的返回值中并没有这个信息因此我们需要自己保存。 fnAcceptEx 这个字段的存在是因为 AcceptEx 方法的特殊性决定的。 我们无法直接调用 AcceptEx 方法而是需要先通过 WSAIoctl 搭配 SIO_GET_EXTENSION_FUNCTION_POINTER 这个参数来动态的获取该方法的指针。 并且该方法指针是和对应的 Server socket 绑定的也就是如果你有多个 server socket那么这个函数指针也会有多个。 因此这个字段不得不存储起来 pIOContext这个字段用于保存在当前 socket 上执行异步任务需要使用的 Overlapped 结构体的数据。 (接下来我们会更加详细来说这个结构) pCtxBack 和 pCtxForward这个真的不是必须的如果你使用其他方式维护多个 _PER_SOCKET_CONTEXT 数据结构那个两个字段完全不需要. 在将 Server socket 和 IOCP 绑定之后我们需要启用其他线程使用 GetQueuedCompletionStatus 来处理完成的异步任务。这里需要斟酌的点是 我们需要使用几个线程这些线程是应该的阻塞的等待还是使用 timeout 来一轮询的方式等待这需要读者自己好好斟酌。 将 server socket 和 IOCP 实例关联起来之后 处理任务完成通知的线程也有了我们如何让 server socket 开始接收新的连接呢 使用 accept 不这里我们不是用它它是阻塞的方式这里我们用 AcceptEx 来异步的接收新连接。 那么我们如何做呢
要使用 AcceptEx非常重要的一点是我们得先有个 Overlapped 结构体. 直接创建一个 Overlapped 结构体实例使用好不好 也不能说不好但是就目前看到的 IOCP 实现中没有人这样玩儿本人看过两个 IOCP 的实现不包括微软的官方demo报错 libuv。
目前他们使用的方法都是将 Overlapped 数据结构包进另外一个结构体。 demo 中的结构体如下
typedef struct _PER_IO_CONTEXT {WSAOVERLAPPED Overlapped;char Buffer[MAX_BUFF_SIZE];WSABUF wsabuf;int nTotalBytes;int nSentBytes;IO_OPERATION IOOperation;SOCKET SocketAccept; struct _PER_IO_CONTEXT *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;注意这个 _PER_IO_CONTEXT 包含在 _PER_SOCKET_CONTEXT也就是我们 lpCompletionKey 这个结构体中。 Overlapped: 这个字段自然是必须存在的. IOOperation: 指明我们当前异步任务的类型它的类型 IO_OPERATION accept send read SocketAccept 如果我们当前异步任务是一个 accept 任务那个这个字段用来存储我们新接收到的 socket 实例 wsaBuf: 这个字段是我们提交读或者写任务是需要传给 WSARecv 或 WSARead 的一个数据结构。 Buffer 是我们真正用来存储数据的地方。 WSABuf 这个结构中只包含一个 buffer 的指针和这个buffer 的长度。这个 demo 中这样设计那么毫无疑问 WSABuf 中的 buffer 指针必然指向 Buffer。 发送或接收到的数据都需要存在这儿 nTotalbytes nSentBytes 用来存储要发送或者接收到的数据长度 pIOcontextForward: 这个字段存在的是因为 我们将一个 Socket 与 _PER_SOCKET_CONTEXT 关联而一个 _PER_SOCKET_CONTEXT 中仅仅包含一个 _PER_IO_CONTEXT也就是 Overlapped 结构那么如何应对在一个socket 上进行多个异步任务的场景呢 此时就需要多个 _PER_IO_CONTEXT 实例了此时这个链表就发挥作用了。 这里唯一值得注意的是 Overlapped结构体放在 _PER_IO_CONTEXT 第一个字段它的好处是在我们使用 GetQueuedCompletionStatus 获取到当前完成的异步任务的 lpOverlapped 参数时我们可以直接将该指针强转为 _PER_IO_CONTEXT 这样我们便能直到当前具体的 I/O 操作是什么。 而 _PER_SOCKET_CONTEXT 这个结构会作为 lpCompletionKey 被GetQueuedCompletionStatus 返回此时我们便有了当前 Socket 所有的上下文. 这种设计下 一个 _PER_IO_CONTEXT 结构便 对应一个异步任务如果一个 socket 有多个异步任务那么便需要有多个 _PER_IO_CONTEXT 结构. 至于这个 demo 中对于这个结构体的设计在实际使用中有很多需要斟酌的地方。 到了这里我们使用 WSARecv 和 WSASend 也就不难了。
实例
代码较多细细品味
// THIS CODE AND INFORMATION IS PROVIDED AS IS WITHOUT WARRANTY OF
// ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND/OR FITNESS FOR A
// PARTICULAR PURPOSE.
//
// Copyright (C) Microsoft Corporation. All Rights Reserved.
//#pragma warning (disable:4127)
#pragma comment(lib,ws2_32.lib)#include winsock2.h
#include mswsock.h
#include Ws2tcpip.h
#include stdio.h
#include stdlib.h
#include strsafe.h#define DEFAULT_PORT 5001
#define MAX_BUFF_SIZE 8192
#define MAX_WORKER_THREAD 16#define xmalloc(s) HeapAlloc(GetProcessHeap(), HEAP_ZERO_MEMORY, (s))
#define xfree(p) HeapFree(GetProcessHeap(), 0, (p))typedef enum _IO_OPERATION {ClientIoAccept,ClientIoRead,ClientIoWrite
} IO_OPERATION, *PIO_OPERATION;typedef struct _PER_IO_CONTEXT {WSAOVERLAPPED Overlapped;char Buffer[MAX_BUFF_SIZE];WSABUF wsabuf;int nTotalBytes;int nSentBytes;IO_OPERATION IOOperation;SOCKET SocketAccept; struct _PER_IO_CONTEXT *pIOContextForward;
} PER_IO_CONTEXT, *PPER_IO_CONTEXT;// 作为 lpCompletionKey 使用
// 每个 socket 对应一个 _PER_SOCKET_CONTEXT 结构
// 该 socket 上的异步任务信息存储在 pIoContext 中该结构中是一个链表因此 pIoContext 应当被当作一个动态数组来看待
typedef struct _PER_SOCKET_CONTEXT {SOCKET Socket;LPFN_ACCEPTEX fnAcceptEx;PPER_IO_CONTEXT pIOContext; struct _PER_SOCKET_CONTEXT *pCtxtBack; struct _PER_SOCKET_CONTEXT *pCtxtForward;
} PER_SOCKET_CONTEXT, *PPER_SOCKET_CONTEXT;BOOL CreateListenSocket(void);
BOOL CreateAcceptSocket(BOOL fUpdateIOCP);
DWORD WINAPI WorkerThread(LPVOID WorkContext);PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET s, IO_OPERATION ClientIo, BOOL bAddToList);PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET s, IO_OPERATION ClientIO);
VOID CloseClient(PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful);
VOID CtxtListFree();
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext);
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext);BOOL g_bEndServer FALSE;
BOOL g_bRestart TRUE;
HANDLE g_hIOCP INVALID_HANDLE_VALUE;
SOCKET g_sdListen INVALID_SOCKET;
HANDLE g_ThreadHandles[MAX_WORKER_THREAD];
WSAEVENT g_hCleanupEvent[1];
PPER_SOCKET_CONTEXT g_pCtxtListenSocket NULL;
PPER_SOCKET_CONTEXT g_pCtxtList NULL;
CRITICAL_SECTION g_CriticalSection;int myprintf(const char *lpFormat, ...);void main() {SYSTEM_INFO systemInfo;WSADATA wsaData;DWORD dwThreadCount 0;int nRet 0;HANDLE hThread;DWORD dwThreadId;g_ThreadHandles[0] (HANDLE)WSA_INVALID_EVENT;for (int i 0; i MAX_WORKER_THREAD; i) {g_ThreadHandles[i] INVALID_HANDLE_VALUE;}GetSystemInfo(systemInfo);dwThreadCount systemInfo.dwNumberOfProcessors * 2;if (WSA_INVALID_EVENT (g_hCleanupEvent[0] WSACreateEvent())) {myprintf(WSACreateEvent() failed: %d\n, WSAGetLastError());return;}if ((nRet WSAStartup(0x202, wsaData)) ! 0) {myprintf(WSAStartup() failed: %d\n,nRet);if(g_hCleanupEvent[0] ! WSA_INVALID_EVENT) {WSACloseEvent(g_hCleanupEvent[0]);g_hCleanupEvent[0] WSA_INVALID_EVENT;}return;}InitializeCriticalSection(g_CriticalSection);while (g_bRestart) {g_bRestart FALSE;g_bEndServer FALSE;WSAResetEvent(g_hCleanupEvent[0]);// 创建 IOCP 实例g_hIOCP CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);if (g_hIOCP NULL) {myprintf(CreateIoCompletionPort() failed to create I/O completion port: %d\n, GetLastError());goto done;}// 启用 worker 线程来处理异步任务完成的通知for (DWORD dwCPU0; dwCPUdwThreadCount; dwCPU) {// Create worker threads to service the overlapped I/O requests. The decision// to create 2 worker threads per CPU in the system is a heuristic. Also,// note that thread handles are closed right away, because we will not need them// and the worker threads will continue to execute.hThread CreateThread(NULL, 0, WorkerThread, g_hIOCP, 0, dwThreadId);if (hThread NULL) {myprintf(CreateThread() failed to create worker thread: %d\n, GetLastError());goto done;}g_ThreadHandles[dwCPU] hThread;hThread INVALID_HANDLE_VALUE;}if (!CreateListenSocket())goto done;// 提交 accept 任务if (!CreateAcceptSocket(TRUE))goto done;// 阻塞主线程直到服务器退出WSAWaitForMultipleEvents(1, g_hCleanupEvent, TRUE, WSA_INFINITE, FALSE);done:// 当服务器退出时做一些清理工作g_bEndServer TRUE;// Cause worker threads to exit// 因为我们在子线程中调用 GetQueuedCompletionStatus 使用的timeout 值为 INFINITE // 我们需要手动的 post 一个 I/O completion packet 到 IOCP 实例上以便子线程中的 // GetQueuedCompletionStatus 读取到我们手动 post 的任务完成通知而退出// 不致于子线程用于无法退出if (g_hIOCP) {for (DWORD i 0; i dwThreadCount; i) {PostQueuedCompletionStatus(g_hIOCP, 0, 0, NULL);}}// Make sure worker threads exits.if (WAIT_OBJECT_0 ! WaitForMultipleObjects(dwThreadCount, g_ThreadHandles, TRUE, 1000)) {myprintf(WaitForMultipleObjects() failed: %d\n, GetLastError());} else {for (DWORD i0; idwThreadCount; i) {if (g_ThreadHandles[i] ! INVALID_HANDLE_VALUE)CloseHandle(g_ThreadHandles[i]);g_ThreadHandles[i] INVALID_HANDLE_VALUE;}}if (g_sdListen ! INVALID_SOCKET) {closesocket(g_sdListen);g_sdListen INVALID_SOCKET;}if (g_pCtxtListenSocket) {// 如果当前 Server socket 上还有正在进行的异步任务等待它完成再清理while (!HasOverlappedIoCompleted((LPOVERLAPPED)g_pCtxtListenSocket-pIOContext-Overlapped))Sleep(0);if (g_pCtxtListenSocket-pIOContext-SocketAccept ! INVALID_SOCKET)closesocket(g_pCtxtListenSocket-pIOContext-SocketAccept);g_pCtxtListenSocket-pIOContext-SocketAccept INVALID_SOCKET;if (g_pCtxtListenSocket-pIOContext)xfree(g_pCtxtListenSocket-pIOContext);if (g_pCtxtListenSocket)xfree(g_pCtxtListenSocket);g_pCtxtListenSocket NULL;}CtxtListFree();if (g_hIOCP) {CloseHandle(g_hIOCP);g_hIOCP NULL;}} //while (g_bRestart)DeleteCriticalSection(g_CriticalSection);if (g_hCleanupEvent[0] ! WSA_INVALID_EVENT) {WSACloseEvent(g_hCleanupEvent[0]);g_hCleanupEvent[0] WSA_INVALID_EVENT;}WSACleanup();
} //mainSOCKET CreateSocket() {int nRet 0;int nZero 0;SOCKET sdSocket INVALID_SOCKET;sdSocket WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); if (sdSocket INVALID_SOCKET) {myprintf(WSASocket(sdSocket) failed: %d\n, WSAGetLastError());return(sdSocket);}//// Disable send buffering on the socket. Setting SO_SNDBUF// to 0 causes winsock to stop buffering sends and perform// sends directly from our buffers, thereby save one memory copy.//// However, this does prevent the socket from ever filling the// send pipeline. This can lead to packets being sent that are// not full (i.e. the overhead of the IP and TCP headers is // great compared to the amount of data being carried).//// Disabling the send buffer has less serious repercussions // than disabling the receive buffer.//nZero 0;nRet setsockopt(sdSocket, SOL_SOCKET, SO_SNDBUF, (char *)nZero, sizeof(nZero));if (nRet SOCKET_ERROR) {myprintf(setsockopt(SNDBUF) failed: %d\n, WSAGetLastError());return(sdSocket);}//// Dont disable receive buffering. This will cause poor network// performance since if no receive is posted and no receive buffers,// the TCP stack will set the window size to zero and the peer will// no longer be allowed to send data.//// // Do not set a linger value...especially dont set it to an abortive// close. If you set abortive close and there happens to be a bit of// data remaining to be transfered (or data that has not been // acknowledged by the peer), the connection will be forcefully reset// and will lead to a loss of data (i.e. the peer wont get the last// bit of data). This is BAD. If you are worried about malicious// clients connecting and then not sending or receiving, the server// should maintain a timer on each connection. If after some point,// the server deems a connection is stale it can then set linger// to be abortive and close the connection.///*LINGER lingerStruct;lingerStruct.l_onoff 1;lingerStruct.l_linger 0;nRet setsockopt(sdSocket, SOL_SOCKET, SO_LINGER,(char *)lingerStruct, sizeof(lingerStruct));if( nRet SOCKET_ERROR ) {myprintf(setsockopt(SO_LINGER) failed: %d\n, WSAGetLastError());return(sdSocket);}*/return(sdSocket);
}BOOL CreateListenSocket(void) {int nRet 0;LINGER lingerStruct;struct addrinfo hints {0};struct addrinfo *addrlocal NULL;lingerStruct.l_onoff 1;lingerStruct.l_linger 0;hints.ai_flags AI_PASSIVE;hints.ai_family AF_INET;hints.ai_socktype SOCK_STREAM;hints.ai_protocol IPPROTO_IP;if (getaddrinfo(NULL, DEFAULT_PORT, hints, addrlocal) ! 0) {myprintf(getaddrinfo() failed with error %d\n, WSAGetLastError());return FALSE;}if (addrlocal NULL) {myprintf(getaddrinfo() failed to resolve/convert the interface\n);return FALSE;}g_sdListen CreateSocket();if (g_sdListen INVALID_SOCKET) {freeaddrinfo(addrlocal);return FALSE;}nRet bind(g_sdListen, addrlocal-ai_addr, (int) addrlocal-ai_addrlen);if (nRet SOCKET_ERROR) {myprintf(bind() failed: %d\n, WSAGetLastError());freeaddrinfo(addrlocal);return FALSE;}nRet listen(g_sdListen, 5);if (nRet SOCKET_ERROR) {myprintf(listen() failed: %d\n, WSAGetLastError());freeaddrinfo(addrlocal);return FALSE;}freeaddrinfo(addrlocal);return TRUE;
}//
// Create a socket and invoke AcceptEx. Only the original call to to this
// function needs to be added to the IOCP.
//
// If the expected behaviour of connecting client applications is to NOT
// send data right away, then only posting one AcceptEx can cause connection
// attempts to be refused if a client connects without sending some initial
// data (notice that the associated iocpclient does not operate this way
// but instead makes a connection and starts sending data write away).
// This is because the IOCP packet does not get delivered without the initial
// data (as implemented in this sample) thus preventing the worker thread
// from posting another AcceptEx and eventually the backlog value set in
// listen() will be exceeded if clients continue to try to connect.
//
// One technique to address this situation is to simply cause AcceptEx
// to return right away upon accepting a connection without returning any
// data. This can be done by setting dwReceiveDataLength0 when calling AcceptEx.
//
// Another technique to address this situation is to post multiple calls
// to AcceptEx. Posting multiple calls to AcceptEx is similar in concept to
// increasing the backlog value in listen(), though posting AcceptEx is
// dynamic (i.e. during the course of running your application you can adjust
// the number of AcceptEx calls you post). It is important however to keep
// your backlog value in listen() high in your server to ensure that the
// stack can accept connections even if your application does not get enough
// CPU cycles to repost another AcceptEx under stress conditions.
//
// This sample implements neither of these techniques and is therefore
// susceptible to the behaviour described above.
//
BOOL CreateAcceptSocket(BOOL fUpdateIOCP) {int nRet 0;DWORD dwRecvNumBytes 0;DWORD bytes 0;GUID acceptex_guid WSAID_ACCEPTEX;//The context for listening socket uses the SockAccept member to store the//socket for client connection. if (fUpdateIOCP) {g_pCtxtListenSocket UpdateCompletionPort(g_sdListen, ClientIoAccept, FALSE);if (g_pCtxtListenSocket NULL) {myprintf(failed to update listen socket to IOCP\n);return FALSE;}// 动态获取 AcceptEx 方法的函数指针// 将它保存再对应 Socket context 上nRet WSAIoctl(g_sdListen,SIO_GET_EXTENSION_FUNCTION_POINTER,acceptex_guid,sizeof(acceptex_guid),g_pCtxtListenSocket-fnAcceptEx,sizeof(g_pCtxtListenSocket-fnAcceptEx),bytes,NULL,NULL);if (nRet SOCKET_ERROR) {myprintf(failed to load AcceptEx: %d\n, WSAGetLastError());return FALSE;}}g_pCtxtListenSocket-pIOContext-SocketAccept CreateSocket();if (g_pCtxtListenSocket-pIOContext-SocketAccept INVALID_SOCKET) {myprintf(failed to create new accept socket\n);return FALSE;}// 提交接收任务// 这里我们期待接收 socket 的同时从该 socket 上 接收一块儿数据nRet g_pCtxtListenSocket-fnAcceptEx(g_sdListen, g_pCtxtListenSocket-pIOContext-SocketAccept,(LPVOID)(g_pCtxtListenSocket-pIOContext-Buffer),MAX_BUFF_SIZE - (2 * (sizeof(SOCKADDR_STORAGE) 16)),sizeof(SOCKADDR_STORAGE) 16, sizeof(SOCKADDR_STORAGE) 16,dwRecvNumBytes,(LPOVERLAPPED) (g_pCtxtListenSocket-pIOContext-Overlapped));if (nRet SOCKET_ERROR (ERROR_IO_PENDING ! WSAGetLastError())) {myprintf(AcceptEx() failed: %d\n, WSAGetLastError());return FALSE;}return TRUE;
}DWORD WINAPI WorkerThread (LPVOID WorkThreadContext) {HANDLE hIOCP (HANDLE)WorkThreadContext;BOOL bSuccess FALSE;int nRet 0;LPWSAOVERLAPPED lpOverlapped NULL;PPER_SOCKET_CONTEXT lpPerSocketContext NULL;PPER_SOCKET_CONTEXT lpAcceptSocketContext NULL;PPER_IO_CONTEXT lpIOContext NULL; WSABUF buffRecv;WSABUF buffSend;DWORD dwRecvNumBytes 0;DWORD dwSendNumBytes 0;DWORD dwFlags 0;DWORD dwIoSize 0;HRESULT hRet;while (TRUE) {// 阻塞的等待有异步任务完成的通知到来// 如果没有一直等待bSuccess GetQueuedCompletionStatus(hIOCP,dwIoSize,(PDWORD_PTR)lpPerSocketContext,(LPOVERLAPPED *)lpOverlapped,INFINITE );if (!bSuccess)myprintf(GetQueuedCompletionStatus() failed: %d\n, GetLastError());// 当服务器退出时我们使用 PostQueuedCompletionStatus post 的消息会触发这个 case// 我们当前子线程便可以正常退出了if (lpPerSocketContext NULL) {return 0;}if (g_bEndServer) {return 0;}lpIOContext (PPER_IO_CONTEXT)lpOverlapped;////We should never skip the loop and not post another AcceptEx if the current//completion packet is for previous AcceptEx//if (lpIOContext-IOOperation ! ClientIoAccept) {if (!bSuccess || (bSuccess (0 dwIoSize))) {CloseClient(lpPerSocketContext, FALSE); continue;}}//// determine what type of IO packet has completed by checking the PER_IO_CONTEXT // associated with this socket. This will determine what action to take.//switch (lpIOContext-IOOperation) {case ClientIoAccept://// When the AcceptEx function returns, the socket sAcceptSocket is // in the default state for a connected socket. The socket sAcceptSocket // does not inherit the properties of the socket associated with // sListenSocket parameter until SO_UPDATE_ACCEPT_CONTEXT is set on // the socket. Use the setsockopt function to set the SO_UPDATE_ACCEPT_CONTEXT // option, specifying sAcceptSocket as the socket handle and sListenSocket // as the option value. //nRet setsockopt(lpPerSocketContext-pIOContext-SocketAccept, SOL_SOCKET,SO_UPDATE_ACCEPT_CONTEXT,(char *)g_sdListen,sizeof(g_sdListen));if (nRet SOCKET_ERROR) {////just warn user here.//myprintf(setsockopt(SO_UPDATE_ACCEPT_CONTEXT) failed to update accept socket\n);WSASetEvent(g_hCleanupEvent[0]);return 0;}lpAcceptSocketContext UpdateCompletionPort(lpPerSocketContext-pIOContext-SocketAccept, ClientIoAccept, TRUE);if (lpAcceptSocketContext NULL) {////just warn user here.//myprintf(failed to update accept socket to IOCP\n);WSASetEvent(g_hCleanupEvent[0]);return 0;}if (dwIoSize) {lpAcceptSocketContext-pIOContext-IOOperation ClientIoWrite;lpAcceptSocketContext-pIOContext-nTotalBytes dwIoSize;lpAcceptSocketContext-pIOContext-nSentBytes 0;lpAcceptSocketContext-pIOContext-wsabuf.len dwIoSize;hRet StringCbCopyNA(lpAcceptSocketContext-pIOContext-Buffer,MAX_BUFF_SIZE,lpPerSocketContext-pIOContext-Buffer,sizeof(lpPerSocketContext-pIOContext-Buffer));lpAcceptSocketContext-pIOContext-wsabuf.buf lpAcceptSocketContext-pIOContext-Buffer;nRet WSASend(lpPerSocketContext-pIOContext-SocketAccept,lpAcceptSocketContext-pIOContext-wsabuf, 1,dwSendNumBytes,0,(lpAcceptSocketContext-pIOContext-Overlapped), NULL);if (nRet SOCKET_ERROR (ERROR_IO_PENDING ! WSAGetLastError())) {myprintf (WSASend() failed: %d\n, WSAGetLastError());CloseClient(lpAcceptSocketContext, FALSE);} else {myprintf(WorkerThread %d: Socket(%d) AcceptEx completed (%d bytes), Send posted\n, GetCurrentThreadId(), lpPerSocketContext-Socket, dwIoSize);}} else {//// AcceptEx completes but doesnt read any data so we need to post// an outstanding overlapped read.//lpAcceptSocketContext-pIOContext-IOOperation ClientIoRead;dwRecvNumBytes 0;dwFlags 0;buffRecv.buf lpAcceptSocketContext-pIOContext-Buffer,buffRecv.len MAX_BUFF_SIZE;nRet WSARecv(lpAcceptSocketContext-Socket,buffRecv, 1,dwRecvNumBytes,dwFlags,lpAcceptSocketContext-pIOContext-Overlapped, NULL);if (nRet SOCKET_ERROR (ERROR_IO_PENDING ! WSAGetLastError())) {myprintf (WSARecv() failed: %d\n, WSAGetLastError());CloseClient(lpAcceptSocketContext, FALSE);}}////Time to post another outstanding AcceptEx//if (!CreateAcceptSocket(FALSE)) {myprintf(Please shut down and reboot the server.\n);WSASetEvent(g_hCleanupEvent[0]);return(0);}break;case ClientIoRead://// a read operation has completed, post a write operation to echo the// data back to the client using the same data buffer.//lpIOContext-IOOperation ClientIoWrite;lpIOContext-nTotalBytes dwIoSize;lpIOContext-nSentBytes 0;lpIOContext-wsabuf.len dwIoSize;dwFlags 0;nRet WSASend(lpPerSocketContext-Socket,lpIOContext-wsabuf, 1, dwSendNumBytes,dwFlags,(lpIOContext-Overlapped), NULL);if (nRet SOCKET_ERROR (ERROR_IO_PENDING ! WSAGetLastError())) {myprintf(WSASend() failed: %d\n, WSAGetLastError());CloseClient(lpPerSocketContext, FALSE);} else {myprintf(WorkerThread %d: Socket(%d) Recv completed (%d bytes), Send posted\n, GetCurrentThreadId(), lpPerSocketContext-Socket, dwIoSize);}break;case ClientIoWrite://// a write operation has completed, determine if all the data intended to be// sent actually was sent.//lpIOContext-IOOperation ClientIoWrite;lpIOContext-nSentBytes dwIoSize;dwFlags 0;if (lpIOContext-nSentBytes lpIOContext-nTotalBytes) {//// the previous write operation didnt send all the data,// post another send to complete the operation//buffSend.buf lpIOContext-Buffer lpIOContext-nSentBytes;buffSend.len lpIOContext-nTotalBytes - lpIOContext-nSentBytes;nRet WSASend (lpPerSocketContext-Socket,buffSend,1, dwSendNumBytes,dwFlags,(lpIOContext-Overlapped), NULL);if (nRet SOCKET_ERROR (ERROR_IO_PENDING ! WSAGetLastError())) {myprintf (WSASend() failed: %d\n, WSAGetLastError());CloseClient(lpPerSocketContext, FALSE);} else {myprintf(WorkerThread %d: Socket(%d) Send partially completed (%d bytes), Recv posted\n, GetCurrentThreadId(), lpPerSocketContext-Socket, dwIoSize);}} else {//// previous write operation completed for this socket, post another recv//lpIOContext-IOOperation ClientIoRead; dwRecvNumBytes 0;dwFlags 0;buffRecv.buf lpIOContext-Buffer,buffRecv.len MAX_BUFF_SIZE;nRet WSARecv(lpPerSocketContext-Socket,buffRecv, 1, dwRecvNumBytes,dwFlags,lpIOContext-Overlapped, NULL);if (nRet SOCKET_ERROR (ERROR_IO_PENDING ! WSAGetLastError())) {myprintf (WSARecv() failed: %d\n, WSAGetLastError());CloseClient(lpPerSocketContext, FALSE);} else {myprintf(WorkerThread %d: Socket(%d) Send completed (%d bytes), Recv posted\n, GetCurrentThreadId(), lpPerSocketContext-Socket, dwIoSize);}}break;} //switch} //whilereturn 0;
} //
// Allocate a context structures for the socket and add the socket to the IOCP.
// Additionally, add the context structure to the global list of context structures.
//
PPER_SOCKET_CONTEXT UpdateCompletionPort(SOCKET sd, IO_OPERATION ClientIo, BOOL bAddToList) {PPER_SOCKET_CONTEXT lpPerSocketContext;lpPerSocketContext CtxtAllocate(sd, ClientIo);if (lpPerSocketContext NULL)return NULL;g_hIOCP CreateIoCompletionPort((HANDLE)sd, g_hIOCP, (DWORD_PTR)lpPerSocketContext, 0);if (g_hIOCP NULL) {myprintf(CreateIoCompletionPort() failed: %d\n, GetLastError());if( lpPerSocketContext-pIOContext )xfree(lpPerSocketContext-pIOContext);xfree(lpPerSocketContext);return NULL;}////The listening socket context (bAddToList is FALSE) is not added to the list.//All other socket contexts are added to the list.//if (bAddToList) CtxtListAddTo(lpPerSocketContext);myprintf(UpdateCompletionPort: Socket(%d) added to IOCP\n, lpPerSocketContext-Socket);return lpPerSocketContext;
}//
// Close down a connection with a client. This involves closing the socket (when
// initiated as a result of a CTRL-C the socket closure is not graceful). Additionally,
// any context data associated with that socket is freed.
//
VOID CloseClient (PPER_SOCKET_CONTEXT lpPerSocketContext, BOOL bGraceful) {EnterCriticalSection(g_CriticalSection);if (lpPerSocketContext) {myprintf(CloseClient: Socket(%d) connection closing (graceful%s)\n, lpPerSocketContext-Socket, (bGraceful?TRUE:FALSE));if (!bGraceful) {//// force the subsequent closesocket to be abortative.//LINGER lingerStruct;lingerStruct.l_onoff 1;lingerStruct.l_linger 0;setsockopt(lpPerSocketContext-Socket, SOL_SOCKET, SO_LINGER, (char *)lingerStruct, sizeof(lingerStruct));}if (lpPerSocketContext-pIOContext-SocketAccept ! INVALID_SOCKET) {closesocket(lpPerSocketContext-pIOContext-SocketAccept);lpPerSocketContext-pIOContext-SocketAccept INVALID_SOCKET;};closesocket(lpPerSocketContext-Socket);lpPerSocketContext-Socket INVALID_SOCKET;CtxtListDeleteFrom(lpPerSocketContext);lpPerSocketContext NULL;} else {myprintf(CloseClient: lpPerSocketContext is NULL\n);}LeaveCriticalSection(g_CriticalSection);return;
} //
// Allocate a socket context for the new connection.
//
PPER_SOCKET_CONTEXT CtxtAllocate(SOCKET sd, IO_OPERATION ClientIO) {PPER_SOCKET_CONTEXT lpPerSocketContext;EnterCriticalSection(g_CriticalSection);lpPerSocketContext (PPER_SOCKET_CONTEXT)xmalloc(sizeof(PER_SOCKET_CONTEXT));if (lpPerSocketContext) {lpPerSocketContext-pIOContext (PPER_IO_CONTEXT)xmalloc(sizeof(PER_IO_CONTEXT));if( lpPerSocketContext-pIOContext ) {lpPerSocketContext-Socket sd;lpPerSocketContext-pCtxtBack NULL;lpPerSocketContext-pCtxtForward NULL;lpPerSocketContext-pIOContext-Overlapped.Internal 0;lpPerSocketContext-pIOContext-Overlapped.InternalHigh 0;lpPerSocketContext-pIOContext-Overlapped.Offset 0;lpPerSocketContext-pIOContext-Overlapped.OffsetHigh 0;lpPerSocketContext-pIOContext-Overlapped.hEvent NULL;lpPerSocketContext-pIOContext-IOOperation ClientIO;lpPerSocketContext-pIOContext-pIOContextForward NULL;lpPerSocketContext-pIOContext-nTotalBytes 0;lpPerSocketContext-pIOContext-nSentBytes 0;lpPerSocketContext-pIOContext-wsabuf.buf lpPerSocketContext-pIOContext-Buffer;lpPerSocketContext-pIOContext-wsabuf.len sizeof(lpPerSocketContext-pIOContext-Buffer);lpPerSocketContext-pIOContext-SocketAccept INVALID_SOCKET;ZeroMemory(lpPerSocketContext-pIOContext-wsabuf.buf, lpPerSocketContext-pIOContext-wsabuf.len);} else {xfree(lpPerSocketContext);myprintf(HeapAlloc() PER_IO_CONTEXT failed: %d\n, GetLastError());}} else {myprintf(HeapAlloc() PER_SOCKET_CONTEXT failed: %d\n, GetLastError());return NULL;}LeaveCriticalSection(g_CriticalSection);return(lpPerSocketContext);
}//
// Add a client connection context structure to the global list of context structures.
//
VOID CtxtListAddTo(PPER_SOCKET_CONTEXT lpPerSocketContext) {PPER_SOCKET_CONTEXT pTemp;EnterCriticalSection(g_CriticalSection);if (g_pCtxtList NULL) {//// add the first node to the linked list//lpPerSocketContext-pCtxtBack NULL;lpPerSocketContext-pCtxtForward NULL;g_pCtxtList lpPerSocketContext;} else {//// add node to head of list//pTemp g_pCtxtList;g_pCtxtList lpPerSocketContext;lpPerSocketContext-pCtxtBack pTemp;lpPerSocketContext-pCtxtForward NULL; pTemp-pCtxtForward lpPerSocketContext;}LeaveCriticalSection(g_CriticalSection);return;
}//
// Remove a client context structure from the global list of context structures.
//
VOID CtxtListDeleteFrom(PPER_SOCKET_CONTEXT lpPerSocketContext) {PPER_SOCKET_CONTEXT pBack;PPER_SOCKET_CONTEXT pForward;PPER_IO_CONTEXT pNextIO NULL;PPER_IO_CONTEXT pTempIO NULL;EnterCriticalSection(g_CriticalSection);if (lpPerSocketContext) {pBack lpPerSocketContext-pCtxtBack;pForward lpPerSocketContext-pCtxtForward;if (pBack NULL pForward NULL) {//// This is the only node in the list to delete//g_pCtxtList NULL;} else if (pBack NULL pForward ! NULL) {//// This is the start node in the list to delete//pForward-pCtxtBack NULL;g_pCtxtList pForward;} else if (pBack ! NULL pForward NULL) {//// This is the end node in the list to delete//pBack-pCtxtForward NULL;} else if (pBack pForward) {//// Neither start node nor end node in the list//pBack-pCtxtForward pForward;pForward-pCtxtBack pBack;}//// Free all i/o context structures per socket//pTempIO (PPER_IO_CONTEXT)(lpPerSocketContext-pIOContext);do {pNextIO (PPER_IO_CONTEXT)(pTempIO-pIOContextForward);if (pTempIO) {////The overlapped structure is safe to free when only the posted i/o has//completed. Here we only need to test those posted but not yet received //by PQCS in the shutdown process.//if (g_bEndServer)while (!HasOverlappedIoCompleted((LPOVERLAPPED)pTempIO)) Sleep(0);xfree(pTempIO);pTempIO NULL;}pTempIO pNextIO;} while (pNextIO);xfree(lpPerSocketContext);lpPerSocketContext NULL;} else {myprintf(CtxtListDeleteFrom: lpPerSocketContext is NULL\n);}LeaveCriticalSection(g_CriticalSection);return;
}//
// Free all context structure in the global list of context structures.
//
VOID CtxtListFree() {PPER_SOCKET_CONTEXT pTemp1, pTemp2;EnterCriticalSection(g_CriticalSection);pTemp1 g_pCtxtList; while (pTemp1) {pTemp2 pTemp1-pCtxtBack;CloseClient(pTemp1, FALSE);pTemp1 pTemp2;}LeaveCriticalSection(g_CriticalSection);return;
}int myprintf(const char *lpFormat, ...) {int nLen 0;int nRet 0;char cBuffer[512] ;va_list arglist ;HANDLE hOut NULL;HRESULT hRet;ZeroMemory(cBuffer, sizeof(cBuffer));va_start(arglist, lpFormat);nLen lstrlenA(lpFormat) ;hRet StringCchVPrintfA(cBuffer,512,lpFormat,arglist);if (nRet nLen || GetLastError() 0) {hOut GetStdHandle(STD_OUTPUT_HANDLE);if (hOut ! INVALID_HANDLE_VALUE)WriteConsole( hOut, cBuffer, lstrlenA(cBuffer), (LPDWORD)nLen, NULL ) ;}return nLen ;
}
END!!!