当前位置: 首页 > news >正文

洛阳网站建设找洛阳铭信网络北京财优化

洛阳网站建设找洛阳铭信网络,北京财优化,网站footer怎么做,做推送用什么网站大部分内容源于https://segmentfault.com/a/1190000038173886, 本人手敲一边加强印象方便复习 消息系统的作用 解耦 冗余 扩展性 灵活性#xff08;峰值处理 可恢复 顺序保证 缓冲 异步 解耦#xff1a;扩展两边处理过程#xff0c;只需要让他们遵守约束即可冗余#xf…大部分内容源于https://segmentfault.com/a/1190000038173886, 本人手敲一边加强印象方便复习 消息系统的作用 解耦 冗余 扩展性 灵活性峰值处理 可恢复 顺序保证 缓冲 异步 解耦扩展两边处理过程只需要让他们遵守约束即可冗余持久化数据规避丢失风险。采用 插入-获取-删除范式明确指出消息被处理完毕扩展性解耦处理过程容易扩展处理过程增大消息处理频率灵活性峰值处理访问激增情况不常见无需投入过多标准资源。使用消息队列顶住访问压力可恢复系统失效时仍可保证队列消息在系统恢复后处理顺序保证kafka保证partition内消息有序缓冲控制和优化 数据经过系统的速度解决生产、消费速度不一致的问题异步允许用户把一个或若干个消息放入队列且不立即被处理 架构 producer消息生产者brokerkafka集群的服务器topic消息的类别partitionkafka分配单位一个topic包含一个或多个partitionconsumer消息消费者终端或服务comsumer group high-level consumer API 中每个 consumer 都属于一个 consumer group每条消息只能被 consumer group 中的一个 Consumer 消费但可以被多个 consumer group 消费。replicapartition副本leader特殊的replicaproducer和consumer只和leader交互follower除了leader的replica都为follwer复制数据controller服务器用于leader选举和failoverzookepper存储集群meta信息 发布消息 producer用push发布到broker消息被append到partition顺序写磁盘 消息路由 //构造函数 public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {if (topic null)throw new IllegalArgumentException(Topic cannot be null);if (timestamp ! null timestamp 0)throw new IllegalArgumentException(Invalid timestamp timestamp);this.topic topic;this.partition partition;this.key key;this.value value;this.timestamp timestamp; }private int partition(ProducerRecordK, V record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition record.partition();if (partition ! null) {//指定了 partition 则直接使用ListPartitionInfo partitions cluster.partitionsForTopic(record.topic());int lastPartition partitions.size() - 1;if (partition 0 || partition lastPartition) {throw new IllegalArgumentException(String.format(Invalid partition given with record: %d is not in the range [0...%d]., partition, lastPartition));}return partition;}//否则使用 key 计算return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); }public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();if (keyBytes null) {//轮询int nextValue counter.getAndIncrement();ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() 0) {int part DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {return DefaultPartitioner.toPositive(nextValue) % numPartitions;}} else {//对 keyBytes 进行 hash 选出一个 patitionreturn DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;} } 指定partition直接用未指定partition但指定了key对key进行hash得到partition都未指定使用轮询 写入流程 producer从zk的/brokers/…/state找到leaderproducer发消息给leaderleader把消息写入logfollower从leader拉取消息写入log后发送ACK给leaderleader收到所有replica的ACK后增加high watermark(位置信息即位移offset)给producer发送ack 投递保证 ① At most once 消息可能会丢但绝不会重复传递② At least one 消息绝不会丢但可能会重复传递③ Exactly once 每条消息肯定会被传输一次且仅传输一次很多时候这是用户想要的默认 at least one 接收消息的行为 comsumer从broker读取消息后可以选择commit或处理消息 如果commit zookeeper存在comsumer在partition下读取消息的offsetcomsumer下次读取partition从下一条开始读取 未commit 下次读取位置和上次commit后开始位置相同 at most once 读完消息先commit再处理消息。 若commit后未处理消息系统崩坏下次重新开始工作无法读到已提交但未处理的消息 At least once 读完消息先处理再commit消费状态(保存offset) 若处理消息后未commit系统崩坏重新工作的时候会处理未commit的消息处理两次 Exactly once 两阶段提交 协调offset和实际操作的输出。但由于许多输出系统不支持两阶段提交更为通用的方式是将offset和操作输入存在同一个地方 consumer拿到数据后可能把数据放到HDFS最新的offset和数据一起写到HDFS保证offset更新和数据输出同时完成 目前就high level API而言offset是存于Zookeeper中的无法存于HDFS而low level API的offset是由自己去维护的可以将之存于HDFS中。 消息保存 topic分为多个partition每个partition对应一个文件夹 无论消息是否被消费kafka 都会保留所有消息。有两种策略可以删除旧数据 基于时间log.retention.hours168基于大小log.retention.bytes1073741824 log.cleanup.policydelete启用删除策略 直接删除删除后的消息不可恢复。可配置以下两个策略清理超过指定时间清理 log.retention.hours16超过指定大小后删除旧的消息 log.retention.bytes1073741824topic的创造 controller在ZK的/brokers/topics 节点上注册 watcher ,topic被创建的时候controller 会通过 watch 得到该 topic 的 partition/replica 分配。controller从 /brokers/ids 读取当前所有可用的 broker 列表对于 set_p 中的每一个 partition 分配给partition的所有replica称为AR任选一个可用的broker作为leader并将AR设置为ISR新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。 删除 topic 的序列 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watchertopic 被删除则 controller 会通过 watch 得到该 topic 的 partition/replica 分配若 delete.topic.enablefalse结束反之controller 注册在 /admin/delete_topics 上的 watch 被 firecontroller 通过回调向对应的 broker 发送 StopReplicaRequest kafka HA 高可用性 replica 同一个 partition 可能会有多个 replica —— erver.properties 配置中的 default.replication.factorN 若没有replicabroker死机 patition 的数据都不可被消费producer 也不能再将数据存于其上的 patition 引入replica需要选取leaderleader与producer和consumer交互其他replica与leader复制数据 分配规则 将所有 broker假设共 n 个 broker和待分配的 partition 排序将第 i 个 partition 分配到第i mod n个 broker 上将第 i 个 partition 的第 j 个 replica 分配到第(i j) mode n个 broker上 leader failover partition 对应的 leader 宕机时需要从 follower 中选举出新 leader 新的 leader 必须拥有旧 leader commit 过的所有消息 zookeeper 中/brokers/…/state动态维护了一个 ISRin-sync replicas。只有 ISR 里面的成员才能选为 leader。若有f个replicapartition可以保证f-1个replica失效情况下消息不丢失 failover方案 等待 ISR 中的任一个 replica 活过来并选它作为 leader。可保障数据不丢失但时间可能相对较长。选择第一个活过来的 replica不一定是 ISR 成员作为 leader。无法保障数据不丢失但相对不可用时间较短 多用第二种方式 broker failover controller在zookeeper的/brokers/ids/[brokerId] 节点注册 Watcher当 broker 宕机时 zookeeper 会 fire watchcontroller从/brokers/ids 节点读取可用brokercontroller决定set_p集合包含死机broker上所有partition对set_p所有partition进行 1. 读取/brokers/ids 节点读取可用broker的ISR 2. 决定新leader 新leader ISR controller_epoch和leader_epoch信息写入state结点通过RPC给broker发送 leaderAndISRRequest 命令 controller failover controller 宕机时会触发 controller failover broker在zookeeper的controller节点注册watchercontroller宕机时zookeeper临时节点消失所有存活broker收到fire通知每个broker尝试创建新的controller path其中一个竞选成功为controller当选成功触发KafkaController.onControllerFailover 1. 读取并增加 Controller Epoch。 2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。 3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。 4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。 5. 若 delete.topic.enabletrue默认值是 false则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。 6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。 7. 初始化 ControllerContext 对象设置当前所有 topic“活”着的 broker 列表所有 partition 的 leader 及 ISR等。 8. 启动 replicaStateMachine 和 partitionStateMachine。 9. 将 brokerState 状态设置为 RunningAsController。 10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。 11. 若 auto.leader.rebalance.enabletrue默认值是true则启动 partition-rebalance 线程。 12. 若 delete.topic.enabletrue 且Delete Topic Patch(/admin/delete_topics)中有值则删除相应的Topic。消费 kafka 提供了两套 consumer API The high-level Consumer API The SimpleConsumer API consumer API high-level提供kafka消费数据的抽象 提供了 consumer group 的语义消息只能被group内一个consumer消费消费的时候不关注offset最后一个offset由zookeeper保存 使用high-level consumer API可以是多线程应用 if(消费线程 partition){部分线程收不到消息 } if(消费线程 partition){有些线程收到多个partition消息 }if(一个线程消费多个 patition){无法保证收到消息的顺序 }** SimpleConsumer API** 适用以下情况 多次读取一个消息只消费一个 patition 中的部分消息使用事务来保证一个消息仅被消费一次 partition offset broker leader不透明需要自己管理 追踪offset确定下一条消费的信息找出每个partition的follower处理leader变更 流程如下 查找到一个“活着”的 broker并且找出每个 partition 的 leader找到partition的follower定义好请求该请求应该能描述应用程序需要哪些数据fetch数据识别leader变化并做出响应 consumer group kafka分配单位是partitionconsumer属于一个group 一个partition被一个group内的一个consumer消费但是多个group可以同时消费这个partition 实现离线处理与实时处理 spark 实时处理hadoop 离线处理 消费方法 consumer用pull模式从broker读数据 push 模式很难适应消费速率不同的消费者 消息发送速率是由 broker 决定的尽可能以最快速度传递消息容易造成 consumer 来不及处理消息拒绝服务、网络拥塞 pull模式consumer根据自己的能力消费信息 pull的优点 简化broker设计consumer自主控制消费速率consumer自主控制消费方式 —— 批量/逐条选择不同提交方式 消费者递送保证 consumer 设置为 autocommitconsumer 一旦读到数据立即自动 commitExactly once 实际使用过程中并不是consumer读完消息就结束了还需要进一步处理。 处理和commit顺序决定了 consumer delivery guarantee 先commit后处理消息At most once consumer 在 commit 后还没来得及处理消息就 crash重新开始工作后就无法读到刚刚已提交而未处理的消息 先处理再commit At least once 处理完消息之后 commit 之前 consumer crash恢复工作处理刚刚未 commit 的消息 两阶段提交 offset 和操作输入存在同一个地方会更简洁和通用 若不支持consumer 拿到数据后可能把数据放到 HDFS如果把最新的 offset 和数据本身一起写到 HDFS那就可以保证数据的输出和 offset 的更新要么都完成要么都不完成间接实现 Exactly once —— high-level API里面offset存于zookeeper中无法存于HDFSsimple可以存于HDFS consumer rebalance 触发机制 consumer加入退出partition改变(broker 加入退出 算法如下 目标topic的partition排序存于PT选择consumer group下所有consumer排序, 存于CG N ⌈ s i z e ( P T ) / s i z e ( C G ) ⌉ N \lceil size(PT)/size(CG)\rceil N⌈size(PT)/size(CG)⌉对group内原本的分配partition解除关系然后每N个partition分配给一个consumer consumer调整了单个partition后为了保证一致性group内其他consumer也应触发balance 导致以下问题 herd effect brokercomsumer增减触发rebalance split brain 每个consumer单独通过zk判断broker和consumer宕机不同的consumer同时从zookeeper看到的view可能不一致 —— 导致不正确的rebalance所有consumer不知道其他consumer的rebalance是否成功导致kafka工作状态不正确因此0.9开始使用中心coordinator空值rebalance计划在consumer客户端分配方案
http://www.pierceye.com/news/405962/

