新民电子网站建设哪家好,应用商店下载安装正版,成都易锐互动科技有限公司,怎样把建好的网站上传到互联网大家好#xff0c;我是烤鸭#xff1a;
今天分享下 logback 源码 #xff0c;版本是 6.5-SNAPSHOT。 写这篇的目的 由于最近项目中一直出现这个日志#xff0c;而且基本每20秒就会打印一次#xff0c;也没法关掉#xff0c;百度上资料也很少#xff0c;只能自己来了。 …大家好我是烤鸭
今天分享下 logback 源码 版本是 6.5-SNAPSHOT。 写这篇的目的 由于最近项目中一直出现这个日志而且基本每20秒就会打印一次也没法关掉百度上资料也很少只能自己来了。 10:04:01,393 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxxx.com:1111: Waiting 19999ms before attempting reconnection. 正常来说这个提示就是简单提示下socket连接断开可能是网络或者是服务端的原因然后重连。比如下边这个日志。 11:17:06,337 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[LOGSTASH] - Log destination xxx.com:1234: Waiting 27476ms before attempting reconnection.
11:17:13,302 |-WARN in net.logstash.logback.appender.LogstashAccessTcpSocketAppender[logstash] - Log destination xxx.com:1234: connection failed. java.net.ConnectException: Connection refused: connectat java.net.ConnectException: Connection refused: connectat at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)at at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:81)at at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:476)at at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:218)at at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:200)at at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:162)at at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:394)at at java.net.Socket.connect(Socket.java:606)at at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler.openSocket(AbstractLogstashTcpSocketAppender.java:721)at at net.logstash.logback.appender.AbstractLogstashTcpSocketAppender$TcpSendingEventHandler.onStart(AbstractLogstashTcpSocketAppender.java:640)at at net.logstash.logback.appender.AsyncDisruptorAppender$EventClearingEventHandler.onStart(AsyncDisruptorAppender.java:351)at at com.xxx.arch.encoder.com.lmax.disruptor.BatchEventProcessor.notifyStart(BatchEventProcessor.java:224)at at com.xxx.arch.encoder.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:120)at at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)at at java.util.concurrent.FutureTask.run(FutureTask.java:266)at at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)at at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)at at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at at java.lang.Thread.run(Thread.java:748)
11:17:13,303 |-WARN in net.logstash.logback.appender.LogstashAccessTcpSocketAppender[logstash] - Log destination xxx.com:1234: Waiting 27662ms before attempting reconnection. 异常的日志连接成功后每10s断开连接然后过20s重试后连接成功一直反复乐此不疲... 11:48:54,484 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: connection established.
11:49:04,524 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: Waiting 19949ms before attempting reconnection.
11:49:24,477 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: connection established.
11:49:34,478 |-WARN in net.logstash.logback.appender.LogstashTcpSocketAppender[SLEUTH-INFO] - Log destination xxx.com:4562: Waiting 19995ms before attempting reconnection. 源码AbstractLogstashTcpSocketAppender嫌麻烦的直接看3 由于 这个用到 com.lmax.disruptor 这个包推荐看一下美团的这篇 https://tech.meituan.com/2016/11/18/disruptor.html AbstractLogstashTcpSocketAppender 一般是用于发送日志内容比如将日志内容发送到 logstash/flume 等。 具体的配置可以参考下 https://www.cnblogs.com/zhyg/p/6994314.html 内部类 TcpSendingEventHandler implements EventHandlerLogEventEvent, LifecycleAware 负责执行TCP传输的事件处理器这个类内部还有3个线程内部类 分别是 KeepAliveRunnable(用于和socet 保持连接)、ReaderCallable(接收socket的流信息)、WriteTimeoutRunnable(检测写入超时,如果超时了就关闭连接) UnconnectedConfigurableSSLSocketFactory extends ConfigurableSSLSocketFactory (创建链接使用自定义配置参数) TcpSendingEventHandler 重点看下这个类处理tcp事务都处理些啥方法如下 onEvent 对 EventHandler.onEvent 的实现有事件就去处理。代码不长而且注释特别清晰。 接受到事件后循环5次判断socket的读取流的线程是否结束或者socket是否为空调用 reopenSocket 方法否则调用下面的writeEvent。如果 readerFuture.isDone() 是服务器关闭了连接如果是 socket为空是写入超时。 if (readerFuture.isDone() || socket null) {/** If readerFuture.isDone(), then the destination has shut down its output (our input),* and the destination is probably no longer listening to its input (our output).* This will be the case for Amazons Elastic Load Balancers (ELB)* when an instance behind the ELB becomes unhealthy while were connected to it.** If socket null here, it means that a write timed out,* and the socket was closed by the WriteTimeoutRunnable.* * Therefore, attempt reconnection.*/addInfo(peerId destination terminated the connection. Reconnecting.);reopenSocket();try {readerFuture.get();sendFailureException NOT_CONNECTED_EXCEPTION;} catch (Exception e) {sendFailureException e;}continue;
} writeEventtcp 往服务器写数据。由于keepalive 也会触发事件但是event 为null。所以这时候判断是 keepalive还是其他事件。 其他事件的写入还要 兼容 logbakck1.x版本keepalive 写入的话写入 换行符。还有个属性 endOfBatch如果是的话会执行 outputStream.flush() onStart , 启动方法。 初始化 destinationAttemptStartTimes 数组目的是为了存放每个连接目标最后尝试连接的时间。openSocket(建立 socket连接)scheduleKeepAlive (定时线程 触发keepAlive 事件)scheduleWriteTimeout(定时检查写超时的话就关闭连接(这个在5.x是没有的方法)) onShutdown 就不说了 reopenSocket 调了两个方法关闭连接打开连接。 openSocket 是被 synchronized 修饰的。方法注释说的是反复打开socket直到线程被打断了。 /*** Repeatedly tries to open a socket until it is successful,* or the hander is stopped, or the handler thread is interrupted.** If the socket is non-null when this method returns,* then it should be able to be used to send.*/ 方法比较长一点点看 while (isStarted() !Thread.currentThread().isInterrupted()) {// 获取下一个连接的index,多个链接地址的时候多个destination标签默认主从还有轮询获取和随机destinationIndex connectionStrategy.selectNextDestinationIndex(destinationIndex, destinations.size());long startWallTime System.currentTimeMillis();Socket tempSocket null;OutputStream tempOutputStream null;/** Choose next server*/InetSocketAddress currentDestination destinations.get(destinationIndex);try {/** Update peerId (for status message)*/peerId Log destination currentDestination : ;/** Delay the connection attempt if the last attempt to the selected destination* was less than the reconnectionDelay.* 判断最后一次尝试连接的时间和延迟重连比较,如果上一次重试的时间小于30s,会提示并在30减去重试时间后发起重连*/final long millisSinceLastAttempt startWallTime - destinationAttemptStartTimes[destinationIndex];if (millisSinceLastAttempt reconnectionDelay.getMilliseconds()) {final long sleepTime reconnectionDelay.getMilliseconds() - millisSinceLastAttempt;if (errorCount MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {addWarn(peerId Waiting sleepTime ms before attempting reconnection.);}try {shutdownLatch.await(sleepTime, TimeUnit.MILLISECONDS);if (!isStarted()) {return;}} catch (InterruptedException ie) {Thread.currentThread().interrupt();addWarn(peerId connection interrupted. Will no longer attempt reconnection.);return;}// reset the start time to be after the wait period.startWallTime System.currentTimeMillis();}// 更新当前index的最后重试时间destinationAttemptStartTimes[destinationIndex] startWallTime;/** Set the SO_TIMEOUT so that SSL handshakes will timeout if they take too long.** Note that SO_TIMEOUT only applies to reads (which occur during the handshake process).*/tempSocket socketFactory.createSocket();tempSocket.setSoTimeout(acceptConnectionTimeout);/** currentDestination is unresolved, so a new InetSocketAddress* must be created to resolve the hostname.*/tempSocket.connect(new InetSocketAddress(getHostString(currentDestination), currentDestination.getPort()), acceptConnectionTimeout);/** Trigger SSL handshake immediately and declare the socket unconnected if it fails*/if (tempSocket instanceof SSLSocket) {((SSLSocket)tempSocket).startHandshake();}/** Issue #218, make buffering the output stream optional.*/tempOutputStream writeBufferSize 0? new BufferedOutputStream(tempSocket.getOutputStream(), writeBufferSize): tempSocket.getOutputStream();if (getLogback11Support().isLogback11OrBefore()) {getLogback11Support().init(encoder, tempOutputStream);}addInfo(peerId connection established.);this.socket tempSocket;this.outputStream tempOutputStream;boolean shouldUpdateThreadName (destinationIndex ! connectedDestinationIndex);connectedDestinationIndex destinationIndex;connectedDestination currentDestination;connectionStrategy.connectSuccess(startWallTime, destinationIndex, destinations.size());if (shouldUpdateThreadName) {/** destination has changed, so update the thread name*/updateCurrentThreadName();}// 默认的schedule线程池每10s触发一次读取server的返回this.readerFuture scheduleReaderCallable(new ReaderCallable(tempSocket.getInputStream()));fireConnectionOpened(this.socket);return;} catch (Exception e) {CloseUtil.closeQuietly(tempOutputStream);CloseUtil.closeQuietly(tempSocket);connectionStrategy.connectFailed(startWallTime, destinationIndex, destinations.size());fireConnectionFailed(currentDestination, e);/** Avoid spamming status messages by checking the MAX_REPEAT_CONNECTION_ERROR_LOG.*/if (errorCount MAX_REPEAT_CONNECTION_ERROR_LOG * destinations.size()) {addWarn(peerId connection failed., e);}}
} scheduleKeepAlive 维持连接的需要在xml中配置 keepAliveDuration默认不触发这个方法 scheduleWriteTimeout 监测写入超时的 ReaderCallable.call 读取服务器的流没有返回空。但是! 触发定时线程池往 Disruptor 中触发一个空事件。 其实作者也说了触发空事件就是为了 keepalive触发的时候会判断 future是否结束结束的话重新打开socket。 如果没有这个方法会在下次触发 onEvent时重新连接所以为了尽快打开socket作者加了这个折中的方案。 Override
public Void call() throws Exception {updateCurrentThreadName();try {while (true) {try {if (inputStream.read() -1) {/** End of stream reached, so were done.*/return null;}} catch (SocketTimeoutException e) {/** ignore, and try again*/} catch (Exception e) {/** Something else bad happened, so were done.*/throw e;}}} finally {if (!Thread.currentThread().isInterrupted()) {getExecutorService().submit(() - {/** https://github.com/logstash/logstash-logback-encoder/issues/341** Pro-actively trigger the event handlers onEvent method in the handler thread* by publishing a null event (which usually means a keepAlive event).** When onEvent handles the event in the handler thread,* it will detect that readerFuture.isDone() and reopen the socket.** Without this, onEvent would not be called until the next event,* which might not occur for a while.* So, this is really just an optimization to reopen the socket as soon as possible.** We cant reopen the socket from this thread,* since all socket open/close must be done from the event handler thread.** There is a potential race condition here as well, since* onEvent could be triggered before the readerFuture completes.* We reduce (but not eliminate) the chance of that happening by* scheduling this task on the executorService.*/getDisruptor().getRingBuffer().tryPublishEvent(getEventTranslator(), null);});}}
} 其实看完这块代码我的问题就破案了。 每隔10s发起重连是来源于这个地方触发的空事件也是正常的。期间很有可能是服务器断开了连接之后发起了重连。 对上面的流程梳理下 启动的时候创建连接、定时心跳维护连接(默认关闭)、定时监测写入超时(默认100ms) 创建连接看上次尝试连接时间是否超过30s没超过的话等待20s后重连超过的话立即重连。定时10s的单个线程读取socket的输入流读取完毕后触发 Disruptor 一个空事件。 触发事件的时候循环5次判断当前的连接状态(线程状态和socket状态)关闭调用关闭连接和创建连接。开启调用写入方法。 写入方法判断是心跳维护还是正常事件心跳维护写换行符正常事件写入事件值。如果是批量终结调用 flush 刷新流。 解决方案 虽然问题找到了由于猜测是服务端释放连接导致的这个问题所以没什么好的解决方案粗暴一点直接改了 logback-encoder 的日志级别改为ERROR看不到WARN日志了有点骗自己的意思... 当我写完整个的时候发现了真正的问题所在... logstash-logback-encoder 版本问题改成5.3 可以了。