凡网站创建,高校网站建设说明书,广州营销策划公司排行,衡水搜索引擎优化Kafka最重要的功能之一是实现消息的负载平衡#xff0c;并保证分布式集群中的排序#xff0c;否则传统队列中将无法实现。  首先让我们尝试了解问题陈述 让我们假设我们有一个主题#xff0c;其中发送消息#xff0c;并且有一个消费者正在使用这些消息。  如果只有一个使用…   Kafka最重要的功能之一是实现消息的负载平衡并保证分布式集群中的排序否则传统队列中将无法实现。  首先让我们尝试了解问题陈述  让我们假设我们有一个主题其中发送消息并且有一个消费者正在使用这些消息。  如果只有一个使用者它将按消息在队列中的顺序或发送的顺序接收消息。   现在为了获得更高的性能我们需要更快地处理消息因此我们引入了消费者应用程序的多个实例。   如果消息包含任何状态则将导致问题。   让我们尝试通过一个例子来理解这一点   如果对于特定的消息ID我们有3个事件  第一创建  第二更新 第三删除 我们要求仅在消息的“创建”事件之后才处理消息的“更新”或“删除”事件。 现在如果两个单独的实例几乎同时获得相同消息的“ CREATE”和“ UPDATE”则即使另一个实例完成“ CREATE”消息之前带有“ UPDATE”消息的实例仍有机会尝试对其进行处理。 。 这可能是一个问题因为使用者将尝试更新尚未创建的消息并且将引发异常并且此“更新”可能会丢失。  可能的解决方案  我想到的第一个解决方案是对数据库的乐观锁定这可以防止这种情况但是随后需要适应异常情况。 这不是一个非常简单的方法可能涉及更多的锁定和要处理的并发问题。   另一个更简单的解决方案是如果特定ID的消息/事件总是转到特定实例因此它们将是有序的。 在这种情况下CREATE将始终在UPDATE之前执行因为这是发送它们的原始顺序。   这就是卡夫卡派上用场的地方。   Kafka在主题内具有“分区”的概念该概念既可以提供订购保证又可以在整个消费者流程中提供负载平衡。   每个分区都是有序的不可变的消息序列该消息序列被连续附加到提交日志中。 分区中的每个消息均分配有一个顺序ID号称为偏移量它唯一地标识分区中的每个消息。   因此一个主题将具有多个分区每个分区都保持自己的偏移量。  现在要确保将具有特定id的事件始终转到特定实例可以执行以下操作如果将每个使用者与特定分区绑定然后确保具有特定id的所有事件和消息始终转到特定实例则可以完成此操作。特定分区因此它们始终由同一使用者实例使用。   为了实现此分区Kafka客户端API为我们提供了两种方法  1定义用于分区的键该键将用作默认分区逻辑的键。  2编写一个Partitioning类来定义我们自己的分区逻辑。   让我们探索第一个  默认分区逻辑  默认的分区策略是hash(key)%numPartitions 。 如果键为null则选择一个随机分区。 所以如果我们要为分区键是一个特定属性我们需要将它传递在ProducerRecord构造而从发送消息Producer 。   让我们来看一个例子   注意要运行此示例我们需要具备以下条件  1.运行Zookeeper在localhost2181  2.运行Kafka位于localhost9092 3.创建一个带有3个分区的名为“ TRADING-INFO”的主题。为简单起见我们可以只有一个代理。 要完成以上三个步骤请遵循此处的文档。   假设我们正在发送有关“ TRADING-INFO”主题的交易信息该信息由消费者消费。  1.贸易舱  注意我在这里使用过Lombok   Data
Builder
public class Trade {private String id;private String securityId;private String fundShortName;private String value;
}2. Kafka客户端依赖  为了制作一个Kafka Producer我们需要包含Kafka依赖项  dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka_2.10/artifactIdversion0.10.0.0/version/dependency卡夫卡制片人 public class Producer {public static void main(String[] args) {final String TOPIC  TRADING-INFO;KafkaProducer kafkaProducer  new KafkaProducer(getProducerProperties());Runnable task1  () - sendTradeToTopic(TOPIC, kafkaProducer, ABCD, 1, 5);Runnable task2  () - sendTradeToTopic(TOPIC, kafkaProducer, PQ12341211111111111, 6, 10);Runnable task3  () - sendTradeToTopic(TOPIC, kafkaProducer, ZX12345OOO, 11, 15);ExecutorService executorService  Executors.newFixedThreadPool(3);executorService.submit(task1);executorService.submit(task2);executorService.submit(task3);executorService.shutdown();}private static void sendTradeToTopic(String topic, KafkaProducer kafkaProducer, String securityId, int idStart, int idEnd) {for (int i  idStart; i  idEnd; i) {Trade trade  Trade.builder().id(i).securityId(securityId).value(abcd).build();try {String s  new ObjectMapper().writeValueAsString(trade);kafkaProducer.send(new ProducerRecord(topic, trade.getSecurityId(), s));System.out.println(Sending to   topic  msg :   s);} catch (JsonProcessingException e) {e.printStackTrace();}}}private static Properties getProducerProperties() {Properties props  new Properties();String KAFKA_SERVER_IP  localhost:9092;props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_IP);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());return props;}}消费者 public class TConsumer {public static void main(String[] args) {final String TOPIC  TRADING-INFO;final String CONSUMER_GROUP_ID  consumer-group;KafkaConsumerString, String kafkaConsumer  new KafkaConsumer(getConsumerProperties(CONSUMER_GROUP_ID));kafkaConsumer.subscribe(Arrays.asList(TOPIC));while(true) {ConsumerRecordsString, String consumerRecords  kafkaConsumer.poll(1000);consumerRecords.forEach(e - {System.out.println(e.value());});}}private static Properties getConsumerProperties(String consumerGroupId) {Properties props  new Properties();props.put(bootstrap.servers, localhost:9092);props.put(group.id, consumerGroupId);props.put(key.deserializer, StringDeserializer.class.getName());props.put(value.deserializer, StringDeserializer.class.getName());return props;}
} 因为我们有3个分区所以我们将运行3个Consumer实例。   现在当我们使用不同的线程运行生产者时生成具有3种“安全类型”消息的消息这是我们的关键。 我们将看到特定的实例总是迎合特定的“安全类型”因此将能够按顺序处理消息。  产出 消费者1 {id:1,securityId:ABCD,fundShortName:null,value:abcd}
{id:2,securityId:ABCD,fundShortName:null,value:abcd}
{id:3,securityId:ABCD,fundShortName:null,value:abcd}
{id:4,securityId:ABCD,fundShortName:null,value:abcd}
{id:5,securityId:ABCD,fundShortName:null,value:abcd}消费者2 {id:6,securityId:PQ12341211111111111,fundShortName:null,value:abcd}
{id:7,securityId:PQ12341211111111111,fundShortName:null,value:abcd}
{id:8,securityId:PQ12341211111111111,fundShortName:null,value:abcd}
{id:9,securityId:PQ12341211111111111,fundShortName:null,value:abcd}
{id:10,securityId:PQ12341211111111111,fundShortName:null,value:abcd}消费者3 {id:11,securityId:ZX12345OOO,fundShortName:null,value:abcd}
{id:12,securityId:ZX12345OOO,fundShortName:null,value:abcd}
{id:13,securityId:ZX12345OOO,fundShortName:null,value:abcd}
{id:14,securityId:ZX12345OOO,fundShortName:null,value:abcd}
{id:15,securityId:ZX12345OOO,fundShortName:null,value:abcd} 因此这里的3种类型的“ securityIds”生成了不同的哈希值因此被分配到不同的分区中从而确保一种交易类型始终用于特定实例。   现在如果我们不想使用默认的分区逻辑并且我们的场景更加复杂我们将需要实现自己的Partitioner在下一个博客中我将解释如何使用它以及它如何工作。  翻译自: https://www.javacodegeeks.com/2016/08/achieving-order-guarnetee-kafka-partitioning.html