昌江县住房和城乡建设局网站,佛山建设网站制作,太原专业网站建设,深圳网站建设联系电话服务端的启动通过ServerBootstrap类来完成#xff0c;ServerBootstrap内有以下主要属性
ServerBootstrap extends AbstractBootstrap {//处理channel连接事件的线程组EventLoopGroup group;//处理channel其它事件的线程组EventLoopGroup childGroup;//创建channel的工厂类Cha…服务端的启动通过ServerBootstrap类来完成ServerBootstrap内有以下主要属性
ServerBootstrap extends AbstractBootstrap {//处理channel连接事件的线程组EventLoopGroup group;//处理channel其它事件的线程组EventLoopGroup childGroup;//创建channel的工厂类ChannelFactory? extends C channelFactory//channel相关选项MapChannelOption?, Object options;//channel相关属性MapAttributeKey?, Object attrs//handlerChannelHandler handler;
}group()方法就是设置两个线程组属性。
channel()方法会new ReflectiveChannelFactory()的工厂赋值给channelFactory属性。
childHandler()设置childHandler属性。
另外还有一个重要的内部类ServerBootstrapAcceptor
bind方法
bind方法绑定端口启动channel这里是重点,这里实际会调到doBind方法进行处理
来看doBind代码
private ChannelFuture doBind(final SocketAddress localAddress) {//doBind-1 final ChannelFuture regFuture initAndRegister();final Channel channel regFuture.channel();if (regFuture.cause() ! null) {return regFuture;}if (regFuture.isDone()) {// At this point we know that the registration was complete and successful.ChannelPromise promise channel.newPromise();doBind0(regFuture, channel, localAddress, promise);return promise;} else {//...}
}初始化和注册
doBind-1会调用initAndRegister方法进行初始channel和注册事件
final ChannelFuture initAndRegister() {Channel channel null;//step1 创建channelchannel channelFactory.newChannel();//step2 初始化channelinit(channel);//step3 注册 这里的group是bossGroup ChannelFuture regFuture config().group().register(channel);return regFuture;
}step1、创建channel
创建channel是使用的channelFactory。我们上面有说这里工厂实例是ReflectiveChannelFactory。其newChannel就是调用入参class的无参构造函数创建实例。也就是我们传入的NioServerSocketChannel。这里NioServerSocketChannel无参构造方法我们要拿出来看一下。
这里会先根据SelectorProvider创建一个ServerSocketChannel这都是jdk创建channel的方式。然后调用下面的构造方法
public NioServerSocketChannel(ServerSocketChannel channel) {//调用父类初始化super(null, channel, SelectionKey.OP_ACCEPT);config new NioServerSocketChannelConfig(this, javaChannel().socket());
}super调用父类构造方法是AbstractNioChannel类
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);//这里parent是nullthis.ch ch;//设置感兴趣的操作 这里是上面传入的SelectionKey.OP_ACCEPTthis.readInterestOp readInterestOp;//设置channel为非阻塞ch.configureBlocking(false);
}这里又调用父类AbstractChannel的构造方法
protected AbstractChannel(Channel parent) {this.parent parent;id newId();//这里会创建一个NioMessageUnsafe类型的unsafe类unsafe newUnsafe();//初始化pipelinepipeline newChannelPipeline();
}其它的不看先来看下初始化pipleline方法。其实就是创建了一个DefaultChannelPipeline实例。
protected DefaultChannelPipeline(Channel channel) {this.channel ObjectUtil.checkNotNull(channel, channel);succeededFuture new SucceededChannelFuture(channel, null);voidPromise new VoidChannelPromise(channel, true);//设置链表头尾tail new TailContext(this);head new HeadContext(this);head.next tail;tail.prev head;
}我们知道Pipeline是一个双向链表这里就会初始化tail和head。
到这里看到chanel创建好了还是jdk的nio channel。设置为非阻塞模式封装成NioServerSocketChannel。并且创建了默认的pipleline。
这里有三个点需要几下readInterestOpSelectionKey.OP_ACCEPTunsafe和pipleline里的HeadContext后面会用到
step2、初始化channel
void init(Channel channel) {setChannelOptions(channel, newOptionsArray(), logger);setAttributes(channel, newAttributesArray());ChannelPipeline p channel.pipeline();final EventLoopGroup currentChildGroup childGroup;final ChannelHandler currentChildHandler childHandler;final EntryChannelOption?, Object[] currentChildOptions newOptionsArray(childOptions);final EntryAttributeKey?, Object[] currentChildAttrs newAttributesArray(childAttrs);//这里往pipeline里加一个ChannelInitializerp.addLast(new ChannelInitializerChannel() {//initChannel方法在Overridepublic void initChannel(final Channel ch) {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}});
}这里基本上是把serverboot里的属性设置给channel然后pipleline里加入一个ChannelInitializer。重写了其initChannel方法。目前不会被调到先不看。不过很重要。主要是ch.eventLoop().execute()这里。这里的ch就是我们的serverchanneleventLoop是绑定的bossgroup里的一个eventloop。显然这里还没有初始化.
这里调用的pipleline.addLast()方法看一下其中里面有一步逻辑
//这里的handler就是我们传入的ChannelInitializer
AbstractChannelHandlerContext newCtx newContext(group, filterName(name, handler), handler)
if (!registered) {//未注册成立newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;
}在addlast方法里会判断是否还未注册会调用callHandlerCallbackLater()
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {assert !registered;//added truePendingHandlerCallback task added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);PendingHandlerCallback pending pendingHandlerCallbackHead;if (pending null) {//赋值给pendingHandlerCallbackHeadpendingHandlerCallbackHead task;} else {// Find the tail of the linked-list.while (pending.next ! null) {pending pending.next;}pending.next task;}
}这里pendingHandlerCallbackHead 包装(ChannelInitializer)。这一步后面的注册会有回调。
step3、注册channel
第三步config().group().register(channel)
这里是调用的bossGroup的register方法。前面NioEventLoopGroup部分有说到其register方法。NioEventLoopGroup会拿出一个children也就是NioEventLoop进行与channel绑定。所以从SingleThreadEventLoop的register方法开始看
public ChannelFuture register(Channel channel) {return register(new DefaultChannelPromise(channel, this));
}Override
public ChannelFuture register(final ChannelPromise promise) {ObjectUtil.checkNotNull(promise, promise);//调用unsafe的register方法 这里实例是AbstractUnsafe是一个AbstractChannel的内部类promise.channel().unsafe().register(this, promise);return promise;
}这里unsafe我们在step1创建channel时候有看到是一个AbstractUnsafe类型最后调用AbstractUnsafe.register方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {//eventLoop是从NioEventLoopGroup拿出来的一个childAbstractChannel.this.eventLoop eventLoop;//判断当前线程和child线程是不是同一个线程 我们这里第一次是主线程 不成立if (eventLoop.inEventLoop()) {register0(promise);} else {//执行这里eventLoop.execute(new Runnable() {Overridepublic void run() {register0(promise);}});}
}最后执行eventLoop.execute。eventLoop这里是拿出来的一个child是SingleThreadEventLoop extends SingleThreadEventExecutor。这里eventLoop和unsafe类互相调用。最后会调到下面SingleThreadEventExecutor类的重载execute方法
//这里task就是上面传入的runnable。immediate是true
private void execute(Runnable task, boolean immediate) {boolean inEventLoop inEventLoop();//还是falseaddTask(task);//添加任务if (!inEventLoop) {startThread();//启动线程}//...
}这里SingleThreadEventExecutor有一个任务队列Queue taskQueue。addTask就是先将任务加入该队列。然后startThread方法会调用doStartThread真正启动一个线程执行任务。
startThread方法也看一眼
private void startThread() {if (state ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success false;try {doStartThread();success true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}
}我们看到这里会维护一个state用来标识起没启动过线程保证只启用一个线程。
doStartThread方法
private void doStartThread() {assert thread null;//这里的executor在创建NioEventLoop时指定的ThreadPerTaskExecutor//其execut方法就是threadFactory.newThread(command).start();启动一个线程executor.execute(new Runnable() {Overridepublic void run() {thread Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success false;updateLastExecutionTime();try {//重要的一句 这里this实例是NioEventLoopSingleThreadEventExecutor.this.run();success true;}}});
}绕来绕去又是run方法又是execute方法。我们这里来总结一下最后目前的状态。
下面是大致逻辑代码
NioEventLoop{ThreadPerTaskExecutor executor;QueueRunnable taskQueue;void execute(Runnable task){addTask(task);startThread();}void run(){...}void startThread() {executor.execute(new Runnable() {Overridepublic void run() {this.run();}});}
}1、unsafe调用NioEventLoop.execute()方法执行register0()任务。
2、execute方法首先会将该任务放到taskQueue里。然后startThread启动一个线程。
3、startThread执行其属性executor.execute()方法。executor是ThreadPerTaskExecutor类型其execute方法会创建并start运行传入的Runnable。所以就是运行起来NioEventLoop.run()方法。
这个时候NioEventLoop里的线程启动起来了然后任务队列里有一个执行register0()待处理任务。
NioEventLoop.run方法内容
protected void run() {int selectCnt 0;for (;;) {//死循环上面创建的线程一直运行try {int strategy;try {//计算策略值 如果有任务返回Selector.selectNow否则返回SelectStrategy.SELECTstrategy selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.CONTINUE://-2continue;case SelectStrategy.BUSY_WAIT://-3 case SelectStrategy.SELECT://-1long curDeadlineNanos nextScheduledTaskDeadlineNanos();if (curDeadlineNanos -1L) {curDeadlineNanos NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);if (!hasTasks()) {strategy select(curDeadlineNanos);}}} selectCnt;cancelledKeys 0;needsToSelectAgain false;final int ioRatio this.ioRatio;//这里默认值50boolean ranTasks;if (ioRatio 100) {try {if (strategy 0) {processSelectedKeys();}} finally {// Ensure we always run tasks.ranTasks runAllTasks();}} else if (strategy 0) {final long ioStartTime System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime System.nanoTime() - ioStartTime;ranTasks runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks runAllTasks(0); // This will run the minimum number of tasks}} }
}运行run方法逻辑是一个for循环。计算strategy值
第一次循环 task队列不为空Selector.selectNow()。这时候还没有channel注册到selectorselectorNow会返回0.跳过switch判断。ioRatio的判断也不成立会走最后的else。执行runAllTasks(0)。这个时候才会执行我们第一次AbstractUnsafe.register往taskQueue加的任务,也就是register0方法。
register0方法 AbstractChannel.AbstractUnsafe.register0
private void register0(ChannelPromise promise) {try {firstRegistration true;//reg1-注册selectordoRegister();registered true;//reg2-回调pipleline里handler的handlerAdded方法pipeline.invokeHandlerAddedIfNeeded();//reg3- 发布注册事件pipeline.fireChannelRegistered();//reg4- 发布active事件if (isActive()) {if (firstRegistration) {pipeline.fireChannelActive();} else if (config().isAutoRead()) {beginRead();}}//...}
}reg1-doRegister方法
protected void doRegister() throws Exception {boolean selected false;for (;;) {//将channel注册到selector上 注意看这里ops的值是0selectionKey javaChannel().register(eventLoop().unwrappedSelector(), 0, this);return; }
}这个时候channel会注册到Selector上但是关注的事件key值还是0。
reg2
注册完后会调用pipeline.invokeHandlerAddedIfNeeded()方法。第一次注册会调用callHandlerAddedForAllHandlers();方法
PendingHandlerCallback task pendingHandlerCallbackHead;
while (task ! null) {task.execute();task task.next;
}这里pendingHandlerCallbackHead就是我们step2初始化时候添加的ChannelInitializer。PendingHandlerAddedTask.execute()方法最后会执行到handler.handlerAdd()方法。我们addLast是加的ChannelInitializer类型。其handlerAdd方法如下
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {if (ctx.channel().isRegistered()) {if (initChannel(ctx)) {//这里入参是handler// We are done with init the Channel, removing the initializer now.removeState(ctx);}}
}会调用initChannel(ChannelHandlerContext)方法。
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {if (initMap.add(ctx)) { // Guard against re-entrance.try {initChannel((C) ctx.channel());} finally {if (!ctx.isRemoved()) {//移除该handlerctx.pipeline().remove(this);}}return true;}return false;
}调用initChannel(channel)这个方法是我们step2里从写的方法。然后执行完后会将该handler从pipline里删除
再回头看一下
public void initChannel(final Channel ch) {final ChannelPipeline pipeline ch.pipeline();ChannelHandler handler config.handler();if (handler ! null) {pipeline.addLast(handler);}ch.eventLoop().execute(new Runnable() {Overridepublic void run() {pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));}});}主要是ch.eventLoop().execute这里。这里ch.eventLoop()是NioEventLoop现在已经绑定好了。其execute前面已经介绍过会往任务队列里添加一个任务。
上面是执行runAllTasks第一个任务register0()register0()最后执行完后又加入一个任务。runAllTasks是个循环只有取不到任务才会跳出所以会执行第二个刚加入的任务也就是 pipeline.addLast(new ServerBootstrapAcceptor。往pipleline里加入请先记住这里pipeline里有一个ServerBootstrapAcceptor。
reg3-发布注册事件
reg4-发布active事件
目前pipleline里只有head和tail两个handler。fireChannelActive()最后会触发handler的channelActive()方法
然而在HeadContext.channelActive()方法最后会调用unsafe.beginRead()方法然后调用doBeginRead() protected void doBeginRead() throws Exception {final SelectionKey selectionKey this.selectionKey;if (!selectionKey.isValid()) {return;}readPending true;//这里是初始化的0final int interestOps selectionKey.interestOps();//readInterestOp是构造函数设置的值serverchannel是OP_READ,client创建的channel值是OP_READif ((interestOps readInterestOp) 0) {//与运算selectionKey.interestOps(interestOps | readInterestOp);}}这里会修改interestOps。这个时候才开始监听accept事件。就是要等到reg2步ServerBootstrapAcceptor被加入到pipline里之后。后面连接建立时候会有说明为什么。
所有任务执行完成执行第二次循环
第二次循环任务已经执行完成为空这时候strategy SelectStrategy.SELECT.
会走switch SELECT分支最后走到select(-1)。内部实现执行selector.select()方法。这个时候就阻塞等待事件的发生。有事件发生会继续往下走到processSelectedKeys()方法。实际在processSelectedKeysOptimized()方法处理selectKeys。最后具体处理一个selectKey方法是processSelectedKey
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {//final AbstractNioChannel.NioUnsafe unsafe ch.unsafe();try {//不同的key事件处理int readyOps k.readyOps();if ((readyOps SelectionKey.OP_CONNECT) ! 0) { int ops k.interestOps();ops ~SelectionKey.OP_CONNECT;k.interestOps(ops);unsafe.finishConnect();}if ((readyOps SelectionKey.OP_WRITE) ! 0) {ch.unsafe().forceFlush();}if ((readyOps (SelectionKey.OP_READ| | SelectionKey.OP_ACCEPT)) ! 0 || readyOps 0) {unsafe.read();}} catch (CancelledKeyException ignored) {unsafe.close(unsafe.voidPromise());}
}到这里服务端就启动完成了等待客户端发起i连接事件。
step4、连接处理
这个时候我们启动一个client来连接serverselector就会监听到SelectionKey.OP_ACCEPT事件就会走unsafe.read()方法。这里server端unsafe实例是NioMessageUnsafe.read方法 Overridepublic void read() {assert eventLoop().inEventLoop();final ChannelConfig config config();final ChannelPipeline pipeline pipeline();final RecvByteBufAllocator.Handle allocHandle unsafe().recvBufAllocHandle();allocHandle.reset(config);boolean closed false;//读取数据这里的readBuf是客户端连接channeltry {try {do {//readBuf是ListNioSocketChannel 类型获取所有新连接int localRead doReadMessages(readBuf);if (localRead 0) {break;}if (localRead 0) {closed true;break;}allocHandle.incMessagesRead(localRead);} while (continueReading(allocHandle));} catch (Throwable t) {exception t;}int size readBuf.size();//逐个channel进行处理for (int i 0; i size; i ) {readPending false;//调用pileline的read方法pipeline.fireChannelRead(readBuf.get(i));}readBuf.clear();allocHandle.readComplete();//调用pipleline的ReadCompletepipeline.fireChannelReadComplete();} }
}doReadMessages就不看了还是调用nio的accept方法建立channel连接。这里包装成了NioSocketChannel。readInterestOp属性设置的是SelectionKey.OP_READ。
将新建的客户端连接逐个触发ChannelRead方法。这里回想下没有特殊处理现在pipeline里最少有
HeadContext、ServerBootstrapAcceptor、TailContext
ServerBootstrapAcceptor是一个ChannelInboundHandlerAdapter类型的handler。其channelRead方法如下
public void channelRead(ChannelHandlerContext ctx, Object msg) {//这里的child是新建的客户端channelfinal Channel child (Channel) msg;//这里的childHandler是我们调用serverchanel.childHandler()方法显示设置的child.pipeline().addLast(childHandler);//options 和attrs都是serverchannel初始化显示设置的setChannelOptions(child, childOptions, logger);setAttributes(child, childAttrs);try {//childGroup是workergroupchildGroup.register(child).addListener(new ChannelFutureListener() {Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {forceClose(child, future.cause());}}});} catch (Throwable t) {forceClose(child, t);}
}这里的register方法和前面bossgroup的register方法实现是一致的。因为两个group都是NioEventLoopGroup类型。只不过这里是从workgroup拿出来一个child走regiser0进行和新创建的客户端chanel进行绑定关注的是OP_READ事件。最后一直监听read事件。
最后就是server端会使用bossgroup进行线程channel绑定监听OP_ACCEPT事件。
clientchannel会和workgroup中的线程进行绑定。监听OP_READ事件。workgroup一个child可以绑定多个channel。同时监听多个channel的READ事件。
启动流程总结
qa
1、workgroup是怎么绑定多个clientchannel的
前面我们知道新clientchanel连接来了workgroup会分配一个child进行处理。child是怎么分配的呢。
workgroup的next()方法
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];idx绑定一个channel递增一个值用这个数与child数组长度取余。
这个时候拿出的child会有两种情况
1、未绑定channel还未初始化
这个就是走创建新线程.start()执行NioEventLoop.run()方法。和我们上面分析的服务端启动过程一致没有问题。
2、child已经使用中绑定过clientchannel。这个时候有可能处于select()方法阻塞状态。
那么我们新的register被加到taskQueue里岂不是要一直等待执行
其实不然这里有唤醒select()逻辑只是上面没有说。
回到child的execute方法
//immediate 是否立即执行这里是true
private void execute(Runnable task, boolean immediate) {boolean inEventLoop inEventLoop();addTask(task);if (!inEventLoop) {startThread();//...}if (!addTaskWakesUp immediate) {//如果有必要这里唤醒select()阻塞wakeup(inEventLoop);}
}wakeup方法
protected void wakeup(boolean inEventLoop) {if (!inEventLoop nextWakeupNanos.getAndSet(AWAKE) ! AWAKE) {selector.wakeup();}
}这里就打断了selector.select()的阻塞。然后进入run方法下一次循环判断会先执行taskQueue里的任务。