服务器之家

服务器之家 > 正文

Springboot 配置RabbitMQ文档的方法步骤

时间:2021-07-22 16:20     来源/作者:houshiqun689

简介

rabbitmq是实现amqp(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

  • direct:直连模式,用于实例间的任务分发
  • topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
  • headers:适用规则复杂的分发,用headers里的参数表达规则
  • fanout:分发给所有绑定到该exchange上的队列,忽略routing key

springboot集成rabbitmq

一、引入maven依赖

?
1
2
3
4
5
<dependency>
 <groupid>org.springframework.boot</groupid>
 <artifactid>spring-boot-starter-amqp</artifactid>
 <version>1.5.2.release</version>
</dependency>

二、配置application.properties

?
1
2
3
4
5
6
# rabbitmq
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtualhost = /message-test/

三、编写amqpconfiguration配置文件

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package message.test.configuration;
import org.springframework.amqp.core.acknowledgemode;
import org.springframework.amqp.core.amqptemplate;
import org.springframework.amqp.core.binding;
import org.springframework.amqp.core.bindingbuilder;
import org.springframework.amqp.core.directexchange;
import org.springframework.amqp.core.queue;
import org.springframework.amqp.rabbit.config.simplerabbitlistenercontainerfactory;
import org.springframework.amqp.rabbit.connection.cachingconnectionfactory;
import org.springframework.amqp.rabbit.connection.connectionfactory;
import org.springframework.amqp.rabbit.core.rabbittemplate;
import org.springframework.beans.factory.annotation.autowired;
import org.springframework.beans.factory.annotation.qualifier;
import org.springframework.boot.autoconfigure.amqp.rabbitproperties;
import org.springframework.context.annotation.bean;
import org.springframework.context.annotation.configuration;
 
@configuration
public class amqpconfiguration {
/**
 * 消息编码
 */
 public static final string message_encoding = "utf-8";
 public static final string exchange_issue = "exchange_message_issue";
 public static final string queue_issue_user = "queue_message_issue_user";
 public static final string queue_issue_all_user = "queue_message_issue_all_user";
 public static final string queue_issue_all_device = "queue_message_issue_all_device";
 public static final string queue_issue_city = "queue_message_issue_city";
 public static final string routing_key_issue_user = "routing_key_message_issue_user";
 public static final string routing_key_issue_all_user = "routing_key_message_issue_all_user";
 public static final string routing_key_issue_all_device = "routing_key_message_issue_all_device";
 public static final string routing_key_issue_city = "routing_key_message_issue_city";
 public static final string exchange_push = "exchange_message_push";
 public static final string queue_push_result = "queue_message_push_result";
 
 @autowired
 private rabbitproperties rabbitproperties;
 
 @bean
 public queue issueuserqueue() {
  return new queue(queue_issue_user);
 }
 
 @bean
 public queue issuealluserqueue() {
  return new queue(queue_issue_all_user);
 }
 
 @bean
 public queue issuealldevicequeue() {
  return new queue(queue_issue_all_device);
 }
 
 @bean
 public queue issuecityqueue() {
  return new queue(queue_issue_city);
 }
 
 @bean
 public queue pushresultqueue() {
  return new queue(queue_push_result);
 }
 
 @bean
 public directexchange issueexchange() {
  return new directexchange(exchange_issue);
 }
 
 @bean
 public directexchange pushexchange() {
  // 参数1:队列
  // 参数2:是否持久化
  // 参数3:是否自动删除
  return new directexchange(exchange_push, true, true);
 }
 
 @bean
 public binding issueuserqueuebinding(@qualifier("issueuserqueue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
   return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_user);
 }
 
 @bean
 public binding issuealluserqueuebinding(@qualifier("issuealluserqueue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_user);
 }
 
 @bean
 public binding issuealldevicequeuebinding(@qualifier("issuealldevicequeue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_all_device);
 }
 
 @bean
 public binding issuecityqueuebinding(@qualifier("issuecityqueue") queue queue,
    @qualifier("issueexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).with(routing_key_issue_city);
 }
 
 @bean
 public binding pushresultqueuebinding(@qualifier("pushresultqueue") queue queue,
    @qualifier("pushexchange") directexchange exchange) {
  return bindingbuilder.bind(queue).to(exchange).withqueuename();
 }
 
 @bean
 public connectionfactory defaultconnectionfactory() {
  cachingconnectionfactory connectionfactory = new cachingconnectionfactory();
  connectionfactory.sethost(rabbitproperties.gethost());
  connectionfactory.setport(rabbitproperties.getport());
  connectionfactory.setusername(rabbitproperties.getusername());
  connectionfactory.setpassword(rabbitproperties.getpassword());
  connectionfactory.setvirtualhost(rabbitproperties.getvirtualhost());
  return connectionfactory;
 }
 
 @bean
 public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory(
    @qualifier("defaultconnectionfactory") connectionfactory connectionfactory) {
  simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory();
  factory.setconnectionfactory(connectionfactory);
  factory.setacknowledgemode(acknowledgemode.manual);
  return factory;
 }
 
 @bean
 public amqptemplate rabbittemplate(@qualifier("defaultconnectionfactory") connectionfactory connectionfactory)
 {
  return new rabbittemplate(connectionfactory);
 }
}

三、编写生产者

?
1
2
3
body = json.tojsonstring(issuemessage).getbytes(amqpconfiguration.message_encoding);
 rabbittemplate.convertandsend(amqpconfiguration.exchange_issue,
            amqpconfiguration.routing_key_issue_user, body);

四、编写消费者

?
1
2
3
4
5
@rabbitlistener(queues = amqpconfiguration.queue_push_result)
public void handlepushresult(@payload byte[] data, channel channel,
    @header(amqpheaders.delivery_tag) long deliverytag) {
    
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://segmentfault.com/a/1190000018555963

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
2021德云社封箱演出完整版 2021年德云社封箱演出在线看
2021德云社封箱演出完整版 2021年德云社封箱演出在线看 2021-03-15
返回顶部