大连筑成建设集团有限公司网站,近期十大热点新闻,宁波公司地址,电子商务公司图片1.简介kafka本质上是一个开源的、分布式的流处理平台。它被设计用来高效地处理实时数据流。核心功能是作为一个高吞吐量、低延迟、可水平扩展、持久化、容错的发布-订阅消息系统。2.核心用途和解决的问题构建实时数据管道和流处理应用#xff1a;在不同系统或应用之间可靠地传…1.简介kafka本质上是一个开源的、分布式的流处理平台。它被设计用来高效地处理实时数据流。核心功能是作为一个高吞吐量、低延迟、可水平扩展、持久化、容错的发布-订阅消息系统。2.核心用途和解决的问题构建实时数据管道和流处理应用在不同系统或应用之间可靠地传输大量实时数据。
解耦系统让数据生产者和消费者不需要直接通信或知道彼此的存在提高了系统的灵活性和可维护性。
缓冲在数据生产者和处理速度较慢的消费者之间充当缓冲区防止消费者被压垮。
持久化存储数据被持久化写入磁盘并保留一定时间可配置允许消费者按需重播历史数据。3.关键特性和优势高吞吐量能够处理每秒数百万条消息非常适合大数据场景。
低延迟从消息产生到被消费的延迟通常在毫秒级别。
可扩展性通过添加更多的服务器Broker轻松水平扩展以处理不断增长的数据量和负载。
持久性消息被写入磁盘并进行复制冗余备份确保数据在服务器故障时不会丢失。
容错性分布式架构和数据复制机制使得集群中部分节点故障时整个服务仍然可用。
高并发支持大量生产者和消费者同时读写。
多语言客户端支持提供多种编程语言如 Java, Python, Go, .NET 等的客户端库。4.在linux系统上部署Kafka Broker集群(KRaft模式)上面简单介绍Kafka一些概念现在让我们来学习一下基于KRaft模式下在Linux系统上如何部署Kafka Broker集群。4.1安装Java在安装Kafka服务之前需要先安装Java环境。这里安装是Java 11可以直接从Oracle官网https://www.oracle.com/java/technologies/downloads/#java11上下载。我把一些二进制安装包统一归类存放在一个package目录中同时新建一个java目录解压二进制包这里我下载Java 11版本是jdk-11.0.28再配置Java环境变量使其生效。部署的服务器分别是192.168.18.200192.168.18.201192.168.18.202这三台虚拟机具体部署脚本命令如下以下花括号{}表示脚本内容
# 新建java目录
mkdir /usr/java# 切换到安装包目录
cd /root/package# 解压jdk二进制安装包到java目录下
tar zxvf jdk-11.0.28_linux-x64_bin.tar.gz -C /usr/java
# 删除java文件夹
#rm -rf /usr/java/jdk1.8.0_351# 分发java目录到201和202服务器对应目录上
scp -r /usr/java/jdk-11.0.28 root192.168.18.201:/usr/java/jdk-11.0.28
scp -r /usr/java/jdk-11.0.28 root192.168.18.202:/usr/java/jdk-11.0.28# 编辑profile文件配置java、kafka环境变量
vi /etc/profile
{
# 配置java环境变量
export JAVA_HOME/usr/java/jdk-11.0.28
export JRE_HOME${JAVA_HOME}/jre
export CLASSPATH.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export JAVA_PATH${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH$PATH:${JAVA_PATH}
# 配置kafka环境变量
export KAFKA_HOME/root/kafka
}# 查看PATH环境变量的值
echo $PATH# 刷新profile文件使java环境变量生效
source /etc/profile# 查看java版本号
java -version4.2部署Kafka Broker集群安装好Java 11后接下来就可以部署Kafka Broker集群了。可以从apache.org官网上https://kafka.apache.org/downloads.html下载最新版本的Kafka。我下载Kafka的版本是3.4.0对应Scala版本是2.13。下载好的Kafka二进制安装包存放在package目录上部署的服务器分别是192.168.18.200192.168.18.201192.168.18.202这三台虚拟机集群部署命令如下启用SASL安全认证以及配置ACL权限控制以下花括号{}表示脚本内容
# 切换到安装包目录
cd /root/package# 解压kafka二进制安装包
tar -zxvf kafka_2.13-3.4.0.tgz -C /root# 修改kafka目录名称
mv /root/kafka_2.13-3.4.0 /root/kafka# 创建一个存放topic数据的目录
mkdir $KAFKA_HOME/kraft-datas
# 移除存放topic数据的目录
rm -rf $KAFKA_HOME/kraft-datas
# 创建kafka日志目录
mkdir $KAFKA_HOME/logs
# 移除kafka日志目录
rm -rf $KAFKA_HOME/logs# 拷贝一份server.properties配置重命名为server-plain.properties修改kafka服务配置
cp $KAFKA_HOME/config/kraft/server.properties $KAFKA_HOME/config/kraft/server-plain.properties# 修改kafka的server.properties配置文件添加SASL认证修改listeners中的ip为对应kafka节点服务器地址
vi $KAFKA_HOME/config/kraft/server-plain.properties
{
#################################### Kafka其他配置 ####################################
# 此服务器的角色。设置此选项将使我们进入KRaft模式一个节点可以充当broker或controller或两者兼之
process.rolesbroker,controller
# 与此实例的角色关联的节点id
node.id1# 控制器仲裁的连接字符串选举的投票节点所有process.roles包含controller角色的规划节点都要参与
controller.quorum.voters1192.168.18.200:9093,2192.168.18.201:9093,3192.168.18.202:9093
# 套接字服务器侦听的地址broker将使用9092端口而kraft controller控制器将使用9093端口
listenersSASL_PLAINTEXT://192.168.18.200:9092,CONTROLLER://192.168.18.200:9093
# 用于代理之间通信的侦听器的名称
inter.broker.listener.nameSASL_PLAINTEXT
# 监听器名称、主机名和代理将向客户端播发的端口。如果未设置则使用listeners的值这里指定kafka通过代理暴露的地址如果都是局域网使用就配置PLAINTEXT://:9092即可
advertised.listenersSASL_PLAINTEXT://192.168.18.200:9092# 存储日志文件的目录的逗号分隔列表实际存放topic数据的目录并不是记录日志
log.dirs/root/kafka/kraft-datas
# 为broker间通讯开启PLAIN机制采用PLAIN算法
sasl.mechanism.inter.broker.protocolPLAIN
# Kafka服务器中启用的SASL机制列表同时启用PLAIN机制
sasl.enabled.mechanismsPLAIN
# 服务器用于从网络接收请求并向网络发送响应的线程数默认3
num.network.threads6
# 服务器用于处理请求的线程数其中可能包括磁盘I/O默认8
num.io.threads24
# 配置该参数可以使kafka每5min自动做一次leader的rebalance
auto.leader.rebalance.enabletrue# 设置超级管理员多个分号隔开
super.usersUser:admin
}# 在kafka的config目录下创建jaas配置文件
touch $KAFKA_HOME/config/kraft/kafka_server_jaas_plain.conf# 编辑jaas配置文件
vi $KAFKA_HOME/config/kraft/kafka_server_jaas_plain.conf
{
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameadminpasswordBigdata.1234user_adminBigdata.1234user_icvipQwer.1234;
};
}# 拷贝一份启动脚本重命名为kafka-server-start-kraft-plain.sh修改kafka启动脚本
cp $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/bin/kafka-server-start-kraft-plain.sh# 编辑kafka-server-start-kraft-plain.sh启动脚本
vi $KAFKA_HOME/bin/kafka-server-start-kraft-plain.sh
{
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config$KAFKA_HOME/config/kraft/kafka_server_jaas_plain.conf kafka.Kafka $
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka $
}# 新建admin.conf并添加以下配置由于broker使用安全认证的方式启动所以开启生产者和消费者也需要经过客户端认证
vi $KAFKA_HOME/config/kraft/admin.conf
{
security.protocolSASL_PLAINTEXT
sasl.mechanismPLAIN
sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin passwordBigdata.1234;
#sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernameadmin passwordBigdata.1234;
}# 新建icvip.conf并添加以下配置
vi $KAFKA_HOME/config/kraft/icvip.conf
{
security.protocolSASL_PLAINTEXT
sasl.mechanismPLAIN
sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required usernameicvip passwordQwer.1234;
#sasl.jaas.configorg.apache.kafka.common.security.scram.ScramLoginModule required usernameicvip passwordQwer.1234;
}# 如果没有权限只执行如下命令
chmod -R 777 /root/kafka# 生成一个唯一的集群ID在一台kafka服务器上执行一次即可
$KAFKA_HOME/bin/kafka-storage.sh random-uuid
{
Ctq2XbzSS3K_EJqeiF4Csw
}# 格式化存储目录使用生成的集群ID配置文件格式化存储目录log.dirs每一台主机服务器都要执行这个命令
$KAFKA_HOME/bin/kafka-storage.sh format \
-t Ctq2XbzSS3K_EJqeiF4Csw \
-c $KAFKA_HOME/config/kraft/server-plain.properties# 格式化操作完成后你会发现在我们定义的log.dirs目录下多出一个meta.properties文件meta.properties文件中存储了当前的kafka节点的id(node.id)当前节点属于哪个集群(cluster.id)
cat $KAFKA_HOME/kraft-datas/meta.properties# 如果防火墙有IP端口限制请设置端口允许外部访问
firewall-cmd --permanent --zonepublic --add-port9092/tcp
firewall-cmd --permanent --zonepublic --add-port9093/tcp
# 防火墙重启
firewall-cmd --reload
# 查看端口是否允许外部访问
firewall-cmd --permanent --zonepublic --query-port9092/tcp
firewall-cmd --permanent --zonepublic --query-port9093/tcp
# 从防火墙里移除端口
firewall-cmd --permanent --zonepublic --remove-port8080/tcp
# 查看防火墙所有端口
firewall-cmd --zonepublic --list-ports# 查看kafka进程
ps -ef | grep kafka# 强制杀死kafka进程
kill -9 1122# 后台重新启动kafka服务
$KAFKA_HOME/bin/kafka-server-start-kraft-plain.sh -daemon $KAFKA_HOME/config/kraft/server-plain.properties# 前台停止kafka服务
$KAFKA_HOME/bin/kafka-server-stop.sh4.3Kafka操作脚本命令
# 创建kafka主题topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic bigdata-tests --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --command-config $KAFKA_HOME/config/kraft/admin.conf# 创建kafka带分区数、副本数等参数主题topic
$KAFKA_HOME/bin/kafka-topics.sh --create --topic bigdata-tests --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --partitions 2 --replication-factor 2 --command-config $KAFKA_HOME/config/kraft/admin.conf# 删除kafka主题topic
$KAFKA_HOME/bin/kafka-topics.sh --delete --topic bigdata-tests --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --command-config $KAFKA_HOME/config/kraft/admin.conf# 修改kafka主题topic分区数
$KAFKA_HOME/bin/kafka-topics.sh --alter --topic quickstart-events --partitions 2 --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --command-config $KAFKA_HOME/config/kraft/admin.conf# 显示kafka主题topic
$KAFKA_HOME/bin/kafka-topics.sh --describe --topic bigdata-tests --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --command-config $KAFKA_HOME/config/kraft/admin.conf# 查询所有kafka主题topic
$KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --command-config $KAFKA_HOME/config/kraft/admin.conf# 启动kafka生产者客户端推送主题信息
$KAFKA_HOME/bin/kafka-console-producer.sh --topic bigdata-tests --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --producer.config $KAFKA_HOME/config/kraft/admin.conf# 启动kafka消费者客户端消费主题信息
$KAFKA_HOME/bin/kafka-console-consumer.sh --topic bigdata-tests --from-beginning --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --consumer.config $KAFKA_HOME/config/kraft/admin.conf# 启动kafka消费者组客户端消费主题信息
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 --topic bigdata-tests --group bigdata_group --from-beginning --consumer.config $KAFKA_HOME/config/kraft/admin.conf4.4Brokerkraft模式参数配置说明在了解配置之前必须理解KRaft模式中的三种节点角色因为配置参数与角色紧密相关
Controller负责管理集群元数据如主题创建、分区领导选举、ACL等。一个集群中通常有多个Controller节点奇数个如3、5组成一个Raft仲裁组(Quorum)通过选举产生一个Active Controller。
Broker传统的Kafka角色负责存储数据和处理客户端的生产消费请求。Broker启动后从Controller获取最新的元数据。
Co-located (controllerbroker)同时扮演Controller和Broker的角色。不推荐用于生产环境因为这会将元数据平面和数据平面耦合可能相互影响主要用于开发测试。
这里主要把核心配置与调优与安全配置这两部分的配置来说明。4.4.1核心配置这些是搭建一个KRaft集群所必须设置的最小参数集。
●节点角色与ID配置
◎process.roles节点角色
说明定义此节点扮演的角色。
可选值controller、broker、controller,broker
示例
纯Controller节点process.rolescontroller
纯Broker节点process.rolesbroker
ControllerBroker节点process.rolesbroker,controller
◎node.id节点ID
说明集群范围内唯一的整数标识符。非常重要必须在整个集群中唯一。
示例node.id1、node.id2、node.id3
●KRaft仲裁组Quorum配置
这是KRaft的核心告诉节点如何找到彼此并组成元数据集群。
◎controller.quorum.voters
说明定义整个KRaft Controller仲裁组的所有投票成员。所有Controller节点和Co-located节点的配置必须完全一致。Broker节点也需要此配置来发现Controller。
格式id1host1:port1,id2host2:port2,id3host3:port3
示例controller.quorum.voters1controller1.example.com:9093,2controller2.example.com:9093,3controller3.example.com:9093
注意id是node.id。port是controller.listener.names所监听的端口见下文不是客户端连接的9092端口。
◎controller.listener.names
说明用于Controller节点之间内部Raft通信的监听器名称。通常设置为CONTROLLER。
示例controller.listener.namesCONTROLLER
◎listeners
说明需要包含Controller监听器。Broker节点也需要配置此参数来处理客户端连接。
Controller节点示例listenersCONTROLLER://:9093
Broker节点示例listenersCONTROLLER://:9093,PLAINTEXT://:9092 假设用PLAINTEXT对外服务
Co-located节点示例listenersCONTROLLER://:9093,PLAINTEXT://:9092
◎inter.broker.listener.name
说明Broker节点之间通信使用的监听器名称。仅Broker或Co-located节点需要设置。
示例inter.broker.listener.namePLAINTEXT
●元数据存储与日志配置
◎log.dirs
说明存储实际消息数据的目录。
◎metadata.log.dir
说明KIP-858存储元数据日志的目录。强烈建议为元数据日志配置一个与数据日志物理上分开的高性能磁盘如NVMe SSD这对集群整体稳定性至关重要。
注意在旧版本中元数据也存储在log.dirs中。但从3.4版本开始建议使用metadata.log.dir进行分离。4.3.2调优与安全配置●Raft参数Controller调优
这些参数主要影响元数据操作的性能和容错能力。
◎metadata.log.max.record.bytes.between.snapshots
说明触发一次元数据快照之前元数据日志可以增长的最大字节数。降低此值可以加快Broker重启后从快照恢复元数据的速度但会增加 Controller 的负载。默认值1073741824 (1GiB)
◎controller.snapshot.max.interval.ms
说明即使日志字节数未达到上限也强制生成快照的最大时间间隔。确保元数据不会太久没有快照。默认值900000 (15分钟)
●Broker参数数据平面调优
这些是传统Broker调优参数在KRaft模式下依然重要。
◎num.io.threads
说明处理磁盘I/O的线程数。通常设置为磁盘数量。默认值8
◎num.network.threads
说明处理网络请求的线程数。默认值3
◎num.replica.fetchers
说明从领导副本拉取消息进行同步的Follower副本线程数。增加此值可以加快副本同步速度。默认值1
◎auto.create.topics.enable
说明是否自动创建主题当生产者向不存在的主题写入消息时。生产环境强烈建议设置为false。默认值true
◎default.replication.factor
说明自动创建主题时的默认副本因子。如果禁用了自动创建此参数无效。建议至少2通常为3。
◎min.insync.replicas
说明定义生产者将消息成功写入所需的最少同步副本数。这是保证数据不丢失的关键参数之一与acksall 配合使用。
示例如果replication.factor3设置min.insync.replicas2允许最多1个副本宕机而不影响生产者。
●安全配置
◎listeners和advertised.listeners
说明配置SSLSSL://或 SASLSASL_SSL://监听器以实现加密和认证。
示例listenersSASL_SSL://:9092
◎security.inter.broker.protocol
说明Broker间通信使用的安全协议。应与inter.broker.listener.name对应监听器的协议一致。
示例security.inter.broker.protocolSASL_SSL
◎sasl.enabled.mechanisms, ssl.keystore.*, ssl.truststore.*等
说明具体的SASL和SSL配置需根据选择的认证和加密机制进行详细配置。5.在Docker容器上部署Kafka UIKafka UI是一类用于可视化管理和监控Apache Kafka集群的图形化用户界面GUI工具。
Apache Kafka本身是一个高性能的分布式流数据平台但其原生只提供了命令行工具如 kafka-topics.sh, kafka-console-consumer.sh等和API进行交互。这些工具虽然强大但对于日常的运维、监控、调试和开发工作来说不够直观和高效。Kafka UI的出现就是为了解决这个问题它将复杂的命令行操作转化为直观的点击操作让用户能够通过Web浏览器轻松地查看集群状态、Broker、Topic、Consumer Group等信息创建、修改和删除Topic浏览和搜索消息内容监控消息堆积滞后Lag情况测试消息的生产和消费。组件部署的服务器是192.168.18.200这台虚拟机部署命令如下如果未安装docker可以参考如下链接安装
# 先安装docker下面网址是安装步骤也可以自行安装
https://www.runoob.com/docker/centos-docker-install.html# 启动docker容器
systemctl start docker# 关闭docker容器
systemctl stop docker# 重启docker容器
systemctl restart docker# 容器启动kafka ui组件第一个8080是对外通讯端口第二个8080是内部通讯端口
docker run -p 8080:8080 \-e KAFKA_CLUSTERS_0_NAMEKafka_Cluster_Dev \-e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS192.168.18.200:9092,192.168.18.201:9092,192.168.18.202:9092 \-e SPRING_SECURITY_USER_NAMEadmin \-e SPRING_SECURITY_USER_PASSWORDBigdata.1234 \-e AUTH_TYPELOGIN_FORM \-e KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOLSASL_PLAINTEXT \-e KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISMPLAIN \-e KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIGorg.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin passwordBigdata.1234; \-d provectuslabs/kafka-ui:latest# 如果防火墙有IP端口限制请设置端口允许外部访问
firewall-cmd --permanent --zonepublic --add-port8082/tcp# 从防火墙里移除端口
firewall-cmd --permanent --zonepublic --remove-port8080/tcp# 防火墙重启
firewall-cmd --reload# 查看防火墙所有端口
firewall-cmd --zonepublic --list-ports# 查看所有容器信息
docker ps -a# 查看运行容器信息
docker ps# 删除容器容器ID
docker rm af4e6eb96a43# 启动容器容器ID
docker start 16093bac4c46# 停止容器容器ID
docker stop af4e6eb96a43
参考文献
Kafka官网https://kafka.apache.org/documentation/#gettingStarted