湖北什么是网站建设,鲜花网站模板下载,制做公司排扁,百度识图网页版在线来源#xff1a;SilenceDut http://www.codeceo.com/article/java-threadpool-learn.html线程池的技术背景 在面向对象编程中#xff0c;创建和销毁对象是很费时间的#xff0c;因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此#xff0c;虚拟机将试图跟…来源SilenceDut http://www.codeceo.com/article/java-threadpool-learn.html 线程池的技术背景 在面向对象编程中创建和销毁对象是很费时间的因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此虚拟机将试图跟踪每一个对象以便能够在对象销毁后进行垃圾回收。 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数特别是一些很耗资源的对象创建和销毁。如何利用已有对象来服务就是一个需要解决的关键问题其实这就是一些”池化资源”技术产生的原因。 例如Android中常见到的很多通用组件一般都离不开”池”的概念如各种图片加载库网络请求库即使Android的消息传递机制中的Meaasge当使用Meaasge.obtain()就是使用的Meaasge池中的对象因此这个概念很重要。本文将介绍的线程池技术同样符合这一思想。 线程池的优点: 重用线程池中的线程,减少因对象创建,销毁所带来的性能开销;能有效的控制线程的最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞;能够多线程进行简单的管理,使线程的使用简单、高效。线程池框架Executor java中的线程池是通过Executor框架实现的Executor 框架包括类ExecutorExecutorsExecutorServiceThreadPoolExecutor Callable和Future、FutureTask的使用等。 Executor: 所有线程池的接口,只有一个方法。 public interface Executor { void execute(Runnable command); } ExecutorService: 增加Executor的行为是Executor实现类的最直接接口。 Executors 提供了一系列工厂方法用于创先线程池返回的线程池都实现了ExecutorService 接口。 ThreadPoolExecutor线程池的具体实现类,一般用的各种线程池都是基于这个类实现的。构造方法如下: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueRunnable workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } corePoolSize线程池的核心线程数,线程池中运行的线程数也永远不会超过 corePoolSize 个,默认情况下可以一直存活。可以通过设置allowCoreThreadTimeOut为True,此时 核心线程数就是0,此时keepAliveTime控制所有线程的超时时间。maximumPoolSize线程池允许的最大线程数;keepAliveTime 指的是空闲线程结束的超时时间;unit 是一个枚举表示 keepAliveTime 的单位;workQueue表示存放任务的BlockingQueueRunnable队列。BlockingQueue:阻塞队列BlockingQueue是java.util.concurrent下的主要用来控制线程同步的工具。如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。阻塞队列常用于生产者和消费者的场景生产者是往队列里添加元素的线程消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器而消费者也只从容器里拿元素。具体的实现类有LinkedBlockingQueue,ArrayBlockingQueued等。一般其内部的都是通过Lock和Condition(显示锁Lock及Condition的学习与使用)来实现阻塞和唤醒。线程池的工作过程如下 线程池刚创建时里面没有一个线程。任务队列是作为参数传进来的。不过就算队列里面有任务线程池也不会马上执行它们。当调用 execute() 方法添加一个任务时线程池会做如下判断 如果正在运行的线程数量小于 corePoolSize那么马上创建线程运行这个任务如果正在运行的线程数量大于或等于 corePoolSize那么将这个任务放入队列如果这时候队列满了而且正在运行的线程数量小于 maximumPoolSize那么还是要创建非核心线程立刻运行这个任务如果队列满了而且正在运行的线程数量大于或等于 maximumPoolSize那么线程池会抛出异常RejectExecutionException。当一个线程完成任务时它会从队列中取下一个任务来执行。当一个线程无事可做超过一定的时间keepAliveTime时线程池会判断如果当前运行的线程数大于 corePoolSize那么这个线程就被停掉。所以线程池的所有任务完成后它最终会收缩到 corePoolSize 的大小。线程池的创建和使用 生成线程池采用了工具类Executors的静态方法以下是几种常见的线程池。 SingleThreadExecutor单个后台线程 (其缓冲队列是无界的) public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService ( new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable())); } 创建一个单线程的线程池。这个线程池只有一个核心线程在工作也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。 FixedThreadPool只有核心线程的线程池,大小固定 (其缓冲队列是无界的) 。 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueueRunnable()); } 创建固定大小的线程池。每次提交一个任务就创建一个线程直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变如果某个线程因为执行异常而结束那么线程池会补充一个新线程。 CachedThreadPool无界线程池可以进行自动线程回收。 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0,Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueueRunnable()); } 如果线程池的大小超过了处理任务所需要的线程那么就会回收部分空闲60秒不执行任务的线程当任务数增加时此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制线程池大小完全依赖于操作系统或者说JVM能够创建的最大线程大小。SynchronousQueue是一个是缓冲区为1的阻塞队列。 ScheduledThreadPool核心线程池固定大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。 public static ExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPool(corePoolSize, Integer.MAX_VALUE, DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS, new DelayedWorkQueue()); } 创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在DEFAULT_KEEPALIVEMILLIS时间内回收。 线程池最常用的提交任务的方法有两种 execute: ExecutorService.execute(Runnable runable) submit: FutureTask task ExecutorService.submit(Runnable runnable);FutureTaskT task ExecutorService.submit(Runnable runnable,T Result);FutureTaskT task ExecutorService.submit(CallableT callable); submit(Callable callable)的实现submit(Runnable runnable)同理。 public T FutureT submit(CallableT task) {if (task null) throw new NullPointerException(); FutureTaskT ftask newTaskFor(task); execute(ftask); return ftask; } 可以看出submit开启的是有返回结果的任务会返回一个FutureTask对象这样就能通过get()方法得到结果。submit最终调用的也是execute(Runnable runable)submit只是将Callable对象或Runnable封装成一个FutureTask对象因为FutureTask是个Runnable所以可以在execute中执行。关于Callable对象和Runnable怎么封装成FutureTask对象见Callable和Future、FutureTask的使用。 线程池实现的原理 如果只讲线程池的使用那这篇博客没有什么大的价值充其量也就是熟悉Executor相关API的过程。线程池的实现过程没有用到Synchronized关键字用的都是Volatile,Lock和同步(阻塞)队列,Atomic相关类FutureTask等等因为后者的性能更优。理解的过程可以很好的学习源码中并发控制的思想。 在开篇提到过线程池的优点是可总结为以下三点 线程复用控制最大并发数管理线程1.线程复用过程 理解线程复用原理首先应了解线程生命周期。 在线程的生命周期中它要经过新建(New)、就绪Runnable、运行Running、阻塞(Blocked)和死亡(Dead)5种状态。 Thread通过new来新建一个线程这个过程是是初始化一些线程信息如线程名id,线程所属group等可以认为只是个普通的对象。调用Thread的start()后Java虚拟机会为其创建方法调用栈和程序计数器同时将hasBeenStarted为true,之后调用start方法就会有异常。 处于这个状态中的线程并没有开始运行只是表示该线程可以运行了。至于该线程何时开始运行取决于JVM里线程调度器的调度。当线程获取cpu后run()方法会被调用。不要自己去调用Thread的run()方法。之后根据CPU的调度在就绪——运行——阻塞间切换直到run()方法结束或其他方式停止线程进入dead状态。 所以实现线程复用的原理应该就是要保持线程处于存活状态就绪运行或阻塞。接下来来看下ThreadPoolExecutor是怎么实现线程复用的。 在ThreadPoolExecutor主要Worker类来控制线程的复用。看下Worker类简化后的代码这样方便理解 private final class Worker implements Runnable { final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { this.firstTask firstTask; this.thread getThreadFactory().newThread(this); } public void run() { runWorker(this); } final void runWorker(Worker w) { Runnable task w.firstTask; w.firstTask null; while (task ! null || (task getTask()) ! null){ task.run(); } } Worker是一个Runnable同时拥有一个thread这个thread就是要开启的线程在新建Worker对象时同时新建一个Thread对象同时将Worker自己作为参数传入TThread这样当Thread的start()方法调用时运行的实际上是Worker的run()方法接着到runWorker()中,有个while循环一直从getTask()里得到Runnable对象顺序执行。getTask()又是怎么得到Runnable对象的呢 依旧是简化后的代码 private Runnable getTask() {if(一些特殊情况) { return null; } Runnable r workQueue.take(); return r; } 这个workQueue就是初始化ThreadPoolExecutor时存放任务的BlockingQueue队列这个队列里的存放的都是将要执行的Runnable任务。因为BlockingQueue是个阻塞队列BlockingQueue.take()得到如果是空则进入等待状态直到BlockingQueue有新的对象被加入时唤醒阻塞的线程。所以一般情况Thread的run()方法就不会结束,而是不断执行从workQueue里的Runnable任务这就达到了线程复用的原理了。 2.控制最大并发数 那Runnable是什么时候放入workQueueWorker又是什么时候创建Worker里的Thread的又是什么时候调用start()开启新线程来执行Worker的run()方法的呢有上面的分析看出Worker里的runWorker()执行任务时是一个接一个串行进行的那并发是怎么体现的呢 很容易想到是在execute(Runnable runnable)时会做上面的一些任务。看下execute里是怎么做的。 execute: 简化后的代码 public void execute(Runnable command) { if (command null) throw new NullPointerException(); int c ctl.get(); // 当前线程数 corePoolSize if (workerCountOf(c) corePoolSize) { // 直接启动新的线程。 if (addWorker(command, true)) return; c ctl.get(); } // 活动线程数 corePoolSize // runState为RUNNING 队列未满 if (isRunning(c) workQueue.offer(command)) { int recheck ctl.get(); // 再次检验是否为RUNNING状态 // 非RUNNING状态 则从workQueue中移除任务并拒绝 if (!isRunning(recheck) remove(command)) reject(command);// 采用线程池指定的策略拒绝任务 // 两种情况 // 1.非RUNNING状态拒绝新的任务 // 2.队列满了启动新的线程失败workCount maximumPoolSize } else if (!addWorker(command, false)) reject(command); } addWorker: 简化后的代码 private boolean addWorker(Runnable firstTask, boolean core) { int wc workerCountOf(c); if (wc (core ? corePoolSize : maximumPoolSize)) { return false; } w new Worker(firstTask); final Thread t w.thread; t.start(); } 根据代码再来看上面提到的线程池工作过程中的添加任务的情况 * 如果正在运行的线程数量小于 corePoolSize那么马上创建线程运行这个任务
* 如果正在运行的线程数量大于或等于 corePoolSize那么将这个任务放入队列
* 如果这时候队列满了而且正在运行的线程数量小于 maximumPoolSize那么还是要创建非核心线程立刻运行这个任务
* 如果队列满了而且正在运行的线程数量大于或等于 maximumPoolSize那么线程池会抛出异常RejectExecutionException。 这就是Android的AsyncTask在并行执行是在超出最大任务数是抛出RejectExecutionException的原因所在详见基于最新版本的AsyncTask源码解读及AsyncTask的黑暗面 通过addWorker如果成功创建新的线程成功则通过start()开启新线程同时将firstTask作为这个Worker里的run()中执行的第一个任务。 虽然每个Worker的任务是串行处理但如果创建了多个Worker因为共用一个workQueue所以就会并行处理了。 所以根据corePoolSize和maximumPoolSize来控制最大并发数。大致过程可用下图表示。 上面的讲解和图来可以很好的理解的这个过程。 如果是做Android开发的并且对Handler原理比较熟悉你可能会觉得这个图挺熟悉其中的一些过程和HandlerLooperMeaasge使用中很相似。Handler.send(Message)相当于execute(Runnuble)Looper中维护的Meaasge队列相当于BlockingQueue只不过需要自己通过同步来维护这个队列Looper中的loop()函数循环从Meaasge队列取Meaasge和Worker中的runWork()不断从BlockingQueue取Runnable是同样的道理。 3.管理线程 通过线程池可以很好的管理线程的复用控制并发数以及销毁等过程,线程的复用和控制并发上面已经讲了而线程的管理过程已经穿插在其中了也很好理解。 在ThreadPoolExecutor有个ctl的AtomicInteger变量。通过这一个变量保存了两个内容 所有线程的数量每个线程所处的状态其中低29位存线程数高3位存runState通过位运算来得到不同的值。 private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));//得到线程的状态 private static int runStateOf(int c) { return c ~CAPACITY; } //得到Worker的的数量 private static int workerCountOf(int c) { return c CAPACITY; } // 判断线程是否在运行 private static boolean isRunning(int c) { return c SHUTDOWN; } 这里主要通过shutdown和shutdownNow()来分析线程池的关闭过程。首先线程池有五种状态来控制任务添加与执行。主要介绍以下三种 RUNNING状态线程池正常运行可以接受新的任务并处理队列中的任务SHUTDOWN状态不再接受新的任务但是会执行队列中的任务STOP状态不再接受新任务不处理队列中的任务shutdown这个方法会将runState置为SHUTDOWN会终止所有空闲的线程而仍在工作的线程不受影响所以队列中的任务人会被执行。shutdownNow方法将runState置为STOP。和shutdown方法的区别这个方法会终止所有的线程所以队列中的任务也不会被执行了。 总结 通过对ThreadPoolExecutor源码的分析从总体上了解了线程池的创建任务的添加执行等过程熟悉这些过程使用线程池就会更轻松了。 而从中学到的一些对并发控制以及生产者——消费者模型任务处理的使用对以后理解或解决其他相关问题会有很大的帮助。比如Android中的Handler机制而Looper中的Messager队列用一个BlookQueue来处理同样是可以的,这写就是读源码的收获吧。 转载于:https://www.cnblogs.com/bigben0123/p/8243041.html