当前位置: 首页 > news >正文

常州金坛建设局网站让别人做网站图片侵权

常州金坛建设局网站,让别人做网站图片侵权,开封美食网站建设规划,怎么自己做一个网址前言 用过 MQ 的同学#xff0c;可能会遇到过消息堆积的问题。而肥壕最近也踩上了这个坑#xff0c;但是发现结果竟然是这么一个意料之外的原因而导致的。 正文 那一晚月黑风高#xff0c;肥壕正准备踏上回家的路#xff0c;突然收到告警短信轰炸#xff01;“MQ 消息堆…前言 用过 MQ 的同学可能会遇到过消息堆积的问题。而肥壕最近也踩上了这个坑但是发现结果竟然是这么一个意料之外的原因而导致的。 正文 那一晚月黑风高肥壕正准备踏上回家的路突然收到告警短信轰炸“MQ 消息堆积告警 [TOPIC: XXX] ” 肥壕心里“万只草泥马崩腾~” 第一反应是“怎么肥事刚下班就来搞事情” 于是乎赶回公司赶紧打开电脑登上 RocketMQ 后台查看公司自己搭建的开源版RocketMQ 握草 (Д´)!!!    竟然堆积了3亿多条消息了 要知道出现消息堆积无在乎这个问题 生产者的生产速度 消费者的处理速度 生产者的生产速度骤增比如生产者的流量突然骤增。 消费速度变慢比如消费者实例 IO 阻塞严重或者宕机。 擦了一下头上的冷汗...赶紧登上消费者服务器瞧瞧。 应用运行正常服务器磁盘IO 正常网络正常 再去上去生产者的服务器咦...流量也很正常 什么佛了  ...生产者和消费者的应用都很正常但是为什么消息会堆积怎么多呢看着这堆积的数量越堆越多越发着急。 虽然说 RocketMQ 版能支持 10 亿级别的消息堆积不会因为消息堆积导致性能明显下降但是这堆积量很明显就是一个异常情况。 RocketMQ 有 BUG 没错这肯定是 RocketMQ 的锅 本篇完... 哈哈言归正传虽然肥壕拼爹不行但至少不能坑爹 进入消费者的工程查看一下日志emmm...没有发现报错没有错误日志...看起来好像一切都很正常。 咦...不过这个消费的速度是不是有点慢这不科学啊消费者可是配置了3个结点的消费集群啊按业务的需求量来说消费能力可是杠杠的呀。我再点开这个 TOPIC 的消费者信息。 咦这三个消费者的 ClientId 怎么会是一样呀 以多年采坑经验的直接告诉我难道是因为 ClientId 的相同的问题导致 broker 在分发消息的时候出现混乱从而导致消息不能正常推送给消费者 因为生产者和消费者都表现正常所以我猜测问题可能在于 Broker 这一块上。 基于这个推测那么我们就需要解决这几个问题 部署在不同的服务器上的两个消费者为什么 ClientId 是相同的呢 ClientId 相同会导致 broker 消息分发错误吗 问题分析 为什么 ClientId 相同呢我推测是因为 Docker 容器的问题。因为公司最近开始容器化阶段而刚好消费者的项目也在第一批容器化阶段的列表上。 有了解过 Docker 的小伙伴都知道当 Docker 进程启动时会在主机上创建一个名为docker0的虚拟网桥。宿主机上的 Docker 容器会连接到这个虚拟网桥上。虚拟网桥的工作方式和物理交换机类似这样主机上的所有容器就通过交换机连在了一个二层网络中。而 Docker 的网络模式一般有四种 Host 模式 Container 模式 None 模式 Bridge 模式 对这几个模式不清楚的同学自行找度娘 我们容器都是采用 Host 模式所以容器的网络跟宿主机是完全一致的。 可以看到这里第一个就是docker0网卡默认的 ip 都是172.17.0.1。所以显而易见ClientId 应该读取的都是docker0网卡的 IP这就是能解释为什么多个消费端的 ClientId 都一致的问题了。 那么接下来就是 clientId 的究竟是在哪里设置呢机智的我在 Github 的 Issues 搜索关键词 “Docker”啪啦啪啦一搜果然还是有不少踩过次坑的志同道合之士筛选了一番找到一个比较靠谱的  open issue 可以看到这个兄弟跟我的遇到的情况是一毛一样的而他的结论跟我上面的推测也是大致相同此时内心洋洋得意一番他这里还提到 clientId 是在 ClientConfig 类中 buildMQClientId 方法中定义的。 源码探索 进入 ClientConfig 类定位到 buildMQClientId 方法 public String buildMQClientId() {StringBuilder sb  new StringBuilder();sb.append(this.getClientIP());sb.append();sb.append(this.getInstanceName());if (!UtilAll.isBlank(this.unitName)) {sb.append();sb.append(this.unitName);}return sb.toString(); }通过这个相信大家都可以看出 clientId 的生成规则吧就是 消费者客户端的IP 实例名称 很明显问题就出在获取客户端 IP 上。 我们再继续看一下它究竟是如何获取客户端 IP 的 public class ClientConfig {... private String clientIP  RemotingUtil.getLocalAddress();... }public static String getLocalAddress() {try {// Traversal Network interface to get the first non-loopback and non-private addressEnumerationNetworkInterface enumeration  NetworkInterface.getNetworkInterfaces();ArrayListString ipv4Result  new ArrayListString();ArrayListString ipv6Result  new ArrayListString();while (enumeration.hasMoreElements()) {final NetworkInterface networkInterface  enumeration.nextElement();final EnumerationInetAddress en  networkInterface.getInetAddresses();while (en.hasMoreElements()) {final InetAddress address  en.nextElement();if (!address.isLoopbackAddress()) {if (address instanceof Inet6Address) {ipv6Result.add(normalizeHostAddress(address));} else {ipv4Result.add(normalizeHostAddress(address));}}}}// prefer ipv4if (!ipv4Result.isEmpty()) {for (String ip : ipv4Result) {if (ip.startsWith(127.0) || ip.startsWith(192.168)) {continue;}return ip;}return ipv4Result.get(ipv4Result.size() - 1);} else if (!ipv6Result.isEmpty()) {return ipv6Result.get(0);}//If failed to find,fall back to localhostfinal InetAddress localHost  InetAddress.getLocalHost();return normalizeHostAddress(localHost);} catch (Exception e) {log.error(Failed to obtain local address, e);}return null; }如果有操作过获取当前机器的 IP 的小伙伴应该对RemotingUtil.getLocalAddress()这个工具方法并不陌生~ 简单说就是获取当前机器网卡 IP但是由于容器的网络模式采用的是 host 模式也就意味着各个容器和宿主机都是处于同一个网络下所以容器中我们也可以看到 Docker - Server 所创建的docker 0网卡所以它读取的也就是 docker 0网卡所默认的 IP 地址 172.17.0.1。 跟运维同学沟通了一下目前由于是容器化的第一阶段所以先采用简单模式部署后面会慢慢替换成 k8s每个 pod 都有自己的独立 IP 到时网络会与宿主机和其他 pod 的相互隔离。emmm....k8s 听起来牛逼哄哄恰好最近也在看这方面的书。 这时候聪明的你可能会问 “不是还有一个实例名称的参数呢这个又怎么会相同呢”  别着急我们继续往下看 private String instanceName  System.getProperty(rocketmq.client.name, DEFAULT);public String getInstanceName() {return instanceName; }public void setInstanceName(String instanceName) {this.instanceName  instanceName; }public void changeInstanceNameToPID() {if (this.instanceName.equals(DEFAULT)) {this.instanceName  String.valueOf(UtilAll.getPid());} }getInstanceName() 方法其实直接获取 instanceName这个参数值但是这个参数值是什么时候赋值进去的呢没错就是通过changeInstanceNameToPID()这个方法赋值的在 consumer 在 start 的时候会调用此方法。 这个参数的逻辑很简单在初始化的时候首先会获取环境变量rocketmq.client.name是否有值如果没有就是用默认值DEFAULT。 然后 consumer 启动的时候会判断这参数值是否为DEFAULT如果是的话就调用 UtilAll.getPid()。 public static int getPid() {RuntimeMXBean runtime  ManagementFactory.getRuntimeMXBean();String name  runtime.getName(); // format: pidhostnametry {return Integer.parseInt(name.substring(0, name.indexOf()));} catch (Exception e) {return -1;} }通过方法名字我们就可以很清楚知道这个方法其实获取进程号的。那...为什么获取的进程号都是一致的呢? 聪明的你可以已经知道答案了对吧 这里就不得不提 Docker 的 三大特性 cgroup namespace unionFS 没错这里用的就是 namespace 技术啦。 Linux Namespace 是 Linux 内核提供的一个功能可以实现系统资源的隔离如PID、User ID、Network 等。 由于都是使用相同的基础镜像在最外层都是运行同样的 JAVA 工程所以我们可以进去容器里面看他们的进程号都是为 9。 经过肥壕的一系列巧妙的推理和论证在 Docker 容器 HOST 网络模式下 会生成相同的 clientId 到这里为止我们算是解决了上文推测的第一个问题 紧跟柯南肥壕的步伐我们继续推理第二个问题clientId 相同导致 Broker 分发消息错误 Consumer 在负载均衡的时候应该是根据 clientId 作为客户端消费者的唯一标识在消息下发的时候由于 clientId 的一致导致负载分发错误。 那么我们下面就要去探究一下 Consumer 的负载均衡究竟是如何实现的。一开始我以为消费端的负载均衡都是在 Broker 处理的由Broker 根据注册的 Consumer 把不同的 Queue 分配给不同的 Consumer。但是去看了一下源码上的 doc 描述文档和对源码进行一番的研究后结果发现自己见识还是太少了哈哈哈应该有小伙伴跟我开始的想法是一样的吧 先来补充一下 RocketMQ 的整体架构 image1.png 由于篇幅问题这里我只讲解一下 Broker 和 consumer 之间的关系其他的角色如果有不懂的可以看一下我之前写的 RocketMQ 介绍篇的文章 Consumer 与 NameServer 集群中的其中一个节点随机选择建立长连接定期从 NameServer 获取 Topic 路由信息。 根据获取 Topic 路由信息 与 Broker 建立长连接且定时向 Broker 发送心跳。 Broker 接收心跳消息的时候会把 Consumer 的信息保存到本地缓存变量 consumerTable。上图大致讲解了一下 consumerTable 的存储结构和内容最主要的是它缓存了每个 consumer 的 clientId。 关于 Consumer 的消费模式我直接引用源码的解释 在 RocketMQ 中Consumer 端的两种消费模式Push/Pull都是基于拉模式来获取消息的而在 Push 模式只是对 Pull 模式的一种封装其本质实现为消息拉取线程在从服务器拉取到一批消息后然后提交到消息消费线程池后又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息则延迟一下又继续拉取。 在两种基于拉模式的消费方式Push/Pull中均需要 Consumer 端在知道从 Broker 端的哪一个消息队列—队列中去获取消息。因此有必要在 Consumer 端来做负载均衡即 Broker 端中多个 MessageQueue 分配给同一个ConsumerGroup 中的哪些 Consumer 消费。 所以简单来说不管是 Push 还是 Pull 模式消息消费的控制权在 Consumer 上所以 Consumer 的负载均衡实现是在 Consumer 的 Client 端上。 通过查看源码可以发现 RebalanceService 会完成负载均衡服务线程每隔20s执行一次RebalanceService 线程的run() 方法最终调用的是 RebalanceImpl 类的 rebalanceByTopic()方法该方法是实现 Consumer 端负载均衡的核心。这里rebalanceByTopic()方法会根据消费者通信类型为“广播模式”还是“集群模式”做不同的逻辑处理。这里主要来看下集群模式下的主要处理流程 private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {..... // 省略}case CLUSTERING: {// 获取该Topic主题下的消息消费队列集合SetMessageQueue mqSet  this.topicSubscribeInfoTable.get(topic);// 向 broker 获取消费者的clientIdListString cidAll  this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null  mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn(doRebalance, {}, but the topic[{}] not exist., consumerGroup, topic);}}if (null  cidAll) {log.warn(doRebalance, {} {}, get consumer id list failed, consumerGroup, topic);}if (mqSet ! null  cidAll ! null) {ListMessageQueue mqAll  new ArrayListMessageQueue();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);// 默认平均分配算法AllocateMessageQueueStrategy strategy  this.allocateMessageQueueStrategy;ListMessageQueue allocateResult  null;try {allocateResult  strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error(AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName{}, strategy.getName(),e);return;}SetMessageQueue allocateResultSet  new HashSetMessageQueue();if (allocateResult ! null) {allocateResultSet.addAll(allocateResult);}boolean changed  this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info(rebalanced result changed. allocateMessageQueueStrategyName{}, group{}, topic{}, clientId{}, mqAllSize{}, cidAllSize{}, rebalanceResultSize{}, rebalanceResultSet{},strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;} }(1) 从本地缓存变量 topicSubscribeInfoTable 中获取该Topic主题下的消息消费队列集合mqSet (2) 根据 topic 和 consumerGroup 为参数调用findConsumerIdList()方法向 Broker 端发送获取该消费组下 clientId 列表 (3) 先对 Topic 下的消息消费队列、消费者Id排序然后用消息队列分配策略算法默认为消息队列的平均分配算法计算出待拉取的消息队列。这里的平均分配算法类似于分页的算法将所有 MessageQueue 排好序类似于记录将所有消费端 Consumer 排好序类似页数并求出每一页需要包含的平均 size 和每个页面记录的范围 range最后遍历整个range 而计算出当前 Consumer 端应该分配到的记录这里即为MessageQueue。 (4) 然后调用updateProcessQueueTableInRebalance()方法具体的做法是先将分配到的消息队列集合mqSet与processQueueTable做一个过滤比对。 上图中 processQueueTable 标注的红色部分表示与分配到的消息队列集合 mqSet 互不包含。将这些队列设置Dropped 属性为 true然后查看这些队列是否可以移除出 processQueueTable 缓存变量这里具体执行removeUnnecessaryMessageQueue()方法即每隔1s 查看是否可以获取当前消费处理队列的锁拿到的话返回true。如果等待1s后仍然拿不到当前消费处理队列的锁则返回false。如果返回true则从 processQueueTable 缓存变量中移除对应的 Entry 上图中 processQueueTable 的绿色部分表示与分配到的消息队列集合 mqSet 的交集。判断该 ProcessQueue 是否已经过期了在Pull模式的不用管如果是 Push 模式的设置 Dropped 属性为 true并且调用removeUnnecessaryMessageQueue()方法像上面一样尝试移除 Entry 消息消费队列在同一消费组不同消费者之间的负载均衡其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费一个消息消费者能同时消费多个消息队列。 上面这部分内容是摘自RocketMQ 源码中 docs的文档不知道你们看懂了没反正我是看了好几遍才理解了 其实看步骤3的图负载均衡的实现原来也就一目了然了简单说就是给不同的消费者分配数量相同的消费队列。而消费者都会生成 clientId 的唯一标识但是根据我们上文的推理在容器中并且是Host网络模式下会生成一致的 clientId。 Emmmm....到这里想必大家都能猜到究竟是哪里出问题了吧。 没错问题应该就出在步骤3中平均分配的计算方式。 Override public ListMessageQueue allocate(String consumerGroup, String currentCID, ListMessageQueue mqAll, ListString cidAll) {if (currentCID  null || currentCID.length()  1) {throw new IllegalArgumentException(currentCID is empty);}if (mqAll  null || mqAll.isEmpty()) {throw new IllegalArgumentException(mqAll is null or mqAll empty);}if (cidAll  null || cidAll.isEmpty()) {throw new IllegalArgumentException(cidAll is null or cidAll empty);}ListMessageQueue result  new ArrayListMessageQueue();if (!cidAll.contains(currentCID)) {log.info([BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {},consumerGroup,currentCID,cidAll);return result;}// 当前clientId所在的下标int index  cidAll.indexOf(currentCID);int mod  mqAll.size() % cidAll.size();int averageSize mqAll.size()  cidAll.size() ? 1 : (mod  0  index  mod ? mqAll.size() / cidAll.size() 1 : mqAll.size() / cidAll.size());int startIndex  (mod  0  index  mod) ? index * averageSize : index * averageSize  mod;int range  Math.min(averageSize, mqAll.size() - startIndex);for (int i  0; i  range; i) {result.add(mqAll.get((startIndex  i) % mqAll.size()));}return result; }上面的计算可以看起来有点绕但是其实看懂了之后说白就是计算当前 Consumer 所分配的消息队列就好比上图步骤3中的图示。 假设当前只有一个 consumer 那我们的消费其实是完全正常的因为当前 Topic 下所有的队列都会分配给当前的 consumer 也不存在负载均衡的问题。 假设当前有两个 consumer按照正常的计算方式结果应该是这样子的。但是因为cidAll是两个重复的 clientId所以两个 consumer 获得的 index 都是0自然他们分配的都是相同的 MessageQueue。这就能解释开头为什么能看到是有消费的日志但是消费速度非常慢的原因了。 解决方法 解决负载均衡错误 罪魁祸首clientId 经过一翻精彩的推论大家应该知道导致 Consumer 负载均衡错误的根本原因就是Consumer 客户端生成的 clientId 一致所以解决这个问题重点就是在于修改 clientId 的生成规则。上面简单地从源码分析了一下 clientId 的生成规则 我们可以通过手动设置 rocketmq.client.name 这个环境变量生成自定义唯一的 clientId 。 肥壕这里在原来的 pid 后再加上了时间戳 PostConstruct public void init() {System.setProperty(rocketmq.client.name, String.valueOf(UtilAll.getPid())    System.currentTimeMillis()); }解决消息堆积 终于解决了根本问题了行吧万事俱备只差上线队列里头堆积的3亿多条消息还在等着消费呢。可谓是一时堆积一时爽一直堆积一直爽 刚上线了不久emmm...效果显著堆积的消息数量逐渐减少了。但是另外一个告警来了mongodb 告警了 我差点忘记了消费者对消息业务处理后后会写入mongodb现在消费的流量入口突然骤增mongodb反倒扛不住了。不过还好历史的消息不重要是可以丢失的。于是肥壕果断去后台重置了一下消费点位妥了现在消费正常了mongodb也正常了。呼~有惊无险差点又酿造了另外一起事故。 总结 RocketMQ 的 consumer 客户端都会生成 clientId 唯一标识clientId 的生成规则是客户端IP客户端进程号 Docker 容器部署如果网络模式使用 Host 模式容器中的应用都会获取 Docker 网桥的默认IP RocketMQ 的 consumer 端负载均衡是在客户端实现的consumer 客户端会缓存对应的 Topic 消费队列默认采用消息队列的平均分配算法如果 clientId 相同那么所有的客户端都会分配到相同的队列导致消费异常。 对于消息堆积的处理要做好全面的检查。不能被瞬间大流量的消费入口而影响其他业务不然就像肥壕一样搞出另一起事故了大家如果有更好的消息堆积处理方案欢迎留言提议
http://www.pierceye.com/news/122904/

