Windows 线程池

2018-08-16  本文已影响0人  szn好色仙人
Windows提供的线程池工作模式
备注
  • 使用以上线程池时,需要考虑线程同步问题
以异步方式调用函数

TrySubmitThreadpoolCallback

// Prototype of TrySubmitThreadpoolCallback as presented in this article.
// The example that follows passes a callback, a string pointer and NULL, and
// checks the BOOL result with assert before relying on the callback's output.
BOOL WINAPI TrySubmitThreadpoolCallback
(
    PTP_SIMPLE_CALLBACK pfns,   // callback to run (shape shown by SimpleCallback)
    PVOID pv,                   // in the example, the callback receives this as its second argument
    PTP_CALLBACK_ENVIRON pcbe   // used to customize the thread pool; the example passes NULL
);
// Callback function type expected by TrySubmitThreadpoolCallback.
VOID CALLBACK SimpleCallback
(
    PTP_CALLBACK_INSTANCE Instance,   // unused (left unnamed) in the example callback
    PVOID                 Context     // the example casts this back to the string it submitted
);
示例
#include <windows.h>
#include <cstdio>
#include <cassert>

// Simple callback used by the TrySubmitThreadpoolCallback example: treats the
// context pointer as a NUL-terminated string and prints it on its own line.
void CALLBACK Fun(PTP_CALLBACK_INSTANCE, void* pVoid)
{
    const char* const pszText = static_cast<const char*>(pVoid);
    printf("%s\n", pszText);
}

int main()
{
    const char* pStrC = "Hello World";

    BOOL bRe = TrySubmitThreadpoolCallback(Fun, const_cast<char*>(pStrC), NULL);
    assert(bRe);

    system("pause");
    return 0;
    //程序输出"Hello World"
}

CreateThreadpoolWork

// Prototype of CreateThreadpoolWork as presented in this article. The example
// asserts that the returned PTP_WORK is non-null before submitting it.
PTP_WORK WINAPI CreateThreadpoolWork
(
    PTP_WORK_CALLBACK pfnwk,    // callback to run (shape shown by WorkCallback)
    PVOID pv,                   // in the example, the callback receives this as its second argument
    PTP_CALLBACK_ENVIRON pcbe   // used to customize the thread pool; may be NULL as in the first example
);
// Callback function type expected by CreateThreadpoolWork.
VOID CALLBACK WorkCallback
(
    PTP_CALLBACK_INSTANCE Instance,   // passed on to LeaveCriticalSectionWhenCallbackReturns in a later example
    PVOID                 Context,    // the example casts this back to the string given at creation
    PTP_WORK              Work        // unused (left unnamed) in the example callbacks
);

SubmitThreadpoolWork

// Prototype of SubmitThreadpoolWork as presented in this article. In the
// example, submitting the same work object twice yields two callback outputs.
void SubmitThreadpoolWork
(
    PTP_WORK pwk   // work object obtained from CreateThreadpoolWork
);

WaitForThreadpoolWorkCallbacks

// Prototype of WaitForThreadpoolWorkCallbacks as presented in this article.
// The example calls it after the submissions and before CloseThreadpoolWork.
VOID WINAPI WaitForThreadpoolWorkCallbacks
(
    PTP_WORK pwk,                  // work object whose callbacks are waited on
    BOOL fCancelPendingCallbacks   // the examples pass FALSE; presumably TRUE drops callbacks not yet started - confirm in the official docs
);

CloseThreadpoolWork

// Prototype of CloseThreadpoolWork as presented in this article. The example
// calls it last, after waiting for the submitted callbacks.
VOID WINAPI CloseThreadpoolWork
(
    PTP_WORK pwk   // work object obtained from CreateThreadpoolWork
);
示例
#include <windows.h>
#include <cstdio>
#include <cassert>

// Work callback for the CreateThreadpoolWork example: prints the string that
// was supplied as the work object's context, followed by a newline.
void CALLBACK Fun(PTP_CALLBACK_INSTANCE, void* pVoid, PTP_WORK)
{
    const char* const pszLine = static_cast<const char*>(pVoid);
    printf("%s\n", pszLine);
}

