中小企业品牌网站建设,莆田建设信息网站,北京网站建设公司哪个好,百度云盘安装ZK
Index of /zookeeper/zookeeper-3.9.2
下载安装包 一定要下载-bin的#xff0c;不带bin的是源码#xff0c;没有编译的#xff0c;无法执行。-bin的才可以执行。 解压
tar -zxvf apache-zookeeper-3.9.2-bin.tar.gz 备份配置
cp zoo_sample.cfg zoo_sample.cfg-b…安装ZK
Index of /zookeeper/zookeeper-3.9.2
下载安装包 一定要下载-bin的不带bin的是源码没有编译的无法执行。-bin的才可以执行。 解压
tar -zxvf apache-zookeeper-3.9.2-bin.tar.gz 备份配置
cp zoo_sample.cfg zoo_sample.cfg-back 配置命名并修改配置
# 创建zk数据路径
mkdir -p /data/zk
# 修改配置
mv zoo_sample.cfg zoo.cfg vim zoo.cfg 配置变更内容
########################### 变更区
# 将数据目录变更为新的路径
# dataDir/tmp/zookeeper
dataDir/data/zk
########################### 变更区##########################################配置原文件
# The number of milliseconds of each tick
tickTime2000
# The number of ticks that the initial
# synchronization phase can take
initLimit10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# dataDir/tmp/zookeeper
dataDir/data/zk
# the port at which the clients will connect
clientPort2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount3
# Purge task interval in hours
# Set to 0 to disable auto purge feature
#autopurge.purgeInterval1## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.classNameorg.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost0.0.0.0
#metricsProvider.httpPort7000
#metricsProvider.exportJvmInfotrue启动zk
./bin/zkServer.sh start 常用命令
# 启动
./zkServer.sh start# 停止
./zkServer.sh stop# 状态
./zkServer.sh status 集群改造
集群需要多台机器
每台机器的配置都要配置上下面的内容
配置
server.2173.16.250.31:12888:13888
server.1173.16.250.32:12888:13888
# 配置解析
# server.2173.16.250.31:12888:13888
## server 是固定前缀
## 2 代表当前节点id可以自定义数字或字母标识只要唯一标识一个节点即可。
## 173.16.250.31 节点id
## 12888 master和slave通信端口 默认2888
## 13888 leader选举端口默认3888
注意自己当前节点及其他的节点均要配置。
创建当前节点id》myid
注意myid需要创建在dataDir目录下看配置的dataDir目录是什么否则会起不来
# 这里的2 就是当前节点的id等于 server.2xxxx:2888:3888 ,中的2
echo 2 /data/zk/myid验证集群状态
# 集群状态下登录每一台zk服务器查看状态会显示当前的节点是否为leader./zkServer.sh status# 非主节点显示
[rootlocaldomain bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zk/apache-zookeeper-3.9.2-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower# 主节点显示
[rootlocaldomain bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zk/apache-zookeeper-3.9.2-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
安装kafka
kafka首页Apache Kafka 点击下载即可 解压安装包
tar -zxvf kafka_2.13-3.7.0.tgz 修改配置
vim /${KAFKA_HOME}/config/server.properties broker.id0 //初始是0每个 server 的broker.id 都应该设置为不一样的就和 myid 一样 我的三个服务分别设置的是 1,2,3
log.dirs/usr/local/kafka/kafka_2.12-2.3.0/log#在log.retention.hours168 下面新增下面三项
message.max.byte5242880
default.replication.factor2
replica.fetch.max.bytes5242880#设置zookeeper的连接端口
zookeeper.connect192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181# 设置局域网内其他机器可以访问如果不设置只能localhost能访问会导致后面写消费者时java程序连接不上。这里写自己的服务ip即可。
advertised.listenersPLAINTEXT://173.16.250.32:9092
配置解释
broker.id0 #当前机器在集群中的唯一标识和zookeeper的myid性质一样
port9092 #当前kafka对外提供服务的端口默认是9092
host.name192.168.1.7 #这个参数默认是关闭的在0.8.1有个bugDNS解析问题失败率的问题。
num.network.threads3 #这个是borker进行网络处理的线程数
num.io.threads8 #这个是borker进行I/O处理的线程数
log.dirs/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录这个目录可以配置为“”逗号分割的表达式上面的num.io.threads要大于这个目录的个数这个目录如果配置多个目录新创建的topic他把消息持久化的地方是当前以逗号分割的目录中那个分区数最少就放那一个
socket.send.buffer.bytes102400 #发送缓冲区buffer大小数据不是一下子就发送的先回存储到缓冲区了到达一定的大小后在发送能提高性能
socket.receive.buffer.bytes102400 #kafka接收缓冲区大小当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数这个值不能超过java的堆栈大小
num.partitions1 #默认的分区数一个topic默认1个分区数
log.retention.hours168 #默认消息的最大持久化时间168小时7天
message.max.byte5242880 #消息保存的最大值5M
default.replication.factor2 #kafka保存消息的副本数如果一个副本失效了另一个还可以继续提供服务
replica.fetch.max.bytes5242880 #取消息的最大直接数
log.segment.bytes1073741824 #这个参数是因为kafka的消息是以追加的形式落地到文件当超过这个值的时候kafka会新起一个文件
log.retention.check.interval.ms300000 #每隔300000毫秒去检查上面配置的log失效时间log.retention.hours168 到目录查看是否有过期的消息如果有删除
log.cleaner.enablefalse #是否启用log压缩一般不用启用启用的话可以提高性能
zookeeper.connect192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口 详细说明
常规配置
这些参数是 kafka 中最基本的配置broker.id每个 broker 都需要有一个标识符使用 broker.id 来表示。它的默认值是 0它可以被设置成其他任意整数在集群中需要保证每个节点的 broker.id 都是唯一的。port如果使用配置样本来启动 kafka 它会监听 9092 端口修改 port 配置参数可以把它设置成其他任意可用的端口。zookeeper.connect用于保存 broker 元数据的地址是通过 zookeeper.connect 来指定。localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表每一部分含义如下hostname 是 zookeeper 服务器的服务名或 IP 地址port 是 zookeeper 连接的端口/path 是可选的 zookeeper 路径作为 Kafka 集群的 chroot 环境。如果不指定默认使用跟路径log.dirsKafka 把消息都保存在磁盘上存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径那么 broker 会根据 最少使用 原则把同一分区的日志片段保存到同一路径下。要注意broker 会向拥有最少数目分区的路径新增分区而不是向拥有最小磁盘空间的路径新增分区。num.recovery.threads.per.data.dir对于如下 3 种情况Kafka 会使用可配置的线程池来处理日志片段服务器正常启动用于打开每个分区的日志片段服务器崩溃后启动用于检查和截断每个分区的日志片段服务器正常关闭用于关闭日志片段默认情况下每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说一旦发生崩愤在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说如果 num.recovery.threads.per.data.dir 被设为 8并且 log.dir 指定了 3 个路径那么总共需要 24 个线程。auto.create.topics.enable默认情况下Kafka 会在如下 3 种情况下创建主题当一个生产者开始往主题写入消息时当一个消费者开始从主题读取消息时当任意一个客户向主题发送元数据请求时delete.topic.enable如果你想要删除一个主题你可以使用主题管理工具。默认情况下是不允许删除主题的delete.topic.enable 的默认值是 false 因此你不能随意删除主题。这是对生产环境的合理性保护但是在开发环境和测试环境是可以允许你删除主题的所以如果你想要删除主题需要把 delete.topic.enable 设为 true。主题默认配置
Kafka 为新创建的主题提供了很多默认配置参数下面就来一起认识一下这些参数num.partitionsnum.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能该功能是默认启用的主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意我们可以增加主题分区的个数但不能减少分区的个数。default.replication.factor这个参数比较简单它表示 kafka保存消息的副本数如果一个副本失效了另一个还可以继续提供服务default.replication.factor 的默认值为1这个参数在你启用了主题自动创建功能后有效。log.retention.msKafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间默认是 168 个小时也就是一周。除此之外还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的都是决定消息多久以后被删除推荐使用 log.retention.ms。log.retention.bytes另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定作用在每一个分区上。也就是说如果有一个包含 8 个分区的主题并且 log.retention.bytes 被设置为 1GB那么这个主题最多可以保留 8GB 数据。所以当主题的分区个数增加时整个主题可以保留的数据也随之增加。log.segment.bytes上述的日志都是作用在日志片段上而不是作用在单个消息上。当消息到达 broker 时它们被追加到分区的当前日志片段上当日志片段大小到达 log.segment.bytes 指定上限默认为 1GB时当前日志片段就会被关闭一个新的日志片段被打开。如果一个日志片段被关闭就开始等待过期。这个参数的值越小就越会频繁的关闭和分配新文件从而降低磁盘写入的整体效率。log.segment.ms上面提到日志片段经关闭后需等待过期那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭就看哪个条件先得到满足。message.max.bytesbroker 通过设置 message.max.bytes 参数来限制单个消息的大小默认是 1000 000 也就是 1MB如果生产者尝试发送的消息超过这个大小不仅消息不会被接收还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样该参数指的是压缩后的消息大小也就是说只要压缩后的消息小于 mesage.max.bytes那么消息的实际大小可以大于这个值这个值对性能有显著的影响。值越大那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小从而影响 IO 吞吐量。 启动kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties 检查服务是否启动
# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka 验证
创建topic
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 2 --partitions 1 --topic test--replication-factor 2 复制两份--partitions 1 创建1个分区--topic 创建主题 查看topic是否创建成功
bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092 创建生产者
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test 创建消费者
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning 查看topic状态 bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test# 下面是显示的详细信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2# 分区为为1 复制因子为2 主题 cxuantopic 的分区为0
# Replicas: 0,1 复制的为12 kafka集群配置
多台机器启动集群环境配置改动很少都很简单
配置修改
# 这里多台服务器改成不一样的
broker.id1
# 改为自己的ip地址
advertised.listenersPLAINTEXT://173.16.250.32:9092
# 配置所有zk节点
zookeeper.connect173.16.250.32:2181,173.16.250.31:2181 其他的按照正常上面的流程重新启动即可。
如果遇到
开发生产者及消费者
maven依赖
dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency 配置 spring:kafka:consumer:bootstrap-servers: 173.16.250.32:9092# 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)auto-offset-reset: earliestproducer:bootstrap-servers: 173.16.250.32:9092# 发送的对象信息变为json格式value-serializer: org.springframework.kafka.support.serializer.JsonSerializer生产者服务
package cn.huadingyun.bp.tidb.sync.provider;import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** author rubik* created of 2024/4/11 20:24 for cn.huadingyun.bp.tidb.sync.provider*/
Component
RequiredArgsConstructor
public class KafkaProducerService {private final KafkaTemplateString, Object kafkaTemplate;public void send(String topic, Object message) {kafkaTemplate.send(topic, message);}}生产者接口
package cn.huadingyun.bp.tidb.sync.api;import cn.huadingyun.bp.tidb.sync.provider.KafkaProducerService;
import cn.huadingyun.framework.dto.model.Result;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;/*** author rubik* created of 2024/4/11 20:22 for cn.huadingyun.bp.tidb.sync.api*/
RestController
RequestMapping(kafka)
RequiredArgsConstructor
public class KafkaProvider {private final KafkaProducerService kafkaProducerService;GetMapping(send)public ResultString send() {kafkaProducerService.send(test, hello);return Result.success(okk);}}消费者
package cn.huadingyun.bp.tidb.sync.listener;import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** author rubik* created of 2024/4/11 20:01 for cn.huadingyun.bp.tidb.sync.listener*/
Component
public class KafkaListenerDemo {KafkaListener(topics {test}, groupId test)public void listen(String message) {System.err.println(message);}
}注意这里的groupId必须指定否则启动会报错。可以在配置的时候统一指定。