当前位置: 首页 > news >正文

建设智能家居网站SWOT分析网站推广的四个阶段

建设智能家居网站SWOT分析,网站推广的四个阶段,苏州餐饮 网站建设,2023新闻头条最新消息今天Consumer位移管理机制 将Consumer的位移数据作为一条条普通的Kafka消息#xff0c;提交到__consumer_offsets中。可以这么说#xff0c;__consumer_offsets的主要作用是保存Kafka消费者的位移信息。使用Kafka主题来保存位移。 消息格式 位移主题就是普通的Kafka主题。也是…Consumer位移管理机制 将Consumer的位移数据作为一条条普通的Kafka消息提交到__consumer_offsets中。可以这么说__consumer_offsets的主要作用是保存Kafka消费者的位移信息。使用Kafka主题来保存位移。 消息格式 位移主题就是普通的Kafka主题。也是一个内部主题但它的消息格式却是Kafka自己定义的KV对Key和Value分别表示消息的键值和消息体用户不能修改Kafka Consumer有API去提交位移也就是向位移主题写消息。不要自己写个Producer随意向该主题发送消息。 主题消息的Key中应该保存标识Consumer的字段也就是Consumer Group的Group ID标识唯一的Consumer Group因为Consumer提交位移是在分区层面上进行的即它提交的是某个或某些分区的位移那么很显然Key中还应该保存 Consumer要提交位移的分区。 总结位移主题的Key中应该保存3部分内容Group ID主题名分区号。 还有2种格式 1. 用于保存Consumer Group信息的消息用来注册Consumer Group 2. tombstone消息即墓碑消息也称delete mark用于删除Group过期位移甚至是删除Group的消息。 位移主题的创建 当Kafka集群中的第一个Consumer程序启动时Kafka会自动创建位移主题。 分区数是怎么设置的呢这就要看Broker端参数offsets.topic.num.partitions的取值了。它的默认值是50因此Kafka会自动创建一个50分区的位移主题。Broker端另一个参数offsets.topic.replication.factor 控制副本数默认为3。所以如果位移主题是Kafka自动创建的那么该主题的分区数是50副本数是3。 提交位移Committing Offsets Consumer需要向Kafka汇报自己的位移数据这个汇报过程被称为提交位移Committing Offsets。当Consumer发生故障重启之后就能够从Kafka中读取之前提交的位移值然后从相应的位移处继续消费从而避免整个消费过程重来一遍。 从用户的角度来说位移提交分为自动提交和手动提交从Consumer端的角度来说位移提交分为同步提交和异步提交。 Kafka Consumer提交位移的方式有两种自动提交位移和手动提交位移。 手动提交位移 enable.auto.commit 如果值是false则为手动提交它能够把控位移提交的时机和频率。可以使用Kafka Consumer API的consumer.commitSync等方法当调用这些方法时Kafka会向位移主题写入相应的消息。 while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 try { consumer.commitSync(); } catch (CommitFailedException e) { handle(e); // 处理提交失败异常 } }调用consumer.commitSync()方法的时机是在处理完了poll()方法返回的所有消息之后。如果过早提交了位移就可能会出现消费数据丢失的情况。它还也有一个缺陷就是在调用commitSync()时Consumer程序会处于阻塞状态直到远端的Broker返回提交结果这个状态才会结束影响整个应用程序的TPS。 Kafka社区为手动提交位移提供了另一个API方法KafkaConsumer#commitAsync() 这是一个异步操作。调用commitAsync()之后它会立即返回不会阻塞因此不会影响Consumer应用的TPS。由于它是异步的Kafka提供了回调函数callback在实现提交之后的逻辑比如记录日志或处理异常等。下面这段代码展示了调用commitAsync()的方法 while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1)); process(records); // 处理消息 consumer.commitAsync((offsets, exception) - { if (exception ! null) handle(exception); }); }commitAsync是否能够替代commitSync呢 答案是不能。commitAsync的问题在于出现问题时它不会自动重试。因为它是异步操作倘若提交失败后自动重试那么它重试时提交的位移值可能早已经“过 期”或不是最新值了。因此异步提交的重试其实没有意义所以commitAsync是不会重试的。  将commitSync和commitAsync组合使用 try {while(true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息commitAysnc(); // 使用异步提交规避阻塞} } catch(Exception e) {handle(e); // 处理异常 } finally {try {consumer.commitSync(); // 最后一次提交使用同步阻塞式提交} finally {consumer.close();} }对于常规性、阶段性的手动提交我们调用commitAsync()避免程序阻塞而在Consumer要关闭前我们调用commitSync()方法执行同步阻塞式的位移提交以确保Consumer关闭前能够保存正确的位移数据。将两者结合后既实现了异步无阻塞式的位移管理也确保了Consumer位移的正确性。 分批处理细粒度的位移提交 commitSync(MapTopicPartition, OffsetAndMetadata) commitAsync(MapTopicPartition, OffsetAndMetadata) 它们的参数是一个Map对象键就 是TopicPartition即消费的分区而值是一个OffsetAndMetadata对象保存的主要是位移数据。 例如如何每处理100条消息就提交一次位移呢以commitAsync为例展示一段代码实际上commitSync的调用方法和它是一模一样的。 private MapTopicPartition, OffsetAndMetadata offsets new HashMap(); int count 0; // 其他操作 while (true) {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String record: records) {process(record); // 处理消息offsets.put(new TopicPartition(record.topic(), record.partition()) , new OffsetAndMetadata(record.offset() 1)ifcount % 100 0consumer.commitAsync(offsets, null); // 回调处理逻辑是nullcount;}} }程序先是创建了一个Map对象用于保存Consumer消费处理过程中要提交的分区位移之后开始逐条处理消息并构造要提交的位移值。要提交下一条消息的位移这里构造OffsetAndMetadata对象时使用当前消息位移加1的原因。代码的最后部分是做位移的提交。这里设置了一个计数器每累计100条消息就统一提交一次位移。与调用无参的 commitAsync不同这里调用了带Map对象参数的commitAsync进行细粒度的位移提交。这样这段代码就能够实现每处理100条消息就提交一次位移不用再受poll方法返回的消息总数的限制了。  自动提交位移 Properties props new Properties(); props.put(bootstrap.servers, localhost:9092); props.put(group.id, test); props.put(enable.auto.commit, true); props.put(auto.commit.interval.ms, 2000); props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Arrays.asList(foo, bar)); while (true) { ConsumerRecordsString, String records consumer.poll(100); for (ConsumerRecordString, String record : records) System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value()); } Consumer端有个参数叫enable.auto.commit如果值是true则Consumer 定期提交位移提交间隔由一个专属的参数auto.commit.interval.ms来控制。但是没法把控Consumer端的位移管理。 一旦设置了enable.auto.commit为trueKafka会保证在开始调用poll方法时提交上次poll返回的所有消息。从顺序上来说poll方法的逻辑是先提交上一批消息的位移再处理下一批消息因此它能保证不出现消费丢失的情况。但自动提交位移的一个问题在于它可能会出现重复消费。 在默认情况下Consumer每5秒自动提交一次位移。现在我们假设提交位移之后的3秒发生了Rebalance操作。在Rebalance之后所有Consumer从上一次提交的位移处继续消费但该位移已经是3秒前的位移数据了故在Rebalance发生前3秒消费的所有数据都要重新再消费一次。虽然能够通过减少auto.commit.interval.ms的值来提高提交频率但这么做只能缩小重复消费的时间窗口不可能完全消除它。这是自动提交机制的一个缺陷。  自动提交位移问题 自动提交位移那么就可能存在一个问题只要Consumer一直启动着它就会无限期地向位移主题写入消息。 假设Consumer当前消费到了某个主题的最新一条消息位移是100之后该主题没有任何新消息产生故Consumer无消息可消费了所以位移永远保持在100。由于是自动提交位移位移主题中会不停地写入位移100的消息。显然Kafka只需要保留这类消息中的最新一条就可以了之前的消息都是可以删除的。这就要求Kafka必须要有针对位移主题消息特点的消息删除策略否则这种消息会越来越多最终撑爆整个磁盘。 Kafka使用Compact策略来删除位移主题中的过期消息避免该主题无限期膨胀。那么应该如何定义Compact策略中的过期呢对于同一个Key的两条消息M1和M2如果M1的发送时间早于 M2那么M1就是过期消息。Compact的过程就是扫描日志的所有消息剔除那些过期的消息然后把剩下的消息整理在一起。 图中位移为0、2和3的消息的Key都是K1。Compact之后分区只需要保存位移为3的消息因为它是最新发送的。  Kafka提供了专门的后台线程定期地巡检待Compact的主题看看是否存在满足条件的可删除数据。这个后台线程叫LogCleaner。 参考Kafka 核心技术与实战 (geekbang.org)
http://www.pierceye.com/news/859087/

