乐山建设网站,android 网站模板下载,免费咨询医生的软件,自己怎么做外贸网站Java并发编程艺术#xff1a;深度剖析多线程利器
前言
在当今软件开发的世界中#xff0c;多线程编程已经变得愈发重要。面对多核处理器的普及和复杂的系统架构#xff0c;开发人员需要深入了解并发编程的原理和实践#xff0c;以充分发挥硬件的性能潜力。本文将带您深入…Java并发编程艺术深度剖析多线程利器
前言
在当今软件开发的世界中多线程编程已经变得愈发重要。面对多核处理器的普及和复杂的系统架构开发人员需要深入了解并发编程的原理和实践以充分发挥硬件的性能潜力。本文将带您深入探讨Java中的并发与多线程编程介绍一系列强大的Java库和框架助您更好地处理并发挑战。 文章目录 Java并发编程艺术深度剖析多线程利器前言1. java.util.concurrent 包1.1 Executor 框架1.2 并发集合1.3 同步器1.4 原子变量1.5 CompletableFuture1.6 Phaser1.7 LinkedTransferQueue 2. Akka2.1 Actor 模型2.2 并发与分布式2.2.1 Clustering2.2.2 Sharding2.2.3 Distributed Data2.2.4 Akka Streams2.2.5 Akka HTTP 3. RxJava3.1 Observable 与 Observer3.2 操作符3.3 背压处理3.4 任务调度器3.5 错误处理3.6 异步与并行3.7 扩展与自定义操作符 4. ForkJoin 框架4.1 工作窃取算法4.2 ForkJoinPool 类4.3 RecursiveTask 与 RecursiveAction 类4.4 ForkJoinTask 类4.5 RecursiveTask 与 RecursiveAction 类续4.5.1 RecursiveTask4.5.2 RecursiveAction 5. Disruptor5.1 高性能无锁并发框架5.2 RingBuffer 数据结构5.3 生产者-消费者模式5.4 应用于低延迟的金融交易系统 6. Guava 并发库6.1 ListenableFuture 接口6.2 Futures 工具类6.3 SettableFuture 类6.4 ListeningExecutorService 接口6.5 RateLimiter 类6.6 Cache 类 总结 1. java.util.concurrent 包
1.1 Executor 框架
Executor 框架是 Java 提供的用于管理线程的框架它包含一组接口和类用于简化多线程编程。其中Executor 接口是整个框架的核心定义了执行任务的基本协议。下面是一个简单的使用 ThreadPoolExecutor 的例子
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class ExecutorExample {public static void main(String[] args) {// 创建线程池ExecutorService executorService Executors.newFixedThreadPool(2);// 提交任务executorService.submit(() - {System.out.println(Task 1 executed by Thread.currentThread().getName());});executorService.submit(() - {System.out.println(Task 2 executed by Thread.currentThread().getName());});// 关闭线程池executorService.shutdown();}
}1.2 并发集合
并发集合是为了在多线程环境中提供安全的数据操作而设计的。ConcurrentHashMap 是一个线程安全的哈希表实现CopyOnWriteArrayList 是一个线程安全的动态数组实现。以下是它们的简单应用
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;public class ConcurrentCollectionExample {public static void main(String[] args) {// 使用 ConcurrentHashMapMapString, String concurrentMap new ConcurrentHashMap();concurrentMap.put(key1, value1);concurrentMap.put(key2, value2);// 使用 CopyOnWriteArrayListCopyOnWriteArrayListString copyOnWriteList new CopyOnWriteArrayList();copyOnWriteList.add(Item1);copyOnWriteList.add(Item2);}
}1.3 同步器
同步器用于协调多个线程的执行。CountDownLatch 允许一个或多个线程等待其他线程完成操作CyclicBarrier 用于多线程之间的同步Semaphore 用于控制同时访问的线程数量。下面是一个使用 CountDownLatch 的例子
import java.util.concurrent.CountDownLatch;public class CountDownLatchExample {public static void main(String[] args) throws InterruptedException {// 创建 CountDownLatch设置计数器为2CountDownLatch latch new CountDownLatch(2);// 启动两个线程new Thread(() - {System.out.println(Task 1 executed);latch.countDown();}).start();new Thread(() - {System.out.println(Task 2 executed);latch.countDown();}).start();// 等待两个线程执行完毕latch.await();System.out.println(Both tasks completed);}
}1.4 原子变量
原子变量提供了一种无锁的线程安全的操作方式。AtomicInteger、AtomicLong 和 AtomicReference 分别用于整数、长整数和对象的原子操作。以下是 AtomicInteger 的使用示例
import java.util.concurrent.atomic.AtomicInteger;public class AtomicIntegerExample {public static void main(String[] args) {AtomicInteger atomicInteger new AtomicInteger(0);// 原子地增加值int result atomicInteger.incrementAndGet();System.out.println(Incremented value: result);// 原子地减少值result atomicInteger.decrementAndGet();System.out.println(Decremented value: result);}
}1.5 CompletableFuture
CompletableFuture 是 Java 8 引入的一个强大的异步编程工具它提供了一种简单而灵活的方式来处理异步操作的结果。相较于传统的 FutureCompletableFuture 允许你以声明式的方式构建异步操作流水线轻松地进行组合和转换。下面是一个使用 CompletableFuture 的示例
import java.util.concurrent.CompletableFuture;public class CompletableFutureExample {public static void main(String[] args) {// 异步执行任务CompletableFutureString future CompletableFuture.supplyAsync(() - {System.out.println(Task executed by Thread.currentThread().getName());return Result;});// 注册回调函数future.thenAccept(result - System.out.println(Async result: result));// 等待任务完成future.join();}
}在上述示例中通过 CompletableFuture.supplyAsync 异步执行任务使用 thenAccept 注册回调函数实现了异步任务的执行和结果处理。
1.6 Phaser
Phaser 是 Java 7 引入的同步辅助类它允许线程在多阶段并发算法中协同工作。Phaser 提供了更灵活的同步机制比传统的 CountDownLatch 和 CyclicBarrier 更强大。以下是一个简单的 Phaser 使用示例
import java.util.concurrent.Phaser;public class PhaserExample {public static void main(String[] args) {// 创建 Phaser设置参与的线程数目Phaser phaser new Phaser(3);// 启动三个线程new Thread(() - {System.out.println(Thread 1 arrived);phaser.arriveAndAwaitAdvance();}).start();new Thread(() - {System.out.println(Thread 2 arrived);phaser.arriveAndAwaitAdvance();}).start();new Thread(() - {System.out.println(Thread 3 arrived);phaser.arriveAndAwaitAdvance();}).start();}
}在上述示例中通过 Phaser 控制三个线程同时到达同一阶段实现了更加灵活的线程同步。
1.7 LinkedTransferQueue
LinkedTransferQueue 是 java.util.concurrent 包中的一个并发队列实现它具有高性能和可伸缩性。相较于其他阻塞队列LinkedTransferQueue 具有更好的性能特征特别适用于高并发场景。以下是一个简单的 LinkedTransferQueue 使用示例
import java.util.concurrent.LinkedTransferQueue;public class LinkedTransferQueueExample {public static void main(String[] args) {// 创建 LinkedTransferQueueLinkedTransferQueueString transferQueue new LinkedTransferQueue();// 生产者线程new Thread(() - {try {// 将元素传输给消费者transferQueue.transfer(Message from producer);System.out.println(Message sent by producer);} catch (InterruptedException e) {e.printStackTrace();}}).start();// 消费者线程new Thread(() - {try {// 接收生产者传输的元素String message transferQueue.take();System.out.println(Message received by consumer: message);} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}在上述示例中LinkedTransferQueue 实现了生产者与消费者之间的消息传输具有更好的性能表现。
2. Akka
2.1 Actor 模型
Actor 模型是一种并发计算的模型其中的 Actor 是并发执行的基本单位。在 Akka 中ActorSystem 是整个 Actor 模型的入口ActorRef 用于在 Actor 之间传递消息。以下是一个简单的 Actor 示例
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;public class ActorExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system ActorSystem.create(MySystem);// 创建 ActorActorRef myActor system.actorOf(Props.create(MyActor.class), myActor);// 发送消息给 ActormyActor.tell(Hello, Actor!, ActorRef.noSender());}// 定义一个简单的 Actorstatic class MyActor extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().match(String.class, message - {System.out.println(Received message: message);}).build();}}
}2.2 并发与分布式
Akka 提供了强大的并发和分布式支持。Clustering 允许将多个 ActorSystem 组成集群Sharding 用于将 Actor 分布到多个节点而 Distributed Data 提供了分布式数据结构的实现。
2.2.1 Clustering
Clustering 是 Akka 中用于构建集群的核心模块。通过 Cluster 模块可以将多个运行在不同 JVM 中的 ActorSystem 组成一个分布式集群实现节点之间的通信和协同工作。以下是一个简单的集群示例
import akka.actor.AbstractActor;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;public class ClusterExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system1 ActorSystem.create(ClusterSystem);ActorSystem system2 ActorSystem.create(ClusterSystem);// 将两个 ActorSystem 加入同一个集群Cluster.get(system1).join(Cluster.get(system2).selfAddress());// 创建一个运行在集群中的 Actorsystem1.actorOf(Props.create(ClusterActor.class), clusterActor);}// 集群中的 Actorstatic class ClusterActor extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().matchAny(message - {System.out.println(Received message: message by self().path());}).build();}}
}上述示例中ClusterActor 在两个不同的 ActorSystem 中运行并通过 Cluster 模块加入了同一个集群。这样它们可以相互通信形成一个分布式集群。
2.2.2 Sharding
Sharding 是 Akka 中用于分片管理的模块。它允许将大量的 Actor 实例分布到多个节点上每个节点负责一部分数据。这有助于提高并发性能和分布式扩展性。以下是一个简单的 Sharding 示例
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.sharding.ClusterSharding;
import akka.cluster.sharding.ClusterShardingSettings;
import akka.cluster.sharding.ShardRegion;public class ShardingExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system ActorSystem.create(ShardingSystem);// 创建 Sharding 区域ActorRef shardingRegion ClusterSharding.get(system).start(MyShardingActor, Props.create(ShardingActor.class), ClusterShardingSettings.create(system), new ShardingMessageExtractor(), new ShardingMessageExtractor());// 发送消息给 Sharding ActorshardingRegion.tell(new ShardingMessageExtractor.ShardingMessage(shard-1, Hello, Sharding Actor!), ActorRef.noSender());}// Sharding Actorstatic class ShardingActor extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().match(ShardingMessageExtractor.ShardingMessage.class, message - {System.out.println(Received sharded message: message.getMessage());}).build();}}// Sharding 消息提取器static class ShardingMessageExtractor implements ShardRegion.MessageExtractor {Overridepublic Object entityMessage(Object message) {return message;}Overridepublic String entityId(Object message) {if (message instanceof ShardingMessage) {return ((ShardingMessage) message).getShardId();}return null;}Overridepublic String shardId(Object message) {if (message instanceof ShardingMessage) {return ((ShardingMessage) message).getShardId();}return null;}static class ShardingMessage {private final String shardId;private final String message;public ShardingMessage(String shardId, String message) {this.shardId shardId;this.message message;}public String getShardId() {return shardId;}public String getMessage() {return message;}}}
}在上述示例中通过 ClusterSharding 模块创建了一个名为 “MyShardingActor” 的 Sharding 区域。消息通过 ShardingMessageExtractor 进行分片并由相应的 ShardingActor 处理。
2.2.3 Distributed Data
Distributed Data 是 Akka 中用于处理分布式数据的模块。它提供了一系列的分布式数据结构如 Replicator、LWWMap 等使得在分布式系统中更容易实现数据的一致性和复制。以下是一个简单的 Replicator 示例
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ddata.*;public class DistributedDataExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system1 ActorSystem.create(DistributedDataSystem);ActorSystem system2 ActorSystem.create(DistributedDataSystem);// 加入同一个集群Cluster.get(system1).join(Cluster.get(system2).selfAddress());// 创建 ReplicatorActorRef replicator1 DistributedData.get(system1).replicator();ActorRef replicator2 DistributedData.get(system2).replicator();// 创建分布式 MapReplicatedDataKeyString dataKey ReplicatedDataKey.create(myData, Replicators.CausalDiamond);// 在系统1中更新值replicator1.tell(new Replicator.Update(dataKey, Replicators.writeLocal(), key, new Replicator.UpdateData(value, Replicators.writeLocal())),ActorRef.noSender());// 在系统2中读取值replicator2.tell(new Replicator.Get(dataKey, Replicators.readLocal()), ActorRef.noSender());}// Actor 处理分布式数据的更新和读取static class DistributedDataActor extends AbstractActor {Overridepublic Receive createReceive() {return receiveBuilder().match(Replicator.GetSuccess.class, success - {System.out.println(Read value: success.get(dataKey).get(key));}).build();}}
}在上述示例中通过 DistributedData 模块创建了两个 ActorSystem并在集群中加入了相同的地址。通过Replicator 进行分布式数据的复制和同步。在示例中通过 ReplicatedDataKey 创建了一个名为 “myData” 的分布式 Map并在系统1中更新了键值对 “key” 和 “value”然后在系统2中读取了该值。
2.2.4 Akka Streams
Akka Streams 是 Akka 中用于处理流数据的模块。它提供了一种声明式的方式来操作和处理数据流适用于异步、高并发的场景。以下是一个简单的 Akka Streams 示例
import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;import java.util.Arrays;public class AkkaStreamsExample {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system ActorSystem.create(AkkaStreamsSystem);// 创建流执行环境ActorMaterializer materializer ActorMaterializer.create(system);// 创建数据源SourceInteger, ? source Source.from(Arrays.asList(1, 2, 3, 4, 5));// 定义数据处理流程source.map(value - value * 2).filter(value - value 5).to(Sink.foreach(System.out::println)).run(materializer);}
}在上述示例中通过 Akka Streams 创建了一个数据源并定义了一个数据处理流程包括将每个元素乘以2、过滤掉小于等于5的元素最后将结果打印出来。
2.2.5 Akka HTTP
Akka HTTP 是 Akka 中用于构建高性能、可伸缩的 HTTP 服务的模块。它提供了一套强大而灵活的 API支持异步和流式处理。以下是一个简单的 Akka HTTP 服务示例
import akka.actor.ActorSystem;
import akka.http.javadsl.Http;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.server.AllDirectives;
import akka.http.javadsl.server.Route;
import akka.stream.ActorMaterializer;public class AkkaHttpExample extends AllDirectives {public static void main(String[] args) {// 创建 Actor 系统ActorSystem system ActorSystem.create(AkkaHttpSystem);// 创建流执行环境ActorMaterializer materializer ActorMaterializer.create(system);// 定义路由Route route path(hello, () -get(() -complete(Hello, Akka HTTP!)));// 启动 HTTP 服务Http.get(system).bindAndHandle(route.flow(system, materializer), ConnectHttp.toHost(localhost, 8080), materializer);}
}在上述示例中通过 Akka HTTP 定义了一个简单的路由当访问 “/hello” 路径时返回 “Hello, Akka HTTP!”。
3. RxJava
3.1 Observable 与 Observer
RxJava 是响应式编程库基于观察者模式。Observable 代表一个可被观察的对象而 Observer 则是观察者。以下是一个简单的 RxJava 示例
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class RxJavaExample {public static void main(String[] args) {// 创建 ObservableObservableString observable Observable.just(Hello, RxJava);// 创建 ObserverObserverString observer new ObserverString() {Overridepublic void onSubscribe(Disposable d) {System.out.println(Subscribed);}Overridepublic void onNext(String value) {System.out.println(Received: value);}Overridepublic void onError(Throwable e) {System.err.println(Error: e.getMessage());}Overridepublic void onComplete() {System.out.println(Completed);}};// 订阅 Observableobservable.subscribe(observer);}
}3.2 操作符
RxJava 提供了丰富的操作符用于对发射的数据进行变换、过滤和合并等操作。以下是一些常用的操作符示例
import io.reactivex.Observable;
import io.reactivex.functions.Function;public class RxJavaOperatorsExample {public static void main(String[] args) {// 转换操作符mapObservableInteger numbers Observable.just(1, 2, 3, 4, 5);numbers.map(value - value * 2).subscribe(System.out::println);// 过滤操作符filterObservableString fruits Observable.just(Apple, Banana, Orange, Grape);fruits.filter(fruit - fruit.length() 5).subscribe(System.out::println);// 合并操作符zipObservableInteger integers Observable.just(1, 2, 3);ObservableString strings Observable.just(A, B, C);Observable.zip(integers, strings, (num, str) - num str).subscribe(System.out::println);}
}3.3 背压处理
在处理大量数据或者处理速度较快的情况下为了避免产生过多的数据导致内存溢出需要使用背压处理机制。RxJava 提供了 Flowable 类来支持背压处理。以下是一个简单的背压处理示例
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;public class RxJavaBackpressureExample {public static void main(String[] args) throws InterruptedException {FlowableInteger flowable Flowable.create(emitter - {for (int i 1; i 1000; i) {emitter.onNext(i);}emitter.onComplete();}, BackpressureStrategy.BUFFER);flowable.observeOn(Schedulers.io()).subscribe(System.out::println);Thread.sleep(1000); // 等待异步线程执行}
}3.4 任务调度器
RxJava 提供了不同的调度器Scheduler来控制任务在不同线程上的执行。例如IoScheduler 用于执行 I/O 操作ComputationScheduler 用于执行计算密集型任务。以下是一个简单的任务调度器示例
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;public class RxJavaSchedulersExample {public static void main(String[] args) throws InterruptedException {Observable.just(Task 1, Task 2, Task 3).observeOn(Schedulers.io()).map(task - {System.out.println(Executing task on thread Thread.currentThread().getName());return task;}).subscribe();Thread.sleep(1000); // 等待异步线程执行}
}3.5 错误处理
在 RxJava 中错误处理是一个重要的方面可以通过 onError 回调来处理发生的异常。以下是一个简单的错误处理示例
import io.reactivex.Observable;public class RxJavaErrorHandlingExample {public static void main(String[] args) {ObservableInteger numbers Observable.just(1, 2, 3, 4, 5);numbers.map(value - {if (value 3) {throw new RuntimeException(Error at value 3);}return value;}).subscribe(System.out::println,throwable - System.err.println(Error: throwable.getMessage()));}
}3.6 异步与并行
RxJava 提供了多种方式来实现异步和并行操作。以下是一个简单的异步与并行示例
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;public class RxJavaAsyncParallelExample {public static void main(String[] args) throws InterruptedException {Observable.just(Task 1, Task 2, Task 3).observeOn(Schedulers.io()).map(task - {System.out.println(Executing task on thread Thread.currentThread().getName());return task;}).subscribe();Thread.sleep(1000); // 等待异步线程执行}
}3.7 扩展与自定义操作符
RxJava 允许开发者扩展和自定义操作符以满足特定的需求。以下是一个简单的自定义操作符示例
import io.reactivex.Observable;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;public class RxJavaCustomOperatorExample {public static void main(String[] args) {Observable.just(1, 2, 3, 4, 5).lift(upperCaseOperator()).subscribe(System.out::println);}private static ObservableOperatorString, Integer upperCaseOperator() {return observer - new ObserverInteger() {Overridepublic void onSubscribe(Disposable d) {observer.onSubscribe(d);}Overridepublic void onNext(Integer value) {observer.onNext(String.valueOf(value).toUpperCase());}Overridepublic void onError(Throwable e) {observer.onError(e);}Overridepublic void onComplete() {observer.onComplete();}};}
}4. ForkJoin 框架
4.1 工作窃取算法
ForkJoin 框架采用工作窃取算法Work-Stealing使得任务可以在多个线程之间高效地分配和执行。工作窃取算法允许线程在执行完自己的任务后主动去窃取其他线程的任务执行从而实现任务的动态负载均衡。这样当某个线程执行完自己的任务后它可以帮助其他线程执行任务提高整体的并发效率。
4.2 ForkJoinPool 类
ForkJoinPool 是 ForkJoin 框架的核心类负责管理工作线程池。它提供了一个通用的线程池用于执行 ForkJoinTask。在一般情况下可以使用默认的无参构造函数创建一个 ForkJoinPool也可以通过构造函数指定并行度Parallelism即并发执行的线程数目。
下面是一个使用 ForkJoin 框架计算数组元素和的示例
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;public class ForkJoinExample {public static void main(String[] args) {int[] array {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};ForkJoinPool forkJoinPool new ForkJoinPool();// 创建一个计算任务SumTask task new SumTask(array, 0, array.length);// 提交任务并获取结果int result forkJoinPool.invoke(task);System.out.println(Sum of array elements: result);}static class SumTask extends RecursiveTaskInteger {private final int[] array;private final int start;private final int end;public SumTask(int[] array, int start, int end) {this.array array;this.start start;this.end end;}Overrideprotected Integer compute() {// 如果任务足够小直接计算结果if (end - start 2) {int sum 0;for (int i start; i end; i) {sum array[i];}return sum;}// 否则拆分任务int middle (start end) / 2;SumTask leftTask new SumTask(array, start, middle);SumTask rightTask new SumTask(array, middle, end);// 并行执行子任务leftTask.fork();rightTask.fork();// 合并子任务的结果return leftTask.join() rightTask.join();}}
}上述示例中我们通过 ForkJoinPool 创建了一个工作线程池然后定义了一个 SumTask 继承自 RecursiveTask用于计算数组元素的和。在 compute 方法中我们判断任务是否足够小如果是则直接计算结果否则将任务拆分成两个子任务并并行执行。最后合并子任务的结果得到最终的计算结果。
4.3 RecursiveTask 与 RecursiveAction 类
RecursiveTask 用于表示有返回值的任务而 RecursiveAction 用于表示无返回值的任务。在上面的例子中我们使用了 RecursiveTask因为我们希望计算出数组元素的和并返回一个结果。
4.4 ForkJoinTask 类
ForkJoinTask 是 ForkJoin 框架中所有任务的基类它提供了一些用于管理任务执行的方法。在前面的例子中RecursiveTask 继承了 ForkJoinTask并通过 fork 方法实现了任务的拆分与并行执行通过 join 方法实现了子任务结果的合并。
这样通过 ForkJoin 框架我们能够方便地实现复杂的任务拆分与并行执行从而充分利用多核处理器的性能。
4.5 RecursiveTask 与 RecursiveAction 类续
在 ForkJoinTask 中RecursiveTask 和 RecursiveAction 分别用于表示有返回值的任务和无返回值的任务。
4.5.1 RecursiveTask
RecursiveTask 是一个泛型类用于表示有返回值的任务。通过继承 RecursiveTask可以实现自定义的有返回值的任务。以下是一个简单的示例计算斐波那契数列的第 n 项
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;public class FibonacciExample {public static void main(String[] args) {int n 10;ForkJoinPool forkJoinPool new ForkJoinPool();// 创建一个计算任务FibonacciTask task new FibonacciTask(n);// 提交任务并获取结果int result forkJoinPool.invoke(task);System.out.println(Fibonacci number at position n : result);}static class FibonacciTask extends RecursiveTaskInteger {private final int n;public FibonacciTask(int n) {this.n n;}Overrideprotected Integer compute() {if (n 1) {return n;}FibonacciTask leftTask new FibonacciTask(n - 1);leftTask.fork();FibonacciTask rightTask new FibonacciTask(n - 2);rightTask.fork();return leftTask.join() rightTask.join();}}
}在上述示例中通过继承 RecursiveTask实现了一个计算斐波那契数列第 n 项的任务。任务在计算过程中拆分为两个子任务分别计算第 n - 1 和第 n - 2 项然后合并子任务的结果得到最终结果。
4.5.2 RecursiveAction
RecursiveAction 是一个泛型类用于表示无返回值的任务。通过继承 RecursiveAction可以实现自定义的无返回值的任务。以下是一个简单的示例打印斐波那契数列的前 n 项
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.ForkJoinPool;public class FibonacciPrintExample {public static void main(String[] args) {int n 10;ForkJoinPool forkJoinPool new ForkJoinPool();// 创建一个打印任务FibonacciPrintTask task new FibonacciPrintTask(n);// 提交任务forkJoinPool.invoke(task);}static class FibonacciPrintTask extends RecursiveAction {private final int n;public FibonacciPrintTask(int n) {this.n n;}Overrideprotected void compute() {int[] fib new int[n];computeFibonacci(fib, n);for (int i 0; i n; i) {System.out.print(fib[i] );}}private void computeFibonacci(int[] fib, int n) {fib[0] 0;if (n 1) {fib[1] 1;computeFibonacci(fib, n, 2);}}private void computeFibonacci(int[] fib, int n, int current) {if (current n) {fib[current] fib[current - 1] fib[current - 2];computeFibonacci(fib, n, current 1);}}}
}在上述示例中通过继承 RecursiveAction实现了一个打印斐波那契数列前 n 项的任务。任务在计算过程中递归调用自身直到计算完成。由于该任务无返回值因此 compute 方法不需要返回结果。
5. Disruptor
5.1 高性能无锁并发框架
Disruptor 是一个专注于高性能、无锁并发的框架主要应用于金融领域的低延迟系统。它的核心思想是通过环形缓冲区RingBuffer实现高效的事件发布与订阅避免了传统锁机制可能带来的性能瓶颈。
5.2 RingBuffer 数据结构
RingBuffer 是 Disruptor 中的核心数据结构采用环形缓冲区的形式存储事件。它通过使用预分配的数组避免了链式结构的内存分配减少了垃圾回收的压力。多个生产者可以同时向 RingBuffer 中发布事件多个消费者可以同时订阅并处理事件。这种设计使得 Disruptor 能够以极低的延迟处理大量的事件。
5.3 生产者-消费者模式
Disruptor 基于生产者-消费者模式通过无锁的方式实现了高效的事件处理。生产者负责向 RingBuffer 中发布事件消费者负责订阅并处理事件。由于 RingBuffer 的环形结构生产者和消费者之间不存在竞争关系不需要加锁从而避免了传统并发编程中锁带来的性能开销。
5.4 应用于低延迟的金融交易系统
Disruptor 的高性能和低延迟特性使其在金融领域的高频交易系统中得到广泛应用。在这些系统中对事件的处理速度要求极高而 Disruptor 的设计理念正好满足了这些需求。通过有效地利用现代计算机硬件的特性避免了传统锁机制可能引入的性能问题使得 Disruptor 成为处理金融交易等对低延迟要求极高的场景的理想选择。
以下是一个简单的使用 Disruptor 的示例演示了生产者和消费者之间的协作
import com.lmax.disruptor.*;import java.util.concurrent.Executors;public class DisruptorExample {public static void main(String[] args) {// 创建 Disruptor 环境DisruptorEvent disruptor new Disruptor(Event::new, 1024, Executors.defaultThreadFactory(), ProducerType.SINGLE, new YieldingWaitStrategy());// 设置事件处理器disruptor.handleEventsWith(new EventHandlerEvent() {Overridepublic void onEvent(Event event, long sequence, boolean endOfBatch) {// 处理事件System.out.println(Event: event.getData() processed by Thread.currentThread().getName());}});// 启动 Disruptordisruptor.start();// 创建生产者RingBufferEvent ringBuffer disruptor.getRingBuffer();EventProducer producer new EventProducer(ringBuffer);// 发布事件for (int i 0; i 10; i) {producer.publishEvent(i);}// 关闭 Disruptordisruptor.shutdown();}static class Event {private int data;public int getData() {return data;}public void setData(int data) {this.data data;}}static class EventProducer {private final RingBufferEvent ringBuffer;public EventProducer(RingBufferEvent ringBuffer) {this.ringBuffer ringBuffer;}public void publishEvent(int data) {// 获取下一个可用的序号long sequence ringBuffer.next();try {// 获取序号对应的事件对象Event event ringBuffer.get(sequence);// 设置事件数据event.setData(data);} finally {// 发布事件ringBuffer.publish(sequence);}}}
}上述示例中我们使用了 Disruptor 框架创建了一个环境定义了一个事件类 Event并设置了事件处理器。然后创建了一个生产者 EventProducer通过调用 publishEvent 发布事件。通过 Disruptor 的内部机制生产者和消费者之间的通信实现了高效的事件处理。在实际应用中可以根据具体场景进一步定制事件处理逻辑。
6. Guava 并发库
6.1 ListenableFuture 接口
ListenableFuture 是 Guava 提供的接口扩展了 JDK 的 Future 接口允许注册回调函数。这个接口的设计目的是为了在异步操作完成时执行特定操作而不需要显式地等待异步操作的完成。以下是一个简单的使用示例
import com.google.common.util.concurrent.*;import java.util.concurrent.Executors;public class ListenableFutureExample {public static void main(String[] args) {// 创建 ListeningExecutorServiceListeningExecutorService executorService MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));// 提交异步任务ListenableFutureString future executorService.submit(() - {System.out.println(Task executed by Thread.currentThread().getName());return Result;});// 注册回调函数Futures.addCallback(future, new FutureCallbackString() {Overridepublic void onSuccess(String result) {System.out.println(Success! Result: result);}Overridepublic void onFailure(Throwable t) {System.err.println(Failure: t.getMessage());}}, executorService);// 关闭 executorServiceexecutorService.shutdown();}
}在上述示例中我们使用 Guava 的 ListeningExecutorService 包装了 JDK 的 ExecutorService并通过 Futures.addCallback 注册了回调函数。这样当异步任务完成时将自动执行注册的回调函数。
6.2 Futures 工具类
Guava 的 Futures 工具类提供了一系列用于处理 ListenableFuture 的静态方法。除了 addCallback还有其他一些方法例如 transform、transformAsync 等用于对异步操作的结果进行转换或组合。这些方法的设计目的是为了简化异步编程的复杂性使代码更加清晰。
import com.google.common.util.concurrent.*;import java.util.concurrent.Executors;public class FuturesExample {public static void main(String[] args) {ListeningExecutorService executorService MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));// 提交异步任务ListenableFutureInteger future executorService.submit(() - {System.out.println(Task executed by Thread.currentThread().getName());return 42;});// 使用 transform 转换结果ListenableFutureString transformedFuture Futures.transform(future, Object::toString, executorService);// 注册回调函数Futures.addCallback(transformedFuture, new FutureCallbackString() {Overridepublic void onSuccess(String result) {System.out.println(Transformed Result: result);}Overridepublic void onFailure(Throwable t) {System.err.println(Failure: t.getMessage());}}, executorService);// 关闭 executorServiceexecutorService.shutdown();}
}在上述示例中我们使用 Futures.transform 将异步任务的结果从整数转换为字符串。通过使用 Guava 的工具方法我们能够更加方便地对异步操作的结果进行处理。
6.3 SettableFuture 类
SettableFuture 是 Guava 提供的一个实现了 ListenableFuture 接口的类可以手动设置异步任务的结果。它允许在异步任务的执行体中根据实际情况设置成功或失败的结果从而更加灵活地控制异步任务的执行。
以下是一个使用 SettableFuture 的示例
import com.google.common.util.concurrent.*;import java.util.concurrent.Executors;public class SettableFutureExample {public static void main(String[] args) {ListeningExecutorService executorService MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));// 创建 SettableFutureSettableFutureString settableFuture SettableFuture.create();// 手动设置异步任务的结果executorService.submit(() - {try {Thread.sleep(1000); // 模拟异步操作settableFuture.set(Success Result);} catch (InterruptedException e) {settableFuture.setException(e);}});// 注册回调函数Futures.addCallback(settableFuture, new FutureCallbackString() {Overridepublic void onSuccess(String result) {System.out.println(Success! Result: result);}Overridepublic void onFailure(Throwable t) {System.err.println(Failure: t.getMessage());}}, executorService);// 关闭 executorServiceexecutorService.shutdown();}
}在上述示例中我们使用 SettableFuture 创建了一个实现了 ListenableFuture 接口的对象并在异步任务的执行体中手动设置了成功的结果。这样我们就可以灵活地在异步操作中控制结果的设置。
6.4 ListeningExecutorService 接口
Guava 的 ListeningExecutorService 接口是对 JDK 的 ExecutorService 的扩展允许执行异步任务并提供了一些用于处理 ListenableFuture 的方法。它提供了 submit 方法用于提交异步任务以及 shutdown 方法用于关闭 executor。通过使用 ListeningExecutorService我们可以更方便地处理异步任务的结果。
以上是 Guava 并发库的一些基本用法和实例Guava 提供了丰富的工具类和接口能够简化并发编程的复杂性提高代码的可读性和可维护性。在实际应用中可以根据具体的需求选择合适的工具类和接口以便更高效地处理并发操作。
6.5 RateLimiter 类
RateLimiter 是 Guava 提供的一个用于令牌桶算法的实现类用于控制某个资源访问的速度。通过 RateLimiter我们可以限制对资源的访问频率以便更好地控制系统的并发性。
以下是一个使用 RateLimiter 的简单示例
import com.google.common.util.concurrent.RateLimiter;public class RateLimiterExample {public static void main(String[] args) {// 创建一个每秒发放两个令牌的 RateLimiterRateLimiter rateLimiter RateLimiter.create(2.0);// 模拟请求for (int i 0; i 10; i) {// 尝试获取令牌double waitTime rateLimiter.acquire();// 执行业务逻辑System.out.println(Request i served after waiting for waitTime seconds);}}
}在上述示例中我们创建了一个每秒发放两个令牌的 RateLimiter然后模拟了一系列请求。通过 acquire 方法我们可以获取令牌并在获取令牌的过程中进行阻塞以控制请求的速率。
6.6 Cache 类
Cache 是 Guava 提供的缓存实现类用于将键值对存储在内存中以便更快地检索数据。Cache 提供了一系列的方法用于将数据放入缓存、从缓存中获取数据以及清理缓存等操作。
以下是一个使用 Cache 的简单示例
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;import java.util.concurrent.TimeUnit;public class CacheExample {public static void main(String[] args) {// 创建一个最大容量为 100过期时间为 5 分钟的缓存CacheString, String cache CacheBuilder.newBuilder().maximumSize(100).expireAfterWrite(5, TimeUnit.MINUTES).build();// 将数据放入缓存cache.put(key1, value1);cache.put(key2, value2);// 从缓存中获取数据String value1 cache.getIfPresent(key1);String value3 cache.getOrDefault(key3, default);System.out.println(Value 1: value1);System.out.println(Value 3: value3);}
}在上述示例中我们使用 CacheBuilder 创建了一个最大容量为 100过期时间为 5 分钟的缓存。然后我们将数据放入缓存并通过 getIfPresent 和 getOrDefault 方法从缓存中获取数据。
Guava 并发库中还有其他许多有用的类和接口用于处理并发编程中的各种场景。在实际应用中可以根据具体的需求选择合适的工具类和接口以便更高效地处理并发操作。
总结
通过学习本文提供的内容读者将掌握Java中并发编程的核心概念和高级工具。了解多线程编程的基本原理、并发库的使用方法以及适用场景将使开发者能够更加自信地构建高性能、可伸缩且稳定的应用程序。在竞争激烈的软件开发领域具备并发编程的深入知识将成为区分优秀开发者的重要标志。