服务器之家

服务器之家 > 正文

Springboot整合Quartz实现动态定时任务的示例代码

时间:2021-06-02 13:54     来源/作者:小揪揪

简介

quartz是一款功能强大的任务调度器,可以实现较为复杂的调度功能,如每月一号执行、每天凌晨执行、每周五执行等等,还支持分布式调度。本文使用springboot+mybatis+quartz实现对定时任务的增、删、改、查、启用、停用等功能。并把定时任务持久化到数据库以及支持集群。

quartz的3个基本要素

  1. scheduler:调度器。所有的调度都是由它控制。
  2. trigger: 触发器。决定什么时候来执行任务。
  3. jobdetail & job: jobdetail定义的是任务数据,而真正的执行逻辑是在job中。使用jobdetail + job而不是job,这是因为任务是有可能并发执行,如果scheduler直接使用job,就会存在对同一个job实例并发访问的问题。而jobdetail & job 方式,sheduler每次执行,都会根据jobdetail创建一个新的job实例,这样就可以规避并发访问的问题。

如何使用quartz

1.添加依赖

?
1
2
3
4
5
6
7
8
9
10
<dependency>
  <groupid>org.quartz-scheduler</groupid>
  <artifactid>quartz</artifactid>
  <version>2.2.3</version>
</dependency>
<dependency>
  <groupid>org.quartz-scheduler</groupid>
  <artifactid>quartz-jobs</artifactid>
  <version>2.2.3</version>
</dependency>

2.创建配置文件

在maven项目的resource目录下创建quartz.properties

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
org.quartz.scheduler.instancename = myscheduler
org.quartz.scheduler.instanceid = auto
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapjobexecutioninusertransaction = false
 
#线程池配置
org.quartz.threadpool.class = org.quartz.simpl.simplethreadpool
org.quartz.threadpool.threadcount = 10
org.quartz.threadpool.threadpriority = 5
org.quartz.threadpool.threadsinheritcontextclassloaderofinitializingthread = true
 
#持久化配置
org.quartz.jobstore.misfirethreshold = 50000
org.quartz.jobstore.class = org.quartz.impl.jdbcjobstore.jobstoretx
#支持集群
org.quartz.jobstore.isclustered = true
org.quartz.jobstore.useproperties:true
org.quartz.jobstore.clustercheckininterval = 15000
#使用weblogic连接oracle驱动
org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.oracle.weblogic.weblogicoracledelegate
#org.quartz.jobstore.driverdelegateclass = org.quartz.impl.jdbcjobstore.stdjdbcdelegate
org.quartz.jobstore.tableprefix = qrtz_
org.quartz.jobstore.datasource = qzds
#数据源连接信息,quartz默认使用c3p0数据源可以被自定义数据源覆盖
org.quartz.datasource.qzds.driver = oracle.jdbc.driver.oracledriver
org.quartz.datasource.qzds.url = jdbc:oracle:thin:@localhost:1521/xe
org.quartz.datasource.qzds.user = root
org.quartz.datasource.qzds.password = 123456
org.quartz.datasource.qzds.maxconnections = 10

说明:在使用quartz做持久化的时候需要用到quartz的11张表,可以去quartz官网下载对应版本的quartz,解压打开docs/dbtables里面有对应数据库的建表语句。关于quartz.properties配置的详细解释可以查看quartz官网。另外新建一张表tb_app_quartz用于存放定时任务基本信息和描述等信息,定时任务的增、删、改、执行等功能与此表没有任何关系。
quartz的11张表:

Springboot整合Quartz实现动态定时任务的示例代码

?
1
2
3
4
5
6
7
8
9
10
//tb_app_quartz表的实体类
public class appquartz {
  private integer quartzid; //id 主键
  private string jobname; //任务名称
  private string jobgroup; //任务分组
  private string starttime; //任务开始时间
  private string cronexpression; //corn表达式
  private string invokeparam;//需要传递的参数
  ...省略set get
}

3.quartz配置

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * 创建job 实例工厂,解决spring注入问题,如果使用默认会导致spring的@autowired 无法注入问题
 * @author llq
 *
 */
