当前位置: 首页 > news >正文

怎样让网站显示网站建设中哈尔滨市建设工程交易

怎样让网站显示网站建设中,哈尔滨市建设工程交易,做个简单的导航网站,重庆市建设工程质量信息网SpringBoot 多组 Kafka 配置 单组 Kafka 配置 时隔多日#xff0c;冒个泡吧。 场景 是 我在日常的开发过程中需要监听 kafka 的消息进行回调处理#xff0c;但是呢#xff0c;不同的三方服务他们用了不同的 kafka 集群#xff0c;那么默认的 Spring 自动读取的 kafka 配…SpringBoot 多组 Kafka 配置 单组 Kafka 配置 时隔多日冒个泡吧。 场景 是 我在日常的开发过程中需要监听 kafka 的消息进行回调处理但是呢不同的三方服务他们用了不同的 kafka 集群那么默认的 Spring 自动读取的 kafka 配置就不行了它默认只支持一组那么就需要单独进行多组配置。 先说单组配置的场景只需要在你的 yml 里增加配置 spring:kafka:bootstrap-servers: 192.168.25.11:9092,192.168.25.22:9092properties:security.protocol: SASL_PLAINTEXTsasl.mechanism: SCRAM-SHA-256sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required usernameadmin passwordadmin;topic: your-topic里面 security 和 sasl 用于鉴权 然后你就可以直接写个 consumer 来接受消息了 Component Slf4j public class Consumer {KafkaListener(topics ${spring.kafka.topic}, groupId your-group)public void consumeMsgLog(ConsumerRecord?, ? record) {// do everything}}在上述配置和代码示例中groupId 是 Kafka 消费者组的标识符它在 Kafka 架构中起到了关键的角色。让我解释一下 groupId 在 Kafka 架构设计中的作用 Kafka 消费者组Kafka 消费者组是一组 Kafka 消费者的逻辑集合它们共同订阅一个或多个 Kafka 主题。消费者组中的每个消费者可以独立处理主题中的消息而消费者组协调消息的分配和处理。消息分发Kafka主题中的每个分区中的消息可以被同一个消费者组的一个消费者处理。groupId 用于将消费者组中的消费者分配到分区以确保消息被均匀地分发。这意味着每个分区的消息只能被消费者组中的一个消费者处理。Offset 管理groupId 还用于管理消息偏移量offsets。每个分区的消息都有一个偏移量用于跟踪已处理的消息。Kafka维护每个消费者组的每个分区的偏移量以确保消息不会被重复处理。这使得每个消费者组可以在不同时间点开始处理消息并且不会丢失已处理的消息。水平伸缩groupId 允许消费者组进行水平伸缩。您可以添加或删除消费者而不会破坏分配的消息负载均衡。Kafka会根据消费者组的大小自动重新分配分区。 也就是说一个主题中的消息可以被多个消费者组消费但是不能被同一个消费者组的多个消费者消费 在Kafka中消费者组不需要显式地创建。当您的消费者开始订阅特定的主题时如果指定了相同的 groupIdKafka 会自动将这些消费者视为同一个消费者组。这意味着只要您在消费者配置中指定了相同的 groupIdKafka 就会自动将它们分配到同一个消费者组。 如果指定了不同的 groupIdKafka 将把它们视为不同的消费者组并且这些消费者组会独立地消费相同或不同的主题中的消息。 做了一些小小的铺垫让我们进入正题 多组 Kafka 配置 Spring Kafka 提供了 ConcurrentKafkaListenerContainerFactory 以支持同时监听多个不同的 Kafka 集群或主题。可以为每个不同的 Kafka 集群或主题配置不同的 ConcurrentKafkaListenerContainerFactory 实例以满足多组消费者需求。 所以就是我们自己定义加载配置而不是使用 Spring Boot 默认的预留配置。 那比如我有两组 Kafka 集群为了省事第一组我就用默认的而另一组单独设置一组然后进行ConcurrentKafkaListenerContainerFactory 的定制化注入 Slf4j Configuration public class KafkaConfiguration {Value(${kafka.sec-kafka.consumer.bootstrap-servers:192.168.25.22:9092})private String servers;Value(${spring.kafka.properties.sasl.jaas.config})private String jaasConfig;Beanpublic ConsumerFactoryString, String secKafkaConsumerFactory() {MapString, Object consumerProps new HashMap();consumerProps.put(bootstrap.servers, servers);consumerProps.put(group.id, your-group);consumerProps.put(enable.auto.commit, true);consumerProps.put(auto.commit.interval.ms, 2000);consumerProps.put(key.deserializer, StringDeserializer.class);consumerProps.put(value.deserializer, StringDeserializer.class);consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);// 鉴权相关配置consumerProps.put(security.protocol, SASL_PLAINTEXT);consumerProps.put(sasl.mechanism, SCRAM-SHA-256);consumerProps.put(sasl.jaas.config, jaasConfig);return new DefaultKafkaConsumerFactory(consumerProps);}Beanpublic ConcurrentKafkaListenerContainerFactoryString, String secKafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactoryString, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(secKafkaConsumerFactory());return factory;}}解释一下配置 bootstrap.servers指定了 Kafka 服务器的地址和端口这是连接到 Kafka 集群的入口点。 group.id指定了消费者所属的消费者组的标识符。Kafka 使用消费者组来协调消息分发确保消息被均匀分发给消费者。 enable.auto.commit指定是否启用自动提交偏移量。如果设置为 “true”Kafka 消费者会自动定期提交偏移量以记录已经处理的消息。如果设置为 “false”您需要手动管理偏移量。 auto.commit.interval.ms如果启用了自动提交这个参数指定了自动提交偏移量的时间间隔以毫秒为单位。 key.deserializer 和 value.deserializer指定用于反序列化消息键和值的反序列化器类。在这种情况下它们都设置为 StringDeserializer.class表示消息键和值都被视为字符串。 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG指定了当消费者启动时或者偏移量丢失时如何处理消息的偏移量。“earliest” 表示从最早的可用消息开始处理“latest” 表示从最新的消息开始处理。 鉴权相关配置SASL这些配置用于设置 Kafka 消费者与 Kafka 集群之间的安全通信和身份验证。这包括 security.protocol、sasl.mechanism 和 sasl.jaas.config。它们指定了使用 SASL 加密和身份验证的方式以及相应的配置信息。jaasConfig 包含了 SASL 配置的详细信息。 这些属性是 Kafka 消费者连接和配置的关键部分它们确保了消费者可以连接到 Kafka 集群并以安全的方式处理消息 而这个时候你的 Consumer只需要在注解里多一个配置 containerFactory Component Slf4j public class SecConsumer {KafkaListener(topics ${kafka.topic}, groupId your-group containerFactory secKafkaListenerContainerFactory)public void consumeMsgLog(ConsumerRecord?, ? record) {// do everything}}更多使用方法可以参考官方文档 Spring for Kafka
http://www.pierceye.com/news/787867/

