服务器之家

服务器之家 > 正文

C/C++ 原生API实现线程池的方法

时间:2022-02-17 16:44     来源/作者:lyshark

线程池有两个核心的概念,一个是任务队列,一个是工作线程队列。任务队列负责存放主线程需要处理的任务,工作线程队列其实是一个死循环,负责从任务队列中取出和运行任务,可以看成是一个生产者和多个消费l者的模型。在一些高并发的网络应用中,线程池也是常用的技术。陈硕大神推荐的C++多线程服务端编程模式为:one loop per thread + thread pool,通常会有单独的线程负责接受来自客户端的请求,对请求稍作解析后将数据处理的任务提交到专门的计算线程池。

ThreadPool 线程池同步事件: 线程池内的线程函数同样支持互斥锁,信号控制,内核事件控制,临界区控制.

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
 
unsigned long g_count = 0;
 
// --------------------------------------------------------------
// 线程池同步-互斥量同步
void NTAPI TaskHandlerMutex(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    WaitForSingleObject(*(HANDLE *)Context, INFINITE);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    ReleaseMutexWhenCallbackReturns(Instance, *(HANDLE*)Context);
}
 
void TestMutex()
{
    // 创建互斥量
    HANDLE hMutex = CreateMutex(NULL, FALSE, NULL);
 
    PTP_WORK pool = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerMutex, &hMutex, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pool);
    }
 
    WaitForThreadpoolWorkCallbacks(pool, FALSE);
    CloseThreadpoolWork(pool);
    CloseHandle(hMutex);
 
    printf("相加后 ---> %d \n", g_count);
}
 
// --------------------------------------------------------------
// 线程池同步-事件内核对象
void NTAPI TaskHandlerKern(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    WaitForSingleObject(*(HANDLE *)Context, INFINITE);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    SetEventWhenCallbackReturns(Instance, *(HANDLE*)Context);
}
 
void TestKern()
{
    HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    SetEvent(hEvent);
 
    PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerKern, &hEvent, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pwk);
    }
 
    WaitForThreadpoolWorkCallbacks(pwk, FALSE);
    CloseThreadpoolWork(pwk);
 
    printf("相加后 ---> %d \n", g_count);
}
 
// --------------------------------------------------------------
// 线程池同步-信号量同步
void NTAPI TaskHandlerSemaphore(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    WaitForSingleObject(*(HANDLE *)Context, INFINITE);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    ReleaseSemaphoreWhenCallbackReturns(Instance, *(HANDLE*)Context, 1);
}
 
void TestSemaphore()
{
    // 创建信号量为100
    HANDLE hSemaphore = CreateSemaphore(NULL, 0, 100, NULL);
 
    ReleaseSemaphore(hSemaphore, 10, NULL);
 
    PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerSemaphore, &hSemaphore, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pwk);
    }
 
    WaitForThreadpoolWorkCallbacks(pwk, FALSE);
    CloseThreadpoolWork(pwk);
    CloseHandle(hSemaphore);
 
    printf("相加后 ---> %d \n", g_count);
}
 
// --------------------------------------------------------------
// 线程池同步-临界区
void NTAPI TaskHandlerLeave(PTP_CALLBACK_INSTANCE Instance, PVOID Context, PTP_WORK Work)
{
    // 锁定资源
    EnterCriticalSection((CRITICAL_SECTION*)Context);
 
    for (int x = 0; x < 100; x++)
    {
        printf("线程ID: %d ---> 子线程: %d \n", GetCurrentThreadId(), x);
        g_count = g_count + 1;
    }
 
    // 解锁资源
    LeaveCriticalSectionWhenCallbackReturns(Instance, (CRITICAL_SECTION*)Context);
}
 
void TestLeave()
{
    CRITICAL_SECTION cs;
    InitializeCriticalSection(&cs);
 
    PTP_WORK pwk = CreateThreadpoolWork((PTP_WORK_CALLBACK)TaskHandlerLeave, &cs, NULL);
 
    for (int i = 0; i < 1000; i++)
    {
        SubmitThreadpoolWork(pwk);
    }
 
    WaitForThreadpoolWorkCallbacks(pwk, FALSE);
    DeleteCriticalSection(&cs);
    CloseThreadpoolWork(pwk);
 
    printf("相加后 ---> %d \n", g_count);
}
 