@component
public class jobfactory extends adaptablejobfactory{
  @autowired
  private autowirecapablebeanfactory capablebeanfactory;
  
   @override
    protected object createjobinstance(triggerfiredbundle bundle) throws exception {
      //调用父类的方法
      object jobinstance = super.createjobinstance(bundle);
      //进行注入
      capablebeanfactory.autowirebean(jobinstance);
      return jobinstance;
    }
 
}
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@configuration
public class schedulerconfig implements applicationlistener<contextrefreshedevent>{ 
  @autowired
  private jobfactory jobfactory;
  @autowired
  @qualifier("datasource")
  private datasource primarydatasource;
  
  @override
   public void onapplicationevent(contextrefreshedevent event) {
    system.out.println("任务已经启动..."+event.getsource());
  }
  
  @bean
  public schedulerfactorybean schedulerfactorybean() throws ioexception {   
    //获取配置属性
    propertiesfactorybean propertiesfactorybean = new propertiesfactorybean();
    propertiesfactorybean.setlocation(new classpathresource("/quartz.properties"));
    //在quartz.properties中的属性被读取并注入后再初始化对象
    propertiesfactorybean.afterpropertiesset();
    //创建schedulerfactorybean
    schedulerfactorybean factory = new schedulerfactorybean();
    factory.setquartzproperties(propertiesfactorybean.getobject());
    //使用数据源,自定义数据源
    factory.setdatasource(this.primarydatasource);
    factory.setjobfactory(jobfactory);
    factory.setwaitforjobstocompleteonshutdown(true);//这样当spring关闭时,会等待所有已经启动的quartz job结束后spring才能完全shutdown。
    factory.setoverwriteexistingjobs(false);
    factory.setstartupdelay(1);
    return factory;
  }
  
  
  /*
   * 通过schedulerfactorybean获取scheduler的实例
   */
  @bean(name="scheduler")
  public scheduler scheduler() throws ioexception {
    return schedulerfactorybean().getscheduler();
  }
  
  
  @bean
  public quartzinitializerlistener executorlistener() {
    return new quartzinitializerlistener();
  }
}

4.创建定时任务服务

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@service
public class jobutil {
   @autowired
   @qualifier("scheduler")
   private scheduler scheduler;
     
   
   /**
   * 新建一个任务
   *
   */  
   public string addjob(appquartz appquartz) throws exception {
     
       simpledateformat df = new simpledateformat("yyyy-mm-dd hh:mm:ss");
       date date=df.parse(appquartz.getstarttime());
     
       if (!cronexpression.isvalidexpression(appquartz.getcronexpression())) {     
        return "illegal cron expression"//表达式格式不正确
      }             
      jobdetail jobdetail=null;
      //构建job信息
      if("jobone".equals(appquartz.getjobgroup())) {
         jobdetail = jobbuilder.newjob(jobone.class).withidentity(appquartz.getjobname(), appquartz.getjobgroup()).build();
      }
      if("jobtwo".equals(appquartz.getjobgroup())) {
         jobdetail = jobbuilder.newjob(jobtwo.class).withidentity(appquartz.getjobname(), appquartz.getjobgroup()).build();
      }
          
      //表达式调度构建器(即任务执行的时间,不立即执行)
      cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(appquartz.getcronexpression()).withmisfirehandlinginstructiondonothing();
 
      //按新的cronexpression表达式构建一个新的trigger
      crontrigger trigger = triggerbuilder.newtrigger().withidentity(appquartz.getjobname(), appquartz.getjobgroup()).startat(date)
          .withschedule(schedulebuilder).build();
                 
      //传递参数
      if(appquartz.getinvokeparam()!=null && !"".equals(appquartz.getinvokeparam())) {
        trigger.getjobdatamap().put("invokeparam",appquartz.getinvokeparam()); 
      }               
      scheduler.schedulejob(jobdetail, trigger);
      // pausejob(appquartz.getjobname(),appquartz.getjobgroup());
      return "success";
    }
     /**
     * 获取job状态
     * @param jobname
     * @param jobgroup
     * @return
     * @throws schedulerexception
     */
     public string getjobstate(string jobname, string jobgroup) throws schedulerexception {      
       triggerkey triggerkey = new triggerkey(jobname, jobgroup); 
       return scheduler.gettriggerstate(triggerkey).name();
      }
     