int main()
{
    const char* pStrC = "Hello World";
    PTP_WORK pWork = CreateThreadpoolWork(Fun, const_cast<char*>(pStrC), NULL);
    assert(pWork);

    SubmitThreadpoolWork(pWork);
    SubmitThreadpoolWork(pWork);
    WaitForThreadpoolWorkCallbacks(pWork, FALSE);

    CloseThreadpoolWork(pWork);

    system("pause");
    /*
    输出:
    "Hello World"
    "Hello World"
    */
}
备注
  • 每一次调用TrySubmitThreadpoolCallback,系统会在内部分配一个工作项,如果打算提交大量的工作项,则出于内存以及性能的考虑,使用CreateThreadpoolWork创建一个工作项并分多次提交会更好
每隔一段时间调用一个函数

CreateThreadpoolTimer

// Prototype of CreateThreadpoolTimer as presented in this article. The example
// asserts that the returned PTP_TIMER is non-null before arming it.
PTP_TIMER WINAPI CreateThreadpoolTimer
(
    PTP_TIMER_CALLBACK pfnti,   // callback function
    PVOID pv,                   // value passed to the callback function
    PTP_CALLBACK_ENVIRON pcbe   // used to customize the thread pool
);
// Shape of the callback expected by CreateThreadpoolTimer.
void __stdcall FunCallBack
(
    PTP_CALLBACK_INSTANCE Instance,   // unused (left unnamed) in the example callback
    PVOID                 Context,    // the value given as pv to CreateThreadpoolTimer
    PTP_TIMER             Timer       // not used by the example callback
);

SetThreadpoolTimer

// Prototype of SetThreadpoolTimer as presented in this article. In the example
// a FILETIME built from -20000000 plus msPeriod 1000 gives, per the article,
// a first callback after 2 seconds and one more every second after that.
VOID WINAPI SetThreadpoolTimer
(
    PTP_TIMER pti,          // timer obtained from CreateThreadpoolTimer
    PFILETIME pftDueTime,   // first due time; the example encodes a negative 64-bit value here
    DWORD msPeriod,         // repeat period in milliseconds (1000 in the example)
    DWORD msWindowLength    // the example passes a zero value; exact meaning not shown here - see the official docs
);

WaitForThreadpoolTimerCallbacks

// Prototype of WaitForThreadpoolTimerCallbacks as presented in this article.
// The example calls it right before CloseThreadpoolTimer.
VOID WINAPI WaitForThreadpoolTimerCallbacks
(
    PTP_TIMER pti,                 // timer whose callbacks are waited on
    BOOL fCancelPendingCallbacks   // the example passes FALSE; presumably TRUE drops callbacks not yet started - confirm in the official docs
);

CloseThreadpoolTimer

// Prototype of CloseThreadpoolTimer as presented in this article. The example
// calls it as the final step, after WaitForThreadpoolTimerCallbacks.
VOID WINAPI CloseThreadpoolTimer
(
    PTP_TIMER pti   // timer obtained from CreateThreadpoolTimer
);
示例
#include <windows.h>
#include <cstdio>
#include <cassert>

// Timer callback for the CreateThreadpoolTimer example: prints the context
// string once per timer expiry. The Timer argument is not used.
void __stdcall Fun(PTP_CALLBACK_INSTANCE, void* pVoid, PTP_TIMER Timer)
{
    const char* const pszTick = static_cast<const char*>(pVoid);
    printf("%s\n", pszTick);
}

int main()
{
    const char* pStrC = "Hello";
    PTP_TIMER pTimer = CreateThreadpoolTimer(Fun, const_cast<char*>(pStrC), NULL);
    assert(pTimer);

    ULARGE_INTEGER nTem;
    FILETIME FileTime;
    nTem.QuadPart = -1 * 20000000;
    FileTime.dwHighDateTime = nTem.HighPart;
    FileTime.dwLowDateTime = nTem.LowPart;
    /*
    FileTime.dwHighDateTime = 4294967295
    FileTime.dwLowDateTime = 4274967296
    */

    SetThreadpoolTimer(pTimer, &FileTime, 1000, NULL);
    
    Sleep(5000);

    WaitForThreadpoolTimerCallbacks(pTimer, FALSE);
    CloseThreadpoolTimer(pTimer);
    return 0;

    /*
    在程序执行2秒后,输出Hello,然后每隔1秒输出Hello,一共输出3个Hello,程序结束
    */
}
在内核对象触发时调用一个函数

