本文实例讲述了Python多进程分块读取超大文件的方法,分享给大家供大家参考,具体如下:
读取超大的文本文件时,使用多进程分块读取,并将每一块单独输出成一个文件。
代码如下:
# -*- coding: GBK -*-
"""Read a very large text file in byte blocks using several processes.

Workers repeatedly claim the next unread byte range of the input file and copy
the lines that *start* inside that range to a separate output file, so that
concatenating all output files in order of their block end offset reproduces
the input exactly (no line lost, none duplicated).
"""
try:
    import urlparse  # unused here; Python 2 module name
except ImportError:  # Python 3 moved it to urllib.parse
    from urllib import parse as urlparse
import ctypes
import datetime
import os
from multiprocessing import Process, Queue, Array, RLock

# Number of worker processes started by main().
WORKERS = 4
# Bytes a worker claims per step.
BLOCKSIZE = 100000000
# Size of the input file in bytes; filled in by getFilesize().
FILE_SIZE = 0
# Output files are named OUT_PREFIX + <pid> + '_jobs' + <block end offset>.
OUT_PREFIX = '/data/download/tmp_pid'
# Input file read by main() when no path is given.
INPUT_FILE = "/data/pds/download/scmcc_log/tmp_format_2011004.log"
# Timestamp format for the start/finish lines printed by main()
# (year/month/day; the original "%Y/%d/%m" swapped day and month).
TIME_FORMAT = "%Y/%m/%d %H:%M:%S"


def getFilesize(file):
    """Store the size of *file* in bytes in the global FILE_SIZE.

    Returns:
        The size in bytes (the same value stored in FILE_SIZE).
    """
    global FILE_SIZE
    FILE_SIZE = os.path.getsize(file)
    return FILE_SIZE


def _claim_block(pid, array, rlock, file_size, block_size):
    """Atomically claim the next unclaimed byte range [start, end) for *pid*."""
    with rlock:
        print('pid%s %s' % (pid, ','.join(str(v) for v in array)))
        start = max(array)
        end = min(start + block_size, file_size)
        array[pid] = end
    return start, end


def _copy_block(fstream, start, end, ostream):
    """Copy every line whose first byte lies in [start, end) to *ostream*.

    Returns:
        (offset of the first copied line, offset reached after copying).
    """
    if start == 0:
        fstream.seek(0)
    else:
        # Step back one byte and discard through the next newline: this lands
        # on the first line that starts at or after `start`.  Seeking to
        # `start` itself would throw away a whole line whenever `start` is
        # already a line start (that line belongs to this block, and the
        # previous worker stopped right before it).
        fstream.seek(start - 1)
        fstream.readline()
    first = pos = fstream.tell()
    while pos < end:
        line = fstream.readline()
        if not line:  # file shorter than the size we were told; stop
            break
        ostream.write(line)
        pos = fstream.tell()
    return first, pos


def process_found(pid, array, file, rlock, file_size=None, block_size=None,
                  out_prefix=None):
    """Worker loop: claim byte blocks of *file* and copy their lines out.

    Args:
        pid: index of this worker; also its slot in *array*.
        array: shared integer sequence with one slot per worker.  Each worker
            stores the end offset of the block it last claimed in its slot,
            so ``max(array)`` is the first byte nobody has claimed yet.
        file: path of the input file.
        rlock: lock serialising the read-max-then-store on *array*.
        file_size: size of *file* in bytes; defaults to the global FILE_SIZE.
            Pass it explicitly when child processes are spawned rather than
            forked, because they do not inherit a FILE_SIZE set at run time.
        block_size: bytes claimed per step; defaults to BLOCKSIZE.
        out_prefix: output path prefix; defaults to OUT_PREFIX.

    Raises:
        ValueError: if *block_size* is not positive (the loop could never
            advance).

    A line belongs to the block containing its first byte.  Block
    [start, end) is written to ``out_prefix + str(pid) + '_jobs' + str(end)``.
    Files are handled in binary mode so byte offsets are exact and the
    input encoding is copied through untouched.
    """
    if file_size is None:
        file_size = FILE_SIZE
    if block_size is None:
        block_size = BLOCKSIZE
    if out_prefix is None:
        out_prefix = OUT_PREFIX
    if block_size <= 0:
        raise ValueError('block_size must be positive, got %r' % (block_size,))

    with open(file, 'rb') as fstream:
        while True:
            start, end = _claim_block(pid, array, rlock, file_size, block_size)
            if start >= file_size:  # end of the file
                print('pid%s end' % (pid))
                break
            out_name = out_prefix + str(pid) + '_jobs' + str(end)
            with open(out_name, 'wb') as ostream:
                first, pos = _copy_block(fstream, start, end, ostream)
            print('pid:%s,startposition:%s,endposition:%s,pos:%s'
                  % (pid, first, end, pos))


def main(file=INPUT_FILE, workers=WORKERS, out_prefix=OUT_PREFIX):
    """Split *file* into per-block output files using *workers* processes."""
    print(datetime.datetime.now().strftime(TIME_FORMAT))
    file_size = getFilesize(file)
    print(FILE_SIZE)
    rlock = RLock()
    # 64-bit slots: C `long` is only 32 bits on some platforms, which would
    # overflow for files larger than 2 GiB.
    array = Array(ctypes.c_longlong, workers, lock=rlock)
    procs = [
        Process(target=process_found,
                args=(i, array, file, rlock),
                kwargs={'file_size': file_size,
                        'block_size': BLOCKSIZE,
                        'out_prefix': out_prefix})
        for i in range(workers)
    ]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    print(datetime.datetime.now().strftime(TIME_FORMAT))


if __name__ == '__main__':
    main()
希望本文所述对大家Python程序设计有所帮助。