1 处理请求的过程概述
（1）消费端发起TCP连接后，服务提供方的NettyServer的connected方法将被调用；
（2）因为Netty默认的线程模型为All，因此AllChannelHandler类把接收到的所有消息（…
1 处理请求的过程概述
（1）消费端发起TCP连接后，服务提供方的NettyServer的connected方法将被调用；
（2）因为Netty默认的线程模型为All，因此AllChannelHandler类把接收到的所有消息（包括请求事件、响应事件、连接事件、断开事件、心跳事件等）包装成ChannelEventRunnable任务，并将其投递到线程池中；
（3）接着执行线程池中的任务，并最终将调用DubboProtocol的connected方法。
2 处理请求的实现细节
2.1 NettyServer的connected方法被调用
消费端发起TCP连接后，服务提供方的NettyServer的connected方法将被调用。该connected方法为NettyServer父类AbstractServer的connected方法。
其中的依次调用关系为：AbstractServer的connected() -> AbstractPeer的connected() -> ChannelHandler的connected()。具体实现如下所示。
（1）AbstractServer的connected()

public void connected(Channel ch) throws RemotingException {
    // If the server has entered the shutdown process, reject any new connection
    if (this.isClosing() || this.isClosed()) {
        logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "Close new channel " + ch + ", cause: server is closing or has been closed. For example, receive a new connect request while in shutdown process.");
        ch.close();
        return;
    }
    if (accepts > 0 && getChannelsSize() > accepts) {
        logger.error(INTERNAL_ERROR, "unknown error in remoting module", "", "Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);
        ch.close();
        return;
    }
    super.connected(ch);
}
（2）AbstractPeer的connected()

private final ChannelHandler handler;

public void connected(Channel ch) throws RemotingException {
    if (closed) {
        return;
    }
    handler.connected(ch);
}

2.2 消息被投递到线程池中
调用ChannelHandler的connected()时，因为Netty默认的线程模型为All，因此AllChannelHandler类（ChannelHandler的子类）把接收到的所有消息包装成ChannelEventRunnable任务，并将其投递到线程池中。具体实现如下所示。
public class AllChannelHandler extends WrappedChannelHandler {public AllChannelHandler(ChannelHandler handler, URL url) {super(handler, url);}Overridepublic void connected(Channel channel) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));} catch (Throwable t) {throw new ExecutionException(connect event, channel, getClass() error when process connected event ., t);}}Overridepublic void disconnected(Channel channel) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));} catch (Throwable t) {throw new ExecutionException(disconnect event, channel, getClass() error when process disconnected event ., t);}}Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor getPreferredExecutorService(message);try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {if(message instanceof Request t instanceof RejectedExecutionException){sendFeedback(channel, (Request) message, t);return;}throw new ExecutionException(message, channel, getClass() error when process received event ., t);}}Overridepublic void caught(Channel channel, Throwable exception) throws RemotingException {ExecutorService executor getSharedExecutorService();try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));} catch (Throwable t) {throw new ExecutionException(caught event, channel, getClass() error when process caught event ., t);}}
}
2.3 执行线程池中的任务
2.3.1 ChannelEventRunnable的run方法
执行线程池中的任务时，将执行ChannelEventRunnable的run方法，其实现细节具体如下所示。

public void run() {
    InternalThreadLocalMap internalThreadLocalMap = InternalThreadLocalMap.getAndRemove();
    try {
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e);
            }
        } else {
            switch (state) {
                case CONNECTED:
                    try {
                        handler.connected(channel);
                    } catch (Exception e) {
                        logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case DISCONNECTED:
                    try {
                        handler.disconnected(channel);
                    } catch (Exception e) {
                        logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                    }
                    break;
                case SENT:
                    try {
                        handler.sent(channel, message);
                    } catch (Exception e) {
                        logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e);
                    }
                    break;
                case CAUGHT:
                    try {
                        handler.caught(channel, exception);
                    } catch (Exception e) {
                        logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e);
                    }
                    break;
                default:
                    logger.warn(INTERNAL_ERROR, "unknown error in remoting module", "", "unknown state: " + state + ", message is " + message);
            }
        }
    } finally {
        InternalThreadLocalMap.set(internalThreadLocalMap);
    }
}
2.3.2 执行connected方法
执行handler.connected(channel)时，将调用HeaderExchangeHandler#connected方法，具体实现如下所示。

public void connected(Channel channel) throws RemotingException {
    ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
    handler.connected(exchangeChannel);
    channel.setAttribute(Constants.CHANNEL_SHUTDOWN_TIMEOUT_KEY,
        ConfigurationUtils.getServerShutdownTimeout(channel.getUrl().getOrDefaultApplicationModel()));
}
接着，在执行handler.connected(exchangeChannel)时，将调用DubboProtocol#connected方法，实现如下所示。
public void connected(Channel channel) throws RemotingException {
    invoke(channel, ON_CONNECT_KEY);
}

private void invoke(Channel channel, String methodKey) {
    Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
    if (invocation != null) {
        try {
            if (Boolean.TRUE.toString().equals(invocation.getAttachment(STUB_EVENT_KEY))) {
                tryToGetStubService(channel, invocation);
            }
            received(channel, invocation);
        } catch (Throwable t) {
            logger.warn(PROTOCOL_FAILED_REFER_INVOKER, "", "", "Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
        }
    }
}

public void received(Channel channel, Object message) throws RemotingException {
    if (message instanceof Invocation) {
        reply((ExchangeChannel) channel, message);
    } else {
        super.received(channel, message);
    }
}

执行接口请求，最终将调用DubboProtocol#reply方法，具体实现如下所示。
public CompletableFutureObject reply(ExchangeChannel channel, Object message) throws RemotingException {if (!(message instanceof Invocation)) {throw new RemotingException(channel, Unsupported request: (message null ? null : (message.getClass().getName() : message)) , channel: consumer: channel.getRemoteAddress() -- provider: channel.getLocalAddress());}Invocation inv (Invocation) message;// 1、获取调用方法对应的InvokerInvoker? invoker inv.getInvoker() null ? getInvoker(channel, inv) : inv.getInvoker();// switch TCCLif (invoker.getUrl().getServiceModel() ! null) {Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader());}// need to consider backward-compatibility if its a callbackif (Boolean.TRUE.toString().equals(inv.getObjectAttachmentWithoutConvert(IS_CALLBACK_SERVICE_INVOKE))) {String methodsStr invoker.getUrl().getParameters().get(methods);boolean hasMethod false;if (methodsStr null || !methodsStr.contains(,)) {hasMethod inv.getMethodName().equals(methodsStr);} else {String[] methods methodsStr.split(,);for (String method : methods) {if (inv.getMethodName().equals(method)) {hasMethod true;break;}}}if (!hasMethod) {logger.warn(PROTOCOL_FAILED_REFER_INVOKER, , , new IllegalStateException(The methodName inv.getMethodName() not found in callback service interface ,invoke will be ignored. please update the api interface. url is: invoker.getUrl()) ,invocation is : inv);return null;}}// 2、获取上下文对象并设置对端地址RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress());// 3、执行invoker调用链Result result invoker.invoke(inv);// 4、返回结果return result.thenApply(Function.identity());
}