当前位置: 首页 > news >正文

跟换网站域名网页编辑用什么软件好

跟换网站域名,网页编辑用什么软件好,wordpress后台升级,做网站代码用什么软件目录 一、消息系统 1、点对点的消息系统 2、发布-订阅消息系统 二、Apache Kafka 简介 三、Apache Kafka基本原理 3.1 分布式和分区#xff08;distributed、partitioned#xff09; 3.2 副本#xff08;replicated #xff09; 3.3 整体数据流程 3.4 消息传送机制…目录 一、消息系统 1、点对点的消息系统 2、发布-订阅消息系统 二、Apache Kafka 简介 三、Apache Kafka基本原理 3.1 分布式和分区distributed、partitioned 3.2 副本replicated  3.3 整体数据流程 3.4 消息传送机制 四、Apache Kafka 集群架构 五、Apache Kafka核心组件详解 1. producer生产者 2. topic主题 3. partition分区 4. consumer消费者 5. consumer group消费者组 6. partition replicas分区副本 7. segment文件 8. message的物理结构 六、ZooKeeper 的作用 6.1 kafka如何利用zookeeper来进行leader的选举 6.2 Kafka 在 ZooKeeper 中保存了哪些信息 6.3 Kafka 客户端如何找到对应的 Broker 6.4 zookeeper小结 七、Apache Kafka 工作流程 7.1 发布 - 订阅消息的工作流程 7.2 队列消息/用户组的工作流 八、Apache Kafka副本同步机制 8.1 副本同步队列(ISR) 8.2 副本不同步的异常情况 九、kafka消费者组的作用 9.1 共享消息队列模式 缺点 9.2 发布订阅模式 缺点 9.3 Kafka 如何桥接这两种模型 消费群体 重新平衡 用例实现 结论 QA Kafka与ZooKeeper关系 Kafka在ZooKeeper中保存元数据信息 Kafka 客户端如何找到对应的 Broker 消费者组的优势 1. 高性能 2. 消费模式灵活 3. 故障容灾 4. 小结 1. 需求场景分析 2. 物理机数量评估 3. 磁盘选择 4. 内存评估 5. CPU压力评估 6. 网络需求评估 对于大数据我们要考虑的问题有很多首先海量数据如何收集如 Flume然后对于收集到的数据如何存储典型的分布式文件系统 HDFS、分布式数据库 HBase、NoSQL 数据库 Redis其次存储的数据不是存起来就没事了要通过计算从中获取有用的信息这就涉及到计算模型典型的离线计算 MapReduce、流式实时计算Storm、Spark或者要从数据中挖掘信息还需要相应的机器学习算法。在这些之上还有一些各种各样的查询分析数据的工具如 Hive、Pig 等。除此之外要构建分布式应用还需要一些工具比如分布式协调服务 Zookeeper 等等。 这里我们讲到的是消息系统Kafka 专为分布式高吞吐量系统而设计其他消息传递系统相比Kafka 具有更好的吞吐量内置分区复制和固有的容错能力这使得它非常适合大规模消息处理应用程序。 一、消息系统 首先我们理解一下什么是消息系统消息系统负责将数据从一个应用程序传输到另外一个应用程序使得应用程序可以专注于处理逻辑而不用过多的考虑如何将消息共享出去。 分布式消息系统基于可靠消息队列的方式消息在应用程序和消息系统之间异步排队。实际上消息系统有两种消息传递模式一种是点对点另外一种是基于发布-订阅publish-subscribe的消息系统。 1、点对点的消息系统 在点对点的消息系统中消息保留在队列中一个或者多个消费者可以消耗队列中的消息但是消息最多只能被一个消费者消费一旦有一个消费者将其消费掉消息就从该队列中消失。这里要注意多个消费者可以同时工作但是最终能拿到该消息的只有其中一个。最典型的例子就是订单处理系统多个订单处理器可以同时工作但是对于一个特定的订单只有其中一个订单处理器可以拿到该订单进行处理。 2、发布-订阅消息系统 在发布 - 订阅系统中消息被保留在主题中。 与点对点系统不同消费者可以订阅一个或多个主题并使用该主题中的所有消息。在发布 - 订阅系统中消息生产者称为发布者消息使用者称为订阅者。 一个现实生活的例子是Dish电视它发布不同的渠道如运动电影音乐等任何人都可以订阅自己的频道集并获得他们订阅的频道时可用。 二、Apache Kafka 简介 Kafka is a distributed,partitioned,replicated commit logservice。 Apache Kafka 是一个分布式发布 - 订阅消息系统和一个强大的队列可以处理大量的数据并使你能够将消息从一个端点传递到另一个端点。 Kafka 适合离线和在线消息消费。 Kafka 消息保留在磁盘上并在群集内复制以防止数据丢失。 Kafka 构建在 ZooKeeper 同步服务之上。 它与 Apache Storm 和 Spark 非常好地集成用于实时流式数据分析。 Kafka 是一个分布式消息队列具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。 关键术语 1生产者和消费者producer和consumer消息的发送者叫 Producer消息的使用者和接受者是 Consumer生产者将数据保存到 Kafka 集群中消费者从中获取消息进行业务的处理。 2brokerKafka 集群中有很多台 Server其中每一台 Server 都可以存储消息将每一台 Server 称为一个 kafka 实例也叫做 broker。 3主题topic一个 topic 里保存的是同一类消息相当于对消息的分类每个 producer 将消息发送到 kafka 中都需要指明要存的 topic 是哪个也就是指明这个消息属于哪一类。 4分区partition每个 topic 都可以分成多个 partition每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。为什么要进行分区呢最根本的原因就是kafka基于文件进行存储当文件内容大到一定程度时很容易达到单个磁盘的上限因此采用分区的办法一个分区对应一个文件这样就可以将数据分别存储到不同的server上去另外这样做也可以负载均衡容纳更多的消费者。 5偏移量Offset一个分区对应一个磁盘上的文件而消息在文件中的位置就称为 offset偏移量offset 为一个 long 型数字它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset文件只能顺序的读写所以在kafka中几乎不允许对消息进行“随机读写”。 综上我们总结一下 Kafka 的几个要点: kafka 是一个基于发布-订阅的分布式消息系统消息队列 Kafka 面向大数据消息保存在主题中而每个 topic 有分为多个分区 kafka 的消息数据保存在磁盘每个 partition 对应磁盘上的一个文件消息写入就是简单的文件追加文件可以在集群内复制备份以防丢失 即使消息被消费kafka 也不会立即删除该消息可以通过配置使得过一段时间后自动删除以释放磁盘空间 kafka依赖分布式协调服务Zookeeper适合离线/在线信息的消费与 storm 和 spark 等实时流式数据分析常常结合使用 三、Apache Kafka基本原理 通过之前的介绍我们对 kafka 有了一个简单的理解它的设计初衷是建立一个统一的信息收集平台使其可以做到对信息的实时反馈。Kafka is a distributed,partitioned,replicated commit logservice。接下来我们着重从几个方面分析其基本原理。 3.1 分布式和分区distributed、partitioned 我们说 kafka 是一个分布式消息系统所谓的分布式实际上我们已经大致了解。消息保存在 Topic 中而为了能够实现大数据的存储一个 topic 划分为多个分区每个分区对应一个文件可以分别存储到不同的机器上以实现分布式的集群存储。另外每个 partition 可以有一定的副本备份到多台机器上以提高可用性。 总结起来就是一个 topic 对应的多个 partition 分散存储到集群中的多个 broker 上存储方式是一个 partition 对应一个文件每个 broker 负责存储在自己机器上的 partition 中的消息读写。 3.2 副本replicated  kafka 还可以配置 partitions 需要备份的个数(replicas)每个 partition 将会被备份到多台机器上,以提高可用性备份的数量可以通过配置文件指定。 这种冗余备份的方式在分布式系统中是很常见的那么既然有副本就涉及到对同一个文件的多个备份如何进行管理和调度。kafka 采取的方案是每个 partition 选举一个 server 作为“leader”由 leader 负责所有对该分区的读写其他 server 作为 follower 只需要简单的与 leader 同步保持跟进即可。如果原来的 leader 失效会重新选举由其他的 follower 来成为新的 leader。 至于如何选取 leader实际上如果我们了解 ZooKeeper就会发现其实这正是 Zookeeper 所擅长的Kafka 使用 ZK 在 Broker 中选出一个 Controller用于 Partition 分配和 Leader 选举。 另外这里我们可以看到实际上作为 leader 的 server 承担了该分区所有的读写请求因此其压力是比较大的从整体考虑有多少个 partition 就意味着会有多少个leaderkafka 会将 leader 分散到不同的 broker 上确保整体的负载均衡。 3.3 整体数据流程 Kafka 的总体数据流满足下图该图可以说是概括了整个 kafka 的基本原理。 1数据生产过程Produce 对于生产者要写入的一条记录可以指定四个参数分别是 topic、partition、key 和 value其中 topic 和 value要写入的数据是必须要指定的而 key 和 partition 是可选的。 对于一条记录先对其进行序列化然后按照 Topic 和 Partition放进对应的发送队列中。如果 Partition 没填那么情况会是这样的a、Key 有填。按照 Key 进行哈希相同 Key 去一个 Partition。b、Key 没填。Round-Robin 来选 Partition。 producer 将会和Topic下所有 partition leader 保持 socket 连接消息由 producer 直接通过 socket 发送到 broker。其中 partition leader 的位置( host : port )注册在 zookeeper 中producer 作为 zookeeper client已经注册了 watch 用来监听 partition leader 的变更事件因此可以准确的知道谁是当前的 leader。producer client通过broker代理的本地缓存获取到最新的leader broker是哪个然后找到对应的broker发送消息。 producer 端采用异步发送将多条消息暂且在客户端 buffer 起来并将他们批量的发送到 broker小数据 IO 太多会拖慢整体的网络延迟批量延迟发送事实上提升了网络效率。 2数据消费过程Consume 对于消费者不是以单独的形式存在的每一个消费者属于一个 consumer group一个 group 包含多个 consumer。特别需要注意的是订阅 Topic 是以一个消费组来订阅的发送到 Topic 的消息只会被订阅此 Topic 的每个 group 中的一个 consumer 消费。 如果所有的 Consumer 都具有相同的 group那么就像是一个点对点的消息系统如果每个 consumer 都具有不同的 group那么消息会广播给所有的消费者。 具体说来这实际上是根据 partition 来分的一个 Partition只能被消费组里的一个消费者消费但是可以同时被多个消费组消费消费组里的每个消费者是关联到一个 partition 的因此有这样的说法对于一个 topic,同一个 group 中不能有多于 partitions 个数的 consumer 同时消费,否则将意味着某些 consumer 将无法得到消息。 同一个消费组的两个消费者不会同时消费一个 partition。 在 kafka 中采用了 pull 方式即 consumer 在和 broker 建立连接之后主动去 pull(或者说 fetch )消息首先 consumer 端可以根据自己的消费能力适时的去 fetch 消息并处理且可以控制消息消费的进度(offset)。 partition 中的消息只有一个 consumer 在消费且不存在消息状态的控制也没有复杂的消息确认机制可见 kafka broker 端是相当轻量级的。当消息被 consumer 接收之后需要保存 Offset 记录消费到哪以前保存在 ZK 中由于 ZK 的写性能不好以前的解决方法都是 Consumer 每隔一分钟上报一次在 0.10 版本后Kafka 把这个 Offset 的保存从 ZK 中剥离保存在broker的一个名叫 consumeroffsets topic 的 Topic 中由此可见consumer 客户端也很轻量级。 3.4 消息传送机制 Kafka 支持 3 种消息投递语义,在业务中常常都是使用 At least once 的模型。 At most once最多一次消息可能会丢失但不会重复。 At least once最少一次消息不会丢失可能会重复。 Exactly once只且一次消息不丢失不重复只且消费一次。 四、Apache Kafka 集群架构 看看下面的插图。 它显示Kafka的集群图。 下表描述了上图中显示的每个组件。 组件 说明 Broker代理 Kafka集群通常由多个代理组成以保持负载平衡。 Kafka代理是无状态的所以他们使用ZooKeeper来维护它们的集群状态。 一个Kafka代理实例可以每秒处理数十万次读取和写入每个Broker可以处理TB的消息而没有性能影响。 Kafka经纪人领导选举可以由ZooKeeper完成。 ZooKeeper ZooKeeper用于管理和协调Kafka代理。 ZooKeeper服务主要用于通知生产者和消费者Kafka系统中存在任何新代理或Kafka系统中代理失败。 根据Zookeeper接收到关于代理的存在或失败的通知然后生产者和消费者采取决定并开始与某些其他代理协调他们的任务。 Producers生产者 生产者将数据推送给经纪人。 当新代理启动时所有生产者搜索它并自动向该新代理发送消息。 Kafka生产者不等待来自代理的确认并且发送消息的速度与代理可以处理的一样快。 Consumers消费者 因为Kafka代理是无状态的这意味着消费者必须通过使用分区偏移来维护已经消耗了多少消息。 如果消费者确认特定的消息偏移则意味着消费者已经消费了所有先前的消息。 消费者向代理发出异步拉取请求以具有准备好消耗的字节缓冲区。 消费者可以简单地通过提供偏移值来快退或跳到分区中的任何点。 消费者偏移值由ZooKeeper通知。 五、Apache Kafka核心组件详解 1. producer生产者 producer主要是用于生产消息是kafka当中的消息生产者生产的消息通过topic进行归类保存到kafka的broker里面去。 2. topic主题 kafka将消息以topic为单位进行归类 topic特指kafka处理的消息源feeds of messages的不同分类 topic是一种分类或者发布的一些列记录的名义上的名字。kafka主题始终是支持多用户订阅的也就是说一 个主题可以有零个一个或者多个消费者订阅写入的数据 在kafka集群中可以有无数的主题 生产者和消费者消费数据一般以主题为单位。更细粒度可以到分区级别。 3. partition分区 kafka当中topic是消息的归类一个topic可以有多个分区partition每个分区保存部分topic的数据所有的partition当中的数据全部合并起来就是一个topic当中的所有的数据。 一个broker服务下可以创建多个分区broker数与分区数没有关系 在kafka中每一个分区会有一个编号编号从0开始。 每一个分区内的数据是有序的但全局的数据不能保证是有序的。有序是指生产什么样顺序消费时也是什么样的顺序 4. consumer消费者 consumer是kafka当中的消费者主要用于消费kafka当中的数据消费者一定是归属于某个消费组中的。 5. consumer group消费者组 消费者组由一个或者多个消费者组成同一个组中的消费者对于同一条消息只消费一次。 每个消费者都属于某个消费者组如果不指定那么所有的消费者都属于默认的组。 每个消费者组都有一个ID即group ID。组内的所有消费者协调在一起来消费一个订阅主题( topic)的所有分区(partition)。当然每个分区只能由同一个消费组内的一个消费者(consumer)来消费可以由不同的消费组来消费。 partition数量决定了每个consumer group中并发消费者的最大数量。如下图 如上面左图所示如果只有两个分区即使一个组内的消费者有4个也会有两个空闲的。 如上面右图所示有4个分区每个消费者消费一个分区并发量达到最大4。 在来看如下一幅图 如上图所示不同的消费者组消费同一个topic这个topic有4个分区分布在两个节点上。左边的 消费组1有两个消费者每个消费者就要消费两个分区才能把消息完整的消费完右边的 消费组2有四个消费者每个消费者消费一个分区即可。 总结下kafka中分区与消费组的关系 消费组 由一个或者多个消费者组成同一个组中的消费者对于同一条消息只消费一次。 某一个主题下的分区数对于消费该主题的同一个消费组下的消费者数量应该小于等于该主题下的分区数。 如某一个主题有4个分区那么消费组中的消费者应该小于等于4而且最好与分区数成整数倍 1 2 4 这样。同一个分区下的数据在同一时刻不能同一个消费组的不同消费者消费。 总结分区数越多同一时间可以有越多的消费者来进行消费消费数据的速度就会越快提高消费的性能。 6. partition replicas分区副本 kafka 中的分区副本如下图所示 副本数replication-factor控制消息保存在几个broker服务器上一般情况下副本数等于broker的个数。 一个broker服务下不可以创建多个副本因子。创建主题时副本因子应该小于等于可用的broker数。 副本因子操作以分区为单位的。每个分区都有各自的主副本和从副本 主副本叫做leader从副本叫做 follower在有多个副本的情况下kafka会为同一个分区下的所有分区设定角色关系一个leader和N个 follower处于同步状态的副本叫做in-sync-replicas(ISR); follower通过拉的方式从leader同步数据。 消费者和生产者都是从leader读写数据不与follower交互。 副本因子的作用让kafka读取数据和写入数据时的可靠性。 副本因子是包含本身同一个副本因子不能放在同一个broker中。 如果某一个分区有三个副本因子就算其中一个挂掉那么只会剩下的两个中选择一个leader但不会在其他的broker中另启动一个副本因为在另一台启动的话存在数据传递只要在机器之间有数据传递就会长时间占用网络IOkafka是一个高吞吐量的消息系统这个情况不允许发生所以不会在另一个broker中启动。 如果所有的副本都挂了生产者如果生产数据到指定分区的话将写入不成功。 lsr表示当前可用的副本。 7. segment文件 一个partition当中由多个segment文件组成每个segment文件包含两部分一个是 .log 文件另外一个是 .index 文件其中 .log 文件包含了我们发送的数据存储.index 文件记录的是我们.log文件的数据索引值以便于我们加快数据的查询速度。 索引文件与数据文件的关系 既然它们是一一对应成对出现必然有关系。索引文件中元数据指向对应数据文件中message的物理偏移地址。 比如索引文件中 3,497 代表数据文件中的第三个message它的偏移地址为497。 再来看数据文件中Message 368772表示在全局partiton中是第368772个message。 注segment index file 采取稀疏索引存储方式减少索引文件大小通过mmap内存映射可以直接内存操作稀疏索引为数据文件的每个对应message设置一个元数据指针它比稠密索引节省了更多的存储空间但查找起来需要消耗更多的时间。 .index 与 .log 对应关系如下 Kafka中采用了稀疏索引的方式读取索引kafka每当写入了4k大小的日志.log就往index里写入一个记录索引。其中会采用二分查找。 上图左半部分是索引文件里面存储的是一对一对的key-value其中key是消息在数据文件对应的log文件中的编号比如“1,3,6,8……”分别表示在log文件中的第1条消息、第3条消息、第6条消息、第8条消息…… 那么为什么在index文件中这些编号不是连续的呢这是因为index文件中并没有为数据文件中的每条消息都建立索引而是采用了稀疏存储的方式每隔一定字节的数据建立一条索引。 这样避免了索引文件占用过多的空间从而可以将索引文件保留在内存中。 但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置从而需要做一次顺序扫描但是这次顺序扫描的范围就很小了。 value 代表的是在全局partiton中的第几个消息。 以索引文件中元数据 3,497 为例其中3代表在右边log数据文件中从上到下第3个消息497表示该消息的物理偏移地址位置为497(也表示在全局partiton表示第497个消息-顺序写入特性)。 log日志目录及组成 kafka在我们指定的log.dir目录下会创建一些文件夹名字是 主题名字-分区名 所组成的文件夹。 在主题名字-分区名的目录下会有两个文件存在如下所示 #索引文件 00000000000000000000.index #日志内容 00000000000000000000.log在目录下的文件会根据log日志的大小进行切分.log文件的大小为1G的时候就会进行切分文件如下 -rw-r--r--. 1 root root 389k 1月 17 18:03 00000000000000000000.index -rw-r--r--. 1 root root 1.0G 1月 17 18:03 00000000000000000000.log -rw-r--r--. 1 root root 10M 1月 17 18:03 00000000000000077894.index -rw-r--r--. 1 root root 127M 1月 17 18:03 00000000000000077894.log在kafka的设计中将offset值作为了文件名的一部分。 segment文件命名规则partion全局的第一个segment从0开始后续每个segment文件名为上一个全局 partion的最大offset偏移message数。数值最大为64位long大小20位数字字符长度没有数字就用 0 填充。 通过索引信息可以快速定位到message。通过index元数据全部映射到内存可以避免segment File的IO磁盘操作 通过索引文件稀疏存储可以大幅降低index文件元数据占用空间大小。 稀疏索引为了数据创建索引但范围并不是为每一条创建而是为某一个区间创建好处就是可以减少索引值的数量。 不好的地方找到索引区间之后要得进行第二次处理。 8. message的物理结构 生产者发送到kafka的每条消息都被kafka包装成了一个message message 的物理结构如下图所示 所以生产者发送给kafka的消息并不是直接存储起来而是经过kafka的包装每条消息都是上图这个结构只有最后一个字段才是真正生产者发送的消息数据。 六、ZooKeeper 的作用 Apache ZooKeeper 它是一个非常特殊的中间件为什么这么说呢一般来说像中间件类的开源产品大多遵循“做一件事并做好它。”这样的 UNIX 哲学每个软件都专注于一种功能上。而 ZooKeeper 更像是一个“瑞士军刀”它提供了很多基本的操作能实现什么样的功能更多取决于使用者如何来使用它。 ZooKeeper 作为一个分布式的协调服务框架主要用来解决分布式集群中应用系统需要面对的各种通用的一致性问题。ZooKeeper 本身可以部署为一个集群集群的各个节点之间可以通过选举来产生一个 Leader选举遵循半数以上的原则所以一般集群需要部署奇数个节点。 ZooKeeper 最核心的功能是它提供了一个分布式的存储系统数据的组织方式类似于 UNIX 文件系统的树形结构。由于这是一个可以保证一致性的存储系统所以你可以放心地在你的应用集群中读写 ZooKeeper 的数据而不用担心数据一致性的问题。分布式系统中一些需要整个集群所有节点都访问的元数据比如集群节点信息、公共配置信息等特别适合保存在 ZooKeeper 中。 在这个树形的存储结构中每个节点被称为一个“ZNode”。ZooKeeper 提供了一种特殊的 ZNode 类型临时节点。这种临时节点有一个特性如果创建临时节点的客户端与 ZooKeeper 集群失去连接这个临时节点就会自动消失。在 ZooKeeper 内部它维护了 ZooKeeper 集群与所有客户端的心跳通过判断心跳的状态来确定是否需要删除客户端创建的临时节点。 ZooKeeper 还提供了一种订阅 ZNode 状态变化的通知机制Watcher一旦 ZNode 或者它的子节点状态发生了变化订阅的客户端会立即收到通知。 利用 ZooKeeper 临时节点和 Watcher 机制我们很容易随时来获取业务集群中每个节点的存活状态并且可以监控业务集群的节点变化情况当有节点上下线时都可以收到来自 ZooKeeper 的通知。 此外我们还可以用 ZooKeeper 来实现业务集群的快速选举、节点间的简单通信、分布式锁等很多功能。 ZooKeeper 用于存储关于 Kafka 集群的各种元数据 它维护每个分区每个消费者组的最后一个偏移位置以便消费者可以在发生故障时快速从最后一个位置恢复尽管现代客户端将偏移存储在单独的 Kafka 主题中。 它跟踪主题、分配给这些主题的分区数量以及领导者/追随者在每个分区中的位置。 它还管理集群中不同主题的访问控制列表 (ACL)。ACL 用于强制访问或授权。 下面来看一下 Kafka 是如何来使用 ZooKeeper 的。 6.1 kafka如何利用zookeeper来进行leader的选举 Kafka的leader选举详细过程如下 当Kafka集群启动时每个代理节点Broker都会实例化一个KafkaController类。 第一个启动的代理节点会在Zookeeper系统里面创建一个临时节点/Controller并写入该节点的注册信息使其成为控制器。 其他代理节点陆续启动时也会尝试在Zookeeper系统里面创建/controller节点。但由于/controller节点已经存在所以会抛出创建/controller节点失败的异常信息。 创建失败的代理节点会根据返回的结果判断出在Kafka集群中已经有一个控制器被创建成功了所以放弃创建/controller节点。 其他代理节点会在控制器上注册相应的监听器监听各自代理节点的状态变化。当监听到节点状态变化时会触发相应的监听函数进行处理。 当某个分区的leader节点宕机时该分区上的follower节点会感知到leader节点的宕机并在Zookeeper中重新创建相应的节点。 竞争领导者选举过程开始每个follower节点尝试创建Zookeeper中的相应节点如/topic/partition/controller并写入该节点的注册信息。 如果多个follower同时创建节点Zookeeper将只承认第一个成功创建的节点其他节点会再次尝试创建。 第一个成功创建节点的follower将成为新的leader节点。 如果ISR集合中没有其他副本Kafka会从所有副本中选择一个具有最新数据的副本作为新的leader。 如果新的leader是旧数据的副本它会在接管分区之前从其他副本中拉取最新的数据然后从最新的记录开始提供服务而不会回滚数据。 如果当Kafka的一个节点Broker宕机后Kafka会经历以下过程 在Kafka中如果分区leader节点在同步数据时宕机但数据尚未同步到所有副本其中一个是新的一个是旧的数据那么在选取新的leader节点时Kafka会选择具有最新数据的副本作为新的leader。 Kafka的选举机制会优先选择ISRIn-Sync Replica集合中的副本。ISR是指与leader同步的副本集合它们的数据同步状态与leader最接近并且它们与leader副本的网络通信延迟最小。如果ISR集合中没有可用的副本Kafka会从所有副本中选择一个具有最新数据的副本作为新的leader。 如果新的leader是旧数据的副本它会在接管分区之前从其他副本中拉取最新的数据。这被称为“从干净状态启动”意味着该副本从最新的记录开始提供服务而不会回滚数据。 开发者可以通过调整Kafka配置来控制这种情况下的行为。例如可以通过设置unclean.leader.election.enable参数来控制是否允许从旧数据副本中选举新的leader。如果设置为trueKafka将允许从旧数据副本中选举新的leader。如果设置为falseKafka将只选择最新的数据副本作为新的leader。 6.2 Kafka 在 ZooKeeper 中保存了哪些信息 首先我们来看一下 Kafka 在 ZooKeeper 都保存了哪些信息我把这些 ZNode 整理了一张图方便你来学习。 你可能在网上看到过和这个图类似的其他版本的图这些图中绘制的 ZNode 比我们这张图要多一些这些图大都是描述的 0.8.x 的旧版本的情况最新版本的 Kafka 已经将消费位置管理等一些原本依赖 ZooKeeper 实现的功能替换成了其他的实现方式。 图中圆角的矩形是临时节点直角矩形是持久化的节点。 我们从左往右来看左侧这棵树保存的是 Kafka 的 Broker 信息/brokers/ids/[0…N]每个临时节点对应着一个在线的 BrokerBroker 启动后会创建一个临时节点代表 Broker 已经加入集群可以提供服务了节点名称就是 BrokerID节点内保存了包括 Broker 的地址、版本号、启动时间等等一些 Broker 的基本信息。如果 Broker 宕机或者与 ZooKeeper 集群失联了这个临时节点也会随之消失。 右侧部分的这棵树保存的就是主题和分区的信息。/brokers/topics/ 节点下面的每个子节点都是一个主题节点的名称就是主题名称。每个主题节点下面都包含一个固定的 partitions 节点pattitions 节点的子节点就是主题下的所有分区节点名称就是分区编号。 每个分区节点下面是一个名为 state 的临时节点节点中保存着分区当前的 leader 和所有的 ISR 的 BrokerID。这个 state 临时节点是由这个分区当前的 Leader Broker 创建的。如果这个分区的 Leader Broker 宕机了对应的这个 state 临时节点也会消失直到新的 Leader 被选举出来再次创建 state 临时节点。 6.3 Kafka 客户端如何找到对应的 Broker 那 Kafka 客户端如何找到主题、队列对应的 Broker 呢其实通过上面 ZooKeeper 中的数据结构你应该已经可以猜的八九不离十了。是的先根据主题和队列在右边的树中找到分区对应的 state 临时节点我们刚刚说过state 节点中保存了这个分区 Leader 的 BrokerID。拿到这个 Leader 的 BrokerID 后再去左侧的树中找到 BrokerID 对应的临时节点就可以获取到 Broker 真正的访问地址了。 Kafka 的客户端并不会去直接连接 ZooKeeper它只会和 Broker 进行远程通信那我们可以合理推测一下ZooKeeper 上的元数据应该是通过 Broker 中转给每个客户端的。 客户端真正与服务端发生网络传输是在 org.apache.kafka.clients.NetworkClient#poll 方法中实现的我们一直跟踪这个调用链 NetworkClient#poll() - DefaultMetadataUpdater#maybeUpdate(long) - DefaultMetadataUpdater#maybeUpdate(long, Node)直到 maybeUpdate(long, Node) 这个方法在这个方法里面Kafka 构造了一个更新元数据的请求 private long maybeUpdate(long now, Node node) {String nodeConnectionId node.idString();if (canSendRequest(nodeConnectionId, now)) {// 构建一个更新元数据的请求的构造器Metadata.MetadataRequestAndVersion metadataRequestAndVersion metadata.newMetadataRequestAndVersion();inProgressRequestVersion metadataRequestAndVersion.requestVersion;MetadataRequest.Builder metadataRequest metadataRequestAndVersion.requestBuilder;log.debug(Sending metadata request {} to node {}, metadataRequest, node);// 发送更新元数据的请求sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);return defaultRequestTimeoutMs;}//... }这段代码先构造了更新元数据的请求的构造器然后调用 sendInternalMetadataRequest() 把这个请求放到待发送的队列中。这里面有两个地方我需要特别说明一下。 第一点是在这个方法里面创建的并不是一个真正的更新元数据的 MetadataRequest而是一个用于构造 MetadataRequest 的构造器 MetadataRequest.Builder等到真正要发送请求之前Kafka 才会调用 Builder.buid() 方法把这个 MetadataRequest 构建出来然后发送出去。而且不仅是元数据的请求所有的请求都是这样来处理的。 第二点是调用 sendInternalMetadataRequest() 方法时这个请求也并没有被真正发出去依然是保存在待发送的队列中然后择机来异步批量发送。 请求的具体内容封装在 org.apache.kafka.common.requests.MetadataRequest 这个对象中它包含的信息很简单只有一个主题的列表来表明需要获取哪些主题的元数据另外还有一个布尔类型的字段 allowAutoTopicCreation表示是否允许自动创建主题。 然后我们再来看下在 Broker 中Kafka 是怎么来处理这个更新元数据的请求的。 Broker 处理所有 RPC 请求的入口类在 kafka.server.KafkaApis#handle 这个方法里面我们找到对应处理更新元数据的方法 handleTopicMetadataRequest(RequestChannel.Request)这段代码是用 Scala 语言编写的 def handleTopicMetadataRequest(request: RequestChannel.Request) {val metadataRequest request.body[MetadataRequest]val requestVersion request.header.apiVersion// 计算需要获取哪些主题的元数据val topics // 在旧版本的协议中每次都获取所有主题的元数据if (requestVersion 0) {if (metadataRequest.topics() null || metadataRequest.topics.isEmpty)metadataCache.getAllTopics()elsemetadataRequest.topics.asScala.toSet} else {if (metadataRequest.isAllTopics)metadataCache.getAllTopics()elsemetadataRequest.topics.asScala.toSet}// 省略掉鉴权相关代码// ...val topicMetadata if (authorizedTopics.isEmpty)Seq.empty[MetadataResponse.TopicMetadata]else// 从元数据缓存过滤出相关主题的元数据getTopicMetadata(metadataRequest.allowAutoTopicCreation, authorizedTopics, request.context.listenerName,errorUnavailableEndpoints, errorUnavailableListeners)// ...// 获取所有 Broker 列表val brokers metadataCache.getAliveBrokerstrace(Sending topic metadata %s and brokers %s for correlation id %d to client %s.format(completeTopicMetadata.mkString(,),brokers.mkString(,), request.header.correlationId, request.header.clientId))// 构建 Response 并发送sendResponseMaybeThrottle(request, requestThrottleMs new MetadataResponse(requestThrottleMs,brokers.flatMap(_.getNode(request.context.listenerName)).asJava,clusterId,metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),completeTopicMetadata.asJava))}这段代码的主要逻辑是先根据请求中的主题列表去本地的元数据缓存 MetadataCache 中过滤出相应主题的元数据也就是我们上面那张图中右半部分的那棵树的子集然后再去本地元数据缓存中获取所有 Broker 的集合也就是上图中左半部分那棵树最后把这两部分合在一起作为响应返回给客户端。 Kafka 在每个 Broker 中都维护了一份和 ZooKeeper 中一样的元数据缓存并不是每次客户端请求元数据就去读一次 ZooKeeper。由于 ZooKeeper 提供了 Watcher 这种监控机制Kafka 可以感知到 ZooKeeper 中的元数据变化从而及时更新 Broker 中的元数据缓存。 这样就完成了一次完整的更新元数据的流程。通过分析代码可以证实我们开始的猜测都是没有问题的。 6.4 zookeeper小结 ZooKeeper是一个分布式的协调服务它的核心服务是一个高可用、高可靠的一致性存储在此基础上提供了包括读写元数据、节点监控、选举、节点间通信和分布式锁等很多功能这些功能可以极大方便我们快速开发一个分布式的集群系统。 但是ZooKeeper 也并不是完美的在使用的时候你需要注意几个问题 不要往 ZooKeeper 里面写入大量数据它不是一个真正意义上的存储系统只适合存放少量的数据。依据服务器配置的不同ZooKeeper 在写入超过几百 MB 数据之后性能和稳定性都会严重下降。 不要让业务集群的可用性依赖于 ZooKeeper 的可用性什么意思呢你的系统可以使用 Zookeeper但你要留一手要考虑如果 Zookeeper 集群宕机了你的业务集群最好还能提供服务。因为 ZooKeeper 的选举过程是比较慢的而它对网络的抖动又比较敏感一旦触发选举这段时间内的 ZooKeeper 是不能提供任何服务的。 Kafka 主要使用 ZooKeeper 来保存它的元数据、监控 Broker 和分区的存活状态并利用 ZooKeeper 来进行选举。 Kafka 在 ZooKeeper 中保存的元数据主要就是 Broker 的列表和主题分区信息两棵树。这份元数据同时也被缓存到每一个 Broker 中。客户端并不直接和 ZooKeeper 来通信而是在需要的时候通过 RPC 请求去 Broker 上拉取它关心的主题的元数据然后保存到客户端的元数据缓存中以便支撑客户端生产和消费。 可以看到目前 Kafka 的这种设计集群的可用性是严重依赖 ZooKeeper 的也就是说如果 ZooKeeper 集群不能提供服务那整个 Kafka 集群也就不能提供服务了这其实是一个不太好的设计。 如果你需要要部署大规模的 Kafka 集群建议的方式是拆分成多个互相独立的小集群部署每个小集群都使用一组独立的 ZooKeeper 提供服务。这样每个 ZooKeeper 中存储的数据相对比较少并且如果某个 ZooKeeper 集群故障只会影响到一个小的 Kafka 集群故障的影响面相对小一些。 Kafka 的开发者也意识到了这个问题目前正在讨论开发一个元数据服务来替代 ZooKeeper可以看一下他们的Proposal。 且kafka已经在2.8后逐步抛弃zookeeper 为什么Kafka在2.8版本中会“抛弃”Zookeeper 深度解读Kafka 放弃 ZooKeeper消息系统兴起二次革命 七、Apache Kafka 工作流程 到目前为止我们讨论了 Kafka 的核心概念。 让我们现在来看一下 Kafka 的工作流程。 Kafka 只是分为一个或多个分区的主题的集合。Kafka 分区是消息的线性有序序列其中每个消息由它们的索引(称为偏移)来标识。Kafka 集群中的所有数据都是不相连的分区联合。 传入消息写在分区的末尾消息由消费者顺序读取。 通过将消息复制到不同的代理提供持久性。 Kafka 以快速可靠持久容错和零停机的方式提供基于pub-sub 和队列的消息系统。 在这两种情况下生产者只需将消息发送到主题消费者可以根据自己的需要选择任何一种类型的消息传递系统。 让我们按照下一节中的步骤来了解消费者如何选择他们选择的消息系统。 7.1 发布 - 订阅消息的工作流程 以下是 Pub-Sub 消息的逐步工作流程 - 生产者定期向主题发送消息。 Kafka 代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区Kafka 将在第一分区中存储一个消息在第二分区中存储第二消息。 消费者订阅特定主题。 一旦消费者订阅主题Kafka 将向消费者提供主题的当前偏移并且还将偏移保存在 Zookeeper 系统中现在保存在一个叫__consumer_offsets-xx的topic中。 在kafka的log文件中有很多以 __consumer_offsets_的文件夹总共50个; 由于Zookeeper并不适合大批量的频繁写入操作新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中即__consumer_offsets topic并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。 __consumer_offsets 是 kafka 自行创建的和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。  __consumer_offsets 的每条消息格式大致如图所示   在这里插入图片描述 可以想象成一个 KV 格式的消息key 就是一个三元组group.idtopic分区号而 value 就是 offset 的值。 考虑到一个 kafka 生成环境中可能有很多consumer 和 consumer group如果这些 consumer 同时提交位移则必将加重 __consumer_offsets 的写入负载因此 kafka 默认为该 topic 创建了50个分区并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions从而将负载分散到不同的 __consumer_offsets 分区上。 一般情况下当集群中第一次有消费者消费消息时会自动创建__consumer_offsets它的副本因子受 offsets.topic.replication.factor 参数的约束默认值为3注意该参数的使用限制在0.11.0.0版本发生变化分区数可以通过 offsets.topic.num.partitions 参数设置默认值为50。 消费者将定期请求 Kafka (如100 Ms)新消息。 一旦 Kafka 收到来自生产者的消息它将这些消息转发给消费者。 消费者将收到消息并进行处理。 一旦消息被处理消费者将向 Kafka 代理发送确认。 一旦 Kafka 收到确认它将偏移更改为新值并在 Zookeeper 中更新它现在保存在一个叫__consumer_offsets-xx的topic中。 由于偏移在 Zookeeper 中维护消费者可以正确地读取下一封邮件即使在服务器暴力期间。 以上流程将重复直到消费者停止请求。 消费者可以随时回退/跳到所需的主题偏移量并阅读所有后续消息。 7.2 队列消息/用户组的工作流 在队列消息传递系统而不是单个消费者中具有相同组 ID 的一组消费者将订阅主题。 简单来说订阅具有相同 Group ID 的主题的消费者被认为是单个组并且消息在它们之间共享。 让我们检查这个系统的实际工作流程。 生产者以固定间隔向某个主题发送消息。 Kafka存储在为该特定主题配置的分区中的所有消息类似于前面的方案。 单个消费者订阅特定主题假设 Topic-01 为 Group ID 为 Group-1 。 Kafka 以与发布 - 订阅消息相同的方式与消费者交互直到新消费者以相同的组 ID 订阅相同主题Topic-01  1 。 一旦新消费者到达Kafka 将其操作切换到共享模式并在两个消费者之间共享数据。 此共享将继续直到用户数达到为该特定主题配置的分区数。 一旦消费者的数量超过分区的数量新消费者将不会接收任何进一步的消息直到现有消费者取消订阅任何一个消费者。 出现这种情况是因为 Kafka 中的每个消费者将被分配至少一个分区并且一旦所有分区被分配给现有消费者新消费者将必须等待。 此功能也称为使用者组。 同样Kafka 将以非常简单和高效的方式提供两个系统中最好的。 八、Apache Kafka副本同步机制 Kafka中主题的每个Partition有一个预写式日志文件每个Partition都由一系列有序的、不可变的消息组成这些消息被连续的追加到Partition中Partition中的每个消息都有一个连续的序列号叫做offset 确定它在分区日志中唯一的位置。 Kafka每个topic的partition有N个副本其中N是topic的复制因子。Kafka通过多副本机制实现故障自动转移当Kafka集群中一个Broker失效情况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其他节点上。N个replicas中。其中一个replica为leader其他都为followerleader处理partition的所有读写请求与此同时follower会被动定期地去复制leader上的数据。 如下图所示Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3 Kafka提供了数据复制算法保证如果leader发生故障或挂掉一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写表示副本同步队列具体可参考下节)中所有follower滞后的状态。当producer发送一条消息到broker后leader写入消息并复制到所有follower。消息提交之后才被成功复制到所有的同步副本。消息复制延迟受最慢的follower限制重要的是快速检测慢副本如果follower“落后”太多或者失效leader将会把它从ISR中删除。 8.1 副本同步队列(ISR) 所谓同步必须满足如下两个条件 副本节点必须能与zookeeper保持会话(心跳机制) 副本能复制leader上的所有写操作并且不能落后太多。(卡住或滞后的副本控制是由 replica.lag.time.max.ms 配置) 默认情况下Kafka对应的topic的replica数量为1即每个partition都有一个唯一的leader为了确保消息的可靠性通常应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1比如3。 所有的副本(replicas)统称为Assigned Replicas即AR。ISR是AR中的一个子集由leader维护ISR列表follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表新加入的follower也会先存放在OSR中。ARISROSR。 上一节中的HW俗称高水位是HighWatermark的缩写取一个partition对应的ISR中最小的LEO作为HWconsumer最多只能消费到HW所在的位置。另外每个replica都有HW,leader和follower各自负责更新自己的HW的状态。对于leader新写入的消息consumer不能立刻消费leader会等待该消息被所有ISR中的replicas同步后更新HW此时消息才能被consumer消费。这样就保证了如果leader所在的broker失效该消息仍然可以从新选举的leader中获取。对于来自内部broKer的读取请求没有HW的限制。 下图详细的说明了当producer生产消息至broker后ISR以及HW和LEO的流转过程 由此可见Kafka的复制机制既不是完全的同步复制也不是单纯的异步复制。事实上同步复制要求所有能工作的follower都复制完这条消息才会被commit这种复制方式极大的影响了吞吐率。而异步复制方式下follower异步的从leader复制数据数据只要被leader写入log就被认为已经commit这种情况下如果follower都还没有复制完落后于leader时突然leader宕机则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。 Kafka的ISR的管理最终都会反馈到Zookeeper节点上。具体位置为/brokers/topics/[topic]/partitions/[partition]/state。目前有两个地方会对这个Zookeeper的节点进行维护 Controller来维护Kafka集群中的其中一个Broker会被选举为Controller主要负责Partition管理和副本状态管理也会执行类似于重分配partition之类的管理任务。在符合某些特定条件下Controller下的LeaderSelector会选举新的leaderISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知所有的replicas。 leader来维护leader有单独的线程定期检测ISR中follower是否脱离ISR, 如果发现ISR变化则会将新的ISR的信息返回到Zookeeper的相关节点中。 8.2 副本不同步的异常情况 慢副本在一定周期时间内follower不能追赶上leader。最常见的原因之一是I / O瓶颈导致follower追加复制消息速度慢于从leader拉取速度。 卡住副本在一定周期时间内follower停止从leader拉取请求。follower replica卡住了是由于GC暂停或follower失效或死亡。 新启动副本当用户给主题增加副本因子时新的follower不在同步副本列表中直到他们完全赶上了leader日志。 由此我们可以得知kafka是如何保证数据的高可用的。 九、kafka消费者组的作用 关于kafka的消费者组的概念和作用首先概念很好理解就是一些消费者具有共同的group id比如有5个消费者都订阅了 topicA则我们可以将5个消费者放到 groupA中并将这个group命名为“groupA”则此时groupA的消费者组的id就是“groupA”。 那么为啥要有 消费者组这种设定呢主要原因在于 首先传统的消息传递模型分为两类 原文来自Scalability of Kafka Messaging using Consumer Groups - Cloudera Blog​blog.cloudera.com/scalability-of-kafka-messaging-using-consumer-groups/ 9.1 共享消息队列模式 共享消息队列模式允许来自producer的消息流到达单个消费者。推送到队列的每条消息只能读取一次并且只能由一个consumer读取。consumer从他们之间共享的队列的末尾拉取消息。共享队列然后从成功拉取的队列中删除消息。 缺点 一旦一个消费者拉取一条消息这条信息就会从队列中删除。 消息队列更适合命令式编程其中消息很像对属于同一域的消费者的命令而不是事件驱动编程在事件驱动编程中单个事件可以导致来自消费者端的多个动作. 虽然多个消费者可以连接到共享队列但他们必须都属于同一个逻辑域并执行相同的功能。因此共享消息队列中处理的可扩展性受限于单个消费域。 9.2 发布订阅模式 发布订阅模型允许多个生产者将消息发布到代理托管的topic这些topic可以被多个消费者订阅。因此一条消息实际上是被广播给一个主题的所有订阅者。 缺点 发布者与订阅者的逻辑分离允许松耦合的架构但规模有限。可扩展性是有限的因为每个消费者者必须订阅每个分区才能访问来自所有分区的消息。因此虽然传统的 pub-sub 模型适用于小型网络但不稳定性随着节点的增长而增加。 解耦的副作用还体现在消息传递的不可靠性上。由于每条消息都广播给所有订阅者因此很难缩放流的处理因为消费者彼此不同步。 9.3 Kafka 如何桥接这两种模型 Kafka 则结合了共享消息队列模式和发布-订阅模型的特点。它通过以下方式实现 使用消费者组consumer grup broker保留信息broker指kafka所在的服务器一台服务器可以运行一个或多个kafka 当消费者加入一个组并订阅一个主题时该组中只有一个消费者实际消费了该主题的每条消息。与传统消息队列不同消息也由代理保留在其主题分区中。 多个消费者组可以从同一组主题中读取并在不同时间满足不同的逻辑应用程序域。因此Kafka 通过属于同一消费者组的消费者提供了高可扩展性的优势以及同时为多个独立的下游应用程序提供服务的能力。 消费群体 消费者组使 Kafka 可以灵活地同时拥有消息队列和发布订阅模型的优势。属于同一个消费者组的 Kafka 消费者共享一个组 ID。然后组中的消费者通过确定每个分区仅由组中的单个消费者使用来尽可能公平地划分主题分区。 如果所有消费者都来自同一组那么 Kafka 模型的功能就像传统的消息队列一样。然后对所有记录和处理进行负载平衡 每条消息将仅由该组的一个使用者使用。每个分区最多连接到一个组中的一个消费者。 当存在多个消费群体时数据消费模型的流程与传统的发布订阅模型一致。消息被广播到所有消费者组。 也有独占消费者也就是只有一个消费者的消费群体。这样的消费者必须连接到它需要的所有分区。 理想情况下分区数等于消费者数。如果消费者数量更多多余的消费者就会闲置浪费客户端资源。如果分区数量更多一些消费者将从多个分区读取这应该不是问题除非消息的顺序对用例很重要。Kafka 不保证分区之间的消息排序。它确实提供了分区内的排序。因此如果 Kafka 仅订阅单个分区则它可以维护消费者的消息排序。还可以使用处理期间要分组的键对消息进行排序。 Kafka 还通过选择发送给代理的形式确认或抵消交付提交来确保它已到达订阅组从而消除了有关消息交付可靠性的问题。由于分区只能与消费者组中的消费者具有一对一或多对一的关系因此避免了消费者组内的消息复制因为给定的消息一次仅到达组中的一个消费者。 重新平衡 随着消费者组的扩展和缩减运行中的消费者会在自己之间拆分分区。重新平衡是由分区和消费者之间的所有权转移触发的这可能是由于消费者或经纪人崩溃或添加主题或分区引起的。它允许从系统中安全地添加或删除消费者。 在启动时代理被标记为消费者组子集的协调器这些组从消费者接收RegisterConsumer 请求并返回包含他们应该拥有的分区列表的RegisterConsumer 响应。协调器还启动故障检测以检查消费者是活着还是死了。当消费者未能在会话超时之前向协调器代理发送心跳时协调器将消费者标记为死亡并设置重新平衡以发生。可以使用Kafka 服务的session.timeout.ms属性设置此会话时间段。该heartbeat.interval.ms属性使健康的消费者认识到再平衡的发生以便重新发送RegisterConsumer向协调器请求。 例如假设 A 组的使用者 C2 发生故障C1 和 C3 将暂时暂停对来自其分区的消息的消耗并且这些分区将在它们之间重新分配。从之前的示例中当消费者 C2 丢失时会触发重新平衡过程并将分区重新分配给组中的其他消费者。B 组消费者不受 A 组事件的影响。 用例实现 我们设置了一个 Kafka 主题“推文”的水槽接收器它分布在两个代理之间。“推文”只有一个分区。 Java 消费者 Consumer0 连接到主题“tweets”和来自控制台的另一个消费者该消费者与前一个消费者属于同一组 ID。第一个具有组 ID group1。来自控制台的 kafka 消费者的组 ID 为“控制台”。然后我们将两个消费者添加到消费者组“group1”中。因为只有一个partition所以我们看到组内三个消费者只有一个消费者Consumer2继续为组拉取消息。 然后启动 group2 的使用者并连接到相同的主题“推文”。两个消费者以相同的偏移速度阅读。当 group1 中的 Consumer2 关闭时我们看到经过一段时间后会话超时来自第一组的 Consumer1 从最后一个关闭的偏移量 Consumer2 开始。Consumer0 仍然停留在它停止的偏移量处。这表明由于从组中丢失了一个消费者而发生了重新平衡。然而控制台消费者在消息消费方面不受影响。 在一个主题有多个分区的情况下我们可以看到根据启动时分配的分区属于同一组的许多消费者将处理主题外的消息。保证消息仅在分区内排序而不是在代理之间排序。当消费者在这种情况下失败时它正在读取的分区在会话超时启动的重新平衡阶段被重新分配。 结论 共享消息队列允许在单个域中扩展消息处理。发布订阅模型允许向消费者广播消息但在规模和消息传递方面存在不确定性。Kafka 将消息队列中的处理规模与发布-订阅模型的松散耦合架构结合在一起通过实现消费者组来实现处理规模、多域支持和消息可靠性。Kafka 中的重新平衡允许消费者在同等程度上保持容错性和可扩展性。 因此使用 kafka 消费者组来设计流应用程序的消息处理端可以让用户有效地利用 Kafka 的规模和容错优势。 QA ZK挂了后kafka还能不能用ZK存的什么数据 Kafka与ZooKeeper关系 Zookeeper的作用 Apache Kafka的一个关键依赖是Apache Zookeeper它是一个分布式配置和同步服务。 Zookeeper是Kafka代理和消费者之间的协调接口。 Kafka服务器通过Zookeeper集群共享信息。 Kafka在Zookeeper中存储基本元数据例如关于主题代理消费者偏移(队列读取器)等的信息。 由于所有关键信息存储在Zookeeper中并且它通常在其整体上复制此数据因此Kafka代理/ Zookeeper的故障不会影响Kafka集群的状态。 Kafka将恢复状态一旦Zookeeper重新启动。 这为Kafka带来了零停机时间。 Kafka代理之间的领导者选举也通过使用Zookeeper在领导者失败的情况下完成。 Kafka 主要使用 ZooKeeper 来保存它的元数据、监控 Broker 和分区的存活状态利用ZooKeeper临时节点以及Watcher机制来监控其集群节点的状态变化并利用ZooKeeper 来进行选举保证其高可用性。 Kafka在ZooKeeper中保存元数据信息 其中如图所示 绿色节点为临时节点其余为持久化节点 左侧树代表着Kafka 的 Broker 信息/brokers/ids/[0…N]每个临时节点对应着一个在线的 BrokerBroker 启动后会创建一个临时节点代表Broker 已经加入集群可以提供服务了节点名称就是 BrokerID节点内保存了包括Broker 的地址、版本号、启动时间等等一些 Broker 的基本信息。如果 Broker 宕机或者与ZooKeeper 集群失联了这个临时节点也会随之消失。 右侧部分的这棵树保存的就是主题和分区的信息。/brokers/topics/ 节点下面的每个子节点都是一个主题节点的名称就是主题名称。每个主题节点下面都包含一个固定的partitions 节点pattitions 节点的子节点就是主题下的所有分区节点名称就是分区编号。 每个分区节点下面是一个名为 state 的临时节点节点中保存着分区当前的 leader 和所有的 ISR 的 BrokerID。这个 state 临时节点是由这个分区当前的 Leader Broker 创建的。如果这个分区的 Leader Broker 宕机了对应的这个 state 临时节点也会消失直到新的Leader 被选举出来再次创建 state 临时节点。 Kafka 客户端如何找到对应的 Broker 那 Kafka 客户端如何找到主题、队列对应的 Broker 呢 先根据主题和队列在右边的树中找到分区对应的 state 临时节点state 节点中保存了这个分区 Leader 的 BrokerID。拿到这个 Leader 的 BrokerID 后再去左侧的树中找到 BrokerID 对应的临时节点就可以获取到 Broker 真正的访问地址了。 Kafka 在每个 Broker 中都维护了一份和 ZooKeeper 中一样的元数据缓存并不是每次客户端请求元数据就去读一次 ZooKeeper。由于 ZooKeeper 提供了 Watcher 这种监控机制Kafka 可以感知到 ZooKeeper 中的元数据变化从而及时更新 Broker 中的元数据缓存。 即先根据请求中的主题列表去本地的元数据缓存 MetadataCache 中过滤出相应主题的元数据也就是我们上面那张图中右半部分的那棵树的子集然后再去本地元数据缓存中获取所有 Broker 的集合也就是上图中左半部分那棵树最后把这两部分合在一起作为响应返回给客户端。 所以结论是如果只有zk挂了应该不影响kafka的使用。但是如果zk集群和某个kafka leader节点leader节点承担读写功能同时不可用了即触发了leader的选举因此时zk不可用会影响kafka的使用 副本做哪些工作leader承担哪些工作 副本集是做什么用的 leader承担读写工作并把数据同步至其他follower节点 partition的replicas副本只是承担备份的作用没有读写流量。 所有的副本(replicas)统称为Assigned Replicas即AR。 ISR是AR中的一个子集由leader维护ISR列表follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表新加入的follower也会先存放在OSR中。ARISROSR。 只有消费者没有消费组行不行消费组是用来做什么的 消费者组的优势 1. 高性能 假设一个主题有10个分区如果没有消费者组只有一个消费者对这10个分区消费他的压力肯定大。 如果有了消费者组组内的成员就可以分担这10个分区的压力提高消费性能。 问那我可以用多个消费者每个消费者消费一个分区效果和一个组里的不同消费者消费不同分区是一样的吧压力不会太大呀 答可以用多个消费者每个消费者消费一个分区。 这样确实可以达到与使用消费者组类似的效果每个消费者只消费一个分区的数据可以避免消费者之间的数据冲突和重复消费。同时由于每个消费者只需要处理一个分区的数据压力也会相对较小。 然而这种方式需要手动管理消费者的数量和分区分配对于大规模的分布式系统而言可能存在一些挑战和限制。例如当新增或减少消费者时需要重新分配分区可能会导致一些额外的开销和复杂性。 另外如果使用多个消费者分别消费不同分区的方式需要注意处理消费者失败的情况。如果某个消费者失败需要有人工介入来重新分配分区确保数据能够被正确消费。 相比之下使用消费者组可以自动处理这些情况提供更稳定和可靠的数据消费机制。 2. 消费模式灵活 假设有4个消费者订阅一个主题不同的组合方式就可以形成不同的消费模式。 使用4个消费者组每组里放一个消费者利用分区在消费者组间共享的特性就实现了 广播发布订阅模式 。 只使用一个消费者组把4个消费者都放在一起利用分区在组内成员间互斥的特性就实现了单播队列模式 。 3. 故障容灾 如果只有一个消费者出现故障后就比较麻烦了但有了消费者组之后就方便多了。 消费组会对其成员进行管理在有消费者加入或者退出后消费者成员列表发生变化消费组就会执行再平衡的操作。 例如一个消费者宕机后之前分配给他的分区会重新分配给其他的消费者实现消费者的故障容错。 4. 小结 消费者组的好处 消费效率更高 消费模式灵活 便于故障容灾 我们在kafka的log文件中发现了还有很多以 __consumer_offsets_的文件夹总共50个; 由于Zookeeper并不适合大批量的频繁写入操作新版Kafka已推荐将consumer的位移信息保存在Kafka内部的topic中即__consumer_offsets topic并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。 __consumer_offsets 是 kafka 自行创建的和普通的 topic 相同。它存在的目的之一就是保存 consumer 提交的位移。  __consumer_offsets 的每条消息格式大致如图所示   可以想象成一个 KV 格式的消息key 就是一个三元组group.idtopic分区号而 value 就是 offset 的值。 考虑到一个 kafka 生成环境中可能有很多consumer 和 consumer group如果这些 consumer 同时提交位移则必将加重 __consumer_offsets 的写入负载因此 kafka 默认为该 topic 创建了50个分区并且对每个 group.id做哈希求模运算Math.abs(groupID.hashCode()) % numPartitions从而将负载分散到不同的 __consumer_offsets 分区上。 一般情况下当集群中第一次有消费者消费消息时会自动创建__consumer_offsets它的副本因子受 offsets.topic.replication.factor 参数的约束默认值为3注意该参数的使用限制在0.11.0.0版本发生变化分区数可以通过 offsets.topic.num.partitions 参数设置默认值为50。 kafka集群对服务器配置的要求 1. 需求场景分析 电商平台需要每天10亿请求都要发送到Kafka集群上面。二八反正一般评估出来问题都不大。10亿请求 - 24 过来的一般情况下每天的12:00 到早上8:00 这段时间其实是没有多大的数据量的。80%的请求是用的另外16小时的处理的。16个小时处理 - 8亿的请求。16 * 0.2  3个小时 处理了8亿请求的80%的数据 也就是说6亿的数据是靠3个小时处理完的。我们简单的算一下高峰期时候的qps 6亿/3小时 5.5万/s qps5.5万 10亿请求 * 50kb  46T 每天需要存储46T的数据 一般情况下我们都会设置两个副本 46T * 2  92T  Kafka里面的数据是有保留的时间周期保留最近3天的数据。92T * 3天  276T我这儿说的是50kb不是说一条消息就是50kb不是把日志合并了多条日志合并在一起通常情况下一条消息就几b也有可能就是几百字节。 2. 物理机数量评估 1首先分析一下是需要虚拟机还是物理机 像Kafka mysql hadoop这些集群搭建的时候我们生产里面都是使用物理机。2高峰期需要处理的请求总的请求每秒5.5万个其实一两台物理机绝对是可以抗住的。一般情况下我们评估机器的时候是按照高峰期的4倍的去评估。如果是4倍的话大概我们集群的能力要准备到 20万qps。这样子的集群才是比较安全的集群。大概就需要5台物理机。每台承受4万请求。 场景总结搞定10亿请求高峰期5.5万的qps276T的数据需要5台物理机。 3. 磁盘选择 搞定10亿请求高峰期5.5万的qps276T的数据需要5台物理机。 1SSD固态硬盘还是需要普通的机械硬盘 SSD硬盘性能比较好但是价格贵  SATA/SAS盘某方面性能不是很好但是比较便宜。SSD硬盘性能比较好指的是它随机读写的性能比较好。适合MySQL这样集群。**但是其实他的顺序写的性能跟SAS盘差不多。kafka的理解就是用的顺序写。所以我们就用普通的【机械硬盘】就可以了。 2需要我们评估每台服务器需要多少块磁盘 5台服务器一共需要276T 大约每台服务器 需要存储60T的数据。我们公司里面服务器的配置用的是 11块硬盘每个硬盘 7T。11 * 7T  77T 77T * 5 台服务器  385T。 场景总结 搞定10亿请求需要5台物理机11SAS * 7T 4. 内存评估 搞定10亿请求需要5台物理机11SAS * 7T 我们发现kafka读写数据的流程 都是基于os cache,换句话说假设咱们的os cashe无限大那么整个kafka是不是相当于就是基于内存去操作如果是基于内存去操作性能肯定很好。内存是有限的。1 尽可能多的内存资源要给 os cache 2 Kafka的代码用 核心的代码用的是scala写的客户端的代码java写的。都是基于jvm。所以我们还要给一部分的内存给jvm。Kafka的设计没有把很多数据结构都放在jvm里面。所以我们的这个jvm不需要太大的内存。根据经验给个10G就可以了。 NameNode: jvm里面还放了元数据几十GJVM一定要给得很大。比如给个100G。 假设我们这个10请求的这个项目一共会有100个topic。100 topic * 5 partition * 2  1000 partition 一个partition其实就是物理机上面的一个目录这个目录下面会有很多个.log的文件。.log就是存储数据文件默认情况下一个.log文件的大小是1G。我们如果要保证 1000个partition 的最新的.log 文件的数据 如果都在内存里面这个时候性能就是最好。1000 * 1G  1000G内存. 我们只需要把当前最新的这个log 保证里面的25%的最新的数据在内存里面。250M * 1000  0.25 G* 1000 250G的内存。 250内存 / 5  50G内存 50G10G  60G内存 64G的内存另外的4G操作系统本生是不是也需要内存。其实Kafka的jvm也可以不用给到10G这么多。评估出来64G是可以的。当然如果能给到128G的内存的服务器那就最好。 我刚刚评估的时候用的都是一个topic是5个partition但是如果是数据量比较大的topic可能会有10个partition。 总结搞定10亿请求需要5台物理机11SAS * 7T 需要64G的内存128G更好 5. CPU压力评估 评估一下每台服务器需要多少cpu core(资源很有限) 我们评估需要多少个cpu 依据就是看我们的服务里面有多少线程去跑。线程就是依托cpu 去运行的。如果我们的线程比较多但是cpu core比较少这样的话我们的机器负载就会很高性能不就不好。 评估一下kafka的一台服务器 启动以后会有多少线程 Acceptor线程 1 processor线程 3 6~9个线程 处理请求线程 8个 32个线程 定时清理的线程拉取数据的线程定时检查ISR列表的机制 等等。所以大概一个Kafka的服务启动起来以后会有一百多个线程。 cpu core  4个一遍来说几十个线程就肯定把cpu 打满了。cpu core  8个应该很轻松的能支持几十个线程。如果我们的线程是100多个或者差不多200个那么8 个 cpu core是搞不定的。所以我们这儿建议CPU core  16个。如果可以的话能有32个cpu core 那就最好。 结论kafka集群最低也要给16个cpu core如果能给到32 cpu core那就更好。2cpu * 8 16 cpu core 4cpu * 8  32 cpu core 总结搞定10亿请求需要5台物理机11SAS * 7T 需要64G的内存128G更好需要16个cpu core32个更好 6. 网络需求评估 评估我们需要什么样网卡一般要么是千兆的网卡1G/s还有的就是万兆的网卡10G/s 高峰期的时候 每秒会有5.5万的请求涌入5.5/5  大约是每台服务器会有1万个请求涌入。 我们之前说的 10000 * 50kb  488M  也就是每条服务器每秒要接受488M的数据。数据还要有副本副本之间的同步 也是走的网络的请求。488 * 2  976m/s 说明一下 很多公司的数据一个请求里面是没有50kb这么大的我们公司是因为主机在生产端封装了数据 然后把多条数据合并在一起了所以我们的一个请求才会有这么大。 说明一下 一般情况下网卡的带宽是达不到极限的如果是千兆的网卡我们能用的一般就是700M左右。 但是如果最好的情况我们还是使用万兆的网卡。 如果使用的是万兆的那就是很轻松。 怎么排查kafka集群的问题请求(读、写)的流程 参考 Kafka的协调服务ZooKeeper实现分布式系统的“瑞士军刀” 关于kafka中的消费者组consumer group以及kafka到底用的啥消息传递模式待续 真的搞懂 Kafka 看这一篇就够了
http://www.pierceye.com/news/142169/

