当前位置: 首页 > news >正文

网站建设费算费用还是固定资产百度秒收录

网站建设费算费用还是固定资产,百度秒收录,网站搭建设计课程报告,wordpress 团购模版目录 第3章 Kafka架构深入3.3 Kafka消费者3.3.1 消费方式3.3.2 分区分配策略3.3.3 offset的维护 3.4 Kafka高效读写数据3.5 Zookeeper在Kafka中的作用3.6 Kafka事务3.6.1 Producer事务3.6.2 Consumer事务#xff08;精准一次性消费#xff09; 第4章 Kafka API4.1 Producer A… 目录 第3章 Kafka架构深入3.3 Kafka消费者3.3.1 消费方式3.3.2 分区分配策略3.3.3 offset的维护 3.4 Kafka高效读写数据3.5 Zookeeper在Kafka中的作用3.6 Kafka事务3.6.1 Producer事务3.6.2 Consumer事务精准一次性消费 第4章 Kafka API4.1 Producer API4.1.1 消息发送流程4.1.2 异步发送API4.1.3 同步发送API 4.2 Consumer API4.2.1 自动提交offset4.2.2 手动提交offset 第3章 Kafka架构深入 3.3 Kafka消费者 3.3.1 消费方式 consumer采用pull拉模式从broker中读取数据。 push推模式很难适应消费速率不同的消费者因为消息发送速率是由broker决定的。 它的目标是尽可能以最快速度传递消息但是这样很容易造成consumer来不及处理消息典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。 pull模式不足之处是如果kafka没有数据消费者可能会陷入循环中一直返回空数据。针对这一点Kafka的消费者在消费数据时会传入一个时长参数timeout如果当前没有数据可供消费consumer会等待一段时间之后再返回这段时长即为timeout。 3.3.2 分区分配策略 一个consumer group中有多个consumer一个 topic有多个partition所以必然会涉及到partition的分配问题即确定那个partition由哪个consumer来消费。 Kafka有两种分配策略一是roundrobin一是range。 roundrobin range 3.3.3 offset的维护 由于consumer在消费过程中可能会出现断电宕机等故障consumer恢复后需要从故障前的位置的继续消费所以consumer需要实时记录自己消费到了哪个offset以便故障恢复后继续消费。 Kafka 0.9版本之前consumer默认将offset保存在Zookeeper中从0.9版本开始consumer默认将offset保存在Kafka一个内置的topic中该topic为__consumer_offsets。 3.4 Kafka高效读写数据 顺序写磁盘 Kafka的producer生产数据要写入到log文件中写的过程是一直追加到文件末端为顺序写。官网有数据表明同样的磁盘顺序写能到到600M/s而随机写只有100k/s。这与磁盘的机械机构有关顺序写之所以快是因为其省去了大量磁头寻址的时间。应用Pagecache Kafka数据持久化是直接持久化到Pagecache中这样会产生以下几个好处 I/O Scheduler 会将连续的小块写组装成大块的物理写从而提高性能I/O Scheduler 会尝试将一些写操作重新按顺序排好从而减少磁盘头的移动时间充分利用所有空闲内存非 JVM 内存。如果使用应用层 Cache即 JVM 堆内存会增加 GC 负担读操作可直接在 Page Cache 内进行。如果消费和生产速度相当甚至不需要通过物理磁盘直接通过 Page Cache交换数据如果进程重启JVM 内的 Cache 会失效但 Page Cache 仍然可用 尽管持久化到Pagecache上可能会造成宕机丢失数据的情况但这可以被Kafka的Replication机制解决。如果为了保证这种情况下数据不丢失而强制将 Page Cache 中的数据 Flush 到磁盘反而会降低性能。 零复制技术 3.5 Zookeeper在Kafka中的作用 Kafka集群中有一个broker会被选举为Controller负责管理集群broker的上下线所有topic的分区副本分配和leader选举等工作。 Controller的管理工作都是依赖于Zookeeper的。 以下为partition的leader选举过程 3.6 Kafka事务 Kafka从0.11版本开始引入了事务支持。事务可以保证Kafka在Exactly Once语义的基础上生产和消费可以跨分区和会话要么全部成功要么全部失败。 3.6.1 Producer事务 为了实现跨分区跨会话的事务需要引入一个全局唯一的Transaction ID并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。 为了管理TransactionKafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic这样即使整个服务重启由于事务状态得到保存进行中的事务状态可以得到恢复从而继续进行。 3.6.2 Consumer事务精准一次性消费 上述事务机制主要是从Producer方面考虑对于Consumer而言事务的保证就会相对较弱尤其时无法保证Commit的信息被精确消费。这是由于Consumer可以通过offset访问任意信息而且不同的Segment File生命周期不同同一事务的消息可能会出现重启后被删除的情况。 如果想完成Consumer端的精准一次性消费那么需要kafka消费端将消费过程和提交offset过程做原子绑定。此时我们需要将kafka的offset保存到支持事务的自定义介质中比如mysql。这部分知识会在后续项目部分涉及。 第4章 Kafka API 4.1 Producer API 4.1.1 消息发送流程 Kafka的Producer发送消息采用的是异步发送的方式。在消息发送的过程中涉及到了两个线程——main线程和Sender线程以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulatorSender线程不断从RecordAccumulator中拉取消息发送到Kafka broker。 KafkaProducer 发送消息流程: 相关参数 batch.size只有数据积累到batch.size之后sender才会发送数据。linger.ms如果数据迟迟未达到batch.sizesender等待linger.time之后就会发送数据。 4.1.2 异步发送API 导入依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.4.1/version /dependency编写代码 需要用到的类 KafkaProducer需要创建一个生产者对象用来发送数据ProducerConfig获取所需的一系列配置参数ProducerRecord每条数据都要封装成一个ProducerRecord对象 不带回调函数的API package com.atguigu.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties; import java.util.concurrent.ExecutionException;public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);//kafka集群broker-listprops.put(acks, all);props.put(retries, 1);//重试次数props.put(batch.size, 16384);//批次大小props.put(linger.ms, 1);//等待时间props.put(buffer.memory, 33554432);//RecordAccumulator缓冲区大小props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(props);for (int i 0; i 100; i) {producer.send(new ProducerRecordString, String(first, Integer.toString(i), Integer.toString(i)));}producer.close();} }带回调函数的API 回调函数会在producer收到ack时调用为异步调用该方法有两个参数分别是RecordMetadata和Exception如果Exception为null说明消息发送成功如果Exception不为null说明消息发送失败。 注意消息发送失败会自动重试不需要我们在回调函数中手动重试。 package com.atguigu.kafka;import org.apache.kafka.clients.producer.*;import java.util.Properties; import java.util.concurrent.ExecutionException;public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);//kafka集群broker-listprops.put(acks, all);props.put(retries, 1);//重试次数props.put(batch.size, 16384);//批次大小props.put(linger.ms, 1);//等待时间props.put(buffer.memory, 33554432);//RecordAccumulator缓冲区大小props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(props);for (int i 0; i 100; i) {producer.send(new ProducerRecordString, String(first, Integer.toString(i), Integer.toString(i)), new Callback() {//回调函数该方法会在Producer收到ack时调用为异步调用Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println(success- metadata.offset());} else {exception.printStackTrace();}}});}producer.close();} }4.1.3 同步发送API 同步发送的意思就是一条消息发送之后会阻塞当前线程直至返回ack。 由于send方法返回的是一个Future对象根据Futrue对象的特点我们也可以实现同步发送的效果只需在调用Future对象的get方发即可。 package com.atguigu.kafka;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; import java.util.concurrent.ExecutionException;public class CustomProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);//kafka集群broker-listprops.put(acks, all);props.put(retries, 1);//重试次数props.put(batch.size, 16384);//批次大小props.put(linger.ms, 1);//等待时间props.put(buffer.memory, 33554432);//RecordAccumulator缓冲区大小props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer);props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer);ProducerString, String producer new KafkaProducer(props);for (int i 0; i 100; i) {producer.send(new ProducerRecordString, String(first, Integer.toString(i), Integer.toString(i))).get();}producer.close();} }4.2 Consumer API Consumer消费数据时的可靠性是很容易保证的因为数据在Kafka中是持久化的故不用担心数据丢失问题。 由于consumer在消费过程中可能会出现断电宕机等故障consumer恢复后需要从故障前的位置的继续消费所以consumer需要实时记录自己消费到了哪个offset以便故障恢复后继续消费。 所以offset的维护是Consumer消费数据是必须考虑的问题。 4.2.1 自动提交offset 导入依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.4.1/version /dependency编写代码 需要用到的类 KafkaConsumer需要创建一个消费者对象用来消费数据ConsumerConfig获取所需的一系列配置参数ConsuemrRecord每条数据都要封装成一个ConsumerRecord对象 为了使我们能够专注于自己的业务逻辑Kafka提供了自动提交offset的功能。 自动提交offset的相关参数 enable.auto.commit是否开启自动提交offset功能 auto.commit.interval.ms自动提交offset的时间间隔以下为自动提交offset的代码 package com.atguigu.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);props.put(group.id, test);props.put(enable.auto.commit, true);props.put(auto.commit.interval.ms, 1000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(first));while (true) {ConsumerRecordsString, String records consumer.poll(100);for (ConsumerRecordString, String record : records)System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}} }4.2.2 手动提交offset 虽然自动提交offset十分简介便利但由于其是基于时间提交的开发人员难以把握offset提交的时机。因此Kafka还提供了手动提交offset的API。 手动提交offset的方法有两种分别是commitSync同步提交和commitAsync异步提交。两者的相同点是都会将本次poll的一批数据最高的偏移量提交不同点是commitSync阻塞当前线程一直到提交成功并且会自动失败重试由不可控因素导致也会出现提交失败而commitAsync则没有失败重试机制故有可能提交失败。 同步提交offset 由于同步提交offset有失败重试机制故更加可靠以下为同步提交offset的示例。 package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Arrays; import java.util.Properties;/*** author liubo*/ public class CustomComsumer {public static void main(String[] args) {Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);//Kafka集群props.put(group.id, test);//消费者组只要group.id相同就属于同一个消费者组props.put(enable.auto.commit, false);//关闭自动提交offsetprops.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(first));//消费者订阅主题while (true) {ConsumerRecordsString, String records consumer.poll(100);//消费者拉取数据for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}consumer.commitSync();//同步提交当前线程会阻塞知道offset提交成功}} }异步提交offset 虽然同步提交offset更可靠一些但是由于其会阻塞当前线程直到提交成功。因此吞吐量会收到很大的影响。因此更多的情况下会选用异步提交offset的方式。 以下为异步提交offset的示例 package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition;import java.util.Arrays; import java.util.Map; import java.util.Properties;/*** author liubo*/ public class CustomConsumer {public static void main(String[] args) {Properties props new Properties();props.put(bootstrap.servers, hadoop102:9092);//Kafka集群props.put(group.id, test);//消费者组只要group.id相同就属于同一个消费者组props.put(enable.auto.commit, false);//关闭自动提交offsetprops.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);consumer.subscribe(Arrays.asList(first));//消费者订阅主题while (true) {ConsumerRecordsString, String records consumer.poll(100);//消费者拉取数据for (ConsumerRecordString, String record : records) {System.out.printf(offset %d, key %s, value %s%n, record.offset(), record.key(), record.value());}consumer.commitAsync(new OffsetCommitCallback() {Overridepublic void onComplete(MapTopicPartition, OffsetAndMetadata offsets, Exception exception) {if (exception ! null) {System.err.println(Commit failed for offsets);}}});//异步提交}} }数据漏消费和重复消费分析 无论是同步提交还是异步提交offset都有可能会造成数据的漏消费或者重复消费。先提交offset后消费有可能造成数据的漏消费而先消费后提交offset有可能会造成数据的重复消费。 数据重复消费问题
http://www.pierceye.com/news/668381/

