中国万网网站空间申请,最好的手表网站,贵州城乡住房和建设厅网站,抖店怎么推广目标
解决MQ消息的积压
背景
菜馆系统----------- 系统读取消息#xff0c;处理业务逻辑#xff0c;持久化订单和菜品数据#xff0c;然后将其显示在菜品管理客户端上。 最初我们的用户基数很小#xff0c;上线后的一段时间内#xff0c;MQ消息通信还算顺利。 随着用户…目标
解决MQ消息的积压
背景
菜馆系统----------- 系统读取消息处理业务逻辑持久化订单和菜品数据然后将其显示在菜品管理客户端上。 最初我们的用户基数很小上线后的一段时间内MQ消息通信还算顺利。 随着用户规模的扩大每个商家每天都会产生大量的订单数据每个订单都包含多个菜品。这导致我们的菜肴管理系统的数据量显着增加。 某一天商家投诉称用户下单后平板上的菜品列表出现延迟。 几分钟后厨房才看到菜品。
这能行
很明显出现这样的菜品展示延迟肯定和Kafka有关所以我们先从排查Kafka开始。 正如预期的那样有一个 message backlog 。 通常消息积压的原因有
MQ 使用者已关闭。MQ 生产者生成消息的速率超过 MQ 消费者消费消息的速率。
我们检查了监控系统发现我们的MQ消费服务运行正常没有任何异常。 剩下的原因可能是MQ消费者的消息处理速度变慢了。 接下来我查看了菜品管理表只有几十万条记录。
首先定位处理MQ日志比较慢的地方 在代码中添加了一些日志来打印出MQ消费者中各个关键点所花费的时间。 确实有两个地方延迟有点高 1、有一段代码在for循环中一条一条的查询数据库。 2、有一段代码执行多条件数据查询。
解决循环查询
对于在for循环中一一查询数据库的代码我使用参数集合将其更改为 batch query 。 原代码如下
public ListUser queryUser(ListUser searchList) {if (CollectionUtils.isEmpty(searchList)) {return Collections.emptyList();}
ListUser result Lists.newArrayList();searchList.forEach(user - result.add(userMapper.getUserById(user.getId())));return result;
}改进一下
public ListUser queryUser(ListUser searchList) {if (CollectionUtils.isEmpty(searchList)) {return Collections.emptyList();}ListLong ids searchList.stream().map(User::getId).collect(Collectors.toList());return userMapper.getUserByIds(ids);
}很简单的调整搞一个ids集合,轻松解决挤压问题。
第二次遇到消息积压
这一次它是零星的只是偶尔发生而不是大多数时候发生。 查了一下菜品管理表现在已经有几百万条记录了 通过监控和DBA每天的慢查询邮件我注意到了一些异常情况。
我发现有些SQL语句的WHERE条件完全相同只是参数值不同导致使用的索引不同。 例如 order_id123 使用索引 a而 order_id124 使用索引 b。 该表查询场景众多为了满足不同的业务需求增加了多个复合索引。
MySQL 根据几个因素选择索引 1、通过数据采样估计要扫描的行数。更多行可能会导致更高的 I/O 操作和更高的 CPU 使用率。 2、是否使用临时表也会影响查询速度。 3、是否需要排序因为它会影响查询速度。
考虑到这些因素和其他因素MySQL 优化器会选择它认为最合适的索引。
MySQL优化器通过采样来估计要扫描的行数这涉及到选择一些数据页进行统计估计从而引入一些误差。
由于MVCC设计存在多个版本的数据页。例如删除的数据可能在其他事务中仍然可见因此索引并未真正删除。这可能会导致统计数据不准确并影响优化器的决策。 这些因素都会导致MySQL在执行SQL语句时出现 错误索引 为了解决MySQL选择错误索引的问题我们使用 FORCE INDEX 关键字强制SQL查询使用索引a。
FORCE INDEX
force index() 方法强制使用这个索引
第三次遇到消息积压
半年后的某一天检查监控系统发现Kafka消息再次积压。 检查了MySQL索引发现使用了正确的索引但数据查询仍然很慢。 检查菜品管理表短短六个月内就增长到了 3000 万条记录。
通常当单个表包含太多数据时查询和写入性能都会下降。 这次查询缓慢的原因是数据量太大。
大数据表解决这个问题我们需要 1、实施数据库和表分区 2、备份历史数据
但是我们的体量和预算不支持分库分表
因此我们决定备份历史数据。 经过与产品经理和DBA讨论我们决定菜品管理表只保留最近30天的数据而早于该时间的数据将移至 historical table 。 经过这样的优化菜品管理表在30天内只积累了几百万条记录对性能的影响很小。
第四次遇到消息积压
又又又没错来了 年后的一个下午当我查看公司邮件时发现大量关于Kafka消息积压的监控警报邮件。 下午这个时间点很奇怪。。。
经过上面的排查都没问题。 我询问订单团队当天下午是否发布了新版本或执行了任何特定功能。因为我们的菜品管理系统是他们的下游系统跟他们的运营有直接关系。有同事提到半个小时前他们做了一个作业批量更新了几万个订单的状态。更改订单状态会自动发送 MQ 消息。 这导致他们的程序在很短的时间内生成了大量的MQ消息。 我们的 MQ 使用者无法足够快地处理这些消息导致消息积压。 我们检查了Kafka消息积压情况发现有几十万条消息在排队。
查看Kafka消息积压情况
https://www.cnblogs.com/lanbojini/p/17314699.html
快速提高MQ消费者的处理速度
我们考虑了两种解决方案 1、增加分区数量。 2、使用线程池来处理消息。
然而由于消息已经积压在现有分区中因此添加新分区并没有多大帮助。 因此我们决定重构代码以使用 thread pool 来处理消息。 为了开始消耗积压的消息我们将线程池的核心线程数和最大线程数增加到 50。 此次调整后积压的数十万条消息在20分钟左右就得到了处理。 解决这个问题后我们保留了消息消费的线程池逻辑将核心线程数设置为 8 最大线程数设置为 10 。 这使我们能够临时调整线程计数以快速解决未来的任何消息积压问题而不会显着影响用户。
注意使用线程池消费MQ消息并不是通用的解决方案。它有一些缺点例如潜在的消息排序问题以及导致服务器 CPU 使用率飙升的风险。另外如果在多线程环境下调用第三方接口可能会导致第三方服务过载而崩溃。
结语
MQ积压没有完美的解决方案只有最适合当前业务场景的解决方案。fuck everythimg