相关文章:

  • 网站建设公司前十名电子商务网站建设论文开题报告
  • 泉州公司建设网站秦皇岛市网站建设
  • 网站建设说课获奖视频小程序制作用华网天下优惠
  • 杭州网站建设公司代理加盟广东建设企业网站怎么样
  • 网站建制作公司企业营销
  • 建设网站基本流程佛山制作网页公司
  • 眼睛网站开发wordpress影院插件
  • 成都专业做网站的公司有哪些建设工程管理专业
  • 北京seo关键词优化外包网站seo诊断分析报告
  • 怎么做淘宝客网站注册域名后如何建立网站
  • 网络营销资讯网站茶山镇仿做网站
  • 东莞产品网站建设网络设计方案包括哪些
  • 精品课程网站建设论文一个网站如何做推广
  • elementui 做的网站个人网站做推广
  • 外贸 静态网站 怎么做网页制作的目的和意义
  • 做酒店的网站免费进销存软件哪个简单好用
  • 湖州做网站推广的公司phpnow安装wordpress
  • 荆州网站建设销售网站怎么做的
  • 访问失效链接 如何删除 网站维护免费推广做产品的网站
  • 哪个网站做ppt能赚钱揭阳网站建设方案托管
  • 哪些网站可以免费做h5wordpress目录迁移
  • 郑州网站建设哪家有什么可以做兼职的网站吗
  • 没有影视许可怎么用国内空间做网站wordpress首页加广告代码
  • 高端电子商务网站建设js网页特效案例
  • 一个网站做三个关键词网站的建设与维护的职责
  • wordpress tag伪静态网站建设与优化推广方案模板
  • 公司网站建设 宁波传奇网站模板psd
  • 安县移动网站建设广州 网站制作
  • 山西太原网站建设网站设计计划
  • 广州番禺网站制作推广新浦网站制作