前言
实现消息队列的关键因素是考量不同线程访问消息队列的同步问题。本实现涉及到几个知识点
std::lock_guard 介绍
std::lock_gurad 是 C++11 中定义的模板类。定义如下:
1
|
template < class Mutex> class lock_guard; |
lock_guard 对象通常用于管理某个锁(Lock)对象,因此与 Mutex RAII 相关,方便线程对互斥量上锁,即在某个 lock_guard 对象的声明周期内,它所管理的锁对象会一直保持上锁状态;而 lock_guard 的生命周期结束之后,它所管理的锁对象会被解锁(注:类似 shared_ptr 等智能指针管理动态分配的内存资源 )。
模板参数 Mutex 代表互斥量类型,例如 std::mutex 类型,它应该是一个基本的 BasicLockable 类型,标准库中定义几种基本的 BasicLockable 类型,分别 std::mutex, std::recursive_mutex, std::timed_mutex,std::recursive_timed_mutex 以及 std::unique_lock
std::unique_lock 介绍
lock_guard 最大的缺点也是简单,没有给程序员提供足够的灵活度,因此,C++11 标准中定义了另外一个与 Mutex RAII 相关类 unique_lock,该类与 lock_guard 类相似,也很方便线程对互斥量上锁,但它提供了更好的上锁和解锁控制。
顾名思义,unique_lock 对象以独占所有权的方式( unique owership)管理 mutex 对象的上锁和解锁操作,所谓独占所有权,就是没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权。
新创建的 unique_lock 对象管理 Mutex 对象 m,并尝试调用 m.lock() 对 Mutex 对象进行上锁,如果此时另外某个 unique_lock 对象已经管理了该 Mutex 对象 m,则当前线程将会被阻塞。
std::condition介绍
当 std::condition_variable 对象的某个 wait 函数被调用的时候,它使用 std::unique_lock(通过 std::mutex) 来锁住当前线程。当前线程会一直被阻塞,直到另外一个线程在相同的 std::condition_variable 对象上调用了 notification 函数来唤醒当前线程。
std::condition_variable 提供了两种 wait() 函数。当前线程调用 wait() 后将被阻塞(此时当前线程应该获得了锁(mutex),不妨设获得锁 lck),直到另外某个线程调用 notify_* 唤醒了当前线程。
在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。
在第二种情况下(即设置了 Predicate),只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞。因此第二种情况类似以下代码:
1
|
while (!pred()) wait(lck); |
std::function介绍
使用std::function可以将普通函数,lambda表达式和函数对象类统一起来。它们并不是相同的类型,然而通过function模板类,可以转化为相同类型的对象(function对象),从而放入一个vector或其他容器里,方便回调。
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
#pragma once #ifndef MESSAGE_QUEUE_H #define MESSAGE_QUEUE_H #include <queue> #include <mutex> #include <condition_variable> template < class Type> class CMessageQueue { public : CMessageQueue& operator = ( const CMessageQueue&) = delete ; CMessageQueue( const CMessageQueue& mq) = delete ; CMessageQueue() :_queue(), _mutex(), _condition(){} virtual ~CMessageQueue(){} void Push(Type msg){ std::lock_guard <std::mutex> lock(_mutex); _queue.push(msg); //当使用阻塞模式从消息队列中获取消息时,由condition在新消息到达时提醒等待线程 _condition.notify_one(); } //blocked定义访问方式是同步阻塞或者非阻塞模式 bool Pop(Type& msg, bool isBlocked = true ){ if (isBlocked) { std::unique_lock <std::mutex> lock(_mutex); while (_queue.empty()) { _condition.wait(lock); } //注意这一段必须放在if语句中,因为lock的生命域仅仅在if大括号内 msg = std::move(_queue.front()); _queue.pop(); return true ; } else { std::lock_guard<std::mutex> lock(_mutex); if (_queue.empty()) return false ; msg = std::move(_queue.front()); _queue.pop(); return true ; } } int32_t Size(){ std::lock_guard<std::mutex> lock(_mutex); return _queue.size(); } bool Empty(){ std::lock_guard<std::mutex> lock(_mutex); return _queue.empty(); } private : std::queue<Type> _queue; //存储消息的队列 mutable std::mutex _mutex; //同步锁 std::condition_variable _condition; //实现同步式获取消息 }; #endif//MESSAGE_QUEUE_H |
线程池可以直接在构造函数中构造线程,并传入回调函数,也可以写一个Run函数显示调用。这里我们选择了第二种,对比:
-
在handler函数外部做循环接受消息,当消息到达后调用hanlder处理。这种实现在上层做封装,但是会在线程中频繁的切换调用函数。这种设计无法复用一些资源,如当在handler中做数据库操作时,需要频繁的连接和断开连接,可以通过定义两个虚函数Prehandler和AfterHandler来实现。
!!!构造函数中调用虚函数并不会能真正的调用子类的实现!!!
虽然可以对虚函数进行实调用,但程序员编写虚函数的本意应该是实现动态联编。在构造函数中调用虚函数,函数的入口地址是在编译时静态确定的,并未实现虚调用 -
写一个Run函数,将这一部分实现放在run函数中,显示调用。
《Effective C++ 》条款9:永远不要在构造函数或析构函数中调用虚函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
#ifndef THREAD_POOL_H #define THREAD_POOL_H #include <functional> #include <vector> #include <thread> #include "MessageQueue.h" #define MIN_THREADS 1 template < class Type> class CThreadPool { CThreadPool& operator = ( const CThreadPool&) = delete ; CThreadPool( const CThreadPool& other) = delete ; public : CThreadPool(int32_t threads, std::function< void (Type& record, CThreadPool<Type>* pSub)> handler); virtual ~CThreadPool(); void Run(); virtual void PreHandler(){} virtual void AfterHandler(){} void Submit(Type record); private : bool _shutdown; int32_t _threads; std::function< void (Type& record, CThreadPool<Type>* pSub)> _handler; std::vector<std:: thread > _workers; CMessageQueue<Type> _tasks; }; template < class Type> CThreadPool<Type>::CThreadPool(int32_t threads, std::function< void (Type& record, CThreadPool<Type>* pSub)> handler) :_shutdown( false ), _threads(threads), _handler(handler), _workers(), _tasks() { //第一种实现方案,注意这里的虚函数调用不正确 /*if (_threads < MIN_THREADS) _threads = MIN_THREADS; for (int32_t i = 0; i < _threads; i++) { _workers.emplace_back( [this]{ PreHandler(); while (!_shutdown){ Type record; _tasks.Pop(record, true); _handler(record, this); } AfterHandler(); } ); }*/ } //第二种实现方案 template < class Type> void CThreadPool<Type>::Run() { if (_threads < MIN_THREADS) _threads = MIN_THREADS; for (int32_t i = 0; i < _threads; i++) { _workers.emplace_back( [ this ]{ PreHandler(); while (!_shutdown){ Type record; _tasks.Pop(record, true ); _handler(record, this ); } AfterHandler(); } ); } } template < class Type> CThreadPool<Type>::~CThreadPool() { for (std:: thread & worker : _workers) worker.join(); } template < class Type> void CThreadPool<Type>::Submit(Type record) { _tasks.Push(record); } #endif // !THREAD_POOL_H |
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对服务器之家的支持。
原文链接:https://www.jianshu.com/p/1affe12e03b3