当前位置: 首页 > news >正文

牛人网站建设南京网站网站建设学校

牛人网站建设,南京网站网站建设学校,三亚发布紧急通知,龙岗网页设计价格Consumer之自动提交在上文中介绍了Producer API的使用#xff0c;现在我们已经知道如何将消息通过API发送到Kafka中了#xff0c;那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。因此#xff0c;本文将介绍Consumer API的使用#xff0c;使用API从Kafka中消费消…Consumer之自动提交在上文中介绍了Producer API的使用现在我们已经知道如何将消息通过API发送到Kafka中了那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。因此本文将介绍Consumer API的使用使用API从Kafka中消费消息让应用成为一个消费者角色。还是老样子首先我们得创建一个Consumer实例并指定相关配置项有了这个实例对象后我们才能进行其他的操作。代码示例/*** 创建Consumer实例*/public static Consumer createConsumer() {Properties props new Properties();// 指定Kafka服务的ip地址及端口props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 127。0.0.1:9092);// 指定group.idKafka中的消费者需要在消费者组里props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, test);// 是否开启自动提交props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);// 自动提交的间隔单位毫秒props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);// 消息key的序列化器props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);// 消息value的序列化器props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);return new KafkaConsumer(props);}在以上代码中可以看到设置了group.id这个配置项这是一个Consumer的必要配置项因为在Kafka中Consumer需要位于一个Consumer Group里。具体如下图所示在上图中是一个Consumer消费一个Partition是一对一的关系。但Consumer Group里可以只有一个Consumer此时该Consumer可以消费多个Partition是一对多的关系。如下图所示一个Consumer可以只消费一个Partition也可以消费多个Partition但需要注意的是多个Consumer不能消费同一个Partition总结一下Consumer的注意事项单个Partition的消息只能由Consumer Group中的某个Consumer来消费Consumer从Partition中消费消息是顺序的默认从头开始消费如果Consumer Group中只有一个Consumer那么这个Consumer会消费所有Partition中的消息在Kafka中当消费者消费数据后需要提交数据的offset来告知服务端成功消费了哪些数据。然后服务端就会移动数据的offset下一次消费的时候就是从移动后的offset位置开始消费。这样可以在一定程度上保证数据是被消费成功的并且由于数据不会被删除而只是移动数据的offset这也保证了数据不易丢失。若消费者处理数据失败时只要不提交相应的offset就可以在下一次重新进行消费。和数据库的事务一样Kafka消费者提交offset的方式也有两种分别是自动提交和手动提交。在本例中演示的是自动提交这也是消费数据最简单的方式。代码示例/*** 演示自动提交offset*/public static void autoCommitOffset() {Consumer consumer createConsumer();List topics List.of(MyTopic);// 订阅一个或多个Topicconsumer.subscribe(topics);while (true) {// 从Topic中拉取数据每1000毫秒拉取一次ConsumerRecords records consumer.poll(Duration.ofMillis(1000));// 每次拉取可能都是一组数据需要遍历出来for (ConsumerRecord record : records) {System.out.printf(partition %d, offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());}}}Consumer之手动提交自动提交的方式是最简单的但不建议在实际生产中使用因为可控性不高。所以更多时候我们使用的是手动提交但想要使用手动提交就需要先关闭自动提交修改配置项如下props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);关闭了自动提交后就得在代码中调用commit相关的方法来提交offset主要就是两个方法commitAsync和commitSync看方法名也知道一个是异步提交一个是同步提交。这里以commitAsync为例实现思路主要是在发生异常的时候不要调用commitAsync方法而在正常执行完毕后才调用commitAsync方法。代码示例/*** 演示手动提交offset*/public static void manualCommitOffset() {Consumer consumer createConsumer();List topics List.of(MyTopic);// 订阅一个或多个Topicconsumer.subscribe(topics);while (true) {// 从Topic中拉取数据每1000毫秒拉取一次ConsumerRecords records consumer.poll(Duration.ofMillis(1000));// 每次拉取可能都是一组数据需要遍历出来for (ConsumerRecord record : records) {try {// 模拟将数据写入数据库Thread.sleep(1000);System.out.println(save to db...);System.out.printf(partition %d, offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());} catch (Exception e) {// 写入失败则不要调用commit这样就相当于起到回滚的作用// 下次消费还是从之前的offset开始消费e.printStackTrace();return;}}// 写入成功则调用commit相关方法去手动提交offsetconsumer.commitAsync();}}##针对Partition提交offset在前文中有介绍到一个Consumer Group里可以只有一个Consumer该Consumer可以消费多个Partition。在这种场景下我们可能会在Consumer中开启多线程去处理多个Partition中的数据以提高性能。为了防止某些Partition里的数据消费成功而某些Partition里的数据消费失败却都一并提交了offset。我们就需要针对单个Partition去提交offset也就是将offset的提交粒度控制在Partition级别。这里先简单演示一下如何针对单个Partition提交offset代码示例/*** 演示手动提交单个Partition的offset*/public static void manualCommitOffsetWithPartition() {Consumer consumer createConsumer();List topics List.of(MyTopic);// 订阅一个或多个Topicconsumer.subscribe(topics);while (true) {// 从Topic中拉取数据每1000毫秒拉取一次ConsumerRecords records consumer.poll(Duration.ofMillis(1000));// 单独处理每一个Partition中的数据for (TopicPartition partition : records.partitions()) {System.out.println(partition: partition start);// 从Partition中取出数据List partitionRecords records.records(partition);for (ConsumerRecord record : partitionRecords) {try {// 模拟将数据写入数据库Thread.sleep(1000);System.out.println(save to db...);System.out.printf(partition %d, offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());} catch (Exception e) {// 发生异常直接结束不提交offsete.printStackTrace();return;}}// 执行成功则取出当前消费到的offsetlong lastOffset partitionRecords.get(partitionRecords.size() - 1).offset();// 由于下一次开始消费的位置是最后一次offset1的位置所以这里要1OffsetAndMetadata metadata new OffsetAndMetadata(lastOffset 1);// 针对Partition提交offsetMap offsets new HashMap();offsets.put(partition, metadata);// 同步提交offsetconsumer.commitSync(offsets);System.out.println(partition: partition end);}}}Consumer针对一个或多个Partition进行订阅在之前的例子中我们都是针对Topic去订阅并消费数据实际上也可以更细粒度一些针对Partition进行订阅这通常应用在一个Consumer多线程消费的场景下。代码示例/*** 演示将订阅粒度控制到Partition级别* 针对单个或多个Partition进行订阅*/public static void manualCommitOffsetWithPartition2() {Consumer consumer createConsumer();// 该Topic中有两个PartitionTopicPartition p0 new TopicPartition(MyTopic, 0);TopicPartition p1 new TopicPartition(MyTopic, 1);// 订阅该Topic下的一个Partitionconsumer.assign(List.of(p0));// 也可以订阅该Topic下的多个Partition// consumer.assign(List.of(p0, p1));while (true) {...与上一小节中的代码一致略...}}Consumer多线程并发处理前面两个小节的内容基本都是为了本小节所介绍的多线程并发处理消息而铺垫的因为为了提高应用对消息的处理效率我们通常会使用多线程来并行消费消息从而加快消息的处理速度。而多线程处理消息的方式主要有两种一种是按Partition数量创建线程然后每个线程里创建一个Consumer多个Consumer对多个Partition进行消费。这就和之前在介绍Consumer Group时给出的那张图所展示的一样这种属于是经典模式实现起来也比较简单适用于对消息的顺序和offset控制有要求的场景。代码示例package com.zj.study.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.clients.consumer.OffsetAndMetadata;import org.apache.kafka.common.TopicPartition;import org.apache.kafka.common.errors.WakeupException;import java.time.Duration;import java.util.Collections;import java.util.List;import java.util.Properties;import java.util.concurrent.atomic.AtomicBoolean;/*** 经典模式** author 01* date 2020-05-21**/public class ConsumerThreadSample {private final static String TOPIC_NAME MyTopic;/*** 这种类型是经典模式每一个线程单独创建一个KafkaConsumer用于保证线程安全*/public static void main(String[] args) throws InterruptedException {KafkaConsumerRunner r1 new KafkaConsumerRunner();Thread t1 new Thread(r1);t1.start();Thread.sleep(15000);r1.shutdown();}public static class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed new AtomicBoolean(false);private final KafkaConsumer consumer;public KafkaConsumerRunner() {Properties props new Properties();props.put(bootstrap.servers, 192.168.220.128:9092);props.put(group.id, test);props.put(enable.auto.commit, false);props.put(auto.commit.interval.ms, 1000);props.put(session.timeout.ms, 30000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);consumer new KafkaConsumer(props);TopicPartition p0 new TopicPartition(TOPIC_NAME, 0);TopicPartition p1 new TopicPartition(TOPIC_NAME, 1);consumer.assign(List.of(p0, p1));}Overridepublic void run() {try {while (!closed.get()) {//处理消息ConsumerRecords records consumer.poll(Duration.ofMillis(10000));for (TopicPartition partition : records.partitions()) {List pRecord records.records(partition);// 处理每个分区的消息for (ConsumerRecord record : pRecord) {System.out.printf(patition %d , offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());}// 返回去告诉kafka新的offsetlong lastOffset pRecord.get(pRecord.size() - 1).offset();// 注意加1consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset 1)));}}} catch (WakeupException e) {if (!closed.get()) {throw e;}} finally {consumer.close();}}public void shutdown() {closed.set(true);consumer.wakeup();}}}另一种多线程的消费方式则是在一个线程池中只创建一个Consumer实例然后通过这个Consumer去拉取数据后交由线程池中的线程去处理。如下图所示但需要注意的是在这种模式下我们无法手动控制数据的offset也无法保证数据的顺序性所以通常应用在流处理场景对数据的顺序和准确性要求不高。经过之前的例子我们知道每拉取一次数据返回的就是一个ConsumerRecords这里面存放了多条数据。然后我们对ConsumerRecords进行迭代就可以将多条数据交由线程池中的多个线程去并行处理了。代码示例package com.zj.study.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.List;import java.util.Properties;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;/*** 一个Consumer多个hander模式** author 01* date 2020-05-21**/public class ConsumerRecordThreadSample {private final static String TOPIC_NAME MyTopic;public static void main(String[] args) throws InterruptedException {String brokerList 192.168.220.128:9092;String groupId test;int workerNum 5;ConsumerExecutor consumers new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);consumers.execute(workerNum);Thread.sleep(1000000);consumers.shutdown();}/*** Consumer处理*/public static class ConsumerExecutor {private final KafkaConsumer consumer;private ExecutorService executors;public ConsumerExecutor(String brokerList, String groupId, String topic) {Properties props new Properties();props.put(bootstrap.servers, brokerList);props.put(group.id, groupId);props.put(enable.auto.commit, true);props.put(auto.commit.interval.ms, 1000);props.put(session.timeout.ms, 30000);props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer);props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer);consumer new KafkaConsumer(props);consumer.subscribe(List.of(topic));}public void execute(int workerNum) {executors new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());while (true) {ConsumerRecords records consumer.poll(200);for (final ConsumerRecord record : records) {executors.submit(new ConsumerRecordWorker(record));}}}public void shutdown() {if (consumer ! null) {consumer.close();}if (executors ! null) {executors.shutdown();}try {if (executors ! null !executors.awaitTermination(10, TimeUnit.SECONDS)) {System.out.println(Timeout.... Ignore for this case);}} catch (InterruptedException ignored) {System.out.println(Other thread interrupted this shutdown, ignore for this case.);Thread.currentThread().interrupt();}}}/*** 记录处理*/public static class ConsumerRecordWorker implements Runnable {private ConsumerRecord record;public ConsumerRecordWorker(ConsumerRecord record) {this.record record;}Overridepublic void run() {// 假如说数据入库操作System.out.println(Thread - Thread.currentThread().getName());System.err.printf(patition %d , offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());}}}Consumer控制offset起始位置上一小节中介绍的第二种多线程消息模式通过Consumer拉取数据后交由多线程去处理是没法控制offset的如果此时程序出现错误或其他意外情况导致消息没有被正确消费我们就需要人为控制offset的起始位置重新进行消费。通过调用seek方法可以指定从哪个Partition的哪个offset位置进行消费代码示例/*** 手动控制offset的起始位置*/public static void manualCommitOffsetWithPartition2() {Consumer consumer createConsumer();TopicPartition p0 new TopicPartition(MyTopic, 0);consumer.assign(List.of(p0));// 指定offset的起始位置consumer.seek(p0, 1);while (true) {...与上一小节中的代码一致略...}}实际应用中的设计思路第一次从某个offset的起始位置进行消费如果本次消费了100条数据那么offset设置为101并存入Redis等缓存数据库中后续每次poll之前从Redis中获取offset值然后从这个offset的起始位置进行消费消费完后再次将新的offset值存入Redis周而复始Consumer限流为了避免Kafka中的流量剧增导致过大的流量打到Consumer端将Consumer给压垮的情况我们就需要针对Consumer进行限流。例如当处理的数据量达到某个阈值时暂停消费低于阈值时则恢复消费这就可以让Consumer保持一定的速率去消费数据从而避免流量剧增时将Consumer给压垮。大体思路如下在poll到数据之后先去令牌桶中拿取令牌如果获取到令牌则继续业务处理如果获取不到令牌则调用pause方法暂停Consumer等待令牌当令牌桶中的令牌足够则调用resume方法恢复Consumer的消费状态接下来编写具体的代码案例简单演示一下这个限流思路令牌桶算法使用Guava里内置的所以需要在项目中添加对Guava的依赖。添加的依赖项如下com.google.guavaguava29.0-jre然后我们就可以使用Guava的限流器对Consumer进行限流了代码示例public class ConsumerCurrentLimiting {/*** 令牌生成速率单位为秒 */public static final int permitsPerSecond 1;/*** 限流器 */private static final RateLimiter LIMITER RateLimiter.create(permitsPerSecond);/*** 创建Consumer实例*/public static Consumer createConsumer() {... 与之前小节的代码类似略 ...}/*** 演示对Consumer限流*/public static void currentLimiting() {Consumer consumer createConsumer();TopicPartition p0 new TopicPartition(MyTopic, 0);TopicPartition p1 new TopicPartition(MyTopic, 1);consumer.assign(List.of(p0, p1));while (true) {// 从Topic中拉取数据每100毫秒拉取一次ConsumerRecords records consumer.poll(Duration.ofMillis(1));if (records.isEmpty()) {continue;}// 限流if (!LIMITER.tryAcquire()) {System.out.println(无法获取到令牌暂停消费);consumer.pause(List.of(p0, p1));} else {System.out.println(获取到令牌恢复消费);consumer.resume(List.of(p0, p1));}// 单独处理每一个Partition中的数据for (TopicPartition partition : records.partitions()) {System.out.println(partition: partition start);// 从Partition中取出数据List partitionRecords records.records(partition);for (ConsumerRecord record : partitionRecords) {try {// 模拟将数据写入数据库Thread.sleep(1000);System.out.println(save to db...);System.out.printf(partition %d, offset %d, key %s, value %s%n,record.partition(), record.offset(), record.key(), record.value());} catch (Exception e) {// 发生异常直接结束不提交offsete.printStackTrace();return;}}// 执行成功则取出当前消费到的offsetlong lastOffset partitionRecords.get(partitionRecords.size() - 1).offset();// 由于下一次开始消费的位置是最后一次offset1的位置所以这里要1OffsetAndMetadata metadata new OffsetAndMetadata(lastOffset 1);// 针对Partition提交offsetMap offsets new HashMap();offsets.put(partition, metadata);// 同步提交offsetconsumer.commitSync(offsets);System.out.println(partition: partition end);}}}public static void main(String[] args) {currentLimiting();}}Consumer Rebalance解析Consumer有个Rebalance的特性即重新负载均衡该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。之所以要重新负载均衡是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时将组内其他Consumer的负载压力重新进均匀分配而不会说新加入一个Consumer就闲在那。下面就用几张图简单描述一下各种情况触发Rebalance时组内成员是如何与协调器进行交互的。1、新成员加入组(member join)Tips图中的Coordinator是协调器而generation则类似于乐观锁中的版本号每当成员入组成功就会更新也是起到一个并发控制的作用2、组成员崩溃/非正常退出(member failure)3、组成员主动离组/正常退出(member leave group)4、当Consumer提交位移(member commit offset)时也会有类似的交互过程
http://www.pierceye.com/news/209845/

