当前位置: 首页 > news >正文

企业建设网站的好处有哪些济南浩特元辰建设工程有限公司网站

企业建设网站的好处有哪些,济南浩特元辰建设工程有限公司网站,写软文怎么接单子,河北智能网站建设平台前言 线程池可以说是 Java 进阶必备的知识点了#xff0c;也是面试中必备的考点#xff0c;可能不少人看了一些文章后能对线程池工作原理说上一二#xff0c;但这还远远不够#xff0c;如果碰到比较有经验的面试官再继续追问#xff0c;很可能会被吊打#xff0c;考虑如下… 前言 线程池可以说是 Java 进阶必备的知识点了也是面试中必备的考点可能不少人看了一些文章后能对线程池工作原理说上一二但这还远远不够如果碰到比较有经验的面试官再继续追问很可能会被吊打考虑如下问题:Tomcat 的线程池和 JDK 的线程池实现有啥区别, Dubbo 中有类似 Tomcat 的线程池实现吗?我司网关 dubbo 调用线程池曾经出现过这样的一个问题压测时接口可以正常返回但接口 RT 很高假设设置的核心线程大小为 500最大线程为 800缓冲队列为 5000你能从这个设置中发现出一些问题并对这些参数进行调优吗线程池里的线程真的有核心线程和非核心线程之分?线程池被 shutdown 后还能产生新的线程?线程把任务丢给线程池后肯定就马上返回了?线程池里的线程异常后会再次新增线程吗如何捕获这些线程抛出的异常?线程池的大小如何设置如何动态设置线程池的参数线程池的状态机画一下阿里 Java 代码规范为什么不允许使用 Executors 快速创建线程池使用线程池应该避免哪些问题能否简单说下线程池的最佳实践如何优雅关闭线程池如何对线程池进行监控相信不少人看了这些问题会有些懵逼其实这些问题的答案大多数都藏在线程池的源码里所以深入了解线程池的源码非常重要本章我们将会来学习一下线程池的源码相信看完之后以上的问题大部分都能回答另外一些问题我们也会在文中与大家一起探讨。本文将会从以下几个方面来介绍线程池的原理。为什么要用线程池线程池是如何工作的线程池提交任务的两种方式ThreadPoolExecutor 源码剖析解答开篇的问题线程池的最佳实践总结相信大家看完对线程池的理解会更进一步肝文不易看完别完了三连哦。为什么要用线程池 创建线程有三大开销如下1、其实 Java 中的线程模型是基于操作系统原生线程模型实现的也就是说 Java 中的线程其实是基于内核线程实现的线程的创建析构与同步都需要进行系统调用而系统调用需要在用户态与内核中来回切换代价相对较高线程的生命周期包括「线程创建时间」,「线程执行任务时间」,「线程销毁时间」创建和销毁都需要导致系统调用。2、每个 Thread 都需要有一个内核线程的支持也就意味着每个 Thread 都需要消耗一定的内核资源如内核线程的栈空间因此能创建的 Thread 是有限的默认一个线程的线程栈大小是 1 M有图有真相图中所示在 Java 8 下创建 19 个线程thread #19需要创建 19535 KB即 1 M 左右reserved 代表如果创建 19 个线程操作系统保证会为其分配这么多空间实际上并不一定分配committed 则表示实际已分配的空间大小。画外音注意这是在 Java 8 下的线程占用空间情况但在 Java 11 中对线程作了很大的优化创建一个线程大概只需要 40 KB空间消耗大大减少3、线程多了导致不可忽视的上下文切换开销。由此可见线程的创建是昂贵的所以必须以线程池的形式来管理这些线程在线程池中合理设置线程大小和管理线程以达到以合理的创建线程大小以达到最大化收益最小化风险的目的对于开发人员来说要完成任务不用关心线程如何创建如何销毁如何协作只需要关心提交的任务何时完成即可对线程的调优监控等这些细枝末节的工作通通交给线程池来实现所以也让开发人员得到极大的解脱类似线程池的这种池化思想应用在很多地方比如数据库连接池Http 连接池等避免了昂贵资源的创建提升了性能也解放了开发人员。ThreadPoolExecutor 设计架构图 首先我们来看看 Executor 框架的设计图Executor: 最顶层的 Executor 接口只提供了一个 execute 接口实现了提交任务与执行任务的解藕这个方法是最核心的也是我们源码剖析的重点此方法最终是由 ThreadPoolExecutor 实现的ExecutorService 扩展了 Executor 接口实现了终止执行器单个/批量提交任务等方法AbstractExecutorService 实现了 ExecutorService 接口实现了除 execute 以外的所有方法只将一个最重要的 execute 方法交给 ThreadPoolExecutor 实现。这样的分层设计虽然层次看起来挺多但每一层每司其职逻辑清晰值得借鉴。线程池是如何工作的 首先我们来看下如何创建一个线程池ThreadPoolExecutor threadPool  new ThreadPoolExecutor(10, 20, 600L,TimeUnit.SECONDS, new LinkedBlockingQueue(4096),new NamedThreadFactory(common-work-thread)); // 设置拒绝策略默认为 AbortPolicy threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); 看下其构造方法签名如下public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 省略代码若干 } 要理解这些参数具体代表的意义必须清楚线程池提交任务与执行任务流程如下图片来自美团技术团队步骤如下1、corePoolSize如果提交任务后线程还在运行当线程数小于 corePoolSize 值时无论线程池中的线程是否忙碌都会创建线程并把任务交给此新创建的线程进行处理如果线程数少于等于 corePoolSize那么这些线程不会回收除非将 allowCoreThreadTimeOut 设置为 true但一般不这么干因为频繁地创建销毁线程会极大地增加系统调用的开销。2、workQueue如果线程数大于核心数corePoolSize且小于最大线程数maximumPoolSize则会将任务先丢到阻塞队列里然后线程自己去阻塞队列中拉取任务执行。3、maximumPoolSize: 线程池中最大可创建的线程数如果提交任务时队列满了且线程数未到达这个设定值则会创建线程并执行此次提交的任务如果提交任务时队列满了但线池数已经到达了这个值此时说明已经超出了线池程的负载能力就会执行拒绝策略这也好理解总不能让源源不断地任务进来把线程池给压垮了吧我们首先要保证线程池能正常工作。4、RejectedExecutionHandler一共有以下四种拒绝策略AbortPolicy丢弃任务并抛出异常这也是默认策略CallerRunsPolicy用调用者所在的线程来执行任务所以开头的问题「线程把任务丢给线程池后肯定就马上返回了?」我们可以回答了如果用的是 CallerRunsPolicy 策略提交任务的线程比如主线程提交任务后并不能保证马上就返回当触发了这个 reject 策略不得不亲自来处理这个任务。DiscardOldestPolicy丢弃阻塞队列中靠最前的任务并执行当前任务。DiscardPolicy直接丢弃任务不抛出任何异常这种策略只适用于不重要的任务。5、keepAliveTime: 线程存活时间如果在此时间内超出 corePoolSize 大小的线程处于 idle 状态这些线程会被回收6、threadFactory可以用此参数设置线程池的命名指定 defaultUncaughtExceptionHandler有啥用后文阐述,甚至可以设定线程为守护线程。现在问题来了该如何合理设置这些参数呢。首先来看线程大小设置Java 并发编程实战告诉我们应该分两种情况针对 CPU 密集型的任务在有 Ncpu个处理器的系统上当线程池的大小为 Ncpu 1 时通常能实现最优的利用率1 是因为当计算密集型线程偶尔由于缺页故障或其他原因而暂停工作时这个额外的线程也能确保 CPU 的时钟周期不会被浪费所谓 CPU 密集就是线程一直在忙碌这样将线程池的大小设置为 Ncpu 1 避免了线程的上下文切换让线程时刻处于忙碌状态将 CPU 的利用率最大化。针对 IO 密集型的任务它也给出了如下计算公式这些公式看看就好实际的业务场景中基本用不上这些公式太过理论化了脱离业务场景仅可作个理论参考举个例子你说 CPU 密集型任务设置线程池大小为 N 1个但实际上在业务中往往不只设置一个线程池这种情况套用的公式就懵逼了再来看 workQueue 的大小设置由上文可知如果最大线程大于核心线程数当且仅当核心线程满了且 workQueue 也满的情况下才会新增新的线程也就是说如果 workQueue 是无界队列那么当线程数增加到 corePoolSize 后永远不会再新增新的线程了也就是说此时 maximumPoolSize 的设置就无效了也无法触发 RejectedExecutionHandler 拒绝策略任务只会源源不断地填充到 workQueue直到 OOM。所以 workQueue 应该为有界队列至少保证在任务过载的情况下线程池还能正常工作那么哪些是有有界队列哪些是无界队列呢。有界队列我们常用的以下两个LinkedBlockingQueue: 链表构成的有界队列按先进先出FIFO的顺序对元素进行排列但注意在创建时需指定其大小否则其大小默认为 Integer.MAX_VALUE相当于无界队列了ArrayBlockingQueue: 数组实现的有界队列按先进先出FIFO的顺序对元素进行排列。无界队列我们常用 PriorityBlockingQueue 这个优先级队列任务插入的时候可以指定其权重以让这些任务优先执行但这个队列很少用原因很简单线程池里的任务执行顺序一般是平等的如果真有必须某些类型的任务需要优先执行大不了再开个线程池好了将不同的任务类型用不同的线程池隔离开来也是合理利用线程池的一种实践。说到这我相信大家应该能回答开头的问题「阿里 Java 代码规范为什么不允许使用 Executors 快速创建线程池」最常见的是以下两种创建方式image-20201109002227476newCachedThreadPool 方法的最大线程数设置成了 Integer.MAX_VALUE而 newSingleThreadExecutor 方法创建 workQueue 时 LinkedBlockingQueue 未声明大小相当于创建了无界队列一不小心就会导致 OOM。threadFactory 如何设置一般业务中会有多个线程池如果某个线程池出现了问题定位是哪一个线程出问题很重要所以为每个线程池取一个名字就很有必要了我司用的 dubbo 的 NamedThreadFactory 来生成 threadFactory创建很简单new NamedThreadFactory(demo-work) 它的实现还是很巧妙的有兴趣地可以看看它的源码每调用一次底层有个计数器会加一会依次命名为 「demo-work-thread-1」, 「demo-work-thread-2」 「demo-work-thread-3」这样递增的字符串。在实际的业务场景中一般很难确定 corePoolSize workQueuemaximumPoolSize 的大小如果出问题了一般来说只能重新设置一下这些参数再发布这样往往需要耗费一些时间美团的这篇文章给出了让人眼前一亮的解决方案当发现问题线程池监控告警时动态调整这些参数可以让这些参数实时生效能在发现问题时及时解决确实是个很好的思路。线程池提交任务的两种方式 线程池创建好了该怎么给它提交任务有两种方式调用 execute 和 submit 方法来看下这两个方法的方法签名// 方式一execute 方法 public void execute(Runnable command) { }// 方式二ExecutorService 中 submit 的三个方法 T FutureT submit(CallableT task); T FutureT submit(Runnable task, T result); Future? submit(Runnable task); 区别在于调用 execute 无返回值而调用  submit 可以返回 Future那么这个 Future 能到底能干啥呢看它的接口public interface FutureV {/*** 取消正在执行的任务如果任务已执行或已被取消或者由于某些原因不能取消则返回 false* 如果任务未开始或者任务已开始但可以中断mayInterruptIfRunning 为 true则* 可以取消/中断此任务*/boolean cancel(boolean mayInterruptIfRunning);/*** 任务在完成前是否已被取消*/boolean isCancelled();/*** 正常的执行完流程流程或抛出异常或取消导致的任务完成都会返回 true*/boolean isDone();/*** 阻塞等待任务的执行结果*/V get() throws InterruptedException, ExecutionException;/*** 阻塞等待任务的执行结果不过这里指定了时间如果在 timeout 时间内任务还未执行完成* 则抛出 TimeoutException 异常*/V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException; } 可以用 Future 取消任务判断任务是否已取消/完成甚至可以阻塞等待结果。submit 为啥能提交任务Runnable的同时也能返回任务Future的执行结果呢原来在最后执行 execute 前用 newTaskFor 将 task 封装成了 RunnableFuturenewTaskFor 返回了 FutureTask 这个类结构图如下可以看到 FutureTask 这个接口既实现了 Runnable 接口也实现 Future 接口所以在提交任务的同时也能利用 Future 接口来执行任务的取消获取任务的状态等待执行结果这些操作。execute 与 submit 除了是否能返回执行结果这一区别外还有一个重要区别那就是使用 execute 执行如果发生了异常是捕获不到的默认会执行 ThreadGroup 的 uncaughtException 方法下图数字 2 对应的逻辑所以如果你想监控执行 execute 方法时发生的异常需要通过 threadFactory 来指定一个 UncaughtExceptionHandler这样就会执行上图中的 1进而执行 UncaughtExceptionHandler 中的逻辑,如下所示://1.实现一个自己的线程池工厂 ThreadFactory factory  (Runnable r) - {//创建一个线程Thread t  new Thread(r);//给创建的线程设置UncaughtExceptionHandler对象 里面实现异常的默认逻辑t.setDefaultUncaughtExceptionHandler((Thread thread1, Throwable e) - {// 在此设置统计监控逻辑System.out.println(线程工厂设置的exceptionHandler  e.getMessage());});return t; };// 2.创建一个自己定义的线程池使用自己定义的线程工厂 ExecutorService service  new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue(10),factory);//3.提交任务 service.execute(()-{int i1/0; }); 执行以上逻辑最终会输出「线程工厂设置的exceptionHandler/ by zero」,通过这样的方式就能通过设定的 defaultUncaughtExceptionHandler 来执行我们的监控逻辑了。如果用 submit 如何捕获异常呢当我们调用 future.get 就可以捕获Callable testCallable  xxx; Future future  executor.submit(myCallable); try {future1.get(3)); } catch (InterruptedException e) {e.printStackTrace(); } catch (ExecutionException e) {e.printStackTrace(); } 那么 future 为啥在 get 的时候才捕获异步呢因为在执行 submit 时抛出异常后此异常被保存了起来而在 get 的时候才被抛出关于 execute 和 submit 的执行流程 why 神的这篇文章写得非常透彻我就不拾人牙慧了建议大家好好品品收获会很大ThreadPoolExecutor 源码剖析 前面铺垫了这么多终于到了最核心的源码剖析环节了。对于线程池来说我们最关心的是它的「状态」和「可运行的线程数量」一般来说我们可以选择用两个变量来记录不过 Doug Lea 只用了一个变量ctl就达成目的了我们知道变量越多代码的可维护性就越差也越容易出 bug, 所以只用一个变量就达成了两个变量的效果这让代码的可维护性大大提高那么他是怎么设计的呢// ThreadPoolExecutor.java public class ThreadPoolExecutor extends AbstractExecutorService {private final AtomicInteger ctl  new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS  Integer.SIZE - 3;private static final int CAPACITY    (1  COUNT_BITS) - 1;// 结果111 00000000000000000000000000000private static final int RUNNING     -1  COUNT_BITS;// 结果 000 00000000000000000000000000000private static final int SHUTDOWN     0  COUNT_BITS;// 结果 001 00000000000000000000000000000private static final int STOP         1  COUNT_BITS;// 结果 010 00000000000000000000000000000private static final int TIDYING      2  COUNT_BITS;// 结果 011 00000000000000000000000000000private static final int TERMINATED   3  COUNT_BITS;// 获取线程池的状态private static int runStateOf(int c)     { return c  ~CAPACITY; }// 获取线程数量private static int workerCountOf(int c)  { return c  CAPACITY; } } 可以看到ctl 是一个 原子类的 Integer 变量有 32 位低 29 位表示线程数量 29 位最大可以表示 (2^29)-1 (大概 5 亿多)足够记录线程大小了如果未来还是不够可以把 ctl 声明为 AtomicLong高 3 位用来表示线程池的状态3 位可以表示 8 个线程池的状态由于线程池总共只有五个状态所以 3 位也是足够了线程池的五个状态如下RUNNING: 接收新的任务并能继续处理 workQueue 中的任务SHUTDOWN: 不再接收新的任务不过能继续处理 workQueue 中的任务STOP: 不再接收新的任务也不再处理 workQueue 中的任务并且会中断正在处理任务的线程TIDYING: 所有的任务都完结了并且线程数量workCount为 0 时即为此状态进入此状态后会调用 terminated() 这个钩子方法进入 TERMINATED 状态TERMINATED: 调用 terminated() 方法后即为此状态线程池的状态流转及触发条件如下有了这些基础我们来分析下 execute 的源码 public void execute(Runnable command) {if (command  null)throw new NullPointerException();int c  ctl.get();// 如果当前线程数少于核心线程数corePoolSize无论核心线程是否忙碌都创建线程直到达到 corePoolSize 为止if (workerCountOf(c)  corePoolSize) {// 创建线程并将此任务交给 worker 处理此时此任务即 worker 中的 firstTaskif (addWorker(command, true))return;c  ctl.get();}// 如果线程池处于 RUNNING 状态并且线程数大于 corePoolSize 或者 // 线程数少于 corePoolSize 但创建线程失败了则将任务丢进 workQueue 中if (isRunning(c)  workQueue.offer(command)) {int recheck  ctl.get();// 这里需要再次检查线程池是否处于 RUNNING 状态因为在任务入队后可能线程池状态会发生变化比如调用了 shutdown 方法等如果线程状态发生变化了则移除此任务执行拒绝策略if (! isRunning(recheck)  remove(command))reject(command);// 如果线程池在 RUNNING 状态下线程数为 0则新建线程加速处理 workQueue 中的任务else if (workerCountOf(recheck)  0)addWorker(null, false);}// 这段逻辑说明线程数大于 corePoolSize 且任务入队失败了此时会以最大线程数maximumPoolSize为界来创建线程如果失败说明线程数超过了 maximumPoolSize则执行拒绝策略else if (!addWorker(command, false))reject(command); } 从这段代码中可以看到创建线程是调用 addWorker 实现的在分析 addWorker 之前有必要简单提一下 Worker线程池把每一个执行任务的线程都封装为 Worker 的形式取名为 Worker 很形象线程池的本质是生产者-消费者模型生产者不断地往 workQueue 中丢 task, workQueue 就像流水线一样不断地输送着任务而 worker工人 不断地取任务来执行那么问题来了为啥要把线程封装到 worker 中呢线程池拿到 task 后直接丢给线程处理或者让线程自己去 workQueue 中处理不就完了将线程封装为 worker 主要是为了更好地管理线程的中断来看下 Worker 的定义// 此处可以看出 worker 既是一个 Runnable 任务也实现了 AQS实际上是用 AQS 实现了一个独占锁这样由于 worker 运行时会上锁执行 shutdownsetCorePoolSizesetMaximumPoolSize等方法时会试着中断线程interruptIdleWorkers 在这个方法中断方法中会先尝试获取 worker 的锁如果不成功说明 worker 在运行中此时会先让 worker 执行完任务再关闭 worker 的线程实现优雅关闭线程的目的 private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{private static final long serialVersionUID  6138294804551838833L;// 实际执行任务的线程final Thread thread;// 上文提到如果当前线程数少于核心线程数创建线程并将提交的任务交给 worker 处理处理此时 firstTask 即为此提交的任务如果 worker 从 workQueue 中获取任务则 firstTask 为空Runnable firstTask;// 统计完成的任务数volatile long completedTasks;Worker(Runnable firstTask) {// 初始化为 -1这样在线程运行前调用runWorker禁止中断在 interruptIfStarted() 方法中会判断 getState()0setState(-1); this.firstTask  firstTask;// 根据线程池的 threadFactory 创建一个线程将 worker 本身传给线程因为 worker 实现了 Runnable 接口this.thread  getThreadFactory().newThread(this);}public void run() {// thread 启动后会调用此方法runWorker(this);}// 1 代表被锁住了0 代表未锁protected boolean isHeldExclusively() {return getState() ! 0;}// 尝试获取锁protected boolean tryAcquire(int unused) {// 从这里可以看出它是一个独占锁因为当获取锁后cas 设置 state 不可能成功这里我们也能明白上文中将 state 设置为 -1 的作用这种情况下永远不可能获取得锁而 worker 要被中断首先必须获取锁if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 尝试释放锁protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}    public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }// 中断线程这个方法会被 shutdowNow 调用从中可以看出 shutdownNow 要中断线程不需要获取锁也就是说如果线程正在运行照样会给你中断掉所以一般来说我们不用 shutdowNow 来中断线程太粗暴了中断时线程很可能在执行任务影响任务执行void interruptIfStarted() {Thread t;// 中断也是有条件的必须是 state  0 且 t ! null 且线程未被中断// 如果 state  -1 不执行中断再次明白了为啥上文中 setState(-1) 的意义if (getState()  0  (t  thread) ! null  !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}} 通过上文对 Worker 类的分析相信大家不难理解 将线程封装为 worker 主要是为了更好地管理线程的中断 这句话。理解了 Worker 的意义我们再来看 addWorker 的方法private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c  ctl.get();// 获取线程池的状态int rs  runStateOf(c);// 如果线程池的状态  SHUTDOWN即为 SHUTDOWNSTOPTIDYINGTERMINATED 这四个状态只有一种情况有可能创建线程即线程状态为 SHUTDOWN, 且队列非空时firstTask  null 代表创建一个不接收新任务的线程此线程会从 workQueue 中获取任务再执行这种情况下创建线程是为了加速处理完 workQueue 中的任务if (rs  SHUTDOWN ! (rs  SHUTDOWN firstTask  null ! workQueue.isEmpty()))return false;for (;;) {// 获取线程数int wc  workerCountOf(c);// 如果超过了线程池的最大 CAPACITY5 亿多基本不可能// 或者 超过了 corePoolSizecore 为 true 或者 maximumPoolSizecore 为 false 时// 则返回 falseif (wc  CAPACITY ||wc  (core ? corePoolSize : maximumPoolSize))return false;// 否则 CAS 增加线程的数量如果成功跳出双重循环if (compareAndIncrementWorkerCount(c))break retry;c  ctl.get();  // Re-read ctl// 如果线程运行状态发生变化跳到外层循环继续执行if (runStateOf(c) ! rs)continue retry;// 说明是因为 CAS 增加线程数量失败所致继续执行 retry 的内层循环}}boolean workerStarted  false;boolean workerAdded  false;Worker w  null;try {// 能执行到这里说明满足增加 worker 的条件了所以创建 worker,准备添加进线程池中执行任务w  new Worker(firstTask);final Thread t  w.thread;if (t ! null) {// 加锁是因为下文要把 w 添加进 workers 中 workers 是 HashSet不是线程安全的所以需要加锁予以保证final ReentrantLock mainLock  this.mainLock;mainLock.lock();try {//  再次 check 线程池的状态以防执行到此步时发生中断等int rs  runStateOf(ctl.get());// 如果线程池状态小于 SHUTDOWN即为 RUNNING// 或者状态为 SHUTDOWN 但 firstTask  null代表不接收任务只是创建线程处理 workQueue 中的任务则满足添加 worker 的条件if (rs  SHUTDOWN ||(rs  SHUTDOWN  firstTask  null)) {// 如果线程已启动显然有问题因为创建 worker 后还没启动线程呢抛出异常if (t.isAlive()) throw new IllegalThreadStateException();workers.add(w);int s  workers.size();// 记录最大的线程池大小以作监控之用if (s  largestPoolSize)largestPoolSize  s;workerAdded  true;}} finally {mainLock.unlock();}// 说明往 workers 中添加 worker 成功此时启动线程if (workerAdded) {t.start();workerStarted  true;}}} finally {// 添加线程失败执行 addWorkerFailed 方法主要做了将 worker 从 workers 中移除减少线程数并尝试着关闭线程池这样的操作if (! workerStarted)addWorkerFailed(w);}return workerStarted; } 从这段代码我们可以看到多线程下情况的不可预料性我们发现在满足条件情况下又对线程状态重新进行了 check,以防期间出现中断等线程池状态发生变更的操作这也给我们以启发多线程环境下的各种临界条件一定要考虑到位。执行 addWorker 创建 worker 成功后线程开始执行了t.start()由于在创建 Worker 时将 Worker  自己传给了此线程所以启动线程后会调用  Worker 的 run 方法public void run() {runWorker(this); } 可以看到最终会调用  runWorker 方法接下来我们来分析下 runWorker 方法final void runWorker(Worker w) {Thread wt  Thread.currentThread();Runnable task  w.firstTask;w.firstTask  null;// unlock 会调用 tryRelease 方法将 state 设置成 0代表允许中断允许中断的条件上文我们在 interruptIfStarted() 中有提过即 state  0w.unlock();boolean completedAbruptly  true;try {// 如果在提交任务时创建了线程并把任务丢给此线程则会先执行此 task// 否则从任务队列中获取 task 来执行即 getTask() 方法while (task ! null || (task  getTask()) ! null) {w.lock();// 如果线程池状态为  STOP即 STOPTIDYINGTERMINATED 时则线程应该中断// 如果线程池状态  STOP, 线程不应该中断,如果中断了Thread.interrupted() 返回 true并清除标志位,再次判断线程池状态防止在清除标志位时执行了 shutdownNow() 这样的方法,如果此时线程池为 STOP执行线程中断if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {// 执行任务前子类可实现此钩子方法作为统计之用beforeExecute(wt, task);Throwable thrown  null;try {task.run();} catch (RuntimeException x) {thrown  x; throw x;} catch (Error x) {thrown  x; throw x;} catch (Throwable x) {thrown  x; throw new Error(x);} finally {// 执行任务后子类可实现此钩子方法作为统计之用afterExecute(task, thrown);}} finally {task  null;w.completedTasks;w.unlock();}}completedAbruptly  false;} finally {// 如果执行到这只有两种可能一种是执行过程中异常中断了一种是队列里没有任务了从这里可以看出线程没有核心线程与非核心线程之分哪个任务异常了或者正常退出了都会执行此方法此方法会根据情况将线程数-1processWorkerExit(w, completedAbruptly);} } 来看看 processWorkerExit 方法是咋样的private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果异常退出cas 执行线程池减 1 操作if (completedAbruptly) decrementWorkerCount();final ReentrantLock mainLock  this.mainLock;mainLock.lock();try {completedTaskCount  w.completedTasks;// 加锁确保线程安全地移除 worker workers.remove(w);} finally {mainLock.unlock();}// woker 既然异常退出可能线程池状态变了如执行 shutdown 等尝试着关闭线程池tryTerminate();int c  ctl.get();//  如果线程池处于 STOP 状态则如果 woker 是异常退出的重新新增一个 woker如果是正常退出的在 wokerQueue 为非空的条件下确保至少有一个线程在运行以执行 wokerQueue 中的任务    if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min  allowCoreThreadTimeOut ? 0 : corePoolSize;if (min  0  ! workQueue.isEmpty())min  1;if (workerCountOf(c)  min)return; // replacement not needed}addWorker(null, false);} } 接下来我们分析 woker 从 workQueue 中取任务的方法 getTaskprivate Runnable getTask() {boolean timedOut  false; // Did the last poll() time out?for (;;) {int c  ctl.get();int rs  runStateOf(c);// 如果线程池状态至少为 STOP 或者// 线程池状态  SHUTDOWN 并且任务队列是空的// 则减少线程数量返回 null,这种情况下上文分析的 runWorker 会执行 processWorkerExit 从而让获取此 Task 的 woker 退出if (rs  SHUTDOWN  (rs  STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc  workerCountOf(c);// 如果 allowCoreThreadTimeOut 为 true代表任何线程在 keepAliveTime 时间内处于 idle 状态都会被回收如果线程数大于 corePoolSize本身在 keepAliveTime 时间内处于 idle 状态就会被回收boolean timed  allowCoreThreadTimeOut || wc  corePoolSize;// worker 应该被回收的几个条件这个比较简单就此略过if ((wc  maximumPoolSize || (timed  timedOut)) (wc  1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 阻塞获取 task如果在 keepAliveTime 时间内未获取任务说明超时了此时 timedOut 为 trueRunnable r  timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r ! null)return r;timedOut  true;} catch (InterruptedException retry) {timedOut  false;}} } 经过以上源码剖析相信我们对线程池的工作原理了解得八九不离十了再来简单过一下其他一些比较有用的方法开头我们提到线程池的监控问题我们看一下可以监控哪些指标int getCorePoolSize()获取核心线程数。int getLargestPoolSize()历史峰值线程数。int getMaximumPoolSize()最大线程数(线程池线程容量)。int getActiveCount()当前活跃线程数int getPoolSize()当前线程池中的线程总数BlockingQueuegetQueue() 当前线程池的任务队列据此可以获取积压任务的总数getQueue.size()监控思路也很简单开启一个定时线程 ScheduledThreadPoolExecutor定期对这些线程池指标进行采集一般会采用一些开源工具如 Grafana Prometheus MicroMeter 来实现。如何实现核心线程池的预热使用  prestartAllCoreThreads() 方法这个方法会一次性创建 corePoolSize 个线程无需等到提交任务时才创建提交创建好线程的话一有任务提交过来这些线程就可以立即处理。如何实现动态调整线程池参数setCorePoolSize(int corePoolSize) 调整核心线程池大小setMaximumPoolSize(int maximumPoolSize)setKeepAliveTime() 设置线程的存活时间解答开篇的问题 其它问题基本都在源码剖析环节回答了这里简单说下其他问题1、Tomcat 的线程池和 JDK 的线程池实现有啥区别, Dubbo 中有类似 Tomcat 的线程池实现吗? Dubbo 中一个叫 EagerThreadPool 的东西可以看看它的使用说明从注释里可以看出如果核心线程都处于 busy 状态如果有新的请求进来EagerThreadPool 会选择先创建线程而不是将其放入任务队列中这样可以更快地响应这些请求。Tomcat 实现也是与此类似的只不过稍微有所不同当 Tomcat 启动时会先创建 minSpareThreads 个线程如果经过一段时间收到请求时这些线程都处于忙碌状态每次都会以 minSpareThreads 的步长创建线程本质上也是为了更快地响应处理请求。具体的源码可以看它的 ThreadPool 实现这里就不展开了。2、我司网关 dubbo 调用线程池曾经出现过这样的一个问题压测时接口可以正常返回但接口 RT 很高假设设置的核心线程大小为 500最大线程为 800缓冲队列为 5000你能从这个设置中发现出一些问题并对这些参数进行调优吗这个参数明显能看出问题来首先任务队列设置过大任务达到核心线程后如果再有请求进来会先进入任务队列队列满了之后才创建线程创建线程也是需要不少开销的所以我们后来把核心线程设置成了与最大线程一样并且调用 prestartAllCoreThreads() 来预热核心线程就不用等请求来时再创建线程了。线程池的几个最佳实践 1、线程池执行的任务应该是互相独立的如果互相依赖的话可能导致死锁比如下面这样的代码ExecutorService pool  Executors.newSingleThreadExecutor(); pool.submit(() - {try {String qqpool.submit(()-QQ).get();System.out.println(qq);} catch (Exception e) {} }); 2、核心任务与非核心任务最好能用多个线程池隔离开来曾经我们业务上就出现这样的一个故障突然很多用户反馈短信收不到了排查才发现发短信是在一个线程池里而另外的定时脚本也是用的这个线程池来执行任务这个脚本一分钟可能产生几百上千条任务导致发短信的方法在线程池里基本没机会执行后来我们用了两个线程池把发短信和执行脚本隔离开来解决了问题。3、添加线程池监控动态设置线程池如前文所述线程池的各个参数很难一次性确定既然难以确定又要保证发现问题后及时解决我们就需要为线程池增加监控监控队列大小线程数量等我们可以设置 3 分钟内比如队列任务一直都是满了的话就触发告警这样可以提前预警如果线上因为线程池参数设置不合理而触发了降级等操作可以通过动态设置线程池的方式来实时修改核心线程数最大线程数等将问题及时修复。总结 本文详细剖析了线程池的工作原理相信大家对其工作机制应该有了较深入的了解也对开头的几个问题有了较清楚的认识本质上设置线程池的目的是为了利用有效的资源最大化性能最小化风险同时线程池的使用本质上是为了更好地为用户服务据此也不难明白 Tomcat, Dubbo 要另起炉灶来设置自己的线程池了。最后欢迎大家加我私人微信一起讨论共同进步拉你进读者群2020 难过我们一起抱团取暖巨人的肩膀https://dzone.com/articles/how-much-memory-does-a-java-thread-takehttps://segmentfault.com/a/1190000021047279https://www.cnblogs.com/trust-freedom/p/6681948.html深入理解线程池 https://tinyurl.com/y675j928有的线程它死了于是它变成一道面试题 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQJava 并发编程实战Java线程池实现原理及其在美团业务中的实践: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww最后欢迎大家加我好友拉你进技术交流群群里有很多 BAT 的大咖可以提问互相交流内推等进群一起抱团取暖巨人的肩膀https://dzone.com/articles/how-much-memory-does-a-java-thread-takehttps://segmentfault.com/a/1190000021047279https://www.cnblogs.com/trust-freedom/p/6681948.html深入理解线程池 https://tinyurl.com/y675j928有的线程它死了于是它变成一道面试题 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQJava 并发编程实战Java线程池实现原理及其在美团业务中的实践: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww线程池异常处理详解一文搞懂https://www.cnblogs.com/ncy1/articles/11629933.html 往期推荐 23张图万字详解「链表」从小白到大佬面试官你说说互斥锁、自旋锁、读写锁、悲观锁、乐观锁的应用场景25 张图1.4 w字彻底搞懂分布式事务原理关注我每天陪你进步一点点
http://www.pierceye.com/news/211868/

