实战10种实现延迟任务的方法,附代码
编辑
什么是延迟任务?
顾明思议,我们把需要延迟执行的任务叫做延迟任务。
延迟任务的使用场景有以下这些:
红包24小时未被查收,需要延迟执退还业务;每个月账单日,需要给用户发送当月的对账单;订单下单之后30分钟后,用户如果没有付钱,系统需要自动取消订单。
等事件都需要使用延迟任务。
延迟任务实现思路分析
延迟任务实现的关键是在某个时间节点执行某个任务。基于这个信息我们可以想到实现延迟任务的手段有以下两个:
自己手写一个死循环一直判断当前时间节点有没有要执行的任务;借助JDK或者第三方提供的工具类来实现延迟任务。
而通过JDK实现延迟任务我们能想到的关键词是:DelayQueue、ScheduledExecutorService,而第三方提供的延迟任务执行方法就有很多了,例如:Redis、Netty、MQ等手段。
延迟任务实现
下面我们将结合代码来讲解每种延迟任务的具体实现。
1。无限循环实现延迟任务
此方式我们需要开启一个无限循环一直扫描任务,然后使用一个Map集合用来存储任务和延迟执行的时间,实现代码如下:
importjava。time。Instant;importjava。time。LocalDateTime;importjava。util。HashMap;importjava。util。Iterator;importjava。util。Map;延迟任务执行方法汇总publicclassDelayTaskExample{存放定时任务privatestaticMapString,LongTaskMapnewHashMap();publicstaticvoidmain(String〔〕args){System。out。println(程序启动时间:LocalDateTime。now());添加定时任务TaskMap。put(task1,Instant。now()。plusSeconds(3)。toEpochMilli());延迟3s调用无限循环实现延迟任务loopTask();}无限循环实现延迟任务publicstaticvoidloopTask(){LongitemLong0L;while(true){IteratoritTaskMap。entrySet()。iterator();while(it。hasNext()){Map。Entryentry(Map。Entry)it。next();itemLong(Long)entry。getValue();有任务需要执行if(Instant。now()。toEpochMilli()itemLong){延迟任务,业务逻辑执行System。out。println(执行任务:entry。getKey(),执行时间:LocalDateTime。now());删除任务TaskMap。remove(entry。getKey());}}}}}
以上程序执行的结果为:
程序启动时间:20200412T18:51:28。188
执行任务:task1,执行时间:20200412T18:51:31。189
可以看出任务延迟了3s钟执行了,符合我们的预期。
2。JavaAPI实现延迟任务
JavaAPI提供了两种实现延迟任务的方法:DelayQueue和ScheduledExecutorService。
ScheduledExecutorService实现延迟任务
我们可以使用ScheduledExecutorService来以固定的频率一直执行任务,实现代码如下:
publicclassDelayTaskExample{publicstaticvoidmain(String〔〕args){System。out。println(程序启动时间:LocalDateTime。now());scheduledExecutorServiceTask();}ScheduledExecutorService实现固定频率一直循环执行任务publicstaticvoidscheduledExecutorServiceTask(){ScheduledExecutorServiceexecutorExecutors。newScheduledThreadPool(1);executor。scheduleWithFixedDelay(newRunnable(){Overridepublicvoidrun(){执行任务的业务代码System。out。println(执行任务,执行时间:LocalDateTime。now());}},2,初次执行间隔2,2s执行一次TimeUnit。SECONDS);}}
以上程序执行的结果为:
程序启动时间:20200412T21:28:10。416
执行任务,执行时间:20200412T21:28:12。421
执行任务,执行时间:20200412T21:28:14。422
。。。。。。
可以看出使用ScheduledExecutorServicescheduleWithFixedDelay(。。。)方法之后,会以某个频率一直循环执行延迟任务。
DelayQueue实现延迟任务
DelayQueue是一个支持延时获取元素的无界阻塞队列,队列中的元素必须实现Delayed接口,并重写getDelay(TimeUnit)和compareTo(Delayed)方法,DelayQueue实现延迟队列的完整代码如下:
publicclassDelayTest{publicstaticvoidmain(String〔〕args)throwsInterruptedException{DelayQueuedelayQueuenewDelayQueue();添加延迟任务delayQueue。put(newDelayElement(1000));delayQueue。put(newDelayElement(3000));delayQueue。put(newDelayElement(5000));System。out。println(开始时间:DateFormat。getDateTimeInstance()。format(newDate()));while(!delayQueue。isEmpty()){执行延迟任务System。out。println(delayQueue。take());}System。out。println(结束时间:DateFormat。getDateTimeInstance()。format(newDate()));}staticclassDelayElementimplementsDelayed{延迟截止时间(单面:毫秒)longdelayTimeSystem。currentTimeMillis();publicDelayElement(longdelayTime){this。delayTime(this。delayTimedelayTime);}Override获取剩余时间publiclonggetDelay(TimeUnitunit){returnunit。convert(delayTimeSystem。currentTimeMillis(),TimeUnit。MILLISECONDS);}Override队列里元素的排序依据publicintcompareTo(Delayedo){if(this。getDelay(TimeUnit。MILLISECONDS)o。getDelay(TimeUnit。MILLISECONDS)){return1;}elseif(this。getDelay(TimeUnit。MILLISECONDS)o。getDelay(TimeUnit。MILLISECONDS)){return1;}else{return0;}}OverridepublicStringtoString(){returnDateFormat。getDateTimeInstance()。format(newDate(delayTime));}}}
以上程序执行的结果为:
开始时间:202041220:40:38
202041220:40:39
202041220:40:41
202041220:40:43
结束时间:202041220:40:43
3。Redis实现延迟任务
使用Redis实现延迟任务的方法大体可分为两类:通过zset数据判断的方式,和通过键空间通知的方式。
通过数据判断的方式
我们借助zset数据类型,把延迟任务存储在此数据集合中,然后在开启一个无线循环查询当前时间的所有任务进行消费,实现代码如下(需要借助Jedis框架):
importredis。clients。jedis。Jedis;importutils。JedisUtils;importjava。time。Instant;importjava。util。Set;publicclassDelayQueueExample{zsetkeyprivatestaticfinalStringKEYmyDelayQueue;publicstaticvoidmain(String〔〕args)throwsInterruptedException{JedisjedisJedisUtils。getJedis();延迟30s执行(30s后的时间)longdelayTimeInstant。now()。plusSeconds(30)。getEpochSecond();jedis。zadd(KEY,delayTime,order1);继续添加测试数据jedis。zadd(KEY,Instant。now()。plusSeconds(2)。getEpochSecond(),order2);jedis。zadd(KEY,Instant。now()。plusSeconds(2)。getEpochSecond(),order3);jedis。zadd(KEY,Instant。now()。plusSeconds(7)。getEpochSecond(),order4);jedis。zadd(KEY,Instant。now()。plusSeconds(10)。getEpochSecond(),order5);开启延迟队列doDelayQueue(jedis);}延迟队列消费paramjedisRedis客户端publicstaticvoiddoDelayQueue(Jedisjedis)throwsInterruptedException{while(true){当前时间InstantnowInstantInstant。now();longlastSecondnowInstant。plusSeconds(1)。getEpochSecond();上一秒时间longnowSecondnowInstant。getEpochSecond();查询当前时间的所有任务SetStringdatajedis。zrangeByScore(KEY,lastSecond,nowSecond);for(Stringitem:data){消费任务System。out。println(消费:item);}删除已经执行的任务jedis。zremrangeByScore(KEY,lastSecond,nowSecond);Thread。sleep(1000);每秒轮询一次}}}
通过键空间通知
默认情况下Redis服务器端是不开启键空间通知的,需要我们通过configsetnotifykeyspaceeventsEx的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,我们利用这个机制实现了给每个人开启一个定时任务的功能,实现代码如下:
importredis。clients。jedis。Jedis;importredis。clients。jedis。JedisPubSub;importutils。JedisUtils;publicclassTaskExample{publicstaticfinalStringTOPICkeyevent0:expired;订阅频道名称publicstaticvoidmain(String〔〕args){JedisjedisJedisUtils。getJedis();执行定时任务doTask(jedis);}订阅过期消息,执行定时任务paramjedisRedis客户端publicstaticvoiddoTask(Jedisjedis){订阅过期消息jedis。psubscribe(newJedisPubSub(){OverridepublicvoidonPMessage(Stringpattern,Stringchannel,Stringmessage){接收到消息,执行定时任务System。out。println(收到消息:message);}},TOPIC);}}
4。Netty实现延迟任务
Netty是由JBOSS提供的一个Java开源框架,它是一个基于NIO的客户、服务器端的编程框架,使用Netty可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
可以使用Netty提供的工具类HashedWheelTimer来实现延迟任务,实现代码如下。
首先在项目中添加Netty引用,配置如下:
!https:mvnrepository。comartifactio。nettynettycommondependencygroupIdio。nettygroupIdnettycommonartifactIdversion4。1。48。Finalversiondependency
Netty实现的完整代码如下:
publicclassDelayTaskExample{publicstaticvoidmain(String〔〕args){System。out。println(程序启动时间:LocalDateTime。now());NettyTask();}基于Netty的延迟任务privatestaticvoidNettyTask(){创建延迟任务实例HashedWheelTimertimernewHashedWheelTimer(3,时间间隔TimeUnit。SECONDS,100);时间轮中的槽数创建一个任务TimerTasktasknewTimerTask(){Overridepublicvoidrun(Timeouttimeout)throwsException{System。out。println(执行任务,执行时间:LocalDateTime。now());}};将任务添加到延迟队列中timer。newTimeout(task,0,TimeUnit。SECONDS);}}
以上程序执行的结果为:
程序启动时间:20200413T10:16:23。033
执行任务,执行时间:20200413T10:16:26。118
HashedWheelTimer是使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,如下图所示:
编辑
(图片来源于网络)
以上的图片可以理解为,时间轮大小为8,某个时间转一格(例如1s),每格指向一个链表,保存着待执行的任务。
5。MQ实现延迟任务
如果专门开启一个MQ中间件来执行延迟任务,就有点杀鸡用宰牛刀般的奢侈了,不过已经有了MQ环境的话,用它来实现延迟任务的话,还是可取的。
几乎所有的MQ中间件都可以实现延迟任务,在这里更准确的叫法应该叫延队列。本文就使用RabbitMQ为例,来看它是如何实现延迟任务的。
RabbitMQ实现延迟队列的方式有两种:
通过消息过期后进入死信交换器,再由交换器转发到延迟消费队列,实现延迟功能;使用rabbitmqdelayedmessageexchange插件实现延迟功能。
注意:延迟插件rabbitmqdelayedmessageexchange是在RabbitMQ3。5。7及以上的版本才支持的,依赖ErlangOPT18。0及以上运行环境。
由于使用死信交换器比较麻烦,所以推荐使用第二种实现方式rabbitmqdelayedmessageexchange插件的方式实现延迟队列的功能。
首先,我们需要下载并安装rabbitmqdelayedmessageexchange插件,下载地址:http:www。rabbitmq。comcommunityplugins。html
选择相应的对应的版本进行下载,然后拷贝到RabbitMQ服务器目录,使用命令rabbitmqpluginsenablerabbitmqdelayedmessageexchange开启插件,在使用命令rabbitmqpluginslist查询安装的所有插件,安装成功如下图所示:
编辑
最后重启RabbitMQ服务,使插件生效。
首先,我们先要配置消息队列,实现代码如下:
importcom。example。rabbitmq。mq。DirectConfig;importorg。springframework。amqp。core。;importorg。springframework。context。annotation。Bean;importorg。springframework。context。annotation。Configuration;importjava。util。HashMap;importjava。util。Map;ConfigurationpublicclassDelayedConfig{finalstaticStringQUEUENAMEdelayed。goods。order;finalstaticStringEXCHANGENAMEdelayedec;BeanpublicQueuequeue(){returnnewQueue(DelayedConfig。QUEUENAME);}配置默认的交换机BeanCustomExchangecustomExchange(){MapString,ObjectargsnewHashMap();args。put(xdelayedtype,direct);参数二为类型:必须是xdelayedmessagereturnnewCustomExchange(DelayedConfig。EXCHANGENAME,xdelayedmessage,true,false,args);}绑定队列到交换器BeanBindingbinding(Queuequeue,CustomExchangeexchange){returnBindingBuilder。bind(queue)。to(exchange)。with(DelayedConfig。QUEUENAME)。noargs();}}
然后添加增加消息的代码,具体实现如下:
importorg。springframework。amqp。AmqpException;importorg。springframework。amqp。core。AmqpTemplate;importorg。springframework。amqp。core。Message;importorg。springframework。amqp。core。MessagePostProcessor;importorg。springframework。beans。factory。annotation。Autowired;importorg。springframework。stereotype。Component;importjava。text。SimpleDateFormat;importjava。util。Date;ComponentpublicclassDelayedSender{AutowiredprivateAmqpTemplaterabbitTemplate;publicvoidsend(Stringmsg){SimpleDateFormatsfnewSimpleDateFormat(yyyyMMddHH:mm:ss);System。out。println(发送时间:sf。format(newDate()));rabbitTemplate。convertAndSend(DelayedConfig。EXCHANGENAME,DelayedConfig。QUEUENAME,msg,newMessagePostProcessor(){OverridepublicMessagepostProcessMessage(Messagemessage)throwsAmqpException{message。getMessageProperties()。setHeader(xdelay,3000);returnmessage;}});}}
再添加消费消息的代码:
importorg。springframework。amqp。rabbit。annotation。RabbitHandler;importorg。springframework。amqp。rabbit。annotation。RabbitListener;importorg。springframework。stereotype。Component;importjava。text。SimpleDateFormat;importjava。util。Date;ComponentRabbitListener(queuesdelayed。goods。order)publicclassDelayedReceiver{RabbitHandlerpublicvoidprocess(Stringmsg){SimpleDateFormatsdfnewSimpleDateFormat(yyyyMMddHH:mm:ss);System。out。println(接收时间:sdf。format(newDate()));System。out。println(消息内容:msg);}}
最后,我们使用代码测试一下:
importcom。example。rabbitmq。RabbitmqApplication;importcom。example。rabbitmq。mq。delayed。DelayedSender;importorg。junit。Test;importorg。junit。runner。RunWith;importorg。springframework。beans。factory。annotation。Autowired;importorg。springframework。boot。test。context。SpringBootTest;importorg。springframework。test。context。junit4。SpringRunner;importjava。text。SimpleDateFormat;importjava。util。Date;RunWith(SpringRunner。class)SpringBootTestpublicclassDelayedTest{AutowiredprivateDelayedSendersender;TestpublicvoidTest()throwsInterruptedException{SimpleDateFormatsfnewSimpleDateFormat(yyyyMMdd);sender。send(HiAdmin。);Thread。sleep(51000);等待接收程序执行之后,再退出测试}}
以上程序的执行结果如下:
发送时间:2020041320:47:51
接收时间:2020041320:47:54
消息内容:HiAdmin。
从结果可以看出,以上程序执行符合延迟任务的实现预期。
6。使用Spring定时任务
如果你使用的是Spring或SpringBoot的项目的话,可以使用借助Scheduled来实现,本文将使用SpringBoot项目来演示Scheduled的实现,实现我们需要声明开启Scheduled,实现代码如下:
SpringBootApplicationEnableSchedulingpublicclassApplication{publicstaticvoidmain(String〔〕args){SpringApplication。run(Application。class,args);}}
然后添加延迟任务,实现代码如下:
ComponentpublicclassScheduleJobs{Scheduled(fixedDelay21000)publicvoidfixedDelayJob()throwsInterruptedException{System。out。println(任务执行,时间:LocalDateTime。now());}}
此时当我们启动项目之后就可以看到任务以延迟了2s的形式一直循环执行,结果如下:
任务执行,时间:20200413T14:07:53。349
任务执行,时间:20200413T14:07:55。350
任务执行,时间:20200413T14:07:57。351
。。。
我们也可以使用Corn表达式来定义任务执行的频率,例如使用Scheduled(cron04?)。
7。Quartz实现延迟任务
Quartz是一款功能强大的任务调度器,可以实现较为复杂的调度功能,它还支持分布式的任务调度。
我们使用Quartz来实现一个延迟任务,首先定义一个执行任务代码如下:
importorg。quartz。JobExecutionContext;importorg。quartz。JobExecutionException;importorg。springframework。scheduling。quartz。QuartzJobBean;importjava。time。LocalDateTime;publicclassSampleJobextendsQuartzJobBean{OverrideprotectedvoidexecuteInternal(JobExecutionContextjobExecutionContext)throwsJobExecutionException{System。out。println(任务执行,时间:LocalDateTime。now());}}
在定义一个JobDetail和Trigger实现代码如下:
importorg。quartz。;importorg。springframework。context。annotation。Bean;importorg。springframework。context。annotation。Configuration;ConfigurationpublicclassSampleScheduler{BeanpublicJobDetailsampleJobDetail(){returnJobBuilder。newJob(SampleJob。class)。withIdentity(sampleJob)。storeDurably()。build();}BeanpublicTriggersampleJobTrigger(){3s后执行SimpleScheduleBuilderscheduleBuilderSimpleScheduleBuilder。simpleSchedule()。withIntervalInSeconds(3)。withRepeatCount(1);returnTriggerBuilder。newTrigger()。forJob(sampleJobDetail())。withIdentity(sampleTrigger)。withSchedule(scheduleBuilder)。build();}}
最后在SpringBoot项目启动之后开启延迟任务,实现代码如下:
importorg。springframework。beans。factory。annotation。Autowired;importorg。springframework。boot。CommandLineRunner;importorg。springframework。scheduling。quartz。SchedulerFactoryBean;SpringBoot项目启动后执行publicclassMyStartupRunnerimplementsCommandLineRunner{AutowiredprivateSchedulerFactoryBeanschedulerFactoryBean;AutowiredprivateSampleSchedulersampleScheduler;Overridepublicvoidrun(String。。。args)throwsException{启动定时任务schedulerFactoryBean。getScheduler()。scheduleJob(sampleScheduler。sampleJobTrigger());}}
以上程序的执行结果如下:
2020041319:02:12。331INFO17768〔restartedMain〕com。example。demo。DemoApplication:StartedDemoApplicationin1。815seconds(JVMrunningfor3。088)
任务执行,时间:20200413T19:02:15。019
从结果可以看出在项目启动3s之后执行了延迟任务。
总结
本文讲了延迟任务的使用场景,以及延迟任务的10种实现方式:
手动无线循环;ScheduledExecutorService;DelayQueue;Rediszset数据判断的方式;Redis键空间通知的方式;Netty提供的HashedWheelTimer工具类;RabbitMQ死信队列;RabbitMQ延迟消息插件rabbitmqdelayedmessageexchange;SpringScheduled;Quartz。
光触媒在什么时间添加到食品中去的光触媒技术是1967年东京大学本多建一教授发现的,在光照下二氧化钛电极可以将水分解成氢气与氧气。2015年,日本研发光触媒净化技术,从火山矿石中提取铂元素,加入到光触媒液体中,……
董明珠格力电器十年给国家交了1000多亿税!董明珠表示,格力电器十年给国家纳税1300亿元1400亿元,这还是国家退税后的数字,如果没有退税,估计1600亿元以上。所以好企业才有实力,才能给国家纳税。企业天天靠国家政策补……
十里将牛烤肉加盟费总部咨询十里将牛烤肉加盟费用【总部】如果您对项目感兴趣,请联系图片电话,免费咨询加盟详情近几年来,加盟行业的发展真是越来越快了,随之产生的十里将牛烤肉加盟品牌也是越来越多,……
洗面奶有什么用(洗面奶的正确用法)说到洗面奶,又有高泡和低泡,还有洗面奶和洁面乳两种不同的名称,还真让一些人摸不着头脑。到底是高泡奶呢?还是低泡好?为什么又有叫洗面奶的,又有叫洁面乳的呢?真晕了好吧!今天茜茜带……
prpr什么意思(prp是什么治疗)近日,张大娘的膝关节炎又犯了,上下楼很费劲,走路一瘸一拐,来到运动医学门诊找到李博士,问李博士:我最近听他们说治疗膝关节炎有新的技术了,叫什么PRP,我能用吗?李博士说,根据你……
自渡什么意思(自渡的含义)最近在抖音上看到一个抖友的一段话,乍一看,觉得有点超然的态度,仔细想想,这不是要自闭吗?这段话是这样的:《自渡》我慢慢开始自渡,变得不悲不喜,我接受世界上所有的不公……
北京论语带薪年休假不能总是纸面福利强制实施带薪年休假政策让职工享有更多休闲时间据工人日报报道,在今年的全国两会上,全国政协总工会界别三位委员呼吁,强制实施带薪年休假相关政策,避免长时间劳作给职工身心……
握握金服最新清退消息2022发布相关清退进展消息情况,平台将握握金服本息清退回款进度通知,目前平台已经接到了全面的消息,现在要进行用户的清退工作,所有的用户都要进行清退,那么在清退用户的过程当中就需要有大量的工作去做,首先要做好投资人的……
雪山贷最新清退消息2022发布相关清退进展消息情况,平台将积雪山贷出借人回款进度的通知会在链接当中为大家提供雪山贷,当前已经决定全面清退用户,关于雪山贷的用户清退情况,会通过专人来做,在雪山贷进行清退工作的,有专业的清退工作小组,并不会……
怕中国CPU崛起,抢intel的饭碗,美国把龙芯也制裁了中国有6大国产CPU,分别是龙芯、兆芯、海光、飞腾、华为鲲鹏、申威。但其中龙芯相对而言更受大家关注。一是龙芯目前已经自研出了LoongArch指令集,与MIPS完全没有关……
篮网球星欧文在与马刺的比赛中受伤本已缺兵少将的布鲁克林网队周三将在圣安东尼奥迎战马刺队。凯里欧文,谁是网队的伤病报告的最后补充,因为小腿紧绷排除根据团队的Twitter账号。网队最近两场比赛都输了凯文杜……
王霜全国三八红旗手对我是很大鼓励,小王也会继续努力直播吧3月4日讯在荣膺2022年度全国三八红旗手标兵称号后,王霜接受了《长江日报》的采访,她表示自己接下来会继续努力。3月2日晚,全国妇联发布了2022年度全国三八红旗手……