黄冈市建设局官方网站,项目立项查询平台,网站开发专业公司有哪些,微信小程序开发实战课后答案1、kafka是什么#xff1f;
kafka是一个高吞吐#xff0c;分布式#xff0c;基于发布/订阅的消息系统#xff0c;最大的特性就是可以实时的处理大量的数据以满足各种需求场景#xff1a;日志收集#xff0c;离线和在线的消息消费#xff0c;等等 2、kakfa的基础架构
kafka是一个高吞吐分布式基于发布/订阅的消息系统最大的特性就是可以实时的处理大量的数据以满足各种需求场景日志收集离线和在线的消息消费等等 2、kakfa的基础架构
topic 主题kafka根据topic对消息进行分类发布到kafka上的每一条消息都要指定一个topic
producer 生产者 向kafka主题发布消息的客户端
consumer 消费者 订阅topic主题读取消息的客户端
broker 消息处理中间件在kafka集群上一个服务器就是一个broker
partition 分区 为了实现拓展性一个大的topic可以分布在多个broker上也就是一个topic分为多个partition每个partition的内部消息有序的 一些其他的定义
consumer group消费者组 多个consumer组成消费者组中每个消费者负责消费不同分区的数据一个分区只能由一个组内消费者消费消费者组之间的消费互不影响。
replica 副本提升可用性为每个partition增加若干个副本分布在不同的broker上避免某个broker不可用的时候读不到消息
leader : 多个副本的主生产者只会往leader上发送数据消费者也只会从leader上读取数据
follower: 多个副本的从从leader中同步数据当leader发生故障时某个follower会成为新的leader。
offset 偏移量 可以唯一的标识一条消息消费者通过控制偏移量来决定下次去读的消息消息被消费后不会立刻被删除这样多个业务就可以重复的消费kafka的消息。 3、为什么要使用kafka或者为什么要使用消息队列
缓冲和削峰某一时候上游数据有突发流量下游服务器没有足够的性能来保证性能kafka在中间就可以起到一个缓冲的作用把消息暂存在kafka中即便某一时刻数据激增下游的服务器也可以按照自己的节奏慢慢处理。
解耦增加拓展性消息队列可以作为一个接口层解耦业务流程kafka想象成一个菜鸟驿站
一对多一个生产者发布的消息可以被多个消费者消费供一些没有关联的业务同时使用
增强健壮性消息队列可以堆积请求因此即便消费端业务短时间内挂掉也不会影响主要业务
异步通信消息队列提供了异步处理机制允许用户将一些消息放入队列然后在合适的时间去处理。 4、数据传输的事务定义有那三种
数据传输的事务定义有三种级别
1、最多一次消息不回重复发送最多被传输一次但也有可能一次不传输
2、最少一次消息不会漏发但是可能会被重复传输
3、精确的一次exactly one不会漏传但是也不会重复传输是大家所期望的。 5、kafka如何判断节点是否存活
2.8.0一起kafka是依赖zk的因此判断是否存活的条件
1、节点可以维护和zk的链接这个是zk通过心跳机制来检查
2、如果节点是个follower必须能够及时的同步leader的写操作延时不能太久。
2.8.0版本以后kafka移除了zk 待补充 6、kafka消息是采用pull模式还是push模式
producer将消息推送到brokerconsumer从broker上拉取消息。
好处是consumer可以自主决定从broker上拉取数据的速率。缺点是如果broker中没有可以供消费的消息consumer就会不断的轮询为了避免这点kafka有个参数可以让consumer阻塞直到消息到达。 7、kafka中的ISR, AR都是什么意思ISR的伸缩是什么意思
ISR:In-Sync Replicas 副本同步队列
AR: Assigned Replicas 已分配的副本即所有副本
OSRoutof-Sync Replicas 表示follower与leader副本同步的时候延迟过多的副本
ISR是由leader进行维护follower从leader上同步数据会有一定的延迟如果follower长时间未向leader发送通信请求同步数据延迟时间replica.lag.time.max.ms参数设定默认30s就会把follower从ISR中剔除存入OSR列表新加入的follower也会存在OSR中即 AR ISR OSR 8、leader的选举流程
kafka的broker启动后首先在在zk上注册controller节点利用zk的强一直性一个节点只能被一个客户端创建该节点中写入当前broker的信息创建成功的controller来决定leader的选举
选举出来的controller会监听集群broker节点的变化然后决定选举leader
partition的leader选举规则是在ISR中存活为前提按照AR中排在前面的优先。例如ISR【102】AR【102】那么leader就会按照102轮询此时leader就是broker1
controller此时就会将节点信息上传到ZK其他controller去ZK上同步节点的信息
假设某一时刻broker1 挂了controller监听到节点的变化就会更新ISR, 选举新的leader然后将信息同步到ZK上 9、Leader 和 Follower 故障处理
LEOLog End Offset每个副本的最后一个offsetLEO其实就是最新的offset 1。HWHigh Watermark所有副本中最小的LEO 。 follower故障处理如图
某一时刻三个broker的数据如上高水位线HW为5此时broker发生故障follower2被踢出ISR之后leader和follower1并不受影响因此可以继续接收并同步数据broker2恢复后follower2会读取本地磁盘记录的上次的HW也就是5并将高于HW部门截取掉删除kafka认为这段数据未经过校验不可信然后开始向leader进行同步。当follower2的同步的水平到了当前的水位线就可以从新加入ISR了。 leader故障
leader发生故障会从ISR中重新选出一个leader为了保证多个副本之间数据一致性其他的follower会将高于leader的部分截掉然后从新的leader同步数据。
可以看出故障处理只能保证副本之间数据的一致性但是不能保证数据不丢失或者不重复。 10、kafka的文件存储机制
topic是逻辑上的概念partition是物理上的概念。
一个topic可以分成多个partition每个partition对应一个log文件log文件中存储的就是生产者生产的数据每次producer生产新的数据就会追加到log文件末端。
为了保证定位效率kafka采用了分片和索引的机制即每个partition又分为多个segment每个segment包括 .log , .index.timeindex等文件一个segment默认是1G超过1G就会生成一个新的segment。
.log 文件记录生产信息
.index 记录偏移量用于快速定位 index是稀疏索引每往log日志中记录4Kb数据会记录一条索引index文件中记录的offset为相对offset参数log.index.interval,bytes4kb,
.timeindex 记录时间信息kafka默认是数据保留7天超过的会清理 11、kafka的文件清理策略
kafka默认是数据保留7天可以通过如下参数修改保存时间
log.retention.hours最低优先级小时默认 7 天。log.retention.minutes分钟。log.retention.ms最高优先级毫秒。log.retention.check.interval.ms负责设置检查周期默认 5 分钟。
日志一旦超过了设置的保留时间将怎么清理呢
1、delete删除默认值。就是将过期日志直接删除log.cleanup.policy delete
以segment中所有记录中最大的时间戳作为该文件的时间戳。因此如果一个segment中一部分数据过期一部分数据没过期那么是不会删除的。
2、compact 压缩log.cleanup.policy compact相同key的不同value值只保留最后一个版本压缩后的offset不一定是连续的只适用于特殊场景如消息的key也是实际数据的key一般不用。 12、leader partition的负载平衡
因为生产者和消费者操作的都是leader partition如果集群出现了leader partition不平衡就会导致broker压力太大。
一般情况下kafka本身会自动把leader均匀分散在各个机器上来保证每台机器的吞吐量都是均匀的但是如果某些broker宕机leader重新选举就可能导致leader partition过于集中在少部分broker上这样一来少数几台broker读写请求压力过高造成了集群的负载不均衡。 auto.leader.rebalance.enable默认是true说明自动启用leader 再平衡 leader.imbalance.per.broker.percentage默认是10%每个broker允许的不平衡的leader的比率。如果某个broker超过了这个值控制器会触发leader的平衡
leader.imbalance.check.interval.seconds默认值300秒。检查leader负载是否平衡的间隔时间。 针对broker3节点分区3的AR优先副本是3节点但是3节点却不是Leader节点所以不平衡数加1AR副本总数是4 所以broker3节点不平衡率为1/410%需要再平衡。 broker2和broker3节点不平衡率一样需要再平衡。 broker0和broker1的不平衡数为0不需要再平衡。 生产环境中建议不要将自动再平衡打开即便打开也要将再平衡因子设置的大一些。 13、分区好处以及生产者发送消息的分区策略
分区好处
1、便于合理使用存储资源每个partition在一个broker上存储可以将海量数据按照分区切割成一块块数据存储在多台broker上合理控制分区的任务实现负载均衡
2、提高并行度生产者可以以分区为单位发送数据消费者也可以以分区问单位消费数据。
生产者发送消息的分区策略
1、如果指明了partition直接将数据写入指明的分区
2、如果没有指明partition但是有key的情况下将key的hash值与该topic中partition的数量取余存入对应的分区
3、既没有指定partition也没有key的情况kafka会采用粘性分区即随机选取一个分区并尽可能的一直使用该分区如果该分区batch已满或者已经完成则在随机选一个不同的分区。 14、生产者发送消息的流程
在消息发送的过程涉及到两个线程一个main线程一个是sender线程
main线程创建了双端队列RecordAccumulator消息累加器消息经过拦截器序列化器分区器等然后先存在 RecordAccumulator中 每个分区都会创建一个双端队列消息发送到哪个队列中参考13题RecordAccumulator默认大小是32M在内存中RecordAccumulator包含多个双端队列其实每个partition有一个双端队列双端队列中有一个概念叫producerBatch通过参数batch.size控制大小默认大小是16ksender线程用来将队列中金的消息发往kafka集群发送是以producerbatch为单位的只有当数据累计到batch.size时sender才会发送数据如果数据量迟迟没有到batch.size会有一个超时时间 linger.ms默认值是0,表示没有延迟需要修改,到达这个时间后也会发送数据。sender线程从 recordAccumulator 中拉取的 batch 先保存到 InFlightRequests 中默认发往每个broker节点最多缓存5个请求5个batchsender线程发往kafka集群的broker时可能会失败而重试重试次数默认是integer的最大值由于失败重试原因可能会存在消息乱序的风险。数据成功发送到kafka集群后会清理掉请求清理掉消息累加器中对应的数据。 15、kafka中producer发消息到broker的应答原理如何解决数据丢失问题 ack应答原理支持配置三种参数
0生产者发送过来的数据不需要等待数据落盘应答 ---- 可靠性差效率高1生成者发送过来的数据leader收到数据后应答 ---- 可靠性中等效率中等-1生成者发送过来的数据leader和ISR队列里面的follower都收集数据后应答。 --- 可靠性最高效率低
15.1如果ack-1, 但是有一个follower因为故障迟迟无法同步这个问题怎么解决
----还是靠ISR队列如果follower长时间未向leader发送通信请求或者同步数据就会被提出ISR而ack-1时只需要ISR队列中的所有节点响应即可。
数据完全可靠的条件是解决数据丢失ack-1 分区副本大于等于2 ISR应答最小副本数大于等于2
生产环境中ack0的基本很少用ack1的一般用于传输普通日志允许个别数据的丢失而ack-1 一般用于对可靠性要求比较高的场景如和钱有关 如何解决数据丢失问题
1、producer到kafka端保证数据完全可靠即ack-1 分区副本大于等于2 ISR应答最小副本数大于等于2
2、consumer消费端业务端数据处理成功后手动提交offset 16 kafka的幂等性重复消费
幂等性是指producer不论向broker发送多少次重复数据broker端都只会持久化一条保证了数据不重复。
精确一次 幂等性 ack-1 分区副本大于等于2 ISR应答最小副本数大于等于2
幂等性判断数据重复性的一个标准是 PID Partition SeqNumber 相同的主键消息提交时broker只会持久化一条。
PIDkafka每次重启就会分配一个新的PID每个producer在初始化的时候都会分配一个唯一的PID这个PID对用户不可见partition分区号Sequence Number 单调递增的一个值针对每个producer发送到指定主体分区的消息都对应一个从0递增的Sequence Number
---- 所以可以知道幂等性只能保证的是在但分区单会话内不重复消费端消费的时候也要利用幂等性原理解决给每条数据加一个唯一标识保证数据不会被重复消费。
----kafka开启事务必须要开启幂等性。 17、kafka数据乱序
为什么会乱序
kafka的sender线程是先将数据请求放到一个in Flight requests 队列里面这个队列最大允许放置5个请求每个请求发送到kafka的broker上时允许在未响应的前提下发送后一个请求这就有可能导致了乱序比如1,2请求发送成功并应答发送3的时候没有应答就发送了4结果3发送失败4发送成功这就导致了顺序是1,2,4
如何有序
在1.x版本之前 max.in.flight.requests.per.connection1不考虑幂等性阻塞之前客户端在单个连接上发送的未确认最大请求是1即每发送一个请求都要得到成功响应之后在发送第二个。这就保证了有序性在1.x版本之后 未开启幂等性max.in.flight.requests.per.connection1开启幂等性max.in.flight.requests.per.connection 要小于等于5kafka服务端会缓存5个请求的元数据根据SeqNumber是否递增来判断还是上面的情况1,2发送成功会先落盘3失败了需要重试4,5成功了但是因为SeqNumber是从4开始的因此会被先缓存起来等到3发送到kafka时才会排序之后落盘。
但是需要注意的是kafka只是保证了单分区内有序多个分区是不保证的。
为什么多分区有序不保证
如果kafka保证多个partition内的消息也是有序的不仅broker保存的数据要有序消费者消费时的也要按照顺序消费假设partition1阻塞了其他分区的消息也不能被消费了这种情况kafka就退化成了单一队列失去了并发性和性能。 有没有办法保证整个topic级别的消息顺序性
可以在业务层面解决
通过message key 来保证需要顺序性消费的数据发送到同一个partition单分区有序消费端消费前先把多个partition内的消息缓存下来全部拿到后重排序保证顺序性消费
但是上述操作其实降低了性能不如就只创建一个分区。 18、消费者的总体工作流程
一个消费者可以消费多个分区一个分区也可以被多个消费者消费一个分区只能够被一个消费者组中的一个消费者消费消费者组看成是一个消费者 消费者组
consumer group消费者组由多个consumer组成形成一个消费者组的条件是消费者的group id相同。一个consumer也可以是一个消费者组
消费者组内每个消费者负责消费不同的分区数据一个分区只能由一个组内的消费者消费消费者组之间互不影响如果消费者组内消费者超过主体分区数量那么就会有一部分消费者闲置不会接收任何消息消费者组从逻辑上就是一个消费者。 19、消费者组的初始化流程
消费者组的初始化和分区分配是 由coordinator辅助实现的每个broker节点都会有一个coordinatorkafka会根据group id 计算出一个节点的coordinator来作为该消费者组的老大。消费者组下面所有消费者提交的offset都会往该节点去提交选举出来的coordinator会在消费者组中随机选一个consumer作为消费者组的leader然后coordinator会把要消费的topic情况发给这个消费者组的leader消费者leader会通过一些规则定制消费方案定制好消费方案后将该消费方案发回给选出的coordinatorcoordinator吧消费方案下发给消费者组的各个消费者每个消费者都会和coordinator保持心跳默认3s超时45s或者处理消息时间过长5min都会移除该消费者触发再平衡。 20、消费者组详细消费流程 消费者发送消费请求 通过sendFetches方法, 做一个抓取数据的初始化准备一些数据 Fetch.min,bytes 每批次最小抓取数据默认1字节小于这个值不抓取fetch.max.waits.ms 每次抓取超时时间默认500ms如果迟迟不到1字节这个就触发抓取fetch.max.bytes 每批次抓取最大数量默认50m准备好后就调用send方法发送消费请求通过回调方法onSuccess拉取对应的数据拉取到的数据放到一个消息队列里面消费者会从队列中一批次的拉取数据进行数据处理默认一次500条Max.poll.records设置数据处理就包括了数据的反序列化拦截器以及业务处理。 21、消费者的分区分配策略再平衡机制
kafka的消费者再平衡指kafka consumer订阅的topic发生变化时一种分区重分配机制。
一般如下三种情况会触发consumer的分区分配策略再平衡机制
consumer group中删除了某个consumer离线导致所消费的分区需要重新分配到组内其他consumer上consumer订阅的topic主题发生了变化比如订阅的topic是按照正则配置的如果新增了一个topic新topic的分区怎么分配给当前consumer已经订阅的topic新增了分区新增的分区怎么分配到consumer
消费者分区分配策略实现的方法有以下四种机制
RangeAssignor 范围分区策略RoundRobinAssignor 轮询分区策略StickyAssignor 粘性分区策略CooperativeStickyAssignor 协作粘性分区策略
通过consumer配置项partition.assignment.strtegy指定分区分配策略类kafka可以同时使用多个分区分配策略。
kafka默认就是使用rangeCooperativeStucky策略。同时也支持自定义策略重写ConsumerPartitionAssignor接口。 RangeAssignor 范围分区分配策略
解释是按照单个topic为一个维度来计算分配的负责将每一个的topic尽可能的均衡分配给其他的消费者
给消费者组里面所有消费者按照字母进行排序给topic中的分区按照分区号排序计算每个消费者最少平均分配多少个分区数然后剩下的按照消费者顺序逐个分。
示例
一个topic有四个分区消费者组中有三个消费者那么就先进行排序计算发现每个消费者最少一个分区还多了一个分区那么就分给consumer1
缺点
range方法虽然针对单个topic情况下比较均衡但是如果topic很多consumer排序靠前的消费者负载会变多。 RoundRobinAssignor 轮询分区策略
解释轮询针对的是所有的topic分区他把所有的partition、所有的consumer列举出来进行排序然后通过轮询策略分配给每个消费者如果该消费者没有订阅该主题就跳到下一个消费者
示例
1、如果消费者订阅的主题是一样的 2、如果消费者订阅的主题不一样 缺点
就如示例2的情况消费者分区分配很不平衡因此consumer group订阅消息不一致的情况下不太适用于轮询机制。 StickyAssignor 粘性分区策略
粘性分区策略分区的分配要尽可能的均匀分配给消费者的主题分区数最多相差一个分区的分配会尽可能与上次分配保持相同两个有冲突的时候第一个目标优先于第二个目标。 例如三个consumers(C0、C1、C2)四个Topics(T0、T1、T2、T3)。
则RoundRobinAssignor和StickyAssignor分区分配方案均为
C0 T0P0、T1P1、T3P0 C1 T0P1、T2P0、T3P1 C2 T1P0、T2P1
现在假设C1被移除将触发分区重分配
RoundRobinAssignor分区分配方案将变为
C0 T0P0、T1P0、T2P0、T3P0 C2 T0P1、T1P1、T2P1、T3P1 保留之前的分区分配方案的3个分区不变。
StickyAssignor分区分配方案将变为
C0 T0P0、T1P1、T3P0、T2P0 C2 T1P0、T2P1、T0P1、T3P1
可以看到StickyAssignor尽量保存之前的分区分配方案分区重分配变动更小。 CooperativeStickyAssignor策略
CooperativeStickyAssignor其实也是一种粘性分配策略但是有一定的区别
StickyAssignor仍然是基于eager协议分区重分配时候都需要consumers先放弃当前持有的分区重新加入consumer group 而CooperativeStickyAssignor基于cooperative协议该协议将原来的一次全局分区重平衡改成多次小规模分区重平衡。渐进式的重平衡。
示例
一个Topic(T0三个分区)两个consumers(consumer1、consumer2)均订阅Topic(T0)。那么分配完成的订阅信息就是
consumer1T0P0、T0P2consumer2T0P1 此时如果一个新的consumer3加入消费者组就会触发再平衡 基于eager协议的 粘性分区策略 1、consumer1、 consumer2正常发送心跳信息到Group Coordinator。 2、随着consumer3加入Group Coordinator收到对应的Join Group请求Group Coordinator确认有新成员需要加入消费者组。 3、Group Coordinator 通知consumer1和consumer2需要rebalance再平衡了。 4、consumer1和consumer2放弃revoke当前各自持有的已有分区重新发送Join Group请求到Group Coordinator。 5、Group Coordinator依据指定的分区分配策略的处理逻辑生成新的分区分配方案然后通过Sync Group请求将新的分区分配方案发送给consumer1、consumer2、consumer3。 6、所有consumers按照新的分区分配重新开始消费数据。 基于cooperative协议的粘性分区策略 1、consumer1、 consumer2正常发送心跳信息到Group Coordinator。 2、随着consumer3加入Group Coordinator收到对应的Join Group请求Group Coordinator确认有新成员需要加入消费者组。 3、Group Coordinator 通知consumer1和consumer2需要rebalance了。 4、consumer1、consumer2通过Join Group请求将已经持有的分区发送给Group Coordinator。注意并没有放弃(revoke)已有分区。 5、Group Coordinator取消consumer1对分区p2的消费然后发送sync group请求给consumer1、consumer2。 6、consumer1、consumer2接收到分区分配方案重新开始消费。至此一次Rebalance完成。 7、当前p2也没有被消费再次触发下一轮rebalance将p2分配给consumer3消费。 可以看到上述两个协议的区别在于 EAGER 重新平衡协议要求消费者在参与重新平衡事件之前始终撤销其拥有的所有分区。因此它允许完全改组分配。COOPERATIVE协议允许消费者在参与再平衡事件之前保留其当前拥有的分区。分配者不应该立即重新分配任何拥有的分区而是可以指示消费者需要撤销分区以便可以在下一次重新平衡事件中将被撤销的分区重新分配给其他消费者。 COOPERATIVE协议将全局重平衡改成了每次小规模的重平衡从而达到最终的平衡这样做的好处就是所选了STW时间。 22、offset位移 是什么维护的位置
offset 位移就是consumer记录的已经消费数据的位置。在0.9版本以前是保存在zookeeper中的从0.9版本之后默认将offset保存在kafka一个内置的topic日志文件后该topic名称为__consumer_offsets
__consumer_offsets 主题里面采用 key 和 value 的方式存储数据
key group.idtopic分区号value 当前offset的值
每隔一段时间kafka会对这个topic进行压缩也就是每个key只保留最新数据。
22.1 为什么消费者需要使用50个文件记录消费者的offset呢
如果消费者比较多都记录在同一个记录中那么读写的操作就比较麻烦
22.2 消费者怎么知道应该从哪个日志文件中读取数据
key%50文件数量然后就可以到对应的文件夹中去取值。 23、kafka中consumer offset的提交
自动提交 默认开启enable.auto.committrue提交时间间隔默认5sauto.commit.interval.ms手动提交 同步提交必须等offset提交完毕再去消费下一批数据存在提交失败的场景会不断重试异步提交发送完offset请求后就开始消费下一批数据了生产环境用的比较多 24、kafka指定offset的消费
当kafka中没有初始偏移量消费者组第一次消费或者服务器上没有存在偏移量数据被删除应该怎么消费呢kafka提供了三种消费方式
earliest自动将偏移量重置为最早的偏移量latest自动将偏移量重置为最新偏移量以前没消费到的也不管了 --- 默认值none如果没有找到消费者组的先前偏移量那么就向消费者抛出异常。 25、kafka按照指定时间消费
在生产环境中会遇到最近消费的几个小时数据异常想重新按照时间消费。例如要求按照时间消费前一天的数据怎么处理
kafkaConsumer.offsetsForTimes 这个API可以将时间转换为offset。 26、kafka数据积压消费者如何提高吞吐量
1、如果是kafka消费能力不足可以考虑增加topic的分区数目同时增加消费者组的消费者数量消费者数目 topic的分区数
2、如果是下游数据处理不及时可以考虑提高每批次拉取消息的数量同时要注意修改每批次最大拉取大小。
3、上游消费者也可以提示生产吞吐量来增大整个kafka的吞吐量如增大发送消息缓冲区的大小以及增大batch.size避免频繁网络请求