服务器之家

服务器之家 > 正文

Spring整合websocket整合应用示例(下)

时间:2020-04-15 11:38     来源/作者:Bridge Li

在Spring整合websocket整合应用示例(上)文章中,我们已经实现了websocket,但还有一个核心的业务实现类没有实现,这里我们就实现这个业务核心类,因为老夫参与的这个系统使用websocket发送消息,所以其实现就是如何发送消息了。

7. NewsListenerImpl的实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package cn.bridgeli.websocket;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.lagou.common.base.util.date.DateUtil;
import com.lagou.platform.news.api.enumeration.PlatNewsCategoryType;
import com.lagou.platform.news.web.dao.ext.model.PlatNewsVo;
import com.lagou.platform.news.web.dao.ext.model.SearchCondition;
import com.lagou.platform.news.web.quartz.impl.TimingJob;
import com.lagou.platform.news.web.service.PlatNewsService;
import org.apache.commons.lang.StringUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import java.io.IOException;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @Description : 站内消息监听器实现
* @Date : 16-3-7
*/
@Component
public class NewsListenerImpl implements NewsListener{
private static final Logger logger = LoggerFactory.getLogger(NewsListenerImpl.class);
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
//线程池
private ExecutorService executorService = Executors.newCachedThreadPool();
//任务调度
private SchedulerFactory sf = new StdSchedulerFactory();
@Autowired
private PlatNewsService platNewsService;
@Override
public void afterPersist(PlatNewsVo platNewsVo) {
logger.info("监听到有新消息添加。。。");
logger.info("新消息为:"+gson.toJson(platNewsVo));
//启动线程
if(null != platNewsVo && !StringUtils.isBlank(platNewsVo.getCurrentoperatoremail())){
//如果是定时消息
if(platNewsVo.getNewsType() == PlatNewsCategoryType.TIMING_TIME.getCategoryId()){
startTimingTask(platNewsVo); //定时推送
}else{
//立即推送
executorService.execute(new AfterConnectionEstablishedTask(platNewsVo.getCurrentoperatoremail()));
}
}
}
@Override
public void afterConnectionEstablished(String email) {
logger.info("建立websocket连接后推送新消息。。。");
if(!StringUtils.isBlank(email)){
executorService.execute(new AfterConnectionEstablishedTask(email));
}
}
/**
* @Description : 如果新添加了定时消息,启动定时消息任务
* @param platNewsVo
*/
private void startTimingTask(PlatNewsVo platNewsVo){
logger.info("开始定时推送消息任务。。。");
Date timingTime = platNewsVo.getTimingTime();
if(null == timingTime){
logger.info("定时消息时间为null。");
return;
}
logger.info("定时推送任务时间为:"+DateUtil.date2String(timingTime));
JobDetail jobDetail= JobBuilder.newJob(TimingJob.class)
.withIdentity(platNewsVo.getCurrentoperatoremail()+"定时消息"+platNewsVo.getId(), "站内消息")
.build();
//传递参数
jobDetail.getJobDataMap().put("platNewsService",platNewsService);
jobDetail.getJobDataMap().put("userEmail",platNewsVo.getCurrentoperatoremail());
Trigger trigger= TriggerBuilder
.newTrigger()
.withIdentity("定时消息触发"+platNewsVo.getId(), "站内消息")
.startAt(timingTime)
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withIntervalInSeconds(0) //时间间隔
.withRepeatCount(0) //重复次数
)
.build();
//启动定时任务
try {
Scheduler sched = sf.getScheduler();
sched.scheduleJob(jobDetail,trigger);
if(!sched.isShutdown()){
sched.start();
}
} catch (SchedulerException e) {
logger.info(e.toString());
}
logger.info("完成开启定时推送消息任务。。。");
}
/**
* @Description : 建立websocket链接后的推送线程
*/
class AfterConnectionEstablishedTask implements Runnable{
String email ;
public AfterConnectionEstablishedTask(String email){
this.email = email;
}
@Override
public void run() {
logger.info("开始推送消息给用户:"+email+"。。。");
if(!StringUtils.isBlank(email)){
SearchCondition searchCondition = new SearchCondition();
searchCondition.setOperatorEmail(email);
JSONArray jsonArray = new JSONArray();
for(PlatNewsCategoryType type : PlatNewsCategoryType.values()){
searchCondition.setTypeId(type.getCategoryId());
int count = platNewsService.countPlatNewsByExample(searchCondition);
JSONObject object = new JSONObject();
object.put("name",type.name());
object.put("description",type.getDescription());
object.put("count",count);
jsonArray.add(object);
}
if(null != jsonArray && jsonArray.size()>0){
UserSocketVo userSocketVo = WSSessionLocalCache.get(email);
TextMessage reMessage = new TextMessage(gson.toJson(jsonArray));
try {
if(null != userSocketVo){
//推送消息
userSocketVo.getWebSocketSession().sendMessage(reMessage);
//更新推送时间
userSocketVo.setLastSendTime(DateUtil.getNowDate());
logger.info("完成推送新消息给用户:"+userSocketVo.getUserEmail()+"。。。");
}
} catch (IOException e) {
logger.error(e.toString());
logger.info("站内消息推送失败。。。"+e.toString());
}
}
}
logger.info("结束推送消息给"+email+"。。。");
}
}
}

这个类就是websocket的核心业务的实现,其具体肯定和业务相关,由于业务的不同,实现肯定不同,因为老夫参与的系统是发送消息,所以里面最核心的一句就是:

?
1
userSocketVo.getWebSocketSession().sendMessage(reMessage);

通过WebSocketSession的sendMessage方法把我们的消息发送出去。另外,这主要是后端的实现,至于前端的实现,因为老夫是后端程序猿比较关注后端,所以前端就不多做介绍了,大家可以自己去网上查资料。最后需要说明的是,老夫之前搜一些学习资料的时候,发现老夫该同事的写法和有一篇文章几乎一样,我想该同事应该是参考了这篇文章,所以列在下面,算作参考资料。

标签:

相关文章

热门资讯

沙雕群名称大全2019精选 今年最火的微信群名沙雕有创意
沙雕群名称大全2019精选 今年最火的微信群名沙雕有创意 2019-07-07
玄元剑仙肉身有什么用 玄元剑仙肉身境界等级划分
玄元剑仙肉身有什么用 玄元剑仙肉身境界等级划分 2019-06-21
男生常说24816是什么意思?女生说13579是什么意思?
男生常说24816是什么意思?女生说13579是什么意思? 2019-09-17
超A是什么意思 你好a表达的是什么
超A是什么意思 你好a表达的是什么 2019-06-06
华为nova5pro和p30pro哪个好 华为nova5pro和华为p30pro对比详情
华为nova5pro和p30pro哪个好 华为nova5pro和华为p30pro对比详情 2019-06-22
返回顶部