CreateThreadpoolWait

// Prototype of CreateThreadpoolWait as presented in this article. The example
// asserts that the returned PTP_WAIT is non-null before using it.
PTP_WAIT WINAPI CreateThreadpoolWait
(
    PTP_WAIT_CALLBACK pfnwa,        // callback function
    PVOID pv,                       // value passed to the callback function
    PTP_CALLBACK_ENVIRON pcbe       // used to customize the thread pool
);
// Shape of the callback expected by CreateThreadpoolWait.
void __stdcall FunCallBack
(
    PTP_CALLBACK_INSTANCE Instance,     // unused (left unnamed) in the example callback
    PVOID                 Context,      // the value given as pv to CreateThreadpoolWait
    PTP_WAIT              Wait,         // not used by the example callback
    TP_WAIT_RESULT        WaitResult    // wait result; the article lists WAIT_OBJECT_0, WAIT_TIMEOUT and WAIT_ABANDONED_0
);
等待结果的值 意义
WAIT_OBJECT_0 内核对象被触发
WAIT_TIMEOUT 超时
WAIT_ABANDONED_0 互斥量被遗弃

SetThreadpoolWait

// Prototype of SetThreadpoolWait as presented in this article. The example
// calls it twice on the same wait object: the first wait ends with "TimeOut",
// the second with the event being signaled.
VOID WINAPI SetThreadpoolWait
(
    PTP_WAIT pwa,           // wait object obtained from CreateThreadpoolWait
    HANDLE h,               // handle to wait on (an event in the example)
    PFILETIME pftTimeout    // timeout; the example encodes -20000000 here and reports a timeout after 2 seconds
);

WaitForThreadpoolWaitCallbacks

// Prototype of WaitForThreadpoolWaitCallbacks: the wait-object counterpart of
// WaitForThreadpoolTimerCallbacks. It takes the PTP_WAIT returned by
// CreateThreadpoolWait; the example passes FALSE for fCancelPendingCallbacks.
VOID WINAPI WaitForThreadpoolWaitCallbacks
(
    PTP_WAIT pwa,
    BOOL fCancelPendingCallbacks
);

CloseThreadpoolWait

VOID WINAPI CloseThreadpoolWait
(
    PTP_WAIT pwa
);

示例
#include <windows.h>
#include <cstdio>
#include <cassert>

// Wait callback for the CreateThreadpoolWait example. Prints the context
// string when the object was signaled, "TimeOut" when the wait timed out and
// "Abandoned" for an abandoned mutex; any other result trips an assert.
void __stdcall Fun(PTP_CALLBACK_INSTANCE, void* pVoid, PTP_WAIT pWait,
    TP_WAIT_RESULT nWaitResult)
{
    switch (nWaitResult)
    {
    case WAIT_OBJECT_0:
        printf("%s\n", static_cast<const char*>(pVoid));
        break;

    case WAIT_TIMEOUT:
        printf("TimeOut\n");
        break;

    case WAIT_ABANDONED_0:
        printf("Abandoned\n");
        break;

    default:
        assert(false);
        break;
    }
}

int main()
{
    const char* pStrC = "Hello";
    HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    PTP_WAIT pWait = CreateThreadpoolWait(Fun, const_cast<char*>(pStrC), NULL);
    assert(pWait && hEvent);

    ULARGE_INTEGER nTem;
    nTem.QuadPart = -10000000 * 2;
    FILETIME FileTime;
    FileTime.dwLowDateTime = nTem.LowPart;
    FileTime.dwHighDateTime = nTem.HighPart;

    SetThreadpoolWait(pWait, hEvent, &FileTime);
    Sleep(3000);
    SetThreadpoolWait(pWait, hEvent, &FileTime);
    Sleep(1000);
    SetEvent(hEvent);
    Sleep(2000);

    WaitForThreadpoolWaitCallbacks(pWait, FALSE);

    CloseHandle(hEvent);
    CloseThreadpoolWait(pWait);
    
    /*
    运行结果:
    运行2s后输出"TimeOut"
    运行4s后输出"Hello"
    运行6s后退出
    */
}
在异步I/O请求完成时调用一个函数
// Prototype of CreateThreadpoolIo as presented in this article. The examples
// assert that the returned PTP_IO is non-null before starting any I/O.
PTP_IO WINAPI CreateThreadpoolIo
(
    HANDLE fl,                      // device handle to associate with the thread pool's internal I/O completion port
    PTP_WIN32_IO_CALLBACK pfnio,    // callback function
    PVOID pv,                       // argument for the callback function
    PTP_CALLBACK_ENVIRON pcbe       // used to customize the thread pool
);
回调函数
// Shape of the completion callback expected by CreateThreadpoolIo.
VOID _stdcall Fun
(
    PTP_CALLBACK_INSTANCE Instance,                 // not used by the example callbacks
    PVOID                 Context,                  // the value given as pv to CreateThreadpoolIo
    PVOID                 Overlapped,               // not used by the example callbacks
    ULONG                 IoResult,                 // the examples compare this against NO_ERROR
    ULONG_PTR             NumberOfBytesTransferred, // byte count reported for the completed operation
    PTP_IO                Io                        // the TCP example reuses this to start the next receive
);

