一、多进程的实现
方法一
1
2
3
4
5
6
7
8
9
10
11
12
|
# 方法包装 多进程 from multiprocessing import Process from time import sleep def func1(arg): print (f '{arg}开始...' ) sleep( 2 ) print (f '{arg}结束...' ) if __name__ = = "__main__" : p1 = Process(target = func1,args = ( 'p1' ,)) p2 = Process(target = func1,args = ( 'p2' ,)) p1.start() p2.start() |
方法二:
二、使用进程的优缺点
1、优点
- 可以使用计算机多核,进行任务的并发执行,提高执行效率
- 运行不受其他进程影响,创建方便
- 空间独立,数据安全
2、缺点
- 进程的创建和删除消耗的系统资源较多
三、进程的通信
Python 提供了多种实现进程间通信的机制,主要有以下 2 种:
1. Python multiprocessing
模块下的 Queue 类,提供了多个进程之间实现通信的诸多 方法
2. Pipe,又被称为“管道”,常用于实现 2 个进程之间的通信,这 2 个进程分别位于管 道的两端
Pipe 直译过来的意思是“管”或“管道”,该种实现多进程编程的方式,和实际生活中 的管(管道)是非常类似的。通常情况下,管道有 2 个口,而 Pipe 也常用来实现 2 个进程之 间的通信,这 2 个进程分别位于管道的两端,一端用来发送数据,另一端用来接收数据 - send(obj)
发送一个 obj 给管道的另一端,另一端使用 recv() 方法接收。需要说明的是,该 obj 必 须是可序列化的,如果该对象序列化之后超过 32MB,则很可能会引发 ValueError 异常 - recv()
接收另一端通过 send() 方法发送过来的数据 - close()
关闭连接 - poll([timeout])
返回连接中是否还有数据可以读取 - end_bytes(buffer[, offset[, size]])
发送字节数据。如果没有指定 offset、size 参数,则默认发送 buffer 字节串的全部数 据;如果指定了 offset 和 size 参数,则只发送 buffer 字节串中从 offset 开始、长度为 size 的字节数据。通过该方法发送的数据,应该使用 recv_bytes() 或 recv_bytes_into 方法接收 - recv_bytes([maxlength])
接收通过 send_bytes() 方法发送的数据,maxlength 指定最多接收的字节数。该方法返 回接收到的字节数据 - recv_bytes_into(buffer[, offset])
功能与 recv_bytes() 方法类似,只是该方法将接收到的数据放在 buffer 中
1、Queue 实现进程间通信
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from multiprocessing import Process,current_process,Queue # current_process 指的是当前进程 # from queue import Queue import os def func(name,mq): print ( '进程ID {} 获取了数据:{}' . format (os.getpid(),mq.get())) mq.put( 'shiyi' ) if __name__ = = "__main__" : # print('进程ID:{}'.format(current_process().pid)) # print('进程ID:{}'.format(os.getpid())) mq = Queue() mq.put( 'yangyang' ) p1 = Process(target = func,args = ( 'p1' ,mq)) p1.start() p1.join() print (mq.get()) |
2、Pipe 实现进程间通信(一边发送send(obj),一边接收(obj))
1
2
3
4
5
6
7
8
9
10
11
12
13
|
from multiprocessing import Process,current_process,Pipe import os def func(name,con): print ( '进程ID {} 获取了数据:{}' . format (os.getpid(),con.recv())) con.send( '你好!' ) if __name__ = = "__main__" : # print('进程ID:{}'.format(current_process().pid)) con1,con2 = Pipe() p1 = Process(target = func,args = ( 'p1' ,con1)) p1.start() con2.send( "hello!" ) p1.join() print (con2.recv()) |
四、Manager管理器
管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from multiprocessing import Process,current_process import os from multiprocessing import Manager def func(name,m_list,m_dict): print ( '子进程ID {} 获取了数据:{}' . format (os.getpid(),m_list)) print ( '子进程ID {} 获取了数据:{}' . format (os.getpid(),m_dict)) m_list.append( '你好' ) m_dict[ 'name' ] = 'shiyi' if __name__ = = "__main__" : print ( '主进程ID:{}' . format (current_process().pid)) with Manager() as mgr: m_list = mgr. list () m_dict = mgr. dict () m_list.append( 'Hello!!' ) p1 = Process(target = func,args = ( 'p1' ,m_list,m_dict)) p1.start() p1.join() print (m_list) print (m_dict) |
五、进程池
Python 提供了更好的管理多个进程的方式,就是使用进程池。
进程池可以提供指定数量的进程给用户使用,即当有新的请求提交到进程池中时,如果池 未满,则会创建一个新的进程用来执行该请求;反之,如果池中的进程数已经达到规定最大 值,那么该请求就会等待,只要池中有进程空闲下来,该请求就能得到执行。
使用进程池的优点
1. 提高效率,节省开辟进程和开辟内存空间的时间及销毁进程的时间
2. 节省内存空间
类/方法 | 功能 | 参数 |
Pool(processes)
|
创建进程池对象
|
processes 表示进程池
中有多少进程
|
pool.apply_async(func,a
rgs,kwds)
|
异步执行 ;将事件放入到进 程池队列
|
func 事件函数
args 以元组形式给
func 传参
kwds 以字典形式给
func 传参 返回值:返
回一个代表进程池事件的对
象,通过返回值的 get 方法
可以得到事件函数的返回值
|
pool.apply(func,args,kw
ds)
|
同步执行;将事件放入到进程 池队列
|
func 事件函数 args 以
元组形式给 func 传参
kwds 以字典形式给 func
传参
|
pool.close()
|
关闭进程池
|
|
pool.join()
|
回收进程池
|
|
pool.map(func,iter)
|
类似于 python 的 map 函
数,将要做的事件放入进程池
|
func 要执行的函数
iter 迭代对象
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
from multiprocessing import Pool import os from time import sleep def func1(name): print (f "当前进程的ID:{os.getpid()},{name}" ) sleep( 2 ) return name def func2(args): print (args) if __name__ = = "__main__" : pool = Pool( 5 ) pool.apply_async(func = func1,args = ( 't1' ,),callback = func2) pool.apply_async(func = func1,args = ( 't2' ,),callback = func2) pool.apply_async(func = func1,args = ( 't3' ,),callback = func2) pool.apply_async(func = func1,args = ( 't4' ,)) pool.apply_async(func = func1,args = ( 't5' ,)) pool.apply_async(func = func1,args = ( 't6' ,)) pool.close() pool.join() |
1
2
3
4
5
6
7
8
9
10
11
12
|
from multiprocessing import Pool import os from time import sleep def func1(name): print (f "当前进程的ID:{os.getpid()},{name}" ) sleep( 2 ) return name if __name__ = = "__main__" : with Pool( 5 ) as pool: args = pool. map (func1,( 't1,' , 't2,' , 't3,' , 't4,' , 't5,' , 't6,' , 't7,' , 't8,' )) for a in args: print (a) |
总结
本篇文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注服务器之家的更多内容!
原文链接:https://blog.csdn.net/qq_53582111/article/details/120941027