当前位置: 首页 > news >正文

网站开发需要掌握的哪些开发软件做网站优化有什么方法

网站开发需要掌握的哪些开发软件,做网站优化有什么方法,做网站多少人,c++可以做网站吗记录kafka-flink-kafka的end-to-end的exactly-once语义 步骤代码 步骤 开启checkpoint、stateBackend的设置和checkpoint配置设置kafka source的配置读取kafka source message随意的transformation#xff1b;并打印结果kafka sink端的配置输出到kafka sink端执行 代码 pac… 记录kafka-flink-kafka的end-to-end的exactly-once语义 步骤代码 步骤 开启checkpoint、stateBackend的设置和checkpoint配置设置kafka source的配置读取kafka source message随意的transformation并打印结果kafka sink端的配置输出到kafka sink端执行 代码 package com.javaye.demo.exactly;import org.apache.commons.lang3.SystemUtils; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector;import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit;/*** Author: Java页大数据* Date: 2024-04-11:17:59* Describe:* kafka - flink - kafka 验证end-to-end的exactly once*/ public class ExactlyOnce {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 1.1. 开启checkpoint间隔为1000L msenv.enableCheckpointing(1000L);// 1.2. stateBackend:checkpoint持久化目录if (SystemUtils.IS_OS_WINDOWS) {env.setStateBackend(new FsStateBackend(file:///D:/ckp));} else {env.setStateBackend(new FsStateBackend(hdfs://only:9870/flink-checkpoints));}CheckpointConfig config env.getCheckpointConfig(); // 1.3. ckp的配置 // 1.3.1. 前后两次checkpoint的最小间隔:防止前后两次的checkpoint重叠config.setMinPauseBetweenCheckpoints(500L); // 1.3.2. 容忍5次checkpoint失败config.setTolerableCheckpointFailureNumber(5); // 1.3.3. job被取消时保留外部的checkpointconfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 1.3.4. 设置checkpoint的语义为 exactly-onceconfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 1.3.5. 设置checkpoint的超时时间若checkpoint超过该超时时间则说明该次checkpoint失败丢弃该checkpointconfig.setCheckpointTimeout(60 * 1000); // 1.3.6. 设置同一时刻允许多少个checkpoint同时执行config.setMaxConcurrentCheckpoints(1);// 1.4. 设置重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));// 2. 设置kafka source的配置String kafkaServer only:9092;String sourceTopic flink_kafka_source;String groupId flink_kafka_source_exactly_once;String clientIdPrefix flink_exactly_once;Properties kafkaSourceProp new Properties();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(kafkaServer).setTopics(sourceTopic).setGroupId(groupId).setClientIdPrefix(clientIdPrefix).setStartingOffsets(OffsetsInitializer.latest()) // Start from latest offset.setProperty(partition.discovery.interval.ms, 50000) // discover new partitions per 50 seconds.setProperty(auto.offset.reset, latest).setValueOnlyDeserializer(new SimpleStringSchema()) // 执行checkpoint时提交offset到checkpointflink内部使用并且提交一份到默认主题__consumer_offsets // .setCommitOffsetsOnCheckpoints(true) // checkpoint开启默认为true否则为false不支持该方法.setProperties(kafkaSourceProp).build();// 3. 读取kafka source messageDataStreamSourceString kafkaDS env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), flink_kafka_exactly_once, TypeInformation.of(String.class));// 4. 随意的transformationSingleOutputStreamOperatorString flatMapDS kafkaDS.flatMap(new FlatMapFunctionString, String() {Overridepublic void flatMap(String value, CollectorString out) throws Exception {String[] words value.split(,);for (String word : words) {Random random new Random();int i random.nextInt(5);if (i 3) {System.out.println(模拟出现bug...);throw new RuntimeException(模拟出现bug...);}System.out.println(word i);out.collect(word i);}}});// 4.1. 打印结果容易观察flatMapDS.print();// 5. kafka sink端的配置Properties kafkaSinkProp new Properties();kafkaSinkProp.setProperty(transaction.timeout.ms, 1000 * 5 ); //设置事务超时时间也可在kafka配置中设置KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(kafkaServer).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(flink_kafka_sink).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setKafkaProducerConfig(kafkaSinkProp).build();// 6. 输出到kafka sink端flatMapDS.sinkTo(kafkaSink);// 7. 执行env.execute(ExactlyOnce.class.getName());} }
http://www.pierceye.com/news/341020/

相关文章:

  • 家庭电脑做网站深圳罗湖网站制作公司
  • 聊城做wap网站服务有哪些做特卖的网站有哪些
  • wordpress後台建站flash美食网站论文
  • 网站服务器ip地址在哪里看用记事本做电影介绍的网站
  • 重庆企业建站公司沧浪企业建设网站价格
  • 免费建单页网站厂房出租做推广什么网站好
  • jquery网站模板下载做的网站显示不了背景图片
  • 图书管理系统网站开发教程怎么创建自己的网址
  • 网站网站是怎么做的专业app开发制作团队
  • 平顺网站建设应届生在淮北招的网站建设类型岗位
  • 手机网站模板尺寸wordpress5.1更新
  • 微网站设计教育培训机构官网
  • 搭建论坛网站福州工程网站建设团队
  • 易语言跳到指定网站怎么做商业网站建设方案
  • 专业的饰品行业网站开发杭州市建设厅网站
  • 做仿站如何获取网站源码鞍山网站建设优化
  • 网站建设模拟实验报告wordpress表excel插件
  • 苏州企业网站建设电话包头网站制作公司
  • 邓州微网站建设上海十大广告公司排名
  • 深圳装修公司报价网络优化公司排名
  • 互联网建设网站的的好处中国建盏大师排名2021
  • 商城网站建设新闻制作一个网站需要多久
  • 为什么要创建网站子目录泰安网信科技
  • 住房和城乡建设部网站质保金企业手机网站建设效果
  • 网站建设制作鸿运通邯郸网络运营中心电话号码
  • 辽阳企业网站建设价格中国交通建设股份有限公司官网
  • 企业网站域名后缀手机网站设计规格
  • 网页制作成品模板网站中国兰州网首页
  • 企业展示型网站php批量外链工具
  • 网站公司做网站dede查看网站