本文实例讲述了Python多进程multiprocessing用法。分享给大家供大家参考,具体如下:
multiprocessing简介
像管理线程一样管理进程,这是multiprocessing的核心。它与threading非常相像,但对多核CPU的利用率要比threading好得多。
简单的创建进程:
1
2
3
4
5
6
7
8
9
10
11
|
import multiprocessing


def worker(num):
    """Print the worker number; intended to run inside a child process.

    Args:
        num: Identifier handed over by the parent through ``args``.
    """
    # A single pre-formatted argument prints identically on Python 2 and 3.
    print('Worker: %s' % (num,))


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    # Wait for every child explicitly instead of relying on interpreter exit.
    for p in jobs:
        p.join()
确定当前的进程,即是给进程命名,方便标识区分,跟踪
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import multiprocessing
import time


def worker(delay=2):
    """Announce start and exit under the current process's name.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 2,
            the value the example previously hard-coded.
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(delay)
    print('%s Exiting' % name)


def my_service(delay=3):
    """Same as :func:`worker`, but sleeping 3 seconds by default.

    Args:
        delay: Seconds to sleep between the two messages.
    """
    name = multiprocessing.current_process().name
    print('%s Starting' % name)
    time.sleep(delay)
    print('%s Exiting' % name)


if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    # No name given, so multiprocessing assigns its default ("Process-N").
    worker_2 = multiprocessing.Process(target=worker)

    worker_1.start()
    worker_2.start()
    service.start()
守护进程不会阻挡主程序退出:在调用 start() 之前设置 p.daemon = True 即可(multiprocessing 的 Process 没有 setDaemon() 方法)。注意主进程退出时会终止仍在运行的守护子进程。如果要等待守护进程退出,需要调用 join();join 可以传入浮点数作为超时时间(秒),超时后就不再等待。
守护进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import multiprocessing
import time
import sys


def daemon(delay=2):
    """Print start/exit messages with a pause in between.

    Run as a daemonic process in the demo below, where the parent only
    waits one second for it.

    Args:
        delay: Seconds to sleep between the two messages. Defaults to 2,
            the value the example previously hard-coded.
    """
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    time.sleep(delay)
    print('Exiting : %s' % name)


def non_daemon():
    """Print start/exit messages back to back."""
    name = multiprocessing.current_process().name
    print('Starting: %s' % name)
    print('Exiting : %s' % name)


if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True  # must be set before start()

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    # Wait at most one second; the daemon sleeps for two, so it is
    # expected to still be alive when the timeout expires.
    d.join(1)
    print('d.is_alive() %s' % d.is_alive())
    n.join()
最好使用 poison pill(毒丸)让进程自行退出;如果必须强制结束,可以使用 terminate()。注意 terminate() 之后要调用 join(),使进程对象可以更新状态。
终止进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
import multiprocessing
import time


def slow_worker():
    """Print a start message, pause briefly, then print a finish message."""
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE: %s %s' % (p, p.is_alive()))

    p.start()
    print('DURING: %s %s' % (p, p.is_alive()))

    p.terminate()
    print('TERMINATED: %s %s' % (p, p.is_alive()))

    # join() after terminate() lets the Process object refresh its status.
    p.join()
    print('JOINED: %s %s' % (p, p.is_alive()))
①. == 0 未生成任何错误
②. > 0 进程有一个错误,并以该错误码退出
③. < 0 进程被一个信号终止,信号编号为 -1 * exitcode
进程的退出状态:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
import multiprocessing
import sys
import time


def exit_error():
    """Exit the process with status 1."""
    sys.exit(1)


def exit_ok():
    """Return normally without a value."""
    return


def return_value():
    """Return 1 from the target function."""
    return 1


def raises():
    """Raise an uncaught RuntimeError."""
    raise RuntimeError('There was an error!')


def terminated():
    """Sleep long enough for the parent to terminate this process."""
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        # __name__ exists on Python 2 and 3; func_name was removed in 3.
        print('Starting process for %s' % f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    # The last job is ``terminated``; stop it from the parent.
    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('%15s.exitcode = %s' % (j.name, j.exitcode))
方便的调试,可以用logging
日志:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import multiprocessing
import logging
import sys


def worker():
    """Print one line and flush stdout right away."""
    print('Doing some work')
    sys.stdout.flush()


if __name__ == '__main__':
    # Send multiprocessing's internal log records to stderr at INFO level.
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)

    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
利用class来创建进程,定制子类
派生进程:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
import multiprocessing


class Worker(multiprocessing.Process):
    """Process subclass whose work is defined by overriding ``run``."""

    def run(self):
        """Print the name of this process object."""
        print('In %s' % self.name)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()

    for j in jobs:
        j.join()
python进程间传递消息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
import multiprocessing
import time


# --- Example 1: hand a single object to a worker through a Queue -----------

class MyFancyClass(object):
    """Payload object sent to the worker through the queue."""

    def __init__(self, name):
        self.name = name

    def do_something(self):
        """Print which process is handling this object."""
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in %s for %s!' % (proc_name, self.name))


def worker(q):
    """Take one object off *q* and call its ``do_something`` method."""
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()


# --- Example 2: a pool of consumers fed by a JoinableQueue -----------------

class Consumer(multiprocessing.Process):
    """Process that runs callables from a task queue until it gets ``None``.

    Args:
        task_queue: Queue of callables; must support ``get`` and
            ``task_done``.
        result_queue: Queue that receives each callable's return value.
    """

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Execute tasks one by one; stop at the first ``None``."""
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            # Mark the task done before publishing its result, as the
            # original example did.
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task(object):
    """Callable unit of work that multiplies two numbers."""

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    # range() instead of xrange(): xrange does not exist on Python 3.
    consumers = [Consumer(tasks, results) for _ in range(num_consumers)]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for _ in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result: %s' % result)
        num_jobs -= 1

    # All results are drained, so the consumers can be joined safely.
    for w in consumers:
        w.join()
