当前位置: 首页 > news >正文

该网站无备案网站分哪些类型

该网站无备案,网站分哪些类型,网站建设 邯郸网站制作,本地建站教程Netty核心原理剖析与RPC实践16-20 16 IO 加速#xff1a;与众不同的 Netty 零拷贝技术 今天的课程我们继续讨论 Netty 实现高性能的另一个高阶特性——零拷贝。零拷贝是一个耳熟能详的词语#xff0c;在 Linux、Kafka、RocketMQ 等知名的产品中都有使用#xff0c;通常用于…Netty核心原理剖析与RPC实践16-20 16 IO 加速与众不同的 Netty 零拷贝技术 今天的课程我们继续讨论 Netty 实现高性能的另一个高阶特性——零拷贝。零拷贝是一个耳熟能详的词语在 Linux、Kafka、RocketMQ 等知名的产品中都有使用通常用于提升 I/O 性能。而且零拷贝也是面试过程中的高频问题那么你知道零拷贝体现在哪些地方吗Netty 的零拷贝技术又是如何实现的呢接下来我们就针对 Netty 零拷贝特性进行详细地分析。 传统 Linux 中的零拷贝技术 在介绍 Netty 零拷贝特性之前我们有必要学习下传统 Linux 中零拷贝的工作原理。所谓零拷贝就是在数据操作时不需要将数据从一个内存位置拷贝到另外一个内存位置这样可以减少一次内存拷贝的损耗从而节省了 CPU 时钟周期和内存带宽。 我们模拟一个场景从文件中读取数据然后将数据传输到网络上那么传统的数据拷贝过程会分为哪几个阶段呢具体如下图所示。 从上图中可以看出从数据读取到发送一共经历了四次数据拷贝具体流程如下 当用户进程发起 read() 调用后上下文从用户态切换至内核态。DMA 引擎从文件中读取数据并存储到内核态缓冲区这里是第一次数据拷贝。请求的数据从内核态缓冲区拷贝到用户态缓冲区然后返回给用户进程。第二次数据拷贝的过程同时会导致上下文从内核态再次切换到用户态。用户进程调用 send() 方法期望将数据发送到网络中此时会触发第三次线程切换用户态会再次切换到内核态请求的数据从用户态缓冲区被拷贝到 Socket 缓冲区。最终 send() 系统调用结束返回给用户进程发生了第四次上下文切换。第四次拷贝会异步执行从 Socket 缓冲区拷贝到协议引擎中。 说明DMADirect Memory Access直接内存存取是现代大部分硬盘都支持的特性DMA 接管了数据读写的工作不需要 CPU 再参与 I/O 中断的处理从而减轻了 CPU 的负担。 传统的数据拷贝过程为什么不是将数据直接传输到用户缓冲区呢其实引入内核缓冲区可以充当缓存的作用这样就可以实现文件数据的预读提升 I/O 的性能。但是当请求数据量大于内核缓冲区大小时在完成一次数据的读取到发送可能要经历数倍次数的数据拷贝这就造成严重的性能损耗。 接下来我们介绍下使用零拷贝技术之后数据传输的流程。重新回顾一遍传统数据拷贝的过程可以发现第二次和第三次拷贝是可以去除的DMA 引擎从文件读取数据后放入到内核缓冲区然后可以直接从内核缓冲区传输到 Socket 缓冲区从而减少内存拷贝的次数。 在 Linux 中系统调用 sendfile() 可以实现将数据从一个文件描述符传输到另一个文件描述符从而实现了零拷贝技术。在 Java 中也使用了零拷贝技术它就是 NIO FileChannel 类中的 transferTo() 方法transferTo() 底层就依赖了操作系统零拷贝的机制它可以将数据从 FileChannel 直接传输到另外一个 Channel。transferTo() 方法的定义如下 public abstract long transferTo(long position, long count, WritableByteChannel target) throws IOException;FileChannel#transferTo() 的使用也非常简单我们直接看如下的代码示例通过 transferTo() 将 from.data 传输到 to.data()等于实现了文件拷贝的功能。 public void testTransferTo() throws IOException {RandomAccessFile fromFile new RandomAccessFile(from.data, rw);FileChannel fromChannel fromFile.getChannel();RandomAccessFile toFile new RandomAccessFile(to.data, rw);FileChannel toChannel toFile.getChannel();long position 0;long count fromChannel.size();fromChannel.transferTo(position, count, toChannel); }在使用了 FileChannel#transferTo() 传输数据之后我们看下数据拷贝流程发生了哪些变化如下图所示 比较大的一个变化是DMA 引擎从文件中读取数据拷贝到内核态缓冲区之后由操作系统直接拷贝到 Socket 缓冲区不再拷贝到用户态缓冲区所以数据拷贝的次数从之前的 4 次减少到 3 次。 但是上述的优化离达到零拷贝的要求还是有差距的能否继续减少内核中的数据拷贝次数呢在 Linux 2.4 版本之后开发者对 Socket Buffer 追加一些 Descriptor 信息来进一步减少内核数据的复制。如下图所示DMA 引擎读取文件内容并拷贝到内核缓冲区然后并没有再拷贝到 Socket 缓冲区只是将数据的长度以及位置信息被追加到 Socket 缓冲区然后 DMA 引擎根据这些描述信息直接从内核缓冲区读取数据并传输到协议引擎中从而消除最后一次 CPU 拷贝。 通过上述 Linux 零拷贝技术的介绍你也许还会存在疑问最终使用零拷贝之后不是还存在着数据拷贝操作吗其实从 Linux 操作系统的角度来说零拷贝就是为了避免用户态和内核态之间的数据拷贝。无论是传统的数据拷贝还是使用零拷贝技术其中有 2 次 DMA 的数据拷贝必不可少只是这 2 次 DMA 拷贝都是依赖硬件来完成不需要 CPU 参与。所以在这里我们讨论的零拷贝是个广义的概念只要能够减少不必要的 CPU 拷贝都可以被称为零拷贝。 Netty 的零拷贝技术 介绍完传统 Linux 的零拷贝技术之后我们再来学习下 Netty 中的零拷贝如何实现。Netty 中的零拷贝和传统 Linux 的零拷贝不太一样。Netty 中的零拷贝技术除了操作系统级别的功能封装更多的是面向用户态的数据操作优化主要体现在以下 5 个方面 堆外内存避免 JVM 堆内存到堆外内存的数据拷贝。CompositeByteBuf 类可以组合多个 Buffer 对象合并成一个逻辑上的对象避免通过传统内存拷贝的方式将几个 Buffer 合并成一个大的 Buffer。通过 Unpooled.wrappedBuffer 可以将 byte 数组包装成 ByteBuf 对象包装过程中不会产生内存拷贝。ByteBuf.slice 操作与 Unpooled.wrappedBuffer 相反slice 操作可以将一个 ByteBuf 对象切分成多个 ByteBuf 对象切分过程中不会产生内存拷贝底层共享一个 byte 数组的存储空间。Netty 使用 FileRegion 实现文件传输FileRegion 底层封装了 FileChannel#transferTo() 方法可以将文件缓冲区的数据直接传输到目标 Channel避免内核缓冲区和用户态缓冲区之间的数据拷贝这属于操作系统级别的零拷贝。 下面我们从以上 5 个方面逐一进行介绍。 堆外内存 如果在 JVM 内部执行 I/O 操作时必须将数据拷贝到堆外内存才能执行系统调用。这是所有 VM 语言都会存在的问题。那么为什么操作系统不能直接使用 JVM 堆内存进行 I/O 的读写呢主要有两点原因第一操作系统并不感知 JVM 的堆内存而且 JVM 的内存布局与操作系统所分配的是不一样的操作系统并不会按照 JVM 的行为来读写数据。第二同一个对象的内存地址随着 JVM GC 的执行可能会随时发生变化例如 JVM GC 的过程中会通过压缩来减少内存碎片这就涉及对象移动的问题了。 Netty 在进行 I/O 操作时都是使用的堆外内存可以避免数据从 JVM 堆内存到堆外内存的拷贝。 CompositeByteBuf CompositeByteBuf 是 Netty 中实现零拷贝机制非常重要的一个数据结构CompositeByteBuf 可以理解为一个虚拟的 Buffer 对象它是由多个 ByteBuf 组合而成但是在 CompositeByteBuf 内部保存着每个 ByteBuf 的引用关系从逻辑上构成一个整体。比较常见的像 HTTP 协议数据可以分为头部信息 header和消息体数据 body分别存在两个不同的 ByteBuf 中通常我们需要将两个 ByteBuf 合并成一个完整的协议数据进行发送可以使用如下方式完成 ByteBuf httpBuf Unpooled.buffer(header.readableBytes() body.readableBytes()); httpBuf.writeBytes(header); httpBuf.writeBytes(body);可以看出如果想实现 header 和 body 这两个 ByteBuf 的合并需要先初始化一个新的 httpBuf然后再将 header 和 body 分别拷贝到新的 httpBuf。合并过程中涉及两次 CPU 拷贝这非常浪费性能。如果使用 CompositeByteBuf 如何实现类似的需求呢如下所示 CompositeByteBuf httpBuf Unpooled.compositeBuffer(); httpBuf.addComponents(true, header, body);CompositeByteBuf 通过调用 addComponents() 方法来添加多个 ByteBuf但是底层的 byte 数组是复用的不会发生内存拷贝。但对于用户来说它可以当作一个整体进行操作。那么 CompositeByteBuf 内部是如何存放这些 ByteBuf并且如何进行合并的呢我们先通过一张图看下 CompositeByteBuf 的内部结构 从图上可以看出CompositeByteBuf 内部维护了一个 Components 数组。在每个 Component 中存放着不同的 ByteBuf各个 ByteBuf 独立维护自己的读写索引而 CompositeByteBuf 自身也会单独维护一个读写索引。由此可见Component 是实现 CompositeByteBuf 的关键所在下面看下 Component 结构定义 private static final class Component {final ByteBuf srcBuf; // 原始的 ByteBuffinal ByteBuf buf; // srcBuf 去除包装之后的 ByteBufint srcAdjustment; // CompositeByteBuf 的起始索引相对于 srcBuf 读索引的偏移int adjustment; // CompositeByteBuf 的起始索引相对于 buf 的读索引的偏移int offset; // Component 相对于 CompositeByteBuf 的起始索引位置int endOffset; // Component 相对于 CompositeByteBuf 的结束索引位置// 省略其他代码 }为了方便理解上述 Component 中的属性含义我同样以 HTTP 协议中 header 和 body 为示例通过一张图来描述 CompositeByteBuf 组合后其中 Component 的布局情况如下所示 从图中可以看出header 和 body 分别对应两个 ByteBuf假设 ByteBuf 的内容分别为 “header” 和 “body”那么 header ByteBuf 中 offset~endOffset 为 0~6body ByteBuf 对应的 offset~endOffset 为 0~10。由此可见Component 中的 offset 和 endOffset 可以表示当前 ByteBuf 可以读取的范围通过 offset 和 endOffset 可以将每一个 Component 所对应的 ByteBuf 连接起来形成一个逻辑整体。 此外 Component 中 srcAdjustment 和 adjustment 表示 CompositeByteBuf 起始索引相对于 ByteBuf 读索引的偏移。初始 adjustment readIndex - offset这样通过 CompositeByteBuf 的起始索引就可以直接定位到 Component 中 ByteBuf 的读索引位置。当 header ByteBuf 读取 1 个字节body ByteBuf 读取 2 个字节此时每个 Component 的属性又会发生什么变化呢如下图所示。 至此CompositeByteBuf 的基本原理我们已经介绍完了关于具体 CompositeByteBuf 数据操作的细节在这里就不做展开了有兴趣的同学可以自己深入研究 CompositeByteBuf 的源码。 Unpooled.wrappedBuffer 操作 介绍完 CompositeByteBuf 之后再来理解 Unpooled.wrappedBuffer 操作就非常容易了Unpooled.wrappedBuffer 同时也是创建 CompositeByteBuf 对象的另一种推荐做法。 Unpooled 提供了一系列用于包装数据源的 wrappedBuffer 方法如下所示 Unpooled.wrappedBuffer 方法可以将不同的数据源的一个或者多个数据包装成一个大的 ByteBuf 对象其中数据源的类型包括 byte[]、ByteBuf、ByteBuffer。包装的过程中不会发生数据拷贝操作包装后生成的 ByteBuf 对象和原始 ByteBuf 对象是共享底层的 byte 数组。 ByteBuf.slice 操作 ByteBuf.slice 和 Unpooled.wrappedBuffer 的逻辑正好相反ByteBuf.slice 是将一个 ByteBuf 对象切分成多个共享同一个底层存储的 ByteBuf 对象。 ByteBuf 提供了两个 slice 切分方法: public ByteBuf slice(); public ByteBuf slice(int index, int length);假设我们已经有一份完整的 HTTP 数据可以通过 slice 方法切分获得 header 和 body 两个 ByteBuf 对象对应的内容分别为 “header” 和 “body”实现方式如下 ByteBuf httpBuf ... ByteBuf header httpBuf.slice(0, 6); ByteBuf body httpBuf.slice(6, 4);通过 slice 切分后都会返回一个新的 ByteBuf 对象而且新的对象有自己独立的 readerIndex、writerIndex 索引如下图所示。由于新的 ByteBuf 对象与原始的 ByteBuf 对象数据是共享的所以通过新的 ByteBuf 对象进行数据操作也会对原始 ByteBuf 对象生效。 文件传输 FileRegion 在 Netty 源码的 example 包中提供了 FileRegion 的使用示例以下代码片段摘自 FileServerHandler.java。 Override public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {RandomAccessFile raf null;long length -1;try {raf new RandomAccessFile(msg, r);length raf.length();} catch (Exception e) {ctx.writeAndFlush(ERR: e.getClass().getSimpleName() : e.getMessage() \n);return;} finally {if (length 0 raf ! null) {raf.close();}}ctx.write(OK: raf.length() \n);if (ctx.pipeline().get(SslHandler.class) null) {// SSL not enabled - can use zero-copy file transfer.ctx.write(new DefaultFileRegion(raf.getChannel(), 0, length));} else {// SSL enabled - cannot use zero-copy file transfer.ctx.write(new ChunkedFile(raf));}ctx.writeAndFlush(\n); }从 FileRegion 的使用示例可以看出Netty 使用 FileRegion 实现文件传输的零拷贝。FileRegion 的默认实现类是 DefaultFileRegion通过 DefaultFileRegion 将文件内容写入到 NioSocketChannel。那么 FileRegion 是如何实现零拷贝的呢我们通过源码看看 FileRegion 到底使用了什么黑科技。 public class DefaultFileRegion extends AbstractReferenceCounted implements FileRegion {private final File f; // 传输的文件private final long position; // 文件的起始位置private final long count; // 传输的字节数private long transferred; // 已经写入的字节数private FileChannel file; // 文件对应的 FileChannel Overridepublic long transferTo(WritableByteChannel target, long position) throws IOException {long count this.count - position;if (count 0 || position 0) {throw new IllegalArgumentException(position out of range: position (expected: 0 - (this.count - 1) ));}if (count 0) {return 0L;}if (refCnt() 0) {throw new IllegalReferenceCountException(0);}open();long written file.transferTo(this.position position, count, target);if (written 0) {transferred written;} else if (written 0) {validate(this, position);}return written;} // 省略其他代码 }从源码可以看出FileRegion 其实就是对 FileChannel 的包装并没有什么特殊操作底层使用的是 JDK NIO 中的 FileChannel#transferTo() 方法实现文件传输所以 FileRegion 是操作系统级别的零拷贝对于传输大文件会很有帮助。 到此为止Netty 相关的零拷贝技术都已经介绍完了可以看出 Netty 对于 ByteBuf 做了更多精进的设计和优化。 总结 零拷贝是网络编程中一种常用的技术可以用于优化网络数据传输的性能。本文介绍了操作系统 Linux 和 Netty 中的零拷贝技术Netty 除了支持操作系统级别的零拷贝更多提供了面向用户态的零拷贝特性主要体现在 5 个方面堆外内存、CompositeByteBuf、Unpooled.wrappedBuffer、ByteBuf.slice 以及 FileRegion。以操作系统的角度来看零拷贝是一个广义的概念可以认为只要能够减少不必要的 CPU 拷贝都可以理解为是零拷贝。 最后留一个思考题使用具备零拷贝特性的 transfer() 方法拷贝文件一定会比传统 I/O 的方式更高效吗 17 源码篇从 Linux 出发深入剖析服务端启动流程 通过前几章课程的学习我们已经对 Netty 的技术思想和基本原理有了初步的认识从今天这节课开始我们将正式进入 Netty 核心源码学习的课程。希望能够通过源码解析的方式让你更加深入理解 Netty 的精髓如 Netty 的设计思想、工程技巧等为之后继续深入研究 Netty 打下坚实的基础。 在课程开始之前我想分享一下关于源码学习的几点经验和建议。第一很多同学在开始学习源码时面临的第一个问题就是不知道从何下手这个时候一定不能对着源码毫无意义地四处翻看。建议你可以通过 Hello World 或者 TestCase 作为源码学习的入口然后再通过 Debug 断点的方式调试并跑通源码。第二阅读源码一定要有全局观。首先要把握源码的主流程避免刚开始陷入代码细节的死胡同。第三源码一定要反复阅读让自己每一次读都有不同的收获。我们可以通过画图、注释的方式帮助自己更容易理解源码的核心流程方便后续的复习和回顾。 作为源码解析的第一节课我们将深入分析 Netty 服务端的启动流程。启动服务的过程中我们可以了解到 Netty 各大核心组件的关系这将是学习 Netty 源码一个非常好的切入点让我们一起看看 Netty 的每个零件是如何运转起来的吧。 说明本文参考的 Netty 源码版本为 4.1.42.Final。 从 Echo 服务器示例入手 在《引导器作用客户端和服务端启动都要做些什么》的课程中我们介绍了如何使用引导器搭建服务端的基本框架。在这里我们实现了一个最简单的 Echo 服务器用于调试 Netty 服务端启动的源码。 public class EchoServer {public void startEchoServer(int port) throws Exception {EventLoopGroup bossGroup new NioEventLoopGroup();EventLoopGroup workerGroup new NioEventLoopGroup();try {ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)) // 设置ServerSocketChannel 对应的 Handler.childHandler(new ChannelInitializerSocketChannel() { // 设置 SocketChannel 对应的 HandlerOverridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new FixedLengthFrameDecoder(10));ch.pipeline().addLast(new ResponseSampleEncoder());ch.pipeline().addLast(new RequestSampleHandler());}});ChannelFuture f b.bind(port).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}} }我们以引导器 ServerBootstrap 为切入点开始深入分析 Netty 服务端的启动流程。在服务端启动之前需要配置 ServerBootstrap 的相关参数这一步大致可以分为以下几个步骤 配置 EventLoopGroup 线程组配置 Channel 的类型设置 ServerSocketChannel 对应的 Handler设置网络监听的端口设置 SocketChannel 对应的 Handler配置 Channel 参数。 配置 ServerBootstrap 参数的过程非常简单把参数值保存在 ServerBootstrap 定义的成员变量里就可以了。我们可以看下 ServerBootstrap 的成员变量定义基本与 ServerBootstrap 暴露出来的配置方法是一一对应的。如下所示我以注释的形式说明每个成员变量对应的调用方法。 volatile EventLoopGroup group; // group() volatile EventLoopGroup childGroup; // group() volatile ChannelFactory? extends C channelFactory; // channel() volatile SocketAddress localAddress; // localAddress MapChannelOption?, Object childOptions new ConcurrentHashMapChannelOption?, Object(); // childOption() volatile ChannelHandler childHandler; // childHandler() ServerBootstrapConfig config new ServerBootstrapConfig(this);关于 ServerBootstrap 如何为每个成员变量保存参数的过程我们就不一一展开了你可以理解为这部分工作只是一个前置准备课后你可以自己跟进下每个方法的源码。今天我们核心聚焦在 b.bind().sync() 这行代码bind() 才是真正进行服务器端口绑定和启动的入口sync() 表示阻塞等待服务器启动完成。接下来我们对 bind() 方法进行展开分析。 在开始源码分析之前我们带着以下几个问题边看边思考 Netty 自己实现的 Channel 与 JDK 底层的 Channel 是如何产生联系的ChannelInitializer 这个特殊的 Handler 处理器的作用是什么Pipeline 初始化的过程是什么样的 服务端启动全过程 首先我们来看下 ServerBootstrap 中 bind() 方法的源码实现 public ChannelFuture bind() {validate();SocketAddress localAddress this.localAddress;if (localAddress null) {throw new IllegalStateException(localAddress not set);}return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) {final ChannelFuture regFuture initAndRegister();final Channel channel regFuture.channel();if (regFuture.cause() ! null) {return regFuture;}if (regFuture.isDone()) {ChannelPromise promise channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {final PendingRegistrationPromise promise new PendingRegistrationPromise(channel);regFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {Throwable cause future.cause();if (cause ! null) {promise.setFailure(cause);} else {promise.registered();doBind0(regFuture, channel, localAddress, promise);}}});return promise;} }由此可见doBind() 方法是我们需要分析的重点。我们再一起看下 doBind() 具体做了哪些事情 调用 initAndRegister() 初始化并注册 Channel同时返回一个 ChannelFuture 实例 regFuture所以我们可以猜测出 initAndRegister() 是一个异步的过程。接下来通过 regFuture.cause() 方法判断 initAndRegister() 的过程是否发生异常如果发生异常则直接返回。regFuture.isDone() 表示 initAndRegister() 是否执行完毕如果执行完毕则调用 doBind0() 进行 Socket 绑定。如果 initAndRegister() 还没有执行结束regFuture 会添加一个 ChannelFutureListener 回调监听当 initAndRegister() 执行结束后会调用 operationComplete()同样通过 doBind0() 进行端口绑定。 doBind() 整个实现结构非常清晰其中 initAndRegister() 负责 Channel 初始化和注册doBind0() 用于端口绑定。这两个过程最为重要下面我们分别进行详细的介绍。 服务端 Channel 初始化及注册 initAndRegister() 方法顾名思义主要负责初始化和注册的相关工作我们具体看下它的源码实现 final ChannelFuture initAndRegister() {Channel channel null;try {channel channelFactory.newChannel(); // 创建 Channelinit(channel); // 初始化 Channel} catch (Throwable t) {if (channel ! null) {channel.unsafe().closeForcibly();return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);}return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);}ChannelFuture regFuture config().group().register(channel); // 注册 Channelif (regFuture.cause() ! null) {if (channel.isRegistered()) {channel.close();} else {channel.unsafe().closeForcibly();}}return regFuture; }initAndRegister() 可以分为三步创建 Channel、初始化 Channel 和注册 Channel接下来我们一步步进行拆解分析。 创建服务端 Channel 首先看下创建 Channel 的过程直接跟进 channelFactory.newChannel() 的源码。 public class ReflectiveChannelFactoryT extends Channel implements ChannelFactoryT {private final Constructor? extends T constructor;public ReflectiveChannelFactory(Class? extends T clazz) {ObjectUtil.checkNotNull(clazz, clazz);try {this.constructor clazz.getConstructor();} catch (NoSuchMethodException e) {throw new IllegalArgumentException(Class StringUtil.simpleClassName(clazz) does not have a public non-arg constructor, e);}}Overridepublic T newChannel() {try {return constructor.newInstance(); // 反射创建对象} catch (Throwable t) {throw new ChannelException(Unable to create Channel from class constructor.getDeclaringClass(), t);}}// 省略其他代码 }在前面 Echo 服务器的示例中我们通过 channel(NioServerSocketChannel.class) 配置 Channel 的类型工厂类 ReflectiveChannelFactory 是在该过程中被创建的。从 constructor.newInstance() 我们可以看出ReflectiveChannelFactory 通过反射创建出 NioServerSocketChannel 对象所以我们重点需要关注 NioServerSocketChannel 的构造函数。 public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT); // 调用父类方法config new NioServerSocketChannelConfig(this, javaChannel().socket()); } private static ServerSocketChannel newSocket(SelectorProvider provider) {try {return provider.openServerSocketChannel(); // 创建 JDK 底层的 ServerSocketChannel} catch (IOException e) {throw new ChannelException(Failed to open a server socket., e);} }SelectorProvider 是 JDK NIO 中的抽象类实现通过 openServerSocketChannel() 方法可以用于创建服务端的 ServerSocketChannel。而且 SelectorProvider 会根据操作系统类型和版本的不同返回不同的实现类具体可以参考 DefaultSelectorProvider 的源码实现 public static SelectorProvider create() {String osname AccessController.doPrivileged(new GetPropertyAction(os.name));if (osname.equals(SunOS))return createProvider(sun.nio.ch.DevPollSelectorProvider);if (osname.equals(Linux))return createProvider(sun.nio.ch.EPollSelectorProvider);return new sun.nio.ch.PollSelectorProvider(); }在这里我们只讨论 Linux 操作系统的场景在 Linux 内核 2.6版本及以上都会默认采用 EPollSelectorProvider。如果是旧版本则使用 PollSelectorProvider。对于目前的主流 Linux 平台而言都是采用 Epoll 机制实现的。 创建完 ServerSocketChannel我们回到 NioServerSocketChannel 的构造函数接着它会通过 super() 依次调用到父类的构造进行初始化工作最终我们可以定位到 AbstractNioChannel 和 AbstractChannel 的构造函数 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);// 省略其他代码try {ch.configureBlocking(false);} catch (IOException e) {// 省略其他代码} } protected AbstractChannel(Channel parent) {this.parent parent;id newId(); // Channel 全局唯一 id unsafe newUnsafe(); // unsafe 操作底层读写pipeline newChannelPipeline(); // pipeline 负责业务处理器编排 }首先调用 AbstractChannel 的构造函数创建了三个重要的成员变量分别为 id、unsafe、pipeline。id 表示全局唯一的 Channelunsafe 用于操作底层数据的读写操作pipeline 负责业务处理器的编排。初始化状态pipeline 的内部结构只包含头尾两个节点如下图所示。三个核心成员变量创建好之后会回到 AbstractNioChannel 的构造函数通过 ch.configureBlocking(false) 设置 Channel 是非阻塞模式。 创建服务端 Channel 的过程我们已经讲完了简单总结下其中几个重要的步骤 ReflectiveChannelFactory 通过反射创建 NioServerSocketChannel 实例创建 JDK 底层的 ServerSocketChannel为 Channel 创建 id、unsafe、pipeline 三个重要的成员变量设置 Channel 为非阻塞模式。 初始化服务端 Channel 回到 ServerBootstrap 的 initAndRegister() 方法继续跟进用于初始化服务端 Channel 的 init() 方法源码 void init(Channel channel) {setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger); // 设置 Socket 参数setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0))); // 保存用户自定义属性ChannelPipeline p channel.pipeline();// 获取 ServerBootstrapAcceptor 的构造参数final EventLoopGroup currentChildGroup childGroup;final ChannelHandler currentChildHandler childHandler;final EntryChannelOption?, Object[] currentChildOptions childOptions.entrySet().toArray(newOptionArray(0));final EntryAttributeKey?, Object[] currentChildAttrs childAttrs.entrySet().toArray(newAttrArray(0));// 添加特殊的 Handler 处理器p.addLast(new ChannelInitializerChannel() {Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}}); }init() 方法的源码比较长我们依然拆解成两个部分来看 第一步设置 Socket 参数以及用户自定义属性。在创建服务端 Channel 时Channel 的配置参数保存在 NioServerSocketChannelConfig 中在初始化 Channel 的过程中Netty 会将这些参数设置到 JDK 底层的 Socket 上并把用户自定义的属性绑定在 Channel 上。 第二步添加特殊的 Handler 处理器。首先 ServerBootstrap 为 Pipeline 添加了一个 ChannelInitializerChannelInitializer 是实现了 ChannelHandler 接口的匿名类其中 ChannelInitializer 实现的 initChannel() 方法用于添加 ServerSocketChannel 对应的 Handler。然后 Netty 通过异步 task 的方式又向 Pipeline 一个处理器 ServerBootstrapAcceptor从 ServerBootstrapAcceptor 的命名可以看出这是一个连接接入器专门用于接收新的连接然后把事件分发给 EventLoop 执行在这里我们先不做展开。此时服务端的 pipeline 内部结构又发生了变化如下图所示。 思考一个问题为什么需要 ChannelInitializer 处理器呢ServerBootstrapAcceptor 的注册过程为什么又需要封装成异步 task 呢因为我们在初始化时还没有将 Channel 注册到 Selector 对象上所以还无法注册 Accept 事件到 Selector 上所以事先添加了 ChannelInitializer 处理器等待 Channel 注册完成后再向 Pipeline 中添加 ServerBootstrapAcceptor 处理器。 服务端 Channel 初始化的过程已经结束了。整体流程比较简单主要是设置 Socket 参数以及用户自定义属性并向 Pipeline 中添加了两个特殊的处理器。接下来我们继续分析如何将初始化好的 Channel 注册到 Selector 对象上 注册服务端 Channel 回到 initAndRegister() 的主流程创建完服务端 Channel 之后继续一层层跟进 register() 方法的源码 // MultithreadEventLoopGroup#register public ChannelFuture register(Channel channel) {return next().register(channel); // 选择一个 eventLoop 注册 } // AbstractChannel#register public final void register(EventLoop eventLoop, final ChannelPromise promise) {// 省略其他代码AbstractChannel.this.eventLoop eventLoop;if (eventLoop.inEventLoop()) { // Reactor 线程内部调用register0(promise);} else { // 外部线程调用try {eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});} catch (Throwable t) {// 省略其他代码}} }Netty 会在线程池 EventLoopGroup 中选择一个 EventLoop 与当前 Channel 进行绑定之后 Channel 生命周期内的所有 I/O 事件都由这个 EventLoop 负责处理如 accept、connect、read、write 等 I/O 事件。可以看出不管是 EventLoop 线程本身调用还是外部线程用最终都会通过 register0() 方法进行注册 private void register0(ChannelPromise promise) {try {if (!promise.setUncancellable() || !ensureOpen(promise)) {return;}boolean firstRegistration neverRegistered;doRegister(); // 调用 JDK 底层的 register() 进行注册neverRegistered false;registered true;pipeline.invokeHandlerAddedIfNeeded(); // 触发 handlerAdded 事件safeSetSuccess(promise);pipeline.fireChannelRegistered(); // 触发 channelRegistered 事件// 此时 Channel 还未注册绑定地址所以处于非活跃状态if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive(); // Channel 当前状态为活跃时触发 channelActive 事件} else if (config().isAutoRead()) {beginRead();}}} catch (Throwable t) {// 省略其他代码} }register0() 主要做了四件事调用 JDK 底层进行 Channel 注册、触发 handlerAdded 事件、触发 channelRegistered 事件、Channel 当前状态为活跃时触发 channelActive 事件。我们对它们逐一进行分析。 首先看下 JDK 底层注册 Channel 的过程对应 doRegister() 方法的实现逻辑。 protected void doRegister() throws Exception {boolean selected false;for (;;) {try {selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this); // 调用 JDK 底层的 register() 进行注册return;} catch (CancelledKeyException e) {// 省略其他代码}} } public final SelectionKey register(Selector sel, int ops,Object att)throws ClosedChannelException {synchronized (regLock) {// 省略其他代码SelectionKey k findKey(sel);if (k ! null) {k.interestOps(ops);k.attach(att);}if (k null) {synchronized (keyLock) {if (!isOpen())throw new ClosedChannelException();k ((AbstractSelector)sel).register(this, ops, att);addKey(k);}}return k;} }javaChannel().register() 负责调用 JDK 底层将 Channel 注册到 Selector 上register() 的第三个入参传入的是 Netty 自己实现的 Channel 对象调用 register() 方法会将它绑定在 JDK 底层 Channel 的 attachment 上。这样在每次 Selector 对象进行事件循环时Netty 都可以从返回的 JDK 底层 Channel 中获得自己的 Channel 对象。 完成 Channel 向 Selector 注册后接下来就会触发 Pipeline 一系列的事件传播。在事件传播之前用户自定义的业务处理器是如何被添加到 Pipeline 中的呢答案就在pipeline.invokeHandlerAddedIfNeeded() 当中我们重点看下 handlerAdded 事件的处理过程。invokeHandlerAddedIfNeeded() 方法的调用层次比较深推荐你结合上述 Echo 服务端示例使用 IDE Debug 的方式跟踪调用栈如下图所示。 我们首先抓住 ChannelInitializer 中的核心源码逐层进行分析。 // ChannelInitializer public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {removeState(ctx);}} } private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) {try {initChannel((C) ctx.channel()); // 调用 ChannelInitializer 实现的 initChannel() 方法} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {ChannelPipeline pipeline ctx.pipeline();if (pipeline.context(this) ! null) {pipeline.remove(this); // 将 ChannelInitializer 自身从 Pipeline 中移出}}return true;}return false; }可以看出 ChannelInitializer 首先会调用 initChannel() 抽象方法然后 Netty 会把 ChannelInitializer 自身从 Pipeline 移出。其中 initChannel() 抽象方法是在哪里实现的呢这就要跟踪到 ServerBootstrap 之前的 init() 方法其中有这么一段代码 p.addLast(new ChannelInitializerChannel() {Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});} });在前面我们已经分析了 initChannel() 方法的实现逻辑首先向 Pipeline 中添加 ServerSocketChannel 对应的 Handler然后通过异步 task 的方式向 Pipeline 添加 ServerBootstrapAcceptor 处理器。其中有一个点不要混淆handler() 方法是添加到服务端的Pipeline 上而 childHandler() 方法是添加到客户端的 Pipeline 上。所以对应 Echo 服务器示例中此时被添加的是 LoggingHandler 处理器。 因为添加 ServerBootstrapAcceptor 是一个异步过程需要 EventLoop 线程负责执行。而当前 EventLoop 线程正在执行 register0() 的注册流程所以等到 register0() 执行完之后才能被添加到 Pipeline 当中。完成 initChannel() 这一步之后ServerBootstrapAcceptor 并没有被添加到 Pipeline 中此时 Pipeline 的内部结构变化如下图所示。 我们回到 register0() 的主流程接着向下分析。channelRegistered 事件是由 fireChannelRegistered() 方法触发沿着 Pipeline 的 Head 节点传播到 Tail 节点并依次调用每个 ChannelHandler 的 channelRegistered() 方法。然而此时 Channel 还未注册绑定地址所以处于非活跃状态所以并不会触发 channelActive 事件。 执行完整个 register0() 的注册流程之后EventLoop 线程会将 ServerBootstrapAcceptor 添加到 Pipeline 当中此时 Pipeline 的内部结构又发生了变化如下图所示。 整个服务端 Channel 注册的流程我们已经讲完注册过程中 Pipeline 结构的变化值得你再反复梳理从而加深理解。目前服务端还是不能工作的还差最后一步就是进行端口绑定我们继续向下分析。 端口绑定 回到 ServerBootstrap 的 bind() 方法我们继续跟进端口绑定 doBind0() 的源码。 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {assertEventLoop();// 省略其他代码boolean wasActive isActive();try {doBind(localAddress); // 调用 JDK 底层进行端口绑定} catch (Throwable t) {safeSetFailure(promise, t);closeIfClosed();return;}if (!wasActive isActive()) {invokeLater(new Runnable() {Overridepublic void run() {pipeline.fireChannelActive(); // 触发 channelActive 事件}});}safeSetSuccess(promise); }bind() 方法主要做了两件事分别为调用 JDK 底层进行端口绑定绑定成功后并触发 channelActive 事件。下面我们逐一进行分析。 首先看下调用 JDK 底层进行端口绑定的 doBind() 方法 protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() 7) {javaChannel().bind(localAddress, config.getBacklog());} else {javaChannel().socket().bind(localAddress, config.getBacklog());} }Netty 会根据 JDK 版本的不同分别调用 JDK 底层不同的 bind() 方法。我使用的是 JDK8所以会调用 JDK 原生 Channel 的 bind() 方法。执行完 doBind() 之后服务端 JDK 原生的 Channel 真正已经完成端口绑定了。 完成端口绑定之后Channel 处于活跃 Active 状态然后会调用 pipeline.fireChannelActive() 方法触发 channelActive 事件。我们可以一层层跟进 fireChannelActive() 方法发现其中比较重要的部分 // DefaultChannelPipeline#channelActive public void channelActive(ChannelHandlerContext ctx) {ctx.fireChannelActive();readIfIsAutoRead(); } // AbstractNioChannel#doBeginRead protected void doBeginRead() throws Exception {// Channel.read() or ChannelHandlerContext.read() was calledfinal SelectionKey selectionKey this.selectionKey;if (!selectionKey.isValid()) {return;}readPending true;final int interestOps selectionKey.interestOps();if ((interestOps readInterestOp) 0) {selectionKey.interestOps(interestOps | readInterestOp); // 注册 OP_ACCEPT 事件到服务端 Channel 的事件集合} }可以看出在执行完 channelActive 事件传播之后会调用 readIfIsAutoRead() 方法触发 Channel 的 read 事件而它最终调用到 AbstractNioChannel 中的 doBeginRead() 方法其中 readInterestOp 参数就是在前面初始化 Channel 所传入的 SelectionKey.OP_ACCEPT 事件所以 OP_ACCEPT 事件会被注册到 Channel 的事件集合中。 到此为止整个服务端已经真正启动完毕。我们总结一下服务端启动的全流程如下图所示。 创建服务端 Channel本质是创建 JDK 底层原生的 Channel并初始化几个重要的属性包括 id、unsafe、pipeline 等。初始化服务端 Channel设置 Socket 参数以及用户自定义属性并添加两个特殊的处理器 ChannelInitializer 和 ServerBootstrapAcceptor。注册服务端 Channel调用 JDK 底层将 Channel 注册到 Selector 上。端口绑定调用 JDK 底层进行端口绑定并触发 channelActive 事件把 OP_ACCEPT 事件注册到 Channel 的事件集合中。 加餐服务端如何处理客户端新建连接 Netty 服务端完全启动后就可以对外工作了。接下来 Netty 服务端是如何处理客户端新建连接的呢主要分为四步 Boss NioEventLoop 线程轮询客户端新连接 OP_ACCEPT 事件构造 Netty 客户端 NioSocketChannel注册 Netty 客户端 NioSocketChannel 到 Worker 工作线程中注册 OP_READ 事件到 NioSocketChannel 的事件集合。 下面我们对每个步骤逐一进行简单的介绍。 Netty 中 Boss NioEventLoop 专门负责接收新的连接关于 NioEventLoop 的核心源码我们下节课会着重介绍在这里我们只先了解基本的处理流程。当客户端有新连接接入服务端时Boss NioEventLoop 会监听到 OP_ACCEPT 事件源码如下所示 // NioEventLoop#processSelectedKey if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read(); }NioServerSocketChannel 所持有的 unsafe 是 NioMessageUnsafe 类型我们看下 NioMessageUnsafe.read() 方法中做了什么事。 public void read() {assert eventLoop().inEventLoop();final ChannelConfig config config();final ChannelPipeline pipeline pipeline();final RecvByteBufAllocator.Handle allocHandle unsafe().recvBufAllocHandle(); allocHandle.reset(config);boolean closed false;Throwable exception null;try {try { do {int localRead doReadMessages(readBuf); // while 循环不断读取 Buffer 中的数据if (localRead 0) {break;}if (localRead 0) {closed true;break;}allocHandle.incMessagesRead(localRead);} while (allocHandle.continueReading());} catch (Throwable t) {exception t;}int size readBuf.size();for (int i 0; i size; i ) {readPending false;pipeline.fireChannelRead(readBuf.get(i)); // 传播读取事件}readBuf.clear();allocHandle.readComplete();pipeline.fireChannelReadComplete(); // 传播读取完毕事件// 省略其他代码} finally {if (!readPending !config.isAutoRead()) {removeReadOp();}} }可以看出 read() 方法的核心逻辑就是通过 while 循环不断读取数据然后放入 List 中这里的数据其实就是新连接。需要重点跟进一下 NioServerSocketChannel 的 doReadMessages() 方法。 protected int doReadMessages(ListObject buf) throws Exception {SocketChannel ch SocketUtils.accept(javaChannel());try {if (ch ! null) {buf.add(new NioSocketChannel(this, ch));return 1;}} catch (Throwable t) {logger.warn(Failed to create a new channel from an accepted socket., t);try {ch.close();} catch (Throwable t2) {logger.warn(Failed to close a socket., t2);}}return 0; }这时就开始执行第二个步骤构造 Netty 客户端 NioSocketChannel。Netty 先通过 JDK 底层的 accept() 获取 JDK 原生的 SocketChannel然后将它封装成 Netty 自己的 NioSocketChannel。新建 Netty 的客户端 Channel 的实现原理与上文中我们讲到的创建服务端 Channel 的过程是类似的只是服务端 Channel 的类型是 NioServerSocketChannel而客户端 Channel 的类型是 NioSocketChannel。NioSocketChannel 的创建同样会完成几件事创建核心成员变量 id、unsafe、pipeline注册 SelectionKey.OP_READ 事件设置 Channel 的为非阻塞模式新建客户端 Channel 的配置。 成功构造客户端 NioSocketChannel 后接下来会通过 pipeline.fireChannelRead() 触发 channelRead 事件传播。对于服务端来说此时 Pipeline 的内部结构如下图所示。 上文中我们提到了一种特殊的处理器 ServerBootstrapAcceptor在这里它就发挥了重要的作用。channelRead 事件会传播到 ServerBootstrapAcceptor.channelRead() 方法channelRead() 会将客户端 Channel 分配到工作线程组中去执行。具体实现如下 public void channelRead(ChannelHandlerContext ctx, Object msg) {final Channel child (Channel) msg;// 在客户端 Channel 中添加 childHandlerchildHandler 是用户在启动类中通过 childHandler() 方法指定的child.pipeline().addLast(childHandler);setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {// 注册客户端 ChannelchildGroup.register(child).addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);} }ServerBootstrapAcceptor 开始就把 msg 强制转换为 Channel。难道不会有其他类型的数据吗因为 ServerBootstrapAcceptor 是服务端 Channel 中一个特殊的处理器而服务端 Channel 的 channelRead 事件只会在新连接接入时触发所以这里拿到的数据都是客户端新连接。 ServerBootstrapAcceptor 通过 childGroup.register() 方法会完成第三和第四两个步骤将 NioSocketChannel 注册到 Worker 工作线程中并注册 OP_READ 事件到 NioSocketChannel 的事件集合。在注册过程中比较有意思的一点是它会调用 pipeline.fireChannelRegistered() 方法传播 channelRegistered 事件然后再调用 pipeline.fireChannelActive() 方法传播 channelActive 事件。兜了一圈这又会回到之前我们介绍的 readIfIsAutoRead() 方法此时它会将 SelectionKey.OP_READ 事件注册到 Channel 的事件集合。 关于服务端如何处理客户端新建连接的具体源码我在此就不继续展开了。这里留一个小任务建议你亲自动手分析下 childGroup.register() 的相关源码从而加深对服务端启动以及新连接处理流程的理解。有了服务端启动源码分析的基础再去理解客户端新建连接的过程会相对容易很多。 总结 本节课我们深入分析了 Netty 服务端启动的全流程对其中涉及的核心组件有了基本的认识。Netty 服务端启动的相关源码层次比较深推荐大家在读源码的时候可以先把主体流程梳理清楚开始时先不用纠结具体的方法是用来做什么自顶而下先画出完整的调用链路图如下图所示然后再逐一击破。 下节课我们将学习 Netty 最核心的 Reactor 线程模型的源码推荐你把两节课放在一起再进行复习可以解答你目前不少的疑问如异步 task 是如何封装并执行的事件注册之后是如何被处理的 18 源码篇解密 Netty Reactor 线程模型 通过第一章 Netty 基础课程的学习我们知道 Reactor 线程模型是 Netty 实现高性能的核心所在在 Netty 中 EventLoop 是 Reactor 线程模型的核心处理引擎那么 EventLoop 到底是如何实现的呢又是如何保证高性能和线程安全性的呢今天这节课让我们一起一探究竟。 说明本文参考的 Netty 源码版本为 4.1.42.Final。 Reactor 线程执行的主流程 在《事件调度层为什么 EventLoop 是 Netty 的精髓》的课程中我们介绍了 EventLoop 的概貌因为 Netty 是基于 NIO 实现的所以推荐使用 NioEventLoop 实现我们再次通过 NioEventLoop 的核心入口 run() 方法回顾 Netty Reactor 线程模型执行的主流程并以此为基础继续深入研究 NioEventLoop 的逻辑细节。 protected void run() {for (;;) {try {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.BUSY_WAIT:case SelectStrategy.SELECT:select(wakenUp.getAndSet(false)); // 轮询 I/O 事件if (wakenUp.get()) {selector.wakeup();}default:}} catch (IOException e) {rebuildSelector0();handleLoopException(e);continue;}cancelledKeys 0;needsToSelectAgain false;final int ioRatio this.ioRatio;if (ioRatio 100) {try {processSelectedKeys(); // 处理 I/O 事件} finally {runAllTasks(); // 处理所有任务}} else {final long ioStartTime System.nanoTime();try {processSelectedKeys(); // 处理 I/O 事件} finally {final long ioTime System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio); // 处理完 I/O 事件再处理异步任务队列}}} catch (Throwable t) {handleLoopException(t);}try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}} }NioEventLoop 的 run() 方法是一个无限循环没有任何退出条件在不间断循环执行以下三件事情可以用下面这张图形象地表示。 轮询 I/O 事件select轮询 Selector 选择器中已经注册的所有 Channel 的 I/O 事件。处理 I/O 事件processSelectedKeys处理已经准备就绪的 I/O 事件。处理异步任务队列runAllTasksReactor 线程还有一个非常重要的职责就是处理任务队列中的非 I/O 任务。Netty 提供了 ioRatio 参数用于调整 I/O 事件处理和任务处理的时间比例。 下面我们对 NioEventLoop 的三个步骤进行详细的介绍。 轮询 I/O 事件 我们首先聚焦在轮询 I/O 事件的关键代码片段 case SelectStrategy.CONTINUE:continue; case SelectStrategy.BUSY_WAIT: case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}NioEventLoop 通过核心方法 select() 不断轮询注册的 I/O 事件。当没有 I/O 事件产生时为了避免 NioEventLoop 线程一直循环空转在获取 I/O 事件或者异步任务时需要阻塞线程等待 I/O 事件就绪或者异步任务产生后才唤醒线程。NioEventLoop 使用 wakeUp 变量表示是否唤醒 selectorNetty 在每一次执行新的一轮循环之前都会将 wakeUp 设置为 false。 Netty 提供了选择策略 SelectStrategy 对象它用于控制 select 循环行为包含 CONTINUE、SELECT、BUSY_WAIT 三种策略因为 NIO 并不支持 BUSY_WAIT所以 BUSY_WAIT 与 SELECT 的执行逻辑是一样的。在 I/O 事件循环的过程中 Netty 选择使用何种策略具体的判断依据如下 // DefaultSelectStrategy#calculateStrategy public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT; } // NioEventLoop#selectNowSupplier private final IntSupplier selectNowSupplier new IntSupplier() {Overridepublic int get() throws Exception {return selectNow();} } // NioEventLoop#selectNow int selectNow() throws IOException {try {return selector.selectNow();} finally {if (wakenUp.get()) {selector.wakeup();}} }如果当前 NioEventLoop 线程存在异步任务会通过 selectSupplier.get() 最终调用到 selectNow() 方法selectNow() 是非阻塞执行后立即返回。如果存在就绪的 I/O 事件那么会走到 default 分支后直接跳出然后执行 I/O 事件处理 processSelectedKeys 和异步任务队列处理 runAllTasks 的逻辑。所以在存在异步任务的场景NioEventLoop 会优先保证 CPU 能够及时处理异步任务。 当 NioEventLoop 线程的不存在异步任务即任务队列为空返回的是 SELECT 策略, 就会调用 select(boolean oldWakenUp) 方法接下来我们看看 select() 内部是如何实现的 private void select(boolean oldWakenUp) throws IOException {Selector selector this.selector;try {int selectCnt 0;long currentTimeNanos System.nanoTime();long selectDeadLineNanos currentTimeNanos delayNanos(currentTimeNanos); // 计算 select 阻塞操作的最后截止时间long normalizedDeadlineNanos selectDeadLineNanos - initialNanoTime();if (nextWakeupTime ! normalizedDeadlineNanos) {nextWakeupTime normalizedDeadlineNanos;}for (;;) {// ------ 1. 检测 select 阻塞操作是否超过截止时间 ------long timeoutMillis (selectDeadLineNanos - currentTimeNanos 500000L) / 1000000L;if (timeoutMillis 0) {if (selectCnt 0) {selector.selectNow();selectCnt 1;}break;}// ------ 2. 轮询过程中如果有任务产生中断本次轮询if (hasTasks() wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt 1;break;}// ------ 3. select 阻塞等待获取 I/O 事件 ------int selectedKeys selector.select(timeoutMillis);selectCnt ;if (selectedKeys ! 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {break;}if (Thread.interrupted()) {if (logger.isDebugEnabled()) {logger.debug(Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.);}selectCnt 1;break;}// ------ 4. 解决臭名昭著的 JDK epoll 空轮询 Bug ------long time System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) currentTimeNanos) {selectCnt 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD 0 selectCnt SELECTOR_AUTO_REBUILD_THRESHOLD) {selector selectRebuildSelector(selectCnt);selectCnt 1;break;}currentTimeNanos time;}if (selectCnt MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug(Selector.select() returned prematurely {} times in a row for Selector {}.,selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() raised by a Selector {} - JDK bug?,selector, e);}} }Netty 为了解决臭名昭著的 JDK epoll 空轮询 Bug造成整个 select() 方法是相对比较复杂的我把它划分成四个部分逐一拆解来看。 第一步检测 select 阻塞操作是否超过截止时间。 在进入无限循环之前Netty 首先记录了当前时间 currentTimeNanos 以及定时任务队列中最近待执行任务的执行时间 selectDeadLineNanosNetty 中定时任务队列是按照延迟时间从小到大进行排列的通过调用 delayNanos(currentTimeNanos) 方法可以获得第一个待执行定时任务的延迟时间。然后代码会进入无限循环。首先判断 currentTimeNanos 是否超过 selectDeadLineNanos 0.5ms 以上如果超过说明当前任务队列中有定时任务需要立刻执行所以此时会退出无限循环。退出之前如果从未执行过 select 操作那么会立即一次非阻塞的 selectNow 操作。那么这里有一个疑问为什么会留出 0.5ms 的时间窗口呢在任务队列为空的情况下可能 select 操作没有获得到任何 I/O 事件就立即停止阻塞返回。 其中有一点容易混淆Netty 的任务队列包括普通任务、定时任务以及尾部任务hasTask() 判断的是普通任务队列和尾部队列是否为空而 delayNanos(currentTimeNanos) 方法获取的是定时任务的延迟时间。 第二步轮询过程中及时处理产生的任务。 Netty 为了保证任务能够及时执行会立即一次非阻塞的 selectNow 操作后立即跳出循环回到事件循环的主流程确保接下来能够优先执行 runAllTasks。 第三步select 阻塞等待获取 I/O 事件。 执行 select 阻塞操作说明任务队列已经为空而且第一个待执行定时任务还没有到达任务执行的截止时间需要阻塞等待 timeoutMillis 的超时时间。假设一种极端情况如果定时任务的截止时间非常久那么 select 操作岂不是会一直阻塞造成 Netty 无法工作所以 Netty 在外部线程添加任务的时候可以唤醒 select 阻塞操作具体源码如下 // SingleThreadEventExecutor#execute public void execute(Runnable task) {// 省略其他代码if (!addTaskWakesUp wakesUpForTask(task)) {wakeup(inEventLoop); } } // NioEventLoop#wakeup protected void wakeup(boolean inEventLoop) {// 如果是外部线程设置 wakenUp 为true则唤醒 select 阻塞操作if (!inEventLoop wakenUp.compareAndSet(false, true)) {selector.wakeup(); } }selector.wakeup() 操作的开销是非常大的所以 Netty 并不是每次都直接调用在每次调用之前都会先执行 wakenUp.compareAndSet(false, true)只有设置成功之后才会执行 selector.wakeup() 操作。 第四步解决臭名昭著的 JDK epoll 空轮询 Bug。 在之前的课程中已经初步介绍了 Netty 的解决方案在这里结合整体 select 操作我们再做一次回顾。实际上 Netty 并没有从根源上解决该问题而是巧妙地规避了这个问题。Netty 引入了计数变量 selectCnt用于记录 select 操作的次数如果事件轮询时间小于 timeoutMillis并且在该时间周期内连续发生超过 SELECTOR_AUTO_REBUILD_THRESHOLD默认512 次空轮询说明可能触发了 epoll 空轮询 Bug。Netty 通过重建新的 Selector 对象将异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector重建完成之后异常的 Selector 就可以废弃了。 NioEventLoop 轮询 I/O 事件 select 的过程已经讲完了我们简单总结 select 过程所做的事情。select 操作也是一个无限循环在事件轮询之前检查任务队列是否为空确保任务队列中待执行的任务能够及时执行。如果任务队列中已经为空然后执行 select 阻塞操作获取等待获取 I/O 事件。Netty 通过引入计数器变量并统计在一定时间窗口内 select 操作的执行次数识别出可能存在异常的 Selector 对象然后采用重建 Selector 的方式巧妙地避免了 JDK epoll 空轮询的问题。 处理 I/O 事件 通过 select 过程我们已经获取到准备就绪的 I/O 事件接下来就需要调用 processSelectedKeys() 方法处理 I/O 事件。在开始处理 I/O 事件之前Netty 通过 ioRatio 参数控制 I/O 事件处理和任务处理的时间比例默认为 ioRatio 50。如果 ioRatio 100表示每次都处理完 I/O 事件后会执行所有的 task。如果 ioRatio 100也会优先处理完 I/O 事件再处理异步任务队列。所以不论如何 processSelectedKeys() 都是先执行的接下来跟进下 processSelectedKeys() 的源码 private void processSelectedKeys() {if (selectedKeys ! null) {processSelectedKeysOptimized();} else {processSelectedKeysPlain(selector.selectedKeys());} }处理 I/O 事件时有两种选择一种是处理 Netty 优化过的 selectedKeys另外一种是正常的处理逻辑。根据是否设置了 selectedKeys 来判断使用哪种策略这两种策略使用的 selectedKeys 集合是不一样的。Netty 优化过的 selectedKeys 是 SelectedSelectionKeySet 类型而正常逻辑使用的是 JDK HashSet 类型。下面我们逐一介绍两种策略的实现。 1. processSelectedKeysPlain 首先看下正常的处理逻辑 processSelectedKeysPlain 的源码 private void processSelectedKeysPlain(SetSelectionKey selectedKeys) {if (selectedKeys.isEmpty()) {return;}IteratorSelectionKey i selectedKeys.iterator();for (;;) {final SelectionKey k i.next();final Object a k.attachment();i.remove();if (a instanceof AbstractNioChannel) {// I/O 事件由 Netty 负责处理processSelectedKey(k, (AbstractNioChannel) a);} else {// 用户自定义任务SuppressWarnings(unchecked)NioTaskSelectableChannel task (NioTaskSelectableChannel) a;processSelectedKey(k, task);}if (!i.hasNext()) {break;}if (needsToSelectAgain) {selectAgain();selectedKeys selector.selectedKeys();if (selectedKeys.isEmpty()) {break;} else {i selectedKeys.iterator();}}} }Netty 会遍历依次处理已经就绪的 SelectionKeySelectionKey 上面可以挂载 attachment。再根据 attachment 属性可以判断 SelectionKey 的类型SelectionKey 的类型可能是 AbstractNioChannel 和 NioTask这两种类型对应的处理方式也是不同的AbstractNioChannel 类型由 Netty 框架负责处理NioTask 是用户自定义的 task一般不会是这种类型。我们着重看下 AbstractNioChannel 的处理场景跟进 processSelectedKey() 的源码 private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {final AbstractNioChannel.NioUnsafe unsafe ch.unsafe();if (!k.isValid()) { // 检查 Key 是否合法final EventLoop eventLoop;try {eventLoop ch.eventLoop();} catch (Throwable ignored) {return;}if (eventLoop ! this || eventLoop null) {return;}unsafe.close(unsafe.voidPromise()); // Key 不合法直接关闭连接return;}try {int readyOps k.readyOps();// 处理连接事件if ((readyOps SelectionKey.OP_CONNECT) ! 0) {int ops k.interestOps();ops ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}// 处理可写事件if ((readyOps SelectionKey.OP_WRITE) ! 0) {ch.unsafe().forceFlush();}// 处理可读事件if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());} }从上述源码可知processSelectedKey 一共处理了 OP_CONNECT、OP_WRITE、OP_READ 三个事件我们分别了解下这三个事件的处理过程。 OP_CONNECT 连接建立事件。表示 TCP 连接建立成功, Channel 处于 Active 状态。处理 OP_CONNECT 事件首先将该事件从事件集合中清除避免事件集合中一直存在连接建立事件然后调用 unsafe.finishConnect() 方法通知上层连接已经建立。可以跟进 unsafe.finishConnect() 的源码发现会底层调用的 pipeline().fireChannelActive() 方法这时会产生一个 Inbound 事件然后会在 Pipeline 中进行传播依次调用 ChannelHandler 的 channelActive() 方法通知各个 ChannelHandler 连接建立成功。 OP_WRITE可写事件。表示上层可以向 Channel 写入数据通过执行 ch.unsafe().forceFlush() 操作将数据冲刷到客户端最终会调用 javaChannel 的 write() 方法执行底层写操作。OP_READ可读事件。表示 Channel 收到了可以被读取的新数据。Netty 将 READ 和 Accept 事件进行了统一的封装都通过 unsafe.read() 进行处理。unsafe.read() 的逻辑可以归纳为几个步骤从 Channel 中读取数据并存储到分配的 ByteBuf调用 pipeline.fireChannelRead() 方法产生 Inbound 事件然后依次调用 ChannelHandler 的 channelRead() 方法处理数据调用 pipeline.fireChannelReadComplete() 方法完成读操作最终执行 removeReadOp() 清除 OP_READ 事件。 我们再次回到 processSelectedKeysPlain 的主流程接下来会判断 needsToSelectAgain 决定是否需要重新轮询。如果 needsToSelectAgain true会调用 selectAgain() 方法进行重新轮询该方法会将 needsToSelectAgain 再次置为 false然后调用 selectorNow() 后立即返回。 我们回顾一下 Reactor 线程的主流程会发现每次在处理 I/O 事件之前needsToSelectAgain 都会被设置为 false那么在什么场景下 needsToSelectAgain 会再次设置为 true 呢我们通过查找变量的引用最后定位到 AbstractChannel#doDeregister。该方法的作用是将 Channel 从当前注册的 Selector 对象中移除方法内部可能会把 needsToSelectAgain 设置为 true具体源码如下 protected void doDeregister() throws Exception {eventLoop().cancel(selectionKey()); } void cancel(SelectionKey key) {key.cancel();cancelledKeys ;// 当取消的 Key 超过默认阈值 256needsToSelectAgain 设置为 trueif (cancelledKeys CLEANUP_INTERVAL) {cancelledKeys 0;needsToSelectAgain true;} }当 Netty 在处理 I/O 事件的过程中如果发现超过默认阈值 256 个 Channel 从 Selector 对象中移除后会将 needsToSelectAgai 设置为 true重新做一次轮询操作从而确保 keySet 的有效性。 2. processSelectedKeysOptimized 介绍完正常的 I/O 事件处理 processSelectedKeysPlain 之后回过头我们再来分析 Netty 优化的 processSelectedKeysOptimized 就会轻松很多Netty 是否采用 SelectedSelectionKeySet 类型的优化策略由 DISABLE_KEYSET_OPTIMIZATION 参数决定。那么到底 SelectedSelectionKeySet 是如何进行优化的呢我们继续跟进下 processSelectedKeysOptimized 的源码 private void processSelectedKeysOptimized() {for (int i 0; i selectedKeys.size; i) {final SelectionKey k selectedKeys.keys[i];selectedKeys.keys[i] null;final Object a k.attachment();if (a instanceof AbstractNioChannel) {processSelectedKey(k, (AbstractNioChannel) a);} else {SuppressWarnings(unchecked)NioTaskSelectableChannel task (NioTaskSelectableChannel) a;processSelectedKey(k, task);}if (needsToSelectAgain) {selectedKeys.reset(i 1);selectAgain();i -1;}} }可以发现 processSelectedKeysOptimized 与 processSelectedKeysPlain 的代码结构非常相似其中最重要的一点就是 selectedKeys 的遍历方式是不同的所以还是需要看下 SelectedSelectionKeySet 的源码一探究竟。 final class SelectedSelectionKeySet extends AbstractSetSelectionKey {SelectionKey[] keys;int size;SelectedSelectionKeySet() {keys new SelectionKey[1024];}Overridepublic boolean add(SelectionKey o) {if (o null) {return false;}keys[size] o;if (size keys.length) {increaseCapacity();}return true;}// 省略其他代码 }因为 SelectedSelectionKeySet 内部使用的是 SelectionKey 数组所以 processSelectedKeysOptimized 可以直接通过遍历数组取出 I/O 事件相比 JDK HashSet 的遍历效率更高。SelectedSelectionKeySet 内部通过 size 变量记录数据的逻辑长度每次执行 add 操作时会把对象添加到 SelectionKey[] 尾部。当 size 等于 SelectionKey[] 的真实长度时SelectionKey[] 会进行扩容。相比于 HashSetSelectionKey[] 不需要考虑哈希冲突的问题所以可以实现 O(1) 时间复杂度的 add 操作。 那么 SelectedSelectionKeySet 是什么时候生成的呢通过查找 SelectedSelectionKeySet 的引用定位到 NioEventLoop#openSelector 方法摘录核心源码片段如下 private SelectorTuple openSelector() {// 省略其他代码final SelectedSelectionKeySet selectedKeySet new SelectedSelectionKeySet();Object maybeException AccessController.doPrivileged(new PrivilegedActionObject() {Overridepublic Object run() {try {Field selectedKeysField selectorImplClass.getDeclaredField(selectedKeys);Field publicSelectedKeysField selectorImplClass.getDeclaredField(publicSelectedKeys);if (PlatformDependent.javaVersion() 9 PlatformDependent.hasUnsafe()) {long selectedKeysFieldOffset PlatformDependent.objectFieldOffset(selectedKeysField);long publicSelectedKeysFieldOffset PlatformDependent.objectFieldOffset(publicSelectedKeysField);if (selectedKeysFieldOffset ! -1 publicSelectedKeysFieldOffset ! -1) {PlatformDependent.putObject(unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);PlatformDependent.putObject(unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);return null;}}// 省略其他代码} catch (NoSuchFieldException e) {return e;} catch (IllegalAccessException e) {return e;}}}); // 省略其他代码 }Netty 通过反射的方式将 Selector 对象内部的 selectedKeys 和 publicSelectedKeys 替换为 SelectedSelectionKeySet原先 selectedKeys 和 publicSelectedKeys 这两个字段都是 HashSet 类型。这真是很棒的一个小技巧对于 JDK 底层的优化一般是很少见的Netty 在细节优化上追求极致的精神值得我们学习。 到这里Reactor 线程主流程的第二步。处理 I/O 事件 processSelectedKeys 已经讲完了简单总结一下 processSelectedKeys 的要点。处理 I/O 事件时有两种选择一种是处理 Netty 优化过的 selectedKeys另外一种是正常的处理逻辑两种策略的处理逻辑是相似的都是通过获取 SelectionKey 上挂载的 attachment 判断 SelectionKey 的类型不同的 SelectionKey 的类型又会调用不同的处理方法然后通过 Pipeline 进行事件传播。Netty 优化过的 selectedKeys 是使用数组存储的 SelectionKey相比于 JDK 的 HashSet 遍历效率更高效。processSelectedKeys 还做了更多的优化处理如果发现超过默认阈值 256 个 Channel 从 Selector 对象中移除后会重新做一次轮询操作以确保 keySet 的有效性。 处理异步任务队列 继续分析 Reactor 线程主流程的最后一步处理异步任务队列 runAllTasks。为什么 Netty 能够保证 Channel 的操作都是线程安全的呢这要归功于 Netty 的任务机制。下面我们从任务添加和任务执行两个方面介绍 Netty 的任务机制。 任务添加 NioEventLoop 内部有两个非常重要的异步任务队列分别为普通任务队列和定时任务队列。NioEventLoop 提供了 execute() 和 schedule() 方法用于向不同的队列中添加任务execute() 用于添加普通任务schedule() 方法用于添加定时任务。 首先我们看下如何添加普通任务。NioEventLoop 继承自 SingleThreadEventExecutorSingleThreadEventExecutor 提供了 execute() 用于添加普通任务源码如下 public void execute(Runnable task) {if (task null) {throw new NullPointerException(task);}boolean inEventLoop inEventLoop();addTask(task);if (!inEventLoop) {startThread();if (isShutdown()) {boolean reject false;try {if (removeTask(task)) {reject true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}if (!addTaskWakesUp wakesUpForTask(task)) {wakeup(inEventLoop);} } protected void addTask(Runnable task) {if (task null) {throw new NullPointerException(task);}if (!offerTask(task)) {reject(task);} } final boolean offerTask(Runnable task) {if (isShutdown()) {reject();}return taskQueue.offer(task); }我们一步步跟进 addTask(task)发现最后是将任务添加到了 taskQueueSingleThreadEventExecutor 中 taskQueue 就是普通任务队列。taskQueue 默认使用的是 Mpsc Queue可以理解为多生产者单消费者队列关于 Mpsc Queue 我们会有一节课程单独介绍在这里不详细展开。此外在任务处理的场景下inEventLoop() 始终是返回 true始终都是在 Reactor 线程内执行既然在 Reactor 线程内都是串行执行可以保证线程安全那为什么还需要 Mpsc Queue 呢我们继续往下看。 这里举一种很常见的场景比如在 RPC 业务线程池里处理完业务请求后可以根据用户请求拿到关联的 Channel将数据写回客户端。那么对于外部线程调用 Channel 的相关方法 Netty 是如何操作的呢我们一直跟进下 channel.write() 的源码 // #AbstractChannel#write public ChannelFuture write(Object msg) {return pipeline.write(msg); } // AbstractChannelHandlerContext#write private void write(Object msg, boolean flush, ChannelPromise promise) {// 省略其他代码final AbstractChannelHandlerContext next findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m pipeline.touch(msg, next);EventExecutor executor next.executor();if (executor.inEventLoop()) { // Reactor 线程内部调用if (flush) {next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else { // 外部线程调用会走到该分支final AbstractWriteTask task;if (flush) {task WriteAndFlushTask.newInstance(next, m, promise);} else {task WriteTask.newInstance(next, m, promise);}if (!safeExecute(executor, task, promise, m)) {task.cancel();}} } // AbstractChannelHandlerContext#safeExecute private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {try {executor.execute(runnable);return true;} catch (Throwable cause) {try {promise.setFailure(cause);} finally {if (msg ! null) {ReferenceCountUtil.release(msg);}}return false;} }如果是 Reactor 线程发起调用 channel.write() 方法inEventLoop() 返回 true此时直接在 Reactor 线程内部直接交由 Pipeline 进行事件处理。如果是外部线程调用那么会走到 else 分支此时会将写操作封装成一个 WriteTask然后通过 safeExecute() 执行可以发现 safeExecute() 就是调用的 SingleThreadEventExecutor#execute() 方法最终会将任务添加到 taskQueue 中。因为多个外部线程可能会并发操作同一个 Channel这时候 Mpsc Queue 就可以保证线程的安全性。 接下来我们再分析定时任务的添加过程。与普通任务类似定时任务也会有 Reactor 线程内和外部线程两种场景我们直接跟进到 AbstractScheduledEventExecutor#schedule() 源码的深层发现如下核心代码 private V ScheduledFutureV schedule(final ScheduledFutureTaskV task) {if (inEventLoop()) { // Reactor 线程内部scheduledTaskQueue().add(task.setId(nextTaskId));} else { // 外部线程executeScheduledRunnable(new Runnable() {Overridepublic void run() {scheduledTaskQueue().add(task.setId(nextTaskId));}}, true, task.deadlineNanos());}return task; } PriorityQueueScheduledFutureTask? scheduledTaskQueue() {if (scheduledTaskQueue null) {scheduledTaskQueue new DefaultPriorityQueueScheduledFutureTask?(SCHEDULED_FUTURE_TASK_COMPARATOR,11);}return scheduledTaskQueue; } void executeScheduledRunnable(Runnable runnable,SuppressWarnings(unused) boolean isAddition,SuppressWarnings(unused) long deadlineNanos) {execute(runnable); }AbstractScheduledEventExecutor 中 scheduledTaskQueue 就是定时任务队列可以看到 scheduledTaskQueue 的默认实现是优先级队列 DefaultPriorityQueue这样可以方便队列中的任务按照时间进行排序。但是 DefaultPriorityQueue 是非线程安全的如果是 Reactor 线程内部调用因为是串行执行所以不会有线程安全问题。如果是外部线程添加定时任务我们发现 Netty 把添加定时任务的操作又再次封装成一个任务交由 executeScheduledRunnable() 处理而 executeScheduledRunnable() 中又再次调用了普通任务的 execute() 的方法巧妙地借助普通任务场景中 Mpsc Queue 解决了外部线程添加定时任务的线程安全问题。 任务执行 介绍完 Netty 中不同任务的添加过程回过头我们再来分析 Reactor 线程是如何执行这些任务的呢通过 Reactor 线程主流程的分析我们知道处理异步任务队列有 runAllTasks() 和 runAllTasks(long timeoutNanos) 两种实现第一种会处理所有任务第二种是带有超时时间来处理任务。之所以设置超时时间是为了防止 Reactor 线程处理任务时间过长而导致 I/O 事件阻塞我们着重分析下 runAllTasks(long timeoutNanos) 的源码 protected boolean runAllTasks(long timeoutNanos) {fetchFromScheduledTaskQueue(); // 1. 合并定时任务到普通任务队列// 2. 从普通任务队列中取出任务并处理Runnable task pollTask();if (task null) {afterRunningAllTasks();return false;}// 计算任务处理的超时时间final long deadline ScheduledFutureTask.nanoTime() timeoutNanos;long runTasks 0;long lastExecutionTime;for (;;) {safeExecute(task); // 执行任务runTasks ;// 每执行 64 个任务检查一下是否超时if ((runTasks 0x3F) 0) {lastExecutionTime ScheduledFutureTask.nanoTime();if (lastExecutionTime deadline) {break;}}task pollTask(); // 继续取出下一个任务if (task null) {lastExecutionTime ScheduledFutureTask.nanoTime();break;}}// 3. 收尾工作afterRunningAllTasks();this.lastExecutionTime lastExecutionTime;return true; }异步任务处理 runAllTasks 的过程可以分为三步合并定时任务到普通任务队列然后从普通任务队列中取出任务并处理最后进行收尾工作。我们分别看看三个步骤都是如何实现的。 第一步合并定时任务到普通任务队列对应的实现是 fetchFromScheduledTaskQueue() 方法。 private boolean fetchFromScheduledTaskQueue() {if (scheduledTaskQueue null || scheduledTaskQueue.isEmpty()) {return true;}long nanoTime AbstractScheduledEventExecutor.nanoTime();for (;;) {Runnable scheduledTask pollScheduledTask(nanoTime); // 从定时任务队列中取出截止时间小于等于当前时间的定时任务if (scheduledTask null) {return true;}if (!taskQueue.offer(scheduledTask)) {// 如果普通任务队列已满把定时任务放回scheduledTaskQueue.add((ScheduledFutureTask?) scheduledTask);return false;}} } protected final Runnable pollScheduledTask(long nanoTime) {assert inEventLoop();QueueScheduledFutureTask? scheduledTaskQueue this.scheduledTaskQueue;ScheduledFutureTask? scheduledTask scheduledTaskQueue null ? null : scheduledTaskQueue.peek();// 如果定时任务的 deadlineNanos 小于当前时间就取出if (scheduledTask null || scheduledTask.deadlineNanos() - nanoTime 0) {return null;}scheduledTaskQueue.remove();return scheduledTask; }定时任务只有满足截止时间 deadlineNanos 小于当前时间才可以取出合并到普通任务。由于定时任务是按照截止时间 deadlineNanos 从小到大排列的所以取出的定时任务不满足合并条件那么定时任务队列中剩下的所有任务都不会满足条件合并操作完成并退出。 第二步从普通任务队列中取出任务并处理可以回过头再看 runAllTasks(long timeoutNanos) 第二部分的源码我已经用注释标明。真正处理任务的 safeExecute() 非常简单就是直接调用的 Runnable 的 run() 方法。因为异步任务处理是有超时时间的所以 Netty 采取了定时检测的策略每执行 64 个任务的时候就会检查一下是否超时这也是出于对性能的折中考虑如果异步队列中有大量的短时间任务每一次执行完都检测一次超时性能会有所降低。 第三步收尾工作对应的是 afterRunningAllTasks() 方法实现。 protected void afterRunningAllTasks() {runAllTasksFrom(tailTasks); } protected final boolean runAllTasksFrom(QueueRunnable taskQueue) {Runnable task pollTaskFrom(taskQueue);if (task null) {return false;}for (;;) {safeExecute(task);task pollTaskFrom(taskQueue);if (task null) {return true;}} }这里的尾部队列 tailTasks 相比于普通任务队列优先级较低可以理解为是收尾任务在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。可以看出 afterRunningAllTasks() 就是把尾部队列 tailTasks 里的任务以此取出执行一遍。尾部队列并不常用一般用于什么场景呢例如你想对 Netty 的运行状态做一些统计数据例如任务循环的耗时、占用物理内存的大小等等都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。 到这里Netty 处理异步任务队列的流程就讲完了再做一个简单的总结。异步任务主要分为普通任务和定时任务两种在任务添加和任务执行时都需要考虑 Reactor 线程内和外部线程两种情况。外部线程添加定时任务时Netty 巧妙地借助普通任务的 Mpsc Queue 解决多线程并发操作时的线程安全问题。Netty 执行任务之前会将满足条件的定时任务合并到普通任务队列由普通任务队列统一负责执行并且每执行 64 个任务的时候就会检查一下是否超时。 总结 Reactor 线程模型是 Netty 最核心的内容本节课我也花了大量的篇幅对其进行讲解。NioEventLoop 作为 Netty Reactor 线程的实现它的设计原理是非常精妙的值得我们反复阅读和思考。我们始终需要记住 NioEventLoop 的无限循环中所做的三件事轮询 I/O 事件处理 I/O 事件处理异步任务队列。 关于 Netty Reactor 线程模型经常会遇到几个高频的面试问题读完本节课之后你是否都已经清楚了呢 Netty 的 NioEventLoop 是如何实现的它为什么能够保证 Channel 的操作是线程安全的Netty 如何解决 JDK epoll 空轮询 BugNioEventLoop 是如何实现无锁化的 欢迎你在评论区留言期待看到你分享关于 Reactor 线程模型更多的认识和思考。 19 源码篇一个网络请求在 Netty 中的旅程 通过前面两节源码课程的学习我们知道 Netty 在服务端启动时会为创建 NioServerSocketChannel当客户端新连接接入时又会创建 NioSocketChannel不管是服务端还是客户端 Channel在创建时都会初始化自己的 ChannelPipeline。如果把 Netty 比作成一个生产车间那么 Reactor 线程无疑是车间的中央管控系统ChannelPipeline 可以看作是车间的流水线将原材料按顺序进行一步步加工然后形成一个完整的产品。本节课我将带你完整梳理一遍网络请求在 Netty 中的处理流程从而加深对前两节课内容的理解并着重讲解 ChannelPipeline 的工作原理。 说明本文参考的 Netty 源码版本为 4.1.42.Final。 事件处理机制回顾 首先我们以服务端接入客户端新连接为例并结合前两节源码课学习的知识点一起复习下 Netty 的事件处理流程如下图所示。 Netty 服务端启动后BossEventLoopGroup 会负责监听客户端的 Accept 事件。当有客户端新连接接入时BossEventLoopGroup 中的 NioEventLoop 首先会新建客户端 Channel然后在 NioServerSocketChannel 中触发 channelRead 事件传播NioServerSocketChannel 中包含了一种特殊的处理器 ServerBootstrapAcceptor最终通过 ServerBootstrapAcceptor 的 channelRead() 方法将新建的客户端 Channel 分配到 WorkerEventLoopGroup 中。WorkerEventLoopGroup 中包含多个 NioEventLoop它会选择其中一个 NioEventLoop 与新建的客户端 Channel 绑定。 完成客户端连接注册之后就可以接收客户端的请求数据了。当客户端向服务端发送数据时NioEventLoop 会监听到 OP_READ 事件然后分配 ByteBuf 并读取数据读取完成后将数据传递给 Pipeline 进行处理。一般来说数据会从 ChannelPipeline 的第一个 ChannelHandler 开始传播将加工处理后的消息传递给下一个 ChannelHandler整个过程是串行化执行。 在前面两节课中我们介绍了服务端如何接收客户端新连接以及 NioEventLoop 的工作流程接下来我们重点介绍 ChannelPipeline 是如何实现 Netty 事件驱动的这样 Netty 整个事件处理流程已经可以串成一条主线。 Pipeline 的初始化 我们知道 ChannelPipeline 是在创建 Channel 时被创建的它是 Channel 中非常重要的一个成员变量。回到 AbstractChannel 的构造函数以此为切入点我们一起看下 ChannelPipeline 是如何一步步被构造出来的。 // AbstractChannel protected AbstractChannel(Channel parent) {this.parent parent;id newId();unsafe newUnsafe();pipeline newChannelPipeline(); } // AbstractChannel#newChannelPipeline protected DefaultChannelPipeline newChannelPipeline() {return new DefaultChannelPipeline(this); } // DefaultChannelPipeline protected DefaultChannelPipeline(Channel channel) {this.channel ObjectUtil.checkNotNull(channel, channel);succeededFuture new SucceededChannelFuture(channel, null);voidPromise new VoidChannelPromise(channel, true);tail new TailContext(this);head new HeadContext(this);head.next tail;tail.prev head; }当 ChannelPipeline 初始化完成后会构成一个由 ChannelHandlerContext 对象组成的双向链表默认 ChannelPipeline 初始化状态的最小结构仅包含 HeadContext 和 TailContext 两个节点如下图所示。 HeadContext 和 TailContext 属于 ChannelPipeline 中两个特殊的节点它们都继承自 AbstractChannelHandlerContext根据源码看下 AbstractChannelHandlerContext 有哪些实现类如下图所示。除了 HeadContext 和 TailContext还有一个默认实现类 DefaultChannelHandlerContext我们可以猜到 DefaultChannelHandlerContext 封装的是用户在 Netty 启动配置类中添加的自定义业务处理器DefaultChannelHandlerContext 会插入到 HeadContext 和 TailContext 之间。 接着我们比较一下上述三种 AbstractChannelHandlerContext 实现类的内部结构发现它们都包含当前 ChannelPipeline 的引用、处理器 ChannelHandler。有一点不同的是 HeadContext 节点还包含了用于操作底层数据读写的 unsafe 对象。对于 Inbound 事件会先从 HeadContext 节点开始传播所以 unsafe 可以看作是 Inbound 事件的发起者对于 Outbound 事件数据最后又会经过 HeadContext 节点返回给客户端此时 unsafe 可以看作是 Outbound 事件的处理者。 接下来我们继续看下用户自定义的处理器是如何加入 ChannelPipeline 的双向链表的。 Pipeline 添加 Handler 在 Netty 客户端或者服务端启动时就需要用户配置自定义实现的业务处理器。我们先看一段服务端启动类的代码片段 ServerBootstrap b new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new SampleInboundA());ch.pipeline().addLast(new SampleInboundB());ch.pipeline().addLast(new SampleOutboundA());ch.pipeline().addLast(new SampleOutboundB());}});我们知道 ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器它们都会被 ChannelHandlerContext 封装不管是哪种处理器最终都是通过双向链表连接代码示例中构成的 ChannelPipeline 的结构如下。 那么 ChannelPipeline 在添加 Handler 时是如何区分 Inbound 和 Outbound 类型的呢我们一起跟进 ch.pipeline().addLast() 方法源码定位到核心代码如下。 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {// 1. 检查是否重复添加 HandlercheckMultiplicity(handler);// 2. 创建新的 DefaultChannelHandlerContext 节点newCtx newContext(group, filterName(name, handler), handler);// 3. 添加新的 DefaultChannelHandlerContext 节点到 ChannelPipelineaddLast0(newCtx);// 省略其他代码}// 4. 回调用户方法callHandlerAdded0(newCtx);return this; }addLast() 主要做了以下四件事 检查是否重复添加 Handler。创建新的 DefaultChannelHandlerContext 节点。添加新的 DefaultChannelHandlerContext 节点到 ChannelPipeline。回调用户方法。 前三个步骤通过 synchronized 加锁完成的为了防止多线程并发操作 ChannelPipeline 底层双向链表。下面我们一步步进行拆解介绍。 首先在添加 Handler 时ChannelPipeline 会检查该 Handler 有没有被添加过。如果一个非线程安全的 Handler 被添加到 ChannelPipeline 中那么当多线程访问时会造成线程安全问题。Netty 具体检查重复性的逻辑由 checkMultiplicity() 方法实现 private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h (ChannelHandlerAdapter) handler;if (!h.isSharable() h.added) {throw new ChannelPipelineException(h.getClass().getName() is not a Sharable handler, so cant be added or removed multiple times.);}h.added true;} }用户自定义实现的处理一般都继承于 ChannelHandlerAdapterChannelHandlerAdapter 中使用 added 变量标识该 Handler 是否被添加过。如果当前添加的 Handler 是非共享且已被添加过那么就会抛出异常否则将当前 Handler 标记为已添加。 h.isSharable() 用于判断 Handler 是否是共享的所谓共享就是这个 Handler 可以被重复添加到不同的 ChannelPipeline 中共享的 Handler 必须要确保是线程安全的。如果我们想实现一个共享的 Handler只需要在 Handler 中添加 Sharable 注解即可如下所示 ChannelHandler.Sharable public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {}接下来我们分析 addLast() 的第二步创建新的 DefaultChannelHandlerContext 节点。在执行 newContext() 方法之前会通过 filterName() 为 Handler 创建一个唯一的名称一起先看下 Netty 生成名称的策略是怎样的。 private String filterName(String name, ChannelHandler handler) {if (name null) {return generateName(handler);}checkDuplicateName(name);return name; } private String generateName(ChannelHandler handler) {MapClass?, String cache nameCaches.get();Class? handlerType handler.getClass();String name cache.get(handlerType);if (name null) {name generateName0(handlerType);cache.put(handlerType, name);}if (context0(name) ! null) {String baseName name.substring(0, name.length() - 1);for (int i 1;; i ) {String newName baseName i;if (context0(newName) null) {name newName;break;}}}return name; } private static String generateName0(Class? handlerType) {return StringUtil.simpleClassName(handlerType) #0; }Netty 会使用 FastThreadLocal 缓存 Handler 和名称的映射关系在为 Handler 生成默认名称的之前会先从缓存中查找是否已经存在如果不存在会调用 generateName0() 方法生成默认名称后并加入缓存。可以看出 Netty 生成名称的默认规则是 “简单类名#0”例如 HeadContext 的默认名称为 “DefaultChannelPipeline$HeadContext#0”。 为 Handler 生成完默认名称之后还会通过 context0() 方法检查生成的名称是否和 ChannelPipeline 已有的名称出现冲突查重的过程很简单就是对双向链表进行线性搜索。如果存在冲突现象Netty 会将名称最后的序列号截取出来一直递增直至生成不冲突的名称为止例如 “简单类名#1” “简单类名#2” “简单类名#3” 等等。 接下来回到 newContext() 创建节点的流程可以定位到 AbstractChannelHandlerContext 的构造函数 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,String name, Class? extends ChannelHandler handlerClass) {this.name ObjectUtil.checkNotNull(name, name);this.pipeline pipeline;this.executor executor;this.executionMask mask(handlerClass);ordered executor null || executor instanceof OrderedEventExecutor; }AbstractChannelHandlerContext 中有一个 executionMask 属性并不是很好理解它其实是一种常用的掩码运算操作看下 mask() 方法是如何生成掩码的呢 private static int mask0(Class? extends ChannelHandler handlerType) {int mask MASK_EXCEPTION_CAUGHT;try {if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {// 如果是 ChannelInboundHandler 实例所有 Inbound 事件置为 1mask | MASK_ALL_INBOUND;// 排除 Handler 不感兴趣的 Inbound 事件if (isSkippable(handlerType, channelRegistered, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_REGISTERED;}if (isSkippable(handlerType, channelUnregistered, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_UNREGISTERED;}if (isSkippable(handlerType, channelActive, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_ACTIVE;}if (isSkippable(handlerType, channelInactive, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_INACTIVE;}if (isSkippable(handlerType, channelRead, ChannelHandlerContext.class, Object.class)) {mask ~MASK_CHANNEL_READ;}if (isSkippable(handlerType, channelReadComplete, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_READ_COMPLETE;}if (isSkippable(handlerType, channelWritabilityChanged, ChannelHandlerContext.class)) {mask ~MASK_CHANNEL_WRITABILITY_CHANGED;}if (isSkippable(handlerType, userEventTriggered, ChannelHandlerContext.class, Object.class)) {mask ~MASK_USER_EVENT_TRIGGERED;}}if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {// 如果是 ChannelOutboundHandler 实例所有 Outbound 事件置为 1mask | MASK_ALL_OUTBOUND;// 排除 Handler 不感兴趣的 Outbound 事件if (isSkippable(handlerType, bind, ChannelHandlerContext.class,SocketAddress.class, ChannelPromise.class)) {mask ~MASK_BIND;}if (isSkippable(handlerType, connect, ChannelHandlerContext.class, SocketAddress.class,SocketAddress.class, ChannelPromise.class)) {mask ~MASK_CONNECT;}if (isSkippable(handlerType, disconnect, ChannelHandlerContext.class, ChannelPromise.class)) {mask ~MASK_DISCONNECT;}if (isSkippable(handlerType, close, ChannelHandlerContext.class, ChannelPromise.class)) {mask ~MASK_CLOSE;}if (isSkippable(handlerType, deregister, ChannelHandlerContext.class, ChannelPromise.class)) {mask ~MASK_DEREGISTER;}if (isSkippable(handlerType, read, ChannelHandlerContext.class)) {mask ~MASK_READ;}if (isSkippable(handlerType, write, ChannelHandlerContext.class,Object.class, ChannelPromise.class)) {mask ~MASK_WRITE;}if (isSkippable(handlerType, flush, ChannelHandlerContext.class)) {mask ~MASK_FLUSH;}}if (isSkippable(handlerType, exceptionCaught, ChannelHandlerContext.class, Throwable.class)) {mask ~MASK_EXCEPTION_CAUGHT;}} catch (Exception e) {PlatformDependent.throwException(e);}return mask; }Netty 中分别有多种 Inbound 事件和 Outbound 事件如 Inbound 事件有 channelRegistered、channelActive、channelRead 等等。Netty 会判断 Handler 的类型是否是 ChannelInboundHandler 的实例如果是会把所有 Inbound 事件先置为 1然后排除 Handler 不感兴趣的方法。同理Handler 类型如果是 ChannelOutboundHandler也是这么实现的。 那么如何排除 Handler 不感兴趣的事件呢Handler 对应事件的方法上如果有 Skip 注解Netty 认为该事件是需要排除的。大部分情况下用户自定义实现的 Handler 只需要关心个别事件那么剩余不关心的方法都需要加上 Skip 注解吗Netty 其实已经在 ChannelHandlerAdapter 中默认都添加好了所以用户如果继承了 ChannelHandlerAdapter默认没有重写的方法都是加上 Skip 的只有用户重写的方法才是 Handler 关心的事件。 回到 addLast() 的主流程接着需要将新创建的 DefaultChannelHandlerContext 节点添加到 ChannelPipeline 中跟进 addLast0() 方法的源码。 private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev tail.prev;newCtx.prev prev;newCtx.next tail;prev.next newCtx;tail.prev newCtx; }addLast0() 非常简单就是向 ChannelPipeline 中双向链表的尾部插入新的节点其中 HeadContext 和 TailContext 一直是链表的头和尾新的节点被插入到 HeadContext 和 TailContext 之间。例如代码示例中 SampleOutboundA 被添加时双向链表的结构变化如下所示。 最后添加完节点后就到了回调用户方法定位到 callHandlerAdded() 的核心源码 final void callHandlerAdded() throws Exception {if (setAddComplete()) {handler().handlerAdded(this);} } final boolean setAddComplete() {for (;;) {int oldState handlerState;if (oldState REMOVE_COMPLETE) {return false;} if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {return true;}} }Netty 会通过 CAS 修改节点的状态直至 REMOVE_COMPLETE 或者 ADD_COMPLETE如果修改节点为 ADD_COMPLETE 状态表示节点已经添加成功然后会回调用户 Handler 中实现的 handlerAdded() 方法。 至此Pipeline 添加 Handler 的实现原理我们已经讲完了下面接着看下 Pipeline 删除 Handler 的场景。 Pipeline 删除 Handler 在《源码篇从 Linux 出发深入剖析服务端启动流程》的课程中我们介绍了一种特殊的处理器 ChannelInitializerChannelInitializer 在服务端 Channel 注册完成之后会从 Pipeline 的双向链表中移除我们一起回顾下这段代码 private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) {try {initChannel((C) ctx.channel()); // 调用 ChannelInitializer 实现的 initChannel() 方法} catch (Throwable cause) {exceptionCaught(ctx, cause);} finally {ChannelPipeline pipeline ctx.pipeline();if (pipeline.context(this) ! null) {pipeline.remove(this); // 将 ChannelInitializer 自身从 Pipeline 中移出}}return true;}return false; }继续跟进 pipeline.remove() 的源码。 Override public final ChannelPipeline remove(ChannelHandler handler) {// 1. getContextOrDie 用于查找需要删除的节点remove(getContextOrDie(handler));return this; } private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {assert ctx ! head ctx ! tail;synchronized (this) {// 删除双向链表中的 Handler 节点atomicRemoveFromHandlerList(ctx);if (!registered) {callHandlerCallbackLater(ctx, false);return ctx;}EventExecutor executor ctx.executor();if (!executor.inEventLoop()) {executor.execute(new Runnable() {Overridepublic void run() {callHandlerRemoved0(ctx);}});return ctx;}}// 3. 回调用户函数callHandlerRemoved0(ctx);return ctx; }整个删除 Handler 的过程可以分为三步分别为 查找需要删除的 Handler 节点然后删除双向链表中的 Handler 节点最后回调用户函数。 我们对每一步逐一进行拆解。 第一步查找需要删除的 Handler 节点我们自然可以想到通过遍历双向链表实现。一起看下 getContextOrDie() 方法的源码 private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {AbstractChannelHandlerContext ctx (AbstractChannelHandlerContext) context(handler);if (ctx null) {throw new NoSuchElementException(handler.getClass().getName());} else {return ctx;} } public final ChannelHandlerContext context(ChannelHandler handler) {if (handler null) {throw new NullPointerException(handler);}// 遍历双向链表查找AbstractChannelHandlerContext ctx head.next;for (;;) {if (ctx null) {return null;}// 如果 Handler 相同返回当前的 Context 节点if (ctx.handler() handler) { return ctx;}ctx ctx.next;} }Netty 确实是从双向链表的头结点开始依次遍历如果当前 Context 节点的 Handler 要被删除的 Handler 相同那么便找到了要删除的 Handler然后返回当前 Context 节点。 找到需要删除的 Handler 节点之后接下来就是将节点从双向链表中删除再跟进atomicRemoveFromHandlerList() 方法的源码 private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {AbstractChannelHandlerContext prev ctx.prev;AbstractChannelHandlerContext next ctx.next;prev.next next;next.prev prev; }删除节点和添加节点类似都是基本的链表操作通过调整双向链表的指针即可实现。假设现在需要删除 SampleOutboundA 节点我们以一幅图来表示删除时指针的变化过程如下所示。 删除完节点之后最后 Netty 会回调用户自定义实现的 handlerRemoved() 方法回调的实现过程与添加节点时是类似的在这里我就不赘述了。 到此为止我们已经学会了 ChannelPipeline 内部结构的基本操作只需要基本的链表操作就可以实现 Handler 节点的添加和删除添加时通过掩码运算的方式排出 Handler 不关心的事件。 ChannelPipeline 是如何调度 Handler 的呢接下来我们继续学习。 数据在 Pipeline 中的运转 我们知道根据数据的流向ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器。Inbound 事件和 Outbound 事件的传播方向相反Inbound 事件的传播方向为 Head - Tail而 Outbound 事件传播方向是 Tail - Head。今天我们就以客户端和服务端请求-响应的场景深入研究 ChannelPipeline 的事件传播机制。 Inbound 事件传播 当客户端向服务端发送数据时服务端是如何接收的呢回顾下之前我们所学习的 Netty Reactor 线程模型首先 NioEventLoop 会不断轮询 OP_ACCEPT 和 OP_READ 事件当事件就绪时NioEventLoop 会及时响应。首先定位到 NioEventLoop 中源码的入口 // NioEventLoop#processSelectedKey if ((readyOps (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read(); }可以看出 unsafe.read() 会触发后续事件的处理有一点需要避免混淆在服务端 Channel 和客户端 Channel 中绑定的 unsafe 对象是不一样的因为服务端 Channel 只关心如何接收客户端连接而客户端 Channel 需要关心数据的读写。这里我们重点分析一下客户端 Channel 读取数据的过程跟进 unsafe.read() 的源码 public final void read() {final ChannelConfig config config();// 省略其他代码final ChannelPipeline pipeline pipeline();final ByteBufAllocator allocator config.getAllocator();final RecvByteBufAllocator.Handle allocHandle recvBufAllocHandle();allocHandle.reset(config);ByteBuf byteBuf null;boolean close false;try {do {byteBuf allocHandle.allocate(allocator); // 分配 ByteBufallocHandle.lastBytesRead(doReadBytes(byteBuf)); // 将 Channel 中的数据读到 ByteBuf 中if (allocHandle.lastBytesRead() 0) {byteBuf.release();byteBuf null;close allocHandle.lastBytesRead() 0;if (close) {readPending false;}break;}allocHandle.incMessagesRead(1);readPending false;pipeline.fireChannelRead(byteBuf); // 传播 ChannelRead 事件byteBuf null;} while (allocHandle.continueReading());allocHandle.readComplete();pipeline.fireChannelReadComplete(); // 传播 readComplete 事件if (close) {closeOnRead(pipeline);}} catch (Throwable t) {handleReadException(pipeline, byteBuf, t, close, allocHandle);} finally {if (!readPending !config.isAutoRead()) {removeReadOp();}} }Netty 会不断从 Channel 中读取数据到分配的 ByteBuf 中然后通过 pipeline.fireChannelRead() 方法触发 ChannelRead 事件的传播fireChannelRead() 是我们需要重点分析的对象。 // DefaultChannelPipeline public final ChannelPipeline fireChannelRead(Object msg) {AbstractChannelHandlerContext.invokeChannelRead(head, msg);return this; } // AbstractChannelHandlerContext static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {final Object m next.pipeline.touch(ObjectUtil.checkNotNull(msg, msg), next);EventExecutor executor next.executor();if (executor.inEventLoop()) { // 当前在 Reactor 线程内部直接执行next.invokeChannelRead(m);} else {executor.execute(new Runnable() { // 如果是外部线程则提交给异步任务队列Overridepublic void run() {next.invokeChannelRead(m);}});} }Netty 首先会以 Head 节点为入参直接调用一个静态方法 invokeChannelRead()。如果当前是在 Reactor 线程内部会直接执行 next.invokeChannelRead() 方法。如果是外部线程发起的调用Netty 会把 next.invokeChannelRead() 调用封装成异步任务提交到任务队列。通过之前对 NioEventLoop 源码的学习我们知道这样可以保证执行流程全部控制在当前 NioEventLoop 线程内部串行化执行确保线程安全性。我们抓住核心逻辑 next.invokeChannelRead() 继续跟进。 // AbstractChannelHandlerContext private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);} }可以看出当前 ChannelHandlerContext 节点会取出自身对应的 Handler执行 Handler 的 channelRead 方法。此时当前节点是 HeadContext所以 Inbound 事件是从 HeadContext 节点开始进行传播的看下 HeadContext.channelRead() 是如何实现的。 // HeadContext public void channelRead(ChannelHandlerContext ctx, Object msg) {ctx.fireChannelRead(msg); } // AbstractChannelHandlerContext public ChannelHandlerContext fireChannelRead(final Object msg) {// 找到下一个节点执行 invokeChannelReadinvokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);return this; }我们发现 HeadContext.channelRead() 并没有做什么特殊操作而是直接通过 fireChannelRead() 方法继续将读事件继续传播下去。接下来 Netty 会通过 findContextInbound(MASK_CHANNEL_READ), msg) 找到 HeadContext 的下一个节点然后继续执行我们之前介绍的静态方法 invokeChannelRead()从而进入一个递归调用的过程直至某个条件结束。以上 channelRead 的执行过程我们可以梳理成一幅流程图 Netty 是如何判断 InboundHandler 是否关心 channelRead 事件呢这就涉及findContextInbound(MASK_CHANNEL_READ), msg) 中的一个知识点和上文中我们介绍的 executionMask 掩码运算是息息相关的。首先看下 findContextInbound() 的源码 private AbstractChannelHandlerContext findContextInbound(int mask) {AbstractChannelHandlerContext ctx this;do {ctx ctx.next;} while ((ctx.executionMask mask) 0);return ctx; }MASK_CHANNEL_READ 的值为 1 5表示 channelRead 事件所在的二进制位已被置为 1。在代码示例中SampleInboundA 是我们添加的 Inbound 类型的自定义处理器它所对应的 executionMask 掩码和 MASK_CHANNEL_READ 进行与运算的结果如果不为 0表示 SampleInboundA 对 channelRead 事件感兴趣需要触发执行 SampleInboundA 的 channelRead() 方法。 Inbound 事件在上述递归调用的流程中什么时候能够结束呢有以下两种情况 用户自定义的 Handler 没有执行 fireChannelRead() 操作则在当前 Handler 终止 Inbound 事件传播。如果用户自定义的 Handler 都执行了 fireChannelRead() 操作Inbound 事件传播最终会在 TailContext 节点终止。 接下来我们着重看下 TailContext 节点做了哪些工作。 public void channelRead(ChannelHandlerContext ctx, Object msg) {onUnhandledInboundMessage(ctx, msg); } protected void onUnhandledInboundMessage(Object msg) {try {logger.debug(Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration., msg);} finally {ReferenceCountUtil.release(msg);} }可以看出 TailContext 只是日志记录了丢弃的 Inbound 消息并释放 ByteBuf 做一个兜底保护防止内存泄漏。 到此为止Inbound 事件的传播流程已经介绍完了Inbound 事件在 ChannelPipeline 中的传播方向是 Head - Tail。Netty 会从 ChannelPipeline 中找到对传播事件感兴趣的 Inbound 处理器执行事件回调方法然后继续向下一个节点传播整个事件传播流程是一个递归调用的过程。 Outbound 事件传播 分析完 Inbound 事件的传播流程之后再学习 Outbound 事件传播就会简单很多。Outbound 事件传播的方向是从 Tail - Head与 Inbound 事件的传播方向恰恰是相反的。Outbound 事件最常见的就是写事件执行 writeAndFlush() 方法时就会触发 Outbound 事件传播。我们直接从 TailContext 跟进 writeAndFlush() 源码 Override public final ChannelFuture writeAndFlush(Object msg) {return tail.writeAndFlush(msg); }继续跟进 tail.writeAndFlush() 的源码最终会定位到 AbstractChannelHandlerContext 中的 write 方法。该方法是 writeAndFlush 的核心逻辑具体源码如下。 private void write(Object msg, boolean flush, ChannelPromise promise) {// ...... 省略部分非核心代码 ...... // 找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler 节点final AbstractChannelHandlerContext next findContextOutbound(flush ?(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);final Object m pipeline.touch(msg, next);EventExecutor executor next.executor();// 判断当前线程是否是 NioEventLoop 中的线程if (executor.inEventLoop()) {if (flush) {// 因为 flush true所以流程走到这里next.invokeWriteAndFlush(m, promise);} else {next.invokeWrite(m, promise);}} else {final AbstractWriteTask task;if (flush) {task WriteAndFlushTask.newInstance(next, m, promise);} else {task WriteTask.newInstance(next, m, promise);}if (!safeExecute(executor, task, promise, m)) {task.cancel();}} }在《数据传输writeAndFlush 处理流程剖析》的课程中我们已经对 write() 方法做了深入分析这里抛开其他技术细节我们只分析 Outbound 事件传播的过程。 假设我们在代码示例中 SampleOutboundB 调用了 writeAndFlush() 方法那么 Netty 会调用 findContextOutbound() 方法找到 Pipeline 链表中下一个 Outbound 类型的 ChannelHandler对应上述代码示例中下一个 Outbound 节点是 SampleOutboundA然后调用 next.invokeWriteAndFlush(m, promise)我们跟进去 private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {if (invokeHandler()) {invokeWrite0(msg, promise);invokeFlush0();} else {writeAndFlush(msg, promise);} } private void invokeWrite0(Object msg, ChannelPromise promise) {try {((ChannelOutboundHandler) handler()).write(this, msg, promise);} catch (Throwable t) {notifyOutboundHandlerException(t, promise);} }我们发现invokeWriteAndFlush() 方法最终会它会执行下一个 ChannelHandler 节点的 write 方法。一般情况下用户在实现 outBound 类型的 ChannelHandler 时都会继承 ChannelOutboundHandlerAdapter一起看下它的 write() 方法是如何处理 outBound 事件的。 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise); }ChannelOutboundHandlerAdapter.write() 只是调用了 AbstractChannelHandlerContext 的 write() 方法是不是似曾相识与之前介绍的 Inbound 事件处理流程类似此时流程又回到了 AbstractChannelHandlerContext 中重复执行 write 方法继续寻找下一个 Outbound 节点也是一个递归调用的过程。 编码器是用户经常需要自定义实现的处理器然而为什么用户的编码器里并没有重写 write()只是重写一个 encode() 方法呢在《Netty 如何实现自定义通信协议》课程中我们所介绍的 MessageToByteEncoder 源码用户自定义的编码器基本都会继承 MessageToByteEncoderMessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法其中会调用子类实现的 encode 方法完成数据编码这里我们不再赘述了。 那么 OutBound 事件什么时候传播结束呢也许你已经猜到了OutBound 事件最终会传播到 HeadContext 节点。所以 HeadContext 节点既是 Inbound 处理器又是 OutBound 处理器继续看下 HeadContext 是如何拦截和处理 write 事件的。 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {unsafe.write(msg, promise); }HeadContext 最终调用了底层的 unsafe 写入数据数据在执行 write() 方法时只会写入到一个底层的缓冲数据结构然后等待 flush 操作将数据冲刷到 Channel 中。关于 write 和 flush 是如何操作缓存数据结构的快去复习一遍《数据传输writeAndFlush 处理流程剖析》吧将知识点形成一个完整的体系。 到此为止outbound 事件传播也介绍完了它的传播方向是 Tail - Head与 Inbound 事件的传播是相反的。MessageToByteEncoder 是用户在实现编码时经常用到的一个抽象类MessageToByteEncoder 中已经重写了 ChanneOutboundHandler 的 write() 方法大部分情况下用户只需要重写 encode() 即可。 异常事件传播 在《服务编排层Pipeline 如何协调各类 Handler》中我们已经初步介绍了 Netty 实现统一异常拦截和处理的最佳实践首先回顾下异常拦截器的简单实现。 public class ExceptionHandler extends ChannelDuplexHandler {Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof RuntimeException) {System.out.println(Handle Business Exception Success.);}} }异常处理器 ExceptionHandler 一般会继承 ChannelDuplexHandlerChannelDuplexHandler 既是一个 Inbound 处理器又是一个 Outbound 处理器。ExceptionHandler 应该被添加在自定义处理器的尾部如下图所示 那么异常处理器 ExceptionHandler 什么时候被执行呢我们分别从 Inbound 异常事件传播和 Outbound 异常事件传播两种场景进行分析。 首先看下 Inbound 异常事件的传播。还是从数据读取的场景入手发现 Inbound 事件传播的时候有异常处理的相关逻辑我们再一起重新分析下数据读取环节的源码。 // AbstractChannelHandlerContext private void invokeChannelRead(Object msg) {if (invokeHandler()) {try {((ChannelInboundHandler) handler()).channelRead(this, msg);} catch (Throwable t) {notifyHandlerException(t);}} else {fireChannelRead(msg);} } // AbstractChannelHandlerContext private void notifyHandlerException(Throwable cause) {// 省略其他代码invokeExceptionCaught(cause); } // AbstractChannelHandlerContext private void invokeExceptionCaught(final Throwable cause) {if (invokeHandler()) {try {handler().exceptionCaught(this, cause); // 调用 Handler 实现的 exceptionCaught 方法} catch (Throwable error) {// 省略其他代码}} else {fireExceptionCaught(cause);} }如果 SampleInboundA 在读取数据时发生了异常invokeChannelRead 会捕获异常并执行 notifyHandlerException() 方法进行异常处理。我们一步步跟进发现最终会调用 Handler 的 exceptionCaught() 方法所以用户可以通过重写 exceptionCaught() 实现自定义的异常处理。 我们知道统一异常处理器 ExceptionHandler 是在 ChannelPipeline 的末端SampleInboundA 并没有重写 exceptionCaught() 方法那么 SampleInboundA 产生的异常是如何传播到 ExceptionHandler 中呢用户实现的 Inbound 处理器一般都会继承 ChannelInboundHandlerAdapter 抽象类果然我们在 ChannelInboundHandlerAdapter 中发现了 exceptionCaught() 的实现 // ChannelInboundHandlerAdapter public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {ctx.fireExceptionCaught(cause); } // AbstractChannelHandlerContext public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {invokeExceptionCaught(findContextInbound(MASK_EXCEPTION_CAUGHT), cause);return this; }ChannelInboundHandlerAdapter 默认调用 fireExceptionCaught() 方法传播异常事件而 fireExceptionCaught() 执行时会先调用 findContextInbound() 方法找到下一个对异常事件关注的 Inbound 处理器然后继续向下传播异常。所以这里应该明白为什么统一异常处理器 ExceptionHandler 为什么需要添加在 ChannelPipeline 的末端了吧这样 ExceptionHandler 可以接收所有 Inbound 处理器发生的异常。 接下来我们分析 Outbound 异常事件传播。你可能此时就会有一个疑问Outbound 事件的传播方向与 Inbound 事件是相反的为什么统一异常处理器 ExceptionHandler 没有添加在 ChannelPipeline 的头部呢我们通过 writeAndFlush() 的调用过程再来一探究竟。 // AbstractChannelHandlerContext private void invokeFlush0() {try {((ChannelOutboundHandler) handler()).flush(this);} catch (Throwable t) {notifyHandlerException(t);} }我们发现flush 发送数据时如果发生异常那么异常也会被捕获并交由同样的 notifyHandlerException() 方法进行处理。因为 notifyHandlerException() 方法中会向下寻找 Inbound 处理器此时又会回到 Inbound 异常事件的传播流程。所以说异常事件的传播方向与 Inbound 事件几乎是一样的最后一定会传播到统一异常处理器 ExceptionHandler。 到这里整个异常事件的传播过程已经分析完了。你需要记住的是异常事件的传播顺序与 ChannelHandler 的添加顺序相同会依次向后传播与 Inbound 事件和 Outbound 事件无关。 总结 这节点我们学习了数据在 Netty 中的完整处理流程其中重点分析了数据是如何在 ChannelPipeline 中流转的。我们做一个知识点总结 ChannelPipeline 是双向链表结构包含 ChannelInboundHandler 和 ChannelOutboundHandler 两种处理器。Inbound 事件和 Outbound 事件的传播方向相反Inbound 事件的传播方向为 Head - Tail而 Outbound 事件传播方向是 Tail - Head。异常事件的处理顺序与 ChannelHandler 的添加顺序相同会依次向后传播与 Inbound 事件和 Outbound 事件无关。 再整体回顾下 ChannelPipeline 中事件传播的实现原理 Inbound 事件传播从 HeadContext 节点开始Outbound 事件传播从 TailContext 节点开始。AbstractChannelHandlerContext 抽象类中实现了一系列 fire 和 invoke 方法如果想让事件想下传播只需要调用 fire 系列的方法即可。fire 和 invoke 的系列方法结合 findContextInbound() 和 findContextOutbound() 可以控制 Inbound 和 Outbound 事件的传播方向整个过程是一个递归调用。 20 技巧篇Netty 的 FastThreadLocal 究竟比 ThreadLocal 快在哪儿 在前面几篇源码解析的课程中我们都有在源码中发现 FastThreadLocal 的身影。顾名思义Netty 作为高性能的网络通信框架FastThreadLocal 是比 JDK 自身的 ThreadLocal 性能更高的通信框架。FastThreadLocal 到底比 ThreadLocal 快在哪里呢这节课我们就一起来探索 FastThreadLocal 高性能的奥秘。 说明本文参考的 Netty 源码版本为 4.1.42.Final。 JDK ThreadLocal 基本原理 JDK ThreadLocal 不仅是高频的面试知识点而且在日常工作中也是常用一种工具所以首先我们先学习下 Java 原生的 ThreadLocal 的实现原理可以帮助我们更好地对比和理解 Netty 的 FastThreadLocal。 如果你需要变量在多线程之间隔离或者在同线程内的类和方法中共享那么 ThreadLocal 大显身手的时候就到了。ThreadLocal 可以理解为线程本地变量它是 Java 并发编程中非常重要的一个类。ThreadLocal 为变量在每个线程中都创建了一个副本该副本只能被当前线程访问多线程之间是隔离的变量不能在多线程之间共享。这样每个线程修改变量副本时不会对其他线程产生影响。 接下来我们通过一个例子看下 ThreadLocal 如何使用 public class ThreadLocalTest {private static final ThreadLocalString THREAD_NAME_LOCAL ThreadLocal.withInitial(() - Thread.currentThread().getName());private static final ThreadLocalTradeOrder TRADE_THREAD_LOCAL new ThreadLocal();public static void main(String[] args) {for (int i 0; i 2; i) {int tradeId i;new Thread(() - {TradeOrder tradeOrder new TradeOrder(tradeId, tradeId % 2 0 ? 已支付 : 未支付);TRADE_THREAD_LOCAL.set(tradeOrder);System.out.println(threadName: THREAD_NAME_LOCAL.get());System.out.println(tradeOrder info TRADE_THREAD_LOCAL.get());}, thread- i).start();}}static class TradeOrder {long id;String status;public TradeOrder(int id, String status) {this.id id;this.status status;}Overridepublic String toString() {return id id , status status;}} }在上述示例中构造了 THREAD_NAME_LOCAL 和 TRADE_THREAD_LOCAL 两个 ThreadLocal 变量分别用于记录当前线程名称和订单交易信息。ThreadLocal 是可以支持泛型的THREAD_NAME_LOCAL 和 TRADE_THREAD_LOCAL 存放 String 类型和 TradeOrder 对象类型的数据你可以通过 set()/get() 方法设置和读取 ThreadLocal 实例。一起看下示例代码的运行结果 threadName: thread-0 threadName: thread-1 tradeOrder infoid1, status未支付 tradeOrder infoid0, status已支付可以看出 thread-1 和 thread-2 虽然操作的是同一个 ThreadLocal 对象但是它们取到了不同的线程名称和订单交易信息。那么一个线程内如何存在多个 ThreadLocal 对象每个 ThreadLocal 对象是如何存储和检索的呢 接下来我们看看 ThreadLocal 的实现原理。既然多线程访问 ThreadLocal 变量时都会有自己独立的实例副本那么很容易想到的方案就是在 ThreadLocal 中维护一个 Map记录线程与实例之间的映射关系。当新增线程和销毁线程时都需要更新 Map 中的映射关系因为会存在多线程并发修改所以需要保证 Map 是线程安全的。那么 JDK 的 ThreadLocal 是这么实现的吗答案是 NO。因为在高并发的场景并发修改 Map 需要加锁势必会降低性能。JDK 为了避免加锁采用了相反的设计思路。以 Thread 入手在 Thread 中维护一个 Map记录 ThreadLocal 与实例之间的映射关系这样在同一个线程内Map 就不需要加锁了。示例代码中线程 Thread 和 ThreadLocal 的关系可以用以下这幅图表示。 那么在 Thread 内部维护映射关系的 Map 是如何实现的呢从源码中可以发现 Thread 使用的是 ThreadLocal 的内部类 ThreadLocalMap所以 Thread、ThreadLocal 和 ThreadLocalMap 之间的关系可以用下图表示 为了更加深入理解 ThreadLocal了解 ThreadLocalMap 的内部实现是非常有必要的。ThreadLocalMap 其实与 HashMap 的数据结构类似但是 ThreadLocalMap 不具备通用性它是为 ThreadLocal 量身定制的。 ThreadLocalMap 是一种使用线性探测法实现的哈希表底层采用数组存储数据。如下图所示ThreadLocalMap 会初始化一个长度为 16 的 Entry 数组每个 Entry 对象用于保存 key-value 键值对。与 HashMap 不同的是Entry 的 key 就是 ThreadLocal 对象本身value 就是用户具体需要存储的值。 当调用 ThreadLocal.set() 添加 Entry 对象时是如何解决 Hash 冲突的呢这就需要我们了解线性探测法的实现原理。每个 ThreadLocal 在初始化时都会有一个 Hash 值为 threadLocalHashCode每增加一个 ThreadLocal Hash 值就会固定增加一个魔术 HASH_INCREMENT 0x61c88647。为什么取 0x61c88647 这个魔数呢实验证明通过 0x61c88647 累加生成的 threadLocalHashCode 与 2 的幂取模得到的结果可以较为均匀地分布在长度为 2 的幂大小的数组中。有了 threadLocalHashCode 的基础下面我们通过下面的表格来具体讲解线性探测法是如何实现的。 为了便于理解我们采用一组简单的数据模拟 ThreadLocal.set() 的过程是如何解决 Hash 冲突的。 threadLocalHashCode 4threadLocalHashCode 15 4此时数据应该放在数组下标为 4 的位置。下标 4 的位置正好没有数据可以存放。threadLocalHashCode 19threadLocalHashCode 15 4但是下标 4 的位置已经有数据了如果当前需要添加的 Entry 与下标 4 位置已存在的 Entry 两者的 key 相同那么该位置 Entry 的 value 将被覆盖为新的值。我们假设 key 都是不相同的所以此时需要向后移动一位下标 5 的位置没有冲突可以存放。threadLocalHashCode 33threadLocalHashCode 15 3下标 3 的位置已经有数据向后移一位下标 4 位置还是有数据继续向后查找发现下标 6 没有数据可以存放。 ThreadLocal.get() 的过程也是类似的也是根据 threadLocalHashCode 的值定位到数组下标然后判断当前位置 Entry 对象与待查询 Entry 对象的 key 是否相同如果不同继续向下查找。由此可见ThreadLocal.set()/get() 方法在数据密集时很容易出现 Hash 冲突需要 O(n) 时间复杂度解决冲突问题效率较低。 下面我们再聊聊 ThreadLocalMap 中 Entry 的设计原理。Entry 继承自弱引用类 WeakReferenceEntry 的 key 是弱引用value 是强引用。在 JVM 垃圾回收时只要发现了弱引用的对象不管内存是否充足都会被回收。那么为什么 Entry 的 key 要设计成弱引用呢我们试想下如果 key 都是强引用当 ThreadLocal 不再使用时然而 ThreadLocalMap 中还是存在对 ThreadLocal 的强引用那么 GC 是无法回收的从而造成内存泄漏。 虽然 Entry 的 key 设计成了弱引用但是当 ThreadLocal 不再使用被 GC 回收后ThreadLocalMap 中可能出现 Entry 的 key 为 NULL那么 Entry 的 value 一直会强引用数据而得不到释放只能等待线程销毁。那么应该如何避免 ThreadLocalMap 内存泄漏呢ThreadLocal 已经帮助我们做了一定的保护措施在执行 ThreadLocal.set()/get() 方法时ThreadLocal 会清除 ThreadLocalMap 中 key 为 NULL 的 Entry 对象让它还能够被 GC 回收。除此之外当线程中某个 ThreadLocal 对象不再使用时立即调用 remove() 方法删除 Entry 对象。如果是在异常的场景中记得在 finally 代码块中进行清理保持良好的编码意识。 关于 JDK 的 ThreadLocal 的基本原理我们已经介绍完了既然 ThreadLocal 已经非常成熟而且在日常开发中也被广泛使用Netty 为什么还要自己实现一个 FastThreadLocal 呢性能真的比 ThreadLocal 高很多吗我们接下来一起一探究竟。 FastThreadLocal 为什么快 FastThreadLocal 的实现与 ThreadLocal 非常类似Netty 为 FastThreadLocal 量身打造了 FastThreadLocalThread 和 InternalThreadLocalMap 两个重要的类。下面我们看下这两个类是如何实现的。 FastThreadLocalThread 是对 Thread 类的一层包装每个线程对应一个 InternalThreadLocalMap 实例。只有 FastThreadLocal 和 FastThreadLocalThread 组合使用时才能发挥 FastThreadLocal 的性能优势。首先看下 FastThreadLocalThread 的源码定义 public class FastThreadLocalThread extends Thread {private InternalThreadLocalMap threadLocalMap;// 省略其他代码 }可以看出 FastThreadLocalThread 主要扩展了 InternalThreadLocalMap 字段我们可以猜测到 FastThreadLocalThread 主要使用 InternalThreadLocalMap 存储数据而不再是使用 Thread 中的 ThreadLocalMap。所以想知道 FastThreadLocalThread 高性能的奥秘必须要了解 InternalThreadLocalMap 的设计原理。 上文中我们讲到了 ThreadLocal 的一个重要缺点就是 ThreadLocalMap 采用线性探测法解决 Hash 冲突性能较慢那么 InternalThreadLocalMap 又是如何优化的呢首先一起看下 InternalThreadLocalMap 的内部构造。 public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY 8;private static final int STRING_BUILDER_INITIAL_SIZE;private static final int STRING_BUILDER_MAX_SIZE;public static final Object UNSET new Object();private BitSet cleanerFlags; private InternalThreadLocalMap() {super(newIndexedVariableTable());}private static Object[] newIndexedVariableTable() {Object[] array new Object[32];Arrays.fill(array, UNSET);return array;} public static int nextVariableIndex() {int index nextIndex.getAndIncrement();if (index 0) {nextIndex.decrementAndGet();throw new IllegalStateException(too many thread-local indexed variables);}return index;}// 省略其他代码 } class UnpaddedInternalThreadLocalMap {static final ThreadLocalInternalThreadLocalMap slowThreadLocalMap new ThreadLocalInternalThreadLocalMap();static final AtomicInteger nextIndex new AtomicInteger(); Object[] indexedVariables;UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {this.indexedVariables indexedVariables;}// 省略其他代码 }从 InternalThreadLocalMap 内部实现来看与 ThreadLocalMap 一样都是采用数组的存储方式。但是 InternalThreadLocalMap 并没有使用线性探测法来解决 Hash 冲突而是在 FastThreadLocal 初始化的时候分配一个数组索引 indexindex 的值采用原子类 AtomicInteger 保证顺序递增通过调用 InternalThreadLocalMap.nextVariableIndex() 方法获得。然后在读写数据的时候通过数组下标 index 直接定位到 FastThreadLocal 的位置时间复杂度为 O(1)。如果数组下标递增到非常大那么数组也会比较大所以 FastThreadLocal 是通过空间换时间的思想提升读写性能。下面通过一幅图描述 InternalThreadLocalMap、index 和 FastThreadLocal 之间的关系。 通过上面 FastThreadLocal 的内部结构图我们对比下与 ThreadLocal 有哪些区别呢FastThreadLocal 使用 Object 数组替代了 Entry 数组Object[0] 存储的是一个SetFastThreadLocal? 集合从数组下标 1 开始都是直接存储的 value 数据不再采用 ThreadLocal 的键值对形式进行存储。 假设现在我们有一批数据需要添加到数组中分别为 value1、value2、value3、value4对应的 FastThreadLocal 在初始化的时候生成的数组索引分别为 1、2、3、4。如下图所示。 至此我们已经对 FastThreadLocal 有了一个基本的认识下面我们结合具体的源码分析 FastThreadLocal 的实现原理。 FastThreadLocal 源码分析 在讲解源码之前我们回过头看下上文中的 ThreadLocal 示例如果把示例中 ThreadLocal 替换成 FastThread应当如何使用呢 public class FastThreadLocalTest {private static final FastThreadLocalString THREAD_NAME_LOCAL new FastThreadLocal();private static final FastThreadLocalTradeOrder TRADE_THREAD_LOCAL new FastThreadLocal();public static void main(String[] args) {for (int i 0; i 2; i) {int tradeId i;String threadName thread- i;new FastThreadLocalThread(() - {THREAD_NAME_LOCAL.set(threadName);TradeOrder tradeOrder new TradeOrder(tradeId, tradeId % 2 0 ? 已支付 : 未支付);TRADE_THREAD_LOCAL.set(tradeOrder);System.out.println(threadName: THREAD_NAME_LOCAL.get());System.out.println(tradeOrder info TRADE_THREAD_LOCAL.get());}, threadName).start();}} }可以看出FastThreadLocal 的使用方法几乎和 ThreadLocal 保持一致只需要把代码中 Thread、ThreadLocal 替换为 FastThreadLocalThread 和 FastThreadLocal 即可Netty 在易用性方面做得相当棒。下面我们重点对示例中用得到 FastThreadLocal.set()/get() 方法做深入分析。 首先看下 FastThreadLocal.set() 的源码 public final void set(V value) {if (value ! InternalThreadLocalMap.UNSET) { // 1. value 是否为缺省值InternalThreadLocalMap threadLocalMap InternalThreadLocalMap.get(); // 2. 获取当前线程的 InternalThreadLocalMapsetKnownNotUnset(threadLocalMap, value); // 3. 将 InternalThreadLocalMap 中数据替换为新的 value} else {remove();} }FastThreadLocal.set() 方法虽然入口只有几行代码但是内部逻辑是相当复杂的。我们首先还是抓住代码主干一步步进行拆解分析。set() 的过程主要分为三步 判断 value 是否为缺省值如果等于缺省值那么直接调用 remove() 方法。这里我们还不知道缺省值和 remove() 之间的联系是什么我们暂且把 remove() 放在最后分析。如果 value 不等于缺省值接下来会获取当前线程的 InternalThreadLocalMap。然后将 InternalThreadLocalMap 中对应数据替换为新的 value。 首先我们看下 InternalThreadLocalMap.get() 方法源码如下 public static InternalThreadLocalMap get() {Thread thread Thread.currentThread();if (thread instanceof FastThreadLocalThread) { // 当前线程是否为 FastThreadLocalThread 类型return fastGet((FastThreadLocalThread) thread);} else {return slowGet();} } private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {InternalThreadLocalMap threadLocalMap thread.threadLocalMap(); // 获取 FastThreadLocalThread 的 threadLocalMap 属性if (threadLocalMap null) {thread.setThreadLocalMap(threadLocalMap new InternalThreadLocalMap());}return threadLocalMap; } private static InternalThreadLocalMap slowGet() {ThreadLocalInternalThreadLocalMap slowThreadLocalMap UnpaddedInternalThreadLocalMap.slowThreadLocalMap; InternalThreadLocalMap ret slowThreadLocalMap.get(); // 从 JDK 原生 ThreadLocal 中获取 InternalThreadLocalMapif (ret null) {ret new InternalThreadLocalMap();slowThreadLocalMap.set(ret);}return ret; }InternalThreadLocalMap.get() 逻辑很简单为了帮助你更好地理解下面使用一幅图描述 InternalThreadLocalMap 的获取方式。 如果当前线程是 FastThreadLocalThread 类型那么直接通过 fastGet() 方法获取 FastThreadLocalThread 的 threadLocalMap 属性即可。如果此时 InternalThreadLocalMap 不存在直接创建一个返回。关于 InternalThreadLocalMap 的初始化在上文中已经介绍过它会初始化一个长度为 32 的 Object 数组数组中填充着 32 个缺省对象 UNSET 的引用。 那么 slowGet() 又是什么作用呢从代码分支来看slowGet() 是针对非 FastThreadLocalThread 类型的线程发起调用时的一种兜底方案。如果当前线程不是 FastThreadLocalThread内部是没有 InternalThreadLocalMap 属性的Netty 在 UnpaddedInternalThreadLocalMap 中保存了一个 JDK 原生的 ThreadLocalThreadLocal 中存放着 InternalThreadLocalMap此时获取 InternalThreadLocalMap 就退化成 JDK 原生的 ThreadLocal 获取。 获取 InternalThreadLocalMap 的过程已经讲完了下面看下 setKnownNotUnset() 如何将数据添加到 InternalThreadLocalMap 的。 private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {if (threadLocalMap.setIndexedVariable(index, value)) { // 1. 找到数组下标 index 位置设置新的 valueaddToVariablesToRemove(threadLocalMap, this); // 2. 将 FastThreadLocal 对象保存到待清理的 Set 中} }setKnownNotUnset() 主要做了两件事 找到数组下标 index 位置设置新的 value。将 FastThreadLocal 对象保存到待清理的 Set 中。 首先我们看下第一步 threadLocalMap.setIndexedVariable() 的源码实现 public boolean setIndexedVariable(int index, Object value) {Object[] lookup indexedVariables;if (index lookup.length) {Object oldValue lookup[index]; lookup[index] value; // 直接将数组 index 位置设置为 value时间复杂度为 O(1)return oldValue UNSET;} else {expandIndexedVariableTableAndSet(index, value); // 容量不够先扩容再设置值return true;} }indexedVariables 就是 InternalThreadLocalMap 中用于存放数据的数组如果数组容量大于 FastThreadLocal 的 index 索引那么直接找到数组下标 index 位置将新 value 设置进去事件复杂度为 O(1)。在设置新的 value 之前会将之前 index 位置的元素取出如果旧的元素还是 UNSET 缺省对象那么返回成功。 如果数组容量不够了怎么办呢InternalThreadLocalMap 会自动扩容然后再设置 value。接下来看看 expandIndexedVariableTableAndSet() 的扩容逻辑 private void expandIndexedVariableTableAndSet(int index, Object value) {Object[] oldArray indexedVariables;final int oldCapacity oldArray.length;int newCapacity index;newCapacity | newCapacity 1;newCapacity | newCapacity 2;newCapacity | newCapacity 4;newCapacity | newCapacity 8;newCapacity | newCapacity 16;newCapacity ;Object[] newArray Arrays.copyOf(oldArray, newCapacity);Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);newArray[index] value;indexedVariables newArray; }上述代码的位移操作是不是似曾相识我们去翻阅下 JDK HashMap 中扩容的源码其中有这么一段代码 static final int tableSizeFor(int cap) {int n cap - 1;n | n 1;n | n 2;n | n 4;n | n 8;n | n 16;return (n 0) ? 1 : (n MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n 1; }可以看出 InternalThreadLocalMap 实现数组扩容几乎和 HashMap 完全是一模一样的所以多读源码还是可以给我们很多启发的。InternalThreadLocalMap 以 index 为基准进行扩容将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中空余部分填充缺省对象 UNSET最终把新数组赋值给 indexedVariables。 为什么 InternalThreadLocalMap 以 index 为基准进行扩容而不是原数组长度呢假设现在初始化了 70 个 FastThreadLocal但是这些 FastThreadLocal 从来没有调用过 set() 方法此时数组还是默认长度 32。当第 index 70 的 FastThreadLocal 调用 set() 方法时如果按原数组容量 32 进行扩容 2 倍后还是无法填充 index 70 的数据。所以使用 index 为基准进行扩容可以解决这个问题但是如果 FastThreadLocal 特别多数组的长度也是非常大的。 回到 setKnownNotUnset() 的主流程向 InternalThreadLocalMap 添加完数据之后接下就是将 FastThreadLocal 对象保存到待清理的 Set 中。我们继续看下 addToVariablesToRemove() 是如何实现的。 private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal? variable) {Object v threadLocalMap.indexedVariable(variablesToRemoveIndex); // 获取数组下标为 0 的元素SetFastThreadLocal? variablesToRemove;if (v InternalThreadLocalMap.UNSET || v null) {variablesToRemove Collections.newSetFromMap(new IdentityHashMapFastThreadLocal?, Boolean()); // 创建 FastThreadLocal 类型的 Set 集合threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); // 将 Set 集合填充到数组下标 0 的位置} else {variablesToRemove (SetFastThreadLocal?) v; // 如果不是 UNSETSet 集合已存在直接强转获得 Set 集合}variablesToRemove.add(variable); // 将 FastThreadLocal 添加到 Set 集合中 }variablesToRemoveIndex 是采用 static final 修饰的变量在 FastThreadLocal 初始化时 variablesToRemoveIndex 被赋值为 0。InternalThreadLocalMap 首先会找到数组下标为 0 的元素如果该元素是缺省对象 UNSET 或者不存在那么会创建一个 FastThreadLocal 类型的 Set 集合然后把 Set 集合填充到数组下标 0 的位置。如果数组第一个元素不是缺省对象 UNSET说明 Set 集合已经被填充直接强转获得 Set 集合即可。这就解释了 InternalThreadLocalMap 的 value 数据为什么是从下标为 1 的位置开始存储了因为 0 的位置已经被 Set 集合占用了。 为什么 InternalThreadLocalMap 要在数组下标为 0 的位置存放一个 FastThreadLocal 类型的 Set 集合呢这时候我们回过头看下 remove() 方法。 public final void remove() {remove(InternalThreadLocalMap.getIfSet()); } public static InternalThreadLocalMap getIfSet() {Thread thread Thread.currentThread();if (thread instanceof FastThreadLocalThread) {return ((FastThreadLocalThread) thread).threadLocalMap();}return slowThreadLocalMap.get(); } public final void remove(InternalThreadLocalMap threadLocalMap) {if (threadLocalMap null) {return;}Object v threadLocalMap.removeIndexedVariable(index); // 删除数组下标 index 位置对应的 valueremoveFromVariablesToRemove(threadLocalMap, this); // 从数组下标 0 的位置取出 Set 集合并删除当前 FastThreadLocalif (v ! InternalThreadLocalMap.UNSET) {try {onRemoval((V) v); // 空方法用户可以继承实现} catch (Exception e) {PlatformDependent.throwException(e);}} }在执行 remove 操作之前会调用 InternalThreadLocalMap.getIfSet() 获取当前 InternalThreadLocalMap。有了之前的基础理解 getIfSet() 方法就非常简单了如果是 FastThreadLocalThread 类型直接取 FastThreadLocalThread 中 threadLocalMap 属性。如果是普通线程 Thread从 ThreadLocal 类型的 slowThreadLocalMap 中获取。 找到 InternalThreadLocalMap 之后InternalThreadLocalMap 会从数组中定位到下标 index 位置的元素并将 index 位置的元素覆盖为缺省对象 UNSET。接下来就需要清理当前的 FastThreadLocal 对象此时 Set 集合就派上了用场InternalThreadLocalMap 会取出数组下标 0 位置的 Set 集合然后删除当前 FastThreadLocal。最后 onRemoval() 方法起到什么作用呢Netty 只是留了一处扩展并没有实现用户需要在删除的时候做一些后置操作可以继承 FastThreadLocal 实现该方法。 至此FastThreadLocal.set() 的完成过程已经讲完了接下来我们继续 FastThreadLocal.get() 方法的实现就易如反掌拉。FastThreadLocal.get() 的源码实现如下 public final V get() {InternalThreadLocalMap threadLocalMap InternalThreadLocalMap.get();Object v threadLocalMap.indexedVariable(index); // 从数组中取出 index 位置的元素if (v ! InternalThreadLocalMap.UNSET) {return (V) v;}return initialize(threadLocalMap); // 如果获取到的数组元素是缺省对象执行初始化操作 } public Object indexedVariable(int index) {Object[] lookup indexedVariables;return index lookup.length? lookup[index] : UNSET; } private V initialize(InternalThreadLocalMap threadLocalMap) {V v null;try {v initialValue();} catch (Exception e) {PlatformDependent.throwException(e);}threadLocalMap.setIndexedVariable(index, v);addToVariablesToRemove(threadLocalMap, this);return v; }首先根据当前线程是否是 FastThreadLocalThread 类型找到 InternalThreadLocalMap然后取出从数组下标 index 的元素如果 index 位置的元素不是缺省对象 UNSET说明该位置已经填充过数据直接取出返回即可。如果 index 位置的元素是缺省对象 UNSET那么需要执行初始化操作。可以看到initialize() 方法会调用用户重写的 initialValue 方法构造需要存储的对象数据如下所示。 private final FastThreadLocalString threadLocal new FastThreadLocalString() {Overrideprotected String initialValue() {return hello world;} };构造完用户对象数据之后接下来就会将它填充到数组 index 的位置然后再把当前 FastThreadLocal 对象保存到待清理的 Set 中。整个过程我们在分析 FastThreadLocal.set() 时都已经介绍过就不再赘述了。 到此为止FastThreadLocal 最核心的两个方法 set()/get() 我们已经分析完了。下面有两个问题我们再深入思考下。 FastThreadLocal 真的一定比 ThreadLocal 快吗答案是不一定的只有使用FastThreadLocalThread 类型的线程才会更快如果是普通线程反而会更慢。FastThreadLocal 会浪费很大的空间吗虽然 FastThreadLocal 采用的空间换时间的思路但是在 FastThreadLocal 设计之初就认为不会存在特别多的 FastThreadLocal 对象而且在数据中没有使用的元素只是存放了同一个缺省对象的引用并不会占用太多内存空间。 总结 本节课我们对比介绍了 ThreadLocal 和 FastThreadLocal简单总结下 FastThreadLocal 的优势。 高效查找。FastThreadLocal 在定位数据的时候可以直接根据数组下标 index 获取时间复杂度 O(1)。而 JDK 原生的 ThreadLocal 在数据较多时哈希表很容易发生 Hash 冲突线性探测法在解决 Hash 冲突时需要不停地向下寻找效率较低。此外FastThreadLocal 相比 ThreadLocal 数据扩容更加简单高效FastThreadLocal 以 index 为基准向上取整到 2 的次幂作为扩容后容量然后把原数据拷贝到新数组。而 ThreadLocal 由于采用的哈希表所以在扩容后需要再做一轮 rehash。安全性更高。JDK 原生的 ThreadLocal 使用不当可能造成内存泄漏只能等待线程销毁。在使用线程池的场景下ThreadLocal 只能通过主动检测的方式防止内存泄漏从而造成了一定的开销。然而 FastThreadLocal 不仅提供了 remove() 主动清除对象的方法而且在线程池场景中 Netty 还封装了 FastThreadLocalRunnableFastThreadLocalRunnable 最后会执行 FastThreadLocal.removeAll() 将 Set 集合中所有 FastThreadLocal 对象都清理掉 FastThreadLocal 体现了 Netty 在高性能方面精益求精的设计精神FastThreadLocal 仅仅是其中的冰山一角下节课我们继续探索 Netty 中其他高效的数据结构技巧。
http://www.pierceye.com/news/693047/

