服务器之家

服务器之家 > 正文

利用Redis实现延时处理的方法实例

时间:2021-07-20 16:13     来源/作者:我一定会有猫的

背景

在开发中,往往会遇到一些关于延时任务的需求。例如

•生成订单30分钟未支付,则自动取消

•生成订单60秒后,给用户发短信

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。

最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。

实现过程

说到java中的定时功能,首先想到的timer和scheduledthreadpoolexecutor,但是相比之下timer可以排除,主要原因有以下几点:

  • timer使用的是绝对时间,系统时间的改变会对timer产生一定的影响;而scheduledthreadpoolexecutor使用的是相对时间,所以不会有这个问题。
  • timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而scheduledthreadpoolexecutor可以自定义线程数量。
  • timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个timer崩溃,而scheduledthreadpoolexecutor对运行时异常做了捕获(可以在 afterexecute() 回调方法中进行处理),所以更加安全。

1、scheduledthreadpoolexecutor决定了用scheduledthreadpoolexecutor来进行实现,接下来就是代码编写啦(大体流程代码)。

主要的延时实现如下:

?
1
2
3
4
5
6
7
8
9
scheduledexecutorservice executorservice = new scheduledthreadpoolexecutor(10, new namedthreadfactory("schedulethreadpool"), new
threadpoolexecutor.abortpolicy());
//从消息中取出延迟时间及相关信息的代码略
int delaytime = 0;
executorservice.schedulewithfixeddelay(new runnable() {
  @override
  public void run() {
   //具体操作逻辑
  }},0,delaytime, timeunit.seconds);

其中namedthreadfactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。

然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。

大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下scheduledthreadpoolexecutor的源码:

?
1
2
3
public scheduledthreadpoolexecutor(int corepoolsize,threadfactory threadfactory) {
 super(corepoolsize, integer.max_value, 0,
 timeunit.nanoseconds,new delayedworkqueue(), threadfactory);}

scheduledthreadpoolexecutor由于它自身的延时和周期的特性,默认使用了delayworkqueue,而并不像我们平时使用的singlethreadexecutor等构造是可以使用自己定义的linkedblockingqueue并且设置队列大小,问题就出在这里。

delaywrokqueue是一个无界队列,而我们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致oom,在使用多线程时我们是肯定要考虑到oom的可能性的,因为oom带来的后果往往比较严重,系统oom临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。

2、采用redis和线程结合

这一次换了思路,采用redis来帮助我们做缓冲,从而避免消息过多oom的问题。

相关redis zset api:

?
1
2
3
4
5
6
//添加元素
zadd key score member [[score member] [score member] …]
//根据分值及限制数量查询
zrangebyscore key min max [withscores] [limit offset count]
//从zset中删除指定成员
zrem key member [member …]

我们采用redis基础数据结构的zset结构,采用score来存储我们目标发送时间的数值,整体处理流程如下:

  • 第一步数据存储:9:10分从kafka接收了一条a的订单消息,要求30分钟后进行发货通知,那我们就将当前时间加上30分钟然后转为时间戳作为a的score,key为a的订单号存入redis中。代码如下:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void onmessage(string topic, string message) {
  string orderid;
        int delaytime = 0;
  try {
   map<string, string> msgmap = gson.fromjson(message, new typetoken<map<string, string>>() {
   }.gettype());
   if (msgmap.isempty()) {
    return;
   }
   logger.info("onmessage kafka content:{}", msgmap.tostring());
     orderid = msgmap.get("orderid");
   if(stringutils.isnotempty(orderid)){
    delaytime = integer.parseint(msgmap.get("delaytime"));
    calendar calendar = calendar.getinstance();
    //计算出预计发送时间
    calendar.add(calendar.minute, delaytime);
    long sendtime = calendar.gettimeinmillis();
    redisutils.getinstance().zetadd(constant.delay, sendtime, orderid);
    logger.info("orderid:{}---放入redis中等待发送---sendtime:{}", ---orderid:{}, sendtime);
   }
  } catch (exception e) {
   logger.info("onmessage 延时发送异常:{}", e);
  }
 }
  • 第二步数据处理:另起一个线程具体调度时间根据业务需求来定,我这里3分钟执行一次,内部逻辑:从redis中取出一定量的zset数据,如何取呢,使用zset的zrangebyscore方法,根据数据的score进行排序,当然可以带上时间段,这里从0到现在,来进行消费,需要注意的一点是,在取出数据后我们需要用zrem方法将取出的数据从zset中删除,防止其他线程重复消费数据。在此之后进行接下来的发货通知等相关逻辑。代码如下:
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public void run(){
  //获取批量大小
  int ordernum = integer.parseint(propertyutil.get(constant.order_num,"100"));
  try {
   //批量获取离发送时间最近的ordernum条数据
     calendar calendar = calendar.getinstance();
     long now = calendar.gettimeinmillis();
     //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况)
     set<string> orderids = redisutils.getinstance().zrangebyscore(constant.delay, 0, now, 0, ordernum);
     logger.info("task.getorderfromredis---size:{}---orderids:{}", orderids.size(), gson.tojson(orderids));
   if (collectionutils.isnotempty(orders)){
    //删除key 防止重复发送
    for (string orderid : orderids) {
     redisutils.getinstance().zrem(constant.delay, orderid);
    }
      //接下来执行发送等业务逻辑    
   }
  } catch (exception e) {
   logger.warn("task.run exception:{}", e);
  }
 }

至此完成了依赖redis和线程完成了延时发送的功能。

结语

那么对上面两种不同的实现方式进行一下优缺点比较:

  • 第一种方式实现简单,不依赖外部组件,能够快速的实现目标功能,但缺点也很明显,需要在特定的场景下使用,如果是我这种消息量大的情况下使用很可能是有问题,当然在数据源消息不多的情况下不失为好的选择。
  • 第二种方式实现稍微复杂一点,但是能够适应消息量大的场景,采用redis的zset作为了“中间件”的效果,并且帮助我们进行延时的功能实现能够较好的适应高并发场景,缺点在于在编写的过程中需要考虑实际的因素较多,例如线程的执行周期时间,发送可能会有一定时间的延迟,批量数据大小的设置等等。

综上是本人这次延时功能的实现过程的两种实现方式的总结,具体采用哪种方式还需大家根据实际情况选择,希望能给大家带来帮助。ps:由于本人的技术能力有限,文章中可能出现技术描述不准确或者错误的情况恳请各位大佬指出,我立马进行改正,避免误导大家,谢谢!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对服务器之家的支持。

原文链接:https://juejin.im/post/5c8361626fb9a04a037a06dd

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
2021德云社封箱演出完整版 2021年德云社封箱演出在线看
2021德云社封箱演出完整版 2021年德云社封箱演出在线看 2021-03-15
返回顶部