当前位置: 首页 > news >正文

做电商网站用什么软件开发山东郓城网站建设

做电商网站用什么软件开发,山东郓城网站建设,广告公司和设计公司,wordpress标签不被收录前言 笔者使用了kafka用来传输数据#xff0c;笔者在今年10月写了文章#xff0c;怎么使用配置化实现kafka的装载#xff1a;springboot kafka多数据源#xff0c;通过配置动态加载发送者和消费者-CSDN博客 不过在实际运行中#xff0c;kafka broker是加密的#xff0c…前言 笔者使用了kafka用来传输数据笔者在今年10月写了文章怎么使用配置化实现kafka的装载springboot kafka多数据源通过配置动态加载发送者和消费者-CSDN博客 不过在实际运行中kafka broker是加密的本来也没啥但是突然的一天笔者在监控发现消费者掉线了发送者居然还是正常的见鬼的事情就是这么朴实的发生了而且日志有Authentication/Authorization Exception and no authExceptionRetryInterval set 这样的错误还有 Closing the Kafka consumer Kafka consumer has been closed 这样的日志非常诡异后面查询kafka集群才知道kafka集群在某个时间被改了AUTH配置然后又改回来了神奇的操作。 现象 实际上看到日志是懵的毕竟kafka是对方提供的AUTH用户名和密码也是对方给的而且发送者也出现AUTH失败但是发送者一直重试然后因为kafka集群的AUTH改回来了重试成功了唯独消费者AUTH失败后关闭了。 源码分析 从日志搜索Authentication/Authorization Exception and no authExceptionRetryInterval set 发现日志出现在 org.springframework.kafka.listener.KafkaMessageListenerContainer 刚好在消息监听器容器的内部类 ListenerConsumer 看源码定义是一个定时线程池的任务定义 private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback 分析怎么去消费的KafkaMessageListenerContainer的doStart方法 既然明晰了那么分析问题的来源, run方法 public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent(); //事件通知spring事件this.consumerThread Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count 0;this.last System.currentTimeMillis();initAssignedPartitions(); // 初始分配partitionspublishConsumerStartedEvent(); // 消费者启动事件Throwable exitThrowable null;this.lastReceive System.currentTimeMillis();while (isRunning()) { //状态在上面的截图代码已经更新为运行状态try {pollAndInvoke(); // 队列拉取拉取过程会出现各种异常}catch (NoOffsetForPartitionException nofpe) { //这个需要注意但是这个是不可配置的this.fatalError true;ListenerConsumer.this.logger.error(nofpe, No offset and no reset policy);exitThrowable nofpe;break;}catch (AuthenticationException | AuthorizationException ae) { //这个是授权失败就是来源于kafka broker的auth鉴权if (this.authExceptionRetryInterval null) { //时间配置默认居然是nullListenerConsumer.this.logger.error(ae,Authentication/Authorization Exception and no authExceptionRetryInterval set);this.fatalError true;exitThrowable ae;break; //循环结束}else { // 一段时间后重试ListenerConsumer.this.logger.error(ae,Authentication/Authorization Exception, retrying in this.authExceptionRetryInterval.toMillis() ms);// We cant pause/resume here, as KafkaConsumer doesnt take pausing// into account when committing, hence risk of being flooded with// GroupAuthorizationExceptions.// see: https://github.com/spring-projects/spring-kafka/pull/1337sleepFor(this.authExceptionRetryInterval);}}catch (FencedInstanceIdException fie) {this.fatalError true;ListenerConsumer.this.logger.error(fie, ConsumerConfig.GROUP_INSTANCE_ID_CONFIG has been fenced);exitThrowable fie;break;}catch (StopAfterFenceException e) {this.logger.error(e, Stopping container due to fencing);stop(false);exitThrowable e;}catch (Error e) { // NOSONAR - rethrownthis.logger.error(e, Stopping container due to an Error);this.fatalError true;wrapUp(e);throw e;}catch (Exception e) {handleConsumerException(e);}finally {clearThreadState();}}//上面异常后这里的处理wrapUp(exitThrowable);} 注意NoOffsetForPartitionException和AuthenticationException | AuthorizationException其中能配置重试的是AuthenticationException | AuthorizationException异常就是授权失败可以通过配置过一段时间抢救一下一般而言kafka首次授权失败基本上就不太可能成功了但是这个只能控制consumer销毁producer还在重试所以出现了消费者销毁了发送者在kafka集群auth还原后成功恢复。看看wrapUp方法 既然知道了原理那么解决办法是配置 authExceptionRetryInterval解决方法 配置authExceptionRetryInterval也是不容易分析取值 private final Duration authExceptionRetryInterval this.containerProperties.getAuthExceptionRetryInterval(); 从 containerProperties来的实际上是继承org.springframework.kafka.listener.ConsumerProperties 在spring-kafka 2.8版本还对属性命名重构了毕竟以前的命名字母太多了 不过要修改这个值也不容易kafkaproperties并没有提供这个参数而且创建消费者容器工厂时 org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory 那么只能在 ConcurrentMessageListenerContainer 创建成功后从这个里面读取配置设置默认值。这就可以使用前面埋点的spring事件了通过事件拿到Consumer不就可以修改配置了 使用 publishConsumerStartedEvent(); 的事件最合适执行循环前最后一个事件而且这里的this就是消费者容器org.springframework.kafka.listener.KafkaMessageListenerContainer对象就是我们需要的 编写代码如下 Component public class KafkaConsumerStartedListener implements ApplicationListenerConsumerStartedEvent {Overridepublic void onApplicationEvent(ConsumerStartedEvent event) {KafkaMessageListenerContainer?, ? container event.getSource(KafkaMessageListenerContainer.class);container.getContainerProperties().setAuthExceptionRetryInterval(Duration.of(30, ChronoUnit.SECONDS));} } 就这样就解决了问题哈哈哈贼简单。  笔者在使用API时发现了还有一个API是授权失败后重启是布尔变量默认值是false即不重试。那么这个是哪里触发的呢在我们看到的消费者销毁事件里面实际上控制重试有多种办法一个是循环去kafka broker拉取一个是重启kafka消费者容器。 这个在上面已经说明了简单截个图再说明一下 设置的地方还是上面的代码里面同理也可以修改事件为 ConsumerStoppedEvent 可以更精准当然就用刚刚的代码加一行设置也是可以的。 总结 kafka在发送者和消费者是区分开的发送者如果连接kafka broker失败后可以一直重试直到成功但是消费者确有各种各样的逻辑可以精准控制比如消费者重启的配置 restartAfterAuthExceptions 可以控制消费者在停止时重启如果仅仅是授权失败而且不需要反复重启消耗资源那么可以通过 authExceptionRetryInterval 配置时间周期的方式实现但是kafka并没有给我们配置的入口但是kafka在消费者启动消费的过程埋了很多spring事件钩子通过这些钩子可以操作估计spring-kafka也不希望我们去修改毕竟消费者启动失败了或授权失败了消费者自动销毁是符合正常逻辑的。如果不使用kafka自己提供的事件可以在启动完成通过 org.springframework.kafka.config.KafkaListenerEndpointRegistry拿到所有消费者容器来批量设置属性毕竟spring-kafka也是通过这个端点注册器注册MessageListenerContainer的。
http://www.pierceye.com/news/392038/

