淘客网站开发流程,网站还没上线怎么做品牌推广,dw一级网页制作教程,网站开发工程师职业定位来源#xff1a;blog.csdn.net/anhenzhufeng/article/details/88870374在Java开发中#xff0c;经常需要创建线程去执行一些任务#xff0c;实现起来也非常方便#xff0c;但如果并发的线程数量很多#xff0c;并且每个线程都是执行一个时间很短的任务就结束了#xff0c… 来源blog.csdn.net/anhenzhufeng/article/details/88870374在Java开发中经常需要创建线程去执行一些任务实现起来也非常方便但如果并发的线程数量很多并且每个线程都是执行一个时间很短的任务就结束了这样频繁创建线程就会大大降低系统的效率因为频繁创建线程和销毁线程需要时间。此时我们很自然会想到使用线程池来解决这个问题。使用线程池的好处降低资源消耗。java中所有的池化技术都有一个好处就是通过复用池中的对象降低系统资源消耗。设想一下如果我们有n多个子任务需要执行如果我们为每个子任务都创建一个执行线程而创建线程的过程是需要一定的系统消耗的最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程任务有多个但执行任务的线程可以通过线程池来复用这样减少了创建线程的开销系统资源利用率得到了提升。降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。提升任务处理速度。线程池中长期驻留了一定数量的活线程当任务需要执行时我们不必先去创建线程线程池会自己选择利用现有的活线程来处理任务。很显然线程池一个很显著的特征就是“长期驻留了一定数量的活线程”避免了频繁创建线程和销毁线程的开销那么它是如何做到的呢我们知道一个线程只要执行完了run()方法内的代码这个线程的使命就完成了等待它的就是销毁。既然这是个“活线程”自然是不能很快就销毁的。为了搞清楚这个“活线程”是如何工作的下面通过追踪源码来看看能不能解开这个疑问。学习过线程池都知道可以通过工厂类Executors来创个多种类型的线程池部分类型如下public static ExecutorService newFixedThreadPool(int var0) {return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());}public static ExecutorService newSingleThreadExecutor() {return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());}public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new Executors.DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newScheduledThreadPool(int var0) {return new ScheduledThreadPoolExecutor(var0);}
无论哪种类型的线程池最终都是直接或者间接通过ThreadPoolExecutor这个类来实现的。而ThreadPoolExecutor的有多个构造方法最终都是调用含有7个参数的构造函数。/*** Creates a new {code ThreadPoolExecutor} with the given initial* parameters.** param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {code allowCoreThreadTimeOut} is set* param maximumPoolSize the maximum number of threads to allow in the* pool* param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* param unit the time unit for the {code keepAliveTime} argument* param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {code Runnable}* tasks submitted by the {code execute} method.* param threadFactory the factory to use when the executor* creates a new thread* param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* throws IllegalArgumentException if one of the following holds:br* {code corePoolSize 0}br* {code keepAliveTime 0}br* {code maximumPoolSize 0}br* {code maximumPoolSize corePoolSize}* throws NullPointerException if {code workQueue}* or {code threadFactory} or {code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize 0 ||maximumPoolSize 0 ||maximumPoolSize corePoolSize ||keepAliveTime 0)throw new IllegalArgumentException();if (workQueue null || threadFactory null || handler null)throw new NullPointerException();this.corePoolSize corePoolSize;this.maximumPoolSize maximumPoolSize;this.workQueue workQueue;this.keepAliveTime unit.toNanos(keepAliveTime);this.threadFactory threadFactory;this.handler handler;}
① corePoolSize顾名思义其指代核心线程的数量。当提交一个任务到线程池时线程池会创建一个核心线程来执行任务即使其他空闲的核心线程能够执行新任务也会创建新的核心线程而等到需要执行的任务数大于线程池核心线程的数量时就不再创建这里也可以理解为当核心线程的数量等于线程池允许的核心线程最大数量的时候如果有新任务来就不会创建新的核心线程。如果你想要提前创建并启动所有的核心线程可以调用线程池的prestartAllCoreThreads()方法。② maximumPoolSize顾名思义其指代线程池允许创建的最大线程数。如果队列满了并且已创建的线程数小于最大线程数则线程池会再创建新的线程执行任务。所以只有队列满了的时候这个参数才有意义。因此当你使用了无界任务队列的时候这个参数就没有效果了。③ keepAliveTime顾名思义其指代线程活动保持时间即当线程池的工作线程空闲后保持存活的时间。所以如果任务很多并且每个任务执行的时间比较短可以调大时间提高线程的利用率不然线程刚执行完一个任务还没来得及处理下一个任务线程就被终止而需要线程的时候又再次创建刚创建完不久执行任务后没多少时间又终止会导致资源浪费。注意这里指的是核心线程池以外的线程。还可以设置allowCoreThreadTimeout true这样就会让核心线程池中的线程有了存活的时间。④ TimeUnit顾名思义其指代线程活动保持时间的单位可选的单位有天DAYS、小时HOURS、分钟MINUTES、毫秒MILLISECONDS、微秒MICROSECONDS千分之一毫秒和纳秒NANOSECONDS千分之一微秒。⑤ workQueue顾名思义其指代任务队列用来保存等待执行任务的阻塞队列。⑥ threadFactory顾名思义其指代创建线程的工厂可以通过线程工厂给每个创建出来的线程设置更加有意义的名字。⑦ RejectedExecutionHandler顾名思义其指代拒绝执行程序可以理解为饱和策略当队列和线程池都满了说明线程池处于饱和状态那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy表示无法处理新任务时抛出异常。在JDK1.5中Java线程池框架提供了以下4种策略。AbortPolicy直接抛出异常RejectedExecutionException。CallerRunsPolicy只用调用者所在线程来运行任务即由调用 execute方法的线程执行该任务。DiscardOldestPolicy丢弃队列里最近的一个任务并执行当前任务。DiscardPolicy不处理丢弃掉即丢弃且不抛出异常。这7个参数共同决定了线程池执行一个任务的策略当一个任务被添加进线程池时线程数量未达到 corePoolSize则新建一个线程(核心线程)执行任务线程数量达到了 corePools则将任务移入队列等待队列已满新建线程(非核心线程)执行任务队列已满总线程数又达到了 maximumPoolSize就会由上面那位星期天(RejectedExecutionHandler)抛出异常说白了就是先利用核心线程核心线程用完新来的就加入等待队列一旦队列满了那么只能开始非核心线程来执行了。上面的策略会在阅读代码的时候体现出来并且在代码中也能窥探出真正复用空闲线程的实现原理。接下来我们就从线程池执行任务的入口分析。一个线程池可以接受任务类型有Runnable和Callable分别对应了execute和submit方法。目前我们只分析execute的执行过程。上源码public void execute(Runnable command) {if (command null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldnt, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c ctl.get();if (workerCountOf(c) corePoolSize) { //第一步如果线程数量小于核心线程数if (addWorker(command, true))//则启动一个核心线程执行任务return;c ctl.get();}if (isRunning(c) workQueue.offer(command)) {//第二步当前线程数量大于等于核心线程数加入任务队列成功的话会进行二次检查int recheck ctl.get();if (! isRunning(recheck) remove(command))reject(command);else if (workerCountOf(recheck) 0)addWorker(null, false);//启动非核心线程执行注意这里任务是null其实里面会去取任务队列里的任务执行}else if (!addWorker(command, false))//第三步加入不了队列即队列满了尝试启动非核心线程reject(command);//如果启动不了非核心线程执行说明到达了最大线程数量的限制会使用第7个参数抛出异常}
代码并不多主要分三个步骤其中有两个静态方法经常被用到主要用来判断线程池的状态和有效线程数量// 获取运行状态private static int runStateOf(int c) { return c ~CAPACITY; }// 获取活动线程数private static int workerCountOf(int c) { return c CAPACITY; }总结一下execute的执行逻辑就是如果 当前活动线程数 指定的核心线程数则创建并启动一个线程来执行新提交的任务此时新建的线程相当于核心线程如果 当前活动线程数 指定的核心线程数且缓存队列未满则将任务添加到缓存队列中如果 当前活动线程数 指定的核心线程数且缓存队列已满则创建并启动一个线程来执行新提交的任务此时新建的线程相当于非核心线程从代码中我们也可以看出即便当前活动的线程有空闲的只要这个活动的线程数量小于设定的核心线程数那么依旧会启动一个新线程来执行任务。也就是说不会去复用任何线程。在execute方法里面我们没有看到线程复用的影子那么我们继续来看看addWorker方法。private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c ctl.get();int rs runStateOf(c);// Check if queue empty only if necessary.if (rs SHUTDOWN ! (rs SHUTDOWN firstTask null ! workQueue.isEmpty()))return false;for (;;) {int wc workerCountOf(c);if (wc CAPACITY ||wc (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c ctl.get(); // Re-read ctlif (runStateOf(c) ! rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}//前面都是线程池状态的判断暂时不理会主要看下面两个关键的地方boolean workerStarted false;boolean workerAdded false;Worker w null;try {w new Worker(firstTask); // 新建一个Worker对象这个对象包含了待执行的任务并且新建一个线程final Thread t w.thread;if (t ! null) {final ReentrantLock mainLock this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs runStateOf(ctl.get());if (rs SHUTDOWN ||(rs SHUTDOWN firstTask null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s workers.size();if (s largestPoolSize)largestPoolSize s;workerAdded true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 启动刚创建的worker对象里面的thread执行workerStarted true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}
方法虽然有点长但是我们只考虑两个关键的地方先是创建一个worker对象创建成功后对线程池状态判断成功后就去执行该worker对象的thread的启动。也就是说在这个方法里面启动了一个关联到worker的线程但是这个线程是如何执行我们传进来的runnable任务的呢接下来看看这个Worker对象到底做了什么。private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask;this.thread getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker. */public void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.protected boolean isHeldExclusively() {return getState() ! 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() 0 (t thread) ! null !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
最重要的构造方法Worker(Runnable firstTask) { // worker本身实现了Runnable接口setState(-1); // inhibit interrupts until runWorkerthis.firstTask firstTask; // 持有外部传进来的runnable任务//创建了一个thread对象并把自身这个runnable对象给了thread一旦该thread执行start方法就会执行worker的run方法this.thread getThreadFactory().newThread(this); }
在addWorker方法中执行的t.start会去执行worker的run方法public void run() {runWorker(this);}
run方法又执行了ThreadPoolExecutor的runWorker方法把当前worker对象传入。final void runWorker(Worker w) {Thread wt Thread.currentThread();Runnable task w.firstTask; // 取出worker的runnable任务w.firstTask null;w.unlock(); // allow interruptsboolean completedAbruptly true;try {// 循环不断的判断任务是否为空当第一个判断为false的时候即task为null这个task啥时候为null呢// 要么w.firstTask为null还记得我们在execute方法第二步的时候执行addWorker的时候传进来的runnable是null吗// 要么是执行了一遍while循环在下面的finally中执行了tasknull// 或者执行第二个判断一旦不为空就会继续执行循环里的代码。while (task ! null || (task getTask()) ! null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() runStateAtLeast(ctl.get(), STOP))) !wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown null;try {task.run(); // 任务不为空就会执行任务的run方法也就是runnable的run方法} catch (RuntimeException x) {thrown x; throw x;} catch (Error x) {thrown x; throw x;} catch (Throwable x) {thrown x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task null; // 执行完成置null继续下一个循环w.completedTasks;w.unlock();}}completedAbruptly false;} finally {processWorkerExit(w, completedAbruptly);}}
方法比较长归纳起来就三步1从worker中取出runnable这个对象有可能是null见注释中的解释2进入while循环判断判断当前worker中的runnable或者通过getTask得到的runnable是否为空不为空的情况下就执行run3执行完成把runnable任务置为null。假如我们不考虑此方法里面的while循环的第二个判断在我们的线程开启的时候顺序执行了runWorker方法后当前worker的run就执行完成了。既然执行完了那么这个线程也就没用了只有等待虚拟机销毁了。那么回顾一下我们的目标Java线程池中的线程是如何被重复利用的好像并没有重复利用啊新建一个线程执行一个任务然后就结束了销毁了。没什么特别的啊难道有什么地方漏掉了被忽略了仔细回顾下该方法中的while循环的第二个判断task getTasknull玄机就在getTask方法中。private Runnable getTask() {boolean timedOut false; // Did the last poll() time out?for (;;) {int c ctl.get();int rs runStateOf(c);// Check if queue empty only if necessary.if (rs SHUTDOWN (rs STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc workerCountOf(c);// timed变量用于判断是否需要进行超时控制。// allowCoreThreadTimeOut默认是false也就是核心线程不允许进行超时// wc corePoolSize表示当前线程池中的线程数量大于核心线程数量// 对于超过核心线程数量的这些线程或者允许核心线程进行超时控制的时候需要进行超时控制// Are workers subject to culling?boolean timed allowCoreThreadTimeOut || wc corePoolSize;// 如果需要进行超时控制且上次从缓存队列中获取任务时发生了超时timedOut开始为false后面的循环末尾超时时会置为true)// 或者当前线程数量已经超过了最大线程数量那么尝试将workerCount减1,即当前活动线程数减1if ((wc maximumPoolSize || (timed timedOut)) (wc 1 || workQueue.isEmpty())) {// 如果减1成功则返回null这就意味着runWorker()方法中的while循环会被退出其对应的线程就要销毁了也就是线程池中少了一个线程了if (compareAndDecrementWorkerCount(c))return null;continue;}try {// 注意workQueue中的poll()方法与take()方法的区别//poll方式取任务的特点是从缓存队列中取任务,最长等待keepAliveTime的时长取不到返回null//take方式取任务的特点是从缓存队列中取任务若队列为空,则进入阻塞状态直到能取出对象为止Runnable r timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r ! null)return r;timedOut true; // 能走到这里说明已经超时了} catch (InterruptedException retry) {timedOut false;}}}
注释已经很清楚了getTask的作用就是在当前线程中1如果当前线程池线程数量大于核心线程数量或者设置了对核心线程进行超时控制的话此时相当于对所有线程进行超时控制就会去任务队列获取超时时间内的任务队列的poll方法获取到的话就会继续执行任务也就是执行runWorker方法中的while循环里的任务的run方法执行完成后又继续进入getTask从任务队列中获取下一个任务。如果在超时时间内没有获取到任务就会走到getTask的倒数第三行设置timeOut标记为true此时继续进入getTask的for循环中由于超时了那么就会进入尝试去去对线程数量-1操作-1成功了就直接返回一个null的任务这样就回到了当前线程执行的runWorker方法中该方法的while循环判断getTask为空直接退出循环这样当前线程就执行完成了意味着要被销毁了这样自然就会被回收器择时回收了。也就是线程池中少了一个线程了。因此只要线程池中的线程数大于核心线程数或者核心线程也允许超时就会这样一个一个地销毁这些多余的线程。2如果当前活动线程数小于等于核心线程数或者不允许核心线程超时同样也是去缓存队列中取任务但当缓存队列中没任务了就会进入阻塞状态队列的take方法直到能取出任务为止也就是队列中被新添加了任务时因此这个线程是处于阻塞状态的并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的可以随时处理任务从而达到重复利用的目的。综上所述线程之所以能达到复用就是在当前线程执行的runWorker方法中有个while循环while循环的第一个判断条件是执行当前线程关联的Worker对象中的任务执行一轮后进入while循环的第二个判断条件getTask()从任务队列中取任务取这个任务的过程要么是一直阻塞的要么是阻塞一定时间直到超时才结束的超时到了的时候这个线程也就走到了生命的尽头。然而在我们开始分析execute的时候这个方法中的三个部分都会调用addWorker去执行任务在addWorker方法中都会去新建一个线程来执行任务这样的话是不是每次execute都是去创建线程了事实上复用机制跟线程池的阻塞队列有很大关系我们可以看到在execute在核心线程满了但是队列不满的时候会把任务加入到队列中一旦加入成功之前被阻塞的线程就会被唤醒去执行新的任务这样就不会重新创建线程了。我们用个例子来看下假设我们有这么一个ThreadPoolExecutor核心线程数设置为5不允许核心线程超时最大线程数设置为10超时时间为20s线程队列是LinkedBlockingDeque相当于是个无界队列。当我们给这个线程池陆续添加任务前5个任务执行的时候会执行到我们之前分析的execute方法的第一步部分会陆续创建5个线程做为核心线程执行任务当前线程里面的5个关联的任务执行完成后会进入各自的while循环的第二个判断getTask中去取队列中的任务假设当前没有新的任务过来也就是没有执行execute方法那么这5个线程就会在workQueue.take()处一直阻塞的。这个时候我们执行execute加入一个任务即第6个任务这个时候会进入execute的第二部分将任务加入到队列中一旦加入队列之前阻塞的5个线程其中一个就会被唤醒取出新加入的任务执行了。这里有个execute的第二部分的后半段执行重复校验的代码即addWorker传入null任务目前还没搞明白是怎么回事。在我们这个例子中由于队列是无界的所以始终不会执行到execute的第三部分即启动非核心线程假如我们设置队列为有界的那么必然就会执行到这里了。小结通过以上的分析应该算是比较清楚地解答了“线程池中的核心线程是如何被重复利用的”这个问题同时也对线程池的实现机制有了更进一步的理解当有新任务来的时候先看看当前的线程数有没有超过核心线程数如果没超过就直接新建一个线程来执行新的任务如果超过了就看看缓存队列有没有满没满就将新任务放进缓存队列中满了就新建一个线程来执行新的任务如果线程池中的线程数已经达到了指定的最大线程数了那就根据相应的策略拒绝任务。当缓存队列中的任务都执行完了的时候线程池中的线程数如果大于核心线程数就销毁多出来的线程直到线程池中的线程数等于核心线程数。此时这些线程就不会被销毁了它们一直处于阻塞状态等待新的任务到来。注意本文所说的“核心线程”、“非核心线程”是一个虚拟的概念是为了方便描述而虚拟出来的概念在代码中并没有哪个线程被标记为“核心线程”或“非核心线程”所有线程都是一样的只是当线程池中的线程多于指定的核心线程数量时会将多出来的线程销毁掉池中只保留指定个数的线程。那些被销毁的线程是随机的可能是第一个创建的线程也可能是最后一个创建的线程或其它时候创建的线程。一开始我以为会有一些线程被标记为“核心线程”而其它的则是“非核心线程”在销毁多余线程的时候只销毁那些“非核心线程”而“核心线程”不被销毁。这种理解是错误的。
往期推荐
ThreadLocal内存溢出代码演示和原因分析ThreadLocal不好用那是你没用对额Java中用户线程和守护线程区别这么大