前面我已经向大家介绍了,如何使用创建线程,启动线程。相信大家都会有这样一个想法,线程无非就是创建一下,然后再start()
下,实在是太简单了。
可是要知道,在真实的项目中,实际场景可要我们举的例子要复杂的多得多,不同线程的执行可能是有顺序的,或者说他们的执行是有条件的,是要受控制的。如果仅仅依靠前面学的那点浅薄的知识,是远远不够的。
那今天,我们就来探讨一下如何控制线程的触发执行。
要实现对多个线程进行控制,其实本质上就是消息通信机制在起作用,利用这个机制发送指令,告诉线程,什么时候可以执行,什么时候不可以执行,执行什么内容。
经过我的总结,线程中通信方法大致有如下三种:
threading.Event
threading.Condition
queue.Queue
接下来我们来一一探讨下。
1 Event事件
Python提供了非常简单的通信机制 Threading.Event
,通用的条件变量。多个线程可以等待某个事件的发生,在事件发生后,所有的线程都会被激活。
关于Event的使用也超级简单,就三个函数
1
2
3
4
5
6
7
8
9
10
|
event = threading.Event() # 重置event,使得所有该event事件都处于待命状态 event.clear() # 等待接收event的指令,决定是否阻塞程序执行 event.wait() # 发送event指令,使所有设置该event事件的线程执行 event. set () |
举个例子来看下。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
import time import threading class MyThread(threading.Thread): def __init__( self , name, event): super ().__init__() self .name = name self .event = event def run( self ): print ( 'Thread: {} start at {}' . format ( self .name, time.ctime(time.time()))) # 等待event.set()后,才能往下执行 self .event.wait() print ( 'Thread: {} finish at {}' . format ( self .name, time.ctime(time.time()))) threads = [] event = threading.Event() # 定义五个线程 [threads.append(MyThread( str (i), event)) for i in range ( 1 , 5 )] # 重置event,使得event.wait()起到阻塞作用 event.clear() # 启动所有线程 [t.start() for t in threads] print ( '等待5s...' ) time.sleep( 5 ) print ( '唤醒所有线程...' ) event. set () |
1
2
3
4
5
6
7
8
9
10
11
12
|
Thread: 1 start at Sun May 13 20:38:08 2018 Thread: 2 start at Sun May 13 20:38:08 2018 Thread: 3 start at Sun May 13 20:38:08 2018 Thread: 4 start at Sun May 13 20:38:08 2018 等待5s... 唤醒所有线程... Thread: 1 finish at Sun May 13 20:38:13 2018 Thread: 4 finish at Sun May 13 20:38:13 2018 Thread: 2 finish at Sun May 13 20:38:13 2018 Thread: 3 finish at Sun May 13 20:38:13 2018 |
可见在所有线程都启动(start()
)后,并不会执行完,而是都在self.event.wait()
止住了,需要我们通过event.set()
来给所有线程发送执行指令才能往下执行。
2 Condition
Condition和Event 是类似的,并没有多大区别。
同样,Condition也只需要掌握几个函数即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
cond = threading.Condition() # 类似lock.acquire() cond.acquire() # 类似lock.release() cond.release() # 等待指定触发,同时会释放对锁的获取,直到被notify才重新占有琐。 cond.wait() # 发送指定,触发执行 cond.notify() |
举个网上一个比较趣的捉迷藏的例子来看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
import threading, time class Hider(threading.Thread): def __init__( self , cond, name): super (Hider, self ).__init__() self .cond = cond self .name = name def run( self ): time.sleep( 1 ) #确保先运行Seeker中的方法 self .cond.acquire() print ( self .name + ': 我已经把眼睛蒙上了' ) self .cond.notify() self .cond.wait() print ( self .name + ': 我找到你了哦 ~_~' ) self .cond.notify() self .cond.release() print ( self .name + ': 我赢了' ) class Seeker(threading.Thread): def __init__( self , cond, name): super (Seeker, self ).__init__() self .cond = cond self .name = name def run( self ): self .cond.acquire() self .cond.wait() print ( self .name + ': 我已经藏好了,你快来找我吧' ) self .cond.notify() self .cond.wait() self .cond.release() print ( self .name + ': 被你找到了,哎~~~' ) cond = threading.Condition() seeker = Seeker(cond, 'seeker' ) hider = Hider(cond, 'hider' ) seeker.start() hider.start() |
通过cond来通信,阻塞自己,并使对方执行。从而,达到有顺序的执行。
看下结果
1
2
3
4
5
|
hider: 我已经把眼睛蒙上了 seeker: 我已经藏好了,你快来找我吧 hider: 我找到你了 ~_~ hider: 我赢了 seeker: 被你找到了,哎~~~ |
3 Queue队列
最后一个,队列,它是本节的重点,因为它是我们日常开发中最使用频率最高的。
从一个线程向另一个线程发送数据最安全的方式可能就是使用 queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用put()
和 get()
操作来向队列中发送和获取元素。
同样,对于Queue,我们也只需要掌握几个函数即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from queue import Queue # maxsize默认为0,不受限 # 一旦>0,而消息数又达到限制,q.put()也将阻塞 q = Queue(maxsize = 0 ) # 默认阻塞程序,等待队列消息,可设置超时时间 q.get(block = True , timeout = None ) # 发送消息:默认会阻塞程序至队列中有空闲位置放入数据 q.put(item, block = True , timeout = None ) # 等待所有的消息都被消费完 q.join() # 通知队列任务处理已经完成,当所有任务都处理完成时,join() 阻塞将会解除 q.task_done() |
以下三个方法,知道就好,一般不需要使用
1
2
3
4
5
6
7
8
|
# 查询当前队列的消息个数 q.qsize() # 队列消息是否都被消费完,返回 True/False q.empty() # 检测队列里消息是否已满 q.full() |
函数会比之前的多一些,同时也从另一方面说明了其功能更加丰富。
我来举个老师点名的例子。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
# coding=utf-8 # /usr/bin/env python ''' Author: wangbm Email: wongbingming@163.com Wechat: mrbensonwon Blog: python-online.cn 公众号:Python编程时光 date: 2020/9/20 下午7:30 desc: ''' __author__ = 'wangbm' from queue import Queue from threading import Thread import time class Student: def __init__( self , name): self .name = name def speak( self ): print ( "{}:到!" . format ( self .name)) class Teacher: def __init__( self , queue): super ().__init__() self .queue = queue def call( self , student_name): if student_name = = "exit" : print ( "点名结束,开始上课.." ) else : print ( "老师:{}来了没?" . format (student_name)) # 发送消息,要点谁的名 self .queue.put(student_name) class CallManager(Thread): def __init__( self , queue): super ().__init__() self .students = {} self .queue = queue def put( self , student): self .students.setdefault(student.name, student) def run( self ): while True : # 阻塞程序,时刻监听老师,接收消息 student_name = queue.get() if student_name = = "exit" : break elif student_name in self .students: self .students[student_name].speak() else : print ( "老师,咱班,没有 {} 这个人" . format (student_name)) queue = Queue() teacher = Teacher(queue = queue) s1 = Student(name = "小明" ) s2 = Student(name = "小亮" ) cm = CallManager(queue) cm.put(s1) cm.put(s2) cm.start() print ( '开始点名~' ) teacher.call( '小明' ) time.sleep( 1 ) teacher.call( '小亮' ) time.sleep( 1 ) teacher.call( "exit" ) |
运行结果如下
开始点名~
老师:小明来了没?
小明:到!
老师:小亮来了没?
小亮:到!
点名结束,开始上课..
其实 queue 还有一个很重要的方法,Queue.task_done()
如果不明白它的原理,我们在写程序,就很有可能卡死。
当我们使用 Queue.get() 从队列取出数据后,这个数据有没有被正常消费,是很重要的。
如果数据没有被正常消费,那么Queue会认为这个任务还在执行中,此时你使用 Queue.join() 会一直阻塞,即使此时你的队列里已经没有消息了。
那么如何解决这种一直阻塞的问题呢?
就是在我们正常消费完数据后,记得调用一下 Queue.task_done(),说明队列这个任务已经结束了。
当队列内部的任务计数器归于零时,调用 Queue.join() 就不会再阻塞了。
要理解这个过程,请参考 http://python.iswbm.com/en/latest/c02/c02_06.html 里自定义线程池的的例子。
4 总结一下
学习了以上三种通信方法,我们很容易就能发现Event
和 Condition
是threading模块原生提供的模块,原理简单,功能单一,它能发送 True
和 False
的指令,所以只能适用于某些简单的场景中。
而Queue
则是比较高级的模块,它可能发送任何类型的消息,包括字符串、字典等。其内部实现其实也引用了Condition
模块(譬如put
和get
函数的阻塞),正是其对Condition
进行了功能扩展,所以功能更加丰富,更能满足实际应用。
以上就是Python并发编程线程消息通信机制详解的详细内容,更多关于Python并发线程消息通信机制的资料请关注服务器之家其它相关文章!
原文链接:https://blog.csdn.net/weixin_36338224/article/details/109220069