化工网站制作,window做网站的软件下载,郑州网站建设快速排名熊掌,discuz破解zookeeperkafka.txt
Zookeeper概述
Zookeeper是一个分布式的开源协调服务#xff0c;用于管理和维护大型分布式系统中的配置信息、命名服务、状态同步等。它提供了一个可靠的分布式环境#xff0c;用于协调多个节点之间的通信和管理。
数据结构
ZooKeeper数据模型的结构与…zookeeperkafka.txt
Zookeeper概述
Zookeeper是一个分布式的开源协调服务用于管理和维护大型分布式系统中的配置信息、命名服务、状态同步等。它提供了一个可靠的分布式环境用于协调多个节点之间的通信和管理。
数据结构
ZooKeeper数据模型的结构与Linux文件系统很类似整体上可以看作是一棵树每个节点称做一个ZNode。每一个ZNode默认能够存储1MB的数据每个ZNode都可以通过其路径唯一标识。
Znode 路径结构 Znode是ZooKeeper中的基本单元使用类似文件路径的层次结构类似于Unix文件系统由斜杠/分隔路径元素。 节点类型 Znodes可以是持久节点Persistent或临时节点Ephemeral。持久节点在创建后一直存在于ZooKeeper中而临时节点在创建它的客户端断开连接后被删除。 顺序节点 还可以创建顺序节点这些节点在其名称后面附加一个数字序列以便在创建时自动分配顺序。这有助于客户端在特定路径下创建唯一序列的节点。
数据
Znode关联数据 每个Znode都可以存储少量的数据1MB以下这些数据可以与节点关联并由客户端读取和更新。
Watches
Watcher机制 ZooKeeper允许客户端注册对Znode上事件的观察。当Znode的状态发生变化时ZooKeeper会通知那些注册了对应节点的Watcher的客户端。
Znode的应用 配置管理 存储配置信息并确保配置在分布式系统中的一致性。 命名服务 提供唯一标识和发现节点的方式。 分布式锁 实现互斥访问共享资源。 集群管理 用于选主、状态同步等分布式系统管理操作。
ZooKeeper的灵活性和可扩展性使其适用于各种分布式系统场景并且能够有效地管理数据和状态信息以保持整个分布式系统的一致性和可靠性。
工作机制
ZnodeZookeeper节点 Znode结构 类似于文件系统的目录结构是Zookeeper中的基本数据单元。 节点类型 每个Znode可以是持久节点Persistent或临时节点Ephemeral。持久节点在Zookeeper上创建后一直存在而临时节点在创建它的客户端断开连接后被删除。
Watcher机制 概述 Watcher是Zookeeper的一种通知机制允许客户端订阅Znode上的事件。当Znode上的状态发生变化如数据改变、节点删除等Zookeeper将通知那些已经注册Watcher的客户端。 使用场景 Watcher机制广泛用于实现分布式的发布/订阅、分布式锁、集群管理等功能。 分布式锁 实现原理 利用Zookeeper的临时顺序节点客户端在Zookeeper上创建一个带有唯一序号的临时节点序号越小节点在队列中排得越前。节点创建成功的客户端获得锁释放锁时删除对应的节点。 应用场景 用于在分布式环境下实现互斥访问共享资源确保数据的一致性。 选主机制Leader Election 选主过程 在Zookeeper集群中多个节点竞争成为主节点通过Zookeeper的选主机制实现。一旦某个节点成为主节点它负责处理写操作而其他节点是从节点负责处理读操作。 应用场景 适用于分布式系统中需要有一个主节点负责协调整个系统的场景例如Hadoop中的NameNode。 数据一致性 原子性操作 Zookeeper提供原子性的操作例如创建节点、更新数据等确保操作的原子性要么全部成功要么全部失败。 顺序一致性 Zookeeper保证在分布式环境下所有节点看到的数据是一致的。
Zookeeper从设计模式角度来理解是一个基于观察者模式设计的分布式服务管理框架它负责存储和管理大家都关心的数据然后接受观察者的注册一旦这些数据的状态发生变化Zookeeper就将负责通知已经在Zookeeper上注册的那些观察者做出相应的反应。也就是说 Zookeeper 文件系统 通知机制。
主要特点 Zookeeper一个领导者Leader多个跟随者Follower组成的集群。 Zookeepe集群中只要有半数以上节点存活Zookeeper集群就能正常服务。所以Zookeeper适合安装奇数台服务器 全局数据一致每个Server保存一份相同的数据副本Client无论连接到哪个Server数据都是一致的。 更新请求顺序执行来自同一个Client的更新请求按其发送顺序依次执行即先进先出。 数据更新原子性一次数据更新要么成功要么失败。 实时性在一定时间范围内Client能读到最新数据。 可靠性和稳定性 Zookeeper通过主从模型和选主机制实现高可用性即使部分节点失效仍能保持服务的可用性确保分布式系统的稳定运行。 高性能 其设计追求高性能和低延迟能够快速响应客户端请求并有效处理大规模的数据操作。 数据模型和Znode Zookeeper采用类似文件系统的树形结构来组织数据称为Znode通过维护这些节点实现数据存储和状态维护。 Watcher机制 Watcher机制允许客户端订阅Znode上的事件当节点状态发生变化时Zookeeper能够及时通知已注册Watcher的客户端用于实现事件驱动的分布式架构。 分布式锁 Zookeeper提供了分布式锁的实现机制用于在分布式环境下实现互斥访问共享资源确保数据操作的原子性和一致性。 数据一致性和原子性 Zookeeper保证在分布式环境下的数据一致性和操作的原子性所有节点看到的数据是一致的操作要么全部成功要么全部失败。 用途广泛 作为分布式协调服务Zookeeper被广泛应用于诸如Hadoop、Kafka、HBase等分布式系统中作为这些系统的重要组件管理和协调各个部分。
应用场景
统一命名服务 问题 在分布式环境中应用或服务需要一个统一的命名机制以便更容易识别。 解决方案 使用ZooKeeper提供的统一命名服务。将应用或服务的名称映射到ZooKeeper上的特定Znode实现统一的命名机制。这使得在分布式系统中更容易定位和识别应用或服务。 统一配置管理 问题 分布式环境下需要配置信息同步确保集群中所有节点的配置信息保持一致。 解决方案 使用ZooKeeper作为统一的配置管理中心。将配置信息写入ZooKeeper上的一个Znode并让各个客户端服务器监听该Znode。当配置信息发生变化时ZooKeeper会通知所有监听的客户端服务器实现配置信息的快速同步。 统一集群管理 问题 在分布式环境中需要实时监控每个节点的状态以做出相应的调整。 解决方案 利用ZooKeeper实时监控节点状态变化。将节点信息写入ZooKeeper上的一个Znode并让各个节点监听该Znode。通过监听实时获取节点状态的变化可以进行相应的集群管理操作。 服务器动态上下线 问题 需要实时感知服务器的上线和下线以便及时做出调整。 解决方案 利用ZooKeeper记录每个服务器的状态信息。当服务器上线或下线时更新对应的Znode。客户端可以监听这些Znodes从而实时感知服务器的状态变化。 软负载均衡 问题 希望实现软负载均衡将请求分配到访问数最少的服务器上。 解决方案 在ZooKeeper中记录每台服务器的访问数等负载信息。通过监控这些信息可以实现动态的软负载均衡策略确保请求被分配到访问数最少的服务器上实现更加均衡的系统负载。
Zookeeper 选举机制
ZooKeeper的选举机制是保证在一个ZooKeeper集群中有一个主节点Leader的机制。选举机制的目标是确保集群中只有一个活跃的Leader其他节点作为Followers跟随主节点。以下是ZooKeeper选举机制的主要步骤
初始化
当一个ZooKeeper节点启动时它会进入LOOKING状态表示它正在寻找Leader。
选举 节点提出选举 在LOOKING状态的节点发起一次选举。它将自己提名为Leader并向其他节点发送投票请求。 投票阶段 其他节点收到投票请求后会检查自己的状态。如果发现自己是LOOKING状态就投票给发起选举的节点表示同意它成为Leader。 选票计数 发起选举的节点等待一定时间收集其他节点的投票。如果它收到了半数以上的节点的赞成票那么它将成为新的Leader。
成为Leader
成为Leader 如果节点收到了足够多的赞成票它就会成为新的Leader。同时其他节点将转变为FOLLOWING状态跟随新的Leader。
主节点故障 主节点故障检测 ZooKeeper集群会周期性地进行主节点是否存活的检测。 主节点失效 如果集群检测到主节点失效其他节点将进入新一轮的选举过程。 选举新主节点 这时其他节点重新发起选举进行投票过程选举一个新的主节点。 数据一致性 选举过程中ZooKeeper会确保只有一个Leader被选出以维持系统的一致性。 触发条件 选举可能被触发的条件包括节点启动、主节点失效、新节点加入等。
第一次启动选举
第一次启动选举机制的过程如下 服务器1启动发起选举但票数不足半数状态为LOOKING。 服务器2启动发起选举服务器1和2交换选票信息服务器1发现服务器2的myid更大更改选票为推举服务器2但仍未达到半数以上状态保持LOOKING。 服务器3启动发起选举服务器1和2更改选票为服务器3此时服务器3的票数超过半数成为Leader服务器1和2状态变为FOLLOWING服务器3状态为LEADING。 服务器4和5依次启动发起选举根据多数服从的原则依次更改选票为当前Leader状态变为FOLLOWING。
非第一次选举
Leader选举触发条件 服务器初始化启动。 服务器在运行期间无法与当前Leader保持连接。
Leader选举流程中的集群状态 存在Leader的情况 如果集群中已经存在Leader当一台机器试图进行Leader选举时它会被告知当前服务器的Leader信息。 对于这样的机器来说它只需与现有的Leader建立连接并进行状态同步而不需要进行实际的选举过程。 不存在Leader的情况 如果集群中确实不存在Leader那么开始Leader选举的机器将按照一定的规则进行选举。
选举Leader的规则 EPOCH大的直接胜出。 如果EPOCH相同则比较事务id事务id大的胜出。 如果事务id也相同则比较服务器id服务器id大的胜出。
举例说明假设有5台服务器SID分别为1、2、3、4、5ZXID分别为8、8、8、7、7。在某一时刻服务器3是Leader。如果服务器3和5出现故障开始进行Leader选举其他服务器将按照上述规则进行选举以确定新的Leader。这确保了在ZooKeeper中Leader的选举是有序和可靠的过程。
SID服务器ID SID用于唯一标识ZooKeeper集群中的每台机器。 每台机器的SID必须是唯一的而且它与myidmyid是ZooKeeper配置文件中配置的应该保持一致。
ZXID事务ID ZXID是用来标识一次服务器状态的变更通常对应于一次事务。 在集群中的每台机器上ZXID的值不一定完全一致。这是因为不同的机器可能在处理客户端的更新请求时具有不同的处理速度。 ZXID的变更是由ZooKeeper事务引起的每次状态变更例如创建、更新、删除节点都会导致ZXID的增加。
Epoch每个Leader任期的代号 Epoch是用来标识每个Leader任期的代号。 当没有Leader时同一轮投票过程中的逻辑时钟值是相同的。每次投完一次票Epoch的值就会增加。 Epoch的作用是确保在选举Leader时能够对不同的轮次进行正确的比较以防止过期的选票对当前轮次的影响。
综合来说这些标识符SID、ZXID、Epoch在ZooKeeper中用于确保分布式环境下的一致性和可靠性尤其在Leader选举和事务处理方面扮演重要角色。
Zookeeper 集群部署实例
环境准备
服务器准备 三台服务器 IP 192.168.41.31 192.168.41.32 192.168.41.33
安装前准备
在每台服务器上执行以下操作
关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0安装 JDK
yum install -shy java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version下载 Zookeeper 安装包至 /opt 目录 cd /opt
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.5.7/apache-zookeeper-3.5.7-bin.tar.gz安装 Zookeeper
解压安装包至指定目录
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7修改配置文件 /usr/local/zookeeper-3.5.7/conf/zoo.cfg设置参数并添加集群信息。
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfgvim zoo.cfg
tickTime2000 #通信心跳时间Zookeeper服务器与客户端心跳时间单位毫秒
initLimit10 #Leader和Follower初始连接时能容忍的最多心跳数tickTime的数量这里表示为10*2s
syncLimit5 #Leader和Follower之间同步通信的超时时间这里表示如果超过5*2sLeader认为Follwer死掉并从服务器列表中删除Follwer
dataDir/usr/local/zookeeper-3.5.7/data ●修改指定保存Zookeeper中的数据的目录目录需要单独创建
dataLogDir/usr/local/zookeeper-3.5.7/logs ●添加指定存放日志的目录目录需要单独创建
clientPort2181 #客户端连接端口
#添加集群信息
server.1192.168.41.31:3188:3288
server.2192.168.41.32:3188:3288
server.3192.168.41.33:3188:3288-------------------------------------------------------------------------------------
server.AB:C:D
●A是一个数字表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myid这个文件里面有一个数据就是A的值Zookeeper启动时读取此文件拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
●B是这个服务器的地址。
●C是这个服务器Follower与集群中的Leader服务器交换信息的端口。
●D是万一集群中的Leader服务器挂了需要一个端口来重新进行选举选出一个新的Leader而这个端口就是用来执行选举时服务器相互通信的端口。拷贝配置好的 Zookeeper 配置文件到其他机器上
//拷贝配置好的 Zookeeper 配置文件到其他机器上
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.10.21:/usr/local/zookeeper-3.5.7/conf/
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.10.22:/usr/local/zookeeper-3.5.7/conf/在每台服务器上创建数据目录和日志目录并创建 myid 文件以标识服务器编号
mkdir /usr/local/zookeeper-3.5.7/data
mkdir /usr/local/zookeeper-3.5.7/logs
echo 1 /usr/local/zookeeper-3.5.7/data/myid # 对应每个服务器不同的编号
echo 2 /usr/local/zookeeper-3.5.7/data/myid
echo 3 /usr/local/zookeeper-3.5.7/data/myid配置 Zookeeper 启动脚本 vim /etc/init.d/zookeeper#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME/usr/local/zookeeper-3.5.7
case $1 in
start)echo ---------- zookeeper 启动 ------------$ZK_HOME/bin/zkServer.sh start
;;
stop)echo ---------- zookeeper 停止 ------------$ZK_HOME/bin/zkServer.sh stop
;;
restart)echo ---------- zookeeper 重启 ------------$ZK_HOME/bin/zkServer.sh restart
;;
status)echo ---------- zookeeper 状态 ------------$ZK_HOME/bin/zkServer.sh status
;;
*)echo Usage: $0 {start|stop|restart|status}
esac在文件中添加启动脚本包括启动、停止、重启和状态检查。
设置脚本权限并设置开机自启
chmod x /etc/init.d/zookeeper
chkconfig --add zookeeper启动 Zookeeper
service zookeeper start检查状态
service zookeeper statusKafka概述
Kafka 是一个开源的分布式流处理平台和消息队列系统最初由 LinkedIn 开发并于2010年成为 Apache 软件基金会的顶级项目。它的设计目标是处理大规模数据流并提供高吞吐量、持久性、可扩展性和容错性。
定义
Kafka 是一个分布式的基于发布/订阅模式的消息队列MQMessage Queue主要应用于大数据实时处理领域。
核心概念 主题Topics消息的分类名称数据在 Kafka 中以主题为单位进行发布和订阅。 分区Partitions每个主题可以分成多个分区每个分区是一个有序且不可变的消息序列。 生产者Producers负责向 Kafka 主题发布消息的应用程序。 消费者Consumers从 Kafka 主题订阅消息并进行处理的应用程序。 代理BrokerKafka 集群中的服务器节点负责存储和管理消息。 副本Replication为确保容错性和高可用性Kafka 将分区的副本存储在多个代理上。
工作原理 发布 - 订阅模型生产者发布消息到主题消费者订阅并从主题拉取消息进行处理。 持久性Kafka 通过将消息持久化到磁盘并在配置的时间范围内保留消息确保消息的可靠性和持久性。 水平扩展性能够通过增加代理和分区来扩展处理能力保持高吞吐量和低延迟。 流处理Kafka Streams API 允许开发者构建流式处理应用实时处理数据并生成输出。 分布式架构Kafka 集群由多个代理组成可在多台服务器上部署确保容错性和负载均衡。
应用场景 日志收集与分析收集分布式系统的日志并进行实时分析和监控。 消息通信作为微服务架构中的异步通信中介。 事件驱动架构构建实时流处理应用程序处理事件流。 指标监控实时处理和监控大规模指标数据。
特性
高吞吐量和低延迟 Kafka 设计用于处理大规模数据流并能够以极高的速度进行消息处理。其优化的存储机制和分区策略以及高效的批量处理方式使其每秒能够处理大量消息并且提供非常低的延迟。 Kafka 每秒可以处理几十万条消息它的延迟最低只有几毫秒。每个 topic 可以分多个 PartitionConsumer Group 对 Partition 进行消费操作提高负载均衡能力和消费能力。
可扩展性
Kafka 的集群架构支持水平扩展可以轻松地增加更多的代理节点和分区以适应不断增长的数据处理需求。这种扩展性允许用户根据需求动态地扩展 Kafka 集群的处理能力而不需要大规模的系统停机或迁移。热扩展
持久性和可靠性
Kafka 将消息持久化到磁盘确保消息不会因为消费速度低于生产速度而丢失。它采用了可配置的持久化策略保证数据在一定时间内可用。此外Kafka 的副本机制允许数据备份即使发生节点故障也能保证数据的可靠性和完整性。
容错性 Kafka 的分布式架构设计具备高度的容错性。如果集群中的某些节点失效副本机制可以确保数据的可用性和一致性即使多个节点失败也不会影响整个集群的运行。 允许集群中节点失败多副本情况下若副本数量为 n则允许 n-1 个节点失败
高并发性
Kafka 能够支持大量的客户端同时读写这使得它在高并发环境下能够处理数千个客户端的请求而不影响其性能表现。
系统架构
Kafka 的系统架构是一个分布式、可扩展的架构主要包括生产者、代理Broker、主题Topic、分区Partition以及消费者等关键组件。以下是 Kafka 的系统架构的详细解释
生产者Producer 生产者负责将消息发布到 Kafka 集群中的主题。消息可以是任何字节类型的有效负载。生产者将消息发布到指定的主题而 Kafka 负责确保这些消息被持久化并发送到相应的消费者。 代理Broker 代理是 Kafka 集群中的服务器节点。每个代理负责存储消息处理生产者和消费者的请求并协调分区的复制。代理之间进行协调和通信形成一个分布式的消息处理系统。 一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。 主题Topic 主题是消息的逻辑容器用于对消息进行分类。生产者将消息发布到特定的主题而消费者则订阅一个或多个主题以接收相关消息。主题的设计允许将消息按照业务逻辑进行组织和分类。 可以理解为一个队列生产者和消费者面向的都是一个 topic。 类似于数据库的表名或者 ES 的 index 物理上不同 topic 的消息分开存储 分区Partition 每个主题可以分成多个分区每个分区是一个有序的、不可变的消息序列。分区的引入允许 Kafka 实现水平扩展提高消息的并行处理能力。每个分区可以在集群中的不同代理上进行复制以确保容错性和高可用性。 为了实现扩展性一个非常大的 topic 可以分布到多个 broker即服务器上一个 topic 可以分割为一个或多个 partition每个 partition 是一个有序的队列。Kafka 只保证 partition 内的记录是有序的而不保证 topic 中不同 partition 的顺序。 每个 topic 至少有一个 partition当生产者产生数据的时候会根据分配策略选择分区然后将消息追加到指定的分区的队列末尾。 消费者Consumer 消费者订阅一个或多个主题并从代理拉取消息进行处理。消费者组织在消费者组中每个消费者组可以有多个消费者。Kafka 通过分区的机制确保同一分区内的消息只能被同一消费者组的一个消费者处理从而实现负载均衡和高并发处理。 ZooKeeper 在 Kafka 集群中ZooKeeper 通常用于协调和管理集群的元数据例如代理的位置、分区的分配和消费者组的状态。ZooKeeper 的使用帮助 Kafka 集群实现了更强大的分布式协调和管理功能。
工作流程 生产者发布消息到特定主题。 代理存储并持久化消息同时根据分区规则将消息分发到不同的分区。 消费者订阅一个或多个主题并从代理拉取消息。 消费者组中的各个消费者并行处理不同分区的消息。 ZooKeeper 用于协调集群的元数据和状态信息。
Partation 数据路由规则 指定了 partition 如果在消息发送时已经指定了特定的 partition消息将直接被发送到该 partition。 未指定 partition 但指定 key 如果没有指定 partition 但指定了 key消息的某个属性系统会对这个 key 的值进行哈希取模运算以选定一个 partition。 既未指定 partition 也未指定 key 这种情况下会采用轮询round-robin的方式选出一个 partition。
消息编号和数据存储 每条消息都有一个自增的编号用于标识消息的偏移量从0开始标识顺序。 每个 partition 中的数据使用多个 segment 文件存储。
Partition 数目与消息顺序保证
如果一个 topic 有多个 partition消费数据时无法保证数据的顺序。若需严格保证消息的消费顺序例如在商品秒杀或抢红包场景需要将 partition 数目设为 1。
Broker 存储和集群配置 Broker 存储 topic 的数据。如果某个 topic 有 N 个 partition集群中有 N 个 broker每个 broker 存储该 topic 的一个 partition。 如果某个 topic 有 N 个 partition集群中有 (NM) 个 broker其中 N 个 broker 存储 topic 的一个 partition剩下的 M 个 broker 不存储该 topic 的 partition 数据。 如果某个 topic 有 N 个 partition但集群中 broker 数目少于 N 个一个 broker 可能会存储该 topic 的一个或多个 partition。这种情况可能导致 Kafka 集群数据不均衡因此在生产环境中要尽量避免这种情况。
Partition分区的原因
Partition 的原因 集群扩展 允许在集群中方便地扩展每个 Partition 可以根据所在的机器进行调整从而适应不同大小的数据。 提高并发 Partition 可以作为并发的基本单位使得可以并行读写提高了整体的并发处理能力。
Replica副本 保障数据不丢失 为了应对节点故障每个 Partition 都有若干个副本其中一个是 Leader其余是 Follower。 Leader-Follower 模式 Leader负责数据的读写Follower负责备份。写请求通过Leader路由变更广播给所有Follower。 选举新的 Leader 如果Leader故障系统从Follower中选举新的Leader。
Leader
每个 partition 有多个副本其中有且仅有一个作为 LeaderLeader 是当前负责数据的读写的 partition。
Follower Follower 跟随 Leader所有写请求都通过 Leader 路由数据变更会广播给所有 FollowerFollower 与 Leader 保持数据同步。Follower 只负责备份不负责数据的读写。 如果 Leader 故障则从 Follower 中选举出一个新的 Leader。 当 Follower 挂掉、卡住或者同步太慢Leader 会把这个 Follower 从 ISRLeader 维护的一个和 Leader 保持同步的 Follower 集合 列表中删除重新创建一个 Follower。
Producer生产者 数据发布者 负责将消息发布到 Kafka 的 Topic 中。 broker 接收到生产者发送的消息后broker 将该消息追加到当前用于追加数据的 segment 文件中。 Partition 选择 生产者可以选择将数据存储到指定的 Partition。
Consumer消费者 数据拉取 消费者从 Broker 中拉取数据。 消费多个 Topic 消费者可以消费多个 Topic 中的数据。
Consumer Group消费者组 逻辑订阅者 消费者组是逻辑上的订阅者由多个 Consumer 组成。 所有的消费者都属于某个消费者组即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名若不指定组名则属于默认的组。 提高消费能力 将多个消费者集中处理一个 Topic 的数据提高消费能力。 分区分配 每个消费者负责消费不同分区的数据防止重复消费。
Offset 偏移量 唯一标识消息 偏移量唯一标识一条消息。 读取位置决定 决定读取数据的位置消费者通过偏移量来决定下次读取的消息。 消息不即时删除 消息被消费后并不立即删除可以重复使用。
Zookeeper 存储集群 meta 信息 Kafka通过Zookeeper存储集群的元数据信息。 Consumer Offset 管理 在Kafka 0.9版本之前Consumer Offset默认保存在Zookeeper中。从0.9版本开始默认保存在Kafka内置的_consumeroffsets topic中。 生产者和消费者协同 生产者和消费者都依赖Zookeeper来进行协同生产者找到Kafka集群节点而消费者获取消费的Offset信息。
kafka 集群 部署实例
下载安装包
//下载安装包
官方下载地址http://kafka.apache.org/downloads.htmlcd /opt
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.7.1/kafka_2.13-2.7.1.tgz在/opt目录下下载Kafka的安装包使用清华大学的镜像站点以提高下载速度。 使用wget命令下载指定版本2.7.1的Kafka。
安装 Kafka
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka//修改配置文件
cd /usr/local/kafka/config/
cp server.properties{,.bak}vim server.properties
broker.id0 ●21行broker的全局唯一编号每个broker不能重复因此要在其他机器上配置 broker.id1、broker.id2
listenersPLAINTEXT://192.168.41.31:9092 ●31行指定监听的IP和端口如果修改每个broker的IP需区分开来也可保持默认配置不用修改
num.network.threads3 #42行broker 处理网络请求的线程数量一般情况下不需要去修改
num.io.threads8 #45行用来处理磁盘IO的线程数量数值应该大于硬盘数
socket.send.buffer.bytes102400 #48行发送套接字的缓冲区大小
socket.receive.buffer.bytes102400 #51行接收套接字的缓冲区大小
socket.request.max.bytes104857600 #54行请求套接字的缓冲区大小
log.dirs/usr/local/kafka/logs #60行kafka运行日志存放的路径也是数据存放的路径
num.partitions1 #65行topic在当前broker上的默认分区个数会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir1 #69行用来恢复和清理data下数据的线程数量
log.retention.hours168 #103行segment文件数据文件保留的最长时间单位为小时默认为7天超时将被删除
log.segment.bytes1073741824 #110行一个segment文件最大的大小默认为 1G超出将新建一个新的segment文件
zookeeper.connect192.168.41.31:2181,192.168.41.32:2181,192.168.41.33:2181 ●123行配置连接Zookeeper集群地址将下载的tar.gz文件解压缩到/usr/local/kafka目录下。 备份并修改/usr/local/kafka/config/server.properties配置文件配置项包括 broker.id设置每个broker的唯一编号。 listeners指定监听的IP和端口。 其他网络、IO、缓冲区等相关配置。 log.dirsKafka运行日志和数据存放的路径。 zookeeper.connect配置连接Zookeeper集群地址。
修改环境变量
vim /etc/profile
export KAFKA_HOME/usr/local/kafka
export PATH$PATH:$KAFKA_HOME/binsource /etc/profile修改/etc/profile文件添加Kafka的环境变量包括KAFKA_HOME和将Kafka的bin目录添加到PATH中。 执行source /etc/profile以使环境变量生效。
配置 Zookeeper 启动脚本
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME/usr/local/kafka
case $1 in
start)echo ---------- Kafka 启动 ------------${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)echo ---------- Kafka 停止 ------------${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)$0 stop$0 start
;;
status)echo ---------- Kafka 状态 ------------count$(ps -ef | grep kafka | egrep -cv grep|$$)if [ $count -eq 0 ];thenecho kafka is not runningelseecho kafka is runningfi
;;
*)echo Usage: $0 {start|stop|restart|status}
esac创建并编辑/etc/init.d/kafka文件定义Kafka的启动、停止、重启和状态检查脚本。 设置脚本的执行权限并添加到开机启动项。
设置开机自启
chmod x /etc/init.d/kafka
chkconfig --add kafka创建并编辑/etc/init.d/kafka文件定义Kafka的启动、停止、重启和状态检查脚本。 设置脚本的执行权限并添加到开机启动项。
分别启动 Kafka
service kafka start使用service kafka start命令启动Kafka。
Kafka 命令行操作
//创建topic
kafka-topics.sh --create --zookeeper 192.168.41.31:2181,192.168.41.32:2181,192.168.41.33:2181 --replication-factor 2 --partitions 3 --topic testkafka-topics.sh --create --zookeeper 192.168.41.31:2181,192.168.41.33:2181,192.168.41.32:2181 --replication-factor 2 --partitions 3 --topic test
-------------------------------------------------------------------------------------
--zookeeper定义 zookeeper 集群服务器地址如果有多个 IP 地址使用逗号分割一般使用一个 IP 即可
--replication-factor定义分区副本数1 代表单副本建议为 2
--partitions定义分区数
--topic定义 topic 名称
-------------------------------------------------------------------------------------//查看当前服务器中的所有 topic
kafka-topics.sh --list --zookeeper 192.168.41.31:2181,192.168.41.32:2181,192.168.41.33:2181//查看某个 topic 的详情
kafka-topics.sh --describe --zookeeper 192.168.41.31:2181,192.168.41.32:2181,192.168.41.33:2181//发布消息
kafka-console-producer.sh --broker-list 192.168.41.31:9092,192.168.41.32:9092,192.168.41.33:9092 --topic test//消费消息
kafka-console-consumer.sh --bootstrap-server 192.168.41.31:9092,192.168.41.32:9092,192.168.41.33:9092 --topic test --from-beginning-------------------------------------------------------------------------------------
--from-beginning会把主题中以往所有的数据都读取出来
-------------------------------------------------------------------------------------//修改分区数
kafka-topics.sh --zookeeper 192.168.41.31:2181,192.168.41.32:2181,192.168.41.33:2181 --alter --topic test --partitions 6//删除 topic
kafka-topics.sh --delete --zookeeper 192.168.41.31:2181,192.168.41.32:2181,192.168.41.33:2181 --topic test使用kafka-topics.sh命令进行各种操作如创建topic、查看topic列表、查看topic详情、发布消息、消费消息、修改分区数和删除topic。 提供了每个命令的详细说明和参数解释。
补充说明 Kafka的配置文件server.properties中的各项配置需要根据实际需求进行调整尤其是网络、IO、缓冲区等方面的配置。 确保Zookeeper服务已经启动并在Kafka配置文件中正确指定Zookeeper集群地址。 启动Kafka前可以使用chkconfig命令将其设置为开机自启动。 在使用Kafka命令行操作时需要提供相应的参数如Zookeeper地址、topic名称、分区数等。 对于kafka-console-producer.sh提供了指定--bootstrap-server参数的示例指定Kafka的broker列表。
FilebeatKafkaELK
Filebeat、Kafka 和 ELKElasticsearch、Logstash 和 Kibana是一套常用于日志收集、传输和分析的开源工具组合。
Filebeat Filebeat 是一个轻量级的日志数据收集器由 Elastic 公司开发。 它负责监控日志文件或位置并将数据发送到指定的目标通常是消息队列或直接到 Elasticsearch。
Kafka Kafka 是一个分布式流处理平台由 Apache 软件基金会开发。 它提供了高可用性、可扩展性和持久性的消息传递系统通常用于构建实时数据流平台。 在这个场景中Kafka 充当一个中间层接收来自 Filebeat 的日志消息并将其传递给 ELK Stack。
ELK Stack ELK 是一个用于日志管理和数据分析的开源软件栈包括 Elasticsearch、Logstash 和 Kibana。 Elasticsearch 用于存储、检索和分析日志数据的分布式搜索引擎。 Logstash 用于日志收集、过滤和转发的数据处理引擎。 Kibana 用于可视化和分析 Elasticsearch 中存储的日志数据的用户界面。
工作流程 Filebeat 监控日志文件或位置收集日志数据。 Filebeat 将日志数据发送到 Kafka 中间件。 Kafka 作为消息队列接收并持久化日志消息。 Logstash 从 Kafka 中获取日志数据进行过滤和处理。 Logstash 将处理后的日志数据发送到 Elasticsearch 进行存储。 Kibana 提供用户界面让用户通过 Web 浏览器可视化和查询 Elasticsearch 中的日志数据。
这样的架构具有高可用性、可伸缩性和灵活性使得日志数据的采集、传输和分析变得更加容易管理和理解。
Kafka 文件存储机制解析:
在 Kafka 中消息以 topic 进行分类生产者生成消息消费者消费消息都是基于 topic 的。 topic 是逻辑上的概念而 partition 是物理上的概念每个 partition 对应一个 log 文件其中存储生产者生成的数据。每条数据都有一个唯一的 offset表示其在 log 文件中的位置。 消费者组中的每个消费者会实时记录自己消费到的 offset以便在出现错误时从上次的位置继续消费。
为了避免 log 文件过大导致数据定位效率低下Kafka 采用了分片和索引机制将每个 partition 分为多个 segment。每个 segment 包括一个 .index 文件和一个 .log 文件它们位于以 topic 名称和分区序号命名的文件夹下。
.index 文件存储大量的索引信息而 .log 文件存储实际的数据。索引文件中的元数据指向对应数据文件中消息的物理偏移地址。
数据可靠性保证:
为了确保生产者发送的数据可靠地到达指定的 topic每个 partition 在接收生产者发送的数据后都需要向生产者发送 ack确认收到。如果生产者收到 ack则会进行下一轮的发送否则将重新发送数据。
数据一致性问题: LEOLog End Offset: 指每个副本最大的 offset。 HWHigh Watermark: 指消费者能看到的最大 offset即所有副本中最小的 LEO。
Follower 故障: 当 follower 发生故障时它会被临时踢出 ISRIn-Sync Replica与 leader 保持同步的 follower 集合。 恢复后follower读取本地磁盘上记录的上次的 HW将高于 HW 的部分截取掉然后从 HW 开始向 leader 进行同步。 当 follower 的 LEO 大于等于该 Partition 的 HW时即 follower追上 leader后就可以重新加入 ISR。
Leader 故障: 当 leader 发生故障后会从 ISR 中选出一个新的 leader。 为确保多个副本之间的数据一致性其余的 follower 先截掉各自 log 文件高于 HW 的部分然后从新的 leader 同步数据。
Ack 应答机制:
Kafka提供了三种可靠性级别根据对可靠性和延迟的要求进行权衡选择 0 生产者无需等待来自 broker 的确认而继续发送下一批消息效率最高但可靠性最低。 1默认配置 生产者在 ISR 中的 leader 成功收到确认后发送下一条消息可能在 follower 同步之前发生 leader 故障导致数据丢失。 -1或 all 生产者需要等待 ISR 中的所有 follower 都确认接收到数据后才算一次发送完成可靠性最高。
在 0.11 版本以前的 Kafka只能保证数据不丢失在下游消费者需要对数据进行全局去重。在 0.11 及以后版本的 Kafka引入了幂等性特性即 Producer 不论向 Server 发送多少次重复数据Server 端都只会持久化一条。
FilebeatKafkaELK部署示例
ZookeeperKafka集群的部署 Zookeeper和Kafka是分布式系统中常用的协调服务和消息中间件。 需要在集群中的节点上部署Zookeeper和Kafka并确保它们正常运行。 Filebeat的部署
cd /usr/local/filebeatvim filebeat.yml
filebeat.prospectors:
- type: logenabled: truepaths:- /var/log/httpd/access_logtags: [access]- type: logenabled: truepaths:- /var/log/httpd/error_logtags: [error]......
#添加输出到 Kafka 的配置
output.kafka:enabled: truehosts: [192.168.41.31:9092,192.168.41.32:9092,192.168.41.33:9092] #指定 Kafka 集群配置topic: httpd #指定 Kafka 的 topic#启动 filebeat
./filebeat -e -c filebeat.yml在/usr/local/filebeat目录下编辑filebeat.yml文件配置Filebeat的prospectors指定要监控的日志文件路径和对应的标签。 添加输出到Kafka的配置指定Kafka集群的地址和topic。 启动Filebeat使用命令./filebeat -e -c filebeat.yml。 ELK的部署
cd /etc/logstash/conf.d/vim kafka.conf
input {kafka {bootstrap_servers 192.168.41.31:9092,192.168.41.32:9092,192.168.41.33:9092 #kafka集群地址topics httpd #拉取的kafka的指定topictype httpd_kafka #指定 type 字段codec json #解析json格式的日志数据auto_offset_reset latest #拉取最近数据earliest为从头开始拉取decorate_events true #传递给elasticsearch的数据额外增加kafka的属性数据}
}output {if access in [tags] {elasticsearch {hosts [192.168.41.31:9200]index httpd_access-%{YYYY.MM.dd}}}if error in [tags] {elasticsearch {hosts [192.168.41.31:9200]index httpd_error-%{YYYY.MM.dd}}}stdout { codec rubydebug }
}#启动 logstash
logstash -f kafka.conf在Logstash组件所在节点的/etc/logstash/conf.d/目录下新建Logstash的配置文件kafka.conf。 在kafka.conf中配置Logstash的输入从Kafka中拉取指定topic的日志数据设置type字段为httpd_kafka使用json格式解析日志数据。 配置输出到Elasticsearch的规则根据日志的标签tags分类存储到不同的索引并输出调试信息到stdout。 启动Logstash使用命令logstash -f kafka.conf。
注生产黑屏操作es时查看所有的索引:curl -X GET localhost:9200/_cat/indices?v
验证
浏览器访问 http://192.168.41.31:5601 登录 Kibana单击“Create Index Pattern”按钮添加索引“filebeat_test-*”单击 “create” 按钮创建单击 “Discover” 按钮可查看图表信息及日志信息。