     //暂停所有任务
     public void pausealljob() throws schedulerexception {     
       scheduler.pauseall();
     }
    
    //暂停任务
    public string pausejob(string jobname, string jobgroup) throws schedulerexception {     
      jobkey jobkey = new jobkey(jobname, jobgroup);
      jobdetail jobdetail = scheduler.getjobdetail(jobkey);
      if (jobdetail == null) {
         return "fail";
      }else {
         scheduler.pausejob(jobkey);
         return "success";
      }
                     
    }
    
    //恢复所有任务
    public void resumealljob() throws schedulerexception {     
      scheduler.resumeall();
    }
    
    // 恢复某个任务
    public string resumejob(string jobname, string jobgroup) throws schedulerexception {
      
      jobkey jobkey = new jobkey(jobname, jobgroup);
      jobdetail jobdetail = scheduler.getjobdetail(jobkey);
      if (jobdetail == null) {
        return "fail";
      }else {       
        scheduler.resumejob(jobkey);
        return "success";
      }
    }
    
    //删除某个任务
    public string deletejob(appquartz appquartz) throws schedulerexception {     
      jobkey jobkey = new jobkey(appquartz.getjobname(), appquartz.getjobgroup());
      jobdetail jobdetail = scheduler.getjobdetail(jobkey);
      if (jobdetail == null ) {
         return "jobdetail is null";
      }else if(!scheduler.checkexists(jobkey)) {
        return "jobkey is not exists";
      }else {
         scheduler.deletejob(jobkey);
         return "success";
      }
      
    }
    
    //修改任务
    public string modifyjob(appquartz appquartz) throws schedulerexception {     
      if (!cronexpression.isvalidexpression(appquartz.getcronexpression())) {
        return "illegal cron expression";
      }
      triggerkey triggerkey = triggerkey.triggerkey(appquartz.getjobname(),appquartz.getjobgroup());     
      jobkey jobkey = new jobkey(appquartz.getjobname(),appquartz.getjobgroup());
      if (scheduler.checkexists(jobkey) && scheduler.checkexists(triggerkey)) {
        crontrigger trigger = (crontrigger) scheduler.gettrigger(triggerkey);     
        //表达式调度构建器,不立即执行
        cronschedulebuilder schedulebuilder = cronschedulebuilder.cronschedule(appquartz.getcronexpression()).withmisfirehandlinginstructiondonothing();
        //按新的cronexpression表达式重新构建trigger
        trigger = trigger.gettriggerbuilder().withidentity(triggerkey)
          .withschedule(schedulebuilder).build();
        //修改参数
        if(!trigger.getjobdatamap().get("invokeparam").equals(appquartz.getinvokeparam())) {
          trigger.getjobdatamap().put("invokeparam",appquartz.getinvokeparam());
        }       
        //按新的trigger重新设置job执行
        scheduler.reschedulejob(triggerkey, trigger);                       
        return "success";         
      }else {
        return "job or trigger not exists";
      
      
    }
 
}
?
1
2
3
4
5
6
7
8
9
10
11
@persistjobdataafterexecution
@disallowconcurrentexecution
@component
public class jonone implements job{
  @override
  public void execute(jobexecutioncontext context) throws jobexecutionexception{
    jobdatamap data=context.gettrigger().getjobdatamap();
    string invokeparam =(string) data.get("invokeparam");
    //在这里实现业务逻辑
    }
}
?
1
2
3
4
5
6
7
8
9
10
11
@persistjobdataafterexecution
@disallowconcurrentexecution
@component
public class jobtwo implements job{
  @override
  public void execute(jobexecutioncontext context) throws jobexecutionexception{
    jobdatamap data=context.gettrigger().getjobdatamap();
    string invokeparam =(string) data.get("invokeparam");
    //在这里实现业务逻辑
    }
}

说明:每个定时任务都必须有一个分组,名称和corn表达式,corn表达式也就是定时任务的触发时间,关于corn表达式格式以及含义可以参考一些网络资源。每个定时任务都有一个入口类在这里我把类名当成定时任务的分组名称,例如:只要创建定时任务的分组是jobone的都会执行jobone这个任务类里面的逻辑。如果定时任务需要额外的参数可以使用jobdatamap传递参数,当然也可以从数据库中获取需要的数据。@persistjobdataafterexecution和@disallowconcurrentexecution注解是不让某个定时任务并发执行,只有等当前任务完成下一个任务才会去执行。

5.封装定时任务接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@restcontroller
public class jobcontroller {
  @autowired
  private jobutil jobutil; 
  @autowired
  private appquartzservice appquartzservice;
  
  
  //添加一个job
  @requestmapping(value="/addjob",method=requestmethod.post)
  public returnmsg addjob(@requestbody appquartz appquartz) throws exception { 
    appquartzservice.insertappquartzser(appquartz);   
    result=jobutil.addjob(appquartz);                       
  }
  
