怎样让网站显示网站建设中,哈尔滨市建设工程交易,做个简单的导航网站,重庆市建设工程质量信息网SpringBoot 多组 Kafka 配置
单组 Kafka 配置
时隔多日#xff0c;冒个泡吧。
场景 是 我在日常的开发过程中需要监听 kafka 的消息进行回调处理#xff0c;但是呢#xff0c;不同的三方服务他们用了不同的 kafka 集群#xff0c;那么默认的 Spring 自动读取的 kafka 配…SpringBoot 多组 Kafka 配置
单组 Kafka 配置
时隔多日冒个泡吧。
场景 是 我在日常的开发过程中需要监听 kafka 的消息进行回调处理但是呢不同的三方服务他们用了不同的 kafka 集群那么默认的 Spring 自动读取的 kafka 配置就不行了它默认只支持一组那么就需要单独进行多组配置。
先说单组配置的场景只需要在你的 yml 里增加配置
spring:kafka:bootstrap-servers: 192.168.25.11:9092,192.168.25.22:9092properties:security.protocol: SASL_PLAINTEXTsasl.mechanism: SCRAM-SHA-256sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required usernameadmin passwordadmin;topic: your-topic里面 security 和 sasl 用于鉴权
然后你就可以直接写个 consumer 来接受消息了
Component
Slf4j
public class Consumer {KafkaListener(topics ${spring.kafka.topic}, groupId your-group)public void consumeMsgLog(ConsumerRecord?, ? record) {// do everything}}在上述配置和代码示例中groupId 是 Kafka 消费者组的标识符它在 Kafka 架构中起到了关键的角色。让我解释一下 groupId 在 Kafka 架构设计中的作用
Kafka 消费者组Kafka 消费者组是一组 Kafka 消费者的逻辑集合它们共同订阅一个或多个 Kafka 主题。消费者组中的每个消费者可以独立处理主题中的消息而消费者组协调消息的分配和处理。消息分发Kafka主题中的每个分区中的消息可以被同一个消费者组的一个消费者处理。groupId 用于将消费者组中的消费者分配到分区以确保消息被均匀地分发。这意味着每个分区的消息只能被消费者组中的一个消费者处理。Offset 管理groupId 还用于管理消息偏移量offsets。每个分区的消息都有一个偏移量用于跟踪已处理的消息。Kafka维护每个消费者组的每个分区的偏移量以确保消息不会被重复处理。这使得每个消费者组可以在不同时间点开始处理消息并且不会丢失已处理的消息。水平伸缩groupId 允许消费者组进行水平伸缩。您可以添加或删除消费者而不会破坏分配的消息负载均衡。Kafka会根据消费者组的大小自动重新分配分区。
也就是说一个主题中的消息可以被多个消费者组消费但是不能被同一个消费者组的多个消费者消费
在Kafka中消费者组不需要显式地创建。当您的消费者开始订阅特定的主题时如果指定了相同的 groupIdKafka 会自动将这些消费者视为同一个消费者组。这意味着只要您在消费者配置中指定了相同的 groupIdKafka 就会自动将它们分配到同一个消费者组。
如果指定了不同的 groupIdKafka 将把它们视为不同的消费者组并且这些消费者组会独立地消费相同或不同的主题中的消息。
做了一些小小的铺垫让我们进入正题
多组 Kafka 配置
Spring Kafka 提供了 ConcurrentKafkaListenerContainerFactory 以支持同时监听多个不同的 Kafka 集群或主题。可以为每个不同的 Kafka 集群或主题配置不同的 ConcurrentKafkaListenerContainerFactory 实例以满足多组消费者需求。
所以就是我们自己定义加载配置而不是使用 Spring Boot 默认的预留配置。
那比如我有两组 Kafka 集群为了省事第一组我就用默认的而另一组单独设置一组然后进行ConcurrentKafkaListenerContainerFactory 的定制化注入
Slf4j
Configuration
public class KafkaConfiguration {Value(${kafka.sec-kafka.consumer.bootstrap-servers:192.168.25.22:9092})private String servers;Value(${spring.kafka.properties.sasl.jaas.config})private String jaasConfig;Beanpublic ConsumerFactoryString, String secKafkaConsumerFactory() {MapString, Object consumerProps new HashMap();consumerProps.put(bootstrap.servers, servers);consumerProps.put(group.id, your-group);consumerProps.put(enable.auto.commit, true);consumerProps.put(auto.commit.interval.ms, 2000);consumerProps.put(key.deserializer, StringDeserializer.class);consumerProps.put(value.deserializer, StringDeserializer.class);consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 鉴权相关配置consumerProps.put(security.protocol, SASL_PLAINTEXT);consumerProps.put(sasl.mechanism, SCRAM-SHA-256);consumerProps.put(sasl.jaas.config, jaasConfig);return new DefaultKafkaConsumerFactory(consumerProps);}Beanpublic ConcurrentKafkaListenerContainerFactoryString, String secKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(secKafkaConsumerFactory());return factory;}}解释一下配置 bootstrap.servers指定了 Kafka 服务器的地址和端口这是连接到 Kafka 集群的入口点。 group.id指定了消费者所属的消费者组的标识符。Kafka 使用消费者组来协调消息分发确保消息被均匀分发给消费者。 enable.auto.commit指定是否启用自动提交偏移量。如果设置为 “true”Kafka 消费者会自动定期提交偏移量以记录已经处理的消息。如果设置为 “false”您需要手动管理偏移量。 auto.commit.interval.ms如果启用了自动提交这个参数指定了自动提交偏移量的时间间隔以毫秒为单位。 key.deserializer 和 value.deserializer指定用于反序列化消息键和值的反序列化器类。在这种情况下它们都设置为 StringDeserializer.class表示消息键和值都被视为字符串。 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定了当消费者启动时或者偏移量丢失时如何处理消息的偏移量。“earliest” 表示从最早的可用消息开始处理“latest” 表示从最新的消息开始处理。 鉴权相关配置SASL这些配置用于设置 Kafka 消费者与 Kafka 集群之间的安全通信和身份验证。这包括 security.protocol、sasl.mechanism 和 sasl.jaas.config。它们指定了使用 SASL 加密和身份验证的方式以及相应的配置信息。jaasConfig 包含了 SASL 配置的详细信息。
这些属性是 Kafka 消费者连接和配置的关键部分它们确保了消费者可以连接到 Kafka 集群并以安全的方式处理消息
而这个时候你的 Consumer只需要在注解里多一个配置 containerFactory
Component
Slf4j
public class SecConsumer {KafkaListener(topics ${kafka.topic}, groupId your-group containerFactory secKafkaListenerContainerFactory)public void consumeMsgLog(ConsumerRecord?, ? record) {// do everything}}更多使用方法可以参考官方文档 Spring for Kafka