相关文章:

  • 视频教学网站cms陕西网站备案查询
  • 湖州网站设计浙北数据wordpress自定义搜索页面
  • 昆明公司网站开发流线型的网站建设
  • 南京建设网站企业泊头市建设网站
  • 前端跟后端哪个就业难北京网站建设seo优化
  • 简述网站开发建设的基本流程做一个京东这样的网站需要多少钱
  • 与通信工程专业做项目的网站微信开发显示wordpress
  • 自己做链接网站萍乡做网站哪家好
  • 做网站最适合用多大的图片医院 网站建设 新闻
  • 网站开发职业分析产品展示的手机网站
  • 精通网站建设pdf网上自学电脑课程
  • 一站式网站建设业务沈阳网站建设 熊掌号
  • 58同城网站建设目的劳务公司怎么注册需要什么要求
  • 龙华网站建设设计公司国家中小学智慧教育平台
  • 摄影网站采用照片做宣传_版权费是多少?pythom+网站开发规范
  • 免费制作一个自己的网站吗达内教育口碑怎么样
  • 2015做那个网站能致富网站建设模板ppt模板
  • 网站后台管理系统教程自助网站建设程序
  • 做黑帽需不需要搭建网站没有做等保的网站不能上线对吗
  • 怎么在微信建立公众号郑州专业seo首选
  • 万网网站后台国家域名
  • 怎么做 niche网站临港注册公司优惠政策
  • 做网站开发怎么做网站推广的步骤
  • 网站空间文件删不掉软文免费发布平台
  • 电子商务网站开发教程论文推广app平台有哪些
  • 郑州专业的网站建设优化自己的网站
  • 申请渠道门户网站是什么意思微信公众平台推广网站
  • 公司网站未备案公众号如何推广产品
  • 网站建设服务器环境配置郑州网站建设企业名录
  • e福州官方网站wordpress注册目录