相关文章:

  • 成都设计公司网站线上线下一体化营销
  • 网站你懂我意思正能量晚上下载注册公司需要多少钱手续费
  • 在线html网站开发广州网站排名优化公司
  • 如何在免费网站上做推扩自己怎么来建设网站
  • 福安市教育局建设网站做架构图简单的网站
  • 如何快速进行网站开发seo是什么东西
  • 网站建设需要具备哪些学编程多少钱学费
  • 建设工程许可证在那个网站办金融行业网站制作
  • 邢台专业做网站价格信息流广告是什么
  • 网站开发的母的目的和意义.建设购物平台网站
  • 立方米网站建设做淘宝客网站用什么程序好
  • 怎样做网站挣钱建筑资料软件
  • 涿州建设局网站苏州市高新区建设局网站
  • 个人soho要怎么做企业网站成都包装设计公司
  • 网站开发 chrome浏览器崩溃ruhe用dw做网站
  • 全屏网站 图片优化个人网站cms系统
  • 做我女朋友程序网站邵东做网站
  • 建设网站如何挂到网上wordpress首页添加幻灯
  • 汕头正规网站建设模板总部城乡建设网站 资料员
  • vs 2017c 怎么建设网站网站建设的数字化和互联网化
  • 南昌网站设计公司海南营销网站建设
  • 购物网站素材个人搭建网站教程
  • 青岛网站建设哪里好模板建站服务公司
  • 青色网站欣赏wordpress中文购物
  • 建站培训全国住房与城乡建设部网站
  • 唐山网站建设方案策划沧州网站建设联系电话
  • 网页制作和网站开发实验报告logo设计品牌
  • 摄影后期教程网站百度指数1000搜索量有多少
  • wp网站建设模板什么是网站的原型
  • 园林绿化网站建设上海著名室内设计公司