websocket在现代浏览器中的应用已经算是比较普遍了,在某些业务场景下,要求必须能够在服务器端推送消息至客户端。在没有websocket的年代,我们使用过dwr,在那个时候dwr真实一个非常棒的方案。但是在websocket兴起之后,我们更愿意使用标准实现来解决问题、
首先交代一下,本篇文章不讲解websocket的配置,主要讲的是针对在微服务架构集群模式下解决方案的选择。
微服务架构大家应该都不陌生了,在微服务架构下,服务是分布式的,而且为了保证业务的可用性,每个服务都是以集群的形式存在。在集群模式下,要保证集群的每一个节点的访问得到相同的结果就需要做到数据一致性,如缓存、session等。
微服务集群缓存通常使用分布式缓存redis解决,session一致性也通常会通过redis解决,但是现在更流行的是无状态的http,即无session化,最常见的解决方案就是oauth。
websocket有所不同,它是与服务端建立一个长连接,在集群模式下,显然不可能把前端与服务集群中的每一个节点建立连接,一个可行的思路是像解决http session的共享一样,通过redis来实现websocket的session共享,但是websocket session的数量是远多于http session的数量的(因为每打开一个页面都会建立一个websocket连接),所以随着用户量的增长,共享的数据量太大,很容易造成瓶颈。
另一个思路是,websocket总归会与集群中某个节点建立连接,那么,只要找到连接所在的节点,就可以向服务端推送消息了,那么要解决的问题就是如何找到一个websocket连接所在的节点。要找到连接在哪个节点上,我们需要一个唯一的标识符用于寻找连接,然而在基于stomp的发布-订阅模式下,一个消息的推送可能是面向若干个连接的,可能分布在集群中的每一个节点上,这样去寻找连接的代价也很高。既然这样,我们不妨换种思路,每一个websocket消息,我们在集群的每个节点上都进行推送,订阅了该消息的连接,不管有一个还是一万个,最终肯定都能收到这个消息。基于这个思路,我们做了一些技术选型:
- rabbitmq
- spring cloud stream
首先说rabbitmq,高级消息队列,可以实现消息广播(当然kafka一样可以做到,这里只介绍一种),另一项技术是spring cloud stream,stream是一个用于构建高度可扩展事件驱动型微服务的框架,并且它可以跟rabbitmq、kafka以及其他多种消息服务集成,使用了stream,要把rabbitmq换成kafka只不过是改改配置的事情。接下来重点介绍使用方法:
引入依赖
1
2
3
4
5
6
7
8
|
<dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-stream</artifactid> </dependency> <dependency> <groupid>org.springframework.cloud</groupid> <artifactid>spring-cloud-stream-binder-rabbit</artifactid> </dependency> |
配置binder
binder是stream中的重要概念,是用于配置用于stream发布和订阅事件的消息中间件。先看一段配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
spring: cloud: stream: binders: defaultrabbit: type: rabbit environment: spring: rabbitmq: host: localhost username: username password: password virtual-host: / |
配置中的 defaultrabbit 是binder的名称,一会会在其他配置中引用,type指定了消息中间件的类型,environment是对消息中间件的配置,这里的配置结构和spring.rabbitmq命名空间下的配置项一模一样的,可以参照着进行配置(这样配置的作用是可以把stream的rabbitmq配置和项目中其他地方使用的rabbitmq区分开,如果这里不配置environment,binder会沿用spring.rabbitmq命名空间下的配置),比如你的项目中的rabbitmq的配置是这样的:
1
2
3
4
5
6
|
spring: rabbitmq: host: localhost username: username password: password virtual-host: / |
那上门的binder的environment配置完全可以去掉。
消息流与binder的绑定
微服务要接收挥着发布事件消息,根据spring cloud stream的名字,顾名思义,需要使用流,所以需要在配置中声明两个事件流,一个输入流,一个输出流:
1
2
3
4
5
6
7
8
9
10
|
spring: cloud: stream: bindings: websocketmessagein: destination: websocketmessage binder: defaultrabbit websocketmessageout: destination: websocketmessage binder: defaultrabbit |
这里我们看到,事件流引用了binder,表示这两个流使用rabbitmq这个中间件(看到这里想必大家已经明白了,在一个项目中完全可以同时使用rabbit和kafka作为事件流的消息中间件)。
websocketmessagein,websocketmessageout是事件流的名字(可以自己随便起),destination指定了两个事件流的destination是同一个,这决定了写入和读取是指向同一个地方(不一定是同一个消息队列)。
事件流声明
事件流使用接口进行定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
/** * websocket消息事件流接口 * created by 吴昊 on 18-11-8. * * @author 吴昊 * @since 1.4.3 */ interface websocketmessagestream { companion object { const val input: string = "websocketmessagein" const val output: string = "websocketmessageout" } /** * 输入 */ @input (input) fun input(): subscribablechannel /** * 输出 */ @output (output) fun output(): messagechannel } |
声明事件流接口,这里面定义了两个常量,分别对应配置中的两个流名称,通过调用input()方法获取输入流,通过调用output()获取输出流。
该接口的实现由spring cloud stream完成,不需要自己实现。
使用事件流
声明一个bean:
1
2
3
4
|
@component @enablebinding (websocketmessagestream:: class ) class websocketmessageservice { …… |
这里的@enablebinding 注解指明了事件流接口类,只有添加了这个注解(要能被spring识别到,可以加在入口类上,也可以加在@configuration注解的类上),该接口才会被实现,并且加入到spring的容器中(可以注入)。
上面websocketmessageservice的内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
@autowired private lateinit var stream: websocketmessagestream @autowired private lateinit var template: simpmessagingtemplate @streamlistener (websocketmessagestream.input) fun messagereceived(message: websocketmessage) { template.convertandsend(message.destination, message.body) } fun send(destination: string, body: any) { stream.output().send( mutablemessage(websocketmessage(destination, body)) ) } |
接收消息
@streamlistener 注解指明了要监听的事件流,方法接收的参数即事件的消息内容(使用jackson反序列化),这里的messagereceived方法直接将接收到的消息直接用websocket发送给前端
发送消息
同样,发送也很简单,将消息直接发送到输入流中,上面的send方法即是将原本应该用simpmessagingtemplate发送给websocket的消息发送到spring cloud stream的事件流中。这样做以后,项目中所有需要向前端推送websocket消息的操作都应该调用send方法来进行。
讲到这里大家可能还有点糊涂,也有一些疑问,为什么这样每个微服务节点就能收到事件消息了?或者单个节点接收事件消息和多个节点接收的配置是怎么控制的。各位不要着急,待我慢慢道来,接下来就要结合rabbit的知识来讲解 了:
首先看一下rabbit的消息队列:
从图中看到,存在多个以websocketmessage开头的队列,这是每一个微服务节点创建了一个消息队列,再来看exchange:
exchange绑定的消息队列
这里的exchange名称和上面消息队列的名称前缀均是websocketmessage, 这个都是 由前面的binding配置中的destination指定的,和destination名称保持一致
当应用向输入流中写入事件时,使用destination作为key(即websocketmessage),将消息写入名为websocketmessage的exchange,由于exchange绑定的消息队列前缀均为websocketmessage且routing key都是#,所以exchange会将消息路由到每一个websocketmessage开头的消息队列上(这里涉及到rabbitmq的知识点,如过不懂请自行查阅资料),这样每一个微服务都能接收到相同的消息。
我们再来看前面提出的问题,这样的配置可以把消息推送到每一个微服务节点,那么如果需要一个消息只被一个节点接收,该怎么配置呢?很简单,一个配置项就可以搞定:
1
2
3
4
5
6
7
8
|
spring: cloud: stream: bindings: websocketmessagein: group: test destination: websocketmessage binder: defaultrabbit |
可以看到,相比前面的配置,仅仅多了一个group的配置,这样配置之后,rabbitmq会生成一个名为websocketmessage.test的消息队列(前面讲到的每个微服务建立的消息队列是自动删除的,即微服务断开连接后消息队列就被删除,而这个消息队列是持久化的,也就是即使所有的微服务节点全部断开连接也不会被删除),所有的微服务节点监听这一个队列,当队列中有消息时,只会被一个节点消费。
要讲的内容到此结束,spring cloud stream的配置远不止这些,但是这些配置已足够完成我所需要做的事情,其他的配置请参考spring cloud stream官方文档:
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://juejin.im/post/5c03452bf265da61602cacfe