相关文章:

  • 三河市建设厅公示网站个人flash网站
  • 建设网站工具上海网站制作团队
  • 化妆品网站系统规划wordpress 站群软件
  • 深圳低价做网站广告免费设计在线生成
  • 网站服务体系网站开发补充合同范本
  • 萝岗做网站网站优化大计
  • 服装店网站模板北京网站设计公司哪个好
  • 网站运维工作内容网页设计与制作课程小结
  • 2019网站怎么做网站快速备案公司
  • 上饶网站制作专业网站设计如何提升网页品质
  • 哈尔滨微信网站建设学网站设计和平面设计
  • 网站开发公司网站官网焦作建设企业网站公司
  • 设备上哪个网站做外贸推广php版本不同于wordpress使用
  • 虚拟服务器怎样做网站广州 骏域网站建设专家
  • 谁有做任务网站色多多导入百媚导航
  • 做网站怎么加视频素材网免费
  • 想做棋牌网站怎么做做电商的进货网站
  • 做微信小程序和做网站南昌网站备案
  • 好的摄影网站推荐抖音点赞自助网站
  • 能够做代理的网站有哪些问题朝阳区住房和城乡建设部网站
  • 网站建设与管理考察报告中国农业建设信息网站
  • 张家界做网站中天建设集团有限公司怎么样
  • 广州网站百度排名推广聊天代理分销系统
  • 全球采购网站有哪些网站平台
  • wordpress怎么做商城网站软件工程师证书含金量
  • 锡林浩特建设局网站推广方法有哪几种
  • 汉南城乡建设局网站活动页面设计
  • 滕州网站搜索引擎优化新浪企业邮箱
  • 涿州网站建设珠海网站制作计划
  • 摄影网站设计思想wordpress 同步插件