当前位置: 首页 > news >正文

凡网站创建高校网站建设说明书

凡网站创建,高校网站建设说明书,广州营销策划公司排行,衡水搜索引擎优化Kafka最重要的功能之一是实现消息的负载平衡#xff0c;并保证分布式集群中的排序#xff0c;否则传统队列中将无法实现。 首先让我们尝试了解问题陈述 让我们假设我们有一个主题#xff0c;其中发送消息#xff0c;并且有一个消费者正在使用这些消息。 如果只有一个使用… Kafka最重要的功能之一是实现消息的负载平衡并保证分布式集群中的排序否则传统队列中将无法实现。 首先让我们尝试了解问题陈述 让我们假设我们有一个主题其中发送消息并且有一个消费者正在使用这些消息。 如果只有一个使用者它将按消息在队列中的顺序或发送的顺序接收消息。 现在为了获得更高的性能我们需要更快地处理消息因此我们引入了消费者应用程序的多个实例。 如果消息包含任何状态则将导致问题。 让我们尝试通过一个例子来理解这一点 如果对于特定的消息ID我们有3个事件 第一创建 第二更新 第三删除 我们要求仅在消息的“创建”事件之后才处理消息的“更新”或“删除”事件。 现在如果两个单独的实例几乎同时获得相同消息的“ CREATE”和“ UPDATE”则即使另一个实例完成“ CREATE”消息之前带有“ UPDATE”消息的实例仍有机会尝试对其进行处理。 。 这可能是一个问题因为使用者将尝试更新尚未创建的消息并且将引发异常并且此“更新”可能会丢失。 可能的解决方案 我想到的第一个解决方案是对数据库的乐观锁定这可以防止这种情况但是随后需要适应异常情况。 这不是一个非常简单的方法可能涉及更多的锁定和要处理的并发问题。 另一个更简单的解决方案是如果特定ID的消息/事件总是转到特定实例因此它们将是有序的。 在这种情况下CREATE将始终在UPDATE之前执行因为这是发送它们的原始顺序。 这就是卡夫卡派上用场的地方。 Kafka在主题内具有“分区”的概念该概念既可以提供订购保证又可以在整个消费者流程中提供负载平衡。 每个分区都是有序的不可变的消息序列该消息序列被连续附加到提交日志中。 分区中的每个消息均分配有一个顺序ID号称为偏移量它唯一地标识分区中的每个消息。 因此一个主题将具有多个分区每个分区都保持自己的偏移量。 现在要确保将具有特定id的事件始终转到特定实例可以执行以下操作如果将每个使用者与特定分区绑定然后确保具有特定id的所有事件和消息始终转到特定实例则可以完成此操作。特定分区因此它们始终由同一使用者实例使用。 为了实现此分区Kafka客户端API为我们提供了两种方法 1定义用于分区的键该键将用作默认分区逻辑的键。 2编写一个Partitioning类来定义我们自己的分区逻辑。 让我们探索第一个 默认分区逻辑 默认的分区策略是hash(key)%numPartitions 。 如果键为null则选择一个随机分区。 所以如果我们要为分区键是一个特定属性我们需要将它传递在ProducerRecord构造而从发送消息Producer 。 让我们来看一个例子 注意要运行此示例我们需要具备以下条件 1.运行Zookeeper在localhost2181 2.运行Kafka位于localhost9092 3.创建一个带有3个分区的名为“ TRADING-INFO”的主题。为简单起见我们可以只有一个代理。 要完成以上三个步骤请遵循此处的文档。 假设我们正在发送有关“ TRADING-INFO”主题的交易信息该信息由消费者消费。 1.贸易舱 注意我在这里使用过Lombok Data Builder public class Trade {private String id;private String securityId;private String fundShortName;private String value; }2. Kafka客户端依赖 为了制作一个Kafka Producer我们需要包含Kafka依赖项 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.10/artifactIdversion0.10.0.0/version/dependency卡夫卡制片人 public class Producer {public static void main(String[] args) {final String TOPIC TRADING-INFO;KafkaProducer kafkaProducer new KafkaProducer(getProducerProperties());Runnable task1 () - sendTradeToTopic(TOPIC, kafkaProducer, ABCD, 1, 5);Runnable task2 () - sendTradeToTopic(TOPIC, kafkaProducer, PQ12341211111111111, 6, 10);Runnable task3 () - sendTradeToTopic(TOPIC, kafkaProducer, ZX12345OOO, 11, 15);ExecutorService executorService Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i idStart; i idEnd; i) {Trade trade Trade.builder().id(i).securityId(securityId).value(abcd).build();try {String s new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println(Sending to topic msg : s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props new Properties();String KAFKA_SERVER_IP localhost:9092;props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}消费者 public class TConsumer {public static void main(String[] args) {final String TOPIC TRADING-INFO;final String CONSUMER_GROUP_ID consumer-group;KafkaConsumerString, String kafkaConsumer new KafkaConsumer(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(1000);consumerRecords.forEach(e - {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, consumerGroupId);props.put(key.deserializer, StringDeserializer.class.getName());props.put(value.deserializer, StringDeserializer.class.getName());return props;} } 因为我们有3个分区所以我们将运行3个Consumer实例。 现在当我们使用不同的线程运行生产者时生成具有3种“安全类型”消息的消息这是我们的关键。 我们将看到特定的实例总是迎合特定的“安全类型”因此将能够按顺序处理消息。 产出 消费者1 {id:1,securityId:ABCD,fundShortName:null,value:abcd} {id:2,securityId:ABCD,fundShortName:null,value:abcd} {id:3,securityId:ABCD,fundShortName:null,value:abcd} {id:4,securityId:ABCD,fundShortName:null,value:abcd} {id:5,securityId:ABCD,fundShortName:null,value:abcd}消费者2 {id:6,securityId:PQ12341211111111111,fundShortName:null,value:abcd} {id:7,securityId:PQ12341211111111111,fundShortName:null,value:abcd} {id:8,securityId:PQ12341211111111111,fundShortName:null,value:abcd} {id:9,securityId:PQ12341211111111111,fundShortName:null,value:abcd} {id:10,securityId:PQ12341211111111111,fundShortName:null,value:abcd}消费者3 {id:11,securityId:ZX12345OOO,fundShortName:null,value:abcd} {id:12,securityId:ZX12345OOO,fundShortName:null,value:abcd} {id:13,securityId:ZX12345OOO,fundShortName:null,value:abcd} {id:14,securityId:ZX12345OOO,fundShortName:null,value:abcd} {id:15,securityId:ZX12345OOO,fundShortName:null,value:abcd} 因此这里的3种类型的“ securityIds”生成了不同的哈希值因此被分配到不同的分区中从而确保一种交易类型始终用于特定实例。 现在如果我们不想使用默认的分区逻辑并且我们的场景更加复杂我们将需要实现自己的Partitioner在下一个博客中我将解释如何使用它以及它如何工作。 翻译自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html
http://www.pierceye.com/news/48502/

