springboot定时任务功能怎么实现-mile米乐体育
springboot定时任务功能怎么实现
本篇内容介绍了“springboot定时任务功能怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
一 背景
项目中需要一个可以动态新增定时定时任务的功能,现在项目中使用的是xxl-job定时任务调度系统,但是经过一番对xxl-job功能的了解,发现xxl-job对项目动态新增定时任务,动态删除定时任务的支持并不是那么好,所以需要自己手动实现一个定时任务的功能
二 动态定时任务调度
1 技术选择
timer
or scheduledexecutorservice
这两个都能实现定时任务调度,先看下timer的定时任务调度
publicclassmytimertaskextendstimertask{privatestringname;publicmytimertask(stringname){this.name=name;}publicstringgetname(){returnname;}publicvoidsetname(stringname){this.name=name;}@overridepublicvoidrun(){//taskcalendarinstance=calendar.getinstance();system.out.println(newsimpledateformat("yyyy-mm-ddhh:mm:ss").format(instance.gettime()));}}timertimer=newtimer();mytimertasktimertask=newmytimertask("no.1");//首次执行,在当前时间的1秒以后,之后每隔两秒钟执行一次timer.schedule(timertask,1000l,2000l);
在看下scheduledthreadpoolexecutor的实现
//org.apache.commons.lang3.concurrent.basicthreadfactoryscheduledexecutorserviceexecutorservice=newscheduledthreadpoolexecutor(1,newbasicthreadfactory.builder().namingpattern("example-schedule-pool-%d").daemon(true).build());executorservice.scheduleatfixedrate(newrunnable(){@overridepublicvoidrun(){//dosomething}},initialdelay,period,timeunit.hours);
两个都能实现定时任务,那他们的区别呢,使用阿里p3c会给出建议和区别
多线程并行处理定时任务时,timer运行多个timetask时,只要其中之一没有捕获抛出的异常,其它任务便会自动终止运行,使用scheduledexecutorservice则没有这个问题。
从建议上来看,是一定要选择scheduledexecutorservice
了,我们看看源码看看为什么timer
出现问题会终止执行
/***thetimerthread.*/privatefinaltimerthreadthread=newtimerthread(queue);publictimer(){this("timer-" serialnumber());}publictimer(stringname){thread.setname(name);thread.start();}
新建对象时,我们看到开启了一个线程,那么这个线程在做什么呢?一起看看
classtimerthreadextendsthread{booleannewtasksmaybescheduled=true;/***每一件一个任务都是一个quene*/privatetaskqueuequeue;timerthread(taskqueuequeue){this.queue=queue;}publicvoidrun(){try{mainloop();}finally{//someonekilledthisthread,behaveasiftimercancelledsynchronized(queue){newtasksmaybescheduled=false;queue.clear();//清除所有任务信息}}}/***themaintimerloop.(seeclasscomment.)*/privatevoidmainloop(){while(true){try{timertasktask;booleantaskfired;synchronized(queue){//waitforqueuetobecomenon-emptywhile(queue.isempty()&&newtasksmaybescheduled)queue.wait();if(queue.isempty())break;//queueisemptyandwillforeverremain;die//queuenonempty;lookatfirstevtanddotherightthinglongcurrenttime,executiontime;task=queue.getmin();synchronized(task.lock){if(task.state==timertask.cancelled){queue.removemin();continue;//noactionrequired,pollqueueagain}currenttime=system.currenttimemillis();executiontime=task.nextexecutiontime;if(taskfired=(executiontime<=currenttime)){if(task.period==0){//non-repeating,removequeue.removemin();task.state=timertask.executed;}else{//repeatingtask,reschedulequeue.reschedulemin(task.period<0?currenttime-task.period:executiontime task.period);}}}if(!taskfired)//taskhasn'tyetfired;waitqueue.wait(executiontime-currenttime);}if(taskfired)//taskfired;runit,holdingnolockstask.run();}catch(interruptedexceptione){}}}}
我们看到,执行了 mainloop()
,里面是 while (true)
方法无限循环,获取程序中任务对象中的时间和当前时间比对,相同就执行,但是一旦报错,就会进入finally中清除掉所有任务信息。
这时候我们已经找到了答案,timer是在被实例化后,启动一个线程,不间断的循环匹配,来执行任务,他是单线程的,一旦报错,线程就终止了,所以不会执行后续的任务,而scheduledthreadpoolexecutor是多线程执行的,就算其中有一个任务报错了,并不影响其他线程的执行。
2 使用scheduledthreadpoolexecutor
从上面看,使用scheduledthreadpoolexecutor
还是比较简单的,但是我们要实现的更优雅一些,所以选择 taskscheduler
来实现
@componentpublicclasscrontaskregistrarimplementsdisposablebean{privatefinalmapscheduledtasks=newconcurrenthashmap<>(16);@autowiredprivatetaskschedulertaskscheduler;publictaskschedulergetscheduler(){returnthis.taskscheduler;}publicvoidaddcrontask(runnabletask,stringcronexpression){addcrontask(newcrontask(task,cronexpression));}privatevoidaddcrontask(crontaskcrontask){if(crontask!=null){runnabletask=crontask.getrunnable();if(this.scheduledtasks.containskey(task)){removecrontask(task);}this.scheduledtasks.put(task,schedulecrontask(crontask));}}publicvoidremovecrontask(runnabletask){set runnables=this.scheduledtasks.keyset();iteratorit1=runnables.iterator();while(it1.hasnext()){schedulingrunnableschedulingrunnable=(schedulingrunnable)it1.next();longtaskid=schedulingrunnable.gettaskid();schedulingrunnablecancelrunnable=(schedulingrunnable)task;if(taskid.equals(cancelrunnable.gettaskid())){scheduledtaskscheduledtask=this.scheduledtasks.remove(schedulingrunnable);if(scheduledtask!=null){scheduledtask.cancel();}}}}publicscheduledtaskschedulecrontask(crontaskcrontask){scheduledtaskscheduledtask=newscheduledtask();scheduledtask.future=this.taskscheduler.schedule(crontask.getrunnable(),crontask.gettrigger());returnscheduledtask;}@overridepublicvoiddestroy()throwsexception{for(scheduledtasktask:this.scheduledtasks.values()){task.cancel();}this.scheduledtasks.clear();}}
taskscheduler
是本次功能实现的核心类,但是他是一个接口
publicinterfacetaskscheduler{/***schedulethegiven{@linkrunnable},invokingitwheneverthetrigger*indicatesanextexecutiontime.*executionwillendoncetheschedulershutsdownorthereturned*{@linkscheduledfuture}getscancelled.*@paramtasktherunnabletoexecutewheneverthetriggerfires*@paramtriggeranimplementationofthe{@linktrigger}interface,*e.g.a{@linkorg.springframework.scheduling.support.crontrigger}object*wrappingacronexpression*@returna{@linkscheduledfuture}representingpendingcompletionofthetask,*or{@codenull}ifthegiventriggerobjectneverfires(i.e.returns*{@codenull}from{@linktrigger#nextexecutiontime})*@throwsorg.springframework.core.task.taskrejectedexceptionifthegiventaskwasnotaccepted*forinternalreasons(e.g.apooloverloadhandlingpolicyorapoolshutdowninprogress)*@seeorg.springframework.scheduling.support.crontrigger*/@nullablescheduledfutureschedule(runnabletask,triggertrigger);
前面的代码可以看到,我们在类中注入了这个类,但是他是接口,我们怎么知道是那个实现类呢,以往出现这种情况要在类上面加@primany或者@quality来执行实现的类,但是我们看到我的注入上并没有标记,因为是通过另一种方式实现的
@configurationpublicclassschedulingconfig{@beanpublictaskschedulertaskscheduler(){threadpooltaskschedulertaskscheduler=newthreadpooltaskscheduler();//定时任务执行线程池核心线程数taskscheduler.setpoolsize(4);taskscheduler.setremoveoncancelpolicy(true);taskscheduler.setthreadnameprefix("taskschedulerthreadpool-");returntaskscheduler;}}
在spring初始化时就注册了bean taskscheduler,而我们可以看到他的实现是threadpooltaskscheduler,在网上的资料中有人说threadpooltaskscheduler是taskscheduler的默认实现类,其实不是,还是需要我们去指定,而这种方式,当我们想替换实现时,只需要修改配置类就行了,很灵活。
而为什么说他是更优雅的实现方式呢,因为他的核心也是通过scheduledthreadpoolexecutor来实现的
publicscheduledexecutorservicegetscheduledexecutor()throwsillegalstateexception{assert.state(this.scheduledexecutor!=null,"threadpooltaskschedulernotinitialized");returnthis.scheduledexecutor;}
三 多节点任务执行问题
这次的实现过程中,我并没有选择xxl-job来进行实现,而是采用了taskscheduler来实现,这也产生了一个问题,xxl-job是分布式的程序调度系统,当想要执行定时任务的应用使用xxl-job时,无论应用程序中部署多少个节点,xxl-job只会选择其中一个节点作为定时任务执行的节点,从而不会产生定时任务在不同节点上同时执行,导致重复执行问题,而使用taskscheduler来实现,就要考虑多节点重复执行问题。当然既然有问题,就有mile米乐体育的解决方案
· 方案一 将定时任务功能拆出来单独部署,且只部署一个节点 · 方案二 使用redis setnx的形式,保证同一时间只有一个任务在执行
我选择的是方案二来执行,当然还有一些方式也能保证不重复执行,这里就不多说了,一下是我的实现
publicvoidexecutetask(longtaskid){if(!redisservice.setifabsent(string.valueof(taskid),"1",2l,timeunit.seconds)){log.info("已有执行中定时发送短信任务,本次不执行!");return;}
“springboot定时任务功能怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注恰卡编程网网站,小编将为大家输出更多高质量的实用文章!