服务器之家

服务器之家 > 正文

Spring Boot集成Kafka的示例代码

时间:2021-04-21 12:23     来源/作者:寒武没有纪

本文介绍了spring boot集成kafka的示例代码,分享给大家,也给自己留个笔记

系统环境

使用远程服务器上搭建的kafka服务

  1. ubuntu 16.04 lts
  2. kafka_2.12-0.11.0.0.tgz
  3. zookeeper-3.5.2-alpha.tar.gz

集成过程

1.创建spring boot工程,添加相关依赖:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance"
     xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelversion>4.0.0</modelversion>
 
  <groupid>com.laravelshao.springboot</groupid>
  <artifactid>spring-boot-integration-kafka</artifactid>
  <version>0.0.1-snapshot</version>
  <packaging>jar</packaging>
 
  <name>spring-boot-integration-kafka</name>
  <description>demo project for spring boot</description>
 
  <parent>
    <groupid>org.springframework.boot</groupid>
    <artifactid>spring-boot-starter-parent</artifactid>
    <version>2.0.0.release</version>
    <relativepath/> <!-- lookup parent from repository -->
  </parent>
 
  <properties>
    <project.build.sourceencoding>utf-8</project.build.sourceencoding>
    <project.reporting.outputencoding>utf-8</project.reporting.outputencoding>
    <java.version>1.8</java.version>
  </properties>
 
  <dependencies>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter</artifactid>
    </dependency>
    <!--kafka-->
    <dependency>
      <groupid>org.springframework.kafka</groupid>
      <artifactid>spring-kafka</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-json</artifactid>
    </dependency>
    <dependency>
      <groupid>org.springframework.boot</groupid>
      <artifactid>spring-boot-starter-test</artifactid>
      <scope>test</scope>
    </dependency>
  </dependencies>
 
  <build>
    <plugins>
      <plugin>
        <groupid>org.springframework.boot</groupid>
        <artifactid>spring-boot-maven-plugin</artifactid>
      </plugin>
    </plugins>
  </build>
</project>

2.添加配置信息,这里使用yml文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
 kafka:
  bootstrap-servers:x.x.x.x:9092
  producer:
   value-serializer: org.springframework.kafka.support.serializer.jsonserializer
  consumer:
   group-id: test
   auto-offset-reset: earliest
   value-deserializer: org.springframework.kafka.support.serializer.jsondeserializer
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

3.创建消息对象

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class message {
  private integer id;
  private string msg;
 
  public message() {
  }
 
  public message(integer id, string msg) {
    this.id = id;
    this.msg = msg;
  }
 
  public integer getid() {
    return id;
  }
 
  public void setid(integer id) {
    this.id = id;
  }
 
  public string getmsg() {
    return msg;
  }
 
  public void setmsg(string msg) {
    this.msg = msg;
  }
 
  @override
  public string tostring() {
    return "message{" +
        "id=" + id +
        ", msg='" + msg + '\'' +
        '}';
  }
}

4.创建生产者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.laravelshao.springboot.kafka;
 
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.kafka.core.kafkatemplate;
import org.springframework.stereotype.component;
 
/**
 * created by shaoqinghua on 2018/3/23.
 */
@component
public class producer {
  private static logger log = loggerfactory.getlogger(producer.class);
 
  @autowired
  private kafkatemplate kafkatemplate;
 
  public void send(string topic, message message) {
    kafkatemplate.send(topic, message);
    log.info("producer->topic:{}, message:{}", topic, message);
  }
 
}

5.创建消费者,使用@ kafkalistener注解监听主题

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.laravelshao.springboot.kafka;
 
import org.apache.kafka.clients.consumer.consumerrecord;
import org.slf4j.logger;
import org.slf4j.loggerfactory;
import org.springframework.kafka.annotation.kafkalistener;
import org.springframework.stereotype.component;
 
/**
 * created by shaoqinghua on 2018/3/23.
 */
@component
public class consumer {
  private static logger log = loggerfactory.getlogger(consumer.class);
 
  @kafkalistener(topics = "test_topic")
  public void receive(consumerrecord<string, message> consumerrecord) {
    log.info("consumer->topic:{}, value:{}", consumerrecord.topic(), consumerrecord.value());
  }
 
}

6.发送消费测试

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.laravelshao.springboot;
 
import com.laravelshao.springboot.kafka.message;
import com.laravelshao.springboot.kafka.producer;
import org.springframework.boot.springapplication;
import org.springframework.boot.autoconfigure.springbootapplication;
import org.springframework.context.applicationcontext;
 
@springbootapplication
public class integrationkafkaapplication {
 
  public static void main(string[] args) throws interruptedexception {
    applicationcontext context = springapplication.run(integrationkafkaapplication.class, args);
    producer producer = context.getbean(producer.class);
 
    for (int i = 1; i < 10; i++) {
      producer.send("test_topic", new message(i, "test topic message " + i));
      thread.sleep(2000);
    }
  }
 
}

可以依次看到发送消息,消费消息

Spring Boot集成Kafka的示例代码

异常问题

反序列化异常(自定义的消息对象不在kafka信任的包路径下)?

[org.springframework.kafka.kafkalistenerendpointcontainer#0-0-c-1] error org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.719 container exception
org.apache.kafka.common.errors.serializationexception: error deserializing key/value for partition test_topic-0 at offset 9. if needed, please seek past the record to continue consumption.
caused by: java.lang.illegalargumentexception: the class 'com.laravelshao.springboot.kafka.message' is not in the trusted packages: [java.util, java.lang]. if you believe this class is safe to deserialize, please provide its name. if the serialization is only done by a trusted source, you can also enable trust all (*).
 at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.getclassidtype(defaultjackson2javatypemapper.java:139)
 at org.springframework.kafka.support.converter.defaultjackson2javatypemapper.tojavatype(defaultjackson2javatypemapper.java:113)
 at org.springframework.kafka.support.serializer.jsondeserializer.deserialize(jsondeserializer.java:191)
 at org.apache.kafka.clients.consumer.internals.fetcher.parserecord(fetcher.java:923)
 at org.apache.kafka.clients.consumer.internals.fetcher.access$2600(fetcher.java:93)
 at org.apache.kafka.clients.consumer.internals.fetcher$partitionrecords.fetchrecords(fetcher.java:1100)
 at org.apache.kafka.clients.consumer.internals.fetcher$partitionrecords.access$1200(fetcher.java:949)
 at org.apache.kafka.clients.consumer.internals.fetcher.fetchrecords(fetcher.java:570)
 at org.apache.kafka.clients.consumer.internals.fetcher.fetchedrecords(fetcher.java:531)
 at org.apache.kafka.clients.consumer.kafkaconsumer.pollonce(kafkaconsumer.java:1146)
 at org.apache.kafka.clients.consumer.kafkaconsumer.poll(kafkaconsumer.java:1103)
 at org.springframework.kafka.listener.kafkamessagelistenercontainer$listenerconsumer.run(kafkamessagelistenercontainer.java:667)
 at java.util.concurrent.executors$runnableadapter.call(executors.java:511)
 at java.util.concurrent.futuretask.run(futuretask.java:266)
 at java.lang.thread.run(thread.java:745)

解决方法:将当前包添加到kafka信任的包路径下

?
1
2
3
4
5
6
7
8
spring:
 kafka:
  consumer:
   properties:
    spring:
     json:
      trusted:
       packages: com.laravelshao.springboot.kafka

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://my.oschina.net/LaravelShao/blog/1788005

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
Intellij idea2020永久破解,亲测可用!!!
Intellij idea2020永久破解,亲测可用!!! 2020-07-29
返回顶部