做网站不会框架,网站开发逻辑图,ppt做的模板下载网站有哪些,广州网站设计价格文章目录 线程池ThreadPoolExecutor源码分析1、如何提交任务2、如何执行任务3、如何停止过期的非核心线程4、如何使用拒绝策略 ScheduledThreadPoolExecutor源码分析 线程池
快速过一遍基础知识 7大参数 corePoolSize #xff1a; 核心线程数 maximumPoolSize#xff1a; 最… 文章目录 线程池ThreadPoolExecutor源码分析1、如何提交任务2、如何执行任务3、如何停止过期的非核心线程4、如何使用拒绝策略 ScheduledThreadPoolExecutor源码分析 线程池
快速过一遍基础知识 7大参数 corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 空闲线程存活时间 TimeUnit 时间单位 BlockingQueue任务队列 ThreadFactory 创建线程的工厂 RejectedExecutionHandler拒绝策略
拒绝策略 AbortPolicy中止策略线程池会抛出异常并中止执行此任务; CallerRunsPolicy把任务交给添加此任务的(main)线程来执行; DiscardPolicy忽略此任务忽略最新的一个任务; DiscardOldestPolicy忽略最早的任务最先加入队列的任务。
内置的线程池 SingleThreadExecutor单线程1 - 1 - Interge.MAX核心线程-最大线程-队列长度 FixedThreadPool固定大小N - N - Interge.MAX CachedThreadPool缓存0 - Integer.MAX - 0 ScheduledThreadPool定时线程池的另一个关于定时的分支 为什么不推荐使用内置的线程池 SingleThreadExecutor和FixedThreadPool无法控制队列长度可能导致OOM 而CachedThreadPool无法控制线程数量可能导致大量的线程创建。 ThreadPoolExecutor源码分析 先不考虑ScheduledThreadPool后面再单独说明定时线程池。 1、如何提交任务
ThreadPoolExecutor#execute public void execute(Runnable command) {if (command null)throw new NullPointerException();int c ctl.get();//新建核心线程if (workerCountOf(c) corePoolSize) {if (addWorker(command, true))return;c ctl.get();}//加入阻塞队列if (isRunning(c) workQueue.offer(command)) {//双重检测int recheck ctl.get();if (! isRunning(recheck) remove(command))reject(command);//如果当前没有正在运行的线程则新增一个非核心线程任务为null表示线程的任务将会从阻塞队列中获取else if (workerCountOf(recheck) 0)addWorker(null, false);}//新建非核心线程else if (!addWorker(command, false))reject(command);}也就是 submit和execute的区别 其实没啥太大的区别submit最后也是调用的execute只不过在调用之前封装了task为FutureTask表示有返回值的任务最后将返回值返回。 不过有一点需要注意的是。FutureTask不仅会返回结果还会把原本runnable中的异常吃了。所以submit提交的任务如果抛异常了外部是无法感知的。 FutureTask#run 测试结果
2、如何执行任务
ThreadPoolExecutor#addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (int c ctl.get();;) {if (runStateAtLeast(c, SHUTDOWN) (runStateAtLeast(c, STOP) || firstTask ! null || workQueue.isEmpty()))return false;for (;;) {//COUNT_MASK掩码舍去前3位因为前3位是状态位后面的才是任务数if (workerCountOf(c) ((core ? corePoolSize : maximumPoolSize) COUNT_MASK))return false;if (compareAndIncrementWorkerCount(c))break retry;c ctl.get();if (runStateAtLeast(c, SHUTDOWN))continue retry;}}//上面主要是ctl其他很多都是检测boolean workerStarted false;boolean workerAdded false;Worker w null;try {//新建一个worker封装了firstTask//worker也实现了Runnable相当于对firstTask封装了一层w new Worker(firstTask);//这里线程的runable实现是worker而不是firstTaskfinal Thread t w.thread;if (t ! null) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {int c ctl.get();//一些检测if (isRunning(c) || (runStateLessThan(c, STOP) firstTask null)) {if (t.isAlive())throw new IllegalThreadStateException();workers.add(w);int s workers.size();if (s largestPoolSize)largestPoolSize s;workerAdded true;}} finally {mainLock.unlock();}if (workerAdded) {//Thread.start()-runnable.run()也就是worker.run()-runWorker(worker)t.start();workerStarted true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}addWorker新建worker对象封装了新建的线程对象和原始task。线程的执行调用如下 thread.start()-runnable.run()也就是worker.run()-runWorker(worker)
ThreadPoolExecutor#runWorker
final void runWorker(Worker w) {Thread wt Thread.currentThread();Runnable task w.firstTask;w.firstTask null;w.unlock();boolean completedAbruptly true;try {//worker的task为nulladdWorker传入的参数则从阻塞队列中获取一个taskwhile (task ! null || (task getTask()) ! null) {w.lock();//检测是否需要中止线程if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {//执行前回调beforeExecute(wt, task);try {//执行任务task.run();//执行后回调afterExecute(task, null);} catch (Throwable ex) {afterExecute(task, ex);throw ex;}} finally {task null;w.completedTasks;w.unlock();}}completedAbruptly false;} finally {// finally 调用processWorkerExit(w, completedAbruptly);}}所以runWorker就是如果worker手上有task就先把手头上的task执行了然后再循环去阻塞队列获取task执行。如果没有就直接去阻塞队列获取task执行。
那么 finally 那里的 processWorkerExit 是干嘛用的
执行到processWorkerExit要么就是异常情况跳出循环completedAbruptlytrue要么就是worker手上和阻塞队列均没有task跳出循环completedAbruptlyfalse。
private void processWorkerExit(Worker w, boolean completedAbruptly) {//如果是异常退出的此时workerCount还没调整所以需要工作线程数减1if (completedAbruptly)decrementWorkerCount();final ReentrantLock mainLock this.mainLock;mainLock.lock();//更新 完成任务数以及移除workertry {completedTaskCount w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//尝试终止线程tryTerminate();int c ctl.get();//如果不是异常退出则根据配置计算需要的最小工作线程数//如果是异常退出或者当前工作线程小于上面根据配置计算的最小工作线程//则都用一个新worker来替换原来的workerif (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min allowCoreThreadTimeOut ? 0 : corePoolSize;if (min 0 ! workQueue.isEmpty())min 1;if (workerCountOf(c) min)return;}//启动一个worker替换原来的workeraddWorker(null, false);}}总之这段代码的主要作用是在工作线程退出时更新线程池的状态、计数以及根据配置来决定是否需要新的worker替代退出的工作线程以保持线程池的正常运行。
3、如何停止过期的非核心线程
答案在getTask()。
private Runnable getTask() {boolean timedOut false; // Did the last poll() time out?for (;;) {int c ctl.get();// 一些退出的状态就直接返回if (runStateAtLeast(c, SHUTDOWN) (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc workerCountOf(c);//是否需要超时淘汰boolean timed allowCoreThreadTimeOut || wc corePoolSize;//在确保当workQueue不为空时至少有一个工作线程的前提下//来淘汰超出 maximumPoolSize 或者超时的线程if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//阻塞获取任务Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r ! null)return r;//标记超时timedOut true;} catch (InterruptedException retry) {timedOut false;}}}其实线程池并没有标记谁是核心线程谁是非核心线程只关心核心线程和非核心线程的数量。也就是说无论是哪个线程在获取任务时都有可能被标记为timeOut并且每次获取任务都会根据核心线程数最大线程数当前线程数timeout标记等判断是否需要当前worker如果不需要就返回null跳出runWorker的循环进而结束线程。
4、如何使用拒绝策略
在提交任务的时候如果addWorker失败就会进入拒绝策略的逻辑。 public void execute(Runnable command) {//...//加入阻塞队列if (isRunning(c) workQueue.offer(command)) {//...if (! isRunning(recheck) remove(command))//双重检测失败进入拒绝策略reject(command);//... }//新建非核心线程else if (!addWorker(command, false))//非核心线程添加失败进入拒绝策略reject(command);
}final void reject(Runnable command) {handler.rejectedExecution(command, this);
}ScheduledThreadPoolExecutor源码分析
.schedule()延迟执行只执行一次。 .scheduleAtFixedRate()固定频率执行按照固定的时间间隔来调度任务。 .scheduleWithFixedDelay()固定延迟执行在上一次任务完成后的固定延迟之后再次执行任务。
无论是哪种都会先将task封装成 ScheduledFutureTask然后调用 delayedExecute。 以scheduleAtFixedRate为例
public ScheduledFuture? scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command null || unit null)throw new NullPointerException();if (period 0L)throw new IllegalArgumentException();ScheduledFutureTaskVoid sft new ScheduledFutureTaskVoid(command,null,triggerTime(initialDelay, unit),//scheduleWithFixedDelay与scheduleAtFixedRate的区别就只在这里//scheduleWithFixedDelay 传的是 -unit.toNanos(period)//后续会根据这个值的正负来判断是固定频率还是固定延迟unit.toNanos(period),sequencer.getAndIncrement());//封装成 ScheduledFutureTask RunnableScheduledFutureVoid t decorateTask(command, sft);sft.outerTask t;//调用 delayedExecutedelayedExecute(t);return t;}delayedExecute
private void delayedExecute(RunnableScheduledFuture? task) {if (isShutdown())reject(task);else {//task添加到队列//这同样也是自己实现的一个延迟队列大概的逻辑就是先按时间排如果时间一样就按插入的顺序排。super.getQueue().add(task);//一些检测if (!canRunInCurrentRunState(task) remove(task))task.cancel(false);else//保证有足够的woker正在工作ensurePrestart();}}void ensurePrestart() {int wc workerCountOf(ctl.get());if (wc corePoolSize)//addWorker跟就上面的是一样的了addWorker(null, true);else if (wc 0)addWorker(null, false);}那么凭什么将Worker的task封装成 ScheduledFutureTask 能起到持续调用的效果来看看他的 run 方法。 ScheduledFutureTask#run public void run() {//一些检测if (!canRunInCurrentRunState(this))cancel(false);//如果不是周期性任务就只调用一次period不为0则表示不是周期性任务else if (!isPeriodic())super.run();//如果是周期性任务就在调用完之后//设置下次调用时间并将任务放回队列且保证有足够的woker正在工作else if (super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}ScheduledFutureTask#setNextRunTime private void setNextRunTime() {long p period;//根据period的正负来区分是固定频率还是固定延迟if (p 0)time p;elsetime triggerTime(-p);}ScheduledThreadPoolExecutor#reExecutePeriodic void reExecutePeriodic(RunnableScheduledFuture? task) {if (canRunInCurrentRunState(task)) {//放回队列super.getQueue().add(task);if (canRunInCurrentRunState(task) || !remove(task)) {//保证有足够的woker正在工作ensurePrestart();return;}}task.cancel(false);}所以ScheduledThreadPoolExecutor的总体框架设计和上面的ThreadPoolExecutor是一样的毕竟是他的子类。 最主要的区别在于ScheduledThreadPoolExecutor里worker使用的task是自己内部实现的 ScheduledFutureTask 类而该类的run方法在执行完后会设置下一次的执行时间并将任务放回队列中等待执行。