六安网站建设 220,合同模板网站,wordpress 模板开发,网络公司发生网站建设费分录Kafka是最初由Linkedin公司开发#xff0c;是一个分布式、支持分区的#xff08;partition#xff09;、多副本的#xff08;replica#xff09;#xff0c;基于zookeeper协调的分布式消息系统#xff0c;它的最大的特性就是可以实时的处理大量数据以满足各种需求场景是一个分布式、支持分区的partition、多副本的replica基于zookeeper协调的分布式消息系统它的最大的特性就是可以实时的处理大量数据以满足各种需求场景比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎web/nginx日志、访问日志消息服务等等用scala语言编写kafka部署包“kafka_2.13-3.6.0”前面的2.13就是scala的版本
1、Kafka的使用场景 日志收集一个公司可以用Kafka收集各种服务的log通过kafka以统一接口服务的方式开放给各种consumer例如hadoop、Hbase、Solr等。 消息系统解耦和生产者和消费者、缓存消息等。 用户活动跟踪Kafka经常被用来记录web用户或者app用户的各种活动如浏览网页、搜索、点击等活动这些活动信息被各个服务器发布到kafka的topic中然后订阅者通过订阅这些topic来做实时的监控分析或者装载到hadoop、数据仓库中做离线分析和挖掘。 运营指标Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据生产各种操作的集中反馈比如报警和报告。
2、Kafka基本概念 kafka是一个分布式的分区的消息(官方称之为commit log)服务。它提供一个消息系统应该具备的功能但是确有着独特的设计。可以这样来说Kafka借鉴了JMS规范的思想但是并没有完全遵循JMS规范。
首先让我们来看一下基础的消息(Message)相关术语
名称 解释 Broker 消息中间件处理节点一个Kafka节点就是一个broker一个或者多个Broker可以组成一个Kafka集群 Topic Kafka根据topic对消息进行归类发布到Kafka集群的每条消息都需要指定一个topic Producer 消息生产者向Broker发送消息的客户端 Consumer 消息消费者从Broker读取消息的客户端 ConsumerGroup 每个Consumer属于一个特定的Consumer Group一条消息可以被多个不同的Consumer Group消费但是一个Consumer Group中只能有一个Consumer能够消费该消息 Partition 物理上的概念一个topic可以分为多个partition每个partition内部消息是有序的 Replica副本 一个 topic 的每个分区都有若干个副本一个 Leader 和若干个 Follower Leader 每个分区多个副本的“主”生产者发送数据的对象以及消费者消费数据的对象都是 Leader Follower 每个分区多个副本中的“从”实时从 Leader 中同步数据保持和 Leader 数据的同步。Leader 发生故障时某个 Follower 会成为新的 Leader。 服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
3、Topic与Partition 在Kafka中Topic就是一个主题生产者往topic里面发送消息消费者从topic里面捞数据进行消费。
假设现在有一个场景如果我们现在有100T的数据需要进行消费但是现在我们一台主机上面并不能存储这么多数据该怎么办呢 其实做法很简单就是将海量的数据进行切割并且在Topic中添加分区的概念每一个分区都对应一台主机并且存储切分到的数据
当然为了实现高可用其实分区可以实现主从架构这个后面再了解
这样做的好处是
分区存储可以解决一个topic中文件过大无法存储的问题 提高了读写的吞吐量读写可以在多个分区中同时进行
4、搭建部署 首先部署java不再赘述很简单然后去官网下载两个安装包
kafka_2.13-3.6.0
apache-zookeeper-3.9.1-bin.tar
3.0之后kafka自带zookeeper也可以省略
解压后进入conf文件夹
cp zoo_sample.cfg zoo1.cfg#复制一份zk的配置文件
修改配置文件内容tickTime2000
initLimit10
syncLimit5
dataDir/data/zkdata #启动之前需要建好
datalogDir/data/zklog #启动之前需要建好
clientPort2181
autopurge.purgeInterval24
autopurge.snapRetainCount3
server.1192.168.1.1:2888:3888
启动
bin/zkServer.sh start
然后
bin/zkServer.sh status
查看状态。一定要查看。启动的时候不论如何都会输出成功 解压kafka文件夹。进入config文件夹
修改 server.propertiesauto.create.topics.enabletrue #配置自动创建topic
delete.topic.enabletrue #是否允许删除主题
broker.id0 #集群中唯一
listenersPLAINTEXT://192.168.1.1:9092 #不要填localhost:9092 localhost表示只能通过本机连接可以设置为0.0.0.0或本地局域网地址server接受客户端连接的端口
num.network.threads4 #broker处理消息的最大线程数一般情况下数量为cpu核数
num.io.threads8 #broker处理磁盘IO的线程数数值为cpu核数2倍
socket.send.buffer.bytes1024000 #socket的发送缓冲区 不要太小 避免频繁操作
socket.receive.buffer.bytes1024000 # 接收缓冲区 不要太小
socket.request.max.bytes1048576000 #socket请求的最大数值 message.max.bytes必然要小于socket.request.max.bytes会被topic创建时的指定参数覆盖
log.dirs/data/kafkalog #kafka存放数据的路径。这个路径并不是唯一的可以是多个路径之间只需要使用逗号分隔即可每当创建新partition时都会选择在包含最少partitions的路径下进行。
num.partitions2 #为1的时候不能自动创建topic创建topic的默认分区数
num.recovery.threads.per.data.dir1
offsets.topic.replication.factor1
transaction.state.log.replication.factor1
transaction.state.log.min.isr1
log.flush.interval.messages10000 #和下面的一起一些日志存储的配置默认不清理日志
log.flush.interval.ms1000
log.retention.hours24 #每个日志文件删除之前保存的时间。默认数据保存时间对所有topic都一样
log.roll.hours12
log.cleanup.policydelete
log.retention.bytes1073741824
log.retention.check.interval.ms300000
zookeeper.connect192.168.1.1:2181#zookeeper如果是集群连接方式为 hostname1:port1, hostname2:port2, hostname3:port3
zookeeper.connection.timeout.ms18000
group.initial.rebalance.delay.ms0初次启动可以不后台
bin/kafka-server-start.sh -daemon config/server.properties
没问题就放后台
bin/kafka-server-start.sh -daemon config/server.properties5、Kafka核心概念之Topic
在Kafka中Topic是一个非常重要的概念topic可以实现消息的分类不同消费者订阅不同的topic partition(分区)是kafka的一个核心概念kafka将1个topic分成了一个或多个分区每个分区在物理上对应一个目录 分区目录下存储的是该分区的日志段(segment)包括日志的数据文件和两个索引文件
执行以下命令创建名为test的topic这个topic只有一个partition并且备份因子也设置为1
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 1
查看当前kafka内有哪些topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka自带了一个producer命令客户端可以从本地文件中读取内容或者我们也可以以命令行中直接输入内容并将这些内容以消息的形式发送到kafka集群中。
在默认情况下每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端指定发送到的kafka服务器地址和topic
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
对于consumerkafka同样也携带了一个命令行客户端会将获取到内容在命令中进行输出默认是消费最新的消息。使用kafka的消费者消息的客户端从指定kafka服务器的指定topic中消费消息
方式一从最后一条消息的偏移量1开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test 方式二从头开始消费
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic test 几个注意点
消息会被存储消息是顺序存储消息是有偏移量的消费时可以指明偏移量进行消费
在上面我们展示了两种不同的消费方式根据偏移量消费和从头开始消费其实这个偏移量可以我们自己进行维护
我们进入我们在server.properties里面配置的日志文件地址/data/kafkalog
我们可以看到默认一共有五十个偏移量地址里面就记录了当前消费的偏移量。 我们先关注test-0这个文件 我们进入这个文件可以看到其中有个log文件里面就保存了Topic发送的数据
生产者将消息发送给brokerbroker会将消息保存在本地的日志文件中
/data/kafkalog/主题-分区/00000000.log 消息的保存是有序的通过offset偏移量来描述消息的有序性 消费者消费消息时也是通过offset来描述当前要消费的那条消息的位置 6、 单播消息 我们现在假设有一个场景有一个生产者两个消费者问生产者发送消息是否会同时被两个消费者消费
创建一个topic
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test2 --partitions 1 创建一个生产者
./kafka-console-producer.sh --broker-list localhost:9092 --topic test2 分别在两个终端上面创建两个消费者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test2 这里就要引申出一个概念消费组当我们配置多个消费者在一个消费组里面的时候其实只会有一个消费者进行消费
这样其实才符合常理毕竟一条消息被消费一次就够了
我们可以通过命令--consumer-property group.idtestGroup在设置消费者时将其划分到一个消费组里面
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idtestGroup --topic test2 这个时候如果消费组里面有一个消费者挂掉了就会由其他消费者来进行消费 小结一下两个消费者在同一个组只有一个能接到消息两个在不同组或者未指定组则都能收到
7、多播消息 当多个消费组同时订阅一个Topic时那么不同的消费组中只有一个消费者能收到消息。实际上也是多个消费组中的多个消费者收到了同一个消息
// 消费组1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idtestGroup1 --topic test2
// 消费组2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.idtestGroup2 --topic test2 8、查看消费组的详细信息 通过以下命令可以查看到消费组的详细信息
# 查看当前所有的消费组
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看指定消费组具体信息比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testGroup 9、创建分区 我们在上面已经了解了Topic与Partition的概念现在我们可以通过以下命令给一个topic创建多个分区
# 创建两个分区的主题
./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test3 --partitions 2
# 查看下创建的topic
./kafka-topics.sh --bootstrap-server localhost:9092 --list 现在我们再进到日志文件中看一眼可以看到日志是以分区来命名的 我们知道分区文件中
00000.log 这个文件中保存的就是消息
__consumer_offsets-49:
kafka内部自己创建了__consumer_offsets主题包含了50个分区。这个主题用来存放消费者消费某个主题的偏移量。因为每个消费者都会自己维护着消费的主题的偏移量也就是说每个消费者会把消费的主题的偏移量自主上报给kafka中的默认主题consumer_offsets。 因此kafka为了提升这个主题的并发性默认设置了50个分区。
提交到哪个分区通过hash函数hash(consumerGroupId) % __consumer_offsets主题的分区数
提交到该主题中的内容是key是consumerGroupId topic 分区号value就是当前offset的值
文件中保存的消息根据log.retention.hour这个参数确定。到期后消息会被删除。
10、副本的概念
在创建主题时除了指明了主题的分区数以外还指明了副本数那么副本是一个什么概念呢
我们现在创建一个主题、两个分区、三个副本的topic注意副本只有在集群下才有意义
./kafka-topics.sh \
--bootstrap-server localhost:9092 \ # 指定启动的机器
--create --topic my-replicated-topic \ # 创建一个topic
--partitions 2 \ # 设置分区数为2
--replication-factor 3 # 设置副本数为3 # 查看topic情况
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic leader kafka的写和读的操作都发生在leader上。leader负责把数据同步给follower。当leader挂了经过主从选举从多个follower中选举产生一个新的leader follower 接收leader的同步的数据 isr 可以同步和已同步的节点会被存入到isr集合中。这里有一个细节如果isr中的节点性能较差会被提出isr集合。 此时broker、主题、分区、副本 这些概念就全部展现了
集群中有多个broker创建主题时可以指明主题有多个分区把消息拆分到不同的分区中存储可以为分区创建多个副本不同的副本存放在不同的broker里
11、集群消费 向集群发送消息
./kafka-console-producer.sh --broker-list node1:9092,node1:9093,node1:9094 --topic my-replicated-topic从集群中消费消息
./kafka-console-consumer.sh --bootstrap-server liang:9092,dd1:9092,dd2:9092 --from-beginning --consumer-property group.idtestGroup1 --topic my-replicated-topic指定消费组来消费消息
./kafka-console-consumer.sh --bootstrap-server node1:9092,node1:9093,node1:9094 --from-beginning --consumer-property group.idtestGroup1 --topic my-replicated-topic 这里有一个细节结合上面的单播消息我们很容易可以想到下面的这种情况因为一个Partition只能被一个consumer Group里面的一个consumer所有很容易就可以形成组内单播的现象即 多Partition与多consumer一一对应 这样的好处是 分区存储可以解决一个topic中文件过大无法存储的问题 提高了读写的吞吐量读写可以在多个分区中同时进行 Kafka这种通过分区与分组进行并行消费的方式让kafka拥有极大的吞吐量 小结一下
一个partition只能被一个消费组中的一个消费者消费目的是为了保证消费的顺序性但是多个partion的多个消费者消费的总的顺序性是得不到保证的那怎么做到消费的总顺序性呢这个后面揭晓答案
partition的数量决定了消费组中消费者的数量建议同一个消费组中消费者的数量不要超过partition的数量否则多的消费者消费不到消息
如果消费者挂了那么会触发rebalance机制后面介绍会让其他消费者来消费该分区
kafka通过partition 可以保证每条消息的原子性但是不会保证每条消息的顺序性
12、生产者核心概念 在消息发送的过程中涉及到了两个线程 main 线程 Sender 线程 在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker 在main线程中消息的生产要经历拦截器、序列化器和分区器其中一个分区就会创建一个队列这样方便数据的管理
其中队列默认是32M而存放到队列里面的数据也会经过压缩为16k再发往send线程进行发送但是这样也会有问题就是如果只有一条消息难道就不发送了吗其实还有一个参数linger.ms用来表示一条消息如果超过这个时间就会直接发送不用管大小其实可以类比坐车的场景人满或者时间到了 都发车 send线程发送给kafka集群的时候我们需要联系到上面的Topic与Partition已经消费组形成一个Partition对应consumer Group里面的一个consumer这种组内单播的效果进行并发读写 13、ack的概念 在同步发送的前提下生产者在获得集群返回的ack之前会一直阻塞。那么集群什么时候返回ack呢此时ack有3个配置 ack 0 kafka-cluster不需要任何的broker收到消息就立即返回ack给生产者最容易丢消息的效率是最高的 ack1默认 多副本之间的leader已经收到消息并把消息写入到本地的log中才会返回ack给生产者性能和安全性是最均衡的s ack-1/all。里面有默认的配置min.insync.replicas2(默认为1推荐配置大于等于2)此时就需要leader和一个follower同步完后才会返回ack给生产者此时集群中有2个broker已完成数据的接收这种方式最安全但性能最差。 kafka默认会创建一个消息缓冲区用来存放要发送的消息缓冲区是32mkafka本地线程会去缓冲区中一次拉16k的数据发送到broker如果线程拉不到16k的数据间隔10ms也会将已拉到的数据发到broker
13、kafka集群中的controller、rebalance、HW
什么是controller呢其实就是集群中的一个broker当集群中的leader挂掉时需要controller来组织进行选举
那么集群中谁来充当controller呢
每个broker启动时会向zk创建一个临时序号节点获得的序号最小的那个broker将会作为集群中的controller负责这么几件事 当集群中有一个副本的leader挂掉需要在集群中选举出一个新的leader选举的规则是从isr集合中最左边获得 当集群中有broker新增或减少controller会同步信息给其他broker 当集群中有分区新增或减少controller会同步信息给其他broker rebalance机制 前提消费组中的消费者没有指明分区来消费
触发的条件当消费组中的消费者和分区的关系发生变化的时候
分区分配的策略在rebalance之前分区怎么分配会有这么三种策略 range根据公式计算得到每个消费者消费哪几个分区第一个消费者是分区总数 / 消费者数量 1之后的消费者是分区总数/消费者数量假设 n分区数消费者数量 2 m分区数%消费者数量 1那么前 m 个消费者每个分配 n1 个分区后面的消费者数量m 个消费者每个分配 n 个分区 轮询大家轮着来 sticky粘合策略如果需要rebalance会在之前已分配的基础上调整不会改变之前的分配情况。如果这个策略没有开那么就要进行全部的重新分配。建议开启 HW和LEO LEO是某个副本最后消息的消息位置log-end-offset
HW是已完成同步的位置。消息在写入broker时且每个broker完成这条消息的同步后hw才会变化。在这之前消费者是消费不到这条消息的。在同步完成之后HW更新之后消费者才能消费到这条消息这样的目的是防止消息的丢失。 14、Kafka中的优化问题
如何防止消息丢失 生产者 1使用同步发送 2把ack设成1或者all并且设置同步的分区数2 消费者把自动提交改成手动提交
如何防止重复消费 在防止消息丢失的方案中如果生产者发送完消息后因为网络抖动没有收到ack但实际上broker已经收到了。
此时生产者会进行重试于是broker就会收到多条相同的消息而造成消费者的重复消费。
怎么解决
生产者关闭重试会造成丢消息不建议
消费者解决非幂等性消费问题
所谓的幂等性多次访问的结果是一样的。对于rest的请求get幂等、post非幂等、put幂等、delete幂等
解决方案
在数据库中创建联合主键防止相同的主键 创建出多条记录 使用分布式锁以业务id为锁。保证只有一条记录能够创建成功 如何做到消息的顺序消费 其实我们知道在发送消息的时候我们可以通过设置key来指定发送的分区所以首先我们一定要指定key然后发到同一个分区
生产者使用同步的发送并且通过设置key指定路由策略只发送到一个分区中ack设置成非0的值。 消费者主题只能设置一个分区消费组中只能有一个消费者不要设置异步线程防止异步导致的乱序或者设置一个阻塞队列进行异步消费 kafka的顺序消费使用场景不多因为牺牲掉了性能但是比如rocketmq在这一块有专门的功能已设计好。
如何解决消息积压问题 1消息积压问题的出现 消息的消费者的消费速度远赶不上生产者的生产消息的速度导致kafka中有大量的数据没有被消费。随着没有被消费的数据堆积越多消费者寻址的性能会越来越差最后导致整个kafka对外提供的服务的性能很差从而造成其他服务也访问速度变慢造成服务雪崩。
2消息积压的解决方案 在这个消费者中使用多线程充分利用机器的性能进行消费消息。 通过业务的架构设计提升业务层面消费的性能。 创建多个消费组多个消费者部署到其他机器上一起消费提高消费者的消费速度 创建一个消费者该消费者在kafka另建一个主题配上多个分区多个分区再配上多个消费者。该消费者将poll下来的消息不进行消费直接转发到新建的主题上。此时新的主题的多个分区的多个消费者就开始一起消费了。——不常用 实现延时队列的效果 1应用场景 订单创建后超过30分钟没有支付则需要取消订单这种场景可以通过延时队列来实现
2具体方案 kafka中创建创建相应的主题 消费者消费该主题的消息轮询 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟前提是订单没支付 如果是去数据库中修改订单状态为已取消 如果否记录当前消息的offset并不再继续消费之后的消息。等待1分钟后再次向kafka拉取该offset及之后的消息继续进行判断以此反复。