1.创建maven项目
目录如下:
2.pom文件:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
<project xmlns= "http://maven.apache.org/pom/4.0.0" xmlns:xsi= "http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation= "http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" > <modelversion> 4.0 . 0 </modelversion> <groupid>kafka-maven</groupid> <artifactid>kafka-maven</artifactid> <version> 0.0 . 1 -snapshot</version> <dependencies> <dependency> <groupid>org.apache.kafka</groupid> <artifactid>kafka_2. 11 </artifactid> <version> 0.10 . 1.1 </version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-common</artifactid> <version> 2.2 . 0 </version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version> 2.2 . 0 </version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-client</artifactid> <version> 2.2 . 0 </version> </dependency> <dependency> <groupid>org.apache.hbase</groupid> <artifactid>hbase-client</artifactid> <version> 1.0 . 3 </version> </dependency> <dependency> <groupid>org.apache.hbase</groupid> <artifactid>hbase-server</artifactid> <version> 1.0 . 3 </version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version> 2.2 . 0 </version> </dependency> <dependency> <groupid>jdk.tools</groupid> <artifactid>jdk.tools</artifactid> <version> 1.7 </version> <scope>system</scope> <systempath>${java_home}/lib/tools.jar</systempath> </dependency> <dependency> <groupid>org.apache.httpcomponents</groupid> <artifactid>httpclient</artifactid> <version> 4.3 . 6 </version> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <configuration> <source> 1.7 </source> <target> 1.7 </target> </configuration> </plugin> </plugins> </build> </project> |
3.kafka生产者kafkaproduce:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package com.lijie.producer; import java.io.file; import java.io.fileinputstream; import java.util.properties; import org.apache.kafka.clients.producer.callback; import org.apache.kafka.clients.producer.kafkaproducer; import org.apache.kafka.clients.producer.producerrecord; import org.apache.kafka.clients.producer.recordmetadata; import org.slf4j.logger; import org.slf4j.loggerfactory; public class kafkaproduce { private static properties properties; static { properties = new properties(); string path = kafkaproducer. class .getresource( "/" ).getfile().tostring() + "kafka.properties" ; try { fileinputstream fis = new fileinputstream( new file(path)); properties.load(fis); } catch (exception e) { e.printstacktrace(); } } /** * 发送消息 * * @param topic * @param key * @param value */ public void sendmsg(string topic, byte [] key, byte [] value) { // 实例化produce kafkaproducer< byte [], byte []> kp = new kafkaproducer< byte [], byte []>( properties); // 消息封装 producerrecord< byte [], byte []> pr = new producerrecord< byte [], byte []>( topic, key, value); // 发送数据 kp.send(pr, new callback() { // 回调函数 @override public void oncompletion(recordmetadata metadata, exception exception) { if ( null != exception) { system.out.println( "记录的offset在:" + metadata.offset()); system.out.println(exception.getmessage() + exception); } } }); // 关闭produce kp.close(); } } |
4.kafka消费者kafkaconsume:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
package com.lijie.consumer; import java.io.file; import java.io.fileinputstream; import java.util.hashmap; import java.util.list; import java.util.map; import java.util.properties; import org.apache.htrace.fasterxml.jackson.databind.objectmapper; import com.lijie.pojo.user; import com.lijie.utils.jsonutils; import kafka.consumer.consumerconfig; import kafka.consumer.consumeriterator; import kafka.consumer.kafkastream; import kafka.javaapi.consumer.consumerconnector; import kafka.serializer.stringdecoder; import kafka.utils.verifiableproperties; public class kafkaconsume { private final static string topic = "lijietest" ; private static properties properties; static { properties = new properties(); string path = kafkaconsume. class .getresource( "/" ).getfile().tostring() + "kafka.properties" ; try { fileinputstream fis = new fileinputstream( new file(path)); properties.load(fis); } catch (exception e) { e.printstacktrace(); } } /** * 获取消息 * * @throws exception */ public void getmsg() throws exception { consumerconfig config = new consumerconfig(properties); consumerconnector consumer = kafka.consumer.consumer .createjavaconsumerconnector(config); map<string, integer> topiccountmap = new hashmap<string, integer>(); topiccountmap.put(topic, new integer( 1 )); stringdecoder keydecoder = new stringdecoder( new verifiableproperties()); stringdecoder valuedecoder = new stringdecoder( new verifiableproperties()); map<string, list<kafkastream<string, string>>> consumermap = consumer .createmessagestreams(topiccountmap, keydecoder, valuedecoder); kafkastream<string, string> stream = consumermap.get(topic).get( 0 ); consumeriterator<string, string> it = stream.iterator(); while (it.hasnext()) { string json = it.next().message(); user user = (user) jsonutils.jsontoobj(json, user. class ); system.out.println(user); } } } |
5.kafka.properties文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
##produce bootstrap.servers= 192.168 . 80.123 : 9092 producer.type=sync request.required.acks= 1 serializer. class =kafka.serializer.defaultencoder key.serializer=org.apache.kafka.common.serialization.bytearrayserializer value.serializer=org.apache.kafka.common.serialization.bytearrayserializer bak.partitioner. class =kafka.producer.defaultpartitioner bak.key.serializer=org.apache.kafka.common.serialization.stringserializer bak.value.serializer=org.apache.kafka.common.serialization.stringserializer ##consume zookeeper.connect= 192.168 . 80.123 : 2181 group.id=lijiegroup zookeeper.session.timeout.ms= 4000 zookeeper.sync.time.ms= 200 auto.commit.interval.ms= 1000 auto.offset.reset=smallest serializer. class =kafka.serializer.stringencoder |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/qq_20641565/article/details/56277537