int main(int argc,char *argv)
{
    //TestMutex();
    //TestKern();
    //TestSemaphore();
    TestLeave();
 
    system("pause");
    return 0;
}

简单的IO读写:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#include <Windows.h>
#include <iostream>
#include <stdlib.h>
 
// 简单的异步文本读写
int ReadWriteIO()
{
    char enContent[] = "hello lyshark";
    char deContent[255] = { 0 };
 
    // 异步写文件
    HANDLE hFileWrite = CreateFile(L"d://test.txt", GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, FILE_FLAG_SEQUENTIAL_SCAN, NULL);
    if (INVALID_HANDLE_VALUE == hFileWrite)
    {
        return 0;
    }
 
    WriteFile(hFileWrite, enContent, strlen(enContent), NULL, NULL);
    FlushFileBuffers(hFileWrite);
 
    CancelSynchronousIo(hFileWrite);
    CloseHandle(hFileWrite);
 
    // 异步读文件
 
    HANDLE hFileRead = CreateFile(L"d://test.txt", GENERIC_READ, 0, NULL, OPEN_ALWAYS, NULL, NULL);
    if (INVALID_HANDLE_VALUE == hFileRead)
    {
        return 0;
    }
 
    ReadFile(hFileRead, deContent, 255, NULL, NULL);
    CloseHandle(hFileRead);
    std::cout << "读出内容: " << deContent << std::endl;
    return 1;
}
 
 
// 通过IO获取文件大小
int GetFileSize()
{
    HANDLE hFile = CreateFile(L"d://test.txt", 0, 0, NULL, OPEN_EXISTING, NULL, NULL);
    if (INVALID_HANDLE_VALUE == hFile)
    {
        return 0;
    }
 
    ULARGE_INTEGER ulFileSize;
    ulFileSize.LowPart = GetFileSize(hFile, &ulFileSize.HighPart);
 
    LARGE_INTEGER lFileSize;
    BOOL ret = GetFileSizeEx(hFile, &lFileSize);
 
    std::cout << "文件大小A: " << ulFileSize.QuadPart << " bytes" << std::endl;
    std::cout << "文件大小B: " << lFileSize.QuadPart << " bytes" << std::endl;
    CloseHandle(hFile);
 
    return 1;
}
 
// 通过IO设置文件指针和文件尾
int SetFilePointer()
{
    char deContent[255] = { 0 };
    DWORD readCount = 0;
 
    HANDLE hFile = CreateFile(L"d://test.txt", GENERIC_WRITE, 0, NULL, OPEN_ALWAYS, NULL, NULL);
    if (INVALID_HANDLE_VALUE == hFile)
    {
        return 0;
    }
 
    LARGE_INTEGER liMove;
 
    // 设置移动位置
    liMove.QuadPart = 2;
    SetFilePointerEx(hFile, liMove, NULL, FILE_BEGIN);
 
    // 移动到文件末尾
    SetEndOfFile(hFile);
 
    ReadFile(hFile, deContent, 255, &readCount, NULL);
    std::cout << "移动指针后读取: " << deContent << " 读入长度: " << readCount << std::endl;
 
    CloseHandle(hFile);
 
    // 设置编码格式
    _wsetlocale(LC_ALL, L"chs");
    setlocale(LC_ALL, "chs");
    wprintf(L"%s", deContent);
}
 
int main(int argc,char *argv)
{
    // 读写IO
    ReadWriteIO();
 
    // 取文件长度
    GetFileSize();
 
    // 设置文件指针
    SetFilePointer();
 
    return 0;
}

到此这篇关于C/C++ 原生API实现线程池的文章就介绍到这了,更多相关C++实现线程池内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/LyShark/p/15493202.html

标签:

相关文章

热门资讯

蜘蛛侠3英雄无归3正片免费播放 蜘蛛侠3在线观看免费高清完整
蜘蛛侠3英雄无归3正片免费播放 蜘蛛侠3在线观看免费高清完整 2021-08-24
2022年最旺的微信头像大全 微信头像2022年最新版图片
2022年最旺的微信头像大全 微信头像2022年最新版图片 2022-01-10
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
返回顶部