相关文章:

  • 网站在线访谈栏目建设个人网站可以备案了吗
  • 汉口北做网站搜索广告是什么
  • 电商网站可以用dw做嘉兴网站建设平台
  • 做网站是数据库应该放在哪里建筑工程水平防护网
  • vps网站无法通过ip访问网站怎么做的支付宝接口
  • 怎么创建一个博客网站网站的c4d动画是怎么做的
  • 西安做企业网站科技论文发表网
  • html 手机网站开发企业做网站的合同
  • 建立wordpress网站吗全州建设完小网站
  • 网站域名注册证书是什么制作WordPress友情链接
  • 如何在解决方案中新建网站html网页制作的软件下载
  • 企业网站怎么做优化开小加工厂去哪接单子
  • 网站建设推广费怎么做账域名和网站绑定
  • 商丘网站建设想象力网络中国流量最大的网站排行
  • 网站是否有备案网站集约化建设建议
  • 浏览器收录网站网上做图赚钱的网站
  • 网站建设优化过程中的优化策略相关文章 wordpress
  • 泉州网站深圳航空公司官网首页
  • 百度推广整体优化网站整体软装设计公司
  • 太原搜索引擎优化招聘信息服务好的镇江网站优化
  • 自己做网站下载怎么网站基础知识域名5个点
  • 网站搭建合作协议wordpress注册页面插件
  • 网络公司最好的是哪个兰州网络推广优化怎样
  • 网站文章采集工具新网站怎么做流畅
  • discuz 手机网站模板山东省住房建设厅网站首页
  • 网站建设违约责任条款枣庄专业做网站
  • python做爬虫和做网站做两个一摸一样的网站
  • 网站做微信登录asp.net做网站头部和尾部_都用什么来实现
  • 南充哪里做网站太原关键词优化公司
  • 哪个网站做的ppt模板好投放广告网站