网站一年的费用,在线手机网站预览,沈阳网站网站建设,巴彦淖尔市 网站建设通信协议从广义上区分#xff0c;可以分为公有协议和私有协议。由于私有协议的灵活性#xff0c;它往往会在某个公司或者组织内部使用#xff0c;按需定制#xff0c;也因为如此#xff0c;升级起来会非常方便#xff0c;灵活性好。绝大多数的私有协议传输层都基于TCP/IP…通信协议从广义上区分可以分为公有协议和私有协议。由于私有协议的灵活性它往往会在某个公司或者组织内部使用按需定制也因为如此升级起来会非常方便灵活性好。绝大多数的私有协议传输层都基于TCP/IP所以利用Netty的NIO TCP协议栈可以非常方便地进行私有协议的定制和开发。 私有协议介绍 私有协议本质上是厂商内部发展和采用的标准除非授权其他厂商一般无权使用该协议。私有协议也称非标准协议就是未经国际或国家标准化组织采纳或批准由某个企业自己制订协议实现细节不愿公开只在企业自己生产的设备之间使用的协议。私有协议具有封闭性、垄断性、排他性等特点。如果网上大量存在私有非标准协议现行网络或用户一旦使用了它后进入的厂家设备就必须跟着使用这种非标准协议才能够互连互通否则根本不可能进入现行网络。这样使用非标准协议的厂家就实现了垄断市场的愿望。 在传统的Java应用中通常使用以下4种方式进行跨节点通信。 1通过RMI进行远程服务调用 2通过Java的SocketJava序列化的方式进行跨节点调用 3利用一些开源的RPC框架进行远程服务调用例如Facebook的ThriftApache的Avro等 4利用标准的公有协议进行跨节点服务调用例如HTTPXML、RESTfulJSON或者WebService。 跨节点的远程服务调用除了链路层的物理连接外还需要对请求和响应消息进行编解码。在请求和应答消息本身以外也需要携带一些其他控制和管理类指令例如链路建立的握手请求和响应消息、链路检测的心跳消息等。当这些功能组合到一起之后就会形成私有协议。 事实上私有协议并没有标准的定义只要是能够用于跨进程、跨主机数据交换的非标准协议都可以称为私有协议。通常情况下正规的私有协议都有具体的协议规范文档类似于《XXXX协议VXX规范》但是在实际的项目中内部使用的私有协议往往是口头约定的规范由于并不需要对外呈现或者被外部调用所以一般不会单独写相关的内部私有协议规范文档。 Netty协议栈功能设计 使用Netty提供的异步TCP协议栈开发一个私有协议栈该协议栈被命名为Netty协议栈。Netty协议栈用于内部各模块之间的通信它基于TCP/IP协议栈是一个类HTTP协议的应用层协议栈相比于传统的标准协议栈它更加轻巧、灵活和实用。 网络拓扑图 在分布式组网环境下每个Netty节点Netty进程之间建立长连接使用Netty协议进行通信。Netty节点并没有服务端和客户端的区分谁首先发起连接谁就作为客户端另一方自然就成为服务端。一个Netty节点既可以作为客户端连接另外的Netty节点也可以作为Netty服务端被其他Netty节点连接这完全取决于使用者的业务场景和具体的业务组网。 协议栈功能描述 Netty协议栈承载了业务内部各模块之间的消息交互和服务调用它的主要功能如下。 1基于Netty的NIO通信框架提供高性能的异步通信能力 2提供消息的编解码框架可以实现POJO的序列化和反序列化 3提供基于IP地址的白名单接入认证机制 4链路的有效性校验机制 5链路的断连重连机制。 通信模型 1Netty协议栈客户端发送握手请求消息携带节点ID等有效身份认证信息 2Netty协议栈服务端对握手请求消息进行合法性校验包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验校验通过后返回登录成功的握手应答消息 3链路建立成功之后客户端发送业务消息 4链路成功之后服务端发送心跳消息 5链路建立成功之后客户端发送心跳消息 6链路建立成功之后服务端发送业务消息 7服务端退出时服务端关闭连接客户端感知对方关闭连接后被动关闭客户端连接。 备注需要指出的是Netty协议通信双方链路建立成功之后双方可以进行全双工通信无论客户端还是服务端都可以主动发送请求消息给对方通信方式可以是TWO WAY或者ONE WAY。双方之间的心跳采用Ping-Pong机制当链路处于空闲状态时客户端主动发送Ping消息给服务端服务端接收到Ping消息后发送应答消息Pong给客户端如果客户端连续发送N条Ping消息都没有接收到服务端返回的Pong消息说明链路已经挂死或者对方处于异常状态客户端主动关闭连接间隔周期T后发起重连操作直到重连成功。 消息定义 Netty协议栈消息定义包含两部分 消息头消息体。 Netty协议支持的字段类型 Netty协议的编解码规范 1.Netty协议的编码 Netty协议NettyMessage的编码规范如下: 1crcCodejava.nio.ByteBuffer.putInt(int value)如果采用其他缓冲区实现必须与其等价 2lengthjava.nio.ByteBuffer.putInt(int value)如果采用其他缓冲区实现必须与其等价 3sessionIDjava.nio.ByteBuffer.putLong(long value)如果采用其他缓冲区实现必须与其等价 4type: java.nio.ByteBuffer.put(byte b)如果采用其他缓冲区实现必须与其等价 5priorityjava.nio.ByteBuffer.put(byte b)如果采用其他缓冲区实现必须与其等价 6attachment它的编码规则为——如果attachment长度为0表示没有可选附件则将长度编码设为0java.nio.ByteBuffer.putInt(0)如果大于0说明有附件需要编码具体的编码规则如下:首先对附件的个数进行编码java.nio.ByteBuffer.putInt(attachment.size())然后对Key进行编码再将它转换成byte数组之后编码内容. 7body的编码通过JBoss Marshalling将其序列化为byte数组然后调用java.nio.ByteBuffer.put(byte [] src)将其写入ByteBuffer缓冲区中。 由于整个消息的长度必须等全部字段都编码完成之后才能确认所以最后需要更新消息头中的length字段将其重新写入ByteBuffer中。 2.Netty协议的解码 相对于NettyMessage的编码仍旧以java.nio.ByteBuffer为例给出Netty协议的解码规范。 1crcCode通过java.nio.ByteBuffer.getInt()获取校验码字段其他缓冲区需要与其等价 2length通过java.nio.ByteBuffer.getInt()获取Netty消息的长度其他缓冲区需要与其等价 3sessionID通过java.nio.ByteBuffer.getLong()获取会话ID其他缓冲区需要与其等价 4type通过java.nio.ByteBuffer.get()获取消息类型其他缓冲区需要与其等价 5priority通过java.nio.ByteBuffer.get()获取消息优先级其他缓冲区需要与其等价 6attachment它的解码规则为——首先创建一个新的attachment对象调用java.nio.ByteBuffer.getInt()获取附件的长度如果为0说明附件为空解码结束继续解消息体如果非空则根据长度通过for循环进行解码。 7body通过JBoss的marshaller对其进行解码。 链路的建立 Netty协议栈支持服务端和客户端对于使用Netty协议栈的应用程序而言不需要刻意区分到底是客户端还是服务端在分布式组网环境中一个节点可能既是服务端也是客户端这个依据具体的用户场景而定。 Netty协议栈对客户端的说明如下如果A节点需要调用B节点的服务但是A和B之间还没有建立物理链路则由调用方主动发起连接此时调用方为客户端被调用方为服务端。 考虑到安全链路建立需要通过基于IP地址或者号段的黑白名单安全认证机制作为样例本协议使用基于IP地址的安全认证如果有多个IP通过逗号进行分割。在实际商用项目中安全认证机制会更加严格例如通过密钥对用户名和密码进行安全认证。 客户端与服务端链路建立成功之后由客户端发送握手请求消息握手请求消息的定义如下。 1消息头的type字段值为3 2可选附件为个数为0 3消息体为空 4握手消息的长度为22个字节。 服务端接收到客户端的握手请求消息之后如果IP校验通过返回握手成功应答消息给客户端应用层链路建立成功。握手应答消息定义如下。 1消息头的type字段值为4 2可选附件个数为0 3消息体为byte类型的结果0认证成功-1认证失败。 链路建立成功之后客户端和服务端就可以互相发送业务消息了。 链路的关闭 由于采用长连接通信在正常的业务运行期间双方通过心跳和业务消息维持链路任何一方都不需要主动关闭连接。 但是在以下情况下客户端和服务端需要关闭连接。 1当对方宕机或者重启时会主动关闭链路另一方读取到操作系统的通知信号得知对方REST链路需要关闭连接释放自身的句柄等资源。由于采用TCP全双工通信通信双方都需要关闭连接释放资源 2消息读写过程中发生了I/O异常需要主动关闭连接 3心跳消息读写过程中发生了I/O异常需要主动关闭连接 4心跳超时需要主动关闭连接 5发生编码异常等不可恢复错误时需要主动关闭连接。 可靠性设计 Netty协议栈可能会运行在非常恶劣的网络环境中网络超时、闪断、对方进程僵死或者处理缓慢等情况都有可能发生。为了保证在这些极端异常场景下Netty协议栈仍能够正常工作或者自动恢复需要对它的可靠性进行统一规划和设计。 1.心跳机制 在凌晨等业务低谷期时段如果发生网络闪断、连接被Hang住等网络问题时由于没有业务消息应用进程很难发现。到了白天业务高峰期时会发生大量的网络通信失败严重的会导致一段时间进程内无法处理业务消息。为了解决这个问题在网络空闲时采用心跳机制来检测链路的互通性一旦发现网络故障立即关闭链路主动重连。 具体的设计思路如下。 1当网络处于空闲状态持续时间达到T连续周期T没有读写消息时客户端主动发送Ping心跳消息给服务端 2如果在下一个周期T到来时客户端没有收到对方发送的Pong心跳应答消息或者读取到服务端发送的其他业务消息则心跳失败计数器加1 3每当客户端接收到服务的业务消息或者Pong应答消息将心跳失败计数器清零当连续N次没有接收到服务端的Pong消息或者业务消息则关闭链路间隔INTERVAL时间后发起重连操作 4服务端网络空闲状态持续时间达到T后服务端将心跳失败计数器加1只要接收到客户端发送的Ping消息或者其他业务消息计数器清零 5服务端连续N次没有接收到客户端的Ping消息或者其他业务消息则关闭链路释放资源等待客户端重连。 通过Ping-Pong双向心跳机制可以保证无论通信哪一方出现网络故障都能被及时地检测出来。为了防止由于对方短时间内繁忙没有及时返回应答造成的误判只有连续N次心跳检测都失败才认定链路已经损害需要关闭链路并重建链路。 当读或者写心跳消息发生I/O异常的时候说明链路已经中断此时需要立即关闭链路如果是客户端需要重新发起连接。如果是服务端需要清空缓存的半包信息等待客户端重连。 2.重连机制 如果链路中断等待INTERVAL时间后由客户端发起重连操作如果重连失败间隔周期INTERVAL后再次发起重连直到重连成功。 为了保证服务端能够有充足的时间释放句柄资源在首次断连时客户端需要等待INTERVAL时间之后再发起重连而不是失败后就立即重连。 为了保证句柄资源能够及时释放无论什么场景下的重连失败客户端都必须保证自身的资源被及时释放包括但不限于SocketChannel、Socket等。 重连失败后需要打印异常堆栈信息方便后续的问题定位。 3.重复登录保护 当客户端握手成功之后在链路处于正常状态下不允许客户端重复登录以防止客户端在异常状态下反复重连导致句柄资源被耗尽。 服务端接收到客户端的握手请求消息之后首先对IP地址进行合法性检验如果校验成功在缓存的地址表中查看客户端是否已经登录如果已经登录则拒绝重复登录返回错误码-1同时关闭TCP链路并在服务端的日志中打印握手失败的原因。 客户端接收到握手失败的应答消息之后关闭客户端的TCP连接等待INTERVAL时间之后再次发起TCP连接直到认证成功。 为了防止由服务端和客户端对链路状态理解不一致导致的客户端无法握手成功的问题当服务端连续N次心跳超时之后需要主动关闭链路清空该客户端的地址缓存信息以保证后续该客户端可以重连成功防止被重复登录保护机制拒绝掉。 4.消息缓存重发 无论客户端还是服务端当发生链路中断之后在链路恢复之前缓存在消息队列中待发送的消息不能丢失等链路恢复之后重新发送这些消息保证链路中断期间消息不丢失。 考虑到内存溢出的风险建议消息缓存队列设置上限当达到上限之后应该拒绝继续向该队列添加新的消息。 安全性设计 为了保证整个集群环境的安全内部长连接采用基于IP地址的安全认证机制服务端对握手请求消息的IP地址进行合法性校验如果在白名单之内则校验通过否则拒绝对方连接。 如果将Netty协议栈放到公网中使用需要采用更加严格的安全认证机制例如基于密钥和AES加密的用户名密码认证机制也可以采用SSL/TSL安全传输。 作为示例程序Netty协议栈采用最简单的基于IP地址的白名单安全认证机制。 可扩展性设计 Netty协议需要具备一定的扩展能力业务可以在消息头中自定义业务域字段例如消息流水号、业务自定义消息头等。通过Netty消息头中的可选附件attachment字段业务可以方便地进行自定义扩展。 Netty协议栈架构需要具备一定的扩展能力例如统一的消息拦截、接口日志、安全、加解密等可以被方便地添加和删除不需要修改之前的逻辑代码类似Servlet的FilterChain和AOP但考虑到性能因素不推荐通过AOP来实现功能的扩展。 Netty协议栈开发 数据结构定义 import lombok.Data;Data
public final class NettyMessage {private Header header; //消息头private Object body;//消息体
Overridepublic String toString() {return NettyMessage [header header ];}
}import java.util.HashMap;
import java.util.Map;
Data
public final class Header {private int crcCode 0xabef0101;private int length;// 消息长度private long sessionID;// 会话IDprivate byte type;// 消息类型private byte priority;// 消息优先级private Map attachment new HashMap(); // 附件
Overridepublic String toString() {return Header [crcCode crcCode , length length , sessionID sessionID , type type , priority priority , attachment attachment ];}
} 消息编解码 import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.*;import java.io.IOException;public class MarshallingDecoder {private final Unmarshaller unmarshaller;public MarshallingDecoder() throws IOException {final MarshallerFactory marshallerFactory Marshalling.getProvidedMarshallerFactory(serial);final MarshallingConfiguration configuration new MarshallingConfiguration();configuration.setVersion(5);unmarshaller marshallerFactory.createUnmarshaller(configuration);}protected Object decode(ByteBuf in) throws Exception {int objectSize in.readInt();ByteBuf buf in.slice(in.readerIndex(), objectSize);ByteInput input new ChannelBufferByteInput(buf);try {unmarshaller.start(input);Object obj unmarshaller.readObject();unmarshaller.finish();in.readerIndex(in.readerIndex() objectSize);return obj;} finally {unmarshaller.close();}}
}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.Marshaller;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;import java.io.IOException;public class MarshallingEncoder {private static final byte[] LENGTH_PLACEHOLDER new byte[4];Marshaller marshaller;public MarshallingEncoder() throws IOException {final MarshallerFactory marshallerFactory Marshalling.getProvidedMarshallerFactory(serial);final MarshallingConfiguration configuration new MarshallingConfiguration();configuration.setVersion(5);marshaller marshallerFactory.createMarshaller(configuration);}protected void encode(Object msg, ByteBuf out) throws Exception {try {int lengthPos out.writerIndex();out.writeBytes(LENGTH_PLACEHOLDER);ChannelBufferByteOutput output new ChannelBufferByteOutput(out);marshaller.start(output);marshaller.writeObject(msg);marshaller.finish();out.setInt(lengthPos, out.writerIndex() - lengthPos - 4);} finally {marshaller.close();}}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;//Netty的LengthFieldBasedFrameDecoder解码器它支持自动的TCP粘包和半包处理
//只需要给出标识消息长度的字段偏移量和消息长度自身所占的字节数Netty就能自动实现对半包的处理。
public class NettyMessageDecoder extends LengthFieldBasedFrameDecoder {MarshallingDecoder marshallingDecoder;public NettyMessageDecoder(int maxFrameLength, int lengthFieldOffset,int lengthFieldLength) throws IOException {super(maxFrameLength, lengthFieldOffset, lengthFieldLength,-8,0);marshallingDecoder new MarshallingDecoder();}Overrideprotected Object decode(ChannelHandlerContext ctx, ByteBuf in)throws Exception {//对于业务解码器来说调用父类LengthFieldBasedFrameDecoder的解码方法后返回的就是整包消息或者为空//如果为空说明是个半包消息直接返回继续由I/O线程读取后续的码流ByteBuf frame (ByteBuf) super.decode(ctx, in);if (frame null) {return null;}int pre in.readerIndex();in.readerIndex(0);NettyMessage message new NettyMessage();Header header new Header();header.setCrcCode(in.readInt());header.setLength(in.readInt());header.setSessionID(in.readLong());header.setType(in.readByte());header.setPriority(in.readByte());int size in.readInt();if (size 0) {MapString,Object attch new HashMapString,Object(size);int keySize 0;byte[] keyArray null;String key null;for (int i 0; i size; i) {keySize in.readInt();keyArray new byte[keySize];in.readBytes(keyArray);key new String(keyArray, UTF-8);attch.put(key, marshallingDecoder.decode(in));}keyArray null;key null;header.setAttachment(attch);}if (in.readableBytes() 4) {message.setBody(marshallingDecoder.decode(in));}in.readerIndex(pre);message.setHeader(header);return message;}
}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;import java.io.IOException;
import java.util.List;
import java.util.Map;public final class NettyMessageEncoder extends MessageToMessageEncoder {MarshallingEncoder marshallingEncoder;public NettyMessageEncoder() throws IOException {this.marshallingEncoder new MarshallingEncoder();}Overrideprotected void encode(ChannelHandlerContext ctx, Object o, List out) throws Exception {NettyMessage msg (NettyMessage) o;if (msg null || msg.getHeader() null) {throw new Exception(The encode message is null);}ByteBuf sendBuf Unpooled.buffer();sendBuf.writeInt((msg.getHeader().getCrcCode()));sendBuf.writeInt((msg.getHeader().getLength()));sendBuf.writeLong((msg.getHeader().getSessionID()));sendBuf.writeByte((msg.getHeader().getType()));sendBuf.writeByte((msg.getHeader().getPriority()));sendBuf.writeInt((msg.getHeader().getAttachment().size()));String key null;byte[] keyArray null;Object value null;for (Map.Entry param : msg.getHeader().getAttachment().entrySet()) {key (String) param.getKey();keyArray key.getBytes(UTF-8);sendBuf.writeInt(keyArray.length);sendBuf.writeBytes(keyArray);value param.getValue();marshallingEncoder.encode(value, sendBuf);}key null;keyArray null;value null;if (msg.getBody() ! null) {marshallingEncoder.encode(msg.getBody(), sendBuf);} else {sendBuf.writeInt(0);}sendBuf.setInt(4, sendBuf.readableBytes());out.add(sendBuf);}
} 握手和安全认证 import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;public class LoginAuthReqHandler extends ChannelHandlerAdapter {Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//当客户端跟服务端TCP三次握手成功之后由客户端构造握手请求消息发送给服务端ctx.writeAndFlush(buildLoginReq());}// 握手请求发送之后按照协议规范服务端需要返回握手应答消息。Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {NettyMessage message (NettyMessage) msg;// 如果是握手应答消息需要判断是否认证成功//对握手应答消息进行处理首先判断消息是否是握手应答消息if (message.getHeader() ! null message.getHeader().getType() MessageType.LOGIN_RESP.value()) {byte loginResult (Byte) message.getBody();if (loginResult ! (byte) 0) {// 如果是握手应答消息则对应答结果进行判断如果非0说明认证失败关闭链路重新发起连接。// 握手失败关闭连接ctx.close();} else {System.out.println(Login is ok : message);ctx.fireChannelRead(msg);}} else {// 如果不是直接透传给后面的ChannelHandler进行处理ctx.fireChannelRead(msg);}}private NettyMessage buildLoginReq() {// 由于采用IP白名单认证机制因此不需要携带消息体消息体为空消息类型为3握手请求消息。NettyMessage message new NettyMessage();Header header new Header();header.setType(MessageType.LOGIN_REQ.value());message.setHeader(header);return message;}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.fireExceptionCaught(cause);}
}import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class LoginAuthRespHandler extends ChannelHandlerAdapter {private Map nodeCheck new ConcurrentHashMap();private String[] whitekList {127.0.0.1,10.100.1.122};Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {NettyMessage message (NettyMessage) msg;// 如果是握手请求消息处理其他消息透传if (message.getHeader() ! null message.getHeader().getType() MessageType.LOGIN_REQ.value()) {String nodeIndex ctx.channel().remoteAddress().toString();NettyMessage loginResp null;// 重复登录拒绝// 重复登录保护if (nodeCheck.containsKey(nodeIndex)) {loginResp buildResponse((byte) -1);} else {//IP认证白名单列表InetSocketAddress address (InetSocketAddress) ctx.channel().remoteAddress();String ip address.getAddress().getHostAddress();boolean isOK false;for (String WIP : whitekList) {if (WIP.equals(ip)) {isOK true;break;}}//通过buildResponse构造握手应答消息返回给客户端loginResp isOK ? buildResponse((byte) 0) : buildResponse((byte) -1);if (isOK) {nodeCheck.put(nodeIndex, true);}}System.out.println(The login response is : loginResp body [ loginResp.getBody() ]);ctx.writeAndFlush(loginResp);} else {ctx.fireChannelRead(msg);}}private NettyMessage buildResponse(byte result) {NettyMessage message new NettyMessage();Header header new Header();header.setType(MessageType.LOGIN_RESP.value());message.setHeader(header);message.setBody(result);return message;}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)throws Exception {//当发生异常关闭链路的时候需要将客户端的信息从登录注册表中去注册以保证后续客户端可以重连成功。nodeCheck.remove(ctx.channel().remoteAddress().toString());//删除缓存ctx.close();ctx.fireExceptionCaught(cause);}
} 心跳检测机制 import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;public class HeartBeatReqHandler extends ChannelHandlerAdapter {private volatile ScheduledFuture heartBeat;Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {NettyMessage message (NettyMessage) msg;// 握手成功主动发送心跳消息//HeartBeatReqHandler接收到之后对消息进行判断if (message.getHeader() ! null message.getHeader().getType() MessageType.LOGIN_RESP.value()) {//当握手成功之后握手请求Handler会继续将握手成功消息向下透传//如果是握手成功消息则启动无限循环定时器用于定期发送心跳消息。//由于NioEventLoop是一个schedule因此它支持定时器的执行。// 心跳定时器的单位是毫秒默认为5000即每5秒发送一条心跳消息。heartBeat ctx.executor().scheduleAtFixedRate(new HeartBeatReqHandler.HeartBeatTask(ctx), 0, 5000,TimeUnit.MILLISECONDS);} else if (message.getHeader() ! null message.getHeader().getType() MessageType.HEARTBEAT_RESP.value()) {//接收服务端发送的心跳应答消息并打印客户端接收和发送的心跳消息。System.out.println(Client receive server heart beat message : --- message);} else {//当握手成功之后握手请求Handler会继续将握手成功消息向下透传ctx.fireChannelRead(msg);}}private class HeartBeatTask implements Runnable {private final ChannelHandlerContext ctx;public HeartBeatTask(final ChannelHandlerContext ctx) {this.ctx ctx;}Overridepublic void run() {NettyMessage heatBeat buildHeatBeat();System.out.println(Client send heart beat messsage to server : --- heatBeat);ctx.writeAndFlush(heatBeat);}private NettyMessage buildHeatBeat() {NettyMessage message new NettyMessage();Header header new Header();header.setType(MessageType.HEARTBEAT_REQ.value());message.setHeader(header);return message;}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (heartBeat ! null) {heartBeat.cancel(true);heartBeat null;}ctx.fireExceptionCaught(cause);}
}
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;public class HeartBeatRespHandler extends ChannelHandlerAdapter {Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg)throws Exception {NettyMessage message (NettyMessage) msg;// 返回心跳应答消息if (message.getHeader() ! null message.getHeader().getType() MessageType.HEARTBEAT_REQ.value()) {System.out.println(Receive client heart beat message : --- message);NettyMessage heartBeat buildHeatBeat();System.out.println(Send heart beat response message to client : --- heartBeat);ctx.writeAndFlush(heartBeat);} else {ctx.fireChannelRead(msg);}}private NettyMessage buildHeatBeat() {NettyMessage message new NettyMessage();Header header new Header();header.setType(MessageType.HEARTBEAT_RESP.value());message.setHeader(header);return message;}
} 基础类 public enum MessageType {LOGIN_REQ((byte)3),LOGIN_RESP((byte)4),HEARTBEAT_REQ((byte)5),HEARTBEAT_RESP((byte)6),;public byte value;MessageType(byte v){this.value v;}public byte value(){return value;}}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteInput;import java.io.IOException;public class ChannelBufferByteInput implements ByteInput {private final ByteBuf buffer;ChannelBufferByteInput(ByteBuf buffer) {this.buffer buffer;}Overridepublic void close() throws IOException {}Overridepublic int available() throws IOException {return buffer.readableBytes();}Overridepublic int read() throws IOException {if (buffer.isReadable()) {return buffer.readByte() 0xff;}return -1;}Overridepublic int read(byte[] array) throws IOException {return read(array, 0, array.length);}Overridepublic int read(byte[] dst, int dstIndex, int length) throws IOException {int available available();if (available 0) {return -1;}length Math.min(available, length);buffer.readBytes(dst, dstIndex, length);return length;}Overridepublic long skip(long bytes) throws IOException {int readable buffer.readableBytes();if (readable bytes) {bytes readable;}buffer.readerIndex((int) (buffer.readerIndex() bytes));return bytes;}}
import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;import java.io.IOException;public class ChannelBufferByteOutput implements ByteOutput {private final ByteBuf buffer;ChannelBufferByteOutput(ByteBuf buffer) {this.buffer buffer;}Overridepublic void close() throws IOException {// Nothing to do}Overridepublic void flush() throws IOException {// nothing to do}Overridepublic void write(int b) throws IOException {buffer.writeByte(b);}Overridepublic void write(byte[] bytes) throws IOException {buffer.writeBytes(bytes);}Overridepublic void write(byte[] bytes, int srcIndex, int length) throws IOException {buffer.writeBytes(bytes, srcIndex, length);}/*** Return the {link ByteBuf} which contains the written content**/ByteBuf getBuffer() {return buffer;}
}import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;import java.io.IOException;public class ChannelBufferByteOutput implements ByteOutput {private final ByteBuf buffer;ChannelBufferByteOutput(ByteBuf buffer) {this.buffer buffer;}Overridepublic void close() throws IOException {// Nothing to do}Overridepublic void flush() throws IOException {// nothing to do}Overridepublic void write(int b) throws IOException {buffer.writeByte(b);}Overridepublic void write(byte[] bytes) throws IOException {buffer.writeBytes(bytes);}Overridepublic void write(byte[] bytes, int srcIndex, int length) throws IOException {buffer.writeBytes(bytes, srcIndex, length);}/*** Return the {link ByteBuf} which contains the written content**/ByteBuf getBuffer() {return buffer;}
} 客户端 import io.netty.buffer.ByteBuf;
import org.jboss.marshalling.ByteOutput;import java.io.IOException;public class ChannelBufferByteOutput implements ByteOutput {private final ByteBuf buffer;ChannelBufferByteOutput(ByteBuf buffer) {this.buffer buffer;}Overridepublic void close() throws IOException {// Nothing to do}Overridepublic void flush() throws IOException {// nothing to do}Overridepublic void write(int b) throws IOException {buffer.writeByte(b);}Overridepublic void write(byte[] bytes) throws IOException {buffer.writeBytes(bytes);}Overridepublic void write(byte[] bytes, int srcIndex, int length) throws IOException {buffer.writeBytes(bytes, srcIndex, length);}/*** Return the {link ByteBuf} which contains the written content**/ByteBuf getBuffer() {return buffer;}
}public class NettyConstant {public static String LOCALIP 127.0.0.1;public static String REMOTEIP 127.0.0.1;public static Integer LOCAL_PORT 8085;public static Integer PORT 9099;
} 服务端 import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;import java.io.IOException;public class NettyServer {public void bind() throws Exception {// 配置服务端的NIO线程组EventLoopGroup bossGroup new NioEventLoopGroup();EventLoopGroup workerGroup new NioEventLoopGroup();ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer() {Overridepublic void initChannel(Channel ch)throws IOException{ch.pipeline().addLast(new NettyMessageDecoder(1024 * 1024, 4, 4));ch.pipeline().addLast(new NettyMessageEncoder());ch.pipeline().addLast(readTimeoutHandler,new ReadTimeoutHandler(50));ch.pipeline().addLast(new LoginAuthRespHandler());ch.pipeline().addLast(HeartBeatHandler,new HeartBeatRespHandler());}});// 绑定端口同步等待成功b.bind(NettyConstant.REMOTEIP, NettyConstant.PORT).sync();System.out.println(Netty server start ok : (NettyConstant.REMOTEIP : NettyConstant.PORT));}public static void main(String[] args) throws Exception {new NettyServer().bind();}
} 测试结果 服务端 12:30:32.998 [nioEventLoopGroup-2-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x4d893ed4, /127.0.0.1:9099] RECEIVED: [id: 0x343516a3, /127.0.0.1:8085 /127.0.0.1:9099]
12:30:33.205 [nioEventLoopGroup-3-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple
The login response is : NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type4, priority0, attachment{}]] body [0]
Receive client heart beat message : --- NettyMessage [headerHeader [crcCode-1410399999, length26, sessionID0, type5, priority0, attachment{}]]
Send heart beat response message to client : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type6, priority0, attachment{}]]
Receive client heart beat message : --- NettyMessage [headerHeader [crcCode-1410399999, length26, sessionID0, type5, priority0, attachment{}]]
Send heart beat response message to client : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type6, priority0, attachment{}]]
Receive client heart beat message : --- NettyMessage [headerHeader [crcCode-1410399999, length26, sessionID0, type5, priority0, attachment{}]]
Send heart beat response message to client : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type6, priority0, attachment{}]] 客户端 12:30:33.152 [nioEventLoopGroup-2-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetectionLevel: simple
Login is ok : NettyMessage [headerHeader [crcCode-1410399999, length101, sessionID0, type4, priority0, attachment{}]]
Client send heart beat messsage to server : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type5, priority0, attachment{}]]
Client receive server heart beat message : --- NettyMessage [headerHeader [crcCode-1410399999, length26, sessionID0, type6, priority0, attachment{}]]
Client send heart beat messsage to server : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type5, priority0, attachment{}]]
Client receive server heart beat message : --- NettyMessage [headerHeader [crcCode-1410399999, length26, sessionID0, type6, priority0, attachment{}]]
Client send heart beat messsage to server : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type5, priority0, attachment{}]]
Client receive server heart beat message : --- NettyMessage [headerHeader [crcCode-1410399999, length26, sessionID0, type6, priority0, attachment{}]]
Client send heart beat messsage to server : --- NettyMessage [headerHeader [crcCode-1410399999, length0, sessionID0, type5, priority0, attachment{}]] 转载于:https://www.cnblogs.com/wade-luffy/p/6183806.html