长春网站外包,移动互联网应用技术,wordpress标签增加彩色背景,甘肃两学一做网站引言
本篇博客介绍通过“执行任务”的机制来设计应用程序时需要掌握的一些知识。所有的内容均提炼自《Java并发编程实战》中第六章的内容。
大多数并发应用程序都是围绕“任务执行”来构造的#xff1a;任务通常是一些抽象的且离散的工作单元。
当围绕“任务执行”来设计应…引言
本篇博客介绍通过“执行任务”的机制来设计应用程序时需要掌握的一些知识。所有的内容均提炼自《Java并发编程实战》中第六章的内容。
大多数并发应用程序都是围绕“任务执行”来构造的任务通常是一些抽象的且离散的工作单元。
当围绕“任务执行”来设计应用程序结构时第一步就是要找出清晰的任务边界。在理想情况下各个任务之间是相互独立的任务并不依赖于其他任务的状态、结果或边界效应。
大多数服务器应用程序提供了一种自然的任务边界选择方式以独立的客户请求为边界。
不好的例子串行与“为每个任务创建线程”
在书中6.1节介绍了由最简单的串行执行任务到为每个任务创建一个线程这两种执行任务的方式。应该说这两种方式都是不可取的。
这一节主要是为了引出下一节介绍的“任务执行框架”。
其中“串行执行任务”的缺点是在一般的服务器应用程序中无法提高吞吐率或快速响应性。
而“为每个任务创建线程”的方式的问题在于可能导致高性能开销、高资源消耗、影响稳定性。 【重点】在工作或面试中也会遇到这个极富针对性的问题即大量创建线程会存在哪些问题 1、高性能开销创建和销毁都需要一定的代价创建过程需要时间延迟处理请求也需要jvm和操作系统提供一些辅助操作。 2、高资源消耗活跃的线程会消耗系统资源尤其是内存。当可运行的线程数量多余可用处理器的数量那么会有大量空闲的线程占用内存不仅给垃圾回收带来压力在竞争CPU的时候还将产生额外的性能开销。 3、影响稳定性大量线程占用内存内存不足导致可能抛出OutOfMemoryError系统崩溃。 线程数量的限制
书中在这里简单引出一个概念稳定性。
根据前后文的联系这里具体指的是应用程序不会因为线程过多而抛出OutOfMemoryError异常。
为了达到这种稳定性在可创建线程数量上存在一个限制。这个限制受平台以及多个因素影响包括JVM启动参数、Thread构造函数中请求的栈大小、底层操作系统对线程的限制等。例如在32位机器上其中一个主要的限制因素是线程栈的地址空间。每个线程都维护两个执行栈一个用于Java代码另一个用于原生代码。
通常JVM在默认情况下会生成一个复合栈大小约0.5M~1M这个值可以通过JVM标志 -Xss或通过Thread的构造函数来修改那么线程数量 ≈ 2^32(bit) / 0.5(MB) ≈几千或几万。
因此在一定范围内增加线程可以提高系统的吞吐率但如果超出这个范围再创建更多的线程只会降低程序的执行速度。
Executor接口
public interface Executor { void execute(Runnable command);
}
Executor是一个非常简单的接口只有一个execute(Runnable) 方法它是其他的灵活且强大的异步任务框架的基础。通过这种方式用Runnable来表示任务可以将任务的提交过程与执行过程解耦。
Executor本身就是基于生产者消费者提交任务相当于生产者执行任务相当于消费者因此如果要在程序中实现一个生产者-消费者的设计那么最简单的方式通常就是使用Executor。
什么是执行策略
执行策略定义了任务执行的“what、where、when、how”等方面主要是描述根据不同的资源而选择不同的执行方式一个最优执行策略应当是与硬件资源最匹配的。
线程池
先来看一下四种常用线程池的创建
ExecutorService newFixedThreadPool Executors.newFixedThreadPool(10);
ExecutorService newCachedThreadPool Executors.newCachedThreadPool();
ScheduledExecutorService newScheduledThreadPool Executors.newScheduledThreadPool(10);
ExecutorService newSingleThreadExecutor Executors.newSingleThreadExecutor();
其中ExecutorService extends ExecutorScheduledExecutorService extends ExecutorService 。 1、newFixedThreadPool(int) 创建一个定额线程池每提交一个任务创建一个线程达到数量限制后不再增加这时线程池的规模将不再变化如果某个线程由于发生了未预期的异常而结束那么线程池会补充一个新的线程 2、NewCachedThreadPool() : 创建一个可缓存的线程池线程池的规模不存在任何限制当线程多余任务时回收空闲线程当任务增加时创建新线程。 3、NewSingleThreadExecutor单线程的Executor如果这个线程异常结束会创建另一个线程来替代。NewSingleThreadExecutor能确保依照任务在队列中的顺序串行执行例如FIFO、LIFO、优先级。 4、NewScheduleThreadPool创建一个固定长度的线程池而且以延迟或定时的方式来执行任务类似于Timer。 Executor的生命周期 JVM只有在所有非守护线程全部终止后才会退出无法正确地关闭ExecutorJVM将无法结束。
Executor以异步的方式来执行任务导致了提交任务的状态不是立即可见的即有些任务可能已经完成有些可能正在执行还有些可能正在队列中等待执行。
ExecutorSevice接口就是为了解决执行服务的生命周期问题扩展了Executor接口。它添加了一些用于声明周期管理的方法同时还有一些用于任务提交的便利方法
public interface ExecutorService extends Executor { void shutdown(); ListRunnable shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // ......其他用于任务提交的便利方法
}
这五个方法是声明周期管理的方法其余的都是与任务提交相关的方法比如可以提交比较大的集合Callable对象的方法
invokeAll(Collection? extends CallableT tasks) 【重点】ExecutorService的三种状态运行、关闭、已终止 。 ExecutorService在初始创建时处于运行状态。shutdown()方法将执行平缓的关闭过程不再接受新的任务同时等待已经提交的任务执行完成——包括那些还未开始执行的任务。shutdownNow()方法将执行粗暴的关闭方式它将尝试取消所有运行中的任务并且不再启动队列中尚未开始执行的任务。
延迟任务与周期任务
Timer类负责管理延迟任务以及周期任务但它本身存在缺陷因此通常要用ScheduleThreadPoolExecutor的构造函数或newScheduleThreadPool工厂方法来创建该类对象。
Timer的缺陷在于Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长那么将破坏其他TimerTask的定时精确性。
Timer还有一个问题就是Timer线程不会捕获异常当TimerTask抛出未检查异常时将终止定时线程。Timer也不会恢复线程的执行而是会错误地任务整个Timer都被取消了。这就造成已经被调度但尚未执行的TimerTask将不会再执行新的任务也不会被调度。称之为“线程泄漏”。
【重点】生命周期小结 Runnable和Callable等任务的生命周期创建、提交、开始、完成、取消。 Future表示的就是一个任务的生命周期。 Thread的生命周期创建、就绪、运行、阻塞、死亡或结束。 ExecutorService的生命周期因为它继承自Executor因此也是Executor的生命周期创建、运行、关闭、已终止。 Callable与Future
callable
Runnable有一个局限性是没有返回值也没办法抛出受检异常。对于某些异步获得结果的任务无法胜任Callable应运而生。
它是Runnable的升级版既可以使用CallableVoid来达到Runnable一样的效果同时也可以使用CallableT 来指定返回结果。
创建Callable的方式有两种构造函数、静态的封装方法。
CallableString callableTask new CallableString() { Override public String call() throws Exception { return this is a callable task....; }
};
Java 8 style
CallableString callableTask () - { return this is a callable task....;
};
静态方法Executors.callable(Runnable task, T result)
CallableString call Executors.callable(() - {System.out.println(this is a runnable task...);
}, done!);
Future
future表示一个任务的生命周期。主要提供了一些方法用于判断任务处于哪个阶段还可以获取任务的结果甚至是取消任务。它本身还有一层隐含意义是任务的生命周期只能前进不能后退当一个任务处于“完成”状态就永远停留在“完成”状态上。这一点和ExecutorService的生命周期一样。
Future接口
public interface FutureV { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
创建Future的方式通常是使用ExecutorService的submit()方法获取返回值。如果想通过构造器的方式显式地创建一个任务的生命周期管理对象可以使用FutureTask。
FutureTaskString runnFutureTask new FutureTaskString(runnable, done!);
FutureTaskString callFutureTask new FutureTask(callable);
FutureTask类实现了Runnable和Future两个接口。
说明FutureTask是Java 5加入的类Java 6又为它补充了一个新的RunnableFuture接口Runnable接口和Future接口被提升到了RunnableFuture接口上这更像是一种重构手段我个人认为在实际开发中用途可能不及直接使用FutureTask 由于FutureTask实现了Runnable接口因此可以将它提交给Executor来执行或者直接调用它的run方法。 是的FutureTask的run()方法可以直接执行任务而不需要什么start。
Future.get
get()方法的行为取决于任务的状态尚未开始、正在运行、已完成如果任务已经完成那么get会立即返回或抛出一个Exception如果任务没有完成那么get将阻塞直到任务完成。如果任务抛出异常那么get将该异常封装成ExecutionException并重新抛出可以通过getCause来进一步获得被封装的初始异常。如果任务被取消那么get将抛出CancellationException。
异构任务并行化存在的局限
A与B两个完全不同的任务通过并行方式可以实现小幅度的性能提升但是如果想大幅度的提升存在一定的困难。因此得出一个结论是只有当大量相互独立且同构的任务可以并发进行处理时才能体现出真正的性能提升。
CompletionService与它的子类ExecutorCompletionService
CompletionService是Executor与BlockingQueue的融合。
回顾一下BlockingQueue的一些特性 BlockingQueue接口是Queue的子接口有两个最主要的实现LinkedBlockingQueue无界队列和ArrayBlockingQueue有界队列。take()或poll()方法都是BlockingQueue的取头元素的方法唯一不同的是当没有可用的头元素时take会无限期等待阻塞poll可以设置一个超时时间一旦超时将返回null。 CompletionService是在任务执行的功能上加入了队列的特性很明显是用于处理一批允许有返回值的任务。 用法创建一个CompletionServiceExecutorCompletionService对象。【ExecutorCompletionService的构造器允许我们传入一个ExecutorService用于采取不同的执行策略和一个BlockingQueue该参数可选默认LinkedBlockingQueue】然后可以将一组Callable任务提交给CompletionService来执行然后使用类似队列操作的take或poll方法来获取已完成的结果这些结果会在完成时被封装为Future。
【扩展】ExecutorCompletionService的实现很简单。首先通过构造函数创建一个BlockingQueue来保存计算结果然后当计算完成时调用FutureTask的done方法放入队列。展开当提交某个任务时该任务将首先包装为一个QueueingFuture这是FutureTask【回顾FutureTask实现了Future、Runnable】的一个子类QueueingFuture改写了FutureTask的done方法——将结果放入BlockingQueue中。take和poll方法委托给BlockingQueue方法这些方法会在得到结果之前阻塞。
为任务设置时限
有时候如果某个任务无法在指定时间内完成那么将不再需要它的结果此时可以放弃这个任务。Future.get中支持这种需求当结果可用时它将立即返回如果在指定时限内没有计算出结果那么抛出TimeoutException。
在使用时限任务时需要注意当这些任务超市后应该立即停止从而避免为继续计算一个不再使用的结果而浪费计算资源。
【使用Future.get为单个任务设置时限如果希望对一组任务设置计算时限比如前面介绍的CompletionService那么可以使用poll方法来设置执行时间】
invokeAll方法
ExecutorServie接口中有两个重载的invokeAll方法
T ListFutureT invokeAll(Collection? extends CallableT tasks) throws InterruptedException;
T ListFutureT invokeAll(Collection? extends CallableT tasks, long timeout, TimeUnit unit) throws InterruptedException;
invokeAll方法支持将多个任务提交到一个ExecutorService并获得结果。invokeAll方法的参数为一组任务并返回一组Future。invokeAll按照任务集合中迭代器的顺序将所有的Future添加到返回的集合中从而使调用者能够将各个Future与其表示的Callable关联起来。
当所有任务都执行完毕时或者调用线程被中断时又或者超过指定时限时invokeAll都会返回。当超过指定时限任何还未完成的任务都会取消。当invokeAll返回后每个任务要么正常完成要么被取消而客户端代码可以调用get或isCancelled来判断究竟是何种情况。
第六章小结
通过围绕任务执行来设计应用程序可以简化开发过程并有助于实现并发。
Executor框架将任务提交与执行策略解耦开来同时还支持多种不同类型的执行策略。当需要创建线程来执行任务时可以考虑使用Executor。
要想在将应用程序分解为不同的任务时获得最大的好处必须定义清晰的任务边界。某些应用程序中存在着比较明显的任务边界而在其他一些程序中则需要进一步分析才能揭示出粒度更细的并行性。