使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
#!/usr/bin/env python
# coding=utf-8
"""Scan an HBase table through the Thrift1 gateway and publish rows to Kafka."""
import sys
import time
import json

sys.path.append('/usr/local/lib/python3.5/site-packages')

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from hbase1 import Hbase  # HBase Thrift1 bindings
from hbase1.ttypes import *
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka.errors import KafkaError
import unittest


class HbaseOpreator:
    """Thin wrapper around an HBase Thrift1 client connection.

    Opens a buffered TSocket transport on construction; the transport is
    closed (best effort) when the instance is garbage collected.
    """

    def __init__(self, host, port, table='test'):
        self.tableName = table
        self.transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
        self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(self.protocol)
        self.transport.open()

    def __del__(self):
        # Best effort cleanup; __del__ timing is interpreter-dependent.
        self.transport.close()

    def scanTablefilter(self, table, *args):
        """Scan `table` with a fixed SingleColumnValueFilter and return the
        matching rows as a UTF-8 encoded JSON byte string.

        Each matching row becomes a dict with keys 'key', 'name', 'age',
        'phone' read from the 'info' column family. The Thrift scanner is
        always closed, even if the scan raises.
        """
        rows = []
        scanner_id = None
        try:
            scan = TScan()
            # Match rows whose info:name equals 'lilei' or 'lily'.
            # (renamed from `filter`/`id` — both shadowed builtins)
            scan.filterString = (
                "SingleColumnValueFilter('info','name',=,'binary:lilei') "
                "OR SingleColumnValueFilter('info','name',=,'binary:lily')"
            )
            scanner_id = self.client.scannerOpenWithScan(table, scan, None)
            result = self.client.scannerGet(scanner_id)
            while result:
                for r in result:
                    # BUG FIX: build a fresh dict per row. The original reused
                    # a single dict and appended it repeatedly, so every list
                    # entry aliased the *last* scanned row.
                    rows.append({
                        'key': r.row,
                        'name': r.columns.get('info:name').value,
                        'age': r.columns.get('info:age').value,
                        'phone': r.columns.get('info:phone').value,
                    })
                result = self.client.scannerGet(scanner_id)
            return json.dumps(rows).encode(encoding="utf-8")
        finally:
            # FIX: release the server-side scanner (was commented out before).
            if scanner_id is not None:
                self.client.scannerClose(scanner_id)
            print("scan finish")


def sendKfafkaProduct(data):
    """Send each item of `data` to the local Kafka topic 'test', then keep
    re-sending the whole payload every 5 seconds.

    NOTE(review): the trailing infinite loop re-sends the full payload
    forever — preserved from the original, but confirm it is intended.
    """
    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    try:
        for d in data:
            producer.send('test', key=b'lxs', value=d)
            time.sleep(5)
            print(d)
        while True:
            producer.send('test', key=b'lxs', value=data)
            time.sleep(5)
            print(data)
    finally:
        # FIX: flush buffered messages and release the producer's sockets.
        producer.flush()
        producer.close()


if __name__ == '__main__':
    # unittest.main()
    B = HbaseOpreator('10.27.1.138', 9090)
    value = B.scanTablefilter('ns_lbi:test_hbase_student')
    print(value)
    # sendKfafkaProduct(value)
以上这篇python hbase读取数据发送kafka的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/meiguopai1/article/details/70175069