当前位置: 首页 > news >正文

在阿里巴巴做网站多少钱Wordpress判断来路 显示

在阿里巴巴做网站多少钱,Wordpress判断来路 显示,单位微信公众号怎么创建,crm系统免费系列文章目录 文章目录 系列文章目录前言一、本文要点二、开发环境三、原项目四、修改项目五、测试一下五、小结 前言 本插件稳定运行上百个kafka项目#xff0c;每天处理上亿级的数据的精简小插件#xff0c;快速上手。 dependencygroupIdio.github.vipjo…系列文章目录 文章目录 系列文章目录前言一、本文要点二、开发环境三、原项目四、修改项目五、测试一下五、小结 前言 本插件稳定运行上百个kafka项目每天处理上亿级的数据的精简小插件快速上手。 dependencygroupIdio.github.vipjoey/groupIdartifactIdmulti-kafka-starter/artifactIdversion最新版本号/version /dependency例如下面这样简单的配置就完成SpringBoot和kafka的整合我们只需要关心com.mmc.multi.kafka.starter.OneProcessor和com.mmc.multi.kafka.starter.TwoProcessor 这两个Service的代码开发。 ## topic1的kafka配置 spring.kafka.one.enabledtrue spring.kafka.one.consumer.bootstrapServers${spring.embedded.kafka.brokers} spring.kafka.one.topicmmc-topic-one spring.kafka.one.group-idgroup-consumer-one spring.kafka.one.processorcom.mmc.multi.kafka.starter.OneProcessor // 业务处理类名称 spring.kafka.one.consumer.auto-offset-resetlatest spring.kafka.one.consumer.max-poll-records10 spring.kafka.one.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.one.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置 spring.kafka.two.enabledtrue spring.kafka.two.consumer.bootstrapServers${spring.embedded.kafka.brokers} spring.kafka.two.topicmmc-topic-two spring.kafka.two.group-idgroup-consumer-two spring.kafka.two.processorcom.mmc.multi.kafka.starter.TwoProcessor // 业务处理类名称 spring.kafka.two.consumer.auto-offset-resetlatest spring.kafka.two.consumer.max-poll-records10 spring.kafka.two.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.two.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer## pb 消息消费者 spring.kafka.pb.enabledtrue spring.kafka.pb.consumer.bootstrapServers${spring.embedded.kafka.brokers} spring.kafka.pb.topicmmc-topic-pb spring.kafka.pb.group-idgroup-consumer-pb spring.kafka.pb.processorpbProcessor spring.kafka.pb.consumer.auto-offset-resetlatest spring.kafka.pb.consumer.max-poll-records10 spring.kafka.pb.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.pb.consumer.value-deserializerorg.apache.kafka.common.serialization.ByteArrayDeserializer## kafka消息生产者 spring.kafka.four.enabledtrue spring.kafka.four.producer.namefourKafkaSender spring.kafka.four.producer.bootstrap-servers${spring.embedded.kafka.brokers} spring.kafka.four.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.four.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer 国籍惯例先上源码Github源码 一、本文要点 本文将介绍通过封装一个starter来实现多kafka数据源的配置通过通过源码可以学习以下特性。系列文章完整目录 SpringBoot 整合多个kafka数据源SpringBoot 批量消费kafka消息SpringBoot 优雅地启动或停止消费kafkaSpringBoot kafka本地单元测试免集群SpringBoot 利用map注入多份配置SpringBoot BeanPostProcessor 后置处理器使用方式SpringBoot 将自定义类注册到IOC容器SpringBoot 注入bean到自定义类成员变量Springboot 取消限定符SpringBoot 支持消费protobuf类型的kafka消息SpringBoot Aware设计模式SpringBoot 获取kafka消息中的topic、offset、partition、header等参数SpringBoot 使用任意生产者发送kafka消息SpringBoot 配置任意数量的kafka生产者 二、开发环境 jdk 1.8maven 3.6.2springboot 2.4.3kafka-client 2.6.6idea 2020 三、原项目 1、接前文我们基本完成了kafka consumer常用的特性开发有小伙伴问我们该如何配置多个数据源生产者想consumer一样简单发送kafka消息呢 ## 1.配置 spring.kafka.four.enabledtrue spring.kafka.four.producer.namefourKafkaSender spring.kafka.four.producer.bootstrap-servers${spring.embedded.kafka.brokers} spring.kafka.four.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.four.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer## 2.引用 Resource(name fourKafkaSender) private MmcKafkaMultiSender mmcKafkaMultiSender;## 3.使用 mmcKafkaMultiSender.sendStringMessage(topicOne, aaa, json); 答案是可以的、但我们要升级和优化一下。 四、修改项目 1、修改内部类MmcKafkaProperties类增加生产者相关的配置。 EqualsAndHashCode(callSuper true)Datapublic static class Producer extends KafkaProperties.Producer {/*** 是否启用.*/private boolean enabled true;/*** 生产者名称如果有设置则会覆盖默认的xxxKakfkaSender名称.*/private String name;}/*** 生产者.*/private final Producer producer new Producer();/*** Create an initial map of producer properties from the state of this instance.* p* This allows you to add additional properties, if necessary, and override the* default kafkaProducerFactory bean.** return the producer properties initialized with the customizations defined on this* instance*/MapString, Object buildProducerProperties() {return new HashMap(this.producer.buildProperties());}2、新增MmcKafkaSender接口作为发送Kafka消息的唯一约束。 public interface MmcKafkaSender {/*** 发送kafka消息.** param topic topic名称* param partitionKey 消息分区键* param message 具体消息*/void sendStringMessage(String topic, String partitionKey, String message);/*** 发送kafka消息.** param topic topic名称* param partitionKey 消息分区键* param message 具体消息*/void sendProtobufMessage(String topic, String partitionKey, byte[] message); } 3、新增MmcKafkaOutputContainer容器类用于存储所有生产者方便统一管理 Getter Slf4j public class MmcKafkaOutputContainer {/*** 存放所有生产者.*/private final MapString, MmcKafkaSender outputs;/*** 构造函数.*/public MmcKafkaOutputContainer(MapString, MmcKafkaSender outputs) {this.outputs outputs;}}4、新增MmcKafkaSingleSender实现类用于真实发送Kafka消息 public class MmcKafkaSingleSender implements MmcKafkaSender {private final KafkaTemplateString, Object template;public MmcKafkaSingleSender(KafkaTemplateString, Object template) {this.template template;}Overridepublic void sendStringMessage(String topic, String partitionKey, String message) {template.send(topic, partitionKey, message);}Overridepublic void sendProtobufMessage(String topic, String partitionKey, byte[] message) {template.send(topic, partitionKey, message);}}5、修改MmcMultiProducerAutoConfiguration配置类遍历所有配置组装并生成MmcKafkaSingleSender并注册到IOC容器 Slf4j Configuration EnableConfigurationProperties(MmcMultiKafkaProperties.class) ConditionalOnProperty(prefix spring.kafka, value enabled, matchIfMissing true) public class MmcMultiProducerAutoConfiguration implements BeanFactoryAware {private DefaultListableBeanFactory beanDefinitionRegistry;Resourceprivate MmcMultiKafkaProperties mmcMultiKafkaProperties;Beanpublic MmcKafkaOutputContainer mmcKafkaOutputContainer() {// 初始化一个存储所有生产者的哈希映射MapString, MmcKafkaSender outputs new HashMap();// 获取所有的Kafka配置信息MapString, MmcMultiKafkaProperties.MmcKafkaProperties kafkas mmcMultiKafkaProperties.getKafka();// 逐个遍历并生成producerfor (Map.EntryString, MmcMultiKafkaProperties.MmcKafkaProperties entry : kafkas.entrySet()) {// 唯一生产者名称String name entry.getKey();// 生产者配置MmcMultiKafkaProperties.MmcKafkaProperties properties entry.getValue();// 是否开启if (properties.isEnabled() properties.getProducer().isEnabled() CommonUtil.isNotEmpty(properties.getProducer().getBootstrapServers())) {// bean名称String beanName Optional.ofNullable(properties.getProducer().getName()).orElse(name KafkaSender);KafkaTemplateString, Object template mmcdKafkaTemplate(properties);// 创建实例MmcKafkaSender sender new MmcKafkaSingleSender(template);outputs.put(beanName, sender);// 注册到IOCbeanDefinitionRegistry.registerSingleton(beanName, sender);}}return new MmcKafkaOutputContainer(outputs);}private KafkaTemplateString, Object mmcdKafkaTemplate(MmcMultiKafkaProperties.MmcKafkaProperties producer) {return new KafkaTemplate(baseKafkaProducerFactory(producer));}private ProducerFactoryString, Object baseKafkaProducerFactory(MmcMultiKafkaProperties.MmcKafkaProperties producer) {return new DefaultKafkaProducerFactory(producer.buildProducerProperties());}Overridepublic void setBeanFactory(BeanFactory beanFactory) throws BeansException {this.beanDefinitionRegistry (DefaultListableBeanFactory) beanFactory;} }五、测试一下 1、引入kafka测试需要的jar。参考文章kafka单元测试 dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependencydependencygroupIdcom.google.protobuf/groupIdartifactIdprotobuf-java/artifactIdversion3.18.0/versionscopetest/scope/dependencydependencygroupIdcom.google.protobuf/groupIdartifactIdprotobuf-java-util/artifactIdversion3.18.0/versionscopetest/scope/dependency2、消费者配置保持不变增加生产者配置。 ## json消息消费者 spring.kafka.one.enabledtrue spring.kafka.one.consumer.bootstrapServers${spring.embedded.kafka.brokers} spring.kafka.one.topicmmc-topic-one spring.kafka.one.group-idgroup-consumer-one spring.kafka.one.processoroneProcessor spring.kafka.one.duplicatefalse spring.kafka.one.snakeCasefalse spring.kafka.one.consumer.auto-offset-resetlatest spring.kafka.one.consumer.max-poll-records10 spring.kafka.one.consumer.value-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.one.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.one.container.threshold2 spring.kafka.one.container.rate1000 spring.kafka.one.container.parallelism8## json消息生产者 spring.kafka.four.enabledtrue spring.kafka.four.producer.namefourKafkaSender spring.kafka.four.producer.bootstrap-servers${spring.embedded.kafka.brokers} spring.kafka.four.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.four.producer.value-serializerorg.apache.kafka.common.serialization.StringSerializer 3、编写测试类。 Slf4j ActiveProfiles(dev) ExtendWith(SpringExtension.class) SpringBootTest(classes {MmcMultiProducerAutoConfiguration.class, MmcMultiConsumerAutoConfiguration.class,DemoService.class, OneProcessor.class}) TestPropertySource(value classpath:application-string.properties) DirtiesContext EmbeddedKafka(partitions 1, brokerProperties {listenersPLAINTEXT://localhost:9092, port9092},topics {${spring.kafka.one.topic}}) class KafkaStringMessageTest {Value(${spring.kafka.one.topic})private String topicOne;Value(${spring.kafka.two.topic})private String topicTwo;Resource(name fourKafkaSender)private MmcKafkaSingleSender mmcKafkaSingleSender;Testvoid testDealMessage() throws Exception {Thread.sleep(2 * 1000);// 模拟生产数据produceMessage();Thread.sleep(10 * 1000);}void produceMessage() {for (int i 0; i 10; i) {DemoMsg msg new DemoMsg();msg.setRoutekey(routekey i);msg.setName(name i);msg.setTimestamp(System.currentTimeMillis());String json JsonUtil.toJsonStr(msg);mmcKafkaSingleSender.sendStringMessage(topicOne, aaa, json);}} } 5、运行一下测试通过可以看到能正常发送消息和消费。 五、小结 将本项目代码构建成starter就可以大大提升我们开发效率我们只需要关心业务代码的开发github项目源码轻触这里。如果对你有用可以打个星星哦。下一篇升级本starter在kafka单分区下实现十万级消费处理速度。 《搭建大型分布式服务三十六SpringBoot 零代码方式整合多个kafka数据源》 《搭建大型分布式服务三十七SpringBoot 整合多个kafka数据源-取消限定符》 《搭建大型分布式服务三十八SpringBoot 整合多个kafka数据源-支持protobuf》 《搭建大型分布式服务三十九SpringBoot 整合多个kafka数据源-支持Aware模式》 搭建大型分布式服务四十SpringBoot 整合多个kafka数据源-支持生产者 搭建大型分布式服务四十一SpringBoot 整合多个kafka数据源-支持亿级消息生产者 加我加群一起交流学习更多干货下载、项目源码和大厂内推等着你
http://www.pierceye.com/news/450914/

