当前位置: 首页 > news >正文

河北省城乡和建设厅网站首页网站维护属于什么部门

河北省城乡和建设厅网站首页,网站维护属于什么部门,wordpress访问速度,做淘宝客网站有什么服务器Netty核心原理剖析与RPC实践21-25 21 技巧篇#xff1a;延迟任务处理神器之时间轮 HahedWheelTimer Netty 中有很多场景依赖定时任务实现#xff0c;比较典型的有客户端连接的超时控制、通信双方连接的心跳检测等场景。在学习 Netty Reactor 线程模型时#xff0c;我们知道…Netty核心原理剖析与RPC实践21-25 21 技巧篇延迟任务处理神器之时间轮 HahedWheelTimer Netty 中有很多场景依赖定时任务实现比较典型的有客户端连接的超时控制、通信双方连接的心跳检测等场景。在学习 Netty Reactor 线程模型时我们知道 NioEventLoop 不仅负责处理 I/O 事件而且兼顾执行任务队列中的任务其中就包括定时任务。为了实现高性能的定时任务调度Netty 引入了时间轮算法驱动定时任务的执行。时间轮到底是什么呢为什么 Netty 一定要用时间轮来处理定时任务呢JDK 原生的实现方案不能满足要求吗本节课我将一步步为你深入剖析时间轮的原理以及 Netty 中是如何实现时间轮算法的。 说明本文参考的 Netty 源码版本为 4.1.42.Final。 定时任务的基础知识 首先我们先了解下什么是定时任务定时器有非常多的使用场景大家在平时工作中应该经常遇到例如生成月统计报表、财务对账、会员积分结算、邮件推送等都是定时器的使用场景。定时器一般有三种表现形式按固定周期定时执行、延迟一定时间后执行、指定某个时刻执行。 定时器的本质是设计一种数据结构能够存储和调度任务集合而且 deadline 越近的任务拥有更高的优先级。那么定时器如何知道一个任务是否到期了呢定时器需要通过轮询的方式来实现每隔一个时间片去检查任务是否到期。 所以定时器的内部结构一般需要一个任务队列和一个异步轮询线程并且能够提供三种基本操作 Schedule 新增任务至任务集合Cancel 取消某个任务Run 执行到期的任务。 JDK 原生提供了三种常用的定时器实现方式分别为 Timer、DelayedQueue 和 ScheduledThreadPoolExecutor。下面我们逐一对它们进行介绍。 Timer Timer 属于 JDK 比较早期版本的实现它可以实现固定周期的任务以及延迟任务。Timer 会起动一个异步线程去执行到期的任务任务可以只被调度执行一次也可以周期性反复执行多次。我们先来看下 Timer 是如何使用的示例代码如下。 Timer timer new Timer(); timer.scheduleAtFixedRate(new TimerTask() {Overridepublic void run() {// do something} }, 10000, 1000); // 10s 后调度一个周期为 1s 的定时任务可以看出任务是由 TimerTask 类实现TimerTask 是实现了 Runnable 接口的抽象类Timer 负责调度和执行 TimerTask。接下来我们看下 Timer 的内部构造。 public class Timer {private final TaskQueue queue new TaskQueue();private final TimerThread thread new TimerThread(queue); public Timer(String name) {thread.setName(name);thread.start();} }TaskQueue 是由数组结构实现的小根堆deadline 最近的任务位于堆顶端queue[1] 始终是最优先被执行的任务。所以使用小根堆的数据结构Run 操作时间复杂度 O(1)新增 Schedule 和取消 Cancel 操作的时间复杂度都是 O(logn)。 Timer 内部启动了一个 TimerThread 异步线程不论有多少任务被加入数组始终都是由 TimerThread 负责处理。TimerThread 会定时轮询 TaskQueue 中的任务如果堆顶的任务的 deadline 已到那么执行任务如果是周期性任务执行完成后重新计算下一次任务的 deadline并再次放入小根堆如果是单次执行的任务执行结束后会从 TaskQueue 中删除。 DelayedQueue DelayedQueue 是 JDK 中一种可以延迟获取对象的阻塞队列其内部是采用优先级队列 PriorityQueue 存储对象。DelayQueue 中的每个对象都必须实现 Delayed 接口并重写 compareTo 和 getDelay 方法。DelayedQueue 的使用方法如下 public class DelayQueueTest {public static void main(String[] args) throws Exception {BlockingQueueSampleTask delayQueue new DelayQueue();long now System.currentTimeMillis();delayQueue.put(new SampleTask(now 1000));delayQueue.put(new SampleTask(now 2000));delayQueue.put(new SampleTask(now 3000));for (int i 0; i 3; i) {System.out.println(new Date(delayQueue.take().getTime()));}}static class SampleTask implements Delayed {long time;public SampleTask(long time) {this.time time;}public long getTime() {return time;}Overridepublic int compareTo(Delayed o) {return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));}Overridepublic long getDelay(TimeUnit unit) {return unit.convert(time - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}} }DelayQueue 提供了 put() 和 take() 的阻塞方法可以向队列中添加对象和取出对象。对象被添加到 DelayQueue 后会根据 compareTo() 方法进行优先级排序。getDelay() 方法用于计算消息延迟的剩余时间只有 getDelay 0 时该对象才能从 DelayQueue 中取出。 DelayQueue 在日常开发中最常用的场景就是实现重试机制。例如接口调用失败或者请求超时后可以将当前请求对象放入 DelayQueue通过一个异步线程 take() 取出对象然后继续进行重试。如果还是请求失败继续放回 DelayQueue。为了限制重试的频率可以设置重试的最大次数以及采用指数退避算法设置对象的 deadline如 2s、4s、8s、16s ……以此类推。 相比于 TimerDelayQueue 只实现了任务管理的功能需要与异步线程配合使用。DelayQueue 使用优先级队列实现任务的优先级排序新增 Schedule 和取消 Cancel 操作的时间复杂度也是 O(logn)。 ScheduledThreadPoolExecutor 上文中介绍的 Timer 其实目前并不推荐用户使用它是存在不少设计缺陷的。 Timer 是单线程模式。如果某个 TimerTask 执行时间很久会影响其他任务的调度。Timer 的任务调度是基于系统绝对时间的如果系统时间不正确可能会出现问题。TimerTask 如果执行出现异常Timer 并不会捕获会导致线程终止其他任务永远不会执行。 为了解决 Timer 的设计缺陷JDK 提供了功能更加丰富的 ScheduledThreadPoolExecutor。ScheduledThreadPoolExecutor 提供了周期执行任务和延迟执行任务的特性下面通过一个例子先看下 ScheduledThreadPoolExecutor 如何使用。 public class ScheduledExecutorServiceTest {public static void main(String[] args) {ScheduledExecutorService executor Executors.newScheduledThreadPool(5);executor.scheduleAtFixedRate(() - System.out.println(Hello World), 1000, 2000, TimeUnit.MILLISECONDS); // 1s 延迟后开始执行任务每 2s 重复执行一次} }ScheduledThreadPoolExecutor 继承于 ThreadPoolExecutor因此它具备线程池异步处理任务的能力。线程池主要负责管理创建和管理线程并从自身的阻塞队列中不断获取任务执行。线程池有两个重要的角色分别是任务和阻塞队列。ScheduledThreadPoolExecutor 在 ThreadPoolExecutor 的基础上重新设计了任务 ScheduledFutureTask 和阻塞队列 DelayedWorkQueue。ScheduledFutureTask 继承于 FutureTask并重写了 run() 方法使其具备周期执行任务的能力。DelayedWorkQueue 内部是优先级队列deadline 最近的任务在队列头部。对于周期执行的任务在执行完会重新设置时间并再次放入队列中。ScheduledThreadPoolExecutor 的实现原理可以用下图表示。 以上我们简单介绍了 JDK 三种实现定时器的方式。可以说它们的实现思路非常类似都离不开任务、任务管理、任务调度三个角色。三种定时器新增和取消任务的时间复杂度都是 O(logn)面对海量任务插入和删除的场景这三种定时器都会遇到比较严重的性能瓶颈。因此对于性能要求较高的场景我们一般都会采用时间轮算法。那么时间轮又是如何解决海量任务插入和删除的呢我们继续向下分析。 时间轮原理分析 技术有时就源于生活例如排队买票可以想到队列公司的组织关系可以理解为树等而时间轮算法的设计思想就来源于钟表。如下图所示时间轮可以理解为一种环形结构像钟表一样被分为多个 slot 槽位。每个 slot 代表一个时间段每个 slot 中可以存放多个任务使用的是链表结构保存该时间段到期的所有任务。时间轮通过一个时针随着时间一个个 slot 转动并执行 slot 中的所有到期任务。 任务是如何添加到时间轮当中的呢可以根据任务的到期时间进行取模然后将任务分布到不同的 slot 中。如上图所示时间轮被划分为 8 个 slot每个 slot 代表 1s当前时针指向 2。假如现在需要调度一个 3s 后执行的任务应该加入 235 的 slot 中如果需要调度一个 12s 以后的任务需要等待时针完整走完一圈 round 零 4 个 slot需要放入第 (212)%86 个 slot。 那么当时针走到第 6 个 slot 时怎么区分每个任务是否需要立即执行还是需要等待下一圈 round甚至更久时间之后执行呢所以我们需要把 round 信息保存在任务中。例如图中第 6 个 slot 的链表中包含 3 个任务第一个任务 round0需要立即执行第二个任务 round1需要等待 188s 后执行第三个任务 round2需要等待 288s 后执行。所以当时针转动到对应 slot 时只执行 round0 的任务slot 中其余任务的 round 应当减 1等待下一个 round 之后执行。 上面介绍了时间轮算法的基本理论可以看出时间轮有点类似 HashMap如果多个任务如果对应同一个 slot处理冲突的方法采用的是拉链法。在任务数量比较多的场景下适当增加时间轮的 slot 数量可以减少时针转动时遍历的任务个数。 时间轮定时器最大的优势就是任务的新增和取消都是 O(1) 时间复杂度而且只需要一个线程就可以驱动时间轮进行工作。HashedWheelTimer 是 Netty 中时间轮算法的实现类下面我就结合 HashedWheelTimer 的源码详细分析时间轮算法的实现原理。 Netty HashedWheelTimer 源码解析 在开始学习 HashedWheelTimer 的源码之前需要了解 HashedWheelTimer 接口定义以及相关组件才能更好地使用 HashedWheelTimer。 接口定义 HashedWheelTimer 实现了接口 io.netty.util.TimerTimer 接口是我们研究 HashedWheelTimer 一个很好的切入口。一起看下 Timer 接口的定义 public interface Timer {Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);SetTimeout stop(); }Timer 接口提供了两个方法分别是创建任务 newTimeout() 和停止所有未执行任务 stop()。从方法的定义可以看出Timer 可以认为是上层的时间轮调度器通过 newTimeout() 方法可以提交一个任务 TimerTask并返回一个 Timeout。TimerTask 和 Timeout 是两个接口类它们有什么作用呢我们分别看下 TimerTask 和 Timeout 的接口定义 public interface TimerTask {void run(Timeout timeout) throws Exception; } public interface Timeout {Timer timer();TimerTask task();boolean isExpired();boolean isCancelled();boolean cancel(); }Timeout 持有 Timer 和 TimerTask 的引用而且通过 Timeout 接口可以执行取消任务的操作。Timer、Timeout 和 TimerTask 之间的关系如下图所示 清楚 HashedWheelTimer 的接口定义以及相关组件的概念之后接下来我们就可以开始使用它了。 快速上手 通过下面这个简单的例子我们看下 HashedWheelTimer 是如何使用的。 public class HashedWheelTimerTest {public static void main(String[] args) {Timer timer new HashedWheelTimer();Timeout timeout1 timer.newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) {System.out.println(timeout1: new Date());}}, 10, TimeUnit.SECONDS);if (!timeout1.isExpired()) {timeout1.cancel();}timer.newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) throws InterruptedException {System.out.println(timeout2: new Date());Thread.sleep(5000);}}, 1, TimeUnit.SECONDS);timer.newTimeout(new TimerTask() {Overridepublic void run(Timeout timeout) {System.out.println(timeout3: new Date());}}, 3, TimeUnit.SECONDS);} }代码运行结果如下 timeout2: Mon Nov 09 19:57:04 CST 2020 timeout3: Mon Nov 09 19:57:09 CST 2020简单的几行代码基本展示了 HashedWheelTimer 的大部分用法。示例中我们通过 newTimeout() 启动了三个 TimerTasktimeout1 由于被取消了所以并没有执行。timeout2 和 timeout3 分别应该在 1s 和 3s 后执行。然而从结果输出看并不是timeout2 和 timeout3 的打印时间相差了 5s这是由于 timeout2 阻塞了 5s 造成的。由此可以看出时间轮中的任务执行是串行的当一个任务执行的时间过长会影响后续任务的调度和执行很可能产生任务堆积的情况。 至此对 HashedWheelTimer 的基本使用方法已经有了初步了解下面我们开始深入研究 HashedWheelTimer 的实现原理。 内部结构 我们先从 HashedWheelTimer 的构造函数看起结合上文中介绍的时间轮算法一起梳理出 HashedWheelTimer 的内部实现结构。 public HashedWheelTimer(ThreadFactory threadFactory,long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,long maxPendingTimeouts) {// 省略其他代码 wheel createWheel(ticksPerWheel); // 创建时间轮的环形数组结构mask wheel.length - 1; // 用于快速取模的掩码long duration unit.toNanos(tickDuration); // 转换成纳秒处理// 省略其他代码workerThread threadFactory.newThread(worker); // 创建工作线程leak leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null; // 是否开启内存泄漏检测this.maxPendingTimeouts maxPendingTimeouts; // 最大允许等待任务数HashedWheelTimer 中任务超出该阈值时会抛出异常// 如果 HashedWheelTimer 的实例数超过 64会打印错误日志if (INSTANCE_COUNTER.incrementAndGet() INSTANCE_COUNT_LIMIT WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {reportTooManyInstances();} }HashedWheelTimer 的构造函数清晰地列举出了几个核心属性 threadFactory线程池但是只创建了一个线程tickDuration时针每次 tick 的时间相当于时针间隔多久走到下一个 slotunit表示 tickDuration 的时间单位ticksPerWheel时间轮上一共有多少个 slot默认 512 个。分配的 slot 越多占用的内存空间就越大leakDetection是否开启内存泄漏检测maxPendingTimeouts最大允许等待任务数。 下面我们看下 HashedWheelTimer 是如何创建出来的我们直接跟进 createWheel() 方法的源码 private static HashedWheelBucket[] createWheel(int ticksPerWheel) {// 省略其他代码ticksPerWheel normalizeTicksPerWheel(ticksPerWheel);HashedWheelBucket[] wheel new HashedWheelBucket[ticksPerWheel];for (int i 0; i wheel.length; i ) {wheel[i] new HashedWheelBucket();}return wheel; } private static int normalizeTicksPerWheel(int ticksPerWheel) {int normalizedTicksPerWheel 1;while (normalizedTicksPerWheel ticksPerWheel) {normalizedTicksPerWheel 1;}return normalizedTicksPerWheel; } private static final class HashedWheelBucket {private HashedWheelTimeout head;private HashedWheelTimeout tail;// 省略其他代码 }时间轮的创建就是为了创建 HashedWheelBucket 数组每个 HashedWheelBucket 表示时间轮中一个 slot。从 HashedWheelBucket 的结构定义可以看出HashedWheelBucket 内部是一个双向链表结构双向链表的每个节点持有一个 HashedWheelTimeout 对象HashedWheelTimeout 代表一个定时任务。每个 HashedWheelBucket 都包含双向链表 head 和 tail 两个 HashedWheelTimeout 节点这样就可以实现不同方向进行链表遍历。关于 HashedWheelBucket 和 HashedWheelTimeout 的具体功能下文再继续介绍。 因为时间轮需要使用 做取模运算所以数组的长度需要是 2 的次幂。normalizeTicksPerWheel() 方法的作用就是找到不小于 ticksPerWheel 的最小 2 次幂这个方法实现的并不好可以参考 JDK HashMap 扩容 tableSizeFor 的实现进行性能优化如下所示。当然 normalizeTicksPerWheel() 只是在初始化的时候使用所以并无影响。 static final int MAXIMUM_CAPACITY 1 30; private static int normalizeTicksPerWheel(int ticksPerWheel) {int n ticksPerWheel - 1;n | n 1;n | n 2;n | n 4;n | n 8;n | n 16;return (n 0) ? 1 : (n MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n 1; }HashedWheelTimer 初始化的主要工作我们已经介绍完了其内部结构与上文中介绍的时间轮算法类似如下图所示。 接下来我们围绕定时器的三种基本操作分析下 HashedWheelTimer 是如何实现添加任务、执行任务和取消任务的。 添加任务 HashedWheelTimer 初始化完成后如何向 HashedWheelTimer 添加任务呢我们自然想到 HashedWheelTimer 提供的 newTimeout() 方法。 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {// 省略其他代码long pendingTimeoutsCount pendingTimeouts.incrementAndGet();if (maxPendingTimeouts 0 pendingTimeoutsCount maxPendingTimeouts) {pendingTimeouts.decrementAndGet();throw new RejectedExecutionException(Number of pending timeouts ( pendingTimeoutsCount ) is greater than or equal to maximum allowed pending timeouts ( maxPendingTimeouts ));}start(); // 1. 如果 worker 线程没有启动需要启动long deadline System.nanoTime() unit.toNanos(delay) - startTime; // 计算任务的 deadlineif (delay 0 deadline 0) {deadline Long.MAX_VALUE;}HashedWheelTimeout timeout new HashedWheelTimeout(this, task, deadline); // 2. 创建定时任务timeouts.add(timeout); // 3. 添加任务到 Mpsc Queuereturn timeout; } private final QueueHashedWheelTimeout timeouts PlatformDependent.newMpscQueue();newTimeout() 方法主要做了三件事分别为启动工作线程创建定时任务并把任务添加到 Mpsc Queue。HashedWheelTimer 的工作线程采用了懒启动的方式不需要用户显示调用。这样做的好处是在时间轮中没有任务时可以避免工作线程空转而造成性能损耗。先看下启动工作线程 start() 的源码 public void start() {switch (WORKER_STATE_UPDATER.get(this)) {case WORKER_STATE_INIT:if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {workerThread.start();}break;case WORKER_STATE_STARTED:break;case WORKER_STATE_SHUTDOWN:throw new IllegalStateException(cannot be started once stopped);default:throw new Error(Invalid WorkerState);}while (startTime 0) {try {startTimeInitialized.await();} catch (InterruptedException ignore) {}} }工作线程的启动之前会通过 CAS 操作获取工作线程的状态如果已经启动则直接跳过。如果没有启动再次通过 CAS 操作更改工作线程状态然后启动工作线程。启动的过程是直接调用的 Thread#start() 方法我们暂且先不关注工作线程具体做了什么下文再继续分析。 回到 newTimeout() 的主流程接下来的逻辑就非常简单了。根据用户传入的任务延迟时间可以计算出任务的 deadline然后创建定时任务 HashedWheelTimeout 对象最终把 HashedWheelTimeout 添加到 Mpsc Queue 中。看到这里你会不会有个疑问为什么不是将 HashedWheelTimeout 直接添加到时间轮中呢而是先添加到 Mpsc QueueMpsc Queue 可以理解为多生产者单消费者的线程安全队列下节课我们会对 Mpsc Queue 详细分析在这里就不做展开了。可以猜到 HashedWheelTimer 是想借助 Mpsc Queue 保证多线程向时间轮添加任务的线程安全性。 那么什么时候任务才会被加入时间轮并执行呢此时还没有太多信息接下来我们只能工作线程 Worker 里寻找问题的答案。 工作线程 Worker 工作线程 Worker 是时间轮的核心引擎随着时针的转动到期任务的处理都由 Worker 处理完成。下面我们定位到 Worker 的 run() 方法一探究竟。 private final class Worker implements Runnable {private final SetTimeout unprocessedTimeouts new HashSetTimeout(); // 未处理任务列表private long tick;Overridepublic void run() {startTime System.nanoTime();if (startTime 0) {startTime 1;}startTimeInitialized.countDown();do {final long deadline waitForNextTick(); // 1. 计算下次 tick 的时间, 然后sleep 到下次 tickif (deadline 0) { // 可能因为溢出或者线程中断造成 deadline 0int idx (int) (tick mask); // 2. 获取当前 tick 在 HashedWheelBucket 数组中对应的下标processCancelledTasks(); // 3. 移除被取消的任务HashedWheelBucket bucket wheel[idx];transferTimeoutsToBuckets(); // 4. 从 Mpsc Queue 中取出任务加入对应的 slot 中bucket.expireTimeouts(deadline); // 5. 执行到期的任务tick;}} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) WORKER_STATE_STARTED);// 时间轮退出后取出 slot 中未执行且未被取消的任务并加入未处理任务列表以便 stop() 方法返回for (HashedWheelBucket bucket: wheel) {bucket.clearTimeouts(unprocessedTimeouts);}// 将还没来得及添加到 slot 中的任务取出如果任务未取消则加入未处理任务列表以便 stop() 方法返回for (;;) {HashedWheelTimeout timeout timeouts.poll();if (timeout null) {break;}if (!timeout.isCancelled()) {unprocessedTimeouts.add(timeout);}}processCancelledTasks();} }工作线程 Worker 的核心执行流程是代码中的 do-while 循环只要 Worker 处于 STARTED 状态就会执行 do-while 循环我们把该过程拆分成为以下几个步骤逐一分析。 通过 waitForNextTick() 方法计算出时针到下一次 tick 的时间间隔然后 sleep 到下一次 tick。通过位运算获取当前 tick 在 HashedWheelBucket 数组中对应的下标移除被取消的任务。从 Mpsc Queue 中取出任务加入对应的 HashedWheelBucket 中。执行当前 HashedWheelBucket 中的到期任务。 首先看下 waitForNextTick() 方法是如何计算等待时间的源码如下 private long waitForNextTick() {long deadline tickDuration * (tick 1);for (;;) {final long currentTime System.nanoTime() - startTime;long sleepTimeMs (deadline - currentTime 999999) / 1000000;if (sleepTimeMs 0) {if (currentTime Long.MIN_VALUE) {return -Long.MAX_VALUE;} else {return currentTime;}}if (PlatformDependent.isWindows()) {sleepTimeMs sleepTimeMs / 10 * 10;}try {Thread.sleep(sleepTimeMs);} catch (InterruptedException ignored) {if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) WORKER_STATE_SHUTDOWN) {return Long.MIN_VALUE;}}} }根据 tickDuration 可以推算出下一次 tick 的 deadlinedeadline 减去当前时间就可以得到需要 sleep 的等待时间。所以 tickDuration 的值越小时间的精准度也就越高同时 Worker 的繁忙程度越高。如果 tickDuration 设置过小为了防止系统会频繁地 sleep 再唤醒会保证 Worker 至少 sleep 的时间为 1ms 以上。 Worker 从 sleep 状态唤醒后接下来会执行第二步流程通过按位与的操作计算出当前 tick 在 HashedWheelBucket 数组中对应的下标。按位与比普通的取模运算效率要快很多前提是时间轮中的数组长度是 2 的次幂掩码 mask 为 2 的次幂减 1这样才能达到与取模一样的效果。 接下来 Worker 会调用 processCancelledTasks() 方法处理被取消的任务所有取消的任务都会加入 cancelledTimeouts 队列中Worker 会从队列中取出任务然后将其从对应的 HashedWheelBucket 中删除删除操作为基本的链表操作。processCancelledTasks() 的源码比较简单我们在此就不展开了。 之前我们还留了一个疑问Mpsc Queue 中的任务什么时候加入时间轮的呢答案就在 transferTimeoutsToBuckets() 方法中。 private void transferTimeoutsToBuckets() {// 每次时针 tick 最多只处理 100000 个任务以防阻塞 Worker 线程for (int i 0; i 100000; i) {HashedWheelTimeout timeout timeouts.poll();if (timeout null) {break;}if (timeout.state() HashedWheelTimeout.ST_CANCELLED) {continue;}long calculated timeout.deadline / tickDuration; // 计算任务需要经过多少个 ticktimeout.remainingRounds (calculated - tick) / wheel.length; // 计算任务需要在时间轮中经历的圈数 remainingRoundsfinal long ticks Math.max(calculated, tick); // 如果任务在 timeouts 队列里已经过了执行时间, 那么会加入当前 HashedWheelBucket 中int stopIndex (int) (ticks mask);HashedWheelBucket bucket wheel[stopIndex];bucket.addTimeout(timeout);} }transferTimeoutsToBuckets() 的主要工作就是从 Mpsc Queue 中取出任务然后添加到时间轮对应的 HashedWheelBucket 中。每次时针 tick 最多只处理 100000 个任务一方面避免取任务的操作耗时过长另一方面为了防止执行太多任务造成 Worker 线程阻塞。 根据用户设置的任务 deadline可以计算出任务需要经过多少次 tick 才能开始执行以及需要在时间轮中转动圈数 remainingRoundsremainingRounds 会记录在 HashedWheelTimeout 中在执行任务的时候 remainingRounds 会被使用到。因为时间轮中的任务并不能够保证及时执行假如有一个任务执行的时间特别长那么任务在 timeouts 队列里已经过了执行时间也没有关系Worker 会将这些任务直接加入当前HashedWheelBucket 中所以过期的任务并不会被遗漏。 任务被添加到时间轮之后重新再回到 Worker#run() 的主流程接下来就是执行当前 HashedWheelBucket 中的到期任务跟进 HashedWheelBucket#expireTimeouts() 方法的源码 public void expireTimeouts(long deadline) {HashedWheelTimeout timeout head;while (timeout ! null) {HashedWheelTimeout next timeout.next;if (timeout.remainingRounds 0) {next remove(timeout);if (timeout.deadline deadline) {timeout.expire(); // 执行任务} else {throw new IllegalStateException(String.format(timeout.deadline (%d) deadline (%d), timeout.deadline, deadline));}} else if (timeout.isCancelled()) {next remove(timeout);} else {timeout.remainingRounds --; // 未到执行时间remainingRounds 减 1}timeout next;} }执行任务的操作比较简单就是从头开始遍历 HashedWheelBucket 中的双向链表。如果 remainingRounds 0则调用 expire() 方法执行任务timeout.expire() 内部就是调用了 TimerTask 的 run() 方法。如果任务已经被取消直接从链表中移除。否则表示任务的执行时间还没到remainingRounds 减 1等待下一圈即可。 至此工作线程 Worker 的核心逻辑 do-while 循环我们已经讲完了。当时间轮退出后Worker 还会执行一些后置的收尾工作。Worker 会从每个 HashedWheelBucket 取出未执行且未取消的任务以及还来得及添加到 HashedWheelBucket 中的任务然后加入未处理任务列表以便 stop() 方法统一处理。 停止时间轮 回到 Timer 接口两个方法newTimeout() 上文已经分析完了接下来我们就以 stop() 方法为入口看下时间轮停止都做了哪些工作。 Override public SetTimeout stop() {// Worker 线程无法停止时间轮if (Thread.currentThread() workerThread) {throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() .stop() cannot be called from TimerTask.class.getSimpleName());}// 尝试通过 CAS 操作将工作线程的状态更新为 SHUTDOWN 状态if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) ! WORKER_STATE_SHUTDOWN) {INSTANCE_COUNTER.decrementAndGet();if (leak ! null) {boolean closed leak.close(this);assert closed;}return Collections.emptySet();}try {boolean interrupted false;while (workerThread.isAlive()) {workerThread.interrupt(); // 中断 Worker 线程try {workerThread.join(100);} catch (InterruptedException ignored) {interrupted true;}}if (interrupted) {Thread.currentThread().interrupt();}} finally {INSTANCE_COUNTER.decrementAndGet();if (leak ! null) {boolean closed leak.close(this);assert closed;}}return worker.unprocessedTimeouts(); // 返回未处理任务的列表 }如果当前线程是 Worker 线程它是不能发起停止时间轮的操作的是为了防止有定时任务发起停止时间轮的恶意操作。停止时间轮主要做了三件事首先尝试通过 CAS 操作将工作线程的状态更新为 SHUTDOWN 状态然后中断工作线程 Worker最后将未处理的任务列表返回给上层。 到此为止HashedWheelTimer 的实现原理我们已经分析完了。再来回顾一下 HashedWheelTimer 的几个核心成员。 HashedWheelTimeout任务的封装类包含任务的到期时间 deadline、需要经历的圈数 remainingRounds 等属性。HashedWheelBucket相当于时间轮的每个 slot内部采用双向链表保存了当前需要执行的 HashedWheelTimeout 列表。WorkerHashedWheelTimer 的核心工作引擎负责处理定时任务。 时间轮进阶应用 Netty 中的时间轮是通过固定的时间间隔 tickDuration 进行推动的如果长时间没有到期任务那么会存在时间轮空推进的现象从而造成一定的性能损耗。此外如果任务的到期时间跨度很大例如 A 任务 1s 后执行B 任务 6 小时之后执行也会造成空推进的问题。 那么上述问题有没有什么解决方案呢在研究 Kafka 的时候Kafka 也有时间轮的应用它的实现思路与 Netty 是存在区别的。因为 Kafka 面对的应用场景是更加严苛的可能会存在各种时间粒度的定时任务那么 Kafka 是否有解决时间跨度问题呢我们接下来就简单介绍下 Kafka 的优化思路。 Kafka 时间轮的内部结构与 Netty 类似如下图所示。Kafka 的时间轮也是采用环形数组存储定时任务数组中的每个 slot 代表一个 Bucket每个 Bucket 保存了定时任务列表 TimerTaskListTimerTaskList 同样采用双向链表的结构实现链表的每个节点代表真正的定时任务 TimerTaskEntry。 为了解决空推进的问题Kafka 借助 JDK 的 DelayQueue 来负责推进时间轮。DelayQueue 保存了时间轮中的每个 Bucket并且根据 Bucket 的到期时间进行排序最近的到期时间被放在 DelayQueue 的队头。Kafka 中会有一个线程来读取 DelayQueue 中的任务列表如果时间没有到那么 DelayQueue 会一直处于阻塞状态从而解决空推荐的问题。这时候你可能会问DelayQueue 插入和删除的性能不是并不好吗其实 Kafka 采用的是一种权衡的策略把 DelayQueue 用在了合适的地方。DelayQueue 只存放了 BucketBucket 的数量并不多相比空推进带来的影响是利大于弊的。 为了解决任务时间跨度很大的问题Kafka 引入了层级时间轮如下图所示。当任务的 deadline 超出当前所在层的时间轮表示范围时就会尝试将任务添加到上一层时间轮中跟钟表的时针、分针、秒针的转动规则是同一个道理。 从图中可以看出第一层时间轮每个时间格为 1ms整个时间轮的跨度为 20ms第二层时间轮每个时间格为 20ms整个时间轮跨度为 400ms第三层时间轮每个时间格为 400ms整个时间轮跨度为 8000ms。每一层时间轮都有自己的指针每层时间轮走完一圈后上层时间轮也会相应推进一格。 假设现在有一个任务到期时间是 450ms 之后应该放在第三层时间轮的第一格。随着时间的流逝当指针指向该时间格时发现任务到期时间还有 50ms这里就涉及时间轮降级的操作它会将任务重新提交到时间轮中。此时发现第一层时间轮整体跨度不够需要放在第二层时间轮中第三格。当时间再经历 40ms 之后该任务又会触发一次降级操作放入到第一层时间轮最后等到 10ms 后执行任务。 由此可见Kafka 的层级时间轮的时间粒度更好控制可以应对更加复杂的定时任务处理场景适用的范围更广。 总结 HashedWheelTimer 的源码通俗易懂其设计思想值得我们借鉴。在平时开发中如果有类似的任务处理机制你可以尝试套用 HashedWheelTimer 的工作模式。 HashedWheelTimer 并不是十全十美的使用的时候需要清楚它存在的问题 如果长时间没有到期任务那么会存在时间轮空推进的现象。只适用于处理耗时较短的任务由于 Worker 是单线程的如果一个任务执行的时间过长会造成 Worker 线程阻塞。相比传统定时器的实现方式内存占用较大。 22 技巧篇高性能无锁队列 Mpc Queue 在前面的源码课程中NioEventLoop 线程以及时间轮 HashedWheelTimer 的任务队列中都出现了 Mpsc Queue 的身影。这又是 Netty 使用的什么 “黑科技” 呢为什么不使用 JDK 原生的队列呢Mpsc Queue 应该在什么场景下使用呢今天这节课就让我们一起再来长长知识吧 JDK 原生并发队列 在介绍 Mpsc Queue 之前我们先回顾下 JDK 原生队列的工作原理。JDK 并发队列按照实现方式可以分为阻塞队列和非阻塞队列两种类型阻塞队列是基于锁实现的非阻塞队列是基于 CAS 操作实现的。JDK 中包含多种阻塞和非阻塞的队列实现如下图所示。 队列是一种 FIFO先进先出的数据结构JDK 中定义了 java.util.Queue 的队列接口与 List、Set 接口类似java.util.Queue 也继承于 Collection 集合接口。此外JDK 还提供了一种双端队列接口 java.util.Deque我们最常用的 LinkedList 就是实现了 Deque 接口。下面我们简单说说上图中的每个队列的特点并给出一些对比和总结。 阻塞队列 阻塞队列在队列为空或者队列满时都会发生阻塞。阻塞队列自身是线程安全的使用者无需关心线程安全问题降低了多线程开发难度。阻塞队列主要分为以下几种 ArrayBlockingQueue最基础且开发中最常用的阻塞队列底层采用数组实现的有界队列初始化需要指定队列的容量。ArrayBlockingQueue 是如何保证线程安全的呢它内部是使用了一个重入锁 ReentrantLock并搭配 notEmpty、notFull 两个条件变量 Condition 来控制并发访问。从队列读取数据时如果队列为空那么会阻塞等待直到队列有数据了才会被唤醒。如果队列已经满了也同样会进入阻塞状态直到队列有空闲才会被唤醒。LinkedBlockingQueue内部采用的数据结构是链表队列的长度可以是有界或者无界的初始化不需要指定队列长度默认是 Integer.MAX_VALUE。LinkedBlockingQueue 内部使用了 takeLock、putLock两个重入锁 ReentrantLock以及 notEmpty、notFull 两个条件变量 Condition 来控制并发访问。采用读锁和写锁的好处是可以避免读写时相互竞争锁的现象所以相比于 ArrayBlockingQueueLinkedBlockingQueue 的性能要更好。PriorityBlockingQueue采用最小堆实现的优先级队列队列中的元素按照优先级进行排列每次出队都是返回优先级最高的元素。PriorityBlockingQueue 内部是使用了一个 ReentrantLock 以及一个条件变量 Condition notEmpty 来控制并发访问不需要 notFull 是因为 PriorityBlockingQueue 是无界队列所以每次 put 都不会发生阻塞。PriorityBlockingQueue 底层的最小堆是采用数组实现的当元素个数大于等于最大容量时会触发扩容在扩容时会先释放锁保证其他元素可以正常出队然后使用 CAS 操作确保只有一个线程可以执行扩容逻辑。DelayQueue一种支持延迟获取元素的阻塞队列常用于缓存、定时任务调度等场景。DelayQueue 内部是采用优先级队列 PriorityQueue 存储对象。DelayQueue 中的每个对象都必须实现 Delayed 接口并重写 compareTo 和 getDelay 方法。向队列中存放元素的时候必须指定延迟时间只有延迟时间已满的元素才能从队列中取出。SynchronizedQueue又称无缓冲队列。比较特别的是 SynchronizedQueue 内部不会存储元素。与 ArrayBlockingQueue、LinkedBlockingQueue 不同SynchronizedQueue 直接使用 CAS 操作控制线程的安全访问。其中 put 和 take 操作都是阻塞的每一个 put 操作都必须阻塞等待一个 take 操作反之亦然。所以 SynchronizedQueue 可以理解为生产者和消费者配对的场景双方必须互相等待直至配对成功。在 JDK 的线程池 Executors.newCachedThreadPool 中就存在 SynchronousQueue 的运用对于新提交的任务如果有空闲线程将重复利用空闲线程处理任务否则将新建线程进行处理。LinkedTransferQueue一种特殊的无界阻塞队列可以看作 LinkedBlockingQueues、SynchronousQueue公平模式、ConcurrentLinkedQueue 的合体。与 SynchronousQueue 不同的是LinkedTransferQueue 内部可以存储实际的数据当执行 put 操作时如果有等待线程那么直接将数据交给对方否则放入队列中。与 LinkedBlockingQueues 相比LinkedTransferQueue 使用 CAS 无锁操作进一步提升了性能。 非阻塞队列 说完阻塞队列我们再来看下非阻塞队列。非阻塞队列不需要通过加锁的方式对线程阻塞并发性能更好。JDK 中常用的非阻塞队列有以下几种 ConcurrentLinkedQueue它是一个采用双向链表实现的无界并发非阻塞队列它属于 LinkedQueue 的安全版本。ConcurrentLinkedQueue 内部采用 CAS 操作保证线程安全这是非阻塞队列实现的基础相比 ArrayBlockingQueue、LinkedBlockingQueue 具备较高的性能。ConcurrentLinkedDeque也是一种采用双向链表结构的无界并发非阻塞队列。与 ConcurrentLinkedQueue 不同的是ConcurrentLinkedDeque 属于双端队列它同时支持 FIFO 和 FILO 两种模式可以从队列的头部插入和删除数据也可以从队列尾部插入和删除数据适用于多生产者和多消费者的场景。 至此常见的队列类型我们已经介绍完了。我们在平时开发中使用频率最高的是 BlockingQueue。实现一个阻塞队列需要具备哪些基本功能呢下面看 BlockingQueue 的接口如下图所示。 我们可以通过下面一张表格对上述 BlockingQueue 接口的具体行为进行归类。 JDK 提供的并发队列已经能够满足我们大部分的需求但是在大规模流量的高并发系统中如果你对性能要求严苛JDK 的非阻塞并发队列可选择面较少且性能并不够出色。如果你还是需要一个数组 CAS 操作实现的无锁安全队列有没有成熟的解决方案呢Java 强大的生态总能给我们带来惊喜一些第三方框架提供的高性能无锁队列已经可以满足我们的需求其中非常出名的有 Disruptor 和 JCTools。 Disruptor 是 LMAX 公司开发的一款高性能无锁队列我们平时常称它为 RingBuffer其设计初衷是为了解决内存队列的延迟问题。Disruptor 内部采用环形数组和 CAS 操作实现性能非常优越。为什么 Disruptor 的性能会比 JDK 原生的无锁队列要好呢环形数组可以复用内存减少分配内存和释放内存带来的性能损耗。而且数组可以设置长度为 2 的次幂直接通过位运算加快数组下标的定位速度。此外Disruptor 还解决了伪共享问题对 CPU Cache 更加友好。Disruptor 已经开源详细可查阅 Github 地址 https://github.com/LMAX-Exchange/disruptor。 JCTools 也是一个开源项目Github 地址为 https://github.com/JCTools/JCTools。JCTools 是适用于 JVM 并发开发的工具主要提供了一些 JDK 确实的并发数据结构例如非阻塞 Map、非阻塞 Queue 等。其中非阻塞队列可以分为四种类型可以根据不同的场景选择使用。 Spsc 单生产者单消费者Mpsc 多生产者单消费者Spmc 单生产者多消费者Mpmc 多生产者多消费者。 Netty 中直接引入了 JCTools 的 Mpsc Queue相比于 JDK 原生的并发队列Mpsc Queue 又有什么过人之处呢接下来便开始我们今天要讨论的重点。 Mpsc Queue 基础知识 Mpsc 的全称是 Multi Producer Single Consumer多生产者单消费者。Mpsc Queue 可以保证多个生产者同时访问队列是线程安全的而且同一时刻只允许一个消费者从队列中读取数据。Netty Reactor 线程中任务队列 taskQueue 必须满足多个生产者可以同时提交任务所以 JCTools 提供的 Mpsc Queue 非常适合 Netty Reactor 线程模型。 Mpsc Queue 有多种的实现类例如 MpscArrayQueue、MpscUnboundedArrayQueue、MpscChunkedArrayQueue 等。我们先抛开一些提供特性功能的队列聚焦在最基础的 MpscArrayQueue回过头再学习其他类型的队列会事半功倍。 首先我们看下 MpscArrayQueue 的继承关系会发现相当复杂如下图所示。 除了顶层 JDK 原生的 AbstractCollection、AbstractQueueMpscArrayQueue 还继承了很多类似于 MpscXxxPad 以及 MpscXxxField 的类。我们可以发现一个很有意思的规律每个有包含属性的类后面都会被 MpscXxxPad 类隔开。MpscXxxPad 到底起到什么作用呢我们自顶向下将所有类的字段合并在一起看下 MpscArrayQueue 的整体结构。 // ConcurrentCircularArrayQueueL0Pad long p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16, p17; // ConcurrentCircularArrayQueue protected final long mask; protected final E[] buffer; // MpmcArrayQueueL1Pad long p00, p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16; // MpmcArrayQueueProducerIndexField private volatile long producerIndex; // MpscArrayQueueMidPad long p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16, p17; // MpscArrayQueueProducerLimitField private volatile long producerLimit; // MpscArrayQueueL2Pad long p00, p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16; // MpscArrayQueueConsumerIndexField protected long consumerIndex; // MpscArrayQueueL3Pad long p01, p02, p03, p04, p05, p06, p07; long p10, p11, p12, p13, p14, p15, p16, p17;可以看出MpscXxxPad 类中使用了大量 long 类型的变量其命名没有什么特殊的含义只是起到填充的作用。如果你也读过 Disruptor 的源码会发现 Disruptor 也使用了类似的填充方法。Mpsc Queue 和 Disruptor 之所以填充这些无意义的变量是为了解决伪共享false sharing问题。 什么是伪共享呢我们有必要补充这方面的基础知识。在计算机组成中CPU 的运算速度比内存高出几个数量级为了 CPU 能够更高效地与内存进行交互在 CPU 和内存之间设计了多层缓存机制如下图所示。 一般来说CPU 会分为三级缓存分别为L1 一级缓存、L2 二级缓存和L3 三级缓存。越靠近 CPU 的缓存速度越快但是缓存的容量也越小。所以从性能上来说L1 L2 L3容量方面 L1 L2 L3。CPU 读取数据时首先会从 L1 查找如果未命中则继续查找 L2如果还未能命中则继续查找 L3最后还没命中的话只能从内存中查找读取完成后再将数据逐级放入缓存中。此外多线程之间共享一份数据的时候需要其中一个线程将数据写回主存其他线程访问主存数据。 由此可见引入多级缓存是为了能够让 CPU 利用率最大化。如果你在做频繁的 CPU 运算时需要尽可能将数据保持在缓存中。那么 CPU 从内存中加载数据的时候是如何提高缓存的利用率的呢这就涉及缓存行Cache Line的概念Cache Line 是 CPU 缓存可操作的最小单位CPU 缓存由若干个 Cache Line 组成。Cache Line 的大小与 CPU 架构有关在目前主流的 64 位架构下Cache Line 的大小通常为 64 Byte。Java 中一个 long 类型是 8 Byte所以一个 Cache Line 可以存储 8 个 long 类型变量。CPU 在加载内存数据时会将相邻的数据一同读取到 Cache Line 中因为相邻的数据未来被访问的可能性最大这样就可以避免 CPU 频繁与内存进行交互了。 伪共享问题是如何发生的呢它又会造成什么影响呢我们使用下面这幅图进行讲解。 假设变量 A、B、C、D 被加载到同一个 Cache Line它们会被高频地修改。当线程 1 在 CPU Core1 中中对变量 A 进行修改修改完成后 CPU Core1 会通知其他 CPU Core 该缓存行已经失效。然后线程 2 在 CPU Core2 中对变量 C 进行修改时发现 Cache line 已经失效此时 CPU Core1 会将数据重新写回内存CPU Core2 再从内存中读取数据加载到当前 Cache line 中。 由此可见如果同一个 Cache line 被越多的线程修改那么造成的写竞争就会越激烈数据会频繁写入内存导致性能浪费。题外话多核处理器中每个核的缓存行内容是如何保证一致的呢有兴趣的同学可以深入学习下缓存一致性协议 MESI具体可以参考 https://zh.wikipedia.org/wiki/MESI%E5%8D%8F%E8%AE%AE。 对于伪共享问题我们应该如何解决呢Disruptor 和 Mpsc Queue 都采取了空间换时间的策略让不同线程共享的对象加载到不同的缓存行即可。下面我们通过一个简单的例子进行说明。 public class FalseSharingPadding {protected long p1, p2, p3, p4, p5, p6, p7;protected volatile long value 0L;protected long p9, p10, p11, p12, p13, p14, p15; }从上述代码中可以看出变量 value 前后都填充了 7 个 long 类型的变量。这样不论在什么情况下都可以保证在多线程访问 value 变量时value 与其他不相关的变量处于不同的 Cache Line如下图所示。 伪共享问题一般是非常隐蔽的在实际开发的过程中并不是项目中所有地方都需要花费大量的精力去优化伪共享问题。CPU Cache 的填充本身也是比较珍贵的我们应该把精力聚焦在一些高性能的数据结构设计上把资源用在刀刃上使系统性能收益最大化。 至此我们知道 Mpsc Queue 为了解决伪共享问题填充了大量的 long 类型变量造成源码不易阅读。因为变量填充只是为了提升 Mpsc Queue 的性能与 Mpsc Queue 的主体功能无关。接下来我们先忽略填充变量开始分析 Mpsc Queue 的基本实现原理。 Mpsc Queue 源码分析 在开始源码学习之前我们同样先看看 MpscArrayQueue 如何使用示例代码如下 public class MpscArrayQueueTest {public static final MpscArrayQueueString MPSC_ARRAY_QUEUE new MpscArrayQueue(2);public static void main(String[] args) {for (int i 1; i 2; i) {int index i;new Thread(() - MPSC_ARRAY_QUEUE.offer(data index), thread index).start();}try {Thread.sleep(1000L);MPSC_ARRAY_QUEUE.add(data3); // 入队操作队列满则抛出异常} catch (Exception e) {e.printStackTrace();}System.out.println(队列大小 MPSC_ARRAY_QUEUE.size() , 队列容量 MPSC_ARRAY_QUEUE.capacity());System.out.println(出队 MPSC_ARRAY_QUEUE.remove()); // 出队操作队列为空则抛出异常System.out.println(出队 MPSC_ARRAY_QUEUE.poll()); // 出队操作队列为空则返回 NULL} }程序输出结果如下 java.lang.IllegalStateException: Queue fullat java.util.AbstractQueue.add(AbstractQueue.java:98)at MpscArrayQueueTest.main(MpscArrayQueueTest.java:17) 队列大小2, 队列容量2 出队data1 出队data2 Disconnected from the target VM, address: 127.0.0.1:58005, transport: socket说到底 MpscArrayQueue 终究还是是个队列基本用法与 ArrayBlockingQueue 都是类似的都离不开队列的基本操作入队 offer()**和**出队 poll()。下面我们就入队 offer() 和出队 poll() 两个最重要的操作分别进行详细的讲解。 入队 offer 首先我们先回顾下 MpscArrayQueue 的重要属性 // ConcurrentCircularArrayQueue protected final long mask; // 计算数组下标的掩码 protected final E[] buffer; // 存放队列数据的数组 // MpmcArrayQueueProducerIndexField private volatile long producerIndex; // 生产者的索引 // MpscArrayQueueProducerLimitField private volatile long producerLimit; // 生产者索引的最大值 // MpscArrayQueueConsumerIndexField protected long consumerIndex; // 消费者索引看到 mask 变量你现在是不是条件反射想到队列中数组的容量大小肯定是 2 的次幂。因为 Mpsc 是多生产者单消费者队列所以 producerIndex、producerLimit 都是用 volatile 进行修饰的其中一个生产者线程的修改需要对其他生产者线程可见。队列入队和出队时会如何操作上述这些属性呢其中生产者和消费者的索引变量又有什么作用呢带着这些问题我们开始阅读源码。 首先跟进 offer() 方法的源码 public boolean offer(E e) {if (null e) {throw new NullPointerException();} else {long mask this.mask;long producerLimit this.lvProducerLimit(); // 获取生产者索引最大限制long pIndex;long offset;do {pIndex this.lvProducerIndex(); // 获取生产者索引if (pIndex producerLimit) {offset this.lvConsumerIndex(); // 获取消费者索引producerLimit offset mask 1L;if (pIndex producerLimit) {return false; // 队列已满}this.soProducerLimit(producerLimit); // 更新 producerLimit}} while(!this.casProducerIndex(pIndex, pIndex 1L)); // CAS 更新生产者索引更新成功则退出说明当前生产者已经占领索引值offset calcElementOffset(pIndex, mask); // 计算生产者索引在数组中下标UnsafeRefArrayAccess.soElement(this.buffer, offset, e); // 向数组中放入数据return true;} }MpscArrayQueue 的 offer() 方法虽然比较简短但是需要具备一些底层知识才能看得懂先不用担心我们一点点开始拆解。首先需要搞懂 producerIndex、producerLimit 以及 consumerIndex 之间的关系这也是 MpscArrayQueue 中设计比较独特的地方。首先看下 lvProducerLimit() 方法的源码 public MpscArrayQueueProducerLimitField(int capacity) {super(capacity);this.producerLimit capacity; } protected final long lvProducerLimit() {return producerLimit; }在初始化状态producerLimit 与队列的容量是相等的对应到 MpscArrayQueueTest 代码示例中producerLimit capacity 2而 producerIndex consumerIndex 0。接下来 Thread1 和 Thread2 并发向 MpscArrayQueue 中存放数据如下图所示。 两个线程此时拿到的 producerIndex 都是 0是小于 producerLimit 的。此时两个线程都会尝试使用 CAS 操作更新 producerIndex其中必然有一个是成功的另外一个是失败的。假设 Thread1 执行 CAS 操作成功那么 Thread2 失败后就会重新更新 producerIndex。Thread1 更新后 producerIndex 的值为 1由于 producerIndex 是 volatile 修饰的更新后立刻对 Thread2 可见。这里有一点需要注意的是当前线程更新后的值是被其他线程使用当 Thread1 和 Thread2 都通过 CAS 抢占成功后它们拿到的 pIndex 分别是 0 和 1。接下来就是根据 pIndex 进行位运算计算得到数组对应的下标然后通过 UNSAFE.putOrderedObject() 方法将数据写入到数组中源码如下所示。 public static E void soElement(E[] buffer, long offset, E e) {UnsafeAccess.UNSAFE.putOrderedObject(buffer, offset, e); }putOrderedObject() 和 putObject() 都可以用于更新对象的值但是 putOrderedObject() 并不会立刻将数据更新到内存中并把其他 Cache Line 置为失效。putOrderedObject() 使用的是 LazySet 延迟更新机制所以性能方面 putOrderedObject() 要比 putObject() 高很多。 Java 中有四种类型的内存屏障分别为 LoadLoad、StoreStore、LoadStore 和 StoreLoad。putOrderedObject() 使用了 StoreStore Barrier对于 Store1StoreStoreStore2 这样的操作序列在 Store2 进行写入之前会保证 Store1 的写操作对其他处理器可见。 LazySet 机制是有代价的就是写操作结果有纳秒级的延迟不会立刻被其他线程以及自身线程可见。因为在 Mpsc Queue 的使用场景中多个生产者只负责写入数据并没有写入之后立刻读取的需求所以使用 LazySet 机制是没有问题的只要 StoreStore Barrier 保证多线程写入的顺序即可。 至此offer() 的核心操作我们已经讲完了。现在我们继续把目光聚焦在 do-while 循环内的逻辑为什么需要两次 if(pIndex producerLimit) 判断呢说明当生产者索引大于 producerLimit 阈值时可能存在两种情况producerLimit 缓存值过期了或者队列已经满了。所以此时我们需要读取最新的消费者索引 consumerIndex之前读取过的数据位置都可以被重复使用重新做一次 producerLimit 计算然后再做一次 if(pIndex producerLimit) 判断如果生产者索引还是大于 producerLimit 阈值说明队列的真的满了。 因为生产者有多个线程所以 MpscArrayQueue 采用了 UNSAFE.getLongVolatile() 方法保证获取消费者索引 consumerIndex 的准确性。getLongVolatile() 使用了 StoreLoad Barrier对于 Store1StoreLoadLoad2 的操作序列在 Load2 以及后续的读取操作之前都会保证 Store1 的写入操作对其他处理器可见。StoreLoad 是四种内存屏障开销最大的现在你是不是可以体会到引入 producerLimit 的好处了呢假设我们的消费速度和生产速度比较均衡的情况下差不多走完一圈数组才需要获取一次消费者索引 consumerIndex从而大幅度减少了 getLongVolatile() 操作的执行次数性能提升是显著的。 学习完 MpscArrayQueue 的入队 offer() 方法后再来看出队 poll() 就会容易很多我们继续向下看。 出队 poll poll() 方法的作用是移除队列的首个元素并返回如果队列为空则返回 NULL。我们看下 poll() 源码是如何实现的。 public E poll() {long cIndex this.lpConsumerIndex(); // 直接返回消费者索引 consumerIndexlong offset this.calcElementOffset(cIndex); // 计算数组对应的偏移量E[] buffer this.buffer;E e UnsafeRefArrayAccess.lvElement(buffer, offset); // 取出数组中 offset 对应的元素if (null e) {if (cIndex this.lvProducerIndex()) { // 队列为空return null;}do {e UnsafeRefArrayAccess.lvElement(buffer, offset); } while(e null); // 等待生产者填充元素}UnsafeRefArrayAccess.spElement(buffer, offset, (Object)null); // 消费成功后将当前位置置为 NULLthis.soConsumerIndex(cIndex 1L); // 更新 consumerIndex 到下一个位置return e; }因为只有一个消费者线程所以整个 poll() 的过程没有 CAS 操作。poll() 方法核心思路是获取消费者索引 consumerIndex然后根据 consumerIndex 计算得出数组对应的偏移量然后将数组对应位置的元素取出并返回最后将 consumerIndex 移动到环形数组下一个位置。 获取消费者索引以及计算数组对应的偏移量的逻辑与 offer() 类似在这里就不赘述了。下面直接看下如何取出数组中 offset 对应的元素跟进 lvElement() 方法的源码。 public static E E lvElement(E[] buffer, long offset) {return (E) UNSAFE.getObjectVolatile(buffer, offset); }获取数组元素的时候同样使用了 UNSAFE 系列方法getObjectVolatile() 方法则使用的是 LoadLoad Barrier对于 Load1LoadLoadLoad2 操作序列在 Load2 以及后续读取操作之前会保证 Load1 的读取操作执行完毕所以 getObjectVolatile() 方法可以保证每次读取数据都可以从内存中拿到最新值。 与 offer() 相反poll() 比较关注队列为空的情况。当调用 lvElement() 方法获取到的元素为 NULL 时有两种可能的情况队列为空或者生产者填充的元素还没有对消费者可见。如果消费者索引 consumerIndex 等于生产者 producerIndex说明队列为空。只要两者不相等消费者需要等待生产者填充数据完毕。 当成功消费数组中的元素之后需要把当前消费者索引 consumerIndex 的位置置为 NULL然后把 consumerIndex 移动到数组下一个位置。逻辑比较简单下面我们把 spElement() 和 soConsumerIndex() 方法放在一起看。 public static E void spElement(E[] buffer, long offset, E e) {UNSAFE.putObject(buffer, offset, e); } protected void soConsumerIndex(long newValue) {UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); }最后的更新操作我们又看到了 UNSAFE put 系列方法的运用其中 putObject() 不会使用任何内存屏障它会直接更新对象对应偏移量的值。而 putOrderedLong 与 putOrderedObject() 是一样的都使用了 StoreStore Barrier也是延迟更新 LazySet 机制我们就不再赘述了。 到此为止MpscArrayQueue 入队和出队的核心源码已经分析完了。因为 JCTools 是服务于 JVM 的并发工具类其中包含了很多黑科技的技巧例如填充法解决伪共享问题、Unsafe 直接操作内存等让我们对底层知识的掌握又更进一步。此外 JCTools 还提供了 MpscUnboundedArrayQueue、MpscChunkedArrayQueue 等其他具有特色功能的队列有兴趣的话你可以课后自行研究相信有了本节课的基础再分析其他队列一定不会难倒你。 总结 MpscArrayQueue 还只是 Jctools 中的冰山一角其中蕴藏着丰富的技术细节我们对 MpscArrayQueue 的知识点做一个简单的总结。 通过大量填充 long 类型变量解决伪共享问题。环形数组的容量设置为 2 的次幂可以通过位运算快速定位到数组对应下标。入队 offer() 操作中 producerLimit 的巧妙设计大幅度减少了主动获取消费者索引 consumerIndex 的次数性能提升显著。入队和出队操作中都大量使用了 UNSAFE 系列方法针对生产者和消费者的场景不同使用的 UNSAFE 方法也是不一样的。Jctools 在底层操作的运用上也是有的放矢把性能发挥到极致。 到此为止我们源码解析的课程就告一段落了。Netty 还有很多黑科技等待我们去探索希望通过前面 Netty 核心源码的学习在今后深入研究 Netty 的道路上能够有所帮助。 23 架构设计如何实现一个高性能分布式 RPC 框架 在前面的课程中我们由浅入深地讲解了 Netty 的基础知识和实现原理并对 Netty 的核心源码进行了剖析相信你已经体会到了 Netty 的强大之处。本身学习一门技术是一个比较漫长的过程恭喜你坚持了下来。纸上得来终觉浅绝知此事要躬行。你是不是已经迫不及待想在项目中使用 Netty 了呢接下来我会带着你完成一个相对完整的 RPC 框架原型帮助你加深对 Netty 的理解希望你能亲自动手跟我一起完成它。 我先来说说为什么要选择 RPC 框架作为实战项目。RPC 框架是大型企业高频使用的一种中间件框架用于解决分布式系统中服务之间的调用问题。RPC 框架设计很多重要的知识点如线程模型、通信协议设计、同步/异步调用、负载均衡等对于提高我们的技术综合能力有非常大的帮助。 我们实战课需要达到什么样的目标呢市面上有较多出名的 RPC 框架例如 Dubbo、Thrift、gRPC 等RPC 框架本身是非常负责的我们不可能面面俱到而是抓住 RPC 框架的核心流程以及必备的组件开发一个功能比较丰富的小型 RPC 框架。麻雀虽小五脏俱全。 在正式开始 RPC 实战项目之前我们先学习一下 RPC 的架构设计这是项目前期规划非常重要的一步。 RPC 框架架构设计 RPC 又称远程过程调用Remote Procedure Call用于解决分布式系统中服务之间的调用问题。通俗地讲就是开发者能够像调用本地方法一样调用远程的服务。下面我们通过一幅图来说说 RPC 框架的基本架构。 RPC 框架包含三个最重要的组件分别是客户端、服务端和注册中心。在一次 RPC 调用流程中这三个组件是这样交互的 服务端在启动后会将它提供的服务列表发布到注册中心客户端向注册中心订阅服务地址客户端会通过本地代理模块 Proxy 调用服务端Proxy 模块收到负责将方法、参数等数据转化成网络字节流客户端从服务列表中选取其中一个的服务地址并将数据通过网络发送给服务端服务端接收到数据后进行解码得到请求信息服务端根据解码后的请求信息调用对应的服务然后将调用结果返回给客户端。 虽然 RPC 调用流程很容易理解但是实现一个完整的 RPC 框架设计到很多内容例如服务注册与发现、通信协议与序列化、负载均衡、动态代理等下面我们一一进行初步地讲解。 服务注册与发现 在分布式系统中不同服务之间应该如何通信呢传统的方式可以通过 HTTP 请求调用、保存服务端的服务列表等这样做需要开发者主动感知到服务端暴露的信息系统之间耦合严重。为了更好地将客户端和服务端解耦以及实现服务优雅上线和下线于是注册中心就出现了。 在 RPC 框架中主要是使用注册中心来实现服务注册和发现的功能。服务端节点上线后自行向注册中心注册服务列表节点下线时需要从注册中心将节点元数据信息移除。客户端向服务端发起调用时自己负责从注册中心获取服务端的服务列表然后在通过负载均衡算法选择其中一个服务节点进行调用。以上是最简单直接的服务端和客户端的发布和订阅模式不需要再借助任何中间服务器性能损耗也是最小的。 现在思考一个问题服务在下线时需要从注册中心移除元数据那么注册中心怎么才能感知到服务下线呢我们最先想到的方法就是节点主动通知的实现方式当节点需要下线时向注册中心发送下线请求让注册中心移除自己的元数据信息。但是如果节点异常退出例如断网、进程崩溃等那么注册中心将会一直残留异常节点的元数据从而可能造成服务调用出现问题。 为了避免上述问题实现服务优雅下线比较好的方式是采用主动通知 心跳检测的方案。除了主动通知注册中心下线外还需要增加节点与注册中心的心跳检测功能这个过程也叫作探活。心跳检测可以由节点或者注册中心负责例如注册中心可以向服务节点每 60s 发送一次心跳包如果 3 次心跳包都没有收到请求结果可以任务该服务节点已经下线。 由此可见采用注册中心的好处是可以解耦客户端和服务端之间错综复杂的关系并且能够实现对服务的动态管理。服务配置可以支持动态修改然后将更新后的配置推送到客户端和服务端无须重启任何服务。 通信协议与序列化 既然 RPC 是远程调用必然离不开网络通信协议。客户端在向服务端发起调用之前需要考虑采用何种方式将调用信息进行编码并传输到服务端。因为 RPC 框架对性能有非常高的要求所以通信协议应该越简单越好这样可以减少编解码的性能损耗。RPC 框架可以基于不同的协议实现大部分主流 RPC 框架会选择 TCP、HTTP 协议出名的 gRPC 框架使用的则是 HTTP2。TCP、HTTP、HTTP2 都是稳定可靠的但其实使用 UDP 协议也是可以的具体看业务使用的场景。成熟的 RCP 框架能够支持多种协议例如阿里开源的 Dubbo 框架被很多互联网公司广泛使用其中可插拔的协议支持是 Dubbo 的一大特色这样不仅可以给开发者提供多种不同的选择而且为接入异构系统提供了便利。 客户端和服务端在通信过程中需要传输哪些数据呢这些数据又该如何编解码呢如果采用 TCP 协议你需要将调用的接口、方法、请求参数、调用属性等信息序列化成二进制字节流传递给服务提供方服务端接收到数据后再把二进制字节流反序列化得到调用信息然后利用反射的原理调用对应方法最后将返回结果、返回码、异常信息等返回给客户端。所谓序列化和反序列化就是将对象转换成二进制流以及将二进制流再转换成对象的过程。因为网络通信依赖于字节流而且这些请求信息都是不确定的所以一般会选用通用且高效的序列化算法。比较常用的序列化算法有 FastJson、Kryo、Hessian、Protobuf 等这些第三方序列化算法都比 Java 原生的序列化操作都更加高效。Dubbo 支持多种序列化算法并定义了 Serialization 接口规范所有序列化算法扩展都必须实现该接口其中默认使用的是 Hessian 序列化算法。 RPC 调用方式 成熟的 RPC 框架一般会提供四种调用方式分别为同步 Sync、异步 Future、回调 Callback和单向 Oneway。RPC 框架的性能和吞吐量与合理使用调用方式是息息相关的下面我们逐一介绍下四种调用方式的实现原理。 Sync 同步调用。客户端线程发起 RPC 调用后当前线程会一直阻塞直至服务端返回结果或者处理超时异常。Sync 同步调用一般是 RPC 框架默认的调用方式为了保证系统可用性客户端设置合理的超时时间是非常重要的。虽说 Sync 是同步调用但是客户端线程和服务端线程并不是同一个线程实际在 RPC 框架内部还是异步处理的。Sync 同步调用的过程如下图所示。 Future 异步调用。客户端发起调用后不会再阻塞等待而是拿到 RPC 框架返回的 Future 对象调用结果会被服务端缓存客户端自行决定后续何时获取返回结果。当客户端主动获取结果时该过程是阻塞等待的。Future 异步调用过程如下图所示。 Callback 回调调用。如下图所示客户端发起调用时将 Callback 对象传递给 RPC 框架无须同步等待返回结果直接返回。当获取到服务端响应结果或者超时异常后再执行用户注册的 Callback 回调。所以 Callback 接口一般包含 onResponse 和 onException 两个方法分别对应成功返回和异常返回两种情况。 Oneway 单向调用。客户端发起请求之后直接返回忽略返回结果。Oneway 方式是最简单的具体调用过程如下图所示。 四种调用方式都各有优缺点很难说异步方式一定会比同步方式效果好在不用的业务场景可以按需选取更合适的调用方式。 线程模型 线程模型是 RPC 框架需要重点关注的部分与我们之前介绍的 Netty Reactor 线程模型有什么区别和联系吗 首先我们需要明确 I/O 线程和业务线程的区别以 Dubbo 框架为例Dubbo 使用 Netty 作为底层的网络通信框架采用了我们熟悉的主从 Reactor 线程模型其中 Boss 和 Worker 线程池就可以看作 I/O 线程。I/O 线程可以理解为主要负责处理网络数据例如事件轮询、编解码、数据传输等。如果业务逻辑能够立即完成也可以使用 I/O 线程进行处理这样可以省去线程上下文切换的开销。如果业务逻辑耗时较多例如包含查询数据库、复杂规则计算等耗时逻辑那么 I/O 必须将这些请求分发到业务线程池中进行处理以免阻塞 I/O 线程。 那么哪些请求需要在 I/O 线程中执行哪些又需要在业务线程池中执行呢Dubbo 框架的做法值得借鉴它给用户提供了多种选择它一共提供了 5 种分发策略如下表格所示。 负载均衡 在分布式系统中服务提供者和服务消费者都会有多台节点如何保证服务提供者所有节点的负载均衡呢客户端在发起调用之前需要感知有多少服务端节点可用然后从中选取一个进行调用。客户端需要拿到服务端节点的状态信息并根据不同的策略实现负载均衡算法。负载均衡策略是影响 RPC 框架吞吐量很重要的一个因素下面我们介绍几种最常用的负载均衡策略。 Round-Robin 轮询。Round-Robin 是最简单有效的负载均衡策略并没有考虑服务端节点的实际负载水平而是依次轮询服务端节点。Weighted Round-Robin 权重轮询。对不同负载水平的服务端节点增加权重系数这样可以通过权重系数降低性能较差或者配置较低的节点流量。权重系数可以根据服务端负载水平实时进行调整使集群达到相对均衡的状态。Least Connections 最少连接数。客户端根据服务端节点当前的连接数进行负载均衡客户端会选择连接数最少的一台服务器进行调用。Least Connections 策略只是服务端其中一种维度我们可以演化出最少请求数、CPU 利用率最低等其他维度的负载均衡方案。Consistent Hash 一致性 Hash。目前主流推荐的负载均衡策略Consistent Hash 是一种特殊的 Hash 算法在服务端节点扩容或者下线时尽可能保证客户端请求还是固定分配到同一台服务器节点。Consistent Hash 算法是采用哈希环来实现的通过 Hash 函数将对象和服务器节点放置在哈希环上一般来说服务器可以选择 IP Port 进行 Hash然后为对象选择对应的服务器节点在哈希环中顺时针查找距离对象 Hash 值最近的服务器节点。 此外负载均衡算法可以是多种多样的客户端可以记录例如健康状态、连接数、内存、CPU、Load 等更加丰富的信息根据综合因素进行更好地决策。 动态代理 RPC 框架怎么做到像调用本地接口一样调用远端服务呢这必须依赖动态代理来实现。需要创建一个代理对象在代理对象中完成数据报文编码然后发起调用发送数据给服务提供方以此屏蔽 RPC 框架的调用细节。因为代理类是在运行时生成的所以代理类的生成速度、生成的字节码大小都会影响 RPC 框架整体的性能和资源消耗所以需要慎重选择动态代理的实现方案。动态代理比较主流的实现方案有以下几种JDK 动态代理、Cglib、Javassist、ASM、Byte Buddy我们简单做一个对比和介绍。 JDK 动态代理。在运行时可以动态创建代理类但是 JDK 动态代理的功能比较局限代理对象必须实现一个接口否则抛出异常。因为代理类会继承 Proxy 类然而 Java 是不支持多重继承的只能通过接口实现多态。JDK 动态代理所生成的代理类是接口的实现类不能代理接口中不存在的方法。JDK 动态代理是通过反射调用的形式代理类中的方法比直接调用肯定是性能要慢的。Cglib 动态代理。Cglib 是基于 ASM 字节码生成框架实现的通过字节码技术生成的代理类所以代理类的类型是不受限制的。而且 Cglib 生成的代理类是继承于被代理类所以可以提供更加灵活的功能。在代理方法方面Cglib 是有优势的它采用了 FastClass 机制为代理类和被代理类各自创建一个 Class这个 Class 会为代理类和被代理类的方法分配 index 索引FastClass 就可以通过 index 直接定位要调用的方法并直接调用这是一种空间换时间的优化思路。Javassist 和 ASM。二者都是 Java 字节码操作框架使用起来难度较大需要开发者对 Class 文件结构以及 JVM 都有所了解但是它们都比反射的性能要高。Byte Buddy 也是一个字节码生成和操作的类库Byte Buddy 功能强大相比于 Javassist 和 ASMByte Buddy 提供了更加便捷的 API用于创建和修改 Java 类无须理解字节码的格式而且 Byte Buddy 更加轻量性能更好。 至此我们已经对实现 RPC 框架的几个核心要点做了一个大致的介绍关于通信协议、负载均衡、动态代理在 RPC 框架中如何实现我们后面会有专门的实践课对其进行详细介绍本节课我们先有个大概的印象即可。 总结 如果你可以完成上述 RPC 框架的核心功能那么一个简易的 RPC 框架的 MVP 原型就完成了这也是我们实践课的目标。当然实现一个高性能高可靠的 RPC 框架并不容易需要考虑的问题远不止如此例如异常重试、服务级别线程池隔离、熔断限流、集群容错、优雅下线等等在实践课最后我会为你讲解 RPC 框架进阶的拓展内容。 24 服务发布与订阅搭建生产者和消费者的基础框架 从本节课开始我们开始动手开发一个完整的 RPC 框架原型通过整个实践课程的学习你不仅可以熟悉 RPC 的实现原理而且可以对之前 Netty 基础知识加深理解同样在工作中也可以学以致用。 我会从服务发布与订阅、远程通信、服务治理、动态代理四个方面详细地介绍一个通用 RPC 框架的实现过程相信你只要坚持完成本次实践课之后你再独立完成工作中项目研发会变得更加容易。你是不是已经迫不及待地想动手了呢让我们一起开始吧 源码参考地址mini-rpc 环境搭建 工欲善其事必先利其器首先我们需要搭建我们的开发环境这是每个程序员的必备技能。以下是我的本机环境清单仅供参考。 操作系统MacOS Big Sur11.0.1。集成开发工具IntelliJ IDEA 2020.3当然你也可以选择 eclipse。项目技术栈SpringBoot 2.1.12.RELEASE JDK 1.8.0_221 Netty 4.1.42.Final。项目依赖管理工具Maven 3.5.4你可以独立安装 Maven 或者使用 IDEA 的集成版独立安装的 Maven 需要配置 MAVEN_HOME 和 PATH 环境变量。注册中心Zookeeeper 3.4.14需要特别注意 Zookeeeper 和 Apache Curator 一定要搭配使用Zookeeper 3.4.x 版本Apache Curator 只有 2.x.x 才能支持。 项目结构 在动手开发项目之前我们需要对项目结构有清晰的构思。根据上节课介绍的 RPC 框架设计架构我们可以将项目结构划分为以下几个模块。 其中每个模块都是什么角色呢下面我们一一进行介绍。 rpc-provider服务提供者。负责发布 RPC 服务接收和处理 RPC 请求。rpc-consumer服务消费者。使用动态代理发起 RPC 远程调用帮助使用者来屏蔽底层网络通信的细节。rpc-registry注册中心模块。提供服务注册、服务发现、负载均衡的基本功能。rpc-protocol网络通信模块。包含 RPC 协议的编解码器、序列化和反序列化工具等。rpc-core基础类库。提供通用的工具类以及模型定义例如 RPC 请求和响应类、RPC 服务元数据类等。rpc-facadeRPC 服务接口。包含服务提供者需要对外暴露的接口本模块主要用于模拟真实 RPC 调用的测试。 如下图所示首先我们需要清楚各个模块之间的依赖关系才能帮助我们更好地梳理 Maven 的 pom 定义。rpc-core 是最基础的类库所以大部分模块都依赖它。rpc-consumer 用于发起 RPC 调用。rpc-provider 负责处理 RPC 请求如果不知道远程服务的地址那么一切都是空谈了所以两者都需要依赖 rpc-registry 提供的服务发现和服务注册的能力。 如何使用 我们不着急开始动手实现代码细节而是考虑一个问题最终实现的 RPC 框架应该让用户如何使用呢这就跟我们学习一门技术一样你不可能刚开始就直接陷入源码的细节而是先熟悉它的基本使用方式然后找到关键的切入点再深入研究实现原理会起到事半功倍的效果。 首先我们看下 RPC 框架想要实现的效果如下所示 // rpc-facade # HelloFacade public interface HelloFacade {String hello(String name); } // rpc-provider # HelloFacadeImpl RpcService(serviceInterface HelloFacade.class, serviceVersion 1.0.0) public class HelloFacadeImpl implements HelloFacade {Overridepublic String hello(String name) {return hello name;} } // rpc-consumer # HelloController RestController public class HelloController {RpcReference(serviceVersion 1.0.0, timeout 3000)private HelloFacade helloFacade;RequestMapping(value /hello, method RequestMethod.GET)public String sayHello() {return helloFacade.hello(mini rpc);} }为了方便在本地模拟客户端和服务端我会把 rpc-provider 和 rpc-consumer 两个模块能够做到独立启动。rpc-provider 通过 RpcService 注解暴露 RPC 服务 HelloFacaderpc-consumer 通过 RpcReference 注解引用 HelloFacade 服务并发起调用基本与我们常用的 RPC 框架使用方式保持一致。 梳理清楚项目结构和整体实现思路之后下面我们从服务提供者开始入手开发。 服务提供者发布服务 服务提供者 rpc-provider 需要完成哪些事情呢主要分为四个核心流程 服务提供者启动服务并暴露服务端口启动时扫描需要对外发布的服务并将服务元数据信息发布到注册中心接收 RPC 请求解码后得到请求消息提交请求至自定义线程池进行处理并将处理结果写回客户端。 本节课我们先实现 rpc-provider 模块前面两个流程。 服务提供者启动 服务提供者启动的配置方式基本是固定模式也是从引导器 Bootstrap 开始入手你可以复习下基础课程《03 引导器作用客户端和服务端启动都要做些什么》。我们首先看下服务提供者的启动实现代码如下所示 private void startRpcServer() throws Exception {this.serverAddress InetAddress.getLocalHost().getHostAddress();EventLoopGroup boss new NioEventLoopGroup();EventLoopGroup worker new NioEventLoopGroup();try {ServerBootstrap bootstrap new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {}}).childOption(ChannelOption.SO_KEEPALIVE, true);ChannelFuture channelFuture bootstrap.bind(this.serverAddress, this.serverPort).sync();log.info(server addr {} started on port {}, this.serverAddress, this.serverPort);channelFuture.channel().closeFuture().sync();} finally {boss.shutdownGracefully();worker.shutdownGracefully();} }服务提供者采用的是主从 Reactor 线程模型启动过程包括配置线程池、Channel 初始化、端口绑定三个步骤我们暂时先不关注 Channel 初始化中自定义的业务处理器 Handler 是如何设计和实现的。 对于 RPC 框架而言可扩展性是比较重要的一方面。接下来我们看下如何借助 Spring Boot 的能力将服务提供者启动所依赖的参数做成可配置化。 参数配置 服务提供者启动需要配置一些参数我们不应该把这些参数固定在代码里而是以命令行参数或者配置文件的方式进行输入。我们可以使用 Spring Boot 的 ConfigurationProperties 注解很轻松地实现配置项的加载并且可以把相同前缀类型的配置项自动封装成实体类。接下来我们为服务提供者提供参数映射的对象 Data ConfigurationProperties(prefix rpc) public class RpcProperties {private int servicePort;private String registryAddr;private String registryType; }我们一共提取了三个参数分别为服务暴露的端口 servicePort、注册中心的地址 registryAddr 和注册中心的类型 registryType。ConfigurationProperties 注解最经典的使用方式就是通过 prefix 属性指定配置参数的前缀默认会与全局配置文件 application.properties 或者 application.yml 中的参数进行一一绑定。如果你想自定义一个配置文件可以通过 PropertySource 注解指定配置文件的位置。下面我们在 rpc-provider 模块的 resources 目录下创建全局配置文件 application.properties并配置以上三个参数 rpc.servicePort2781 rpc.registryTypeZOOKEEPER rpc.registryAddr127.0.0.1:2181application.properties 配置文件中的属性必须和实体类的成员变量是一一对应的可以采用以下常用的命名规则例如驼峰命名 rpc.servicePort2781或者虚线 - 分割的方式 rpc.service-port2781以及大写加下划线的形式 RPC_Service_Port建议在环境变量中使用。ConfigurationProperties 注解还可以支持更多复杂结构的配置并且可以 Validation 功能进行参数校验如果你有兴趣可以课后再进行深入研究。 有了 RpcProperties 实体类我们接下来应该如何使用呢如果只配置 ConfigurationProperties 注解Spring 容器并不能获取配置文件的内容并映射为对象这时 EnableConfigurationProperties 注解就登场了。EnableConfigurationProperties 注解的作用就是将声明 ConfigurationProperties 注解的类注入为 Spring 容器中的 Bean。具体用法如下 Configuration EnableConfigurationProperties(RpcProperties.class) public class RpcProviderAutoConfiguration {Resourceprivate RpcProperties rpcProperties;Beanpublic RpcProvider init() throws Exception {RegistryType type RegistryType.valueOf(rpcProperties.getRegistryType());RegistryService serviceRegistry RegistryFactory.getInstance(rpcProperties.getRegistryAddr(), type);return new RpcProvider(rpcProperties.getServicePort(), serviceRegistry);} }我们通过 EnableConfigurationProperties 注解使得 RpcProperties 生效并通过 Configuration 和 Bean 注解自定义了 RpcProvider 的生成方式。Configuration 主要用于定义配置类配置类内部可以包含多个 Bean 注解的方法可以替换传统 XML 的定义方式。被 Bean 注解的方法会返回一个自定义的对象Bean 注解会将这个对象注册为 Bean 并装配到 Spring 容器中Bean 比 Component 注解的自定义功能更强。 至此我们服务提供者启动的准备工作就完成了下面你需要添加 Spring Boot 的 main 方法如下所示然后尝试启动下 rpc-provider 模块吧。 EnableConfigurationProperties SpringBootApplication public class RpcProviderApplication {public static void main(String[] args) {SpringApplication.run(RpcProviderApplication.class, args);} }发布服务 在服务提供者启动时我们需要思考一个核心问题服务提供者需要将服务发布到注册中心怎么知道哪些服务需要发布呢服务提供者需要定义需要发布服务类型、服务版本等属性主流的 RPC 框架都采用 XML 文件或者注解的方式进行定义。以注解的方式暴露服务现在最为常用省去了很多烦琐的 XML 配置过程。例如 Dubbo 框架中使用 Service 注解替代 dubbo:service 的定义方式服务消费者则使用 Reference 注解替代 dubbo:reference。接下来我们看看作为服务提供者如何通过注解暴露服务首先给出我们自定义的 RpcService 注解定义 Retention(RetentionPolicy.RUNTIME) Target(ElementType.TYPE) Component public interface RpcService {Class? serviceInterface() default Object.class;String serviceVersion() default 1.0; }RpcService 提供了两个必不可少的属性服务类型 serviceInterface 和服务版本 serviceVersion服务消费者必须指定完全一样的属性才能正确调用。有了 RpcService 注解之后我们就可以在服务实现类上使用它RpcService 注解本质上就是 Component可以将服务实现类注册成 Spring 容器所管理的 Bean那么 serviceInterface、serviceVersion 的属性值怎么才能和 Bean 关联起来呢这就需要我们就 Bean 的生命周期以及 Bean 的可扩展点有所了解。 Spring 的 BeanPostProcessor 接口给提供了对 Bean 进行再加工的扩展点BeanPostProcessor 常用于处理自定义注解。自定义的 Bean 可以通过实现 BeanPostProcessor 接口在 Bean 实例化的前后加入自定义的逻辑处理。如下所示我们通过 RpcProvider 实现 BeanPostProcessor 接口来实现对 声明 RpcService 注解服务的自定义处理。 public class RpcProvider implements InitializingBean, BeanPostProcessor {// 省略其他代码private final MapString, Object rpcServiceMap new HashMap();Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {RpcService rpcService bean.getClass().getAnnotation(RpcService.class);if (rpcService ! null) {String serviceName rpcService.serviceInterface().getName();String serviceVersion rpcService.serviceVersion();try {ServiceMeta serviceMeta new ServiceMeta();serviceMeta.setServiceAddr(serverAddress);serviceMeta.setServicePort(serverPort);serviceMeta.setServiceName(serviceName);serviceMeta.setServiceVersion(serviceVersion);// TODO 发布服务元数据至注册中心rpcServiceMap.put(RpcServiceHelper.buildServiceKey(serviceMeta.getServiceName(), serviceMeta.getServiceVersion()), bean);} catch (Exception e) {log.error(failed to register service {}#{}, serviceName, serviceVersion, e);}}return bean;} }RpcProvider 重写了 BeanPostProcessor 接口的 postProcessAfterInitialization 方法对所有初始化完成后的 Bean 进行扫描。如果 Bean 包含 RpcService 注解那么通过注解读取服务的元数据信息并构造出 ServiceMeta 对象接下来准备将服务的元数据信息发布至注册中心注册中心的实现我们先暂且跳过后面会有单独一节课进行讲解注册中心的实现。此外RpcProvider 还维护了一个 rpcServiceMap存放服务初始化后所对应的 BeanrpcServiceMap 起到了缓存的角色在处理 RPC 请求时可以直接通过 rpcServiceMap 拿到对应的服务进行调用。 明白服务提供者如何处理 RpcService 注解的原理之后接下来再实现服务消费者就容易很多了。 服务消费者订阅服务 与服务提供者不同的是服务消费者并不是一个常驻的服务每次发起 RPC 调用时它才会去选择向哪个远端服务发送数据。所以服务消费者的实现要复杂一些对于声明 RpcReference 注解的成员变量我们需要构造出一个可以真正进行 RPC 调用的 Bean然后将它注册到 Spring 的容器中。 首先我们看下 RpcReference 注解的定义代码如下所示 Retention(RetentionPolicy.RUNTIME) Target(ElementType.FIELD) Autowired public interface RpcReference {String serviceVersion() default 1.0;String registryType() default ZOOKEEPER;String registryAddress() default 127.0.0.1:2181;long timeout() default 5000; }RpcReference 注解提供了服务版本 serviceVersion、注册中心类型 registryType、注册中心地址 registryAddress 和超时时间 timeout 四个属性接下来我们需要使用这些属性构造出一个自定义的 Bean并对该 Bean 执行的所有方法进行拦截。 Spring 的 FactoryBean 接口可以帮助我们实现自定义的 BeanFactoryBean 是一种特种的工厂 Bean通过 getObject() 方法返回对象而并不是 FactoryBean 本身。 public class RpcReferenceBean implements FactoryBeanObject {private Class? interfaceClass;private String serviceVersion;private String registryType;private String registryAddr;private long timeout;private Object object;Overridepublic Object getObject() throws Exception {return object;}Overridepublic Class? getObjectType() {return interfaceClass;}public void init() throws Exception {// TODO 生成动态代理对象并赋值给 object}public void setInterfaceClass(Class? interfaceClass) {this.interfaceClass interfaceClass;}public void setServiceVersion(String serviceVersion) {this.serviceVersion serviceVersion;}public void setRegistryType(String registryType) {this.registryType registryType;}public void setRegistryAddr(String registryAddr) {this.registryAddr registryAddr;}public void setTimeout(long timeout) {this.timeout timeout;} }在 RpcReferenceBean 中 init() 方法被我标注了 TODO此处需要实现动态代理对象并通过代理对象完成 RPC 调用。对于使用者来说只是通过 RpcReference 订阅了服务并不感知底层调用的细节。对于如何实现 RPC 通信、服务寻址等都是在动态代理类中完成的在后面我们会有专门的一节课详细讲解动态代理的实现。 有了 RpcReference 注解和 RpcReferenceBean 之后我们可以使用 Spring 的扩展点 BeanFactoryPostProcessor 对 Bean 的定义进行修改。上文中服务提供者使用的是 BeanPostProcessorBeanFactoryPostProcessor 和 BeanPostProcessor 都是 Spring 的核心扩展点它们之间有什么区别呢BeanFactoryPostProcessor 是 Spring 容器加载 Bean 的定义之后以及 Bean 实例化之前执行所以 BeanFactoryPostProcessor 可以在 Bean 实例化之前获取 Bean 的配置元数据并允许用户对其修改。而 BeanPostProcessor 是在 Bean 初始化前后执行它并不能修改 Bean 的配置信息。 现在我们需要对声明 RpcReference 注解的成员变量构造出 RpcReferenceBean所以需要实现 BeanFactoryPostProcessor 修改 Bean 的定义具体实现如下所示。 Component Slf4j public class RpcConsumerPostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {private ApplicationContext context;private ClassLoader classLoader;private final MapString, BeanDefinition rpcRefBeanDefinitions new LinkedHashMap();Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context applicationContext;}Overridepublic void setBeanClassLoader(ClassLoader classLoader) {this.classLoader classLoader;}Overridepublic void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {for (String beanDefinitionName : beanFactory.getBeanDefinitionNames()) {BeanDefinition beanDefinition beanFactory.getBeanDefinition(beanDefinitionName);String beanClassName beanDefinition.getBeanClassName();if (beanClassName ! null) {Class? clazz ClassUtils.resolveClassName(beanClassName, this.classLoader);ReflectionUtils.doWithFields(clazz, this::parseRpcReference);}}BeanDefinitionRegistry registry (BeanDefinitionRegistry) beanFactory;this.rpcRefBeanDefinitions.forEach((beanName, beanDefinition) - {if (context.containsBean(beanName)) {throw new IllegalArgumentException(spring context already has a bean named beanName);}registry.registerBeanDefinition(beanName, rpcRefBeanDefinitions.get(beanName));log.info(registered RpcReferenceBean {} success., beanName);});}private void parseRpcReference(Field field) {RpcReference annotation AnnotationUtils.getAnnotation(field, RpcReference.class);if (annotation ! null) {BeanDefinitionBuilder builder BeanDefinitionBuilder.genericBeanDefinition(RpcReferenceBean.class);builder.setInitMethodName(RpcConstants.INIT_METHOD_NAME);builder.addPropertyValue(interfaceClass, field.getType());builder.addPropertyValue(serviceVersion, annotation.serviceVersion());builder.addPropertyValue(registryType, annotation.registryType());builder.addPropertyValue(registryAddr, annotation.registryAddress());builder.addPropertyValue(timeout, annotation.timeout());BeanDefinition beanDefinition builder.getBeanDefinition();rpcRefBeanDefinitions.put(field.getName(), beanDefinition);}} }RpcConsumerPostProcessor 类中重写了 BeanFactoryPostProcessor 的 postProcessBeanFactory 方法从 beanFactory 中获取所有 Bean 的定义信息然后分别对每个 Bean 的所有 field 进行检测。如果 field 被声明了 RpcReference 注解通过 BeanDefinitionBuilder 构造 RpcReferenceBean 的定义并为 RpcReferenceBean 的成员变量赋值包括服务类型 interfaceClass、服务版本 serviceVersion、注册中心类型 registryType、注册中心地址 registryAddr 以及超时时间 timeout。构造完 RpcReferenceBean 的定义之后会将RpcReferenceBean 的 BeanDefinition 重新注册到 Spring 容器中。 至此我们已经将服务提供者服务消费者的基本框架搭建出来了并且着重介绍了服务提供者使用 RpcService 注解是如何发布服务的服务消费者相应需要一个能够注入服务接口的注解 RpcReference被 RpcReference 修饰的成员变量都会被构造成 RpcReferenceBean并为它生成动态代理类后面我们再继续深入介绍。 总结 本节课我们介绍了服务发布与订阅的实现原理搭建出了服务提供者和服务消费者的基本框架。可以看出如果采用 Java 语言实现 RPC 框架核心的服务发布与订阅的核心逻辑需要你具备较为扎实的 Spring 框架基础。了解 Spring 重要的扩展接口可以帮助我们开发出更优雅的代码。 25 远程通信通信协议设计以及编解码的实现 上节课我们搭建了服务提供者和服务消费者的基本框架现在我们可以建立两个模块之间的通信机制了。本节课我们通过向 ChannelPipeline 添加自定义的业务处理器来完成 RPC 框架的远程通信机制。需要实现的主要功能如下 服务消费者实现协议编码向服务提供者发送调用数据。服务提供者收到数据后解码然后向服务消费者发送响应数据暂时忽略 RPC 请求是如何被调用的。服务消费者收到响应数据后成功返回。 源码参考地址mini-rpc RPC 通信方案设计 结合本节课的目标接下来我们对 RPC 请求调用和结果响应两个过程分别进行详细拆解分析。首先看下 RPC 请求调用的过程如下图所示。 RPC 请求的过程对于服务消费者来说是出站操作对于服务提供者来说是入站操作。数据发送前服务消费者将 RPC 请求信息封装成 MiniRpcProtocol 对象然后通过编码器 MiniRpcEncoder 进行二进制编码最后直接向发送至远端即可。服务提供者收到请求数据后将二进制数据交给解码器 MiniRpcDecoder解码后再次生成 MiniRpcProtocol 对象然后传递给 RpcRequestHandler 执行真正的 RPC 请求调用。 我们暂时忽略 RpcRequestHandler 是如何执行 RPC 请求调用的接下来我们继续分析 RpcRequestHandler 处理成功后是如何向服务消费者返回响应结果的如下图所示 与 RPC 请求过程相反是由服务提供者将响应结果封装成 MiniRpcProtocol 对象然后通过 MiniRpcEncoder 编码发送给服务消费者。服务消费者对响应结果进行解码因为 RPC 请求是高并发的所以需要 RpcRequestHandler 根据响应结果找到对应的请求最后将响应结果返回。 综合 RPC 请求调用和结果响应的处理过程来看编码器 MiniRpcEncoder、解码器 MiniRpcDecoder 以及通信协议对象 MiniRpcProtocol 都可以设计成复用的最终服务消费者和服务提供者的 ChannelPipeline 结构如下图所示。 由此可见在实现 Netty 网络通信模块时先画图分析 ChannelHandler 的处理流程是非常有帮助的。 自定义 RPC 通信协议 协议是服务消费者和服务提供者之间通信的基础主流的 RPC 框架都会自定义通信协议相比于 HTTP、HTTPS、JSON 等通用的协议自定义协议可以实现更好的性能、扩展性以及安全性。在《接头暗语利用 Netty 如何实现自定义协议通信》课程中我们学习了设计一个完备的通信协议需要考虑哪些因素同时结合 RPC 请求调用与结果响应的场景我们设计了一个简易版的 RPC 自定义协议如下所示 --------------------------------------------------------------- | 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte | --------------------------------------------------------------- | 状态 1byte | 消息 ID 8byte | 数据长度 4byte | --------------------------------------------------------------- | 数据内容 长度不定 | ---------------------------------------------------------------我们把协议分为协议头 Header 和协议体 Body 两个部分。协议头 Header 包含魔数、协议版本号、序列化算法、报文类型、状态、消息 ID、数据长度协议体 Body 只包含数据内容部分数据内容的长度是不固定的。RPC 请求和响应都可以使用该协议进行通信对应协议实体类的定义如下所示 Data public class MiniRpcProtocolT implements Serializable {private MsgHeader header; // 协议头private T body; // 协议体 } Data public class MsgHeader implements Serializable {private short magic; // 魔数private byte version; // 协议版本号private byte serialization; // 序列化算法private byte msgType; // 报文类型private byte status; // 状态private long requestId; // 消息 IDprivate int msgLen; // 数据长度 }在 RPC 请求调用的场景下MiniRpcProtocol 中泛型 T 对应的 MiniRpcRequest 类型MiniRpcRequest 主要包含 RPC 远程调用需要的必要参数定义如下所示。 Data public class MiniRpcRequest implements Serializable {private String serviceVersion; // 服务版本private String className; // 服务接口名private String methodName; // 服务方法名private Object[] params; // 方法参数列表private Class?[] parameterTypes; // 方法参数类型列表 }在 RPC 结果响应的场景下MiniRpcProtocol 中泛型 T 对应的 MiniRpcResponse 类型MiniRpcResponse 实体类的定义如下所示。此外响应结果是否成功可以使用 MsgHeader 中的 status 字段表示0 表示成功非 0 表示失败。MiniRpcResponse 中 data 表示成功状态下返回的 RPC 请求结果message 表示 RPC 请求调用失败的错误信息。 Data public class MiniRpcResponse implements Serializable {private Object data; // 请求结果private String message; // 错误信息 }设计完 RPC 自定义协议之后我们接下来再来解决 MiniRpcRequest 和 MiniRpcResponse 如何进行编码的问题。 序列化选型 MiniRpcRequest 和 MiniRpcResponse 实体类表示的协议体内容都是不确定具体长度的所以我们一般会选用通用且高效的序列化算法将其转换成二进制数据这样可以有效减少网络传输的带宽提升 RPC 框架的整体性能。目前比较常用的序列化算法包括 Json、Kryo、Hessian、Protobuf 等这些第三方序列化算法都比 Java 原生的序列化操作都更加高效。 首先我们定义了一个通用的序列化接口 RpcSerialization所有序列化算法扩展都必须实现该接口RpcSerialization 接口分别提供了序列化 serialize() 和反序列化 deserialize() 方法如下所示 public interface RpcSerialization {T byte[] serialize(T obj) throws IOException;T T deserialize(byte[] data, ClassT clz) throws IOException; }接下来我们为 RpcSerialization 提供了 HessianSerialization 和 JsonSerialization 两种类型的实现类。以 HessianSerialization 为例实现逻辑如下 Component Slf4j public class HessianSerialization implements RpcSerialization {Overridepublic T byte[] serialize(T object) {if (object null) {throw new NullPointerException();}byte[] results;HessianSerializerOutput hessianOutput;try (ByteArrayOutputStream os new ByteArrayOutputStream()) {hessianOutput new HessianSerializerOutput(os);hessianOutput.writeObject(object);hessianOutput.flush();results os.toByteArray();} catch (Exception e) {throw new SerializationException(e);}return results;}SuppressWarnings(unchecked)Overridepublic T T deserialize(byte[] bytes, ClassT clz) {if (bytes null) {throw new NullPointerException();}T result;try (ByteArrayInputStream is new ByteArrayInputStream(bytes)) {HessianSerializerInput hessianInput new HessianSerializerInput(is);result (T) hessianInput.readObject(clz);} catch (Exception e) {throw new SerializationException(e);}return result;} }为了能够支持不同序列化算法我们采用工厂模式来实现不同序列化算法之间的切换使用相同的序列化接口指向不同的序列化算法。对于使用者来说只需要知道序列化算法的类型即可不用关心底层序列化是如何实现的。具体实现如下 public class SerializationFactory {public static RpcSerialization getRpcSerialization(byte serializationType) {SerializationTypeEnum typeEnum SerializationTypeEnum.findByType(serializationType);switch (typeEnum) {case HESSIAN:return new HessianSerialization();case JSON:return new JsonSerialization();default:throw new IllegalArgumentException(serialization type is illegal, serializationType);}} }有了以上基础知识的储备接下来我们就可以开始实现自定义的处理器了。 协议编码实现 在《接头暗语利用 Netty 如何实现自定义协议通信》课程中我们同样介绍了如何使用 Netty 实现自定义的通信协议。Netty 提供了两个最为常用的编解码抽象基类 MessageToByteEncoder 和 ByteToMessageDecoder帮助我们很方便地扩展实现自定义协议。 我们接下来要完成的编码器 MiniRpcEncoder 需要继承 MessageToByteEncoder并重写 encode() 方法具体实现如下所示 public class MiniRpcEncoder extends MessageToByteEncoderMiniRpcProtocolObject {Overrideprotected void encode(ChannelHandlerContext ctx, MiniRpcProtocolObject msg, ByteBuf byteBuf) throws Exception {MsgHeader header msg.getHeader();byteBuf.writeShort(header.getMagic());byteBuf.writeByte(header.getVersion());byteBuf.writeByte(header.getSerialization());byteBuf.writeByte(header.getMsgType());byteBuf.writeByte(header.getStatus());byteBuf.writeLong(header.getRequestId());RpcSerialization rpcSerialization SerializationFactory.getRpcSerialization(header.getSerialization());byte[] data rpcSerialization.serialize(msg.getBody());byteBuf.writeInt(data.length);byteBuf.writeBytes(data);} }编码逻辑比较简单在服务消费者或者服务提供者调用 writeAndFlush() 将数据写给对方前都已经封装成 MiniRpcRequest 或者 MiniRpcResponse所以可以采用 MiniRpcProtocolObject 作为 MiniRpcEncoder 编码器能够支持的编码类型。 协议解码实现 解码器 MiniRpcDecoder 需要继承 ByteToMessageDecoder并重写 decode() 方法具体实现如下所示 public class MiniRpcDecoder extends ByteToMessageDecoder {Overridepublic final void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {if (in.readableBytes() ProtocolConstants.HEADER_TOTAL_LEN) {return;}in.markReaderIndex();short magic in.readShort();if (magic ! ProtocolConstants.MAGIC) {throw new IllegalArgumentException(magic number is illegal, magic);}byte version in.readByte();byte serializeType in.readByte();byte msgType in.readByte();byte status in.readByte();long requestId in.readLong();int dataLength in.readInt();if (in.readableBytes() dataLength) {in.resetReaderIndex();return;}byte[] data new byte[dataLength];in.readBytes(data);MsgType msgTypeEnum MsgType.findByType(msgType);if (msgTypeEnum null) {return;}MsgHeader header new MsgHeader();header.setMagic(magic);header.setVersion(version);header.setSerialization(serializeType);header.setStatus(status);header.setRequestId(requestId);header.setMsgType(msgType);header.setMsgLen(dataLength);RpcSerialization rpcSerialization SerializationFactory.getRpcSerialization(serializeType);switch (msgTypeEnum) {case REQUEST:MiniRpcRequest request rpcSerialization.deserialize(data, MiniRpcRequest.class);if (request ! null) {MiniRpcProtocolMiniRpcRequest protocol new MiniRpcProtocol();protocol.setHeader(header);protocol.setBody(request);out.add(protocol);}case RESPONSE:MiniRpcResponse response rpcSerialization.deserialize(data, MiniRpcResponse.class);if (response ! null) {MiniRpcProtocolMiniRpcResponse protocol new MiniRpcProtocol();protocol.setHeader(header);protocol.setBody(response);out.add(protocol);}case HEARTBEAT:// TODObreak;}} }解码器 MiniRpcDecoder 相比于编码器 MiniRpcEncoder 要复杂很多MiniRpcDecoder 的目标是将字节流数据解码为消息对象并传递给下一个 Inbound 处理器。整个 MiniRpcDecoder 解码过程有几个要点要特别注意 只有当 ByteBuf 中内容大于协议头 Header 的固定的 18 字节时才开始读取数据。即使已经可以完整读取出协议头 Header但是协议体 Body 有可能还未就绪。所以在刚开始读取数据时需要使用 markReaderIndex() 方法标记读指针位置当 ByteBuf 中可读字节长度小于协议体 Body 的长度时再使用 resetReaderIndex() 还原读指针位置说明现在 ByteBuf 中可读字节还不够一个完整的数据包。根据不同的报文类型 MsgType需要反序列化出不同的协议体对象。在 RPC 请求调用的场景下服务提供者需要将协议体内容反序列化成 MiniRpcRequest 对象在 RPC 结果响应的场景下服务消费者需要将协议体内容反序列化成 MiniRpcResponse 对象。 请求处理与响应 在 RPC 请求调用的场景下服务提供者的 MiniRpcDecoder 编码器将二进制数据解码成 MiniRpcProtocolMiniRpcRequest 对象后再传递给 RpcRequestHandler 执行 RPC 请求调用。RpcRequestHandler 也是一个 Inbound 处理器它并不需要承担解码工作所以 RpcRequestHandler 直接继承 SimpleChannelInboundHandler 即可然后重写 channelRead0() 方法具体实现如下 Slf4j public class RpcRequestHandler extends SimpleChannelInboundHandlerMiniRpcProtocolMiniRpcRequest {private final MapString, Object rpcServiceMap;public RpcRequestHandler(MapString, Object rpcServiceMap) {this.rpcServiceMap rpcServiceMap;}Overrideprotected void channelRead0(ChannelHandlerContext ctx, MiniRpcProtocolMiniRpcRequest protocol) {RpcRequestProcessor.submitRequest(() - {MiniRpcProtocolMiniRpcResponse resProtocol new MiniRpcProtocol();MiniRpcResponse response new MiniRpcResponse();MsgHeader header protocol.getHeader();header.setMsgType((byte) MsgType.RESPONSE.getType());try {Object result handle(protocol.getBody()); // TODO 调用 RPC 服务response.setData(result);header.setStatus((byte) MsgStatus.SUCCESS.getCode());resProtocol.setHeader(header);resProtocol.setBody(response);} catch (Throwable throwable) {header.setStatus((byte) MsgStatus.FAIL.getCode());response.setMessage(throwable.toString());log.error(process request {} error, header.getRequestId(), throwable);}ctx.writeAndFlush(resProtocol);});} }因为 RPC 请求调用是比较耗时的所以比较推荐的做法是将 RPC 请求提交到自定义的业务线程池中执行。其中 handle() 方法是真正执行 RPC 调用的地方你可以先留一个空的实现在之后动态代理的课程中我们再完成它。根据 handle() 的执行情况MiniRpcProtocolMiniRpcResponse 最终会被设置成功或者失败的状态以及相应的请求结果或者错误信息最终通过 writeAndFlush() 方法将数据写回服务消费者。 上文中我们已经分析了服务消费者入站操作首先要经过 MiniRpcDecoder 解码器根据报文类型 msgType 解码出 MiniRpcProtocolMiniRpcResponse 响应结果然后传递给 RpcResponseHandler 处理器RpcResponseHandler 负责响应不同线程的请求结果具体实现如下 public class RpcResponseHandler extends SimpleChannelInboundHandlerMiniRpcProtocolMiniRpcResponse {Overrideprotected void channelRead0(ChannelHandlerContext ctx, MiniRpcProtocolMiniRpcResponse msg) {long requestId msg.getHeader().getRequestId();MiniRpcFutureMiniRpcResponse future MiniRpcRequestHolder.REQUEST_MAP.remove(requestId);future.getPromise().setSuccess(msg.getBody());} } public class MiniRpcRequestHolder {public final static AtomicLong REQUEST_ID_GEN new AtomicLong(0);public static final MapLong, MiniRpcFutureMiniRpcResponse REQUEST_MAP new ConcurrentHashMap(); } Data public class MiniRpcFutureT {private PromiseT promise;private long timeout;public MiniRpcFuture(PromiseT promise, long timeout) {this.promise promise;this.timeout timeout;} }服务消费者在发起调用时维护了请求 requestId 和 MiniRpcFutureMiniRpcResponse 的映射关系RpcResponseHandler 会根据请求的 requestId 找到对应发起调用的 MiniRpcFuture然后为 MiniRpcFuture 设置响应结果。 我们采用 Netty 提供的 Promise 工具来实现 RPC 请求的同步等待Promise 基于 JDK 的 Future 扩展了更多新的特性帮助我们更好地以同步的方式进行异步编程。Promise 模式本质是一种异步编程模型我们可以先拿到一个查看任务执行结果的凭证不必等待任务执行完毕当我们需要获取任务执行结果时再使用凭证提供的相关接口进行获取。 至此RPC 框架的通信模块我们已经实现完了。自定义协议、编解码、序列化/反序列化都是实现远程通信的必备基础知识我们务必要熟练掌握。此外在《架构设计如何实现一个高性能分布式 RPC 框架》课程中我们介绍了 RPC 调用的多种方式快开动你的大脑想想其他方式应当如何实现呢 总结 本节课我们通过 RPC 自定义协议的设计与实现加深了对 Netty 自定义处理器 ChannelHandler 的理解。ChannelPipeline 和 ChannelHandler 是我们在项目开发过程中打交道最多的组件在设计之初一定要梳理清楚 Inbound 和 Outbound 处理的传递顺序以及数据模型之间是如何转换的。
http://www.pierceye.com/news/199337/