相关文章:

  • 如何在网站上做推广自己做网站的图片
  • 珠海模板网站建设wordpress 底部工具栏
  • 网站建设的业务流程图招聘网站上找在家做
  • 网站设计的工具盱眙在仕德伟做网站的有几家
  • 建设一个网站要花多少时间临沂网站网站建设
  • 南宁网站推广经理做动漫网站如何应用数据绑定
  • 眼镜东莞网站建设兰州公司做网站
  • 改成 响应式 网站重庆微信企业网站
  • 用微信怎么做商城网站微信官网下载安装
  • 汽车网站建设方案预算md风格的wordpress主题
  • 免费外贸网站模板dede 网站栏目管理
  • 做网站有包括哪些东西站长素材网
  • 淘宝做促销的网站网站开发报价清单
  • 备案查询网站网站建设中可能遇到的问题
  • 怎么注册网站的步骤快速建站官网
  • 网站怎么做口碑wordpress淘宝客知乎
  • 响应式网站建设信息网站建设宽带
  • ps如何做网站超级链接微信公众平台运营中心电话
  • 网站建设怎么估算费用和报价h5特效网站欣赏
  • 东软集团建设网站娱乐网站排行榜
  • 石家庄网站建站米拓建站官网怎么用不了
  • 推广seo网站的公司金华网站建设公司排名
  • 阿里巴巴网站工作流程网站建设 教学设计
  • 电子商务网站建设的方法怎样用织梦做音乐网站
  • 临夏州住房和城乡建设局网站出词
  • 企业网站的综合要求最新领导班子7人名单
  • 通过阿里云建设企业网站联想企业网站建设的思路
  • 网站建设服务器的选择方案建设报名系统是正规网站吗
  • 揭阳高端模板建站WordPress背景音乐6
  • 如何使用云服务建设网站cpa之家 app推广平台