网站分几类,十大牌子网,中山专业外贸网站建设,软件开发外包方案目录 背景
思路
实现逻辑
总结 背景 在使用xxl-job框架时#xff0c;由于系统是由线程池去做异步逻辑#xff0c;然后主线程等待#xff0c;在控制台手动停止时#xff0c;会出现异步线程不感知信号中断的场景#xff0c;如下场景 而此时如果人工在控制台停止xxl-job执…目录 背景
思路
实现逻辑
总结 背景 在使用xxl-job框架时由于系统是由线程池去做异步逻辑然后主线程等待在控制台手动停止时会出现异步线程不感知信号中断的场景如下场景 而此时如果人工在控制台停止xxl-job执行异步任务并不会感知到调度线程被interrupt了上面3个异步任务仍旧执行而主线程却退出了如果此时再次调度该任务而代码逻辑没做幂等可能出现预期外的异常
思路 先看看xxl-job trigger的时序图 原图plantuml startuml
https://plantuml.com/sequence-diagram!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用actor User as userbox xxl-job-admin #LightGrayparticipant controller as controllerparticipant trigger as triggerparticipant executor-proxy as proxyparticipant adminBiz as admin
end boxbox xxl-job-client #LightGray
participant executor as executor
participant jobThread as job
participant callBackThread as callback
participant retryCallThread as retryCallBack
end boxautonumber 1
user - controller:手动调度 /jobinfo/trigger
controller-trigger: jobId/触发类型/参数
trigger-trigger:提交trigger任务group 异步流程trigger-trigger:根据jobId获取jobInfotrigger-trigger:获取执行器信息note left已注册的机器地址列表end notealt 分片广播looptrigger-trigger:遍历触发end loopelse 其他trigger-trigger:单个触发endend groupreturn 返回提交结果 异步rpc触发autonumber 1group 触发流程trigger-trigger:获取路由策略阻塞策略trigger-trigger:根据路由策略获取需调度的机器地址trigger - proxy:获取执行器代理对象缓存note leftjdk代理nettyxxljob的log是客户端记录在本地文件admin调用时也通过代理调用远端接口end noteproxy-executor:远程调用(传递触发信息)executor-executor:根据jobId获取执行线程executor-executor:获取job执行器alt 执行线程不为空executor-executor:根据阻塞策略处理endalt 执行线程为空executor-executor:新建job线程endexecutor-job:把任务参数加入阻塞队列job-job:jobId去重return:返回结果return:返回结果end group 异步jobThread autonumber 1job-job:执行handler init 方法loop toStopfalsejob-job:从阻塞队列中获取任务参数job-job:准备工作note left状态设置为运行中空闲次数0去除jobId设置logFile分片信息end notealt 超时时间0job-job:新建线程处理handler信息elsejob-job:本线程处理handler信息endjob-job:把执行结果or终止结果加入callback阻塞队列end loopjob-job:清除阻塞队列里的待任务note left此时已经该线程已经被停止了end note 异步callBackThread autonumber 1loop toStopfalsecallback-callback:从callback阻塞队列中获取callback参数alt 获取成功callback-callback:清空当前阻塞队列中的参数并将其放到一个新的listloop 遍历admin列表callback-controller:调用callback接口controller-admin:调用callback逻辑alt 任务处理成功admin-admin:获取job信息admin-admin:获取子任务信息loop 遍历子任务admin-trigger:提交trigger任务end loopadmin-admin:更新job信息endreturn:返回回调结果callback-callback:记录日志到本地文件alt 回调失败callback-callback:记录序列化后的失败参数用于重试endend loopendend loop重试retryCallBack
autonumber 1loop toStopfalseretryCallBack-retryCallBack:获取本地重试文件信息retryCallBack-retryCallBack:反序列化内容重试callback请求end loop
enduml
主要关注异步JobThread部分可以看出是有个toStop的flag去感知这个中断信号的那怎么去获取toStop的信息呢这里可以通过起另一个线程去检查这个信号如果为stop则透传到异步task中,设计流程如下 原图plantuml
startuml
https://plantuml.com/sequence-diagram!theme plain
autonumber
hide footbox
skinparam ParticipantPadding 50
skinparam BoxPadding 150
title xxljob接口调用actor xxl-job-admin as userbox xxl-job-client #LightGrayparticipant xxl-client as clientparticipant xxl-main-thread as mainThreadparticipant check-interrupt-thread as checkThreadparticipant async-task... as asyncThread
end boxautonumber 1
user - client:手动调度 /jobinfo/trigger
client-client:加入任务队列
return
client--mainThread:获取队列任务执行
mainThread-mainThread:init
mainThread-checkThread:定期检查mainThread的 stopFlag属性
loop
checkThread-checkThread:定期检查停止属性属性end loop
mainThread-mainThread:初始化完毕
mainThread-asyncThread:分发任务
asyncThread--asyncThread:任务执行
mainThread-mainThread:等待子任务执行完成
user-client:手动中断任务
client-client:捞取jobId对应的线程
client-mainThread:调用暂停方法interrupt设置 stopFlag
mainThread--client:返回暂停结果
mainThread-mainThread:等待执行中的子任务完成
checkThread-asyncThread:设置给子任务 stopFlag
asyncThread-asyncThread:业务逻辑判断 stopFlag
return:stop
mainThread-mainThread:等待检查线程完成
return:check-thread end
mainThread-mainThread:后置处理
return:stop
enduml
即对于异步的任务可以做一个封装用于接受中断信号而信息的传递则通过threadLocal复制的方式给到异步任务主要是解决中断信号如何传递到异步任务的问题异步任务可以通过某个方法来获取主线程是否中断
要点如下
感知xxl-job主线程的中断信号传递中断信号到异步任务异步任务执行的方法可以手动调用某个方法判断是否中断进而更快地停止任务 实现逻辑
定义异步任务封装类用于接受信息
public class TaskWrapperT implements Runnable {private Runnable runnable;private volatile boolean isInterrupt;private SupplierT supplier;private T result;private final String taskId;private MapString, String copyMdc null;//有需要传递的变量可以通过context传递private MapString, Object executeContext null;Throwable errorCause;TaskWrapper(Runnable runnable, String taskId) {this.runnable runnable;this.isInterrupt false;this.taskId taskId;copyMdc MDC.getCopyOfContextMap();executeContext XxlShardingTask.getCopyOfContext();}TaskWrapper(SupplierT supplier, String taskId) {this.supplier supplier;this.isInterrupt false;this.taskId taskId;copyMdc MDC.getCopyOfContextMap();executeContext XxlShardingTask.getCopyOfContext();}Overridepublic void run() {if (!CollectionUtils.isEmpty(copyMdc)) {MDC.setContextMap(copyMdc);}if (!CollectionUtils.isEmpty(executeContext)) {XxlShardingTask.setExecuteContext(executeContext);}XxlShardingTask.setWrapper(this);try {if (isInterrupt) {return;}if (runnable ! null) {runnable.run();}if (supplier ! null) {result supplier.get();}} finally {MDC.clear();XxlShardingTask.removeContext();}}static boolean isInterrupt() {return Optional.ofNullable(XxlShardingTask.getFromContext(XxlShardingTask.EXECUTE_KEY)).map(e - ((TaskWrapper?) e).interrupted()).orElse(Boolean.FALSE);}public T getResult() {return result;}public String getTaskId() {return taskId;}public Throwable getErrorCause() {return errorCause;}/*** 是否成功** return*/public boolean isSuccess() {return !isInterrupt errorCause null;}public boolean interrupted() {return isInterrupt;}synchronized void setInterrupt() {this.isInterrupt true;}
}
在xxljob的主线程初次调用时会调用init方法定一个handler继承xxljob的IJobHandler并实现
他的init方法新建检查线程用于check中断信号执行过程中会把当前在跑的任务丢到一个map中存储而检查线程会调用异步任务把对应的标志未置为停止
public abstract class XxlAsyncTaskHandlerT extends IJobHandler {
...public void init() throws InvocationTargetException, IllegalAccessException {super.init();JobThread thread (JobThread) Thread.currentThread();Field toStop ReflectionUtils.findField(JobThread.class, toStop);if (toStop null) {throw new IllegalStateException(current thread dont have field [toStop],please check the xxl-job version);}mainThreadInterrupt.set(false);ReflectionUtils.makeAccessible(toStop);checkInterruptThread new Thread(() - {try {while (!mainThreadInterrupt.get()) {TimeUnit.MILLISECONDS.sleep(getCheckInterruptMills());if ((boolean) toStop.get(thread)) {if (mainThreadInterrupt.compareAndSet(false, true)) {currentRunTask.forEach((s, tTaskWrapper) - {tTaskWrapper.setInterrupt();});}}}} catch (InterruptedException e) {//ignore} catch (Exception ex) {LOGGER.error(check interrupt error, ex);}});checkInterruptThread.start();}}
主流程(即xxl-job调度线程所执行的execute方法)通过获取待执行的任务对其进行封装并加入到当前在运行的任务map中核心的代码如下逻辑流程
从任务生成器中获取待执行的封装好的任务并加入到异步线程池执行主线程等待 while (currentTaskGenerator.hasNextTask()) {ListTaskWrapperT wrappers new ArrayList();for (int i 0; i parallelCount; i) {if (currentTaskGenerator.hasNextTask()) {TaskWrapperT nextTask currentTaskGenerator.getNextTask();String taskId nextTask.getTaskId();
//加入到当前执行中的任务currentRunTask.put(taskId, nextTask);CompletableFuture.runAsync(nextTask, executor).whenComplete((unused, throwable) - {if (throwable ! null) {currentRunTask.get(taskId).errorCause throwable;} else {if (nextTask.isSuccess()) {successCount.incrementAndGet();}}
//任务处理完countDown一下count.countDown();currentRunTask.remove(taskId);});//代表任务分配完毕} else {count.countDown();}}
//主线程等待count.await();
对于异步任务的逻辑
由于开始时设置当前执行的封装任务到本地线程可以通过static方法进行获取标识比如循环或者一些较重的耗时操作可以在执行前进行判断如果中断了就返回结果 protected static boolean isWorkerInterrupt() {return TaskWrapper.isInterrupt();}
比如继承该类子类可以在业务逻辑进行判断 while (!isWorkerInterrupt()) {
...业务逻辑
}由于整块优化的异步调度任务的代码比较多而且涉及了公司信息不在此展示重点在于
xxl-job异步线程如何感知主线程中断信息——了解xxljob trigger原理封装runnable管理当前封装的runnable任务把中断信息透传异步任务线程间的信息如何传递——这里通过封装runnable类作为一个信息载体threadLocal用于接受信息实现不同线程的信息传递