当前位置: 首页 > news >正文

网站备案最新备案号百度推广总部电话

网站备案最新备案号,百度推广总部电话,wordpress本地评论插件,免费收录网站大全【README】 本文记录了flink sink操作#xff0c;输出目的存储器#xff08;中间件#xff09;包括 kafka#xff1b;es#xff1b;db#xff1b;等等有很多#xff1b;本文只给出了 sink2kafka的代码#xff1b; 本文使用的flink为 1.14.4 版本#xff1b; 本文部…【README】 本文记录了flink sink操作输出目的存储器中间件包括 kafkaesdb等等有很多本文只给出了 sink2kafka的代码 本文使用的flink为 1.14.4 版本 本文部分内容参考了 flink 官方文档如下 Kafka | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/ 【1】 flink sink2kafka 1场景 消费上游topic hello0415的数据并把数据流输出到下游kafka topic hell0416如我们在java框架中把数据库日志发送到 topic1 然后我想统计执行时间大于3秒的sql则需要把筛选后的sql 发送到 下游 topic2 就可以使用sink 来完成 2代码 /*** Description flink流输出到kafka下沉* author xiao tang* version 1.0.0* createTime 2022年04月16日*/ public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 设置全局并行度// 创建flink连接kafkaKafkaSource kafkaSource KafkaSource.Stringbuilder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics(hello0415).setGroupId(flink).build();DataStreamString kafkaStream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafkaSource);// kafka生产者属性Properties kafkaProducerProps new Properties();kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, all);kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);// 把kafka数据流输出到sink topic hello0416KafkaSinkString sink KafkaSink.Stringbuilder().setKafkaProducerConfig(kafkaProducerProps).setBootstrapServers(192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(hello0416).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 添加到sinkkafkaStream.sinkTo(sink);// 打印streamkafkaStream.print(kafkaStream);// 执行env.execute(kafkaSinkJob);} }效果 【补充】 kafka 消费者属性 public enum KafkaConsumerProps {_INS;/* 1.创建kafka生产者的配置信息 */Properties props new Properties();private KafkaConsumerProps() {/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, G1);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);}public Properties getProps() {return props;} }
http://www.pierceye.com/news/828129/

相关文章:

  • 企业对比网站西安做网站公司怎么样
  • 网站开发好做还是平面好做商务网页设计与制作是什么
  • 个人业务网站带后台凡科网站建设分类模块怎么弄
  • 在百度做网站需要什么资料appstore正版下载
  • wordpress怎么做404页面合肥seo软件
  • 建设网站挂广告赚钱免费个人网站源码
  • 网站ico图标动漫设计学什么内容
  • fireworks做网站定制做网站费用
  • 建设门户网站所需优秀营销网站设计
  • 行业网站建设教程办一家建筑公司流程
  • 网站空间文件夹中企动力主要是做什么的
  • 亚马逊做qa的网站wordpress theme是什么
  • 网站开发的经费预算php网站超市源码下载
  • 深圳建设高端网站asp.net 获取网站的绝对路径
  • 做的网站没流量吗前端页面设计
  • 门户网站的优点在环评备案网站上做登记后会怎么样
  • 网站的内容规划怎么写网站做外链的具体步骤
  • 百度网站排名规则小程序网站建设y021
  • 中国建设银行国际互联网站国内排名前五的电商
  • 怎么查网站的空间商四川建设工程招标网
  • 网站建设比较好公司朝阳区互联网公司排名
  • 百度不收录网站吗网站开发php
  • 房产网站建设的功能wordpress php7拓展
  • 做网站代码用什么软件天津建设工程信息网天津
  • 网站开发工程师前景怎么样怎么做自己的网站?
  • 井陉矿区网站建设做微商的网站
  • 办公室装修专业网站小程序免费制作平台有吗
  • 学生做兼职去哪个网站线上推广的渠道有哪些
  • 徐州网站的优化苏州百度推广开户
  • 网站有多少个网站建设与管理介绍