相关文章:

  • 成都私人网站制作卓越网站建设的优点
  • 做网站下载别人的图算不算侵权源码之家免费
  • 宁夏住房城乡建设厅网站应用网站建设
  • 宾馆网站建设网站建设管理规范
  • 内部网站建设的步骤过程选择邯郸做网站
  • 国外免费外贸网站dw网页制作教程个人网站
  • 西安建设局网站地址室内设计效果图一套方案
  • php 建网站电子商务网站建设项目规划书
  • 常熟建设局网站代理办营业执照的公司
  • 济南网站关键词优化公司如何制作网站赚钱
  • 长春旅游网站开发360投放广告怎么收费
  • 微信公众号做网站卖东西静态化网站的缺点
  • 网站空间购买今天的新闻头条最新消息
  • 网站制作教程图解怎么解压wordpress
  • 唐山市城市建设规划局网站腾讯云建设一个网站要多少钱
  • 邢台集团网站建设费用聚牛建设网站
  • 如何创建电子商务网站学校网站设计首页
  • 扬州建设投资集团网站世界总人口实时数据
  • 沧州制作网站食品商务网-网站建设
  • 0592 网站建设模板网站建设+百度
  • 请人做个网站多少钱免费商城app
  • 网站建设包括哪些方面?手游源码网站
  • 机关门户网站建设管理情况软件开发工具都有哪些
  • 官方网站建设专家磐石网络wordpress对应的id
  • 学生自做网站优秀作品徐州企业建站模板
  • 网络电子商务购物网站idc机房建设
  • 网站单页seo个人服务器网站备案
  • 装修队伍做网站做机票在线预订网站
  • 手机版企业网站php山西建设执业注册中心网站
  • 南充网站建设略奥科技凡科建站电话