  //暂停job 
  @requestmapping(value="/pausejob",method=requestmethod.post)
  public returnmsg pausejob(@requestbody integer[]quartzids) throws exception { 
    appquartz appquartz=null;     
    if(quartzids.length>0){
      for(integer quartzid:quartzids) {
        appquartz=appquartzservice.selectappquartzbyidser(quartzid).get(0);
        jobutil.pausejob(appquartz.getjobname(), appquartz.getjobgroup());           
      }
      return new returnmsg("200","success pausejob"); 
    }else {
      return new returnmsg("404","fail pausejob"); 
    }                               
  }
  
  //恢复job
  @requestmapping(value="/resumejob",method=requestmethod.post)
  public returnmsg resumejob(@requestbody integer[]quartzids) throws exception { 
    appquartz appquartz=null;
    if(quartzids.length>0) {
      for(integer quartzid:quartzids) {
        appquartz=appquartzservice.selectappquartzbyidser(quartzid).get(0);
        jobutil.resumejob(appquartz.getjobname(), appquartz.getjobgroup());       
      }
      return new returnmsg("200","success resumejob");
    }else {
      return new returnmsg("404","fail resumejob");
    }     
  }
    
  
  //删除job
  @requestmapping(value="/deletjob",method=requestmethod.post)
  public returnmsg deletjob(@requestbody integer[]quartzids) throws exception {
    appquartz appquartz=null;
    for(integer quartzid:quartzids) {
      appquartz=appquartzservice.selectappquartzbyidser(quartzid).get(0);
      string ret=jobutil.deletejob(appquartz);
      if("success".equals(ret)) {
        appquartzservice.deleteappquartzbyidser(quartzid);
      }
    }
    return new returnmsg("200","success deletejob"); 
  }
    
  //修改
  @requestmapping(value="/updatejob",method=requestmethod.post)
  public returnmsg modifyjob(@requestbody appquartz appquartz) throws exception {
    string ret= jobutil.modifyjob(appquartz);     
    if("success".equals(ret)) {     
      appquartzservice.updateappquartzser(appquartz);
      return new returnmsg("200","success updatejob",ret);
    }else {
      return new returnmsg("404","fail updatejob",ret);
    }       
  }
  
  //暂停所有
  @requestmapping(value="/pauseall",method=requestmethod.get)
  public returnmsg pausealljob() throws exception {
    jobutil.pausealljob();
    return new returnmsg("200","success pauseall");
  }
  
  //恢复所有
  @requestmapping(value="/repauseall",method=requestmethod.get)
  public returnmsg repausealljob() throws exception {
    jobutil.resumealljob();
    return new returnmsg("200","success repauseall");
  
  
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://segmentfault.com/a/1190000016554033

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
yue是什么意思 网络流行语yue了是什么梗
yue是什么意思 网络流行语yue了是什么梗 2020-10-11
背刺什么意思 网络词语背刺是什么梗
背刺什么意思 网络词语背刺是什么梗 2020-05-22
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总
苹果12mini价格表官网报价 iPhone12mini全版本价格汇总 2020-11-13
2021德云社封箱演出完整版 2021年德云社封箱演出在线看
2021德云社封箱演出完整版 2021年德云社封箱演出在线看 2021-03-15
返回顶部