
What do enableCommitOnCheckpoints and enable.auto.commit do in Flink's Kafka consumer?

Background

Every time I use Flink to consume Kafka messages, I am confused by the two parameters enableCommitOnCheckpoints and enable.auto.commit. In this post we go through the source code to see what these two parameters actually do.

The enableCommitOnCheckpoints and enable.auto.commit parameters

1. The FlinkKafkaConsumerBase.open method, where offsetCommitMode is assigned:

```java
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode =
            OffsetCommitModes.fromConfiguration(
                    getIsAutoCommitEnabled(),
                    enableCommitOnCheckpoints,
                    ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
}
```

2. The OffsetCommitModes.fromConfiguration method:

```java
public static OffsetCommitMode fromConfiguration(
        boolean enableAutoCommit,
        boolean enableCommitOnCheckpoint,
        boolean enableCheckpointing) {

    if (enableCheckpointing) {
        // if checkpointing is enabled, the mode depends only on whether committing on
        // checkpoints is enabled
        return (enableCommitOnCheckpoint)
                ? OffsetCommitMode.ON_CHECKPOINTS
                : OffsetCommitMode.DISABLED;
    } else {
        // else, the mode depends only on whether auto committing is enabled in the
        // provided Kafka properties
        return (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
    }
}
```

From this code we can see that enableCommitOnCheckpoint and enableAutoCommit never take effect at the same time: if Flink commits offsets at checkpoint time, it will definitely not use enableAutoCommit's automatic commits, and vice versa.

The key code where enableCommitOnCheckpoint commits offsets

1. The FlinkKafkaConsumerBase.snapshotState method (excerpt):

```java
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        LOG.debug("snapshotState() called on closed source");
    } else {
        unionOffsetStates.clear();

        final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // the fetcher has not yet been initialized, which means we need to return the
            // originally restored offsets or the assigned partitions
            for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                    subscribedPartitionsToStartOffsets.entrySet()) {
                unionOffsetStates.add(
                        Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
            }

            // in ON_CHECKPOINTS mode, the offsets are saved into state at checkpoint time
            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // the map cannot be asynchronously updated, because only one checkpoint call
                // can happen on this function at a time: either snapshotState() or
                // notifyCheckpointComplete()
                pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
            }
        }
        // ...
    }
}
```

2. The FlinkKafkaConsumerBase.notifyCheckpointComplete method (excerpt):

```java
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
    final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    // ...
    fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
    // ...
}
```

So in ON_CHECKPOINTS mode the offsets are first stored into checkpoint state in snapshotState(), and only committed back to Kafka in notifyCheckpointComplete(), i.e. after the checkpoint has succeeded.
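To make the code path above concrete, here is a minimal sketch of wiring up a FlinkKafkaConsumer in ON_CHECKPOINTS mode. The topic name, bootstrap servers, group id and checkpoint interval are placeholders of my own; setCommitOffsetsOnCheckpoints and enableCheckpointing are the actual knobs that feed into OffsetCommitModes.fromConfiguration shown earlier.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OnCheckpointsCommitExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing enabled => fromConfiguration() takes the first branch
        env.enableCheckpointing(60_000);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "demo-group");              // placeholder
        // even if enable.auto.commit were set here, Flink overrides it in ON_CHECKPOINTS mode
        props.setProperty("enable.auto.commit", "false");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        // true is the default; with checkpointing on, this selects OffsetCommitMode.ON_CHECKPOINTS
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("on-checkpoints-commit-demo");
    }
}
```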
The enable.auto.commit parameter

1. The KafkaConsumerThread.run loop:

```java
if (records == null) {
    try {
        records = consumer.poll(pollTimeout);
    } catch (WakeupException we) {
        continue;
    }
}
```

2. The KafkaConsumer.poll method:

```java
private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout) {
    acquireAndEnsureOpen();
    try {
        this.kafkaConsumerMetrics.recordPollStart(timer.currentTimeMs());

        if (this.subscriptions.hasNoSubscriptionOrUserAssignment()) {
            throw new IllegalStateException(
                    "Consumer is not subscribed to any topics or assigned any partitions");
        }

        // poll for new data until the timeout expires
        do {
            client.maybeTriggerWakeup();

            // the updateAssignmentMetadataIfNeeded method is the key part
            if (includeMetadataInTimeout) {
                if (!updateAssignmentMetadataIfNeeded(timer)) {
                    return ConsumerRecords.empty();
                }
            } else {
                while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                    log.warn("Still waiting for metadata");
                }
            }

            final Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollForFetches(timer);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                    client.transmitSends();
                }

                return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
        } while (timer.notExpired());

        return ConsumerRecords.empty();
    } finally {
        release();
        this.kafkaConsumerMetrics.recordPollEnd(timer.currentTimeMs());
    }
}
```

3. The KafkaConsumer.updateAssignmentMetadataIfNeeded method:

```java
boolean updateAssignmentMetadataIfNeeded(final Timer timer) {
    if (coordinator != null && !coordinator.poll(timer)) {
        return false;
    }

    return updateFetchPositions(timer);
}
```

4. The ConsumerCoordinator.poll method:

```java
public boolean poll(Timer timer) {
    maybeUpdateSubscriptionMetadata();

    invokeCompletedOffsetCommitCallbacks();

    if (subscriptions.partitionsAutoAssigned()) {
        if (protocol == null) {
            throw new IllegalStateException(
                    "User configured "
                            + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG
                            + " to empty while trying to subscribe for group protocol"
                            + " to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not
        // leave the group proactively due to application inactivity even if (say) the
        // coordinator cannot be found.
        pollHeartbeat(timer.currentTimeMs());
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial
            // rebalance, we need to ensure that the metadata is fresh before joining
            // initially. This ensures that we have matched the pattern against the cluster's
            // topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is
                // created, any consumer that discovers the topic after metadata refresh can
                // trigger rebalance across the entire consumer group. Multiple rebalances can
                // be triggered after one topic creation if consumers refresh metadata at
                // vastly different times. We can significantly reduce the number of rebalances
                // caused by single topic creation by asking consumer to refresh metadata before
                // re-joining the group as long as the refresh backoff time has passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            if (!ensureActiveGroup(timer)) {
                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls.
        // Without the wakeup, poll() with no channels would block for the timeout, delaying
        // re-connection. awaitMetadataUpdate() initiates new connections with configured
        // backoff and avoids the busy loop. When group management is used, metadata wait is
        // already performed for this scenario as coordinator is unknown, hence this check is
        // not required.
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }

    // this is the key call
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}
```

5. The ConsumerCoordinator.maybeAutoCommitOffsetsAsync method:

```java
public void maybeAutoCommitOffsetsAsync(long now) {
    if (autoCommitEnabled) {
        nextAutoCommitTimer.update(now);
        if (nextAutoCommitTimer.isExpired()) {
            nextAutoCommitTimer.reset(autoCommitIntervalMs);
            doAutoCommitOffsetsAsync();
        }
    }
}
```

See it? This is where autoCommitEnabled is checked: if auto-commit is enabled and the auto-commit interval has expired, the offsets are committed asynchronously. In other words, auto-commit piggybacks on the consumer's regular poll() calls.
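For comparison, here is a minimal sketch of the same piggybacked auto-commit path with a plain KafkaConsumer, outside Flink. The topic, servers and group id are placeholders of my own; the two properties shown are the standard Kafka client settings behind the autoCommitEnabled and autoCommitIntervalMs fields seen above.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // the two settings that drive maybeAutoCommitOffsetsAsync()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                // each poll() eventually reaches ConsumerCoordinator.poll(), which commits the
                // current positions once the 5s auto-commit timer expires
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```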
Two especially important points

1. A KafkaConsumer does not need to commit any offsets in order to consume: even if it never commits a single offset, it can still read messages from the topic normally. The real value of committing offsets is recovery: when the consumer disconnects and later reconnects to the Kafka brokers, it will generally resume from the last offset it committed (assuming none of startFromLatest / startFromEarliest / startFromTimestamp has been set). That is the main point of committing offsets at all.

2. For Flink, since the consumer takes its offsets out of the checkpoint and seeks to them whenever the job restarts from one, the offsets committed during consumption, whether via enableCommitOnCheckpoint or via enable.auto.commit, are never actually read in that scenario. They only matter when a Flink job starts without restoring from a checkpoint: only then does the consumer start from the offsets committed through enableCommitOnCheckpoint or enable.auto.commit (again, assuming none of startFromLatest / startFromEarliest / startFromTimestamp has been set). A short sketch of these start-position options follows below.
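To illustrate point 2, the sketch below shows the start-position setters on the Flink consumer. setStartFromGroupOffsets() is the default, and it is the only mode in which the committed offsets are consulted on a fresh (non-checkpoint) start; the other setters bypass them entirely. The topic and the props argument are placeholders, as in the earlier sketches.

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class StartPositionExample {
    public static FlinkKafkaConsumer<String> build(Properties props) {
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("demo-topic", new SimpleStringSchema(), props);

        // default: on a fresh start (no checkpoint restore), begin at the offsets last
        // committed for this group.id -- the only case where committed offsets are read
        consumer.setStartFromGroupOffsets();

        // any of these overrides ignores the committed offsets on a fresh start:
        // consumer.setStartFromEarliest();
        // consumer.setStartFromLatest();
        // consumer.setStartFromTimestamp(1600000000000L);

        // and when the job is restored from a checkpoint, the offsets stored in the
        // checkpoint take precedence over all of the settings above
        return consumer;
    }
}
```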
Reference: https://blog.csdn.net/qq_42009500/article/details/119875158
