知乎 上海做网站的公司,珠海医疗网站建设公司排名,重庆微信网站建设多少钱,太古楼角原网站建设前期准备安装kafka
启动Kafka本地环境需Java 8以上
Kafka是一种高吞吐量的分布式发布订阅消息系统#xff0c;它可以处理消费者在网站中的所有动作流数据。
Kafka启动方式有Zookeeper和Kraft#xff0c;两种方式只能选择其中一种启动#xff0c;不能同时使用。
Kafka下载…前期准备安装kafka
启动Kafka本地环境需Java 8以上
Kafka是一种高吞吐量的分布式发布订阅消息系统它可以处理消费者在网站中的所有动作流数据。
Kafka启动方式有Zookeeper和Kraft两种方式只能选择其中一种启动不能同时使用。
Kafka下载https://downloads.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz 解压tar -xzf kafka_2.13-3.7.0.tgz 一、Zookeeper启动Kafkakafka内置zookeeper
Kafka依赖Zookeeper
1、启动Zookeeper 2、启动Kafka
使用kafka自带Zookeeper启动
./zookeeper-server-start.sh ../config/zookeeper.properties
./zookeeper-server-stop.sh ../config/zookeeper.properties ./kafka-server-start.sh ../config/server.properties ./kafka-server-stop.sh ../config/server.properties
二、Zookeeper服务器启动Kafka
Zookeeper服务器安装
https://zookeeper.apache.org/ https://dlcdn.apache.org/zookeeper/zookeeper-3.9.2/apache-zookeeper-3.9.2-bin.tar.gz
tar zxvf apache-zookeeper-3.9.2-bin.tar.gz 配置Zookeeper服务器
cp zoo_sample.cfg zoo.cfg 启动Zookeeper服务器 ./zkServer.sh start
修改Zookeeper端口 Zoo.cfg添加内容
admin.serverPort8099
apache-zookeeper-3.9.2-bin/bin目录下重启Zookeeper Zookeeper服务器启动kafka
/opt/kafka_2.13-3.7.0/bin目录下
./kafka-server-start.sh ../config/server.properties Kafka配置文件server.properties 三、使用KRaft启动Kafka UUID通用唯一识别码Universally Unique Identifier
1、生成Cluster UUID(集群UUID)./kafka-storage.sh random-uuid 2.格式化kafka日志目录./kafka-storage.sh format -t 3pMJGNJcT0uLIBsZhbucjQ -c ../config/kraft/server.properties 3.启动kafka./kafka-server-start.sh ../config/kraft/server.properties springboot集成kafka 创建topic时若不指定topic的分区(partition)数量使则默认为1个分区(partition) 修改server.properties文件
vim server.properties
listenersPLAINTEXT://0.0.0.0:9092 advertised.listenersPLAINTEXT://192.168.68.133:9092 springboot加入依赖kafka
dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency
加入spring-kafka依赖后springboot自动装配好kafkaTemplate的Bean application.yml配置连接kafka
spring: kafka: bootstrap-servers: 192.168.68.133:9092
生产者
发送消息 Resource private KafkaTemplateString,String kafkaTemplate; Test void kafkaSendTest(){ kafkaTemplate.send(kafkamsg01,hello kafka); } 消费者 接收消息 Component public class KafkaConsumer { KafkaListener(topics {kafkamsg01,test},groupId 123) public void consume(String message){ System.out.println(接收到消息message); } }
若没有配置groupid
Failed to start bean org.springframework.kafka.config.internalKafkaListenerEndpointRegistry; nested exception is java.lang.IllegalStateException: No group.id found in consumer config, container properties, or KafkaListener annotation; a group.id is required when group management is used. Component public class KafkaConsumer { KafkaListener(topics {kafkamsg01,test},groupId 123) public void consume(String message){ System.out.println(接收到消息message); } } 想从第一条消息开始读取(若同组的消费者已经消费过该主题并且kafka已经保存了该消费者组的偏移量则设置auto.offset.reset设置为earliest不生效需要手动修改偏移量或使用新的消费者组)
application.yml需要将auto.offset.reset设置为earliest
代码语言java
spring:kafka:bootstrap-servers: 192.168.68.133:9092consumer:auto-offset-reset: earliest Earliest:将偏移量重置为最早的偏移量
Latest: 将偏移量重置为最新的偏移量
None: 没有为消费者组找到以前的偏移量向消费者抛出异常
Exception: 向消费者抛出异常 脚本重置消费者组偏移量 ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 123 --topic kafkamsg01 --reset-offsets --to-earliest –execute
重置完成