dedecms网站地图插件,设计最简单的企业网站,温州的网站设计,有的网站打开慢3.3.5 ThreadPoolExecutor的Worker工作线程
Worker对象主要包含了两个内容 ● 工作线程要执行任务 ● 工作线程可能会被中断#xff0c;控制中断
// Worker继承了AQS#xff0c;目的就是为了控制工作线程的中断。
// Worker实现了Runnable#xff0c;内部的Thread对象控制中断
// Worker继承了AQS目的就是为了控制工作线程的中断。
// Worker实现了Runnable内部的Thread对象在执行start时必然要执行Worker中断额一些操作
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
// Worker管理任务
// 线程工厂构建的线程
final Thread thread;// 当前Worker要执行的任务
Runnable firstTask;
// 记录当前工作线程处理了多少个任务。
volatile long completedTasks;
// 有参构造
Worker(Runnable firstTask) {
// 将State设置为-1代表当前不允许中断线程
setState(-1);
// 任务赋值
this.firstTask firstTask;
// 基于线程工作构建Thread并且传入的Runnable是Worker
this.thread getThreadFactory().newThread(this);
}
// 当thread执行start方法时调用的是Worker的run方法
public void run() {
// 任务执行时执行的是runWorker方法
runWorker(this);
}
// Worker管理中断
// 当前方法是中断工作线程时执行的方法
void interruptIfStarted() {Thread t;
// 只有Worker中的state 0的时候可以中断工作线程
if (getState() 0 (t thread) ! null !t.isInterrupted()) {
try {
// 如果状态正常并且线程未中断这边就中断线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
protected boolean isHeldExclusively() {
return getState() ! 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
3.3.6 ThreadPoolExecutor的runWorker方法
runWorker就是让工作线程拿到任务去执行即可。 并且在内部也处理了在工作线程正常结束和异常结束时的处理方案
// 工作线程启动后执行的任务。
final void runWorker(Worker w) {
// 拿到当前线程
Thread wt Thread.currentThread();
// 从worker对象中拿到任务
Runnable task w.firstTask;
// 将Worker中的firstTask置位空
w.firstTask null;
// 将Worker中的state置位0代表当前线程可以中断的
w.unlock(); // allow interrupts
// 判断工作线程是否是异常结束默认就是异常结束
boolean completedAbruptly true;
try {
// 获取任务// 直接拿到第一个任务去执行
// 如果第一个任务为null去阻塞队列中获取任务
while (task ! null || (task getTask()) ! null) {
// 执行了Worker的lock方法当前在lock时shutdown操作不能中断当前线程因为当前线程正在处理任务
w.lock();
// 比较ctl STOP,如果满足找个状态说明线程池已经到了STOP状态甚至已经要凉凉了
// 线程池到STOP状态并且当前线程还没有中断确保线程是中断的进到if内部执行中断方法
// if(runStateAtLeast(ctl.get(), STOP) !wt.isInterrupted()) {中断线程}
// 如果线程池状态不是STOP确保线程不是中断的。
// 如果发现线程中断标记位是true了再次查看线程池状态是大于STOP了再次中断线程
// 这里其实就是做了一个事情如果线程池状态 STOP确保线程中断了。
if (
(
runStateAtLeast(ctl.get(), STOP) ||
( Thread.interrupted() runStateAtLeast(ctl.get(), STOP) )
)!wt.isInterrupted())
wt.interrupt();
try {
// 勾子函数在线程池中没有做任何的实现如果需要在线程池执行任务前后做一些额外的处理可以重写勾子函数
// 前置勾子函数
beforeExecute(wt, task);
Throwable thrown null;
try {
// 执行任务。
task.run();
} catch (RuntimeException x) {thrown x; throw x;
} catch (Error x) {
thrown x; throw x;
} catch (Throwable x) {
thrown x; throw new Error(x);
} finally {
// 前后置勾子函数
afterExecute(task, thrown);
}
} finally {
// 任务执行完丢掉任务
task null;
// 当前工作线程处理的任务数1
w.completedTasks;
// 执行unlock方法此时shutdown方法才可以中断当前线程
w.unlock();
}
}
// 如果while循环结束正常走到这说明是正常结束
// 正常结束的话在getTask中就会做一个额外的处理将ctl - 1代表工作线程没一个。
completedAbruptly false;
} finally {
// 考虑干掉工作线程
processWorkerExit(w, completedAbruptly);
}
}
// 工作线程结束前要执行当前方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果是异常结束
if (completedAbruptly)
// 将ctl - 1扣掉一个工作线程
decrementWorkerCount();
// 操作Worker为了线程安全加锁
final ReentrantLock mainLock this.mainLock;
mainLock.lock();
try {
// 当前工作线程处理的任务个数累加到线程池处理任务的个数属性中
completedTaskCount w.completedTasks;
// 将工作线程从hashSet中移除
workers.remove(w);
} finally {
// 释放锁
mainLock.unlock();
}
// 只要工作线程凉了查看是不是线程池状态改变了。
tryTerminate();
// 获取ctl
int c ctl.get();
// 判断线程池状态当前线程池要么是RUNNING要么是SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 如果正常结束工作线程if (!completedAbruptly) {
// 如果核心线程允许超时min 0否则就是核心线程个数
int min allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min 0可能会出现没有工作线程并且阻塞队列有任务没有线程处理
if (min 0 ! workQueue.isEmpty())
// 至少要有一个工作线程处理阻塞队列任务
min 1;
// 如果工作线程个数 大于等于1不怕没线程处理正常return
if (workerCountOf(c) min)
return;
}
// 异常结束为了避免出现问题添加一个空任务的非核心线程来填补上刚刚异常结束的工作线程
addWorker(null, false);
}
}
3.3.7 ThreadPoolExecutor的getTask方法
工作线程在去阻塞队列获取任务前要先查看线程池状态 如果状态没问题去阻塞队列take或者是poll任务 第二个循环时不但要判断线程池状态还要判断当前工作线程是否可以被干掉
// 当前方法就在阻塞队列中获取任务
// 前面半部分是判断当前工作线程是否可以返回null结束。
// 后半部分就是从阻塞队列中拿任务
private Runnable getTask() {
// timeOut默认值是false。
boolean timedOut false;
// 死循环
for (;;) {
// 拿到ctl
int c ctl.get();
// 拿到线程池的状态
int rs runStateOf(c);
// 如果线程池状态是STOP没有必要处理阻塞队列任务直接返回null
// 如果线程池状态是SHUTDOWN并且阻塞队列是空的直接返回null
if (rs SHUTDOWN
(rs STOP || workQueue.isEmpty())) {
// 如果可以返回null先扣减工作线程个数
decrementWorkerCount();
// 返回null结束runWorker的while循环
return null;
}
// 基于ctl拿到工作线程个数
int wc workerCountOf(c);
// 核心线程允许超时timed为true
// 工作线程个数大于核心线程数timed为true
boolean timed allowCoreThreadTimeOut || wc corePoolSize;if (
// 如果工作线程个数大于最大线程数。一般情况不会满足把他看成false
// 第二个判断代表只要工作线程数小于等于核心线程数必然为false
// 即便工作线程个数大于核心线程数了此时第一次循环也不会为true因为timedOut默认值是false
// 考虑第二次循环了因为循环内部必然有修改timeOut的位置
(wc maximumPoolSize || (timed timedOut))// 要么工作线程还有要么阻塞队列为空并且满足上述条件后工作线程才会走到if内部结束工作线程
(wc 1 || workQueue.isEmpty())
) {
// 第二次循环才有可能到这。
// 正常结束工作线程 - 1因为是CAS操作如果失败了重新走for循环
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
// 工作线程从阻塞队列拿任务
try {
// 如果是核心线程timed是false如果是非核心线程timed就是true
Runnable r timed ?
// 如果是非核心走poll方法拿任务等待一会
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 如果是核心走take方法死等。
workQueue.take();
// 从阻塞队列拿到的任务不为null这边就正常返回任务去执行if (r ! null)
return r;
// 说明当前线程没拿到任务将timeOut设置为true在上面就可以返回null退出了。
timedOut true;
} catch (InterruptedException retry) {
timedOut false;
}
}
}