服务器之家

服务器之家 > 正文

springboot整合rabbitmq的示例代码

时间:2021-03-03 13:59     来源/作者:胡运凡

概述

  1. rabbitmq是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理。
  2. 它现实了amqp协议,并且遵循mozilla public license开源协议,它支持多种语言,可以方便的和spring集成。
  3. 消息队列使用消息将应用程序连接起来,这些消息通过像rabbitmq这样的消息代理服务器在应用程序之间路由。

基本概念

broker

用来处理数据的消息队列服务器实体

vhost

由rabbitmq服务器创建的虚拟消息主机,拥有自己的权限机制,一个broker里可以开设多个vhost,用于不同用户的权限隔离,vhost之间是也完全隔离的。

productor

产生用于消息通信的数据

channel

消息通道,在amqp中可以建立多个channel,每个channel代表一个会话任务。

exchange

direct

转发消息到routing-key指定的队列

springboot整合rabbitmq的示例代码fanout

fanout

转发消息到所有绑定的队列,类似于一种广播发送的方式。

springboot整合rabbitmq的示例代码topic

topic

按照规则转发消息,这种规则多为模式匹配,也显得更加灵活

springboot整合rabbitmq的示例代码queue

queue

  1.  队列是rabbitmq的内部对象,存储消息
  2. 以动态的增加消费者,队列将接受到的消息以轮询(round-robin)的方式均匀的分配给多个消费者。

binding

表示交换机和队列之间的关系,在进行绑定时,带有一个额外的参数binding-key,来和routing-key相匹配。

consumer

监听消息队列来进行消息数据的读取

springboot下三种exchange模式(fanout,direct,topic)实现

pom.xml中引用spring-boot-starter-amqp

?
1
2
3
4
<dependency>
  <groupid>org.springframework.boot</groupid>
  <artifactid>spring-boot-starter-amqp</artifactid>
</dependency>

增加rabbitmq配置

?
1
2
3
4
5
6
spring:
 rabbitmq:
 host: localhost
 port: 5672
 username: guest
 password: guest

direct

direct模式一般情况下只需要定义queue 使用自带交换机(defaultexchange)无需绑定交换机

?
1
2
3
4
5
6
7
8
9
  @configuration
public class rabbitp2pconfigure { 
 public static final string queue_name = "p2p-queue";
  @bean
  public queue queue() {
    return new queue(queue_name, true);
  }
 
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@runwith(springrunner.class)
@springboottest(classes = bootcoretestapplication.class)
@slf4j
public class rabbittest {
  @autowired
  private amqptemplate amqptemplate;
 
  /**
  * 发送
  */
  @test
  public void sendlazy() throws interruptedexception {
    city city = new city(234556666l, "direct_name", "direct_code");
    amqptemplate.convertandsend(rabbitlazyconfigure.queue_name, city);
  }
  
  /**
  * 领取
  */
  @test
  public void receive() throws interruptedexception {
    object obj = amqptemplate.receiveandconvert(rabbitlazyconfigure.queue_name);
    assert.notnull(obj, "");
    log.debug(obj.tostring());
  }
}

适用场景:点对点

fanout

fanout则模式需要将多个queue绑定在同一个交换机上

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@configuration
public class rabbitfanoutconfigure {
  public static final string exchange_name = "fanout-exchange";
  public static final string fanout_a = "fanout.a";
  public static final string fanout_b = "fanout.b";
  public static final string fanout_c = "fanout.c";
 
  @bean
  public queue amessage() {
    return new queue(fanout_a);
  }
 
  @bean
  public queue bmessage() {
    return new queue(fanout_b);
  }
 
  @bean
  public queue cmessage() {
    return new queue(fanout_c);
  }
 
  @bean
  public fanoutexchange fanoutexchange() {
    return new fanoutexchange(exchange_name);
  }
 
  @bean
  public binding bindingexchangea(queue amessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(amessage).to(fanoutexchange);
  }
 
  @bean
  public binding bindingexchangeb(queue bmessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(bmessage).to(fanoutexchange);
  }
 
  @bean
  public binding bindingexchangec(queue cmessage, fanoutexchange fanoutexchange) {
    return bindingbuilder.bind(cmessage).to(fanoutexchange);
  }
 
}

发送者

?
1
2
3
4
5
6
7
8
9
10
11
12
@slf4j
public class sender {
 
  @autowired
  private amqptemplate rabbittemplate;
 
