上一篇文章介绍了线程的使用。然而 python 中由于 global interpreter lock
(全局解释锁 gil )的存在,每个线程在在执行时需要获取到这个 gil ,在同一时刻中只有一个线程得到解释锁的执行, python 中的线程并没有真正意义上的并发执行,多线程的执行效率也不一定比单线程的效率更高。 如果要充分利用现代多核 cpu 的并发能力,就要使用 multipleprocessing 模块了。
0x01 multipleprocessing
与使用线程的 threading 模块类似, multipleprocessing
模块提供许多高级 api 。最常见的是 pool 对象了,使用它的接口能很方便地写出并发执行的代码。
1
2
3
4
5
6
7
8
9
|
from multiprocessing import pool def f(x): return x * x if __name__ = = '__main__' : with pool( 5 ) as p: # map方法的作用是将f()方法并发地映射到列表中的每个元素 print (p. map (f, [ 1 , 2 , 3 ])) # 执行结果 # [1, 4, 9] |
关于 pool 下文中还会提到,这里我们先来看 process 。
process
要创建一个进程可以使用 process 类,使用 start() 方法启动进程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from multiprocessing import process import os def echo(text): # 父进程id print ( "process parent id : " , os.getppid()) # 进程id print ( "process pid : " , os.getpid()) print ( 'echo : ' , text) if __name__ = = '__main__' : p = process(target = echo, args = ( 'hello process' ,)) p.start() p.join() # 执行结果 # process parent id : 27382 # process pid : 27383 # echo : hello process |
进程池
正如开篇提到的 multiprocessing
模块提供了 pool 类可以很方便地实现一些简单多进程场景。 它主要有以下接口
- apply(func[, args[, kwds]])
- 执行 func(args,kwds) 方法,在方法结束返回前会阻塞。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])
- 异步执行 func(args,kwds) ,会立即返回一个 result 对象,如果指定了 callback 参数,结果会通过回调方法返回,还可以指定执行出错的回调方法 error_callback()
- map(func, iterable[, chunksize])
- 类似内置函数 map() ,可以并发执行 func ,是同步方法
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])
- 异步版本的 map
- close()
- 关闭进程池。当池中的所有工作进程都执行完毕时,进程会退出。
- terminate()
- 终止进程池
- join()
- 等待工作进程执行完,必需先调用 close() 或者 terminate()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
from multiprocessing import pool def f(x): return x * x if __name__ = = '__main__' : with pool( 5 ) as p: # map方法的作用是将f()方法并发地映射到列表中的每个元素 a = p. map (f, [ 1 , 2 , 3 ]) print (a) # 异步执行map b = p.map_async(f, [ 3 , 5 , 7 , 11 ]) # b 是一个result对象,代表方法的执行结果 print (b) # 为了拿到结果,使用join方法等待池中工作进程退出 p.close() # 调用join方法前,需先执行close或terminate方法 p.join() # 获取执行结果 print (b.get()) # 执行结果 # [1, 4, 9] # <multiprocessing.pool.mapresult object at 0x10631b710> # [9, 25, 49, 121] |
map_async() 和 apply_async() 执行后会返回一个 class multiprocessing.pool.asyncresult 对象,通过它的 get() 可以获取到执行结果, ready() 可以判断 asyncresult 的结果是否准备好。
进程间数据的传输
multiprocessing 模块提供了两种方式用于进程间的数据共享:队列( queue )和管道( pipe )
queue 是线程安全,也是进程安全的。使用 queue 可以实现进程间的数据共享,例如下面的 demo 中子进程 put 一个对象,在主进程中就能 get 到这个对象。 任何可以序列化的对象都可以通过 queue 来传输。
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from multiprocessing import process, queue def f(q): q.put([ 42 , none, 'hello' ]) if __name__ = = '__main__' : # 使用queue进行数据通信 q = queue() p = process(target = f, args = (q,)) p.start() # 主进程取得子进程中的数据 print (q.get()) # prints "[42, none, 'hello']" p.join() # 执行结果 # [42, none, 'hello'] |
pipe() 返回一对通过管道连接的 connection 对象。这两个对象可以理解为管道的两端,它们通过 send() 和 recv() 发送和接收数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
from multiprocessing import process, pipe def write(conn): # 子进程中发送一个对象 conn.send([ 42 , none, 'hello' ]) conn.close() def read(conn): # 在读的进程中通过recv接收对象 data = conn.recv() print (data) if __name__ = = '__main__' : # pipe()方法返回一对连接对象 w_conn, r_conn = pipe() wp = process(target = write, args = (w_conn,)) rp = process(target = read, args = (r_conn,)) wp.start() rp.start() # 执行结果 # [42, none, 'hello'] |
需要注意的是,两个进程不能同时对一个连接对象进行 send 或 recv 操作。
同步
我们知道线程间的同步是通过锁机制来实现的,进程也一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
from multiprocessing import process, lock import time def print_with_lock(l, i): l.acquire() try : time.sleep( 1 ) print ( 'hello world' , i) finally : l.release() def print_without_lock(i): time.sleep( 1 ) print ( 'hello world' , i) if __name__ = = '__main__' : lock = lock() # 先执行有锁的 for num in range ( 5 ): process(target = print_with_lock, args = (lock, num)).start() # 再执行无锁的 # for num in range(5): # process(target=print_without_lock, args=(num,)).start() |
有锁的代码将每秒依次打印
hello world 0
hello world 1
hello world 2
hello world 3
hello world 4
如果执行无锁的代码,则在我的电脑上执行结果是这样的
hello worldhello world 0
1
hello world 2
hello world 3
hello world 4
除了 lock ,还包括 rlock 、 condition 、 semaphore 和 event 等进程间的同步原语。其用法也与线程间的同步原语很类似。 api 使用可以参考文末中引用的文档链接。
在工程中实现进程间的数据共享应当优先使用 队列或管道。
0x02 总结
本文对 multiprocessing 模块中常见的 api 作了简单的介绍。讲述了 process 和 pool 的常见用法,同时介绍了进程间的数据方式:队列和管道。最后简单了解了进程间的同步原语。
原文链接:https://juejin.im/post/5cefdc60f265da1bca51c0cf