相关文章:

  • 正规网站模板设计软件工程学科评估
  • 网站集约化建设 要求惠州做棋牌网站建设哪家技术好
  • c#如何做公司网站做网站背景图怎么插
  • 国外做耳机贸易的平台网站定制网站
  • seo做的最好的十个网站加工订单网
  • 网站项目建设主要内容网站导航优化的描述
  • 网站后台修改图片网站制作多少钱公司
  • 做网站后台需要写代码吗益阳seo网站建设
  • 小程序网站做多大尺寸辽阳住房和城乡建设网站
  • 昆山app网站制作网站的管理权限有什么用
  • 购物网站建设开题报告企业宣传方案模板
  • cdr做好排班怎么做网站我的免费网是个什么网站
  • 如何做别人网站镜像地区性中介类网站建设
  • 做的网站怎么查看点击率安装wordpress主题失败
  • 网站历史权重查询免费的黄冈网站有哪些下载软件
  • 宝安三网合一网站建设河北智能网站建设平台
  • 在百度上做网站有用吗wordpress环境虚拟机安装
  • 怎么做网站图片链接中元建设网站
  • 邢台做网站优化价格网站基本维护
  • 网站集群建设价格wordpress 加文章列表
  • 官方网站案例用ps做网站主页
  • 做名片的网站推广型网站建设销售
  • 河南省建设执业资格注册中心网站网站推广公司 sit
  • 来年做那个网站致富网站工作室 需要什么手续
  • 宜兴网站建设哪家好网站建设设计公司排名
  • 婚庆公司网站怎么做wordpress 首页置顶
  • 电商网站开发人员结构江苏住房和城乡建设厅网站首页
  • 快速建站的模板陕西省建设网三类人员继续教育
  • 谷歌浏览器对做网站有什么好处广州最好网站策划
  • 西安北郊做网站重庆手机软件开发