  public void sendfanout(object message) {
    log.debug("begin send fanout message<" + message + ">");
    rabbittemplate.convertandsend(rabbitfanoutconfigure.exchange_name, "", message);
  }
 
}

我们可以通过@rabbitlistener监听多个queue来进行消费

?
1
2
3
4
5
6
7
8
9
10
11
12
13
@slf4j
@rabbitlistener(queues = {
    rabbitfanoutconfigure.fanout_a,
    rabbitfanoutconfigure.fanout_b,
    rabbitfanoutconfigure.fanout_c
})
public class receiver {
 
  @rabbithandler
  public void receivemessage(string message) {
    log.debug("received <" + message + ">");
  }
}

适用场景
- 大规模多用户在线(mmo)游戏可以使用它来处理排行榜更新等全局事件
- 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
- 分发系统使用它来广播各种状态和配置更新
- 在群聊的时候,它被用来分发消息给参与群聊的用户

topic

这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”,exchange会将消息转发到所有关注主题能与routekey模糊匹配的队列。

在进行绑定时,要提供一个该队列关心的主题,如“topic.# (“#”表示0个或若干个关键字,“*”表示一个关键字。 )

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@configuration
public class rabbittopicconfigure {
  public static final string exchange_name = "topic-exchange";
  public static final string topic = "topic";
  public static final string topic_a = "topic.a";
  public static final string topic_b = "topic.b";
 
  @bean
  public queue queuetopic() {
    return new queue(rabbittopicconfigure.topic);
  }
 
  @bean
  public queue queuetopica() {
    return new queue(rabbittopicconfigure.topic_a);
  }
 
  @bean
  public queue queuetopicb() {
    return new queue(rabbittopicconfigure.topic_b);
  }
 
  @bean
  public topicexchange exchange() {
    topicexchange topicexchange = new topicexchange(exchange_name);
    topicexchange.setdelayed(true);
    return new topicexchange(exchange_name);
  }
 
  @bean
  public binding bindingexchangetopic(queue queuetopic, topicexchange exchange) {
    return bindingbuilder.bind(queuetopic).to(exchange).with(rabbittopicconfigure.topic);
  }
 
  @bean
  public binding bindingexchangetopics(queue queuetopica, topicexchange exchange) {
    return bindingbuilder.bind(queuetopica).to(exchange).with("topic.#");
  }
}

同时去监听三个queue

?
1
2
3
4
5
6
7
8
9
10
11
12
@slf4j
@rabbitlistener(queues = {
    rabbittopicconfigure.topic,
    rabbittopicconfigure.topic_a,
    rabbittopicconfigure.topic_b
})
public class receiver {
  @rabbithandler
  public void receivemessage(string message) {
    log.debug("received <" + message + ">");
  }
}

通过测试我们可以发现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@runwith(springrunner.class)
@springboottest(classes = bootcoretestapplication.class)
public class rabbittest {
  @autowired
  private amqptemplate rabbittemplate;
 
  @test
  public void sendall() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, "topic.test", "send all");
  }
 
  @test
  public void sendtopic() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic, "send topic");
  }
 
  @test
  public void sendtopica() {
    rabbittemplate.convertandsend(rabbittopicconfigure.exchange_name, rabbittopicconfigure.topic_a, "send topica");
  }
}

适用场景
- 分发有关于特定地理位置的数据,例如销售点
- 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
- 股票价格更新(以及其他类型的金融数据更新)
- 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
- 云端的不同种类服务的协调
- 分布式架构/基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。

延迟队列

延迟消费:

  1. 如用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。
  2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

延迟重试:

  1. 如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。
  2. 如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

设置交换机延迟属性为true

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@configuration
public class rabbitlazyconfigure {
  public static final string queue_name = "lazy-queue-t";
  public static final string exchange_name = "lazy-exchange-t";
 
  @bean
  public queue queue() {
    return new queue(queue_name, true);
  }
 
  @bean
  public directexchange defaultexchange() {
    directexchange directexchange = new directexchange(exchange_name, true, false);
    directexchange.setdelayed(true);
    return directexchange;
  }
 
  @bean
  public binding binding() {
    return bindingbuilder.bind(queue()).to(defaultexchange()).with(queue_name);
  }
 
}

发送时设置延迟时间即可

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@slf4j
public class sender {
  @autowired
  private amqptemplate rabbittemplate;
  public void sendlazy(object msg) {
    log.debug("begin send lazy message<" + msg + ">");
    rabbittemplate.convertandsend(rabbitlazyconfigure.exchange_name,
        rabbitlazyconfigure.queue_name, msg, message -> {
          message.getmessageproperties().setheader("x-delay", 10000);
          return message;
        }
    );
  }
}

结束

各种使用案例请直接查看 官方文档

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:http://www.cnblogs.com/huyunfan/p/8024131.html

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
Intellij idea2020永久破解,亲测可用!!!
Intellij idea2020永久破解,亲测可用!!! 2020-07-29
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
返回顶部