Event提供一种简单的方法,可以在进程间传递状态信息。事件可以在“已设置”和“未设置”两种状态之间切换。通过使用一个可选的超时值,事件对象的用户可以等待其状态从未设置变为设置。
进程间信号传递:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import multiprocessing
import time


def wait_for_event(e):
    """Wait for the event to be set before doing anything.

    Args:
        e: Event-like object providing ``wait()`` and ``is_set()``.
    """
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()-> %s' % e.is_set())


def wait_for_event_timeout(e, t):
    """Wait up to *t* seconds for the event, then report its state.

    Args:
        e: Event-like object providing ``wait(timeout)`` and ``is_set()``.
        t: Maximum number of seconds to wait.
    """
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()-> %s' % e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()

    w1 = multiprocessing.Process(name='block',
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock',
                                 target=wait_for_event_timeout,
                                 args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    # Sleep longer than w2's 2-second timeout so w2 reports False.
    time.sleep(3)
    e.set()
    print('main: event is set')

    w1.join()
    w2.join()
Python多进程,一般的情况是Queue来传递。
Queue:
1
2
3
4
5
6
7
8
9
|
from multiprocessing import Process, Queue


def f(q):
    """Put one sample list onto *q*.

    Args:
        q: Queue-like object providing ``put``.
    """
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    # Read before join() so the child is never blocked on an undrained queue.
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
多线程队列Queue(注意:本例使用的 Queue.Queue 是先进先出队列;若需要优先队列,应使用 Queue.PriorityQueue):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # the module was renamed in Python 3

import threading
import time

# Set to a true value by the main thread to tell the workers to stop.
exitFlag = 0


class myThread(threading.Thread):
    """Worker thread that drains a shared queue via :func:`process_data`.

    Args:
        threadID: Numeric identifier stored on the instance.
        name: Thread name, also used as the prefix of printed messages.
        q: Queue the thread reads work items from.
    """

    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)


def process_data(threadName, q, poll_interval=1):
    """Print items taken from *q* until the module-level exitFlag is set.

    Args:
        threadName: Label printed in front of every processed item.
        q: Queue to read from. Emptiness is now tested on this queue,
            not on the global ``workQueue``, so the function honours
            its own argument.
        poll_interval: Seconds to sleep when the queue is empty
            (default 1, the previously hard-coded value).
    """
    while not exitFlag:
        has_item = False
        data = None
        # Hold the lock only for the check-and-get pair; printing and
        # sleeping happen outside it, as in the original.
        with queueLock:
            if not q.empty():
                data = q.get()
                has_item = True
        if has_item:
            print("%s processing %s" % (threadName, data))
        else:
            time.sleep(poll_interval)


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1

# The driver runs only when executed as a script, so importing this
# module does not start threads.
if __name__ == '__main__':
    # Create new threads
    for tName in threadList:
        thread = myThread(threadID, tName, workQueue)
        thread.start()
        threads.append(thread)
        threadID += 1

    # Fill the queue
    with queueLock:
        for word in nameList:
            workQueue.put(word)

    # Wait for queue to empty; sleep between checks instead of spinning.
    while not workQueue.empty():
        time.sleep(0.01)

    # Notify threads it's time to exit
    exitFlag = 1

    # Wait for all threads to complete
    for t in threads:
        t.join()
    print("Exiting Main Thread")
多进程使用Queue通信的例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
import time
from multiprocessing import Process, Queue

MSG_QUEUE = Queue(5)


def startA(msgQueue, iterations=None, interval=1):
    """Consumer loop: print one message from the queue per pass.

    Args:
        msgQueue: Queue-like object providing ``empty``, ``qsize``, ``get``.
        iterations: Number of passes to run; ``None`` (default) loops
            forever, as the original did.
        interval: Seconds to sleep after each pass (default 1).
    """
    done = 0
    while iterations is None or done < iterations:
        # empty() already returns a bool; no comparison with 0 is needed.
        if msgQueue.empty():
            print('queue is empty %d' % (msgQueue.qsize(),))
        else:
            msg = msgQueue.get()
            print('get msg %s' % (msg,))
        time.sleep(interval)
        done += 1


def startB(msgQueue, iterations=None, interval=3):
    """Producer loop: put 'hello world' on the queue once per pass.

    Args:
        msgQueue: Queue-like object providing ``put`` and ``qsize``.
        iterations: Number of passes to run; ``None`` (default) loops
            forever, as the original did.
        interval: Seconds to sleep after each pass (default 3).
    """
    done = 0
    while iterations is None or done < iterations:
        msgQueue.put('hello world')
        print('put hello world queue size is %d' % (msgQueue.qsize(),))
        time.sleep(interval)
        done += 1


if __name__ == '__main__':
    processA = Process(target=startA, args=(MSG_QUEUE,))
    processB = Process(target=startB, args=(MSG_QUEUE,))

    processA.start()
    print('processA start..')

    # The producer was created but never started, so the consumer only
    # ever saw an empty queue.
    processB.start()
    print('processB start..')
主进程定义了一个Queue类型的变量,并作为Process的args参数传给子进程processA和processB,两个进程一个向队列中写数据,一个读数据。
希望本文所述对大家Python程序设计有所帮助。
原文链接:http://www.cnblogs.com/IPYQ/p/5573628.html