1.1 什么是 Multiprocessing
多线程在同一时间只能处理一个任务。
可把任务平均分配给每个核,而每个核具有自己的运算空间。
1.2 添加进程 Process
与线程类似,如下所示,但是该程序直接运行无结果,因为IDLE不支持多进程,在命令行终端运行才有结果显示
1
2
3
4
5
6
7
8
|
import multiprocessing as mp def job(a,b): print ( 'abc' ) if __name__ = = '__main__' : p1 = mp.Process(target = job,args = ( 1 , 2 )) p1.start() p1.join() |
1.3 存储进程输出 Queue
不知道为什么下面的这个程序可以在IDLE中正常运行。首先定义了一个job函数作系列数学运算,然后将结果放到res中,在main函数运行,取出queue中存储的结果再进行一次加法运算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import multiprocessing as mp def job(q): res = 0 for i in range ( 1000 ): res + = i + i * * 2 + i * * 3 q.put(res) if __name__ = = '__main__' : q = mp.Queue() p1 = mp.Process(target = job,args = (q,)) #注意当参数只有一个时,应加上逗号 p2 = mp.Process(target = job,args = (q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print (res1 + res2) |
结果如下所示:
1.4 效率比对 threading & multiprocessing
在job函数中定义了数学运算,比较正常情况、多线程和多进程分别的运行时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
import multiprocessing as mp import threading as td import time def job(q): res = 0 for i in range ( 10000000 ): res + = i + i * * 2 + i * * 3 q.put(res) # queue def multicore(): q = mp.Queue() p1 = mp.Process(target = job, args = (q,)) p2 = mp.Process(target = job, args = (q,)) p1.start() p2.start() p1.join() p2.join() res1 = q.get() res2 = q.get() print ( 'multicore:' , res1 + res2) def normal(): res = 0 for _ in range ( 2 ): #线程或进程都构造了两个,进行了两次运算,所以这里循环两次 for i in range ( 10000000 ): res + = i + i * * 2 + i * * 3 print ( 'normal:' , res) def multithread(): q = mp.Queue() t1 = td.Thread(target = job, args = (q,)) t2 = td.Thread(target = job, args = (q,)) t1.start() t2.start() t1.join() t2.join() res1 = q.get() res2 = q.get() print ( 'multithread:' , res1 + res2) if __name__ = = '__main__' : st = time.time() normal() st1 = time.time() print ( 'normal time:' , st1 - st) multithread() st2 = time.time() print ( 'multithread time:' , st2 - st1) multicore() print ( 'multicore time:' , time.time() - st2) |
在视频中的运行结果是多进程<正常<多线程,而我的运行结果为下图所示:
综上,多核/多进程运行最快,说明在同时间运行了多个任务,而多线程却不一定会比正常情况下的运行来的快,这和多线程中的GIL有关。
1.5 进程池
进程池Pool,就是我们将所要运行的东西,放到池子里,Python会自行解决多进程的问题。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
import multiprocessing as mp def job(x): return x * x def multicore(): pool = mp.Pool(processes = 2 ) #定义一个Pool,并定义CPU核数量为2 res = pool. map (job, range ( 10 )) print (res) res = pool.apply_async(job,( 2 ,)) print (res.get()) multi_res = [pool.apply_async(job,(i,)) for i in range ( 10 )] print ([res.get() for res in multi_res]) if __name__ = = '__main__' : multicore() |
运行结果如下所示:
首先定义一个池子,有了池子之后,就可以让池子对应某一个函数,在上述代码中定义的pool对应job函数。我们向池子里丢数据,池子就会返回函数返回的值。 Pool和之前的Process的不同点是丢向Pool的函数有返回值,而Process的没有返回值。
接下来用map()获取结果,在map()中需要放入函数和需要迭代运算的值,然后它会自动分配给CPU核,返回结果
我们怎么知道Pool是否真的调用了多个核呢?我们可以把迭代次数增大些,然后打开CPU负载看下CPU运行情况
打开CPU负载(Mac):活动监视器 > CPU > CPU负载(单击一下即可)
Pool默认大小是CPU的核数,我们也可以通过在Pool中传入processes参数即可自定义需要的核数量。
Pool除了可以用map来返回结果之外,还可以用apply_async(),与map不同的是,只能传递一个值,只会放入一个核进行计算,但是传入值时要注意是可迭代的,所以在传入值后需要加逗号, 同时需要用get()方法获取返回值。所对应的代码为:
1
2
|
res = pool.apply_async(job,( 2 ,)) print (res.get()) |
运行结果为4。
由于传入值是可以迭代的,则我们同样可以使用apply_async()来输出多个结果。如果在apply_async()中输入多个传入值:
1
|
res = pool.apply_async(job, ( 2 , 3 , 4 ,)) |
结果会报错:
1
|
TypeError: job() takes exactly 1 argument ( 3 given) |
即apply_async()只能输入一组参数。
在此我们将apply_async()放入迭代器中,定义一个新的multi_res
1
|
multi_res = [pool.apply_async(job, (i,)) for i in range ( 10 )] |
同样在取出值时需要一个一个取出来
1
|
print ([res.get() for res in multi_res]) |
apply用迭代器的运行结果与map取出的结果相同。
note:
(1)Pool默认调用是CPU的核数,传入processes参数可自定义CPU核数
(2)map() 放入迭代参数,返回多个结果
(3)apply_async()只能放入一组参数,并返回一个结果,如果想得到map()的效果需要通过迭代
1.6 共享内存 shared memory
只有通过共享内存才能让CPU之间进行交流。
通过Value将数据存储在一个共享的内存表中。
1
2
3
4
|
import multiprocessing as mp value1 = mp.Value( 'i' , 0) value2 = mp.Value( 'd' , 3.14) |
其中,i和d表示数据类型。i为带符号的整型,d为双精浮点类型。更多数据类型可参考网址:https://docs.python.org/3/library/array.html
在多进程中有一个Array类,可以和共享内存交互,来实现进程之间共享数据。
和numpy中的不同,这里的Array只能是一维的,并且需要定义数据类型否则会报错。
1
|
array = mp.Array( 'i' , [ 1 , 2 , 3 , 4 ]) |
1.7 进程锁 Lock
首先是不加进程锁的运行情况,在下述代码中定义了共享变量v,定义了两个进程,均可对v进行操作。job函数的作用是每隔0.1s输出一次累加num的值,累加值num在两个进程中分别为1和3。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import multiprocessing as mp import time def job(v,num): for _ in range ( 10 ): time.sleep( 0.1 ) #暂停0.1s,让输出效果更明显 v.value + = num #v.value获取共享变量值 print (v.value) def multicore(): v = mp.Value( 'i' , 0 ) #定义共享变量 p1 = mp.Process(target = job,args = (v, 1 )) p2 = mp.Process(target = job,args = (v, 3 )) p1.start() p2.start() p1.join() p2.join() if __name__ = = '__main__' : multicore() |
运行结果如下所示:
可以看到两个进程互相抢占共享内存v。
为了解决上述不同进程抢共享资源的问题,我们可以用加进程锁来解决。
首先需要定义一个进程锁:
1
|
l = mp.Lock() # 定义一个进程锁 |
然后将进程锁的信息传入各个进程中
1
2
|
p1 = mp.Process(target=job, args=( v ,1,l)) # 需要将Lock传入 p2 = mp.Process(target=job, args=( v ,3,l)) |
在job()中设置进程锁的使用,保证运行时一个进程的对锁内内容的独占
1
2
3
4
5
6
7
|
def job( v , num, l): l.acquire() # 锁住 for _ in range(5): time . sleep (0.1) v .value += num # v.value获取共享内存 print( v .value) l.release() # 释放 |
完整代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
def job(v, num, l): l.acquire() # 锁住 for _ in range ( 5 ): time.sleep( 0.1 ) v.value + = num # 获取共享内存 print (v.value) l.release() # 释放 def multicore(): l = mp.Lock() # 定义一个进程锁 v = mp.Value( 'i' , 0 ) # 定义共享内存 p1 = mp.Process(target = job, args = (v, 1 ,l)) # 需要将lock传入 p2 = mp.Process(target = job, args = (v, 3 ,l)) p1.start() p2.start() p1.join() p2.join() if __name__ = = '__main__' : multicore() |
运行结果如下所示:
可以看到进程1运行完之后才运行进程2。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/wwf828/p/7344338.html