Background
Every time I consume Kafka messages with Flink, the two parameters enableCommitOnCheckpoints and enable.auto.commit trip me up. This post goes through the source code to see what each of them actually does.
The enableCommitOnCheckpoints and enable.auto.commit parameters
1. The FlinkKafkaConsumerBase.open method: where offsetCommitMode is assigned
```java
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
}
```
2. The OffsetCommitModes.fromConfiguration method
```java
public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {

    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabled
        return (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
```
This code shows that enableCommitOnCheckpoint and enableAutoCommit never take effect at the same time: if Flink commits offsets on checkpoints, it will not turn on Kafka's periodic auto-commit, and vice versa.
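To make that decision table concrete, here is a minimal sketch of a job that sets both switches (the broker address, group id and topic name are placeholders, and FlinkKafkaConsumer is the legacy connector the code above comes from). With checkpointing enabled, only setCommitOffsetsOnCheckpoints matters and the enable.auto.commit property is ignored:
```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing is on, so offsetCommitMode becomes ON_CHECKPOINTS (because of the
        // flag set below) and the enable.auto.commit property is effectively ignored.
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("group.id", "demo-group");              // placeholder group id
        // Only consulted when checkpointing is DISABLED (-> KAFKA_PERIODIC mode).
        props.setProperty("enable.auto.commit", "true");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);
        // Only consulted when checkpointing is ENABLED (-> ON_CHECKPOINTS or DISABLED).
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("offset-commit-mode-demo");
    }
}
```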
Key code for committing offsets under enableCommitOnCheckpoint
1. The FlinkKafkaConsumerBase.snapshotState method (abridged)
```java
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                    subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            // in ON_CHECKPOINTS mode, the offsets are parked in state at checkpoint time
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call can happen
                // on this function at a time: either snapshotState() or notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        }
        // ... (the branch for an already-initialized fetcher is omitted here)
    }
}
```
2. The FlinkKafkaConsumerBase.notifyCheckpointComplete method (abridged)
```java
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    // ... look up the offsets parked for this checkpoint id, then commit them to Kafka:
    fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
}
```
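To see the shape of this two-phase flow without the surrounding machinery, here is a minimal self-contained sketch (the class and method bodies are mine, not Flink's; only the pendingOffsetsToCommit name mirrors the real field): snapshotState parks the current offsets under the checkpoint id, and only a confirmed checkpoint triggers the actual commit back to Kafka.
```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the ON_CHECKPOINTS bookkeeping: offsets are parked per checkpoint id
// in snapshotState() and committed only after the checkpoint is confirmed.
class OnCheckpointsCommitSketch {
    // checkpointId -> partition offsets pending commit (insertion ordered,
    // much like the LinkedMap used in the real consumer)
    private final LinkedHashMap<Long, Map<String, Long>> pendingOffsetsToCommit = new LinkedHashMap<>();
    private final Map<String, Long> currentOffsets = new HashMap<>(); // partition -> next offset

    void snapshotState(long checkpointId) {
        // park a copy of the current offsets under this checkpoint id
        pendingOffsetsToCommit.put(checkpointId, new HashMap<>(currentOffsets));
    }

    void notifyCheckpointComplete(long checkpointId) {
        Map<String, Long> offsets = pendingOffsetsToCommit.remove(checkpointId);
        if (offsets != null) {
            commitToKafka(offsets); // stands in for fetcher.commitInternalOffsetsToKafka(...)
        }
    }

    private void commitToKafka(Map<String, Long> offsets) {
        System.out.println("committing " + offsets);
    }
}
```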
The enable.auto.commit parameter
1. The KafkaConsumerThread.run loop
```java
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    }
    catch (WakeupException we) {
        continue;
    }
}
```
2. The KafkaConsumer.poll method
```java
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            // the call to updateAssignmentMetadataIfNeeded is the key part
            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}
```
3. The KafkaConsumer.updateAssignmentMetadataIfNeeded method
```java
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    if (coordinator != null && !coordinator.poll(timer)) {
        return false;
    }

    return updateFetchPositions(timer);
}
```
4. The ConsumerCoordinator.poll method
```java
public boolean poll(Timer timer) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned()) {
        if (protocol == null) {
            throw new IllegalStateException("User configured " +
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        pollHeartbeat(timer.currentTimeMs());
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            if (!ensureActiveGroup(timer)) {
                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
        // When group management is used, metadata wait is already performed for this scenario as
        // coordinator is unknown, hence this check is not required.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }

    // this is the key call
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}
```
5. The ConsumerCoordinator.maybeAutoCommitOffsetsAsync method
```java
public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync();
        }
    }
}
```
See it? This is where autoCommitEnabled is checked: if auto-commit is turned on, the offsets are committed asynchronously once the commit interval expires.
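On the plain Kafka client side, autoCommitEnabled is simply the enable.auto.commit property, and the commit cadence comes from auto.commit.interval.ms. A minimal standalone consumer relying on this periodic async commit (broker address, group id and topic name are placeholders):
```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "auto-commit-demo");        // placeholder group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");          // sets autoCommitEnabled in ConsumerCoordinator
        props.put("auto.commit.interval.ms", "5000");     // period of nextAutoCommitTimer

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // every poll() drives ConsumerCoordinator.poll(), which calls
                // maybeAutoCommitOffsetsAsync() and commits once the interval expires
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s-%d@%d: %s%n",
                            record.topic(), record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```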
Two especially important points:
1. Once a KafkaConsumer starts consuming, it can read messages from the Kafka topic perfectly well even if it never commits a single offset. The real purpose of committing offsets is recovery: when the consumer disconnects and later reconnects to the Kafka brokers, it will normally resume from the last offset it committed (provided none of startFromLatest / startFromEarliest / startFromTimestamp is set). That is the whole point of offset commits.
2. For Flink, whenever a job restarts from a checkpoint, the consumer takes the offsets out of the checkpoint and seeks to them, so the offsets committed during normal processing, whether via enableCommitOnCheckpoint or via enableAutoCommit, are never actually read back. They only matter when the job is not started from a checkpoint: in that case the Flink consumer begins consuming from the offsets committed through enableCommitOnCheckpoint / enableAutoCommit (again, provided none of startFromLatest / startFromEarliest / startFromTimestamp is set). The sketch after this list shows the precedence.
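As a sketch of that precedence (again with the legacy FlinkKafkaConsumer; the topic name is a placeholder): offsets restored from a checkpoint or savepoint always win, and otherwise the startFrom* setters decide the start position, with only the default setStartFromGroupOffsets() reading back the committed offsets.
```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StartPositionDemo {
    public static FlinkKafkaConsumer<String> build(Properties kafkaProps) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), kafkaProps);

        // Start-position priority when the job comes up:
        // 1. offsets restored from a checkpoint/savepoint (always win; the setters below are ignored)
        // 2. otherwise, the configured start position:
        consumer.setStartFromGroupOffsets(); // default: resume from previously committed offsets
        // consumer.setStartFromEarliest();  // would ignore committed offsets
        // consumer.setStartFromLatest();    // would ignore committed offsets
        // consumer.setStartFromTimestamp(1_600_000_000_000L); // would ignore committed offsets

        return consumer;
    }
}
```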
Reference: https://blog.csdn.net/qq_42009500/article/details/119875158