最近消费kafka数据到磁盘的时候遇到了这样的问题:
需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out。
问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一来需要在此文件写满100条时,更新时间戳生成第二个文件名,如果此时有1000个文件都在写则需要有1000个时间戳,和1000个计数器记录每个文件当前的条数,如果分别定义1000个变量显然是不划算的,
尝试:中间过程想到了动态定义变量名,即
定义第七个字段:seven = data.split('|')[7]
定义文件名:filename = time_stamp + '_' + seven+'.tmp',
定义文件计数器:seven + ‘_num' = 0
定义文件时间戳:seven + '_stamp' = time.time( )
想法其实是没问题的,但是这里用到了一个不常用的语法:用一个变量名和一个字符串拼接出来一个新的变量名,并继续赋值(不知道我的表述是否清楚),试过了用local()函数、global()函数、exec()函数都没有达到预期效果,也许是把问题想的太复杂了
解决:最后使用三个字典将这个问题完美解决,
定义一个字典用来存计数器,字典的每一个键对应一个文件名,值对应当前计数,并实时更新;
定义一个字典用来存时间戳,键对应一个文件名,值对应时间戳,达到100条就更新一次;
定义一个字典用来存大类,键对应代号,值对应分类;
局部功能代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
|
def kafka_to_disk(): print ( '启动前检测上次运行时是否存在意外中断的数据文件......' ) print ( '搜索最近一次执行脚本产生的时间目录......' ) # 待处理临时文件列表 tmp_list = [] try : for category_dir in os.listdir(local_file_path): if len (os.listdir(local_file_path + os.sep + category_dir)) > 0 : for file in os.listdir(local_file_path + os.sep + category_dir): if suffix in file : tmp_list.append(local_file_path + os.sep + category_dir + os.sep + file ) # print('上次运行程序产生的临时文件有---{}'.format(tmp_list)) except Exception as e: pass if len (tmp_list) = = 0 : print ( '未扫描任何残留临时文件' ) else : print ( '开始修复残留临时文件......' ) tmp_num = 0 for tmp in tmp_list: os.rename(tmp, tmp.split( '.' )[ 0 ] + '.out' ) tmp_num + = 1 print ( '本次启动共修复残留临时文件★★★★★-----{}个-----★★★★★' . format (tmp_num)) category_poor = { '1' : 'news' , '2' : 'weibo' , '3' : 'weixin' , '4' : 'app' , '5' : 'newspaper' , '6' : 'luntan' , '7' : 'blog' , '8' : 'video' , '9' : 'shangji' , '10' : 'shangjia' , '11' : 'gtzy' , '12' : 'zfztb' , '13' : 'gyfp' , '14' : 'gjz' , '15' : 'zfxx' , '16' : 'ptztb' , '17' : 'company' , '18' : 'house' , '19' : 'hospital' , '20' : 'bank' , '21' : 'zone' , '22' : 'express' , '23' : 'zpgw' , '24' : 'zscq' , '25' : 'hotel' , '26' : 'cpws' , '27' : 'gxqy' , '28' : 'gpjj' , '29' : 'dtyy' , '30' : 'bdbk' } time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 : 20180509103015125 consumer = KafkaConsumer(topic, group_id = group_id, auto_offset_reset = auto_offset_reset, bootstrap_servers = eval (bootstrap_servers)) print ( '连接kafka成功,数据筛选中......' ) file_poor = {} # 子类池用于文件计数器 time_stamp_poor = {} # 子类时间戳池,用于触发文件切换 time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 :20180509103015125 for message in consumer: # 提取第8个字段自动匹配目录进行创建 if message.value.decode().split( '|' )[ 1 ] in category_poor: category = category_poor[message.value.decode().split( '|' )[ 1 ]] else : print (message.value.decode()) continue category_dir = local_file_path + os.sep + category if not os.path.exists(category_dir): os.makedirs(category_dir) # 提取第2个字段,用于生成文件名 if message.value.decode().split( '|' )[ 7 ] in time_stamp_poor: shot_file_name = time_stamp_poor[message.value.decode().split( '|' )[ 7 ]] + '_' + message.value.decode().split( '|' )[ 7 ] else : shot_file_name = time_stamp + '_' + message.value.decode().split( '|' )[ 7 ] file_name = category_dir + os.sep + shot_file_name + '.tmp' # 给每一个文件设定一个计数器 if message.value.decode().split( '|' )[ 7 ] not in file_poor: file_poor[message.value.decode().split( '|' )[ 7 ]] = 0 with open (file_name, 'a' , encoding = 'utf-8' )as f1: f1.write(message.value.decode()) file_poor[message.value.decode().split( '|' )[ 7 ]] + = 1 # 触发切换文件的操作,用时间戳生成第二文件名 if file_poor[message.value.decode().split( '|' )[ 7 ]] = = strip_number: time_stamp_poor[message.value.decode().split( '|' )[ 7 ]] = utils.get_time_stamp() file_poor[message.value.decode().split( '|' )[ 7 ]] = 0 |
以上这篇python 解决动态的定义变量名,并给其赋值的方法(大数据处理)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/Beyond_F4/article/details/80590082