怎样简单做网站,网站开发入门书籍,机械设计制造及其自动化圳建设网站,网站建站程序前言
我们直到在网络通信中客户端和服务端之间除了要传输数据外#xff0c;还会进行简单的心跳应答通信#xff0c;使得客户端和服务端的连接处于一种活跃状态#xff0c;那么客户端可以发送ONE_WAY和TWO_WAY两种方式的处理#xff0c;而服务端在处理这两种类型的数据时会…前言
我们直到在网络通信中客户端和服务端之间除了要传输数据外还会进行简单的心跳应答通信使得客户端和服务端的连接处于一种活跃状态那么客户端可以发送ONE_WAY和TWO_WAY两种方式的处理而服务端在处理这两种类型的数据时会做出不同的应答对于ONE_WAY形式的应答有可能会交由异步线程池来执行而对于TWO_WAY形式的消息则是立刻做出回应除了这些还会牵扯到序列化和反序列化、数据加密验证的问题因为网络通信中数据是二进制流的形式传输的这其中会牵扯到粘包/半包的问题以及序列化和反序列性能问题。在解决这些基本组件之后服务端还可以对于客户端进行认证不在白名单的客户端不接受连接。再者就是针对客户端的输入服务端做出不同的应答。接着给大家展示一个实例希望可以帮到大家. 先导入以下maven依赖 dependenciesdependencygroupIdio.netty/groupIdartifactIdnetty-all/artifactIdversion4.1.42.Final/version/dependencydependencygroupIdorg.msgpack/groupIdartifactIdmsgpack/artifactIdversion0.6.12/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.30/versionscopecompile/scope/dependencydependencygroupIdch.qos.logback/groupIdartifactIdlogback-core/artifactIdversion1.2.4/version/dependencydependencygroupIdch.qos.logback/groupIdartifactIdlogback-classic/artifactIdversion1.2.4/version/dependencydependencygroupIdcom.itextpdf/groupIdartifactIditextpdf/artifactIdversion5.5.8/version/dependencydependencygroupIdorg.bouncycastle/groupIdartifactIdbcprov-jdk15on/artifactIdversion1.49/versiontypejar/typescopecompile/scopeoptionaltrue/optional/dependencydependencygroupIdorg.bouncycastle/groupIdartifactIdbcpkix-jdk15on/artifactIdversion1.49/versiontypejar/typescopecompile/scopeoptionaltrue/optional/dependencydependencygroupIdde.javakaffee/groupIdartifactIdkryo-serializers/artifactIdversion0.42/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.13.1/versionscopetest/scope/dependency/dependencies1.Server
package adv;import adv.server.ServerInit;
import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.NettyRuntime;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class NettyServer {private static final Logger LOG LoggerFactory.getLogger(NettyServer.class);public void bind() throws InterruptedException {// 配置服务端的NIO线程组EventLoopGroup bossGroup new NioEventLoopGroup(1,new DefaultThreadFactory(boss));EventLoopGroup workerGroup new NioEventLoopGroup(NettyRuntime.availableProcessors(),new DefaultThreadFactory(nt_worker));ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ServerInit());// 绑定端口同步等待成功ChannelFuture channelFuture b.bind(Constant.DEFAULT_PORT).sync();channelFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {System.out.println(绑定成功。。。。 future.toString());}});LOG.info(Netty server start : Constant.DEFAULT_SERVER_IP : Constant.DEFAULT_PORT);}public static void main(String[] args) throws InterruptedException {new NettyServer().bind();}}
2.ServerPipeline
package adv.server;import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import adv.server.asyncpro.DefaultTaskProcessor;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.timeout.ReadTimeoutHandler;public class ServerInit extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel ch) throws Exception {// 粘包半包问题ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0, 2,0,2));ch.pipeline().addLast(new LengthFieldPrepender(2));// 序列化相关ch.pipeline().addLast(new KryoDecoder());ch.pipeline().addLast(new KryoEncoder());// 处理心跳超时ch.pipeline().addLast(new ReadTimeoutHandler(15));ch.pipeline().addLast(new LoginAuthRespHandler());ch.pipeline().addLast(new HeartBeatRespHandler());ch.pipeline().addLast(new ServerBusinessHandler(new DefaultTaskProcessor()));}
}
3.服务端业务处理
package adv.server;import adv.server.asyncpro.AsyncBusinessProcess;
import adv.server.asyncpro.ITaskProcessor;
import adv.vo.EncryptUtils;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ServerBusinessHandler extends SimpleChannelInboundHandlerMyMessage {private static final Logger LOG LoggerFactory.getLogger(ServerBusinessHandler.class);private ITaskProcessor taskProcessor;public ServerBusinessHandler(ITaskProcessor taskProcessor) {super();this.taskProcessor taskProcessor;}Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {// 检查MD5String headMd5 msg.getMsgHeader().getMd5();String calcMd5 EncryptUtils.encryptObj(msg.getBody());if (!headMd5.equals(calcMd5)) {LOG.error(报文MD5检查不通过: headMd5 vs calcMd5 , 关闭连接);ctx.writeAndFlush(buildBusinessResp(报文MD5检查不通过关闭连接));ctx.close();}LOG.info(msg.toString());if (msg.getMsgHeader().getType() MessageType.ONE_WAY.value()) {LOG.debug(ONE_WAY类型消息异步处理);AsyncBusinessProcess.submitTask(taskProcessor.execAsyncTask(msg));} else {LOG.debug(TWO_WAY类型消息应答);ctx.writeAndFlush(buildBusinessResp(OK));}}private MyMessage buildBusinessResp(String result) {MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setType(MessageType.SERVICE_RESP.value());myMessage.setMsgHeader(msgHeader);myMessage.setBody(result);return myMessage;}
}
4.安全中心
package adv.server;import constant.Constant;import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;public class SecurityCenter {// 用以检查用户是否重复登录的缓存private static MapString, Boolean nodeCheck new ConcurrentHashMap();// 用户登录的白名单private static SetString whiteList new CopyOnWriteArraySet();static {whiteList.add(Constant.DEFAULT_SERVER_IP);}public static boolean isWhiteIP(String ip) {return whiteList.contains(ip);}public static boolean isDupLog(String usrInfo) {return nodeCheck.containsKey(usrInfo);}public static void addLoginUser(String usrInfo) {nodeCheck.put(usrInfo, true);}public static void removeLoginUser(String usrInfo) {nodeCheck.remove(usrInfo, true);}
}
5.登录认证
package adv.server;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;public class LoginAuthRespHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG LoggerFactory.getLogger(LoginAuthRespHandler.class);Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message (MyMessage) msg;// 是不是握手认证请求if (message.getMsgHeader() ! null message.getMsgHeader().getType() MessageType.LOGIN_REQ.value()) {LOG.info(收到客户端认证请求 : message);String nodeIndex ctx.channel().remoteAddress().toString();MyMessage loginResp null;boolean checkAutuPass false;// 重复登录拒绝这里用客户端的地址代替了实际的用户信息if (SecurityCenter.isDupLog(nodeIndex)) {loginResp buildResponse((byte) -1);LOG.warn(拒绝重复登录, 应答消息: loginResp);ctx.writeAndFlush(loginResp);ctx.close();} else {// 检查用户是否在白名单中在则允许登录并写入缓存InetSocketAddress address (InetSocketAddress) ctx.channel().remoteAddress();String ip address.getAddress().getHostAddress();if (SecurityCenter.isWhiteIP(ip)) {SecurityCenter.addLoginUser(nodeIndex);loginResp buildResponse((byte) 0);LOG.info(认证通过应答消息: loginResp);ctx.writeAndFlush(loginResp);} else {loginResp buildResponse((byte) -1);LOG.warn(认证失败, 应答信息 : loginResp);ctx.writeAndFlush(loginResp);ctx.close();}}ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}private MyMessage buildResponse(byte result) {MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setType(MessageType.LOGIN_RESP.value());myMessage.setMsgHeader(msgHeader);myMessage.setBody(result);return myMessage;}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}
}
6.心跳处理
package adv.server;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class HeartBeatRespHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG LoggerFactory.getLogger(HeartBeatRespHandler.class);Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message (MyMessage)msg;// 是不是心跳请求if (message.getMsgHeader() ! null message.getMsgHeader().getType() MessageType.HEARTBEAT_REQ.value()) {// 心跳应答报文MyMessage heartBeatResp buildHeartBeat();LOG.debug(心跳应答: heartBeatResp);ctx.writeAndFlush(heartBeatResp);ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}private MyMessage buildHeartBeat() {MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setType(MessageType.HEARTBEAT_RESP.value());myMessage.setMsgHeader(msgHeader);return myMessage;}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {LOG.warn(客户端长时间未通信可能已经宕机关闭链路);SecurityCenter.removeLoginUser(ctx.channel().remoteAddress().toString());ctx.close();}super.exceptionCaught(ctx, cause);}
}
7.ONE_WAY/TWO_WAY处理
package adv.server.asyncpro;import io.netty.util.NettyRuntime;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class AsyncBusinessProcess {private static ExecutorService executorService new ThreadPoolExecutor(1,NettyRuntime.availableProcessors(),60, TimeUnit.SECONDS, new ArrayBlockingQueue(3000));public static void submitTask(Runnable task) {executorService.execute(task);}
}
package adv.server.asyncpro;import adv.vo.MyMessage;// 消息转任务处理器
public interface ITaskProcessor {Runnable execAsyncTask(MyMessage msg);
}
package adv.server.asyncpro;import adv.vo.MyMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class DefaultTaskProcessor implements ITaskProcessor{private static final Logger LOG LoggerFactory.getLogger(DefaultTaskProcessor.class);Overridepublic Runnable execAsyncTask(MyMessage msg) {Runnable task new Runnable() {Overridepublic void run() {LOG.info(DefaultTaskProcessor模拟任务处理: msg.getBody());}};return task;}
}
8.实体类以及加密
package adv.vo;public class MyMessage {private MsgHeader msgHeader;private Object body;public MsgHeader getMsgHeader() {return msgHeader;}public void setMsgHeader(MsgHeader msgHeader) {this.msgHeader msgHeader;}public Object getBody() {return body;}public void setBody(Object body) {this.body body;}Overridepublic String toString() {return MyMessage{ msgHeader msgHeader , body body };}
}
package adv.vo;import java.util.HashMap;
import java.util.Map;// 消息头
public class MsgHeader {// 消息体的MD5摘要private String md5;// 消息ID因为是同步处理模式不考虑应答消息需要填入请求消息IDprivate long msgId;// 消息类型private byte type;// 消息优先级private byte priority;private MapString, Object attachment new HashMap();public String getMd5() {return md5;}public void setMd5(String md5) {this.md5 md5;}public long getMsgId() {return msgId;}public void setMsgId(long msgId) {this.msgId msgId;}public byte getType() {return type;}public void setType(byte type) {this.type type;}public byte getPriority() {return priority;}public void setPriority(byte priority) {this.priority priority;}public MapString, Object getAttachment() {return attachment;}public void setAttachment(MapString, Object attachment) {this.attachment attachment;}Overridepublic String toString() {return MsgHeader{ md5 md5 \ , msgId msgId , type type , priority priority , attachment attachment };}
}
package adv.vo;public enum MessageType {SERVICE_REQ((byte) 0), // 业务请求消息SERVICE_RESP((byte) 1), // TWO_WAY消息需要业务应答ONE_WAY((byte) 2), // 无需应答的业务请求消息LOGIN_REQ((byte) 3), // 登录请求消息LOGIN_RESP((byte) 4), // 登录响应消息HEARTBEAT_REQ((byte) 5), // 心跳请求消息HEARTBEAT_RESP((byte) 6), // 心跳应答消息;private byte value;MessageType (byte value) {this.value value;}public byte value() {return this.value;}
}
package adv.vo;import java.util.concurrent.atomic.AtomicLong;public class MakeMsgId {private static AtomicLong msgId new AtomicLong(1);public static long getID() {return msgId.getAndIncrement();}
}
package adv.vo;import adv.kryocodec.KryoSerializer;import java.security.MessageDigest;public class EncryptUtils {private static String EncryptStr(String strSrc, String encName) {MessageDigest md null;String strDes null;byte[] bt strSrc.getBytes();try {if (encName null || encName.equals()) {encName MD5;}md MessageDigest.getInstance(encName);md.update(bt);strDes bytes2Hex(md.digest());} catch (Exception e) {System.out.println(Invalid algorithm.);return null;}return strDes;}/*** MD5 摘要* param str 需要被摘要的字符串* return 对字符串str进行MD5摘要后,将摘要字符串返回*/public static String EncryptByMD5(String str) {return EncryptStr(str, MD5);}/*** SHA1摘要* param str 需要被摘要的字符串* return 对字符串str进行SHA-1摘要后将摘要字符串返回*/public static String EncryptBySHA1(String str) {return EncryptStr(str, SHA-1);}/*** SHA256摘要* param str 需要被摘要的字符串* return 对字符串str进行SHA-256摘要后将摘要字符串返回*/public static String EncryptBySHA256(String str) {return EncryptStr(str, SHA-256);}/*** 字节转十六进制结果以字符串形式呈现*/private static String bytes2Hex(byte[] bts) {String des ;String tmp null;for (int i 0; i bts.length; i) {tmp (Integer.toHexString(bts[i] 0xFF));if (tmp.length() 1) {des 0;}des tmp;}return des;}/*** 对字符串进行MD5加盐摘要 先将str进行一次MD5摘要摘要后再取摘要后的字符串的* 1,3,5个字符追加到摘要串再拿这个摘要串再次进行摘要*/private static String encrypt(String str) {String encryptStr EncryptByMD5(str);if (encryptStr ! null) {encryptStr encryptStr encryptStr.charAt(0) encryptStr.charAt(2) encryptStr.charAt(4);encryptStr EncryptByMD5(encryptStr);}return encryptStr;}public static String encryptObj(Object o) {return encrypt(bytes2Hex(KryoSerializer.obj2Bytes(o)));}}
9.Kryo序列化/反序列化
package adv.kryocodec;import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class KryoSerializer {private static Kryo kryo KryoFactory.createKryo();// 序列化public static void serialize(Object object, ByteBuf out) {ByteArrayOutputStream baos new ByteArrayOutputStream();Output output new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}out.writeBytes(b);}// 序列化为一个字节数组主要用在消息摘要上public static byte[] obj2Bytes(Object object) {ByteArrayOutputStream baos new ByteArrayOutputStream();Output output new Output(baos);kryo.writeClassAndObject(output, object);output.flush();output.close();byte[] b baos.toByteArray();try {baos.flush();baos.close();} catch (IOException e) {e.printStackTrace();}return b;}public static Object deserialize(ByteBuf out) {if (out null) {return null;}Input input new Input(new ByteBufInputStream(out));return kryo.readClassAndObject(input);}
}
package adv.kryocodec;import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.serializers.DefaultSerializers;
import de.javakaffee.kryoserializers.*;import java.lang.reflect.InvocationHandler;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;public class KryoFactory {public static Kryo createKryo() {Kryo kryo new Kryo();kryo.setRegistrationRequired(false);kryo.register(Arrays.asList().getClass(), new ArraysAsListSerializer());kryo.register(GregorianCalendar.class, new GregorianCalendarSerializer());kryo.register(InvocationHandler.class, new JdkProxySerializer());kryo.register(BigDecimal.class, new DefaultSerializers.BigDecimalSerializer());kryo.register(BigInteger.class, new DefaultSerializers.BigIntegerSerializer());kryo.register(Pattern.class, new RegexSerializer());kryo.register(BitSet.class, new BitSetSerializer());kryo.register(URI.class, new URISerializer());kryo.register(UUID.class, new UUIDSerializer());UnmodifiableCollectionsSerializer.registerSerializers(kryo);SynchronizedCollectionsSerializer.registerSerializers(kryo);kryo.register(HashMap.class);kryo.register(ArrayList.class);kryo.register(LinkedList.class);kryo.register(HashSet.class);kryo.register(TreeSet.class);kryo.register(Hashtable.class);kryo.register(Date.class);kryo.register(Calendar.class);kryo.register(ConcurrentHashMap.class);kryo.register(SimpleDateFormat.class);kryo.register(GregorianCalendar.class);kryo.register(Vector.class);kryo.register(BitSet.class);kryo.register(StringBuffer.class);kryo.register(StringBuilder.class);kryo.register(Object.class);kryo.register(Object[].class);kryo.register(String[].class);kryo.register(byte[].class);kryo.register(char[].class);kryo.register(int[].class);kryo.register(float[].class);kryo.register(double[].class);return kryo;}
}
package adv.kryocodec;import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;public class KryoEncoder extends MessageToByteEncoderMyMessage {Overrideprotected void encode(ChannelHandlerContext ctx, MyMessage msg, ByteBuf out) throws Exception {KryoSerializer.serialize(msg, out);ctx.flush();}
}
package adv.kryocodec;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;// 反序列化的Handler
public class KryoDecoder extends ByteToMessageDecoder {Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in,ListObject out) throws Exception {Object obj KryoSerializer.deserialize(in);out.add(obj);}
}
package adv.kryocodec;import adv.vo.EncryptUtils;
import adv.vo.MakeMsgId;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;import java.util.HashMap;
import java.util.Map;public class TestKryoCodeC {public MyMessage getMessage(int j) {
// String content abcdefg-----------AAAAAAAAAA j;String content abcdefg-----------AAAAAA j;MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType((byte) 1);msgHeader.setPriority((byte) 7);msgHeader.setMd5(EncryptUtils.encryptObj(content));MapString, Object attachment new HashMap();for (int i 0; i 10; i) {attachment.put(city -- i, cover i);}msgHeader.setAttachment(attachment);myMessage.setMsgHeader(msgHeader);myMessage.setBody(content);return myMessage;}public static void main(String[] args) {TestKryoCodeC testC new TestKryoCodeC();for (int i 0; i 5; i) {ByteBuf sendBuf Unpooled.buffer();MyMessage message testC.getMessage(i);System.out.println(Encode: message);KryoSerializer.serialize(message, sendBuf);MyMessage decodeMsg (MyMessage)KryoSerializer.deserialize(sendBuf);System.out.println(Decode: decodeMsg);System.out.println(-----------------------------------------------);}}
}10.业务实体类
package adv.business;import serializable.msgpack.UserContact;public class User {private String id;private String userName;private int age;private UserContact userContact;public String getId() {return id;}public void setId(String id) {this.id id;}public String getUserName() {return userName;}public void setUserName(String userName) {this.userName userName;}public int getAge() {return age;}public void setAge(int age) {this.age age;}public UserContact getUserContact() {return userContact;}public void setUserContact(UserContact userContact) {this.userContact userContact;}public User() {}public User(String id, String userName, int age) {this.id id;this.userName userName;this.age age;}Overridepublic String toString() {return User{ id id \ , userName userName \ , age age , userContact userContact };}
}
package adv.business;public class UserContact {private String mail;private String phone;public UserContact() {}public UserContact(String mail, String phone) {this.mail mail;this.phone phone;}public String getMail() {return mail;}public void setMail(String mail) {this.mail mail;}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone phone;}Overridepublic String toString() {return UserContact{ mail mail \ , phone phone \ };}
}
11.ClientPipeline()
package adv.client;import adv.kryocodec.KryoDecoder;
import adv.kryocodec.KryoEncoder;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;public class ClientInit extends ChannelInitializerSocketChannel {Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));// 连接写空闲检测ch.pipeline().addLast(new CheckWriteIdleHandler());// 粘包半包ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535,0,2,0,2));ch.pipeline().addLast(new LengthFieldPrepender(2));// 序列化相关ch.pipeline().addLast(new KryoDecoder());ch.pipeline().addLast(new KryoEncoder());ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new LoginAuthReqHandler());// 连续读空闲检测ch.pipeline().addLast(new ReadTimeoutHandler(15));// 向服务器发出心跳请求ch.pipeline().addLast(new HeartBeatReqHandler());ch.pipeline().addLast(new ClientBusiHandler());}
}
12.客户端认证
package adv.client;import adv.NettyServer;
import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class LoginAuthReqHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG LoggerFactory.getLogger(LoginAuthReqHandler.class);Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//发出认证请求MyMessage loginMsg buildLoginReq();LOG.info(请求服务器认证: loginMsg);ctx.writeAndFlush(loginMsg);
// super.channelActive(ctx);}private MyMessage buildLoginReq() {MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setType(MessageType.LOGIN_REQ.value());myMessage.setMsgHeader(msgHeader);return myMessage;}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message (MyMessage) msg;if (message.getMsgHeader() ! null message.getMsgHeader().getType() MessageType.LOGIN_RESP.value()) {LOG.info(收到认证应答报文服务器是否验证通过....);byte loginResult (byte) message.getBody();if (loginResult ! (byte)0) {// 握手成功关闭连接LOG.warn(未通过认证关闭连接: message);ctx.close();} else {LOG.info(通过认证 移除本处理器 进入业务通信 message);ctx.pipeline().remove(this);ReferenceCountUtil.release(msg);}} else {ctx.fireChannelRead(msg);}}
}13.客户端心跳
package adv.client;import adv.vo.MessageType;
import adv.vo.MsgHeader;
import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// 客户端在长久未向服务器业务请求时发出心跳请求报文
public class HeartBeatReqHandler extends ChannelInboundHandlerAdapter {private static final Logger LOG LoggerFactory.getLogger(LoginAuthReqHandler.class);Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT) {MyMessage heartBeat buildHeartBeat();LOG.debug(写空闲,发出心跳报文维持连接: heartBeat);ctx.writeAndFlush(heartBeat);}super.userEventTriggered(ctx, evt);}private MyMessage buildHeartBeat() {MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setType(MessageType.HEARTBEAT_REQ.value());myMessage.setMsgHeader(msgHeader);return myMessage;}Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg);MyMessage message (MyMessage)msg;if (message.getMsgHeader() ! null message.getMsgHeader().getType() MessageType.HEARTBEAT_RESP.value()) {LOG.debug(收到服务器心跳应答 服务器正常);ReferenceCountUtil.release(msg);} else {ctx.fireChannelRead(msg);}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof ReadTimeoutException) {LOG.warn(服务器长时间未应答 关闭链路);}super.exceptionCaught(ctx, cause);}
}
14.客户端检查写空闲
package adv.client;import io.netty.handler.timeout.IdleStateHandler;public class CheckWriteIdleHandler extends IdleStateHandler {public CheckWriteIdleHandler () {super(0,8,0);}
}
15.客户端业务处理
package adv.client;import adv.vo.MyMessage;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ClientBusiHandler extends SimpleChannelInboundHandlerMyMessage {private static final Logger LOG LoggerFactory.getLogger(ClientBusiHandler.class);Overrideprotected void channelRead0(ChannelHandlerContext ctx, MyMessage msg) throws Exception {LOG.info(业务应答消息: msg.toString());}
}
16.客户端启动
package adv;import adv.client.ClientInit;
import adv.vo.*;
import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class NettyClient implements Runnable{private static final Logger LOG LoggerFactory.getLogger(NettyClient.class);// 负责重连的线程池private ScheduledExecutorService executorService Executors.newScheduledThreadPool(1);private Channel channel;private EventLoopGroup group new NioEventLoopGroup();// 是否用户主动关闭连接的标志private volatile boolean userClose false;// 连接是否成功关闭的标志private volatile boolean connected false;public boolean isConnected() {return connected;}public void connect(int port, String host) throws InterruptedException {try {// 客户端启动必备Bootstrap b new Bootstrap();b.group(group).channel(NioSocketChannel.class) // 指定使用NIO的通信模式.handler(new ClientInit());ChannelFuture future b.connect(new InetSocketAddress(host, port)).sync();LOG.info(已连接服务器);future.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {LOG.info(连接事件产生回调....operationCompletable);}});channel future.channel();synchronized (this) {this.connected true;this.notifyAll();}channel.closeFuture().sync(); } finally {if (!userClose) {// 非正常关闭有可能发生了网络问题LOG.warn(需要进行重连);executorService.execute(() - {try {// 给操作系统足够的时间取释放相关的资源TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();
// throw new RuntimeException(e);}});} else {// 正常关闭channel null;group.shutdownGracefully().sync();synchronized (this) {this.connected false;this.notifyAll();}}}}Overridepublic void run() {try {connect(Constant.DEFAULT_PORT, Constant.DEFAULT_SERVER_IP);} catch (InterruptedException e) {e.printStackTrace();}}public void sendOneWay(Object message) throws IllegalAccessException {if (channel null || !channel.isActive()) {throw new IllegalAccessException(和服务器还没建立起有效连接请稍后再试);}MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType(MessageType.ONE_WAY.value());msgHeader.setMd5(EncryptUtils.encryptObj(message));myMessage.setMsgHeader(msgHeader);myMessage.setBody(message);channel.writeAndFlush(myMessage);}public void send(Object message) throws IllegalAccessException {if (channel null || !channel.isActive()) {throw new IllegalAccessException(和服务器还没建立起有效连接请稍后再试);}MyMessage myMessage new MyMessage();MsgHeader msgHeader new MsgHeader();msgHeader.setMsgId(MakeMsgId.getID());msgHeader.setType(MessageType.SERVICE_REQ.value());msgHeader.setMd5(EncryptUtils.encryptObj(message));myMessage.setMsgHeader(msgHeader);myMessage.setBody(message);channel.writeAndFlush(myMessage);}public void close() {userClose true;channel.close();}
}
package adv;import adv.business.User;
import serializable.msgpack.UserContact;import java.util.Scanner;public class BusiClient {public static void main(String[] args) throws InterruptedException, IllegalAccessException {NettyClient nettyClient new NettyClient();new Thread(nettyClient).start();while (!nettyClient.isConnected()) {synchronized (nettyClient) {nettyClient.wait();}}System.out.println(网络通信已准备好可以进行业务操作了。。。。。);Scanner scanner new Scanner(System.in);while (true) {String msg scanner.next();if (msg null) {break;} else if (q.equals(msg.toLowerCase())) {nettyClient.close();scanner.close();while (nettyClient.isConnected()) {synchronized (nettyClient) {System.out.println(等待网络关闭完成....);nettyClient.wait();}}System.exit(1);} else if (v.equals(msg.toLowerCase())) {User user new User();user.setAge(19);String userName cover;user.setUserName(userName);user.setId(No:1);user.setUserContact(new UserContact(userName gmail.com, 133));nettyClient.sendOneWay(user);} else {nettyClient.send(msg);}}}
}
17.常量类
package constant;import java.util.Date;/*** 常量*/
public class Constant {public static final Integer DEFAULT_PORT 7777;public static final String DEFAULT_SERVER_IP 127.0.0.1;// 根据输入信息拼接出一个应答信息public static String response(String msg) {return Hello, msg , Now is new Date(System.currentTimeMillis()).toString(); }
}
18.结果展示
Server端启动包含认证 Client端启动 心跳应答 ONE_WAY TWO_WAY 客户端退出请求