StartThreadpoolIo

// Prototype of StartThreadpoolIo as presented in this article. The examples
// call it immediately before each overlapped WriteFile / WSARecv request.
VOID WINAPI StartThreadpoolIo(PTP_IO pio);

WaitForThreadpoolIoCallbacks

// Prototype of WaitForThreadpoolIoCallbacks as presented in this article.
// The examples call it right before CloseThreadpoolIo.
VOID WINAPI WaitForThreadpoolIoCallbacks
(
    PTP_IO pio,                    // I/O object obtained from CreateThreadpoolIo
    BOOL fCancelPendingCallbacks   // the examples pass false; presumably TRUE drops callbacks not yet started - confirm in the official docs
);

CloseThreadpoolIo

// Prototype of CloseThreadpoolIo as presented in this article. The examples
// call it after WaitForThreadpoolIoCallbacks and before closing the handle.
VOID WINAPI CloseThreadpoolIo(PTP_IO pio);

CancelThreadpoolIo

// Prototype of CancelThreadpoolIo as presented in this article. The examples
// call it after StartThreadpoolIo when the I/O request does not end up pending.
void CancelThreadpoolIo(PTP_IO pio);
示例一
#include <windows.h>
#include <cstdio>
#include <cassert>

// I/O completion callback for the file example: asserts that the operation
// succeeded and reports how many bytes were transferred.
//   IoResult                 - completion status; expected to be NO_ERROR
//   NumberOfBytesTransferred - byte count of the completed operation
// The remaining parameters are not used here.
VOID _stdcall Fun
(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID                 Context,
    PVOID                 Overlapped,
    ULONG                 IoResult,
    ULONG_PTR             NumberOfBytesTransferred,
    PTP_IO                Io
)
{
    assert(NO_ERROR == IoResult);

    // ULONG_PTR is pointer-sized (64-bit on x64), so "%d" would mismatch the
    // argument; widen explicitly and print with %llu.
    printf("I/O操作完成,传输字节数:%llu\n",
        static_cast<unsigned long long>(NumberOfBytesTransferred));
}

int main()
{
    HANDLE hFile = CreateFileA("1.dat", GENERIC_READ | GENERIC_WRITE, 0,
        NULL, OPEN_ALWAYS, FILE_FLAG_OVERLAPPED, NULL);
    assert(INVALID_HANDLE_VALUE != hFile);

    PTP_IO pIo = CreateThreadpoolIo(hFile, Fun, NULL, NULL);
    assert(pIo);

    StartThreadpoolIo(pIo);

    const char* pStrC = "Hello World";
    OVERLAPPED ov = {};
    BOOL nRe = WriteFile(hFile, pStrC, static_cast<DWORD>(strlen(pStrC)), NULL, &ov);
    if (!(!nRe && ERROR_IO_PENDING == GetLastError()))
    {
        CancelThreadpoolIo(pIo);
    }

    WaitForThreadpoolIoCallbacks(pIo, false);
    CloseThreadpoolIo(pIo);
    CloseHandle(hFile);

    system("pause");
    //程序输出:I/O操作完成,传输字节数:11
}
示例二(TCP客户端)
#include <WS2tcpip.h>
#include <cstdio>
#include <cassert>

#pragma comment(lib, "ws2_32")

