做食品网站,网站建设的价格,网站的统计代码,wordpress博客整站源码深入理解定时任务线程池ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor作用与用法ScheduledThreadPoolExecutor内部执行流程DelayedWorkQueueScheduledFutureTask源码分析任务提交ScheduledFutureTask的属性和方法delayedExecute(t) 任务执行ScheduledFutureTask.su… 深入理解定时任务线程池ScheduledThreadPoolExecutor ScheduledThreadPoolExecutor作用与用法ScheduledThreadPoolExecutor内部执行流程DelayedWorkQueueScheduledFutureTask源码分析任务提交ScheduledFutureTask的属性和方法delayedExecute(t) 任务执行ScheduledFutureTask.super.run()ScheduledFutureTask.super.runAndReset()setNextRunTime()reExecutePeriodic(outerTask) DelayedWorkQueue小顶堆DelayedWorkQueue成员变量void add(Runnable e)void grow()void siftUp(int k, RunnableScheduledFuture? key) RunnableScheduledFuture? take()RunnableScheduledFuture? finishPoll(RunnableScheduledFuture? f)void siftDown(int k, RunnableScheduledFuture? key) 总结 ScheduledThreadPoolExecutor作用与用法
ScheduledThreadPoolExecutor是一个用于执行定时任务或延时任务的线程池提交到该线程池中的任务会等待执行时间到了才会被执行。与此相对的是ThreadPoolExecutor提交到ThreadPoolExecutor中的任务只要ThreadPoolExecutor中有空闲线程就会被马上执行如果ThreadPoolExecutor中没有空闲线程则会把任务放入队列。而提交到ScheduledThreadPoolExecutor中的任务不管此时ScheduledThreadPoolExecutor有没有空闲线程任务都会被放入到队列里去等待任务执行时间到期时被线程从队列中取出并执行。
除此以外ScheduledThreadPoolExecutor是继承自ThreadPoolExecutor的因此ThreadPoolExecutor有的东西ScheduledThreadPoolExecutor也有。比如任务队列 BlockingQueueRunnable workQueue工作者线程集合 HashSetWorker workers。 ScheduledThreadPoolExecutor一般可以运用在一些非实时性或者非交互性的场景。比如微服务的注册中心就可以通过定时任务扫描没有在规定时间之内续约的服务将其下线。 提交到ScheduledThreadPoolExecutor的任务有两种类型一种时定时任务一种时延时任务。我们可以调ScheduledThreadPoolExecutor不同的方法来提交不同类型的任务。
如果要提交的任务是延时任务我们可以调用ScheduledThreadPoolExecutor的schedule方法 public ScheduledFuture? schedule(Runnable command, // 要执行的任务不带返回值long delay, // 延迟时间TimeUnit unit) // 时间单位public V ScheduledFutureV schedule(CallableV callable, // 要执行的任务带返回值long delay, // 延迟时间TimeUnit unit) // 时间单位如果要提交定时任务可以调用ScheduledThreadPoolExecutor的scheduleAtFixedRate方法或者scheduleWithFixedDelay方法 public ScheduledFuture? scheduleAtFixedRate(Runnable command, // 要执行的任务不带返回值long initialDelay, // 延迟时间long period, // 执行周期TimeUnit unit) // 时间单位public ScheduledFuture? scheduleWithFixedDelay(Runnable command, // 要执行的任务不带返回值long initialDelay, // 初始延迟时间long delay, // 后面每次执行完上一次任务延迟多久执行下一次任务TimeUnit unit) // 时间单位scheduleAtFixedRate方法以固定的时间周期period运行任务前一次任务开始执行后下一次的任务会在固定的周期period之后再执行。而scheduleWithFixedDelay方法是每次执行完上一次的任务会延迟一定的时间delay之后再执行下一次的任务。
ScheduledThreadPoolExecutor内部执行流程
提交到ScheduledThreadPoolExecutor中的任务Runnable会被封装为一个Task该Task会被入队列然后检查ScheduledThreadPoolExecutor中是否有工作者线程没有的话要新建一个工作者线程添加到ScheduledThreadPoolExecutor中的工作者线程集合中ScheduledThreadPoolExecutor后台会有工作者线程从任务队列中拉取任务但是ScheduledThreadPoolExecutor中的队列是一个延时队列所以队列中的任务只有到了执行时间之后才会被执行。ScheduledThreadPoolExecutor中的延时队列内部是一个小顶堆结构任务会按照到期时间从小到大进行排序堆顶是最早到期的任务。如果一个任务它是定时任务那么它被执行完以后会更新下一次的到期时间然后重新放回到队列 DelayedWorkQueue
ScheduledThreadPoolExecutor有一个内部类DelayedWorkQueue他是一个延时阻塞队列。DelayedWorkQueue用于存放提交到ScheduledThreadPoolExecutor中的任务DelayedWorkQueue内部使用一个小顶堆存储提交到ScheduledThreadPoolExecutor中的任务放入DelayedWorkQueue中的任务会按照到期时间从小到大进行排序堆顶元素是最早到期的元素。
小顶堆是一个特殊的二叉树。这颗二叉树中的每个父节点都比它的两个字节点要小。其次这颗二叉树不是真的用二叉树的结构来实现的而是用一个数组实现的也就是用一个数组去模拟一个符合小顶堆结构的二叉树。比如父节点是数组下标n那么左子节点的数组下标是2n1右子节点的下标就是2n2。
每次往DelayedWorkQueue中放入任务时都会从堆底往堆顶做向上调整。每次从DelayedWorkQueue中获取任务后堆底任务会被提到堆顶然后从堆顶到堆底做一次堆的向下调整。 DelayedWorkQueue中有一个Thread类型的leader变量存放等待堆顶任务到期的线程。因为ScheduledThreadPoolExecutor中可能有许多线程线程从ScheduledThreadPoolExecutor中获取任务是从DelayedWorkQueue里面的堆顶获取但是堆顶任务只能由一个线程执行那么该由哪个线程执行呢那就是leader线程是谁就由谁执行。
那么其他非leader的线程呢这些线程就要在DelayedWorkQueue内部的一个Condition类型的条件队列available中进行等待。当一个线程成功从DelayedWorkQueue中取走一个任务时会唤醒available中的一个线程此时这个线程就可以去竞争当上leader线程了。 ScheduledFutureTask
我们提交到ScheduledThreadPoolExecutor中的任务都是Runnable类型的。但是ScheduledThreadPoolExecutor需要标记这些Runnable对象什么时候到期被执行并且这些Runnable之间要互相比较到期时间好让它们在DelayedWorkQueue的堆中被从小到大排序。因此ScheduledThreadPoolExecutor使用了ScheduledFutureTask类型去封装被提交进来的Runnable对象。
ScheduledFutureTask对象使用一个time属性记录下一次执行的时间使用一个sequenceNumber记录自己的序号。ScheduledFutureTask重写了Comparable接口的compareTo方法。compareTo方法与其他任务比较时首先会比较time属性谁更小谁的到期时间就更早那么在堆中的排序就越靠近堆顶如果time属性相等则比较谁的sequenceNumber更小。
ScheduledFutureTask还有一个period属性记录执行任务的时间间隔这个属性可以用于计算下一次的执行时间time。 源码分析
下面我们进入ScheduledThreadPoolExecutor的内部阅读ScheduledThreadPoolExecutor的源码了解它的运行逻辑和核心原理。
任务提交
我们以ScheduledThreadPoolExecutor的scheduleAtFixedRate方法为例看看任务被提交到ScheduledThreadPoolExecutor之后是怎么处理。 public ScheduledFuture? scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command null || unit null)throw new NullPointerException();if (period 0)throw new IllegalArgumentException();// step1ScheduledFutureTaskVoid sft new ScheduledFutureTaskVoid(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFutureVoid t decorateTask(command, sft);sft.outerTask t;// step2delayedExecute(t);return t;}总体上分两步。
第一步step1是把我们传进来的Runnable封装为一个ScheduledFutureTask对象。封装为ScheduledFutureTask对象是为了更方便的计算任务的执行时间以及在堆中的排序。
ScheduledFutureTask利用一个time属性记录下一次任务的执行时间调用ScheduledFutureTask的setNextRunTime()方法会自动计算下一次的执行时间并更新time属性。ScheduledFutureTask实现了Comparable接口compareTo方法会比较两个任务的执行时间谁更快到期谁的排序就更优先。
然后第二步step2就是调用delayedExecute(t)把ScheduledFutureTask对象放入到ScheduledThreadPoolExecutor的队列中等待执行时间到期被ScheduledThreadPoolExecutor中的Worker线程取走并执行。
ScheduledFutureTask的属性和方法
既然第一步把我们传进来的Runnable对象封装为了一个ScheduledFutureTask我们看看ScheduledFutureTask内部到底有什么。 private class ScheduledFutureTaskVextends FutureTaskV implements RunnableScheduledFutureV {// 序列号每个ScheduledFutureTask都有一个唯一的序列号顺序递增private final long sequenceNumber;// 下一次的执行时间private long time;// 执行周期private final long period;// 当前任务执行完后重新放回队列的任务一般就是当前任务本身RunnableScheduledFutureV outerTask this;// 当前任务在DelayedWorkQueue的堆数组中的下标int heapIndex;}以上是ScheduledFutureTask的属性。
sequenceNumber是ScheduledThreadPoolExecutor分配给当前ScheduledFutureTask的序列号作用就是在比较两个任务的排序优先级时如果time属性相同会进一步拿sequenceNumber进行比较
time记录的是当前ScheduledFutureTask下一次执行的时间。
period是任务的执行周期用于计算下一次的执行时间计算结果会赋值给time。
outerTask就是当前的ScheduledFutureTask对象自己用于重回队列时作为参数传递。如果我们设置outerTask为其他的ScheduledFutureTask对象那么下一次执行的就是不同的任务。如果不做修改的话就是当前ScheduledFutureTask对象自己。
heapIndex是记录当前ScheduledFutureTask对象在DelayedWorkQueue的堆数组中的下标。有了heapIndex属性之后就可以很快速的从堆数组中找到对应的ScheduledFutureTask对象比如我们判断一个ScheduledFutureTask对象是否在堆中就可以拿到ScheduledFutureTask的heapIndex属性从堆数组中取出heapIndex下标对应的ScheduledFutureTask对象判断两个对象是否相等相等表示当前ScheduledFutureTask对象在堆中而DelayedWorkQueue的contains方法正是这样的逻辑。
delayedExecute(t)
Runnable对象封装为ScheduledFutureTask对象后下一步就是要把它放入到队列中并检查ScheduledThreadPoolExecutor中是否有线程如果没有要创建一个保证当前任务有线程执行它。 private void delayedExecute(RunnableScheduledFuture? task) {if (isShutdown())reject(task);else {// step2.1super.getQueue().add(task);if (isShutdown() !canRunInCurrentRunState(task.isPeriodic()) remove(task))task.cancel(false);else// step2.2ensurePrestart();}}非核心逻辑不看就看重点的两行代码。
step2.1super.getQueue().add(task); 就是把当前ScheduledFutureTask对象放入队列中。super.getQueue()是调用父类ThreadPoolExecutor的getQueue()获取线程池中的阻塞队列ScheduledThreadPoolExecutor的构造方法创建的队列是DelayedWorkQueue所以这里获取到的时DelayedWorkQueue。然后调用DelayedWorkQueue的add方法把ScheduledFutureTask对象放入DelayedWorkQueue中。
step2.2ensurePrestart()在任务放入队列之后被调用用于检测ScheduledThreadPoolExecutor是否需要创建线程如果需要的话会创建一个线程。 由于DelayedWorkQueue的add方法比较复杂我们放到后面再看先看完大体流程。这里我们暂时先理解为放入队列中的任务会按到期执行时间从小到大排好序。
下面看看ensurePrestart()方法如何判断是否需要创建线程。 void ensurePrestart() {int wc workerCountOf(ctl.get());if (wc corePoolSize)addWorker(null, true);else if (wc 0)addWorker(null, false);}如果ScheduledThreadPoolExecutor中的线程数小于核心线程数那不管ScheduledThreadPoolExecutor中有没有已经创建的线程都会再创建一个线程目的是为了让ScheduledThreadPoolExecutor尽快达到核心线程数。如果已经达到了核心线程数那么就不会再创建线程。
下面的 else if 分支是当我们设置ScheduledThreadPoolExecutor的核心线程数为0时会进入的一个逻辑那么就看ScheduledThreadPoolExecutor中有没有线程没有就创建有就不创建。 创建线程的方法是addWorker方法这个是ThreadPoolExecutor的方法会创建一个Thread线程对象并包装成Worker对象放入到一个 HashSetWorker workers 集合中最后会调用Thread对象的start()方法启动线程。
线程启动后会在一个while循环中不停地从队列中拉取任务并执行拉取任务就是调用BlockingQueue的take()方法这里就是DelayedWorkQueue的take()方法。 由于addWorker方法是ThreadPoolExecutor中的方法不是本篇文章的重点而且ThreadPoolExecutor的源码是非常简单的应该是新手都能看得懂这里就不展开分析了。
至于DelayedWorkQueue的take()方法稍微有点复杂我们也是放到后面进行分析。这里我们暂时先理解线程会等待队列中最早到期的任务到期后取走。
任务执行
当任务到期被leader线程从队列中取出后任务ScheduledFutureTask的run()方法就会被执行。 public void run() {boolean periodic isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 延时任务else if (!periodic)ScheduledFutureTask.super.run();// 定时任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}}如果是延时任务那么之后执行一次不会重回队列直接调用父类的run()方法运行任务然后会进入到FutureTask的run()方法中。如果是定时任务会周期性的运行调用父类的runAndReset()方法进入到FutureTask的runAndReset()方法然后调用setNextRunTime()设置下一次的运行时间然后调用reExecutePeriodic(outerTask)方法把任务重回队列。 ScheduledFutureTask.super.run()
ScheduledFutureTask的父类是FutureTaskScheduledFutureTask.super.run() 会进入 FutureTask的run()方法。 public void run() {if (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {CallableV c callable;if (c ! null state NEW) {V result;boolean ran;try {// 执行Callable的call()方法正在执行任务result c.call();ran true;} catch (Throwable ex) {result null;ran false;setException(ex);}if (ran)set(result);}} finally {runner null;int s state;if (s INTERRUPTING)handlePossibleCancellationInterrupt(s);}}FutureTask的run()方法调用Callable的call()方法真正执行任务这里执行的不是Runnable的run()方法是因为我们的Runnable对象被转成了一个Callable的实现类最后会调到我们的Runnable的run()方法。 ScheduledFutureTask.super.runAndReset()
执行 ScheduledFutureTask.super.runAndReset() 会进入到 FutureTask的runAndReset()方法。 protected boolean runAndReset() {if (state ! NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran false;int s state;try {CallableV c callable;if (c ! null s NEW) {try {// 也是调用Callable的call()方法c.call();ran true;} catch (Throwable ex) {setException(ex);}}} finally {runner null;s state;if (s INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran s NEW;}FutureTask的runAndReset()方法也是调用Callable的call()方法只是不会接收返回值并设置到FutureTask的结果中。 setNextRunTime()
任务执行完之后就会调用setNextRunTime()方法更新下一次的执行时间。 private void setNextRunTime() {long p period;// 调用scheduleAtFixedRate方法提交进来的任务会进这个分支if (p 0)time p;// 调用scheduleWithFixedDelay方法提交进来的任务会进这个分支elsetime triggerTime(-p);}setNextRunTime()方法会更新ScheduledFutureTask的time属性为下一次的执行时间我们调用的scheduleAtFixedRate方法进来的任务会简单的把time加一个period周期。 reExecutePeriodic(outerTask)
设置好了任务下一次的运行时间后就会调用reExecutePeriodic(outerTask)方法把任务重新放回队列中。 void reExecutePeriodic(RunnableScheduledFuture? task) {if (canRunInCurrentRunState(true)) {// 调用DelayedWorkQueue的add方法把任务重新放回队列super.getQueue().add(task);if (!canRunInCurrentRunState(true) remove(task))task.cancel(false);elseensurePrestart();}}可以看到就是调用DelayedWorkQueue的add方法把任务重新放回队列。 以上就是ScheduledThreadPoolExecutor的核心流程包含了任务被提交到ScheduledThreadPoolExecutor之后的处理从队列中被取出执行执行完成后计算下次执行时间然后重回队列的整个过程。 但是在我们上面的流程分析中我们跳过了DelayedWorkQueue的相关方法也就是任务入队列时的DelayedWorkQueue#add方法和任务出队列时的DelayedWorkQueue#take方法下面我们就着重分析DelayedWorkQueue的相关源码。
DelayedWorkQueue
小顶堆
要理解DelayedWorkQueue的原理首先要熟悉小顶堆。
小顶堆是一个特殊的二叉树。它不是全局有序但是保证每个父节点都比两个子节点要小。而且小顶堆通常不会真的使用树结构来实现而是用一个数组来模拟了树结构每个树节点在数组中都有对应的下标获取子节点也是通过下标换算来取得。
当需要获取父节点的子节点时假如父节点的下标是n那么左子节点的下标就是2n1右子节点的下标就是2n2。当需要通过子节点寻找父节点时假设子节点的下标是n那么父节点就是 (n - 1) / 2。
当向小顶堆中加入新节点时先添加在堆底也就是添加在数组的最末尾。然后通过下标换算找到父节点的位置与父节点进行比较发现比父节点小则跟父节点交换位置然后继续通过下标换算找到新的父节点再次比较直到某一次比较发现比父节点大则不再交换位置此时新节点就放在这个位置上整个过程是不断的把新节点从堆底往上提所以叫做堆的向上调整。
下面是一个往小顶堆添加新节点的例子
假设现在有一个小顶堆[1, 4, 3, 8, 5, 6, 7]我们要往堆中加入值为2的节点。那么首先是把它放到数组尾部堆底[1, 4, 3, 8, 5, 6, 7, 2]现在新节点的下标是7通过下标运算 (7 - 1) / 2 3下标为3的节点就是父节点与父节点进行比较发现 2 8因此和父节点交换位置新节点来到下标为3的位置 [1, 4, 3, 2, 5, 6, 7, 8] 进入下一轮再次通过下标换算 (3 - 1) / 2 1找到下标为1的父节点与父节点进行比较发现 2 4那么继续跟父节点交换位置新节点来到下标1的位置 [1, 2, 3, 4, 5, 6, 7, 8]。 进入下一轮再次通过下标换算 (1 - 1) / 2 0找到下标为0的父节点与父节点进行比较发现 2 1不再跟父节点交换位置新节点停留在下标为1的位置调整完后堆就是 [1, 2, 3, 4, 5, 6, 7, 8]。 以上就是新节点加入小顶堆的过程。我们再来看一个从堆中弹出节点的例子。
还是上面的堆结构假如我们要从堆中弹出堆顶节点也就是值为1的节点那么在把堆顶节点1作为结果返回之前会进行堆调整使得堆再次符合小顶堆的规则。
首先把堆底节点节点8提到堆顶然后数组长度减1。 此时节点8来到下标0的位置也就是堆顶开始做向下调整。根据下标换算找到子节点2和3它要和最小的子节点PK也就是跟2PK发现比2大那么8和2交换位置把2提上来。 进入下一轮此时8来到了下标1的位置通过下标换算找到两个子节点4和5与最小的子节点4进行PK发现比4大继续交换位置。 此时节点8已经来到堆底了那么堆的向下调整结束。 可以发现通过堆调整小顶堆总是可以保证堆顶元素最小并且数值越小的元素就越靠近堆顶会越先被取走。如果堆中的节点是定时任务那么图中节点的数值就是到期时间小顶堆总是可以保证最快到期的任务总是在堆顶我们取任务时直接取堆顶任务判断是否到期即可。
了解了小顶堆的原理我们就可以来看DelayedWorkQueue的源码了。
DelayedWorkQueue成员变量 static class DelayedWorkQueue extends AbstractQueueRunnableimplements BlockingQueueRunnable {private static final int INITIAL_CAPACITY 16;private RunnableScheduledFuture?[] queue new RunnableScheduledFuture?[INITIAL_CAPACITY];private final ReentrantLock lock new ReentrantLock();private int size 0;private Thread leader null;private final Condition available lock.newCondition();} queue 是用于存放 ScheduledFutureTask 的堆数组初始化容量是16RunnableScheduledFuture是ScheduledFutureTask 的父类。
lock是ReentrantLock可重入锁对象在向DelayedWorkQueue添加任务元素和获取任务时需要先加锁。
size是当前堆大小也就是堆中有多少个节点。这个跟数组中的元素个数不一样数组中的元素个数有可能比size多但是数组中有可能有一部分是无效元素。
leader线程是等待堆顶任务到期取走的线程。
available是一个Condition条件队列用于存放排队等待获取任务的线程非leader线程会在available中一直等待直到被唤醒leader线程会在available中等待堆顶任务的到期时间带时长的等待到期自动唤醒。 void add(Runnable e)
接下来看一下add方法add方法是把一个Runnable对象放入到队列中这里的Runnable就是ScheduledFutureTaskScheduledFutureTask 间接继承了Runnable接口。 public boolean add(Runnable e) {return offer(e);}add方法调用了offer方法。 public boolean offer(Runnable x) {if (x null)throw new NullPointerException();RunnableScheduledFuture? e (RunnableScheduledFuture?)x;// 1.先上锁final ReentrantLock lock this.lock;lock.lock();try {int i size;// 2.判断是否需要进行数组扩容如果需要则调用grow方法进行扩容if (i queue.length)grow();size i 1;// 3.把任务放入堆中if (i 0) {// 堆中没有任务则直接放在堆顶不用做调整queue[0] e;setIndex(e, 0);} else {// 把任务放入堆中然后从底向上做调整siftUp(i, e);}// 4.如果堆顶任务是刚刚入队列的任务那么重置leader唤醒available中的线程去竞争leaderif (queue[0] e) {leader null;available.signal();}} finally {// 5.释放锁lock.unlock();}return true;}一共分5步
先获取ReentrantLock锁获取到ReentrantLock锁的才能往下进行获取不到锁的线程要在AQS的同步队列中阻塞等待。判断是否需要进行数组扩容如果需要则调用grow方法进行扩容。把任务放入堆中。这里如果判断堆中没有任务那么直接放到堆顶就可以没有必要做堆调整否则就要把任务放到堆底然后从底往上做堆调整。如果发现堆顶任务变成了刚刚添加的任务那么就要重置leader唤醒available中的一个线程去竞争leader。因为原来的leader等待的任务已被拉下去了不再是堆顶任务自然等待这个任务到期的线程也不能继续当leader。最后就是释放锁。 void grow()
grow()是堆数组进行扩容的方法当数组中元素的个数已满时再往堆中添加任务会调用该方法进行扩容。 private void grow() {int oldCapacity queue.length;int newCapacity oldCapacity (oldCapacity 1);if (newCapacity 0) // overflownewCapacity Integer.MAX_VALUE;queue Arrays.copyOf(queue, newCapacity);}oldCapacity是老数组的长度oldCapacity (oldCapacity 1) 就是 oldCapacity的1.5倍得到新数组长度然后把老数组的元素复制到新数组。 void siftUp(int k, RunnableScheduledFuture? key)
然后再看一下堆向上调整的方法。 private void siftUp(int k, RunnableScheduledFuture? key) {while (k 0) {int parent (k - 1) 1;RunnableScheduledFuture? e queue[parent];if (key.compareTo(e) 0)break;queue[k] e;setIndex(e, k);k parent;}queue[k] key;setIndex(key, k);}k是新任务当前所处的位置key就是新加入的任务。
while (k 0)表示一直循环向上调整直到来到堆顶就停止或者中途break掉。
int parent (k - 1) 1; 就是找到当前位置k的父节点的位置(k - 1) 1 就是 (k - 1) / 2只不过这里用了位移运算加速。
RunnableScheduledFuture? e queue[parent]; 就是根据父任务下标parent获取到父任务e。
if (key.compareTo(e) 0) break; 这一行的意思就是如果当前任务的过期时间比父任务大那么调整结束新任务就放在位置k。因为 key.compareTo(e) 返回大于等于0的数值表示新任务key的过期时间比父任务e的过期时间大而当前又是小顶堆过期时间越小的越靠近堆顶所以新任务key只能放在父任务e的下面。
queue[k] e; 就是上面的if分支没有进去代表新任务key的过期时间比父任务e的过期时间要小那么新任务key是应该要处于比父任务e更靠上的位置的因此这里先把父任务e拉下来也就是是移到新任务的当前位置k然后新任务key继续向上跟更靠上的父任务做比较。
setIndex(e, k); 就是更新父任务在堆中的位置
k parent; 就是更新新任务的在数组中的位置方便做下一轮的比较。
当跳出while循环后k的值就是新任务在数组中的位置queue[k] key; 就是把新任务放到它该放的位置。
最后 setIndex(key, k); 更新新任务在堆中的位置。 RunnableScheduledFuture? take()
看完add方法我们再来看take方法。take方法是从队列中获取任务的方法该方法会阻塞当前线程直到获取到任务为止。 public RunnableScheduledFuture? take() throws InterruptedException {// 1.先获取锁final ReentrantLock lock this.lock;lock.lockInterruptibly();try {for (;;) {RunnableScheduledFuture? first queue[0];// 2.如果堆顶任务为空在available中等待if (first null)available.await();else {long delay first.getDelay(NANOSECONDS);// 3.如果堆顶任务已经到期了取走取走前要调做堆调整if (delay 0)return finishPoll(first);first null;// 4.如果此时leader线程不为null要在available中等待if (leader ! null)available.await();else {// 5.设置当前线程为leader线程然后在available等待堆顶任务到期醒来后置空leaderThread thisThread Thread.currentThread();leader thisThread;try {available.awaitNanos(delay);} finally {if (leader thisThread)leader null;}}}}} finally {// 6.leader为空堆中还有任务唤醒available中等待的线程if (leader null queue[0] ! null)available.signal();// 7.释放锁lock.unlock();}}总共分7步
还是先获取锁防止并发问题。获取锁成功后就进入一个for循环自旋直到取到任务。如果检查到如果堆顶任务为空在available中等待。堆顶任务不为空并且堆顶任务已经到期了那么就取走堆顶任务并执行但是在取走前要调做堆调整finishPoll(first)就是进行堆调整的方法。堆顶任务没到期并且此时leader线程不为null当前线程就要在available中等待。堆顶任务未到期并且此时leader线程为null那么就设置当前线程为leader线程然后在available进行带超时时间的等待等待时间就是堆顶任务的到期时间。到期醒来后会设置leader为null然后就是进入下一轮循环时取走堆顶任务。最后退出方法前检查到leader为null并且堆中还有任务那么唤醒available中的一个线程。 RunnableScheduledFuture? finishPoll(RunnableScheduledFuture? f)
finishPoll方法做的事情就是在取走堆顶任务之前对小顶堆做调整然后才把堆顶任务返回。 private RunnableScheduledFuture? finishPoll(RunnableScheduledFuture? f) {int s --size;RunnableScheduledFuture? x queue[s];queue[s] null;if (s ! 0)siftDown(0, x);setIndex(f, -1);return f;}int s --size; 是堆元素个数减1。
RunnableScheduledFuture? x queue[s]; 获取到的是堆底任务准备用这个任务来做堆调整。
queue[s] null; 把堆底置空。
siftDown(0, x); 真正做堆调整的方法从堆顶数组下标为0位置向下做堆调整。
setIndex(f, -1); 设置原先的堆顶任务也就是即将要作为结果返回的任务的位置为-1表示该任务已被取走。
return f; 返回原先的堆顶任务堆顶任务是作为参数传进来的原封不动返回出去。 void siftDown(int k, RunnableScheduledFuture? key)
我们再来看一下真正做堆调整的siftDown方法siftDown方法是拿着原先的堆底任务作为当前任务去做堆的向下调整它跟我们上面描述的从顶向下做堆调整的流程是一致的。 private void siftDown(int k, RunnableScheduledFuture? key) {int half size 1;while (k half) {int child (k 1) 1;RunnableScheduledFuture? c queue[child];int right child 1;if (right size c.compareTo(queue[right]) 0)c queue[child right];if (key.compareTo(c) 0)break;queue[k] c;setIndex(c, k);k child;}queue[k] key;setIndex(key, k);}int half size 1; 取到的是堆大小一半的位置half然后 while (k half) 循环直到当前位置k过了half 就停止。因为过了位置half就是小顶堆的最底一层的位置没必要再往下了。
然后再下面 int child (k 1) 1; 就是获取左孩子的位置也就是 k * 2 1然后int right child 1;就是右孩子的位置。
if (right size c.compareTo(queue[right]) 0) c queue[child right]; c是最终要和当前父节点PK的子任务这里就是两个子任务PK一下看谁胜出就拿谁去跟父节点PK。
if (key.compareTo© 0) break; 就是如果父节点的任务到期时间比子任务小那么父节点就放在当前位置k无需再往下调整。
queue[k] c; 就是把子节点往上提到当前位置k因为执行到这里代表当前父节点没有PK过子节点所以自然要把子节点往上提。
setIndex(c, k); 更新子节点在堆数组的位置记录。
k child; 当前位置k更新为子节点原先的位置方便做下一轮调整。
queue[k] key; 就是在while循环结束后得到了当前任务该放的位置k把当前任务放在位置k。
setIndex(key, k); 最后更新当前任务在堆数组的位置记录。 总结
到这里整个ScheduledThreadPoolExecutor的原理和源码都分析完毕了。
总体逻辑
用了一个延时队列DelayedWorkQueue去存放任务延时队列又使用一个小顶堆去存放任务小顶堆中的任务ScheduledFutureTask会按照到期时间time从小到大进行排序。当我们提交定时任务到ScheduledThreadPoolExecutor时任务会被放入小顶堆中然后会从底往上做堆调整。任务放入堆中之后检查是否有必要创建新线程如果有必要则创建新线程。ScheduledThreadPoolExecutor中的线程会不停的循环从DelayedWorkQueue中获取任务并执行。线程从DelayedWorkQueue中获取任务时并不是想取就能取而是要当上了leader才能取。leader线程每次取任务都是从堆顶取走一个而且要等待堆顶任务到期才能取走堆顶任务还没到期前leader线程会在available条件队列中等待一定的时间后自动唤醒其他非leader线程就要在available条件队列中等待leader线程执行完任务唤醒。每次取走一个任务后会拿到堆底任务从堆顶向下做堆调整。定时任务执行完成后会计算并更新下一次的执行时间然后重写放回到队列中。 本篇文章到此全部结束。