1. Bulk insert into ES
To make later configuration changes easier, the configuration is kept in logging.conf.
The elasticsearch package is used for the bulk operations; install the dependency first: sudo pip install Elasticsearch2
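The code in sections 1 and 2 loads its logging setup with logging.config.fileConfig("logging.conf") and requests loggers named "msg" and "data", so a logging.conf that defines those loggers has to sit next to the script. A minimal sketch of such a file (the handler and format choices are assumptions, not taken from the original post):

[loggers]
keys=root,msg,data

[handlers]
keys=console

[formatters]
keys=plain

[logger_root]
level=INFO
handlers=console

[logger_msg]
level=INFO
handlers=console
qualname=msg
propagate=0

[logger_data]
level=INFO
handlers=console
qualname=data
propagate=0

[handler_console]
class=StreamHandler
level=INFO
formatter=plain
args=(sys.stdout,)

[formatter_plain]
format=%(asctime)s %(levelname)s %(message)s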
from elasticsearch import Elasticsearch
import logging
import logging.config


class ImportEsData:

    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")

    def __init__(self, hosts, index, type):
        self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
        self.index = index
        self.type = type

    def set_date(self, data):
        # Batch processing
        # es.index(index="test-index", doc_type="test-type", id=42, body={"any": "data", "timestamp": datetime.now()})
        self.es.index(index=self.index, doc_type=self.type, body=data)
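set_date() here hands one body at a time to es.index(); for a real batch, the elasticsearch.helpers.bulk helper (which the full listing in section 2 switches to) sends a whole list of actions in a single request. A minimal sketch with placeholder host, index, and type names:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Placeholder cluster address and names - adjust to your environment.
es = Elasticsearch(hosts=["127.0.0.1:9200"], timeout=5000)
actions = [
    {"_index": "test-index", "_type": "test-type", "_source": {"msg": str(i)}}
    for i in range(3)
]
# bulk() returns a (success_count, errors) tuple.
success, _ = bulk(es, actions, raise_on_error=True)
print(success)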
2. Consuming Kafka with pykafka
1. Because the Kafka cluster is version 0.8, pykafka cannot use ZooKeeper-coordinated (balanced) consumers here, so the consumer has to be built with get_simple_consumer.
2. So that several application instances can consume at the same time without consuming the same data twice, each instance consumes exactly one partition.
3. To make sure data still gets inserted into ES within a bounded time even when fewer than 10000 messages (the batch threshold) have accumulated, consumer_timeout_ms is set as a timeout so the consumer drops out of the blocking wait.
4. After dropping out of the blocking wait the iterator yields no more data, so an outer while True loop is added around the loop that reads from self.consumer (a focused sketch of this pattern follows the list).
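A minimal sketch of just that consumer pattern, with placeholder broker, topic, and group names (the full listing below adds the ES buffering, error handling, and offset commits):

from pykafka import KafkaClient

# Placeholder connection details for illustration only.
client = KafkaClient("127.0.0.1:9092")
topic = client.topics["test-topic"]
consumer = topic.get_simple_consumer(
    consumer_group="test-group",
    auto_commit_enable=True,
    consumer_timeout_ms=5000,              # stop blocking after 5 s without messages
    partitions={topic.partitions.get(0)},  # pin this instance to a single partition
)

while True:  # re-enter the iterator after every timeout so consumption never stops
    for message in consumer:  # the loop ends once consumer_timeout_ms elapses with no data
        if message is not None:
            print(str(message.offset) + "," + str(message.value))
    # timeout reached: flush whatever has been buffered, then loop and keep consuming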
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime


class KafkaPython:
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")
    logger_data = logging.getLogger("data")

    def __init__(self):
        self.server = ConfigUtil().get("kafka", "kafka_server")
        self.topic = ConfigUtil().get("kafka", "topic")
        self.group = ConfigUtil().get("kafka", "group")
        self.partition_id = int(ConfigUtil().get("kafka", "partition"))
        self.consumer_timeout_ms = int(ConfigUtil().get("kafka", "consumer_timeout_ms"))
        self.consumer = None
        self.hosts = ConfigUtil().get("es", "hosts")
        self.index_name = ConfigUtil().get("es", "index_name")
        self.type_name = ConfigUtil().get("es", "type_name")

    def getConnect(self):
        client = KafkaClient(self.server)
        topic = client.topics[self.topic]
        p = topic.partitions
        ps = {p.get(self.partition_id)}

        self.consumer = topic.get_simple_consumer(
            consumer_group=self.group,
            auto_commit_enable=True,
            consumer_timeout_ms=self.consumer_timeout_ms,
            # num_consumer_fetchers=1,
            # consumer_id='test1',
            partitions=ps
        )
        self.starttime = datetime.datetime.now()

    def beginConsumer(self):
        print("beginConsumer kafka-python")
        imprtEsData = ImportEsData(self.hosts, self.index_name, self.type_name)
        # create the ACTIONS buffer
        count = 0
        ACTIONS = []

        while True:
            endtime = datetime.datetime.now()
            print (endtime - self.starttime).seconds
            for message in self.consumer:
                if message is not None:
                    try:
                        count = count + 1
                        # print(str(message.partition.id)+","+str(message.offset)+","+str(count))
                        # self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
                        action = {
                            "_index": self.index_name,
                            "_type": self.type_name,
                            "_source": message.value
                        }
                        ACTIONS.append(action)
                        if len(ACTIONS) >= 10000:
                            imprtEsData.set_date(ACTIONS)
                            ACTIONS = []
                            self.consumer.commit_offsets()
                            endtime = datetime.datetime.now()
                            print (endtime - self.starttime).seconds
                            # break
                    except (Exception) as e:
                        # self.consumer.commit_offsets()
                        print(e)
                        self.logger.error(e)
                        self.logger.error(str(message.partition.id) + "," + str(message.offset) + "," + message.value + "\n")
                        # self.logger_data.error(message.value+"\n")
            # self.consumer.commit_offsets()

            if len(ACTIONS) > 0:
                self.logger.info("consumer_timeout_ms wait exceeded, inserting the buffered data into es")
                imprtEsData.set_date(ACTIONS)
                ACTIONS = []
                self.consumer.commit_offsets()

    def disConnect(self):
        self.consumer.close()


from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk


class ImportEsData:
    logging.config.fileConfig("logging.conf")
    logger = logging.getLogger("msg")

    def __init__(self, hosts, index, type):
        self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
        self.index = index
        self.type = type

    def set_date(self, data):
        # Batch processing
        success = bulk(self.es, data, index=self.index, raise_on_error=True)
        self.logger.info(success)
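The ConfigUtil helper imported above is not included in the original post. A minimal sketch of what it might look like, assuming a ConfigParser-style file named config.ini with [kafka] and [es] sections (the file name, keys, and values here are assumptions):

# ConfigUtil.py - hypothetical reconstruction of the helper used above.
try:
    import ConfigParser as configparser   # Python 2
except ImportError:
    import configparser                   # Python 3


class ConfigUtil:
    def __init__(self, path="config.ini"):
        self.cf = configparser.ConfigParser()
        self.cf.read(path)

    def get(self, section, key):
        # e.g. ConfigUtil().get("kafka", "kafka_server")
        return self.cf.get(section, key)

# Example config.ini (placeholder values):
# [kafka]
# kafka_server = 127.0.0.1:9092
# topic = test-topic
# group = test-group
# partition = 0
# consumer_timeout_ms = 5000
#
# [es]
# hosts = 127.0.0.1:9200
# index_name = test-index
# type_name = test-type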
3. Running it
if __name__ == '__main__':
    kp = KafkaPython()
    kp.getConnect()
    kp.beginConsumer()
    # kp.disConnect()
Note: this is a simple plugin that reads data from Kafka into a list and, once the list reaches a threshold, bulk-inserts it into ES.
It is still undergoing batch load testing.
Original article: https://blog.csdn.net/liagliang/article/details/78712475