Table of contents: Preface; 1. Key Points; 2. Development Environment; 3. The Original Project; 4. Modifying the Project; 5. Testing; 6. Summary

Preface

This plugin runs stably in hundreds of kafka projects, processing hundreds of millions of records per day. It is a lean plugin that is quick to get started with.

```xml
<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>multi-kafka-starter</artifactId>
    <version><!-- latest version --></version>
</dependency>
```

For example, the simple configuration below is all it takes to integrate SpringBoot with kafka; we only need to focus on developing the two Service classes com.mmc.multi.kafka.starter.OneProcessor and com.mmc.multi.kafka.starter.TwoProcessor.
```properties
## kafka config for topic1
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
# business processor class name
spring.kafka.one.processor=com.mmc.multi.kafka.starter.OneProcessor
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## kafka config for topic2
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
# business processor class name
spring.kafka.two.processor=com.mmc.multi.kafka.starter.TwoProcessor
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

## pb message consumer
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

## kafka message producer
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```
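The key idea behind this configuration style is that every data source lives under its own prefix (`spring.kafka.one.*`, `spring.kafka.two.*`, ...), which lets the starter bind everything into a single `Map<String, MmcKafkaProperties>`. As a rough plain-Java illustration of that grouping idea (the class and method names here are ours; the starter itself relies on Spring's `@ConfigurationProperties` binding rather than this hand-rolled parser):

```java
import java.util.HashMap;
import java.util.Map;

public class PrefixGrouping {

    // Group flat "spring.kafka.<name>.<key>" entries into one map per data source.
    public static Map<String, Map<String, String>> group(Map<String, String> flat) {
        Map<String, Map<String, String>> grouped = new HashMap<>();
        String prefix = "spring.kafka.";
        for (Map.Entry<String, String> e : flat.entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                continue; // not one of ours
            }
            String rest = e.getKey().substring(prefix.length());
            int dot = rest.indexOf('.');
            if (dot < 0) {
                continue; // no per-datasource segment
            }
            String name = rest.substring(0, dot);  // e.g. "one"
            String key = rest.substring(dot + 1);  // e.g. "topic"
            grouped.computeIfAbsent(name, k -> new HashMap<>()).put(key, e.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        Map<String, String> flat = new HashMap<>();
        flat.put("spring.kafka.one.topic", "mmc-topic-one");
        flat.put("spring.kafka.one.enabled", "true");
        flat.put("spring.kafka.two.topic", "mmc-topic-two");

        Map<String, Map<String, String>> grouped = group(flat);
        System.out.println(grouped.get("one").get("topic")); // prints "mmc-topic-one"
    }
}
```

Each entry in the resulting map then drives one independent consumer or producer, which is exactly why adding a new data source is only a matter of adding a new prefix block.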
As per international convention, the source code comes first: GitHub source.
1. Key Points
This article describes how to support configuring multiple kafka data sources by wrapping everything into a starter. From the source code you can learn the following features. (Full series table of contents)
- SpringBoot integrating multiple kafka data sources
- SpringBoot consuming kafka messages in batches
- SpringBoot gracefully starting and stopping kafka consumption
- SpringBoot local unit tests for kafka without a cluster
- SpringBoot injecting multiple configurations via a map
- SpringBoot BeanPostProcessor usage
- SpringBoot registering custom classes into the IOC container
- SpringBoot injecting beans into fields of custom classes
- SpringBoot removing qualifiers
- SpringBoot consuming protobuf kafka messages
- SpringBoot the Aware design pattern
- SpringBoot reading topic, offset, partition, header and other parameters from kafka messages
- SpringBoot sending kafka messages with any producer
- SpringBoot configuring any number of kafka producers
2. Development Environment
- jdk 1.8
- maven 3.6.2
- springboot 2.4.3
- kafka-client 2.6.6
- idea 2020
3. The Original Project
1. Following the previous articles, we had basically finished developing the common kafka consumer features. Some readers then asked: how do we configure multiple producer data sources, so that sending kafka messages is as simple as consuming them?

## 1. Configuration

```properties
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```

## 2. Reference

```java
@Resource(name = "fourKafkaSender")
private MmcKafkaMultiSender mmcKafkaMultiSender;
```

## 3. Usage

```java
mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);
```
The answer is yes, but we need to upgrade and optimize things a little.
4. Modifying the Project
1. Modify the inner class MmcKafkaProperties to add producer-related configuration.

```java
@EqualsAndHashCode(callSuper = true)
@Data
public static class Producer extends KafkaProperties.Producer {

    /**
     * Whether this producer is enabled.
     */
    private boolean enabled = true;

    /**
     * Producer name; if set, it overrides the default xxxKafkaSender name.
     */
    private String name;
}

/**
 * Producer.
 */
private final Producer producer = new Producer();

/**
 * Create an initial map of producer properties from the state of this instance.
 * <p>
 * This allows you to add additional properties, if necessary, and override the
 * default kafkaProducerFactory bean.
 *
 * @return the producer properties initialized with the customizations defined on this instance
 */
Map<String, Object> buildProducerProperties() {
    return new HashMap<>(this.producer.buildProperties());
}
```

2. Add the MmcKafkaSender interface as the single contract for sending Kafka messages.
```java
public interface MmcKafkaSender {

    /**
     * Send a kafka message.
     *
     * @param topic        topic name
     * @param partitionKey message partition key
     * @param message      the message body
     */
    void sendStringMessage(String topic, String partitionKey, String message);

    /**
     * Send a kafka message.
     *
     * @param topic        topic name
     * @param partitionKey message partition key
     * @param message      the message body
     */
    void sendProtobufMessage(String topic, String partitionKey, byte[] message);
}
```
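Because every sender hides behind this one interface, business code only ever depends on `MmcKafkaSender`, and tests can swap in an in-memory fake instead of a real broker. A minimal sketch of that idea (the interface is copied from above; `InMemorySender` and `FakeSenderDemo` are hypothetical names of ours, not part of the starter):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

interface MmcKafkaSender {
    void sendStringMessage(String topic, String partitionKey, String message);
    void sendProtobufMessage(String topic, String partitionKey, byte[] message);
}

// Hypothetical in-memory fake: records messages instead of talking to Kafka.
class InMemorySender implements MmcKafkaSender {

    final List<String> sent = new ArrayList<>();

    @Override
    public void sendStringMessage(String topic, String partitionKey, String message) {
        sent.add(topic + "|" + partitionKey + "|" + message);
    }

    @Override
    public void sendProtobufMessage(String topic, String partitionKey, byte[] message) {
        sent.add(topic + "|" + partitionKey + "|" + new String(message, StandardCharsets.UTF_8));
    }
}

public class FakeSenderDemo {
    public static void main(String[] args) {
        InMemorySender sender = new InMemorySender();
        sender.sendStringMessage("mmc-topic-one", "aaa", "{\"name\":\"demo\"}");
        System.out.println(sender.sent.get(0)); // mmc-topic-one|aaa|{"name":"demo"}
    }
}
```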
3. Add the MmcKafkaOutputContainer container class to store all producers for unified management.
```java
@Getter
@Slf4j
public class MmcKafkaOutputContainer {

    /**
     * Holds all producers.
     */
    private final Map<String, MmcKafkaSender> outputs;

    /**
     * Constructor.
     */
    public MmcKafkaOutputContainer(Map<String, MmcKafkaSender> outputs) {
        this.outputs = outputs;
    }
}
```

4. Add the MmcKafkaSingleSender implementation class, which actually sends Kafka messages.
```java
public class MmcKafkaSingleSender implements MmcKafkaSender {

    private final KafkaTemplate<String, Object> template;

    public MmcKafkaSingleSender(KafkaTemplate<String, Object> template) {
        this.template = template;
    }

    @Override
    public void sendStringMessage(String topic, String partitionKey, String message) {
        template.send(topic, partitionKey, message);
    }

    @Override
    public void sendProtobufMessage(String topic, String partitionKey, byte[] message) {
        template.send(topic, partitionKey, message);
    }
}
```

5. Modify the MmcMultiProducerAutoConfiguration configuration class: iterate over all configurations, build a MmcKafkaSingleSender for each, and register it into the IOC container.

```java
@Slf4j
@Configuration
@EnableConfigurationProperties(MmcMultiKafkaProperties.class)
@ConditionalOnProperty(prefix = "spring.kafka", value = "enabled", matchIfMissing = true)
public class MmcMultiProducerAutoConfiguration implements BeanFactoryAware {

    private DefaultListableBeanFactory beanDefinitionRegistry;

    @Resource
    private MmcMultiKafkaProperties mmcMultiKafkaProperties;

    @Bean
    public MmcKafkaOutputContainer mmcKafkaOutputContainer() {

        // map holding all producers
        Map<String, MmcKafkaSender> outputs = new HashMap<>();

        // fetch all Kafka configurations
        Map<String, MmcMultiKafkaProperties.MmcKafkaProperties> kafkas = mmcMultiKafkaProperties.getKafka();

        // iterate and create each producer
        for (Map.Entry<String, MmcMultiKafkaProperties.MmcKafkaProperties> entry : kafkas.entrySet()) {

            // unique producer name
            String name = entry.getKey();

            // producer configuration
            MmcMultiKafkaProperties.MmcKafkaProperties properties = entry.getValue();

            // enabled?
            if (properties.isEnabled()
                    && properties.getProducer().isEnabled()
                    && CommonUtil.isNotEmpty(properties.getProducer().getBootstrapServers())) {

                // bean name
                String beanName = Optional.ofNullable(properties.getProducer().getName())
                        .orElse(name + "KafkaSender");

                KafkaTemplate<String, Object> template = mmcdKafkaTemplate(properties);

                // create the sender instance
                MmcKafkaSender sender = new MmcKafkaSingleSender(template);

                outputs.put(beanName, sender);

                // register it into the IOC container
                beanDefinitionRegistry.registerSingleton(beanName, sender);
            }
        }

        return new MmcKafkaOutputContainer(outputs);
    }

    private KafkaTemplate<String, Object> mmcdKafkaTemplate(MmcMultiKafkaProperties.MmcKafkaProperties producer) {
        return new KafkaTemplate<>(baseKafkaProducerFactory(producer));
    }

    private ProducerFactory<String, Object> baseKafkaProducerFactory(MmcMultiKafkaProperties.MmcKafkaProperties producer) {
        return new DefaultKafkaProducerFactory<>(producer.buildProducerProperties());
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanDefinitionRegistry = (DefaultListableBeanFactory) beanFactory;
    }
}
```

5. Testing
1. Add the jars needed for kafka testing. See this earlier article: kafka unit testing.

```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.18.0</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java-util</artifactId>
    <version>3.18.0</version>
    <scope>test</scope>
</dependency>
```

2. Keep the consumer configuration unchanged and add the producer configuration.
```properties
## json message consumer
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=oneProcessor
spring.kafka.one.duplicate=false
spring.kafka.one.snakeCase=false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.container.threshold=2
spring.kafka.one.container.rate=1000
spring.kafka.one.container.parallelism=8

## json message producer
spring.kafka.four.enabled=true
spring.kafka.four.producer.name=fourKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```
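A quick note on `spring.kafka.four.producer.name` above: per the auto-configuration in the previous section, an explicitly configured name decides the injectable bean name; if it is omitted, the starter falls back to `<configKey>KafkaSender`, so for the `four` config the result is `fourKafkaSender` either way. Isolated as a plain-Java sketch (the class name here is ours, not the starter's):

```java
import java.util.Optional;

public class BeanNameRule {

    // Standalone copy of the starter's naming rule:
    // an explicit spring.kafka.<key>.producer.name wins;
    // otherwise the bean is named "<key>KafkaSender".
    public static String beanName(String configKey, String producerName) {
        return Optional.ofNullable(producerName).orElse(configKey + "KafkaSender");
    }

    public static void main(String[] args) {
        System.out.println(beanName("four", null));       // fourKafkaSender
        System.out.println(beanName("four", "mySender")); // mySender
    }
}
```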
3. Write the test class.
```java
@Slf4j
@ActiveProfiles("dev")
@ExtendWith(SpringExtension.class)
@SpringBootTest(classes = {MmcMultiProducerAutoConfiguration.class, MmcMultiConsumerAutoConfiguration.class,
        DemoService.class, OneProcessor.class})
@TestPropertySource(value = "classpath:application-string.properties")
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"},
        topics = {"${spring.kafka.one.topic}"})
class KafkaStringMessageTest {

    @Value("${spring.kafka.one.topic}")
    private String topicOne;

    @Value("${spring.kafka.two.topic}")
    private String topicTwo;

    @Resource(name = "fourKafkaSender")
    private MmcKafkaSingleSender mmcKafkaSingleSender;

    @Test
    void testDealMessage() throws Exception {

        Thread.sleep(2 * 1000);

        // produce test data
        produceMessage();

        Thread.sleep(10 * 1000);
    }

    void produceMessage() {

        for (int i = 0; i < 10; i++) {

            DemoMsg msg = new DemoMsg();
            msg.setRoutekey("routekey" + i);
            msg.setName("name" + i);
            msg.setTimestamp(System.currentTimeMillis());

            String json = JsonUtil.toJsonStr(msg);
            mmcKafkaSingleSender.sendStringMessage(topicOne, "aaa", json);
        }
    }
}
```
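A side note on the fixed `Thread.sleep` waits in the test above: they make the test slow when messages arrive quickly and flaky when they arrive slowly. A common alternative (a hypothetical refactor, not part of this project) is to have the processor count down a latch so the test waits only as long as it needs to:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitDemo {

    public static void main(String[] args) throws InterruptedException {
        // One count per expected message, instead of a fixed sleep.
        int expected = 10;
        CountDownLatch latch = new CountDownLatch(expected);

        // Simulated consumer thread: counts down as "messages" arrive.
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                latch.countDown();
            }
        });
        consumer.start();

        // Wait up to 10s, but return as soon as everything is consumed.
        boolean done = latch.await(10, TimeUnit.SECONDS);
        System.out.println(done); // true
    }
}
```

In the real test, the processor's message handler would call `latch.countDown()` and the assertion would check `latch.await(...)` returned true.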
4. Run it: the test passes, and we can see that messages are sent and consumed normally.
6. Summary
Packaging this project's code as a starter greatly improves our development efficiency, since we only need to care about business code. GitHub project source: click here. If it helps you, please give it a star. Next article: upgrading this starter to reach a consumption speed of 100k+ messages on a single kafka partition.
- "Building Large Distributed Services (36): SpringBoot zero-code integration of multiple kafka data sources"
- "Building Large Distributed Services (37): SpringBoot integrating multiple kafka data sources - removing qualifiers"
- "Building Large Distributed Services (38): SpringBoot integrating multiple kafka data sources - protobuf support"
- "Building Large Distributed Services (39): SpringBoot integrating multiple kafka data sources - Aware pattern support"
- "Building Large Distributed Services (40): SpringBoot integrating multiple kafka data sources - producer support"
- "Building Large Distributed Services (41): SpringBoot integrating multiple kafka data sources - supporting producers at the hundreds-of-millions message scale"
Add me or join the group to exchange ideas and learn together; more useful material, project source code, and big-company referrals await you.