当前位置: 首页 > news >正文

深圳建设工程造价管理站建设网站南沙区

深圳建设工程造价管理站,建设网站南沙区,建设网站合同文档,怎么免费注册网站【README】 1、本文主要对 java客户端作为kafka 生产者进行测试#xff0c; 消费者由 centos的kafka命令行线程扮演#xff1b; 2、消息发送#xff1a; kafka的生产者采用异步发送消息的方式#xff0c;在消息发送过程中#xff0c;涉及到2个线程——main线程和sender…【README】 1、本文主要对 java客户端作为kafka 生产者进行测试 消费者由 centos的kafka命令行线程扮演  2、消息发送 kafka的生产者采用异步发送消息的方式在消息发送过程中涉及到2个线程——main线程和sender线程以及一个线程共享变量 RecordAccumulator。main线程将消息发送给 RecordAccumulatorsender线程不断从 RecordAccumulator 中读取数据发送到 kafka broker step1生产者中的main线程把数据经过 拦截器-》序列化器-》分区器 处理然后再把数据写到 RecordAccumulator step2send 线程从 RecordAccumulator 中取出数据写入到kafka集群 3、开发环境 -- pom.xml!-- 依赖 -- dependenciesdependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion0.11.0.0/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-simple/artifactIdversion1.7.25/versionscopecompile/scope/dependency/dependencies-- log4j.properties log4j.rootLoggerINFO, stdout log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n log4j.appender.logfileorg.apache.log4j.FileAppender log4j.appender.logfile.Filetarget/spring.log log4j.appender.logfile.layoutorg.apache.log4j.PatternLayout log4j.appender.logfile.layout.ConversionPattern%d %p [%c] - %m%n 【0】 生产者同步发送消息 为啥需要同步发送 因为 kafka可以保证单个分区内消息有序但无法保证全局有序即多个分区消息有序  存在一些业务场景需要消息有序 /*** 同步消息生产者*/ public class SyncProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092); /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, all);/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小一次发送多少数据当数据大于16k生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间 等待时间超过1毫秒即便数据没有大于16k 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());/* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props); /* 10.同步发送数据 */ for (int i 0; i 10; i) { try {FutureRecordMetadata future producer.send(new ProducerRecord(first100, first100-20210101--D i));RecordMetadata rMetadata future.get(); // 调用future的get方法让main线程阻塞就可以实现同步发送 } catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */ producer.close();System.out.println(kafka生产者写入数据完成); } } 下面都是异步发送 【1】普通生产者 1.1、生产者代码  /*** 普通生产者 */ public class MyProducer {public static void main(String[] args) {/* 1.创建kafka生产者的配置信息 */Properties props new Properties();/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, centos201:9092); /*3.ack应答级别*/props.put(ProducerConfig.ACKS_CONFIG, all);/*4.重试次数*/ props.put(ProducerConfig.RETRIES_CONFIG, 3); /*5.批次大小一次发送多少数据当数据大于16k生产者会发送数据到 kafka集群 */props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * KfkNumConst._1K); /*6.等待时间 等待时间超过1毫秒即便数据没有大于16k 也会写数据到kafka集群 */props.put(ProducerConfig.LINGER_MS_CONFIG, 1); /*7. RecordAccumulator 缓冲区大小*/ props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M); /*8. key, value 的序列化类 */ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());System.out.println(props); /* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props); /* 10.发送数据 */ for (int i 0; i 10; i) { FutureRecordMetadata future producer.send(new ProducerRecord(first100, first100-20210101--D i));try {System.out.println(future.get().partition() - future.get().offset());} catch (Exception e) {e.printStackTrace();} }/* 11.关闭资源 */ producer.close();System.out.println(kafka生产者写入数据完成); } } -- 日志 0-183203 0-183204 0-183205 0-183206 0-183207 0-183208 0-183209 0-183210 0-183211 0-183212 kafka生产者写入数据完成1.2、消费者 [rootcentos201 ~]# kafka-console-consumer.sh --topic first100 --bootstrap-server centos201:9092 first100-20210101--D0 first100-20210101--D1 first100-20210101--D2 first100-20210101--D3 first100-20210101--D4 first100-20210101--D5 first100-20210101--D6 first100-20210101--D7 first100-20210101--D8 first100-20210101--D9【2】带回调的生产者 2.1、生产者 /*** 带回调的生产者 */for (int i 0; i 10; i) { FutureRecordMetadata future producer.send(new ProducerRecord(first100, first100-20210101--E i), (metadata, exception)- {/* lambda 表达式 */System.out.println(metadata.partition() -- metadata.offset());});}2.2、消费者 first100-20210101--E0 first100-20210101--E1 first100-20210101--E2 first100-20210101--E3 first100-20210101--E4 first100-20210101--E5 first100-20210101--E6 first100-20210101--E7 first100-20210101--E8 first100-20210101--E9 【3】创建分区策略的生产者 指定分区 0、查看topic 4个分区3个副本 [rootcentos201 ~]# kafka-topics.sh --describe --topic aaa --zookeeper centos201:2181 Topic:aaa PartitionCount:4 ReplicationFactor:3 Configs:Topic: aaa Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 1,2,3Topic: aaa Partition: 1 Leader: 3 Replicas: 3,2,1 Isr: 1,2,3Topic: aaa Partition: 2 Leader: 1 Replicas: 1,3,2 Isr: 2,1,3Topic: aaa Partition: 3 Leader: 2 Replicas: 2,3,1 Isr: 1,2,3 虽然集群只有3台机器 centos201, centos202, centos203   当我的分区数是4即分区数可以大于broker数量 但副本数必须小于等于 broker数量  3.1、生产者 props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); // 设置分区器 /* 9.创建生产者对象 */KafkaProducerString, String producer new KafkaProducer(props); /* 10.发送数据 */ for (int i 0; i 10; i) { FutureRecordMetadata future producer.send(new ProducerRecord(aaa, aaa-key, aaa-20210101--B i), (metadata, exception)- {/* lambda 表达式 */System.out.println(metadata.partition() -- metadata.offset());});}-- 日志 1 -- 112 1 -- 113 1 -- 114 1 -- 115 1 -- 116 1 -- 117 1 -- 118 1 -- 119 1 -- 120 1 -- 121 kafka生产者写入数据完成 3.2、自定义分区器  /*** 自定义分区器*/ public class MyPartitioner implements Partitioner {Overridepublic void configure(MapString, ? configs) {}Overridepublic int partition(String topic, Object key, byte[] keyBytes,Object value, byte[] valueBytes, Cluster cluster) {Integer integer cluster.partitionCountForTopic(topic);return 1;}Overridepublic void close() {} } 3.3、消费者 [rootcentos201 ~]# kafka-console-consumer.sh --topic aaa --bootstrap-server centos201:9092 aaa-20210101--C0 aaa-20210101--C1 aaa-20210101--C2 aaa-20210101--C3 aaa-20210101--C4 aaa-20210101--C5 aaa-20210101--C6 aaa-20210101--C7 aaa-20210101--C8 aaa-20210101--C9 小结 可以查看即便topic 有4个分区但我在自定义分区器中指定写入到分区1 所以生产者只把消息写到分区1
http://www.pierceye.com/news/361794/

