厦门比较好的网站设计公司,网址制作二维码,东莞php网站建设,北镇网站建设一步一个脚印#xff0c;一天一道面试题。
感觉我现在很难把水印描述的很好#xff0c;但#xff0c;完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。
在实时处理任务时#xff0c;由于网络延迟#xff0c;人工异常#xff0c;各种问题#xf…一步一个脚印一天一道面试题。
感觉我现在很难把水印描述的很好但完成比完美更重要。后续我再补充。各位如果有什么建议或补充也欢迎留言。
在实时处理任务时由于网络延迟人工异常各种问题数据往往会出现乱序不按照我们的预期到达处理框架。 WaterMark 水印就是为了一定程度的解决数据延迟乱序问题的。
使用 WaterMark 一般有以下几个步骤
定义时间特性 Flink 1.12 已废弃默认使用 事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);设置 Watermark 策略赋值事件时间 // 分配时间戳和水位线DataStreamTuple2Long, Integer withTimestampsAndWatermarks parsedStream.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2Long, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) - element.f0));话不多说直接给个 Watermark 水印样例代码。 public class SimpleWatermarkExample {public static void main(String[] args) throws Exception {// 设置流执行环境final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 从 socket 文本流接收数据DataStreamString input env.addSource(new SocketTextStreamFunction(localhost, 9999, \n, -1));// 解析输入的数据DataStreamTuple2Long, Integer parsedStream input.map(new MapFunctionString, Tuple2Long, Integer() {Overridepublic Tuple2Long, Integer map(String value) throws Exception {String[] parts value.split(,);return new Tuple2(Long.parseLong(parts[0]), Integer.parseInt(parts[1]));}});// 分配时间戳和水位线DataStreamTuple2Long, Integer withTimestampsAndWatermarks parsedStream.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2Long, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((element, recordTimestamp) - element.f0));// 使用窗口函数统计每10秒内的最大值DataStreamString maxValues withTimestampsAndWatermarks.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowFunctionTuple2Long, Integer, String, TimeWindow() {Overridepublic void apply(TimeWindow window, IterableTuple2Long, Integer values, CollectorString out) throws Exception {int maxValue Integer.MIN_VALUE;for (Tuple2Long, Integer value : values) {maxValue Math.max(maxValue, value.f1);}out.collect(Window: window Max Value: maxValue);}});// 打印结果maxValues.print();// 执行程序env.execute(Simple Flink Watermark Example);}
}