struct SIoContext
{
    SIoContext() : sock(INVALID_SOCKET)
    {
        memset(&OverLapped, 0, sizeof OverLapped);
        wsBuff.buf = buff;
        wsBuff.len = sizeof buff;
    }

    OVERLAPPED OverLapped;
    SOCKET sock;
    WSABUF wsBuff;
    char buff[65536];
};

void InitNetEnvironment()
{
    WSAData wsaData;
    if (0 == WSAStartup(MAKEWORD(2, 2), &wsaData)
        && wsaData.wVersion == 0x202)
    {
        return;
    }

    assert(false);
}

// I/O completion callback for the TCP client example. Prints the received
// bytes and posts the next receive, or reports that the connection is gone.
//   Context                  - the SIoContext given to CreateThreadpoolIo
//   IoResult                 - completion status of the receive
//   NumberOfBytesTransferred - bytes received; 0 means the peer closed
//   Io                       - I/O object used to start the next receive
VOID _stdcall Fun
(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID                 Context,
    PVOID                 Overlapped,
    ULONG                 IoResult,
    ULONG_PTR             NumberOfBytesTransferred,
    PTP_IO                Io
)
{
    SIoContext* pIoContext = static_cast<SIoContext*>(Context);

    // A failed receive (e.g. connection reset) is an ordinary runtime event,
    // not a programming error, so treat it like a closed connection.
    if (NO_ERROR != IoResult || 0 == NumberOfBytesTransferred)
    {
        printf("连接中断\n");
        return;
    }

    // Print with an explicit length instead of writing a terminator: a full
    // buffer would otherwise be indexed one past its end.
    printf("%.*s\n", static_cast<int>(NumberOfBytesTransferred), pIoContext->buff);

    StartThreadpoolIo(Io);

    DWORD nFlag = 0;
    const int nRe = WSARecv(pIoContext->sock, &pIoContext->wsBuff, 1, nullptr,
        &nFlag, &pIoContext->OverLapped, NULL);

    // WSARecv returns 0 on success and SOCKET_ERROR on failure. Only a real
    // failure (anything other than WSA_IO_PENDING) produces no completion and
    // therefore has to be cancelled.
    if (SOCKET_ERROR == nRe && WSA_IO_PENDING != WSAGetLastError())
    {
        CancelThreadpoolIo(Io);
        printf("连接中断\n");
    }
}


int main()
{
    InitNetEnvironment();

    SIoContext IoContext;
    IoContext.sock = socket(AF_INET, SOCK_STREAM, 0);
    assert(INVALID_SOCKET != IoContext.sock);

    sockaddr_in sockAddr = {};
    sockAddr.sin_family = AF_INET;
    sockAddr.sin_port = htons(8080);
    if (1 != inet_pton(AF_INET, "127.0.0.1", &sockAddr.sin_addr.s_addr))
    {
        assert(false);
    }

    int nRe = connect(IoContext.sock,
        reinterpret_cast<sockaddr*>(&sockAddr), sizeof sockaddr);
    assert(SOCKET_ERROR != nRe);

    PTP_IO pIo = CreateThreadpoolIo
        (reinterpret_cast<HANDLE>(IoContext.sock), Fun, &IoContext, NULL);
    assert(pIo);

    StartThreadpoolIo(pIo);

    DWORD nFlag = 0;
    if (!WSARecv(IoContext.sock, &IoContext.wsBuff, 1, nullptr, &nFlag,
        &IoContext.OverLapped, NULL))
    {
        CancelThreadpoolIo(pIo);
    }

    system("pause");

    WaitForThreadpoolIoCallbacks(pIo, false);
    CloseThreadpoolIo(pIo);
    closesocket(IoContext.sock);
    WSACleanup();
    /*
    运行结果:
    先启动服务器,然后服务器先发送"Hello",然后再发送"World",然后断开连接
    本软件输出:
    "Hello"
    "World"
    "连接中断"
    */
}
回调函数的终止操作
函数 功能
LeaveCriticalSectionWhenCallbackReturns Specifies the critical section that the thread pool will release when the current callback completes.
ReleaseMutexWhenCallbackReturns Specifies the mutex that the thread pool will release when the current callback completes.
ReleaseSemaphoreWhenCallbackReturns Specifies the semaphore that the thread pool will release when the current callback completes.
SetEventWhenCallbackReturns Specifies the event that the thread pool will set when the current callback completes.
FreeLibraryWhenCallbackReturns Specifies the DLL that the thread pool will unload when the current callback completes.
示例
#include <windows.h>
#include <cstdio>
#include <cassert>