相关文章:

  • 创建网站的方案企业营销策划公司
  • 做彩铃的网站个人博客网站建设
  • 正黄集团博弘建设官方网站达州高端网站建设
  • 七台河建设网站wordpress logo制作
  • 怎么设计一个自己的网站番禺网站建设效果
  • 网站哪家做的好淄博网站开发选网泰
  • 网站建设与制作与维护ppt百度广告联盟收益
  • 在线网站建设费用是多少大学生活动策划书模板
  • 动物网站建设wordpress无法跳转正确页面
  • 上海市建设工程 安全协会网站wordpress会员微信支付宝
  • pc网站转换手机网站代码桂林工作网招聘
  • 营销型网站建设的要素怎么建网站赚钱
  • 成都网站建设学习郑州制作网站推荐
  • 网站建设 镇江丹阳php网站开发实例教程代码
  • 佛山外贸网站建设方案专业网站建设系统
  • 做一个网站团队需要哪些人员花钱也可以哪些网站可以做推广广告
  • 各省施工备案网站做动漫网站的素材
  • 新余网站设计网站模板做网站
  • 防止服务器上的网站被进攻app推广兼职
  • 保定电商网站建设国内最好的crm软件
  • 企业网站建设哪家公司好莱芜金点子信息港房产网
  • 个人可以建设网站吗海淀网站建设本溪
  • 宜昌网站建设兼职怎样做自媒体拍视频赚钱
  • 我的世界做视频封面的网站免费的app源码网
  • 网站搭建wordpress参考消息电子版在线阅读
  • 成立一个网站平台要多少钱科技有限公司一般是做什么的
  • 邵阳 网站开发 招聘桂林阳朔楼盘最新价格
  • 如何建设网站导航内链接wordpress 特别慢
  • 蚌埠网站建设文章网站软件定制开发公司
  • 软件通网站建设百度收录网站电话