网站后台 编辑器 调用,网站优化搜索排名,旅游网站信息门户建设方案,中国建设银行集团Spring定时任务动态更改#xff08;增、删、改#xff09;Cron表达式方案实例详解
最近在做一个需求#xff0c;用户可以在平台上配置任务计划数据#xff08;就是任务的执行和描述的相关信息#xff0c;包括任务参数、cron表达式#xff09;#xff0c;配置后#xf…Spring定时任务动态更改增、删、改Cron表达式方案实例详解
最近在做一个需求用户可以在平台上配置任务计划数据就是任务的执行和描述的相关信息包括任务参数、cron表达式配置后后端可以根据配置的计划信息在cron表示式的时间生成相应的一条任务记录。这里用户可以新增任务计划、修改任务计划、删除任务计划后端要根据用户的配置动态的生成相应的任务记录。
我这里的做法都是定时比如每隔30s扫描数据库中所有可用的用户任务配置根据每次扫描到的配置信息对已有的调度信息做增加、删除、修改。虽然服务是分布式部署的因为我们的场景是根据用户的任务配置信息在用户配置的时间点向数据库生成一条任务信息给下游使用并不是马上执行任务的内容所以资源的消耗不大为了简化开发我们要求任务生成要只会在一台机器上执行。
这我们的方案中不考虑用户实时修改后马上生效这里主要原因是服务可能是分布式部署的如果不同的任务信息分布到不同的机器用户修改后要实时生效就必须将变化的任务分配到调度信息所在的机器上才能实时更新。
这里我实现了两个版本。方案一是基于延迟队列做的方案二是基于Spring的SchedulingConfigurer做的。
方案一基于延迟队列
延迟队列DelayQueue的做法是基于延迟的属性让服务在固定的时间根据用户的配置生成一条任务信息。使用消息的生产和消费模式。延迟消息的延迟时间根据cron表达式生成。
每隔30s扫描一次用户的配置表根据用户的各个任务配置信息分别生产一条延迟消息同时使用Map记录消息信息Map的key使用这个任务配置信息的json字符串数据库的一条数据对应的md5值value是这条任务配置的数据对象。
每次生产消息时校验Map中是否已经存在相同的md5值如果存在相同的md5值说明配置信息没有更新并且延迟队列中已经有未消费的消息本次就不生成新的消息。如果不存在相同的md5值说明是一个新任务配置或是用户修改后的任务配置不管是修改cron表达式还是任务的其他参数这时就生成新的延迟消息。因此对应任务的修改同一个任务配置在Map中会有多条消息在消费时需要校验哪条消息才是有效的无效的消息消费后被过滤掉。
首先定义一个消息对象实现Delayed接口
package com.XXXXX.or.algo.full.warehouse.bo.msg;import com.XXXXX.or.algo.full.warehouse.entity.db1.WarehouseAdjustmentPlan;
import lombok.Data;
import org.jetbrains.annotations.NotNull;import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;/*** 延迟队列的消息*/
Data
public class PlanMsg implements Delayed {private String planId;// 延迟时间 秒private long delaySec;// 过期时间 纳秒 对与cpu来说毫秒太大private long expire;// 数据库中的任务计划配置private WarehouseAdjustmentPlan detail;// 本条消息中数据库的md5private String md5;public PlanMsg(String planId, long delaySec,WarehouseAdjustmentPlan detail,String md5) {this.planId planId;this.delaySec delaySec;// 过期时间 纳秒this.expire System.nanoTime() TimeUnit.NANOSECONDS.convert(delaySec,TimeUnit.SECONDS);this.detail detail;this.md5 md5;}Overridepublic long getDelay(NotNull TimeUnit unit) {return unit.convert(this.expire - System.nanoTime(),TimeUnit.NANOSECONDS);}Overridepublic int compareTo(NotNull Delayed o) {long time getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS);return time 0 ? 0 : ( time 0 ? 1 : -1);}
}使用一个组件对消息做生产和消费
cron表达式依赖
!-- cron表达式相关--dependencygroupIdcom.cronutils/groupIdartifactIdcron-utils/artifactIdversion9.1.5/version/dependency这里的生产使用Scheduled(cron “0/30 * * * * ?”)定期扫描数据库中计划配置信息和planMd5Map比较后决定是否生成新的消息。这里的消息的延迟时间根据cron表达式生成。
消息的消费项目启动后使用PostConstruct启动一个线程消费消息如果消息没有到延迟的时间会阻塞在delayQueue.take()位置。当消费到消息后根据消息的id到数据库中找到这条配置消息通过比较md5决定是否向数据库插入一条任务。如果消息中的md5和根据消息id到数据库中查询的记录的md5一致则插入一条任务数据否则丢弃该消息。这样用户对任务配置的参数增删改都能很好的覆盖了。
package com.XXXXX.or.algo.full.warehouse.job;import com.alibaba.fastjson.JSON;
import com.cronutils.model.CronType;
import com.cronutils.model.definition.CronDefinition;
import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.model.time.ExecutionTime;
import com.cronutils.parser.CronParser;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.XXXXX.or.algo.full.warehouse.bo.PlanningCommitReq;
import com.XXXXX.or.algo.full.warehouse.bo.msg.PlanMsg;
import com.XXXXX.or.algo.full.warehouse.entity.db1.WarehouseAdjustmentPlan;
import com.XXXXX.or.algo.full.warehouse.service.WarehouseAdjustmentService;
import com.XXXXX.or.algo.full.warehouse.util.CommUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.*;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;Component
Slf4j
public class CronExpTaskJob implements SimpleJob {// 每个plan的一条或多条消息(如果用户修改计划会生成多条消息)private DelayQueuePlanMsg delayQueue new DelayQueue();// 每个md5 对应的计划数据private MapString, WarehouseAdjustmentPlan planMd5Map new HashMap();private ExecutorService singleThreadExecutor Executors.newSingleThreadExecutor();Autowiredprivate WarehouseAdjustmentService warehouseAdjustmentService;PostConstructpublic void loadValidPlans(){// 监听消息 生成任务singleThreadExecutor.execute(()-{while (true) {String planId ;String msgMd5 ;try {// 阻塞队列PlanMsg msg delayQueue.take();log.info(消费消息:{},JSON.toJSONString(msg));planId msg.getPlanId();msgMd5 msg.getMd5();// 校验WarehouseAdjustmentPlan dbPlan warehouseAdjustmentService.query(planId);String dbPlanMd5 CommUtil.getMD5(JSON.toJSONString(dbPlan));// 消息的md5值和数据中数据的md5值不一样说明数据有变不生成任务if(! msgMd5.equals(dbPlanMd5)){log.info(计划改变不提交任务。改变前消息:{}; 改变后数据库:{}, JSON.toJSONString(msg.getDetail()), JSON.toJSONString(dbPlan));continue;}PlanningCommitReq req new PlanningCommitReq();req.setPlanId(msg.getPlanId());req.setUserId(sys);req.setUserName(sys);// 生成任务warehouseAdjustmentService.commit(req);log.info(计划id:{},提交成功。时间{},msg.getPlanId(),new Date());} catch (Exception e) {log.info(计划id:{},提交失败,提交时间{}, planId, new Date(), e);}finally {planMd5Map.remove(msgMd5);}}});}Scheduled(cron 0/30 * * * * ?) // 30秒一次测试使用 线上分布式部署使用elastic-jobpublic void generateMsg(){// 找到所有计划ListWarehouseAdjustmentPlan planList warehouseAdjustmentService.loadValidPlans();if(CollectionUtils.isEmpty(planList)){return;}for (WarehouseAdjustmentPlan plan : planList) {try {String dbPlanMd5 CommUtil.getMD5(JSON.toJSONString(plan));// 不同md5值的相同计划id都可以提交相同md5值计划,不能重复提交if(planMd5Map.containsKey(dbPlanMd5)){// 消息已经存 并且未被消费log.info(存在未消费的相同信息不生成当前消息,plan_id:{},plan.getPlanId());continue;}CronDefinition cronDefinition CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ);CronParser parser new CronParser(cronDefinition);ExecutionTime executionTime ExecutionTime.forCron(parser.parse(plan.getCronExpression()));// 离下一次执行还有多久OptionalDuration duration executionTime.timeToNextExecution(ZonedDateTime.now());PlanMsg planMsg new PlanMsg(plan.getPlanId(), duration.get().getSeconds(),plan,dbPlanMd5);// 发消息log.info(生产消息成功。计划:{}, JSON.toJSONString(plan));delayQueue.add(planMsg);// 记录队列中planId的最新的一个md5值planMd5Map.put(dbPlanMd5, plan);}catch (Exception e){log.info(任务消息生产失败。计划:{}, JSON.toJSONString(plan), e);}}}
}
方案二基于Spring的SchedulingConfigurer接口
实现SchedulingConfigurer接口中的public void configureTasks(ScheduledTaskRegistrar taskRegistrar)方法方法的入参ScheduledTaskRegistrar是个关键变量。
为了适合用户配置计划任务较多的场景使用ThreadPoolTaskScheduler线程池。
这里的关键是自定义的freshTasks()方法这个方法有两处调用一个是configureTasks方法中的调用一个是通过Scheduled(cron “0/30 * * * * ?”)定时调用。freshTasks()方案中首先全量查询数据库中的用户任务配置数据和上一次查询的全量配置数据进行比较找到哪些是用户新增的哪些是用户修改的哪些是用户删除的停止的。然后针对这三种数据分别调用对应的方法修改ScheduledTaskRegistrar 中已经加载的任务信息。
成员变量MapString,ScheduledTask是一个自定义的关键变量key是数据库中用户的配置计划的idvalue是Spring调度器中的每个任务任务的增、删、改都是操作这个map。
package com.XXXXX.or.algo.full.warehouse.job;import com.alibaba.fastjson.JSON;
import com.XXXXX.or.algo.full.warehouse.bo.PlanningCommitReq;
import com.XXXXX.or.algo.full.warehouse.entity.db1.WarehouseAdjustmentPlan;
import com.XXXXX.or.algo.full.warehouse.service.WarehouseAdjustmentService;
import com.XXXXX.or.algo.full.warehouse.util.CommUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.util.CollectionUtils;import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;/*** 定期扫描数据库的最新配置信息对任务做增、删、改*/
Configuration
Slf4j
SuppressWarnings(Duplicates)
public class MyScheduleConfig implements SchedulingConfigurer{// 用于查询数据库中每一个任务配置信息包括任务id对应的cron表达式Autowiredprivate WarehouseAdjustmentService warehouseAdjustmentService;// 上一次查询到的数据库任务配置信息 用于和本次查询进行对比后对existedTask任务做增、删、改private ListWarehouseAdjustmentPlan historyConfList new ArrayList();// 根据数据库任务配置信息生成的任务 任务的增、删、改都是操作这个mapprivate MapString,ScheduledTask existedTask new HashMap();private ScheduledTaskRegistrar taskRegistrar;/*** 用线程池执行任务* return*/Beanpublic ThreadPoolTaskScheduler threadPoolTaskScheduler(){ThreadPoolTaskScheduler threadPool new ThreadPoolTaskScheduler();threadPool.setPoolSize(20);threadPool.setThreadNamePrefix(plan-to-task);return threadPool;}Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {// 将taskRegistrar作为成员变量 便于后续任务的增删改this.taskRegistrar taskRegistrar;// 通过线程池去启动不同的定时任务。适合定时任务较多的场景。ThreadPoolTaskScheduler threadPool threadPoolTaskScheduler();taskRegistrar.setScheduler(threadPool);// 根据数据库配置 启动全量刷新任务 分布式部署时这里不能加载 // freshTasks();}/*** 根据数据库配置 定期全量刷新任务*/Scheduled(cron 0/30 * * * * ?) // 分布式部署时需要考虑其他方案比如Scheduled分布式锁 或使用elastic-job等public void shceduled(){freshTasks();}/*** 通过比较数据库中配置信息变化 找到增、删、改的任务并刷新任务列表*/public synchronized void freshTasks(){// 找到数据库最新的全量有效配置ListWarehouseAdjustmentPlan newestConfList warehouseAdjustmentService.loadValidPlans();// 上一次的全量有效配置ListWarehouseAdjustmentPlan historyConfList this.historyConfList;if(CollectionUtils.isEmpty(newestConfList)){newestConfList new ArrayList();}if(CollectionUtils.isEmpty(historyConfList)){historyConfList new ArrayList();}// list转mapMapString, WarehouseAdjustmentPlan newestConfMap newestConfList.stream().collect(Collectors.toMap(WarehouseAdjustmentPlan::getPlanId, Function.identity(), (o1, o2) - o1));MapString, WarehouseAdjustmentPlan historyConfMap historyConfList.stream().collect(Collectors.toMap(WarehouseAdjustmentPlan::getPlanId, Function.identity(), (o1, o2) - o1));// 找到哪些是新增的、哪些是修改的、哪些是删除的ListWarehouseAdjustmentPlan addList findAddList(newestConfMap,historyConfMap);ListWarehouseAdjustmentPlan modifyList findModifyList(newestConfMap,historyConfMap);ListWarehouseAdjustmentPlan delList findDelList(newestConfMap,historyConfMap);// 新增任务for(WarehouseAdjustmentPlan tmp : addList){addTask(tmp.getPlanId(),tmp.getCronExpression());}// 修改任务for(WarehouseAdjustmentPlan tmp : modifyList){modifyTask(tmp.getPlanId(),tmp.getCronExpression());}// 删除任务for(WarehouseAdjustmentPlan tmp : delList){stopTask(tmp.getPlanId());}// 将本次查询的列表做历史列表this.historyConfList newestConfList;}/*** 找到新增的用户配置*/private ListWarehouseAdjustmentPlan findAddList(MapString, WarehouseAdjustmentPlan newestConfMap, MapString, WarehouseAdjustmentPlan historyConfMap) {ListWarehouseAdjustmentPlan result new ArrayList();for(Map.EntryString, WarehouseAdjustmentPlan n : newestConfMap.entrySet()){// 只在新map中存在 即为新增if(! historyConfMap.containsKey(n.getKey())){result.add(n.getValue());}}return result;}/*** 找到修改的用户配置*/private ListWarehouseAdjustmentPlan findModifyList(MapString, WarehouseAdjustmentPlan newestConfMap, MapString, WarehouseAdjustmentPlan historyConfMap) {ListWarehouseAdjustmentPlan result new ArrayList();for(Map.EntryString, WarehouseAdjustmentPlan n : newestConfMap.entrySet()){// 新老map同时存在 并且md5值不一样 即为修改if(historyConfMap.containsKey(n.getKey())){String newMd5 CommUtil.getMD5(JSON.toJSONString(n.getValue()));String oldMd5 CommUtil.getMD5(JSON.toJSONString(historyConfMap.get(n.getKey())));if(!newMd5.equals(oldMd5)){result.add(n.getValue());}}}return result;}/*** 找到删除的用户配置*/private ListWarehouseAdjustmentPlan findDelList(MapString, WarehouseAdjustmentPlan newestConfMap, MapString, WarehouseAdjustmentPlan historyConfMap) {ListWarehouseAdjustmentPlan result new ArrayList();for(Map.EntryString, WarehouseAdjustmentPlan h : historyConfMap.entrySet()){// 只在老的map中存在 即为删除if(! newestConfMap.containsKey(h.getKey())){result.add(h.getValue());}}return result;}/*** 添加任务* param taskId* param cronExp*/public void addTask(String taskId, String cronExp){if(existedTask.containsKey(taskId)){log.info(任务添加失败重复。{}, taskId);return;}cronExp CommUtil.corn7To6(cronExp);try {// 执行的具体内容Runnable task ()-{PlanningCommitReq req new PlanningCommitReq();req.setPlanId(taskId);req.setUserId(sys);req.setUserName(sys);// 生成任务warehouseAdjustmentService.commit(req);log.info(计划提交成功。planId:{},taskId);};// 组成具体任务CronTask cronTask new CronTask(task,cronExp);ScheduledTask scheduledTask taskRegistrar.scheduleCronTask(cronTask);// 保存任务信息existedTask.put(taskId,scheduledTask);log.info(任务添加成功。{}, taskId);}catch (Exception e){log.info(任务添加失败。{}, taskId, e);}}/*** 修改任务* param taskId* param cronExp*/public void modifyTask(String taskId,String cronExp){if(! existedTask.containsKey(taskId)){log.info(任务修改失败不存在。{}, taskId);return;}cronExp CommUtil.corn7To6(cronExp);try {ScheduledTask currTask existedTask.get(taskId);Runnable runnable currTask.getTask().getRunnable();// 停止currTask任务currTask.cancel();// 重新添加并修改触发时间ScheduledTask newTask taskRegistrar.scheduleCronTask(new CronTask(runnable, cronExp));// 保存修改后的任务信息existedTask.put(taskId,newTask);log.info(任务修改成功。{}, taskId);}catch (Exception e){log.info(任务修改失败。{}, taskId, e);}}/*** 停止任务* param taskId*/public void stopTask(String taskId){if(! existedTask.containsKey(taskId)){log.info(任务删除失败不存在。{}, taskId);return;}try{ScheduledTask currTask existedTask.get(taskId);// 停止currTask任务currTask.cancel();// 删除任务信息existedTask.remove(taskId);log.info(任务删除成功。{}, taskId);}catch (Exception e){log.info(任务删除失败。{}, taskId, e);}}
}