CRITICAL_SECTION cs;

// Work callback for the "callback termination" example: takes the global
// critical section, asks the thread pool to release it when this callback
// returns, then prints the context string.
void CALLBACK Fun(PTP_CALLBACK_INSTANCE pInstance, void* pVoid, PTP_WORK)
{
    CRITICAL_SECTION* const pLock = &cs;

    EnterCriticalSection(pLock);
    // No explicit LeaveCriticalSection: the pool releases the lock on return.
    LeaveCriticalSectionWhenCallbackReturns(pInstance, pLock);

    const char* const pszText = static_cast<const char*>(pVoid);
    printf("%s\n", pszText);
}

int main()
{
    InitializeCriticalSection(&cs);

    const char* pStrC = "Hello World";
    PTP_WORK pWork = CreateThreadpoolWork(Fun, const_cast<char*>(pStrC), NULL);
    assert(pWork);

    SubmitThreadpoolWork(pWork);
    SubmitThreadpoolWork(pWork);
    WaitForThreadpoolWorkCallbacks(pWork, FALSE);

    CloseThreadpoolWork(pWork);

    DeleteCriticalSection(&cs);
    system("pause");

    /*
    程序运行结果:
    输出2个"Hello World"

    若注释掉 LeaveCriticalSectionWhenCallbackReturns(pInstance, &cs);
    则只能输出一个"Hello World",并陷入阻塞状态
    */
}
线程池定制
函数 功能
CreateThreadpool Allocates a new pool of threads to execute callbacks.
唯一的参数:This parameter is reserved and must be NULL.
SetThreadpoolThreadMinimum Sets the minimum number of threads that the specified thread pool must make available to process callbacks.
SetThreadpoolThreadMaximum Sets the maximum number of threads that the specified thread pool can allocate to process callbacks.
CloseThreadpool Closes the specified thread pool.
Remarks:The thread pool is closed immediately if there are no outstanding work, I/O, timer, or wait objects that are bound to the pool; otherwise, the thread pool is released asynchronously after the outstanding objects are freed.
函数 功能
InitializeThreadpoolEnvironment Initializes a callback environment.
SetThreadpoolCallbackPool Sets the thread pool to be used when generating(发生) callbacks.
DestroyThreadpoolEnvironment Deletes the specified callback environment. Call this function when the callback environment is no longer needed for creating new thread pool objects.
示例
#include <windows.h>
#include <cstdio>
#include <cassert>

// Work callback for the custom-pool example: prints the context string once
// and then never returns, keeping its pool thread occupied on purpose.
void CALLBACK Fun(PTP_CALLBACK_INSTANCE pInstance, void* pVoid, PTP_WORK)
{
    const char* const pszText = static_cast<const char*>(pVoid);
    printf("%s\n", pszText);

    // Deliberate endless loop.
    for (;;)
    {
        Sleep(1);
    }
}

int main()
{
    const char* pStrC = "Hello World";

    PTP_POOL pPool = CreateThreadpool(NULL);
    assert(pPool);

    SetThreadpoolThreadMaximum(pPool, 1);
    SetThreadpoolThreadMinimum(pPool, 1);

    TP_CALLBACK_ENVIRON CallbackEnviron;
    InitializeThreadpoolEnvironment(&CallbackEnviron);
    SetThreadpoolCallbackPool(&CallbackEnviron, pPool);

    PTP_WORK pWork = CreateThreadpoolWork(Fun, const_cast<char*>(pStrC),
        &CallbackEnviron);
    assert(pWork);

    SubmitThreadpoolWork(pWork);
    SubmitThreadpoolWork(pWork);
    WaitForThreadpoolWorkCallbacks(pWork, FALSE);

    CloseThreadpoolWork(pWork);
    CloseThreadpool(pPool);
    DestroyThreadpoolEnvironment(&CallbackEnviron);

    system("pause");
    /*
    程序运行结果:
    输出"Hello World"后无反应,进程开启的线程总数为2
    */
}
得体地销毁线程池
上一篇 下一篇

猜你喜欢

热点阅读