相关文章:

  • 网站关键词怎么快速上排名wordpress极慢
  • 摄影网站建站wordpress怎么改密码
  • 旅游网站制作过程百度收录查询入口
  • 简述企业网站建设的流程网站建设的需求分析报告
  • 做网络课程的网站聚美优品网站建设分析
  • 网站建设公司简介wordpress注册按钮
  • 网站的栏目建设在哪里惠州网
  • 免费建站模板哪个好核酸造假7人枪毙视频
  • 一手房哪个网站做信息效果好微信小程序打不开
  • 建设网站图片素材包头怎样做网站
  • 网站内容建设与管理90设计app下载
  • 怎么做优惠卷网站公司做网站大概多少钱
  • 哪些网站是单页面应用程序在线做网站流程
  • 公司网站设计维护官方网站建设需要做哪些东西
  • 网站被k还能不能在百度做推广wordpress主题网址导航葬爱
  • 成都网站制作和建设辽阳北京网站建设
  • 合肥金融网站设计网页制作工具分哪两类
  • 专业营销型网站定制wordpress菜单绑定模板
  • 网站建设公司找哪家好石家庄网站改版
  • 建立一个网站要多久网页界面ps制作步骤
  • 珠海网站建设费用自己做网站切入地图
  • 个人在线视频播放网站搭建软件属于网站开发吗
  • 小米的企业网站建设思路c2c的网站
  • 网站设计书籍做网站的基础
  • 买下云服务器怎么做网站官方网站怎么查询
  • 手机版企业网站php西宁做网站公司排名
  • 微网站如何做推广做淘宝客网站需要备案吗
  • 天津网站制作重点windows与wordpress
  • 可以查企业备案的网站吗佛山住房和城乡建设部网站官网
  • 和初中生做视频网站怎么进入追信魔盒网站开发软件