深圳优化网站排名,外企外贸是做什么的,网站开发视频资源放哪儿,wordpress 文章查看次数本文是我们学院课程中名为Java Concurrency Essentials的一部分 。 在本课程中#xff0c;您将深入探讨并发的魔力。 将向您介绍并发和并发代码的基础知识#xff0c;并学习诸如原子性#xff0c;同步和线程安全之类的概念。 在这里查看 #xff01; 目录 1.简介 2. jav… 本文是我们学院课程中名为Java Concurrency Essentials的一部分 。 在本课程中您将深入探讨并发的魔力。 将向您介绍并发和并发代码的基础知识并学习诸如原子性同步和线程安全之类的概念。 在这里查看 目录 1.简介 2. java.util.concurrent 2.1。 执行者 2.2。 执行器服务 2.3。 并发集合 2.4。 原子变量 2.5。 信号 2.6。 CountDownLatch 2.7。 循环屏障 3.下载源代码 1.简介 下一章介绍java.util.concurrent包。 在该程序包中包含许多有趣的类这些类提供了实现多线程应用程序所需的必要和有用的功能。 在讨论了如何使用Executor接口及其实现之后本章介绍了原子数据类型和并发数据结构。 最后一部分向信号灯和倒数锁存器发出信号。 2. java.util.concurrent 阅读了先前关于并发和多线程的文章之后您可能会觉得编写在多线程环境中执行良好的健壮代码并不总是那么简单。 有一个谚语可以说明这一点来源未知 初级程序员认为并发很难。 经验丰富的程序员认为并发很容易。 高级程序员认为并发很难。 因此一个可靠的数据结构和类库可提供经过良好测试的线程安全性对于编写使用并发性程序的任何人都非常有帮助。 幸运的是JDK为此目的提供了一组现成的数据结构和功能。 所有这些类都位于包java.util.concurrent中。 执行者 java.util.concurrent包定义了一组接口这些接口的实现执行任务。 其中最简单的一个是Executor接口 public interface Executor {void execute(Runnable command);
} 因此执行器实现采用给定的Runnable实例并执行它。 该接口不对执行方式进行任何假设javadoc仅声明“将来某个时候执行给定命令”。 因此一个简单的实现可以是 public class MyExecutor implements Executor {public void execute(Runnable r) {(new Thread(r)).start();}
} 除了纯接口外JDK还提供了一个成熟且可扩展的实现名为ThreadPoolExecutor 。 在后台 ThreadPoolExecutor维护线程池并在给定execute()方法的情况下将Runnable实例调度到该池。 传递给构造函数的参数控制线程池的行为。 参数最多的构造函数如下 ThreadPoolExecutorint corePoolSizeint maximumPoolSizelong keepAliveTimeTimeUnit单位BlockingQueue Runnable workQueueThreadFactory threadFactoryRejectedExecutionHandler处理程序 让我们逐步研究不同的参数 corePoolSize ThreadPoolExecutor具有一个corePoolSize属性该属性确定只有在队列已满时才启动新线程直到启动新线程为止。 maximumPoolSize 此属性确定最大启动多少线程。 您可以将其设置为Integer.MAX_VALUE 以使其没有上限。 keepAliveTime 当ThreadPoolExecutor创建的ThreadPoolExecutor数超过corePoolSize 当线程在给定的时间内空闲时该线程将从池中删除。 unit 这只是keepAliveTime的TimeUnit 。 workQueue 此队列保存通过execute()方法给定的Runnable实例直到它们实际启动为止。 threadFactory 此接口的实现使您可以控制ThreadPoolExecutor使用的线程的创建。 handler 当您为workQueue指定固定大小并提供maximumPoolSize时可能会发生ThreadPoolExecutor由于饱和而无法执行您的Runnable实例的情况。 在这种情况下将调用提供的处理程序并让您控制在这种情况下应该发生的情况。 由于有许多参数需要调整让我们检查一些使用它们的代码 public class ThreadPoolExecutorExample implements Runnable {private static AtomicInteger counter new AtomicInteger();private final int taskId;public int getTaskId() {return taskId;}public ThreadPoolExecutorExample(int taskId) {this.taskId taskId;}public void run() {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) {BlockingQueueRunnable queue new LinkedBlockingQueueRunnable(10);ThreadFactory threadFactory new ThreadFactory() {public Thread newThread(Runnable r) {int currentCount counter.getAndIncrement();System.out.println(Creating new thread: currentCount);return new Thread(r, mythread currentCount);}};RejectedExecutionHandler rejectedHandler new RejectedExecutionHandler() {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (r instanceof ThreadPoolExecutorExample) {ThreadPoolExecutorExample example (ThreadPoolExecutorExample) r;System.out.println(Rejecting task with id example.getTaskId());}}};ThreadPoolExecutor executor new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS, queue, threadFactory, rejectedHandler);for (int i 0; i 100; i) {executor.execute(new ThreadPoolExecutorExample(i));}executor.shutdown();}
} 我们的run()实现仅睡5秒钟但这不是此代码的主要重点。 ThreadPoolExecutor从5个核心线程开始并允许池最多扩展到10个线程。 出于演示目的我们仅将未使用的线程闲置大约1秒钟。 这里的队列实现是LinkedBlockingQueue与10个的容量Runnable实例。 我们还实现了一个简单的ThreadFactory以便跟踪线程的创建。 对于RejectedExecutionHandler也是如此。 在环路main()方法现在发出100 Runnable实例很短的时间量内该池。 该示例的输出显示我们必须创建10个线程最多来处理所有未决的Runnables Creating new thread: 0
...
Creating new thread: 9
Rejecting task with id 20
...
Rejecting task with id 99 但它也显示所有taskId大于19的任务都转发到RejectedExecutionHandler 。 这是因为我们的Runnable实现休眠了5秒钟。 第10个线程已经启动后队列只能持有另外10个Runnable实例。 然后必须拒绝所有其他实例。 最后 shutdown()方法使ThreadPoolExecutor拒绝所有其他任务并等待直到已执行的任务已执行。 您可以将调用shutdown()替换为shutdownNow() 。 后者尝试中断所有正在运行的线程并关闭线程池而不等待所有线程完成。 在上面的示例中您会看到十个InterruptedException异常因为我们的十个睡眠线程被立即唤醒。 执行器服务 Executor接口非常简单它仅强制底层实现实现execute()方法。 ExecutorService进一步扩展了Executor接口并添加了一系列实用程序方法例如添加了完整的任务集合关闭线程池的方法以及查询实现以获取执行结果的能力执行一项任务。 我们已经看到 Runnable接口仅定义一个run()方法作为返回值是无效的。 因此有必要引入一个名为Callable的新接口该接口类似于Runnable定义也只有一个方法但是此方法返回一个值 V call(); 但是JDK如何处理任务返回一个值但提交给线程池以执行的事实呢 任务的提交者无法提前知道任务何时执行以及执行的持续时间。 让当前线程等待结果显然不是解决方案。 在另一个类java.util.concurrent.FutureV实现了检查结果是否已经可用的功能该功能可以阻止或等待一定时间。 此类只有几种方法可以检查任务是否已完成取消任务以及检索其结果。 最后但并非最不重要的一点是我们还有另一个接口该接口通过某些方法扩展了Executor接口和ExecutorService接口以在给定的时间点计划任务。 接口的名称为ScheduledExecutorService 它基本上提供了一个schedule()方法该方法使用一个参数来等待任务执行之前需要等待多长时间 schedule(CallableV callable, long delay, TimeUnit unit);
schedule(Runnable command, long delay, TimeUnit unit); 就像ExecutorService一样 schedule()方法有两种变体一种用于Runnable接口一种用于使用Callable接口返回值的任务。 ScheduledExecutorService还提供了一种定期执行任务的方法 scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); 在初始延迟旁边我们可以指定任务应运行的时间。 最后一个示例已经展示了如何创建ThreadPoolExecutor 。 ScheduledExecutorService的实现名为ScheduledThreadPoolExecutor 其处理方式与上面使用的ThreadPoolExecutor非常相似。 但是通常不需要完全控制ExecutorService的所有功能。 试想一下一个简单的测试客户端应该使用一个简单的ThreadPool调用一些服务器方法。 因此JDK的创建者创建了一个名为Executors的简单工厂类请注意结尾的。 此类提供了一些静态方法来创建可使用的ThreadPoolExecutor 。 所有这些使我们能够实现一个简单的线程池该线程池执行一堆计算一些数字的任务这里的数字运算操作是出于演示目的由一个简单的Thread.sleep()代替 public class ExecutorsExample implements CallableInteger {private static Random random new Random(System.currentTimeMillis());public Integer call() throws Exception {Thread.sleep(1000);return random.nextInt(100);}public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService Executors.newFixedThreadPool(5);FutureInteger[] futures new Future[5];for (int i 0; i futures.length; i) {futures[i] executorService.submit(new ExecutorsExample());}for (int i 0; i futures.length; i) {Integer retVal futures[i].get();System.out.println(retVal);}executorService.shutdown();}
} ExecutorService的创建是ExecutorService 。 要执行一些任务我们只需要一个for循环即可创建ExecutorsExample的一些新实例并将返回的Future存储在数组中。 将任务提交给服务后我们只需等待结果。 Future get()方法正在阻塞即当前线程进入睡眠状态直到结果可用。 如果任务未在定义的时间段内完成则此方法的重写版本采用超时规范以便等待线程继续进行。 并发集合 Java集合框架包含每个Java程序员在日常工作中使用的各种数据结构。 此集合由java.util.concurrent包中的数据结构扩展。 这些实现提供了在多线程环境中使用的线程安全集合。 许多Java程序员甚至不知不觉地使用线程安全的数据结构。 “旧”类Hashtable和Vector是此类的示例。 自1.0版以来它们是JDK的一部分这些基本数据结构在设计时考虑了线程安全性。 尽管此处的线程安全性仅意味着所有方法都在实例级别上同步。 以下代码取自Oracle的JDK实现 public synchronized void clear() {Entry tab[] table;modCount;for (int index tab.length; --index 0; )tab[index] null;count 0;
} 这与诸如HashMap或ArrayList 自JDK 1.2起都提供之类的“较新”集合类它们本身都不是线程安全的的关键区别。 但是有一种方便的方法可以检索此类“较新”的集合类的线程安全实例 HashMapLong,String map new HashMapLong, String();
MapLong, String synchronizedMap Collections.synchronizedMap(map); 正如我们在上面的代码中看到的那样 Collections类使我们可以在运行时创建以前未同步的collections类的同步版本。 如前所述将关键字sync同步到方法会导致在每个时间点只有一个线程执行所研究对象的方法。 当然这是使简单集合类具有线程安全性的最简单方法。 更高级的技术包括专为并发访问而设计的特殊算法。 这些算法在java.util.concurrent包的集合类java.util.concurrent实现。 此类的一个示例是ConcurrentHashMap ConcurrentHashMapLong,String map new ConcurrentHashMapLong,String();
map.put(key, value);
String value2 map.get(key); 上面的代码看起来与普通的HashMap几乎相同但是底层实现却完全不同。 ConcurrentHashMap不是将整个表仅使用一个锁而是将整个表细分为许多小分区。 每个分区都有自己的锁。 因此假设不同线程在表的不同分区上进行写入则它们从不同线程对该映射的写入操作不会竞争并且可以使用自己的锁。 该实现还引入了提交写操作的想法以减少读操作的等待时间。 这将略微更改读取操作的语义因为它将返回已完成的最新写入操作的结果。 这意味着在执行read方法之前和之后条目的数量可能不一样就像使用同步方法时一样但是对于并发应用程序这并不总是很重要。 ConcurrentHashMap的迭代器实现也是如此。 为了更好地了解Hashtable性能同步的HashMap和ConcurrentHashMap的性能让我们实现一个简单的性能测试。 以下代码启动了几个线程并允许每个线程从映射中的一个随机位置检索一个值然后在另一个随机位置更新一个值 public class MapComparison implements Runnable {private static MapInteger, String map;private Random random new Random(System.currentTimeMillis());public static void main(String[] args) throws InterruptedException {runPerfTest(new HashtableInteger, String());runPerfTest(Collections.synchronizedMap(new HashMapInteger,String()));runPerfTest(new ConcurrentHashMapInteger, String());runPerfTest(new ConcurrentSkipListMapInteger, String());}private static void runPerfTest(MapInteger, String map) throws InterruptedException {MapComparison.map map;fillMap(map);ExecutorService executorService Executors.newFixedThreadPool(10);long startMillis System.currentTimeMillis();for (int i 0; i 10; i) {executorService.execute(new MapComparison());}executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);System.out.println(map.getClass().getSimpleName() took (System.currentTimeMillis() - startMillis) ms);}private static void fillMap(MapInteger, String map) {for (int i 0; i 100; i) {map.put(i, String.valueOf(i));}}public void run() {for (int i 0; i 100000; i) {int randomInt random.nextInt(100);map.get(randomInt);randomInt random.nextInt(100);map.put(randomInt, String.valueOf(randomInt));}}
} 该程序的输出如下 Hashtable took 436 ms
SynchronizedMap took 433 ms
ConcurrentHashMap took 75 ms
ConcurrentSkipListMap took 89 ms 正如我们所期望的 Hashtable和同步的HashMap实现远远落后于并HashMap实现。 本示例还介绍了HashMap的跳过列表实现其中一个存储桶中的链接项形成一个跳过列表这意味着对列表进行了排序并且列表中链接项的级别不同。 最高级别的指针直接指向列表中间的某个项目。 如果该项目已经大于当前项目则迭代器必须采用下一个较低的链接级别以跳过比最高级别更少的元素。 跳过列表的详细说明可以在此处找到。 关于跳过列表的有趣之处在于即使所有项目都存储在同一存储桶中所有读取访问也要花费logn时间。 原子变量 当多个线程共享一个变量时我们需要同步对该变量的访问。 原因是这样的事实即使像i 这样的简单指令也不是原子的。 它基本上由以下字节码指令组成 iload_1
iinc 1, 1
istore_1 在不了解Java字节码的情况下人们看到了局部变量1的当前值被压入操作数堆栈它以常数1递增然后从堆栈中弹出并存储在局部变量号1中。 。 这意味着我们需要三个原子操作才能将局部变量加1。 在多线程环境中这还意味着调度程序可以停止在这些指令中的每条指令之间执行当前线程并启动一个新线程然后该新线程又可以在同一变量上工作。 为了应对这种情况您当然可以同步对此特定变量的访问 synchronized(i) {i;
} 但这也意味着当前线程必须获取i的锁这需要在JVM中进行一些内部同步和计算。 这种方法也称为悲观锁定因为我们认为另一个线程当前持有我们想要获取的锁定的可能性很高。 另一种称为乐观锁定的方法假定没有太多线程争用资源因此我们只是尝试更新该值并查看是否起作用。 此方法的一种实现是比较交换CAS方法。 此操作在许多现代CPU上实现为原子操作。 它将给定存储位置的内容与给定值“期望值”进行比较如果当前值等于期望值则将其更新为新值。 用伪代码看起来像 int currentValue getValueAtMemoryPosition(pos);
if(currentValue expectedValue) {setValueAtMemoryPosition(pos, newValue);
} CAS操作将上述代码实现为一个原子操作。 因此它可以用来查看某个变量的值是否仍为当前线程持有的值并在这种情况下将其更新为递增的值。 由于CAS操作的使用需要硬件支持因此JDK提供了特殊的类来支持这些操作。 它们都位于包java.util.concurrent.atomic中。 这些类的代表是java.util.concurrent.atomic.AtomicInteger 。 上面讨论的CAS操作是通过该方法实现的 boolean compareAndSet(int expect, int update) 布尔值返回值指示更新操作是否成功。 基于此功能可以实现进一步的操作例如原子增量操作此处取自Oracle的JDK实现 public final int getAndIncrement() {for (;;) {int current get();int next current 1;if (compareAndSet(current, next))return current;}} 现在我们可以通过不同的线程递增整数变量而无需使用悲观锁 public class AtomicIntegerExample implements Runnable {private static final AtomicInteger atomicInteger new AtomicInteger();public static void main(String[] args) {ExecutorService executorService Executors.newFixedThreadPool(5);for (int i 0; i 5; i) {executorService.execute(new AtomicIntegerExample());}executorService.shutdown();}public void run() {for (int i 0; i 10; i) {int newValue atomicInteger.getAndIncrement();if (newValue 42) {System.out.println([ Thread.currentThread().getName() ]: newValue);}}}
} 上面的代码启动了五个线程并让每个线程递增AtomicInteger变量。 得到答案的幸运线42将其打印到控制台。 重复执行此示例代码时输出将仅由一个线程创建。 在AtomicInteger旁边JDK还提供了用于对长值整数和长数组以及引用进行原子操作的类。 信号 信号量用于控制对共享资源的访问。 与简单的同步块相反信号量具有一个内部计数器该内部计数器在线程每次获取锁时增加而在线程释放其之前获得的锁时减少。 递增和递减操作当然是同步的因此可以使用信号量来控制同时通过关键部分的线程数。 线程的两个基本操作是 void acquire();
void release(); 构造函数采用并发锁定公平性参数的数量。 fairness参数决定是否在等待线程列表的开头或结尾设置尝试获取锁的新线程。 将新线程放在线程末尾可确保所有线程在一段时间后将获得锁因此不会出现线程饥饿的情况。 Semaphore(int permits, boolean fair) 为了说明所描述的行为让我们建立一个具有五个线程的简单线程池但通过一个信号量进行控制在每个时间点运行的信号不超过三个 public class SemaphoreExample implements Runnable {private static final Semaphore semaphore new Semaphore(3, true);private static final AtomicInteger counter new AtomicInteger();private static final long endMillis System.currentTimeMillis() 10000;public static void main(String[] args) {ExecutorService executorService Executors.newFixedThreadPool(5);for (int i 0; i 5; i) {executorService.execute(new SemaphoreExample());}executorService.shutdown();}public void run() {while(System.currentTimeMillis() endMillis) {try {semaphore.acquire();} catch (InterruptedException e) {System.out.println([Thread.currentThread().getName()] Interrupted in acquire().);}int counterValue counter.incrementAndGet();System.out.println([Thread.currentThread().getName()] semaphore acquired: counterValue);if(counterValue 3) {throw new IllegalStateException(More than three threads acquired the lock.);}counter.decrementAndGet();semaphore.release();}}
} 通过将3作为并发许可的数量来构造信号量。 当尝试获取锁时被阻止的线程可能会遇到必须捕获的InterruptedException 。 或者也可以调用实用程序方法acquireUninterruptibly()来绕过try-catch构造。 为确保关键部分中的并发线程不超过三个我们使用AtomicInteger 每次进程进入该部分时该AtomicInteger都会递增而在离开该部分之前会递减。 当计数器的值大于4时将引发IllegalStateException 。 最后我们release()信号量然后让另一个等待线程进入临界区。 CountDownLatch CountDownLatch类是另一个有助于从JDK进行线程同步的类。 类似于Semaphore类它提供了一个计数器但是CountDownLatch的计数器只能减少到零为止。 一旦计数器达到零所有等待CountDownLatch线程都可以继续。 当池中的所有线程必须在某个点进行同步才能继续进行时通常需要这种功能。 一个简单的示例是一个应用程序该应用程序必须先从不同来源收集数据然后才能将新数据集存储到数据库中。 以下代码演示了五个线程如何在随机时间内睡眠。 唤醒的每个线程都会对闩锁进行递减计数然后等待闩锁变为零。 最后所有线程输出它们已完成的输出。 public class CountDownLatchExample implements Runnable {private static final int NUMBER_OF_THREADS 5;private static final CountDownLatch latch new CountDownLatch(NUMBER_OF_THREADS);private static Random random new Random(System.currentTimeMillis());public static void main(String[] args) {ExecutorService executorService Executors.newFixedThreadPool(NUMBER_OF_THREADS);for (int i 0; i NUMBER_OF_THREADS; i) {executorService.execute(new CountDownLatchExample());}executorService.shutdown();}public void run() {try {int randomSleepTime random.nextInt(20000);System.out.println([ Thread.currentThread().getName() ] Sleeping for randomSleepTime);Thread.sleep(randomSleepTime);latch.countDown();System.out.println([ Thread.currentThread().getName() ] Waiting for latch.);latch.await();System.out.println([ Thread.currentThread().getName() ] Finished.);} catch (InterruptedException e) {e.printStackTrace();}}
} 运行此示例时您将看到输出“ Waiting for闩锁”。 在不同的时间点出现但“完成”。 每个线程的消息立即一个接一个地打印。 循环屏障 与CountDownLatch CyclicBarrier类实现了一个计数器该计数器在递减为零后可以重置。 所有线程必须调用其方法await()直到内部计数器设置为零为止。 等待的线程然后被唤醒并可以继续。 然后在内部将计数器重置为其原始值并且整个过程可以再次开始 public class CyclicBarrierExample implements Runnable {private static final int NUMBER_OF_THREADS 5;private static AtomicInteger counter new AtomicInteger();private static Random random new Random(System.currentTimeMillis());private static final CyclicBarrier barrier new CyclicBarrier(5, new Runnable() {public void run() {counter.incrementAndGet();}});public static void main(String[] args) {ExecutorService executorService Executors.newFixedThreadPool(NUMBER_OF_THREADS);for (int i 0; i NUMBER_OF_THREADS; i) {executorService.execute(new CyclicBarrierExample());}executorService.shutdown();}public void run() {try {while(counter.get() 3) {int randomSleepTime random.nextInt(10000);System.out.println([ Thread.currentThread().getName() ] Sleeping for randomSleepTime);Thread.sleep(randomSleepTime);System.out.println([ Thread.currentThread().getName() ] Waiting for barrier.);barrier.await();System.out.println([ Thread.currentThread().getName() ] Finished.);}} catch (Exception e) {e.printStackTrace();}}
} 上面的示例与CountDownLatch非常相似但是与前面的示例相反我向run()方法添加了while循环。 这种run()实现使每个线程都能继续进行sleeping和await()过程直到计数器为三。 还要注意提供给CyclicBarrier的构造函数的匿名Runnable实现。 每当障碍被触发时其run()方法都会执行。 在这里我们增加了并发线程检查的计数器。 3.下载源代码 您可以下载本课程的源代码 concurrency-4.zip 翻译自: https://www.javacodegeeks.com/2015/09/the-java-util-concurrent-package.html