青岛安装建设股份公司网站,织梦与wordpress详细比较,抖音视频排名优化,网络营销网站建设方案Consumer 需要向 Kafka 汇报自己的位移数据#xff0c;这个汇报过程被称为提交位移#xff08;Committing Offsets#xff09;。因为 Consumer 能够同时消费多个分区的数据#xff0c;所以位移的提交实际上是在分区粒度上进行的#xff0c;即 Consumer 需要为分配给它的每…Consumer 需要向 Kafka 汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。因为 Consumer 能够同时消费多个分区的数据所以位移的提交实际上是在分区粒度上进行的即 Consumer 需要为分配给它的每个分区提交各自的位移数据。
提交位移主要是为了表征 Consumer 的消费进度这样当 Consumer 发生故障重启之后就能够从 Kafka 中读取之前提交的位移值然后从相应的位移处继续消费从而避免整个消费过程重来一遍。
从用户的角度来说位移提交分为自动提交和手动提交从 Consumer 端的角度来说位移提交分为同步提交和异步提交。
自动提交
自动提交默认全部为同步提交
自动提交相关参数
enable.auto.commit (bool) – 如果为True将自动定时提交消费者offset。默认为True。auto.commit.interval.ms(int) – 自动提交offset之间的间隔毫秒数。如果enable_auto_commit 为true默认值为 5000。
当设置 enable.auto.commit 为 trueKafka 会保证在开始调用 poll 方法时提交上次 poll 返回的所有消息。从顺序上来说poll 方法的逻辑是先提交上一批消息的位移再处理下一批消息因此它能保证不出现消费丢失的情况。
网上有说 自动提交位移的一个问题在于它可能会出现重复消费。 如果设置 enable.auto.commit 为 trueConsumer 按照 auto.commit.interval.ms设置的值默认5秒自动提交一次位移。我们假设提交位移之后的 3 秒发生了 Rebalance 操作。在 Rebalance 之后所有 Consumer 从上一次提交的位移处继续消费但该位移已经是 3 秒前的位移数据了故在 Rebalance 发生前 3 秒消费的所有数据都要重新再消费一次。虽然你能够通过减少 auto.commit.interval.ms 的值来提高提交频率但这么做只能缩小重复消费的时间窗口不可能完全消除它。这是自动提交机制的一个缺陷。 在实际测试中未发现上述情况kafka 版本 2.13 而是会等待所有消费者消费完当前消息或者等待消费者超时等待过程中会报如下 warning 之后才会 reblance。
手动提交
手动提交可以自己选择是同步提交commitSync还是异步提交commitAsync commitAsync 不能够替代 commitSync。commitAsync 的问题在于出现问题时它不会自动重试。因为它是异步操作倘若提交失败后自动重试那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此异步提交的重试其实没有意义所以 commitAsync 是不会重试的。 手动提交我们需要将 commitSync 和 commitAsync 组合使用才能到达最理想的效果原因有两个:
我们可以利用 commitSync 的自动重试来规避那些瞬时错误比如网络的瞬时抖动Broker 端 GC 等。因为这些问题都是短暂的自动重试通常都会成功因此我们不想自己重试而是希望 Kafka Consumer 帮我们做这件事。我们不希望程序总处于阻塞状态影响 TPS。我们不希望程序总处于阻塞状态影响 TPS。
同时使用 commitSync() 和 commitAsync()
对于常规性、阶段性的手动提交我们调用 commitAsync() 避免程序阻塞而在 Consumer 要关闭前我们调用 commitSync() 方法执行同步阻塞式的位移提交以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后我们既实现了异步无阻塞式的位移管理也确保了 Consumer 位移的正确性.
手动提交和自动提交中的 reblance 问题 如果设置为手动提交当集群满足 reblance 的条件时集群会直接 reblance不会等待所有消息被消费完这会导致所有未被确认的消息会重新被消费会出现重复消费的问题如果设置为自动提交当集群满足 reblance 的条件时集群不会马上 reblance而是会等待所有消费者消费完当前消息或者等待消费者超时等待过程中会报如下 warning 之后才会 reblance。python kafka-python 输出信息如下
[WARNING]Heartbeat failed for group scan_result because it is rebalancing kafka 中加入消费者时kafka 会输出如下信息