公司做一个网站内容如何设计方案,集团企业网工管理系统,深圳全网营销系统,视频直播怎么赚钱的文章目录面试官#xff1a;能给我讲讲线程池的实现原理#xff1f;线程池类继承关系ThreadPoolExecutor核心数据结构面试官#xff1a;给我讲讲线程池的有哪些参数#xff1f;面试官#xff1a;如何优雅的关闭线程#xff1f;线程的生命周期面试官#xff1a;线程池哪五…
文章目录面试官能给我讲讲线程池的实现原理线程池类继承关系ThreadPoolExecutor核心数据结构面试官给我讲讲线程池的有哪些参数面试官如何优雅的关闭线程线程的生命周期面试官线程池哪五种状态面试官线程池哪4种拒绝策略并分别说一下作用和实现原理DiscardOldestPolicyAbortPolicyDiscardPolicyCallerRunsPolicy面试官线程池常用的阻塞队列有哪些能说下各自的区别SynchronousQueue应用PriorityBlockedQueue应用DelayQueue应用面试官如何结合业务合理的配置线程池参数CPU密集型和IO密集型如何配置线程设置过多会造成什么影响CPU 密集型任务IO密集型任务面试官给我讲讲什么是线程复用面试官为什么《阿里巴巴开发手册》不推荐使用Executor创建线程ScheduledThreadPoolExecutor延时执行周期执行面试题你知道延迟执行、周期性执行任务实现原理面试题为什么不使用Timer而使用ScheduledThreadPoolExecutor?CompletableFuture异步编程工具基本使用四种任务原型面试题你知道CompletableFuture内部原理?CompletableFuture的构造ForkJoinPool任务类型的适配任务的链式执行过程分析什么是 Java8 的 ForkJoinPool应用核心数据结构面试官能给我讲讲线程池的实现原理
声回答该问题需要了解线程池有哪些方法并讲解每个方法的作用以及各个类的继承关系线程池的运行原理线程池的状态转换、生命周期线程池的构造参数线程池Runnable-Worker-Thread执行任务-线程复用机制等 线程池类继承关系 ThreadPoolExecutor
核心数据结构
public class ThreadPoolExecutor extends AbstractExecutorService {//存储线程池的状态和线程数量private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));// 存放任务的阻塞队列private final BlockingQueueRunnable workQueue;// 对线程池内部各种变量进行互斥访问控制private final ReentrantLock mainLock new ReentrantLock();// 线程集合private final HashSetWorker workers new HashSetWorker();
每一个线程是一个Worker对象Worker是ThreadPoolExecutor内部类核心数据结构如下
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread; // Worker封装的线程Runnable firstTask; // Worker接收到的第1个任务volatile long completedTasks; // Worker执行完毕的任务个数
}由定义会发现Worker继承于AQS也就是说Worker本身就是一把锁。这把锁有什么用处呢用于线程池的关闭、线程执行任务的过程中。
面试官给我讲讲线程池的有哪些参数
ThreadPoolExecutor在其构造方法中提供了几个核心配置参数来配置不同策略的线程池。 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueRunnable workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) corePoolSize核心线程数-线程池中始终维护的线程MaxPoolSize最大线程数-达到核心线程数并且阻塞队列慢的时候会扩充到最大线程数KeepAliveTime、TimeUnit空闲超过该时间后线程会被销毁WorkQueue任务阻塞队列-当核心线程满的时候会放入阻塞队列中ThreadFactory线程工厂-可以根据业务自定义创建线程修改线程名称Handler拒绝策略-最大线程满并且阻塞队列慢了之后新的任务进来会触发拒绝策略
面试官如何优雅的关闭线程
线程池的关闭比线程的关闭更加复杂因为线程池的关闭涉及到很多场景如果有线程正在执行任务如果任务队列不为空还有当前线程进来如何处理因此关闭过程不可能是瞬时的而是需要一个平滑的过渡这就涉及线程池的完整生命周期管理。
线程的生命周期
private final AtomicInteger ctl new AtomicInteger(ctlOf(RUNNING, 0));在JDK 7中把线程数量workerCount和线程池状态runState这两个变量打包存储在一个字 段里面即ctl变量。如下图所示最高的3位存储线程池状态其余29位存储线程个数。而在JDK 6中 这两个变量是分开存储的。 关于内部封装的获取生命周期状态、获取线程池线程数量的计算方法如以下代码所示
private static int runStateOf(int c) { return c ~CAPACITY; } //计算当前运行状态
private static int workerCountOf(int c) { return c CAPACITY; } //计算当前线程数量
private static int ctlOf(int rs, int wc) { return rs | wc; } //通过状态和线程数生成ctl面试官ctl为什么这样设计这样做的好处 用一个变量去存储两个值可避免在做相关决策时出现不一致的情况不必为了维护两者的一致而占用锁资源。通过阅读线程池源代码也可以发现经常出现要同时判断线程池运行状态和线程数量的情况。线程池也提供了若干方法去供用户获得线程池当前的运行状态、线程个数。这里都使用的是位运算的方式相比于基本运算速度也会快很多。 线程状态转换过程 状态解释
切记线程状态-1、0、1、2、3转化只能从小到大而不能逆向转换。 除 terminated()之外线程池还提供了其他几个钩子方法这些方法的实现都是空的。如果想实现 自己的线程池可以重写这几个方法
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }面试官线程池哪五种状态 // runState is stored in the high-order bitsprivate static final int RUNNING -1 COUNT_BITS;private static final int SHUTDOWN 0 COUNT_BITS;private static final int STOP 1 COUNT_BITS;private static final int TIDYING 2 COUNT_BITS;private static final int TERMINATED 3 COUNT_BITS;面试官线程池哪4种拒绝策略并分别说一下作用和实现原理
接口类
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}实现类
DiscardOldestPolicy public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }/*** 从任务队列中调用poll()方法删除最先入队列的(最老的)任务* 拓展队列是先进先出由此调用poll()方法是取出的是先入队列的数据*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}AbortPolicy public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 丢弃准备添加的任务并抛出异常* param r the runnable task requested to be executed* param e the executor attempting to execute this task* throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException(Task r.toString() rejected from e.toString());}}DiscardPolicy public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不做任何处理丢弃准备添加的任务* param r the runnable task requested to be executed* param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}CallerRunsPolicy public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {code CallerRunsPolicy}.*/public CallerRunsPolicy() { }/*** Executes task r in the callers thread, unless the executor* has been shut down, in which case the task is discarded.* 准备添加的任务直接调用run()方法交给提交任务的线程执行* param r the runnable task requested to be executed* param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}面试官线程池常用的阻塞队列有哪些能说下各自的区别
队列说明ArrayBlockedQueue数组实现有界队列FIFO先入先出支持公平锁、非公平锁LinkedBlockedQueue单链表实现的有界队列如果不指定容量默认为Integer.MAX_VALUESynchronousQueue不存储元素的队列每个put()操作时必须有线程正在调用take()该元素才存在Executors.newCacheThreadPool()就使用该队列每来一个任务如果没有空闲线程(线程复用)则创建新线程执行任务PriorityBlockedQueue无界的优先队列默认按自然排序自定义实现compareTo()定制自己优先级不同保证同优先级顺序DelayQueue无界延迟队列利用PriorityBlockedQueue实现在创建元素时可以指定多久能够获取到该元素只有满足延迟时间才能获取到数据ScheduledThreadPoolExecutor定时任务就是利用自己实现的延时队列(思想一致)
SynchronousQueue应用 Testpublic void SynchronousQueue() throws InterruptedException {SynchronousQueueInteger queue new SynchronousQueue();Random random new Random();AtomicInteger ait new AtomicInteger(0);new Thread(() - {try {for (int i 0; i 3; i) {Integer integer queue.take();if (integer ! null){int count ait.incrementAndGet();System.out.println(count - integer);}}} catch (InterruptedException e) {e.printStackTrace();}}).start();TimeUnit.SECONDS.sleep(3);new Thread(() - {for (int i 0; i 3; i) {queue.offer(random.nextInt());}}).start();TimeUnit.SECONDS.sleep(5);}PriorityBlockedQueue应用
和PriorityQueue使用一样无非就是加了锁阻塞生产、消费者线程 Testpublic void priorityQueue(){PriorityQueueInteger queue new PriorityQueue(new ComparatorInteger() {Overridepublic int compare(Integer o1, Integer o2) {return Integer.compare(o1, o2);}});queue.add(2);queue.add(1);queue.add(3);while (!queue.isEmpty()){System.out.println(queue.poll());}PriorityQueueCustomRank queue2 new PriorityQueue();queue2.add(new CustomRank(2));queue2.add(new CustomRank(1));queue2.add(new CustomRank(3));while (!queue2.isEmpty()){System.out.println(queue2.poll().v);}}public class CustomRank implements ComparableCustomRank{Integer v;public CustomRank(Integer v) {this.v v;}Overridepublic int compareTo(CustomRank o) {return Integer.compare(this.v, o.v);}}DelayQueue应用 Testpublic void delayQueue() throws InterruptedException {DelayQueueCustomTimeTask queue new DelayQueue();queue.add(new CustomTimeTask(我是第一个任务, 4, TimeUnit.SECONDS));queue.add(new CustomTimeTask(我是第二个任务, 8, TimeUnit.SECONDS));queue.add(new CustomTimeTask(我是第三个任务, 16, TimeUnit.SECONDS));while (!queue.isEmpty()){CustomTimeTask task queue.take();System.out.format(name: {%s}, time: {%s} \n, task.name, new Date());}}class CustomTimeTask implements Delayed{//触发时间long time;//任务名称String name;public CustomTimeTask(String name,long time, TimeUnit timeUnit) {this.time System.currentTimeMillis() timeUnit.toMillis(time);this.name name;}Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}/*** 利用优先队列将任务按照触发时间从小到大排序* param o* return*/Overridepublic int compareTo(Delayed o) {CustomTimeTask other (CustomTimeTask) o;return Long.compare(this.time, other.time);}Overridepublic String toString() {return CustomTimeTask{ time time , name name \ };}}
面试官如何结合业务合理的配置线程池参数CPU密集型和IO密集型如何配置线程设置过多会造成什么影响
答案其实没有完整的公式去计算我在使用的时候一般是根据业务场景动态的去改变线程池参数选择最优配置方案
CPU 密集型任务
IO密集型任务 面试官给我讲讲什么是线程复用
什么是线程复用 通过同一个线程去执行不同的任务这就是线程复用。
java.util.concurrent.ThreadPoolExecutor#execute public void execute(Runnable command) {// 如果传入的Runnable的空就抛出异常if (command null)throw new NullPointerException();int c ctl.get();// 线程池中的线程比核心线程数少 if (workerCountOf(c) corePoolSize) {// 新建一个核心线程执行任务if (addWorker(command, true))return;c ctl.get();}// 核心线程已满但是任务队列未满添加到队列中if (isRunning(c) workQueue.offer(command)) {int recheck ctl.get();// 任务成功添加到队列以后再次检查是否需要添加新的线程因为已存在的线程可能被销毁了if (! isRunning(recheck) remove(command))// 如果线程池处于非运行状态并且把当前的任务从任务队列中移除成功则拒绝该任务reject(command);else if (workerCountOf(recheck) 0)// 如果之前的线程已经被销毁完新建一个非核心线程addWorker(null, false);}// 核心线程池已满队列已满尝试创建一个非核心新的线程else if (!addWorker(command, false))// 如果创建新线程失败说明线程池关闭或者线程池满了拒绝任务reject(command);}
线程复用源码分析java.util.concurrent.ThreadPoolExecutor#runWorker final void runWorker(Worker w) {Thread wt Thread.currentThread();Runnable task w.firstTask;w.firstTask null;w.unlock(); // 释放锁 设置work的state0 允许中断boolean completedAbruptly true;try {//一直执行 如果task不为空 或者 从队列中获取的task不为空while (task ! null || (task getTask()) ! null) {task.run();//执行task中的run方法}}completedAbruptly false;} finally {//1.将 worker 从数组 workers 里删除掉//2.根据布尔值 allowCoreThreadTimeOut 来决定是否补充新的 Worker 进数组 workersprocessWorkerExit(w, completedAbruptly);}}
面试官为什么《阿里巴巴开发手册》不推荐使用Executor创建线程 ScheduledThreadPoolExecutor
延时执行 ScheduledThreadPoolExecutor threadPool new ScheduledThreadPoolExecutor(1, new ThreadFactory() {Overridepublic Thread newThread(Runnable r) {return new Thread(r, schedule-thread);}});/*** 延迟执行* throws InterruptedException*/Testvoid testSchedule() throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);System.out.println(new Date());threadPool.schedule(new TimeTask(), 3, TimeUnit.SECONDS);countDownLatch.await();}class TimeTask implements Runnable{Overridepublic void run() {try {TimeUnit.SECONDS.sleep(4);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() new Date() 任务执行完成);}}周期执行
1.scheduleAtFixedRate方法 按固定频率执行与任务本身执行时间无关。但有个前提条件任务执行时间必须小于间隔时间例如间隔时间是5s每5s执行一次任务任务的执行时间必须小于5s。 Testvoid testScheduleAtFixedRate() throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);threadPool.scheduleAtFixedRate(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}2.scheduleWithFixedDelay方法 按固定间隔执行与任务本身执行时间有关。例如任务本身执行时间是10s间隔2s则下一次开始执行的时间就是12s。 Testvoid testScheduleWithFixedDelay() throws InterruptedException {CountDownLatch countDownLatch new CountDownLatch(1);threadPool.scheduleWithFixedDelay(new TimeTask(), 2, 3, TimeUnit.SECONDS);countDownLatch.await();}面试题你知道延迟执行、周期性执行任务实现原理
ScheduledThreadPoolExecutor继承了ThreadPoolExecutor这意味着其内部的数据结构和ThreadPoolExecutor是基本一样的。
延迟执行任务依靠的是DelayQueue。DelayQueue是 BlockingQueue的一种其实现原理是二叉堆。
而周期性执行任务是执行完一个任务之后再把该任务扔回到任务队列中如此就可以对一个任务反复执行。
不过这里并没有使用DelayQueue而是在ScheduledThreadPoolExecutor内部又实现了一个特定的DelayQueue。 static class DelayedWorkQueue extends AbstractQueueRunnableimplements BlockingQueueRunnable {...}其原理和DelayQueue一样但针对任务的取消进行了优化。下面主要讲延迟执行和周期性执行的实现过程。
延迟执行设计原理 传进去的是一个Runnable外加延迟时间delay。在内部通过decorateTask(…)方法把Runnable包装成一个ScheduleFutureTask对象而DelayedWorkQueue中存放的正是这种类型的对象这种类型的对象一定实现了Delayed接口。 从上面的代码中可以看出schedule()方法本身很简单就是把提交的Runnable任务加上delay时间转换成ScheduledFutureTask对象放入DelayedWorkerQueue中。任务的执行过程还是复用的ThreadPoolExecutor延迟的控制是在DelayedWorkerQueue内部完成的。
周期性执行设计原理 和schedule(…)方法的框架基本一样也是包装一个ScheduledFutureTask对象只是在延迟时间参数之外多了一个周期参数然后放入DelayedWorkerQueue就结束了。
两个方法的区别在于一个传入的周期是一个负数另一个传入的周期是一个正数为什么要这样做呢
用于生成任务序列号的sequencer创建ScheduledFutureTask的时候使用 private class ScheduledFutureTaskVextends FutureTaskV implements RunnableScheduledFutureV {/** Sequence number to break ties FIFO */private final long sequenceNumber;/** 延时时间 */private long time;private final long period;/** The actual task to be re-enqueued by reExecutePeriodic */RunnableScheduledFutureV outerTask this;/*** Index into delay queue, to support faster cancellation.*/int heapIndex;/*** Creates a one-shot action with given nanoTime-based trigger time.*/ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time ns;this.period 0;this.sequenceNumber sequencer.getAndIncrement();}/*** Creates a periodic action with given nano time and period.*/ScheduledFutureTask(Runnable r, V result, long ns, long period) {super(r, result);this.time ns;this.period period;this.sequenceNumber sequencer.getAndIncrement();}public long getDelay(TimeUnit unit) {return unit.convert(time - now(), NANOSECONDS);}public int compareTo(Delayed other) {if (other this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask? x (ScheduledFutureTask?)other;long diff time - x.time;if (diff 0)return -1;else if (diff 0)return 1;else if (sequenceNumber x.sequenceNumber)return -1;elsereturn 1;}long diff getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff 0) ? -1 : (diff 0) ? 1 : 0;}public boolean isPeriodic() {return period ! 0;}/*** 设置下一个执行时间*/private void setNextRunTime() {long p period;if (p 0)time p;elsetime triggerTime(-p);}public boolean cancel(boolean mayInterruptIfRunning) {boolean cancelled super.cancel(mayInterruptIfRunning);if (cancelled removeOnCancel heapIndex 0)remove(this);return cancelled;}/***实现Runnable*/public void run() {boolean periodic isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 如果不是周期执行则执行一次else if (!periodic)ScheduledFutureTask.super.run();// 如果是周期执行则重新设置下一次运行的时间重新入队列else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}//下一次触发时间long triggerTime(long delay) {return now() ((delay (Long.MAX_VALUE 1)) ? delay : overflowFree(delay));}//放到队列中等待下一次执行void reExecutePeriodic(RunnableScheduledFuture? task{if (canRunInCurrentRunState(true)) {super.getQueue().add(task);if (!canRunInCurrentRunState(true) remove(task))task.cancel(false);elseensurePrestart();}}}withFixedDelay和atFixedRate的区别就体现在setNextRunTime里面。
如果是atFixedRateperiod0下一次开始执行时间等于上一次开始执行时间period
如果是withFixedDelayperiod 0下一次开始执行时间等于triggerTime(-p)为now(-period)now即上一次执行的结束时间。
面试题为什么不使用Timer而使用ScheduledThreadPoolExecutor?
Timer使用的是绝对时间系统时间的改变会对Timer产生一定的影响而ScheduledThreadPoolExecutor使用的是相对时间所以不会有这个问题。Timer使用单线程来处理任务长时间运行的任务会导致其他任务的延时处理而ScheduledThreadPoolExecutor可以自定义线程数量。Timer没有对运行时异常进行处理一旦某个任务触发运行时异常会导致整个Timer崩溃而ScheduledThreadPoolExecutor对运行时异常做了捕获可以在afterExecute()回调方法中进行处理所以更加安全。
CompletableFuture异步编程工具
基本使用
package net.dreamzuora.thread;import org.testng.annotations.Test;import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;/*** 异步编程工具*/
public class CompletableFutureDemo {/*** CompletableFuture实现了Future接口所以它也具有Future的特性调用get()方法会阻塞在那* 直到结果返回。* 另外1个线程调用complete方法完成该Future则所有阻塞在get()方法的线程都将获得返回结果。* throws ExecutionException* throws InterruptedException*/Testvoid complete() throws ExecutionException, InterruptedException {CompletableFutureString completeFuture new CompletableFuture();new Thread(() - {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}completeFuture.complete(gome);}).start();System.out.println(completeFuture.get());}/*** 阻塞等待任务执行完成*/Testvoid runAsyncTest() throws ExecutionException, InterruptedException {CompletableFutureVoid completableFuture CompletableFuture.runAsync(() - {try {TimeUnit.SECONDS.sleep(3);System.out.println(hello word!);} catch (InterruptedException e) {e.printStackTrace();}});//阻塞等待任务完成completableFuture.get();System.out.println(succ);}/*** 带返回值的任务执行* throws ExecutionException* throws InterruptedException*/Testvoid supplyAsync() throws ExecutionException, InterruptedException {CompletableFutureString stringCompletableFuture CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}return hello;}});String result stringCompletableFuture.get();System.out.println(result);}/*** thenRun()上个任务结束再执行(不带上一个返回值结果)下一个任务* thenAccept后面跟的是一个有参数、无返回值的方法称为Consumer返回值也是* CompletableFutureVoid类型。顾名思义只进不出所以称为Consumer前面的* Supplier是无参数有返回值只出不进和Consumer刚好相反。* throws ExecutionException* throws InterruptedException*/Testvoid thenRun() throws ExecutionException, InterruptedException {CompletableFutureVoid completableFuture CompletableFuture.runAsync(() - {System.out.println(第一次执行);}).thenRun(new Runnable() {Overridepublic void run() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(第二次执行);}});completableFuture.get();}/*** thenAccept()上个任务结束再执行(前面任务的结果作为下一个任务的入参)下一个任务* throws ExecutionException* throws InterruptedException*/Testvoid thenAccept() throws ExecutionException, InterruptedException {CompletableFutureVoid completableFuture CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {return hello;}}).thenAccept(new ConsumerString() {Overridepublic void accept(String param) {System.out.println(param word!);}});completableFuture.get();}/*** thenApply 后面跟的是一个有参数、有返回值的方法称为Function。返回值是* CompletableFutureString类型。* throws ExecutionException* throws InterruptedException*/Testvoid thenApply() throws ExecutionException, InterruptedException {CompletableFutureString stringCompletableFuture CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return 第一个任务执行完成;}}).thenApply(new FunctionString, String() {Overridepublic String apply(String firstTaskResult) {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return firstTaskResult 第二个任务执行完成;}});String result stringCompletableFuture.get();System.out.println(result);}/*** 第1个参数是一个CompletableFuture类型第2个参数是一个方法并且是一个BiFunction也就* 是该方法有2个输入参数1个返回值。* 从该接口的定义可以大致推测它是要在2个 CompletableFuture 完成之后把2个* CompletableFuture的返回值传进去再额外做一些事情。* throws ExecutionException* throws InterruptedException*/Testvoid thenCompose() throws ExecutionException, InterruptedException {CompletableFutureString future CompletableFuture.supplyAsync((SupplierString) () - 第一个任务执行完成).thenCompose(new FunctionString, CompletionStageString() {Overridepublic CompletionStageString apply(String firstTask) {return CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {return firstTask 第二个任务执行完成;}});}});String s future.get();System.out.println(s);}/*** 如果希望返回值是一个非嵌套的CompletableFuture可以使用thenCompose* throws ExecutionException* throws InterruptedException*/Testvoid thenCombine() throws ExecutionException, InterruptedException {CompletableFutureInteger future CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {return 第一个任务执行完成 ;}}).thenCombine(CompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {return 第二个任务执行完成 ;}}), new BiFunctionString, String, Integer() {Overridepublic Integer apply(String s1, String s2) {return s1.length() s2.length();}});System.out.println(future.get());}/*** 等待所有的CompletableFuture执行完成无返回值* throws ExecutionException* throws InterruptedException*/Testvoid allOf() throws ExecutionException, InterruptedException {AtomicInteger atc new AtomicInteger(0);CompletableFuture[] completableFutures new CompletableFuture[10];for (int i 0; i 10; i){CompletableFuture supplyAsync CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] supplyAsync;}CompletableFutureVoid completableFuture CompletableFuture.allOf(completableFutures);completableFuture.get();System.out.println(atc);}/*** anyOf:只要有任意一个CompletableFuture结束就可以做接下来的事情而无须像* AllOf那样等待所有的CompletableFuture结束。* 但由于每个CompletableFuture的返回值类型都可能不同任意一个意味着无法判断是什么类* 型所以anyOf的返回值是CompletableFutureObject类型*/Testvoid anyOf() throws ExecutionException, InterruptedException {AtomicInteger atc new AtomicInteger(0);CompletableFuture[] completableFutures new CompletableFuture[10];for (int i 0; i 10; i){CompletableFuture supplyAsync CompletableFuture.supplyAsync(new SupplierInteger() {Overridepublic Integer get() {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return atc.incrementAndGet();}});completableFutures[i] supplyAsync;}Integer result (Integer) CompletableFuture.anyOf(completableFutures).get();System.out.println(result);}
}
四种任务原型
通过上面的例子可以总结出提交给CompletableFuture执行的任务有四种类型Runnable、Consumer、Supplier、Function。下面是这四种任务原型的对比。 runAsync 与 supplierAsync 是 CompletableFuture 的静态方法而 thenAccept、thenAsync、thenApply是CompletableFutre的成员方法。
因为初始的时候没有CompletableFuture对象也没有参数可传所以提交的只能是Runnable或者Supplier只能是静态方法
通过静态方法生成CompletableFuture对象之后便可以链式地提交其他任务了这个时候就可以提交Runnable、Consumer、Function且都是成员方法。
面试题你知道CompletableFuture内部原理?
CompletableFuture的构造ForkJoinPool private static final Executor asyncPool useCommonPool ?ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();任务执行 public static CompletableFutureVoid runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFutureVoid asyncRunStage(Executor e, Runnable f) {if (f null) throw new NullPointerException();CompletableFutureVoid d new CompletableFutureVoid();e.execute(new AsyncRun(d, f));return d;} 通过上面的代码可以看到asyncPool是一个static类型supplierAsync、asyncSupplyStage也都是static方法。
Static方法会返回一个CompletableFuture类型对象之后就可以链式调用CompletionStage里面的各个方法。
任务类型的适配
我们向CompletableFuture提交的任务是Runnable/Supplier/Consumer/Function 。因此肯定需要一个适配机制把这四种类型的任务转换成ForkJoinTask然后提交给ForkJoinPool如下图所示 supplyAsync()-Supplier-AsyncSupply
在 supplyAsync(…)方法内部会把一个 Supplier 转换成一个 AsyncSupply然后提交给ForkJoinPool执行 public static U CompletableFutureU supplyAsync(SupplierU supplier) {return asyncSupplyStage(asyncPool, supplier);}static U CompletableFutureU asyncSupplyStage(Executor e,SupplierU f) {if (f null) throw new NullPointerException();CompletableFutureU d new CompletableFutureU();e.execute(new AsyncSupplyU(d, f));return d;}static final class AsyncSupplyT extends ForkJoinTaskVoidimplements Runnable, AsynchronousCompletionTask {CompletableFutureT dep; SupplierT fn;AsyncSupply(CompletableFutureT dep, SupplierT fn) {this.dep dep; this.fn fn;}...}runAsync()-Runnable-AsyncRun 在runAsync(…)方法内部会把一个Runnable转换成一个AsyncRun然后提交给ForkJoinPool执行 public static CompletableFutureVoid runAsync(Runnable runnable) {return asyncRunStage(asyncPool, runnable);}static CompletableFutureVoid asyncRunStage(Executor e, Runnable f) {if (f null) throw new NullPointerException();CompletableFutureVoid d new CompletableFutureVoid();e.execute(new AsyncRun(d, f));return d;}static final class AsyncRun extends ForkJoinTaskVoidimplements Runnable, AsynchronousCompletionTask {CompletableFutureVoid dep; Runnable fn;AsyncRun(CompletableFutureVoid dep, Runnable fn) {this.dep dep; this.fn fn;}...}thenAccept()-Consumer-UniAccept 在 thenRun/thenAccept/thenApply 内部会分别把Runnable/Consumer/Function 转换成UniRun/UniAccept/UniApply对象然后提交给ForkJoinPool执行
除此之外还有两种 CompletableFuture 组合的情况分为“与”和“或”所以有对应的Bi和Or类型 的Completion类型 public CompletableFutureVoid thenAccept(Consumer? super T action) {return uniAcceptStage(null, action);}private CompletableFutureVoid uniAcceptStage(Executor e,Consumer? super T f) {if (f null) throw new NullPointerException();CompletableFutureVoid d new CompletableFutureVoid();if (e ! null || !d.uniAccept(this, f, null)) {UniAcceptT c new UniAcceptT(e, d, this, f);push(c);c.tryFire(SYNC);}return d;}任务的链式执行过程分析
下面以CompletableFuture.supplyAsync(…).thenApply(…).thenRun(…)链式代码为例分析整个执行过程。 static final class AsyncSupplyT extends ForkJoinTaskVoidimplements Runnable, AsynchronousCompletionTask {...}什么是 Java8 的 ForkJoinPool
ForkJoinPool就是JDK7提供的一种“分治算法”的多线程并行计算框架。Fork意为分叉Join意为合并一分一合相互配合形成分治算法。此外也可以将ForkJoinPool看作一个单机版的 Map/Reduce多个线程并行计算。
相比于ThreadPoolExecutorForkJoinPool可以更好地实现计算的负载均衡提高资源利用率。 假设有5个任务在ThreadPoolExecutor中有5个线程并行执行其中一个任务的计算量很大其余4个任务的计算量很小这会导致1个线程很忙其他4个线程则处于空闲状态。 利用ForkJoinPool可以把大的任务拆分成很多小任务然后这些小任务被所有的线程执行从而 实现任务计算的负载均衡。 应用
1.斐波那契数列 Testvoid testForkJoin() throws ExecutionException, InterruptedException {ForkJoinPool forkJoinPool new ForkJoinPool();ForkJoinTaskInteger task forkJoinPool.submit(new FibonacciTask(5));System.out.println(task.get());}// 1 1 2 3 5 8 ...class FibonacciTask extends RecursiveTaskInteger {int n;public FibonacciTask(int n) {this.n n;}Overrideprotected Integer compute() {if (n 1){return 1;}FibonacciTask task1 new FibonacciTask(n - 1);task1.fork();FibonacciTask task2 new FibonacciTask(n - 2);task2.fork();return task1.join() task2.join();}}核心数据结构
与ThreadPoolExector不同的是除一个全局的任务队列之外每个线程还有一个自己的局部队列。
本课程内容参考 1.《并发编程78讲》-徐隆曦 滴滴出行高级工程师 2.美团技术博客-Java线程池实现原理及其在美团业务中的实践 3.《java并发编程实战》 4.CSDN博客-面试官你知道什么是线程池的线程复用原理吗