本文介绍了 Spring Boot 集成 Kafka 的示例代码,分享给大家,也给自己留个笔记。
系统环境
使用远程服务器上搭建的 Kafka 服务:
- Ubuntu 16.04 LTS
- kafka_2.12-0.11.0.0.tgz
- zookeeper-3.5.2-alpha.tar.gz
集成过程
1. 创建 Spring Boot 工程,添加相关依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + Kafka integration demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.laravelshao.springboot</groupId>
    <artifactId>spring-boot-integration-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>spring-boot-integration-kafka</name>
    <description>demo project for spring boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <!-- Kafka: no version given here, so it must come from the parent POM's dependency management -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- JSON support; the application config uses JsonSerializer / JsonDeserializer for message values -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-json</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2.添加配置信息,这里使用yml文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
spring:
  kafka:
    # Replace x.x.x.x with the address of the remote Kafka broker.
    bootstrap-servers: x.x.x.x:9092
    producer:
      # Message values are written as JSON.
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      group-id: test
      auto-offset-reset: earliest
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              # Package of the message class; without it deserialization fails
              # with "is not in the trusted packages".
              packages: com.laravelshao.springboot.kafka
3.创建消息对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
/**
 * Message payload sent by the producer and received by the consumer in this example.
 *
 * <p>The application class imports this type from
 * {@code com.laravelshao.springboot.kafka}; add the matching {@code package}
 * declaration when placing the class in a project.
 */
public class Message {

    private Integer id;
    private String msg;

    /**
     * Creates an empty message.
     *
     * <p>NOTE(review): presumably required so the JSON deserializer can
     * instantiate the class before calling the setters; confirm before removing.
     */
    public Message() {
    }

    /**
     * Creates a message with the given identifier and text.
     *
     * @param id  message identifier
     * @param msg message text
     */
    public Message(Integer id, String msg) {
        this.id = id;
        this.msg = msg;
    }

    /** @return the message identifier, or {@code null} if not set */
    public Integer getId() {
        return id;
    }

    /** @param id the message identifier */
    public void setId(Integer id) {
        this.id = id;
    }

    /** @return the message text, or {@code null} if not set */
    public String getMsg() {
        return msg;
    }

    /** @param msg the message text */
    public void setMsg(String msg) {
        this.msg = msg;
    }

    /** @return a string containing both fields, used when the message is logged */
    @Override
    public String toString() {
        return "message{" + "id=" + id + ", msg='" + msg + '\'' + '}';
    }
}
4.创建生产者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package com.laravelshao.springboot.kafka; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.kafka.core.kafkatemplate; import org.springframework.stereotype.component; /** * created by shaoqinghua on 2018/3/23. */ @component public class producer { private static logger log = loggerfactory.getlogger(producer. class ); @autowired private kafkatemplate kafkatemplate; public void send(string topic, message message) { kafkatemplate.send(topic, message); log.info( "producer->topic:{}, message:{}" , topic, message); } } |
5. 创建消费者,使用 @KafkaListener 注解监听主题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
package com.laravelshao.springboot.kafka; import org.apache.kafka.clients.consumer.consumerrecord; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.kafka.annotation.kafkalistener; import org.springframework.stereotype.component; /** * created by shaoqinghua on 2018/3/23. */ @component public class consumer { private static logger log = loggerfactory.getlogger(consumer. class ); @kafkalistener (topics = "test_topic" ) public void receive(consumerrecord<string, message> consumerrecord) { log.info( "consumer->topic:{}, value:{}" , consumerrecord.topic(), consumerrecord.value()); } } |
6.发送消费测试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.laravelshao.springboot; import com.laravelshao.springboot.kafka.message; import com.laravelshao.springboot.kafka.producer; import org.springframework.boot.springapplication; import org.springframework.boot.autoconfigure.springbootapplication; import org.springframework.context.applicationcontext; @springbootapplication public class integrationkafkaapplication { public static void main(string[] args) throws interruptedexception { applicationcontext context = springapplication.run(integrationkafkaapplication. class , args); producer producer = context.getbean(producer. class ); for ( int i = 1 ; i < 10 ; i++) { producer.send( "test_topic" , new message(i, "test topic message " + i)); thread.sleep( 2000 ); } } } |
可以依次看到发送消息,消费消息
异常问题
反序列化异常(自定义的消息对象不在 Kafka 信任的包路径下):
[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1] error org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.719 container exception
org.apache.kafka.common.errors.serializationexception: error deserializing key/value for partition test_topic-0 at offset 9. if needed, please seek past the record to continue consumption.
caused by: java.lang.illegalargumentexception: the class 'com.laravelshao.springboot.kafka.message' is not in the trusted packages: [java.util, java.lang]. if you believe this class is safe to deserialize, please provide its name. if the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getclassidtype(defaultjackson2javatypemapper.java:139)
at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype(defaultjackson2javatypemapper.java:113)
at org.springframework.kafka.support.serializer.jsondeserializer.deserialize(jsondeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.fetcher.parserecord(fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.fetcher.access$2600(fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.fetcher$partitionrecords.fetchrecords(fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.fetcher$partitionrecords.access$1200(fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords(fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.fetcher.fetchedrecords(fetcher.java:531)
at org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer.java:1146)
at org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer.java:1103)
at org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.run(kafkamessagelistenercontainer.java:667)
at java.util.concurrent.executors$runnableadapter.call(executors.java:511)
at java.util.concurrent.futuretask.run(futuretask.java:266)
at java.lang.thread.run(thread.java:745)
解决方法:将消息对象所在的包添加到 Kafka 信任的包路径下(消费者属性 spring.json.trusted.packages):
1
2
3
4
5
6
7
8
|
spring:
  kafka:
    consumer:
      properties:
        spring:
          json:
            trusted:
              # Package containing the message class to be deserialized.
              packages: com.laravelshao.springboot.kafka
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://my.oschina.net/LaravelShao/blog/1788005