如下所示:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
from kafka import KafkaProducer import json ''' 生产者demo 向test_lyl2主题中循环写入10条json数据 注意事项:要写入json数据需加上value_serializer参数,如下代码 ''' producer = KafkaProducer( value_serializer = lambda v: json.dumps(v).encode( 'utf-8' ), bootstrap_servers = [ '192.168.12.101:6667' , '192.168.12.102:6667' , '192.168.12.103:6667' ] ) for i in range ( 10 ): data = { "name" : "李四" , "age" : 23 , "gender" : "男" , "id" :i } producer.send( 'test_lyl2' , data) producer.close() |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
from kafka import KafkaConsumer import json ''' 消费者demo 消费test_lyl2主题中的数据 注意事项:如需以json格式读取数据需加上value_deserializer参数 ''' consumer = KafkaConsumer( 'test_lyl2' ,group_id = "lyl-gid1" , bootstrap_servers = [ '192.168.12.101:6667' , '192.168.12.102:6667' , '192.168.12.103:6667' ], auto_offset_reset = 'earliest' ,value_deserializer = json.loads ) for message in consumer: print (message.value) |
以上这篇对python操作kafka写入json数据的简单demo分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/qq_32502511/article/details/82109933