相关文章:

  • 樟木头镇网站建设公司WordPress企业响应式主题
  • 怎么给网站做备份呢怎么去建设微信网站
  • 成都各公司网站中小企业网站建设 论文
  • 广告网站建设实训报告做电商从哪里入手
  • 建电子商务网站需要多少钱做网站的简称
  • 制定网站推广方案网络营销网站分析
  • 商城网站系网站 png逐行交错
  • 陕西网站建设陕icp备免费虚拟机安卓
  • 优化教程网站推广排名东莞网站建设推广有哪些
  • 金阳建设集团网站电子商务系统 网站建设
  • 网站建设规模哪里有做app软件开发
  • 建站工具上市手机视频网站设计
  • 代做道具网站做地方门户网站不备案可以吗
  • 电子商务 网站前台功能想做微商怎么找厂家
  • 网站建设电子书做网站引入字体
  • 顺德建设网站公司分发平台
  • 个人门户网站模板下载婚纱摄影网站定制
  • 提高网站流量的软文案例手机腾讯网
  • 网站只做内容 不做外链深圳宝安区天气
  • 生物网站 template淘宝的网站建设怎么建
  • 苏州哪家做网站好些推广之家app
  • 网站开发计入管理费用哪个明细对网站建设的调研报告
  • 南头专业的网站建设公司wordpress数据量大网站访问
  • 龙华民治网站建设公司wordpress设置vip
  • 网站建设天猫店免费主机空间
  • 帮网贷做网站会判刑吗学it要多久多少学费
  • 陕西网站建设维护erp软件怎么安装
  • 沈阳网站建设简维软件工程在网站建设
  • 万维网网站续费云南建设厅网站执业注册
  • 判断网站首页民宿设计网站大全