相关文章:

  • 如何做全景素材网站企业网站标题设置
  • 重庆网站推广机构西部数码官网
  • 400网站建设网站开发工具的选择
  • 维度网络网站建设重庆万州网站建设费用
  • 韩版做哪个网站好电子商务网站模式
  • 哈尔滨网站建设30t河北互联思维网站建设
  • 网站开发需求分析包括什么石家庄百度seo
  • 找做网站的公司好qq登陆 wordpress
  • 网站用什么语言网页截图快捷键设置
  • 网站建设说明书怎么写网站建设网站建设哪里有
  • 制作网站的全过程系统学做网站
  • 网贷网站建设怎么下载电脑本机wordpress
  • 网站营销定义军事新闻头条最新消息
  • 电商网站文档创量广告投放平台
  • 网站手机端怎么做2024又要开始做核酸了
  • 指定关键字 网站有更新就提醒2022年国内重要新闻
  • 企业网站建设联系电话企业网络架构拓扑图
  • 有哪些公司的网站做的很好看室内设计师多少钱一个月
  • 甘肃平凉建设局网站朝阳企业网站建设方案
  • 自己做的网站显示不安全怎么回事谷歌 网站做推广
  • 上海哪家公司做网站最好网站信息化建设
  • 知名做网站网站多级导航效果
  • 给网站做图vps可以做wordpress和ssr
  • 长沙模板建站平台怎么把危险网站
  • 巴中城乡建设官方网站游戏推广网站如何做的
  • 大中型网站开发价格电脑哪里做模板下载网站
  • 做网站文字居中代码企业邮箱号码从哪里查
  • 都有哪些网站注册域名要钱吗
  • 常州做网站信息广告公司名字
  • 松江企业网站建设网站建设:上海珍岛