相关文章:

  • 茂名建站公司南通长城建设集团有限公司网站
  • 网络平台怎么建立网站吗做暧暧视频网站安全吗
  • 免费域名x网站网站前期准备工作
  • 陕西网站建设公司排名智能优化网站
  • 做瞹瞹网站萍乡做网站的公司有哪些
  • 网站建设的类型有几种wordpress搜索返回页面内容
  • 建设网站备案与不备案区别招远建网站首选公司
  • 四川住房和城乡建设厅网站三类人员软文网站备案如何查询
  • 个人与企业签订网站开发合同网页制作教程实例
  • 做网站遇到竞争对手怎么办wordpress中文版邮件发送
  • 美橙互联旗下网站渐变网站
  • 做网站域名需要在哪里备案微信答题小程序
  • 购物网站页面布局个人站长做导航网站
  • wordpress 增强编辑器网站暂时关闭 seo
  • 重庆网站设计开发培训广西省住房和城乡建设厅官网
  • 购物网站模板免费下载网站排名快速提升工具
  • 中山制作网站的公司滨江区网站开发公司
  • 申请建设工作网站的函酒店网站建设方案策划方案
  • 宠物店网站模板你是网站设计有限公司的项目经理
  • 潍坊网站开发公司2018做网站还赚钱吗
  • 做化妆品网站怎样wordpress映射到外网访问
  • 关于加强门户网站建设爱客crm客户管理系统
  • 网站备案的是域名还是空间电子商务网站建设携程
  • 建设企业网站管理系统目的开发一个网站的费用
  • 网站开发和浏览器兼容问题软文广告案例分析
  • 更新网站的方法自贡网站建设哪家好
  • 沈阳网络建网站个人电子商务网站建设的总体目标
  • asp 大型网站开发优化公司治理结构
  • 做外贸 建网站要注意什么ssr网站怎么做
  • 杭州做兼职网站建设老五wordpress