相关文章:

  • 制作企业网站怎么报价可以做我女朋友吗网站
  • 广西玉林网站建设正规公司建手机网站
  • 乐清网站制作公司招聘做私人网站 违法
  • 珠海电脑自己建网站电子商务排名
  • 怎样做网站的背景图片安卓原生开发
  • 现代电子商务网站建设技术wordpress采用的mvc
  • 台州网站建设团队如何申请建设网站域名
  • 资料查询网站建设桂林微代码网络科技有限公司
  • 做暖视频网站免费番禺网站制作技术
  • 如何做网站百度排名优化深圳市住房和建设网站
  • 汉沽做网站简单网站建设
  • 建信建设投资有限公司网站网站建设app小程序
  • wordpress文章所有图片seo中文含义
  • 免费网站建设开发个人 网站备案 幕布
  • 公司网站设计 优帮云网站开发合同注意事件有哪些
  • 网站建设费用用温州建设局老网站
  • 做网站全部乱码怎么办网络平台销售
  • wordpress建立移动m站wordpress免费主题企业
  • 珠海市网站建设重庆建站公司网站模板
  • 网页设计与网站开发pdf备案网站可以做接码平台么
  • 国外网站国内备案南京网站seo优化公司
  • 岱山县网站建设网站后台管理系统怎么上传
  • 做网站需要什么东西重庆市招投标网官网
  • 潢川手机网站建设戴尔公司网站开发的经营目标
  • 创建网站代码是什么问题wordpress屏蔽广告插件下载
  • 网站接入服务提供商纪检网站建设计划
  • 佛山做网站公司有哪些做拆分盘网站
  • 沈阳做微网站注册一个有限公司需要多少钱
  • 网站首页logo怎么修改优化方案英语答案
  • 东南亚网站建设市场用照片做视频的模板下载网站