虚拟币交易网站开发,wordpress 加入搜索,西安公司电话,衣服 div网站原标题#xff1a;一个基于TCP协议的Socket通信实例1. 前言一般接口对接多以http/https或webservice的方式#xff0c;socket方式的对接比较少并且会有一些难度。正好前段时间完成了一个socket的接口的对接需求#xff0c;现将实现的思路做一个整理。2. 需求概述2.1 需要提供…原标题一个基于TCP协议的Socket通信实例1. 前言一般接口对接多以http/https或webservice的方式socket方式的对接比较少并且会有一些难度。正好前段时间完成了一个socket的接口的对接需求现将实现的思路做一个整理。2. 需求概述2.1 需要提供一个socket服务端实时接收三方传递过来的数据2.2 实时报文规范说明2.2.1 通讯及接口格式说明通讯方式通讯采用 TCP 协议 SOCKET 同步短连接方式。报文结构报文为不定长报文以定长报文头不定长报文体的方式报文基本结构如下图所示报文长度报文体6位交易报文长度交易报文。其中 6 位交易报文长度以 ASCII 码字符串方式表示(6 个字节)右对齐左补 0不包括自身的长度表示的是报文体的长度。如“000036fbced3fe-7025-4b5c-9cef-2421cd981f39” 000036 为长度“fbced3fe-7025-4b5c-9cef-2421cd981f39”为报文内容。报文结构符合 XML 标准的报文格式报文以无 BOM 格式的 GBK 编码。报文根节点为 Transaction节点。除非报文里有特殊说明报文定义的字段都是 Transaction 节点的子节点。报文格式参考下节示例。2.2.2 报文示例请求000410?xml version1.0 encodingGBK?29greerg4741414141test02018-06-1516:15:00响应000683?xml version1.0 encodingGBK?1OK0c2c002f-ccc6-4c7b-86e1-c7871b1c98b31Message enqueued for sendingSMS-AFFS-00000010047419155906y06b02hdo0013 代码实现3.1 BIO 阻塞模式简单的描述一下BIO的服务端通信模型采用BIO通信模型的服务端通常由一个独立的Acceptor线程负责监听客户端的连接它接收到客户端连接请求之后为每个客户端分配一个线程进行业务逻辑处理通过输出流返回应答给客户端线程销毁。即典型的请求应答模型。传统BIO通信模型图(此图来源于网络)该模型最大的问题就是缺乏弹性伸缩能力当客户端并发访问量增加后服务端的线程个数和客户端并发访问数呈1:1的正比关系 Java中的线程也是比较宝贵的系统资源线程数量快速膨胀后系统的性能将急剧下降随着访问量的继续增大系统最终崩溃。但是这种模式在一些特定的应用场景下效果是最好的比如只有少量的TCP连接通信且双方都非常快速的传输数据此时这种模式的性能最好实现比较简单。实现代码如下3.1.1 服务端同步阻塞模式的import java.io.*;import java.net.*;import java.nio.charset.Charset;import java.text.NumberFormat;import javax.annotation.PostConstruct;public class TCPBlockServer {// 服务IPprivate final String SERVER_IP 127.0.0.1;// 服务端口private final int SERVER_PORT 8888;private final int BACKLOG 150;private final String CHARSET_NAME GBK;PostConstructpublic void start() throws Exception {System.out.println(server Socket 启动 。。。。。。。);// 这里使用了Java的自动关闭的语法try (ServerSocket serverSocket new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {while (true) {Socket socket serverSocket.accept() ;new Thread(()-handler(socket)).start();}}}private void handler(Socket socket2) {String msg null;try (Socket socket socket2 ; InputStream input socket.getInputStream(); OutputStream out socket.getOutputStream()) {msg receiveMsg(input, socket);System.out.println(msg: msg);doBusinessLogic(msg,out);} catch (Exception e) {e.printStackTrace();}}// 处理业务逻辑private void doBusinessLogic(String msg,OutputStream out) throws Exception {// todo Business Logicmsg formatMsg(msg);out.write(msg.getBytes(CHARSET_NAME));out.flush();}private String formatMsg(String msg) {byte[] bodyBytes msg.getBytes(Charset.forName(CHARSET_NAME));int bodyLength bodyBytes.length;NumberFormat numberFormat NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(6);numberFormat.setGroupingUsed(false);return numberFormat.format(bodyLength) msg;}private String receiveMsg(InputStream input, Socket socket) throws Exception {byte[] lengthBytes new byte[6];int count input.read(lengthBytes);int length Integer.valueOf(new String(lengthBytes));byte[] buffer new byte[length 2];int readBytes 0;while (readBytes length) {count input.read(buffer, readBytes, length - readBytes);if (count -1) {break;}readBytes count;}return new String(buffer, Charset.forName(GBK));}public static void main(String[] args) throws Exception {TCPBlockServer server new TCPBlockServer();server.start();}}3.1.2 服务端伪异步I/O模型上面实现方面存在的一些不足之处1服务器创建和销毁工作线程的开销很大。如果服务器需要和许多客户通信并且与每个客户的通信时间都很短那么有可能服务器为客户创建新线程的开销比实际与客户通信的开销还大。2除了创建和销毁线程的开销之外活动的线程也消耗系统资源。并且每个线程本身也会占用一定的内存(每个线程大约需要1MB内存)如果同时有大量客户连接到服务器就必须创建大量的工作线程他们会消耗大量内存可能会导致系统内存不足应用产生OOM的错误。3如果线程数目固定并且每个线程都有很长的生命周期那么线程切换也是相对固定的。不同的操作系统有不同的切换周期一般在20毫秒左右。这里所说的线程切换是指Java虚拟机以及底层操作系统的调度下线程之间转让CPU的使用权。如果频繁创建和销毁线程那么将导致频繁的切换线程因为一个线程被销毁后必然要把CPU转移给另外一个已经就绪的线程是该线程获得运行机会。这种情况下线程间的切换不再遵循系统的固定切换周期切换线程的开销甚至比创建及销毁的开销还大。为了改进客户端访问就会创建线程的场景改为由一个线程池去管理固定数量的线程来执行客户所需业务逻辑。实现线程池线程和客户端 N(N 1): M的关系。如下图所示相关实现代码如下根据实际场景需要设置线程池中合适的线程数量import java.io.*;import java.net.*;import java.nio.charset.Charset;import java.text.NumberFormat;import java.util.concurrent.*;import javax.annotation.PostConstruct;public class TCPBlockThreadPoolServer {// 服务IPprivate final String SERVER_IP 127.0.0.1;// 服务端口private final int SERVER_PORT 8888;private final int BACKLOG 150;private final int THREADS 150 ;private final String CHARSET_NAME GBK;private ExecutorService executorService ;PostConstructpublic void start() throws Exception {System.out.println(server Socket 启动 。。。。。。。);executorService Executors.newFixedThreadPool(THREADS) ;// 这里使用了Java的自动关闭的语法try (ServerSocket serverSocket new ServerSocket(SERVER_PORT, BACKLOG, InetAddress.getByName(SERVER_IP))) {while (true) {Socket socket serverSocket.accept() ;executorService.execute(()-handler(socket));}}}private void handler(Socket socket2) {String msg null;try (Socket socket socket2 ; InputStream input socket.getInputStream(); OutputStream out socket.getOutputStream()) {msg receiveMsg(input, socket);System.out.println(msg: msg);doBusinessLogic(msg,out);} catch (Exception e) {e.printStackTrace();}}// 处理业务逻辑private void doBusinessLogic(String msg,OutputStream out) throws Exception {// todo Business Logicmsg formatMsg(msg);out.write(msg.getBytes(CHARSET_NAME));out.flush();}private String formatMsg(String msg) {byte[] bodyBytes msg.getBytes(Charset.forName(CHARSET_NAME));int bodyLength bodyBytes.length;NumberFormat numberFormat NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(6);numberFormat.setGroupingUsed(false);return numberFormat.format(bodyLength) msg;}private String receiveMsg(InputStream input, Socket socket) throws Exception {byte[] lengthBytes new byte[6];int count input.read(lengthBytes);int length Integer.valueOf(new String(lengthBytes));byte[] buffer new byte[length 2];int readBytes 0;while (readBytes length) {count input.read(buffer, readBytes, length - readBytes);if (count -1) {break;}readBytes count;}return new String(buffer, Charset.forName(GBK));}public static void main(String[] args) throws Exception {TCPBlockServer server new TCPBlockServer();server.start();}}3.1.3 客户端简单的客户端实现如下import java.io.*;import java.net.Socket;import java.nio.charset.Charset;import org.apache.commons.lang3.StringUtils;public class Client {public String sendAndRecv(String content, String charsetName,String ip,int port) throws Exception {try(Socket socket new Socket(ip,port)){socket.setTcpNoDelay(true);socket.setKeepAlive(true);socket.setSoTimeout(60000);try(OutputStream output socket.getOutputStream();InputStream input socket.getInputStream()){output.write(content.getBytes(charsetName));output.flush();BufferedReader bufferedReader new BufferedReader(new InputStreamReader(input, Charset.forName(GBK)));StringBuffer buffer new StringBuffer();String message null ;while((message bufferedReader.readLine()) ! null){buffer.append(message);}return StringUtils.substring(buffer.toString(), 6);}}}}3.2 NIO 模式相对于BIO(阻塞通信)模型来说NIO模型非常复杂以至于花费很大的精力去学习也不太容易能够精通难以编写出一个没有缺陷高效且适应各种意外情况的稳定的NIO通信模块。之所以有这样的问题是因为NIO编程不是单纯的一个技术点而是涵盖了一系列的相关技术、专业知识、编程经验和编程技巧的复杂工程所以精通这些技术相当有难度。和BIO相比NIO有如下几个新的概念1. 通道(Channel)Channel对应BIO中Stream的模型到任何目的地(或来自任何地方)的所有数据都必须通过一个Channel对象。但是Channel和Stream不同的地方在于Channel是双向的而Stream是单向的(分为InputStream和OutputStream)所以Channel可以用于读/写或同时用于读写。2. 缓冲区(Buffer)虽然Channel用于读写数据但是我们不能直接操作Channel进行读写必须通过缓冲区来完成(Buffer)。NIO设计了一个全新的数据结构Buffer具体的缓存区有这些ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer等。Buffer中有3个重要的参数位置(position)、容量(capactiy)和上限(limit)参数写模式读模式位置(position)当前缓冲区的位置将从position的下一个位置写数据当前缓存区读取的位置将从此位置后读取数据。容量(capacity)缓存区总容量的上限缓存区总容量的上限上限(limit)缓存区实际上限它总是小于等于容量。通常情况下和容量相等代表可读取的总容量和上次写入的容量相等。3. 选择器(Selector)Selector 可以同时检测多个Channel的事件以实现异步I/O,我们可以将感兴趣的事件注册到Selector上面当事件发生时可以通过Selector获取事件发生的Channel,并进行相关的事件处理操作。一个Selector可以同时轮询多个Channel。3.2.1 服务端import java.io.IOException;import java.net.*;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.Charset;import java.text.NumberFormat;import java.util.Iterator;import javax.annotation.PostConstruct;import lombok.extern.slf4j.Slf4j;Slf4jpublic class TCPNioServer {// 服务IPprivate final String SERVER_IP 127.0.0.1;// 服务端口private final int SERVER_PORT 8888;private final int BACKLOG 150;private final String CHARSET_NAME GBK;private Selector selector;public TCPNioServer() throws Exception {ServerSocketChannel serverChannel ServerSocketChannel.open();// 设置通道为非阻塞serverChannel.configureBlocking(false);// 将该通道所对应的serverSocket绑定到指定的ip和port端口InetAddress inetAddress InetAddress.getByName(SERVER_IP);serverChannel.socket().bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);// 获得一个通道管理器(选择器)selector Selector.open();/** 将通道管理器和该通道绑定并为该通道注册selectionKey.OP_ACCEPT事件* 注册该事件后当事件到达的时候selector.select()会返回 如果事件没有到达selector.select()会一直阻塞*/serverChannel.register(selector, SelectionKey.OP_ACCEPT);}/*** 采用轮询的方式监听selector上是否有需要处理的事件如果有进行处理*/PostConstructpublic void start() throws Exception {log.info(start server ip {} , port {}. , SERVER_IP, SERVER_PORT);while (true) {selector.select();//此方法会阻塞直到至少有一个以注册的事件被触发//获取发生事件的SelectionKey集合Iterator iterator this.selector.selectedKeys().iterator();while (iterator.hasNext()) {try {SelectionKey selectedKey iterator.next();if (selectedKey.isValid()) { // 如果key的状态是有效的if (selectedKey.isAcceptable()) { //如key是阻塞状态调用accept()方法accept(selectedKey);}if (selectedKey.isReadable()) { //如key是可读状态调用handle()方法handle(selectedKey);}}} catch (Exception e) {iterator.remove();} finally {iterator.remove();//从集合中移除避免重复处理}}}}private void accept(SelectionKey key) throws IOException {// 1 获取服务器通道ServerSocketChannel server (ServerSocketChannel) key.channel();// 2 执行阻塞方法SocketChannel chennel server.accept();// 3 设置阻塞模式为非阻塞chennel.configureBlocking(false);// 4 注册到多路复用选择器上并设置读取标识chennel.register(selector, SelectionKey.OP_READ);}private void handle(SelectionKey key) throws Exception {// 获取之前注册的SocketChannel通道try (SocketChannel channel (SocketChannel) key.channel()) {int length getMsgLength(key, channel);String msg recvMsg(key, channel, length);System.out.println(Server msg);doBusinessLogic(msg, channel);}}private byte[] read(SelectionKey key, SocketChannel channel,int capacity) throws Exception {ByteBuffer buffer ByteBuffer.allocate(capacity);channel.read(buffer);// 将channel中的数据放入buffer中int count channel.read(buffer);if (count -1) { // -1表示通道中没有数据key.channel().close();key.cancel();return null;}// 读取到了数据将buffer的position复位到0buffer.flip();byte[] bytes new byte[buffer.remaining()];// 将buffer中的数据写入byte[]中buffer.get(bytes);return bytes ;}private int getMsgLength(SelectionKey key, SocketChannel channel) throws Exception {byte[] bytes this.read(key, channel, 6) ;String length new String(bytes, CHARSET_NAME);return new Integer(length);}private String recvMsg(SelectionKey key, SocketChannel channel,int msgLength) throws Exception{byte[] bytes this.read(key, channel, msgLength) ;return new String(bytes, CHARSET_NAME);}// 处理业务逻辑private void doBusinessLogic(String msg, SocketChannel channel) throws Exception {// todo Business Logicmsg formatMsg(msg);ByteBuffer outBuffer ByteBuffer.wrap(msg.getBytes(CHARSET_NAME));channel.write(outBuffer);}private String formatMsg(String msg) {byte[] bodyBytes msg.getBytes(Charset.forName(CHARSET_NAME));int bodyLength bodyBytes.length;NumberFormat numberFormat NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(6);numberFormat.setGroupingUsed(false);return numberFormat.format(bodyLength) msg;}public static void main(String[] args) throws Exception {TCPNioServer server new TCPNioServer();server.start();}}3.3 AIO模式与NIO不同当进行读写操作时只须直接调用API的read或write方法即可。这两种方法均为异步的对于读操作而言当有流可读取时操作系统会将可读的流传入read方法的缓冲区并通知应用程序对于写操作而言当操作系统将write方法传递的流写入完毕时操作系统主动通知应用程序。 即可以理解为read/write方法都是异步的完成后会主动调用回调函数。 在JDK1.7中这部分内容被称作NIO2主要在java.nio.channels包下增加了下面四个异步通道AsynchronousSocketChannel对应BIO中的ServerSocket和NIO中的ServerSocketChannel,用于server端网络程序AsynchronousServerSocketChannel对应BIO中的Socket和NIO中的SocketChannel,用于client端网络应用AsynchronousFileChannelAsynchronousDatagramChannel异步channel API提供了两种方式监控/控制异步操作(connect,accept, readwrite等)。第一种方式是返回java.util.concurrent.Future对象 检查Future的状态可以得到操作是完成还是失败还是进行中 future.get会阻塞当前进程。第二种方式为操作提供一个回调参数java.nio.channels.CompletionHandler这个回调类包含completed,failed两个方法。channel的每个I/O操作都为这两种方式提供了相应的方法 你可以根据自己的需要选择合适的方式编程。下面的例子中在accept和read方法中使用了回调CompletionHandler的方式,而发送数据(write)使用了future的方式,当然write也可以采用回调CompletionHandler的方式。因为CompletionHandler是完全异步的所以需要在mian方法中使用一个 while循环确保程序不退出或者也可以在start方法的最后使用channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);3.3.1 服务端import java.io.*;import java.net.*;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.Charset;import java.text.NumberFormat;import java.util.concurrent.*;import lombok.extern.slf4j.Slf4j;Slf4jpublic class TCPAioServer {// 服务IPprivate final String SERVER_IP 127.0.0.1;// 服务端口private final int SERVER_PORT 8888;private final int BACKLOG 150;private final String CHARSET_NAME GBK;private ExecutorService executorService;private AsynchronousChannelGroup channelGroup;private AsynchronousServerSocketChannel serverSocketChannel;public void start() throws IOException, Exception {// 创建线程池executorService Executors.newCachedThreadPool();// 创建线程组channelGroup AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);// 创建服务器通道serverSocketChannel AsynchronousServerSocketChannel.open(channelGroup);// 绑定地址InetAddress inetAddress InetAddress.getByName(SERVER_IP);serverSocketChannel.bind(new InetSocketAddress(inetAddress, SERVER_PORT), BACKLOG);log.info(server start, ip: {} , port{}, SERVER_IP, SERVER_PORT);serverSocketChannel.accept(this, new ServerCompletionHandler());//channelGroup.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);}class ServerCompletionHandler implements CompletionHandler {Overridepublic void completed(AsynchronousSocketChannel channel, TCPAioServer attachment) {try {handle(channel);} finally {// 当有下一个客户端接入的时候直接调用Server的accept方法这样反复执行下去保证多个客户端都可以阻塞serverSocketChannel.accept(attachment, this);}}private void handle(AsynchronousSocketChannel channel) {ByteBuffer buffer allocateByteBuffer(channel);channel.read(buffer, buffer, new CompletionHandler() {Overridepublic void completed(Integer result, ByteBuffer attachment) {attachment.flip();String msg null;try {msg new String(attachment.array(), CHARSET_NAME);} catch (UnsupportedEncodingException e) {e.printStackTrace();}log.info(Server 收到客户端发送的数据为{}, msg);doBusinessLogic(msg, channel);}Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});}private ByteBuffer allocateByteBuffer(AsynchronousSocketChannel channel) {ByteBuffer buffer ByteBuffer.allocate(6);try {channel.read(buffer).get(1000, TimeUnit.SECONDS);// 读取到了数据将buffer的position复位到0buffer.flip();byte[] bytes new byte[buffer.remaining()];// 将buffer中的数据写入byte[]中buffer.get(bytes);String length new String(bytes, CHARSET_NAME);buffer ByteBuffer.allocate(new Integer(length));} catch (InterruptedException | ExecutionException | TimeoutException | UnsupportedEncodingException e1) {e1.printStackTrace();}return buffer;}// 处理业务逻辑private void doBusinessLogic(String msg, AsynchronousSocketChannel result) {try (AsynchronousSocketChannel channel result) {msg formatMsg(msg);byte[] bodyBytes msg.getBytes(Charset.forName(CHARSET_NAME));ByteBuffer buffer ByteBuffer.allocate(bodyBytes.length);buffer.put(bodyBytes);buffer.flip();channel.write(buffer).get();} catch (Exception e) {e.printStackTrace();}}private String formatMsg(String msg) {byte[] bodyBytes msg.getBytes(Charset.forName(CHARSET_NAME));int bodyLength bodyBytes.length;NumberFormat numberFormat NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(6);numberFormat.setGroupingUsed(false);return numberFormat.format(bodyLength) msg;}Overridepublic void failed(Throwable exc, TCPAioServer attachment) {exc.printStackTrace();}}public static void main(String[] args) throws Exception {TCPAioServer server new TCPAioServer();server.start();while (true) {Thread.sleep(1000);}}}目前Linux上的AIO实现主要有两种Posix AIO 与Kernel Native AIO,前者是用户态实现的而后者是内核态实现的。所以Kernel Native AIO的性能和前景要好于他的前辈Posix AIO比较有名的的软件如Nginx,MySQL等在高版本中都有支持Kernel Native AIO,但是只应用在少部分功能中。因为当下Linux的AIO实现还不是很完美充斥着各种Bug,并且AIO Socket 还并非真正的异步I/O机制使用AIO所带来的性能提升也不太明显稳定性并非十分可靠,如是Kernel Native AIO引起的问题解决的难度会非常大。但是AIO是未来的发展方向需要我们持续的关注。3.4 开源框架Netty实现的Socket服务Netty是一个高性能、异步事件驱动的NIO框架它提供了对TCP、UDP和文件传输的支持作为一个异步NIO框架Netty的所有IO操作都是异步非阻塞的通过Future-Listener机制用户可以方便的主动获取或者通过通知机制获得IO操作结果。作为当前最流行的NIO框架Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用一些业界著名的开源软件也基于Netty的NIO框架构建如Spark、RocketMQ、Dubbo、Elasticsearch等等。Netty的优点1、API使用简单有丰富的例子开发门槛低。2、功能强大预置了多种编解码功能支持多种主流协议。3、定制功能强可以通过ChannelHandler对通信框架进行灵活的扩展。4、性能高通过与其他业界主流的NIO框架对比Netty综合性能最优。5、成熟、稳定Netty修复了已经发现的NIO所有BUG。6、社区活跃。7、经历了很多商用项目的考验。3.4.1 服务端(Netty4.X)import java.nio.ByteOrder;import java.nio.charset.Charset;import java.text.NumberFormat;import java.util.concurrent.ThreadFactory;import java.util.concurrent.atomic.AtomicInteger;import javax.annotation.*;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import io.netty.handler.codec.string.*;import io.netty.handler.logging.*;import io.netty.util.ReferenceCountUtil;import lombok.extern.slf4j.Slf4j;Slf4jpublic class NettySocketServer {private final String CHARSET_NAME GBK;private final int bosscount 2;private final int workerCount 8;private final int tcpPort 8888;private final int backlog 100;private final int receiveBufferSize 1048576;private ServerBootstrap serverBootstrap;private ChannelFuture serverChannelFuture;public NamedThreadFactory bossThreadFactory() {return new NamedThreadFactory(Server-Worker);}public NioEventLoopGroup bossGroup() {return new NioEventLoopGroup(bosscount, bossThreadFactory());}public NamedThreadFactory workerThreadFactory() {return new NamedThreadFactory(Server-Worker);}public NioEventLoopGroup workerGroup() {return new NioEventLoopGroup(workerCount, workerThreadFactory());}public ServerBootstrap bootstrap() {ServerBootstrap bootstrap new ServerBootstrap();bootstrap.group(bossGroup(), workerGroup()).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, backlog).childHandler(new ChannelInitializer() {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();pipeline.addLast(logging, new LoggingHandler(LogLevel.ERROR)).addLast(stringEncoder, new StringEncoder(Charset.forName(GBK))).addLast(frameDecoder, new MsgLengthFieldBasedFrameDecoder(receiveBufferSize, 0, 6, 0, 6)).addLast(stringDecoder, new StringDecoder(Charset.forName(GBK))).addLast(messageHandler, new ServerMessageHandler());}});return bootstrap;}PostConstructpublic void start() throws Exception {serverBootstrap bootstrap();serverChannelFuture serverBootstrap.bind(tcpPort).sync();log.info(Starting server at tcpPort {} , tcpPort);}PreDestroypublic void stop() throws Exception {serverChannelFuture.channel().closeFuture().sync();}static class NamedThreadFactory implements ThreadFactory {public static AtomicInteger counter new AtomicInteger(1);private String name this.getClass().getName();private boolean deamon ;//守护线程private int priority ; //线程优先级public NamedThreadFactory(String name){this(name, false);}public NamedThreadFactory(String name,boolean deamon){this(name, deamon, -1);}public NamedThreadFactory(String name,boolean deamon,int priority){this.name name ;this.deamon deamon ;this.priority priority ;}Overridepublic Thread newThread(Runnable r) {Thread thread new Thread(r,name[counter.getAndIncrement()]);thread.setDaemon(deamon);if(priority ! -1){thread.setPriority(priority);}return thread;}}//拆包class MsgLengthFieldBasedFrameDecoder extends LengthFieldBasedFrameDecoder {/*** param maxFrameLength 解码时处理每个帧数据的最大长度* param lengthFieldOffset 该帧数据中存放该帧数据的长度的数据的起始位置* param lengthFieldLength 记录该帧数据长度的字段本身的长度* param lengthAdjustment 修改帧数据长度字段中定义的值可以为负数* param initialBytesToStrip解析的时候需要跳过的字节数*/public MsgLengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);}Overrideprotected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {if(length 6){buf buf.order(order);byte[] lengthBytes new byte[6];buf.readBytes(lengthBytes);buf.resetReaderIndex();return Integer.valueOf(new String(lengthBytes));} else {return super.getUnadjustedFrameLength(buf, offset, length, order);}}}class ServerMessageHandler extends ChannelInboundHandlerAdapter {/*** 功能读取服务器发送过来的信息*/Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof String) {try {doBusinessLogic(ctx,(String)msg);} finally {ReferenceCountUtil.release(msg);}}}// 处理业务逻辑private void doBusinessLogic(ChannelHandlerContext ctx,String msg) throws Exception {// todo Business Logicmsg formatMsg(msg);ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);}private String formatMsg(String msg) {byte[] bodyBytes msg.getBytes(Charset.forName(CHARSET_NAME));int bodyLength bodyBytes.length;NumberFormat numberFormat NumberFormat.getNumberInstance();numberFormat.setMinimumIntegerDigits(6);numberFormat.setGroupingUsed(false);return numberFormat.format(bodyLength) msg;}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}public static void main(String[] args) throws Exception{NettySocketServer server new NettySocketServer();server.start();}}总结同步阻塞IO伪异步IO非阻塞IO异步IONetty的非阻塞IO客户端服务端1:1N:M(M1)N:M(M1单线程非阻塞多线程非阻塞)N0(不需要启动额外的IO线程被动回调)N:M(M1)IO类型BIOBIONIOAIONIOAPI使用难度简单简单非常复杂复杂简单可靠性相当差差高高高吞吐量低中高高高并发低中高高高参考文献▲http://www.ibm.com/developerworks/cn/linux/l-async/▲http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf▲http://blog.csdn.net/anxpp/article/details/51512200▲Netty权威指南▲Asynchronous I/O Tricks and Tips责任编辑