1. Overview
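In production we often need to reconcile record counts with upstream and downstream systems. In offline (batch) scenarios this is just a GROUP BY followed by a COUNT. In real-time scenarios, however, Kafka often sits between the stages and the data passes through several jobs that filter and transform it before it is compared with the final layer in Doris or ClickHouse. When records go missing, it is then hard to tell which layer lost them.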
2. Counting record volumes with a side output, a processing-time tumbling window, and state
package com.flink.feature.windowcount;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Counts, per key, how many records pass through this job in each 10-second
 * processing-time tumbling window. The counting runs on a side output, so the
 * main business stream is left untouched. Input is read from localhost:8888
 * (for example a socket opened with `nc -lk 8888`); sample input and output
 * are listed in section 3 (Test results).
 */
public class UseWindowValidateData {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Side-output tag carrying (count key, 1) for every input record.
        // The anonymous subclass {} keeps the generic type information at runtime.
        OutputTag<Tuple2<String, Integer>> windowCountTag =
                new OutputTag<Tuple2<String, Integer>>("window_count") {};

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<String> process = source.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String input, ProcessFunction<String, String>.Context ctx,
                                       Collector<String> collector) throws Exception {
                // "窗口统计" = "window count": prefix of the key used for counting
                ctx.output(windowCountTag, Tuple2.of("窗口统计" + input, 1));
                // "业务处理" = "business processing": the normal main-stream output
                collector.collect("业务处理" + input);
            }
        });

        process.getSideOutput(windowCountTag)
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> tp) throws Exception {
                        return tp.f0;
                    }
                })
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>() {
                    // Keyed MapState; it is cleared at the end of every window,
                    // so it only ever holds the count for the current window.
                    private MapState<String, Integer> mapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        MapStateDescriptor<String, Integer> stateDescriptor =
                                new MapStateDescriptor<>("map-state", String.class, Integer.class);
                        mapState = getRuntimeContext().getMapState(stateDescriptor);
                    }

                    @Override
                    public void process(String key,
                                        ProcessWindowFunction<Tuple2<String, Integer>, Tuple4<String, String, String, Integer>, String, TimeWindow>.Context ctx,
                                        Iterable<Tuple2<String, Integer>> elements,
                                        Collector<Tuple4<String, String, String, Integer>> out) throws Exception {
                        for (Tuple2<String, Integer> tp : elements) {
                            Integer res = mapState.get(tp.f0);
                            if (res == null) {
                                res = 0;
                            }
                            res += 1;
                            mapState.put(tp.f0, res);
                        }
                        // Emit (window start, window end, key, count in this window)
                        out.collect(Tuple4.of(String.valueOf(ctx.window().getStart()),
                                String.valueOf(ctx.window().getEnd()),
                                key,
                                mapState.get(key)));
                        // Clear the state after each window so the next window starts from zero
                        mapState.clear();
                    }
                })
                // "每10秒每个key接受到的数据量" = "records received per key per 10 seconds"
                .print("每10秒每个key接受到的数据量");

        process.print("main");
        env.execute();
    }
}
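The window branch of the job boils down to two steps: assign each record to a 10-second tumbling window by the time it is processed, then count records per key inside each window. The plain-Java sketch below (no Flink dependency) reproduces that arithmetic so it can be checked in isolation. The class and method names, the `Rec` type, and the timestamps are hypothetical; in the real job the processing timestamps come from the TaskManager's wall clock. For non-negative timestamps and a zero offset, the window start computed here matches what Flink's `TimeWindow.getWindowStartWithOffset` returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch of the side-output branch: assign each record to a
// tumbling window by its timestamp and count records per (window, key).
public class TumblingCountSketch {

    /** A record key plus the epoch-millis time at which it was processed. */
    static final class Rec {
        final String key;
        final long timestampMillis;

        Rec(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Start of the tumbling window containing the timestamp (non-negative timestamps, zero offset). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Counts records per "start,end,key"; windows are half-open [start, end). */
    static Map<String, Integer> countPerWindow(List<Rec> records, long windowSizeMillis) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted for stable printing
        for (Rec r : records) {
            long start = windowStart(r.timestampMillis, windowSizeMillis);
            long end = start + windowSizeMillis;
            counts.merge(start + "," + end + "," + r.key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Rec> records = new ArrayList<>();
        // Hypothetical timestamps: first batch (four "1"s) falls in one window...
        for (long t : new long[] {1698913021000L, 1698913022000L, 1698913023000L, 1698913024000L}) {
            records.add(new Rec("1", t));
        }
        // ...and the second batch (1, 2, 2, 3, 3, 4, 4) falls in the next window.
        String[] secondBatch = {"1", "2", "2", "3", "3", "4", "4"};
        for (int i = 0; i < secondBatch.length; i++) {
            records.add(new Rec(secondBatch[i], 1698913031000L + i * 100L));
        }
        countPerWindow(records, 10_000L).forEach((k, v) -> System.out.println("(" + k + "," + v + ")"));
    }
}
```

Running `main` prints `(1698913020000,1698913030000,1,4)`, then `(1698913030000,1698913040000,1,1)`, `(…,2,2)`, `(…,3,2)` and `(…,4,2)` for the second window. This is the same (start, end, key, count) shape as the job's window output, except that the sketch uses the raw input as the key instead of the `窗口统计` prefix and prints in sorted order rather than per subtask. Because the count is just the number of elements in a window, the Flink job could also count `elements` directly in `process()` without MapState; the MapState version gives the same result since its state is cleared after every window.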
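3. Test results
(1) First, input:
1
1
1
1
Output (the prefix before `>` is the identifier passed to print() plus the index of the parallel subtask that printed the line; 业务处理 = business processing, 窗口统计 = window count, 每10秒每个key接受到的数据量 = records received per key per 10 seconds):
main:8> 业务处理1
main:1> 业务处理1
main:2> 业务处理1
main:3> 业务处理1
每10秒每个key接受到的数据量:2> (1698913020000,1698913030000,窗口统计1,4)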
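(2) Then input:
1
2
2
3
3
4
4
Output:
main:4> 业务处理1
main:5> 业务处理2
main:6> 业务处理2
main:7> 业务处理3
main:8> 业务处理3
main:1> 业务处理4
main:2> 业务处理4
每10秒每个key接受到的数据量:2> (1698913030000,1698913040000,窗口统计1,1)
每10秒每个key接受到的数据量:7> (1698913030000,1698913040000,窗口统计4,2)
每10秒每个key接受到的数据量:6> (1698913030000,1698913040000,窗口统计2,2)
每10秒每个key接受到的数据量:6> (1698913030000,1698913040000,窗口统计3,2)
Each window counts only the records that arrived during it: key 1 is counted 4 times in [1698913020000, 1698913030000) and once in [1698913030000, 1698913040000).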
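The point of these per-window counts is to locate the layer where records go missing. Assuming each layer (each job, or a query over the final Doris/ClickHouse table) reports counts in the same (window start, window end, key, count) shape, one hypothetical way to compare two adjacent layers is to diff their counts per (window, key). The class name, method name, and the sample counts below are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper: diff per-(window, key) counts reported by an upstream and a
// downstream layer. Keys use the "start,end,key" form of the printed Tuple4 fields.
public class LayerReconcileSketch {

    /** Returns "start,end,key" -> (upstream - downstream) for every entry that differs. */
    static Map<String, Integer> diff(Map<String, Integer> upstream, Map<String, Integer> downstream) {
        TreeSet<String> allKeys = new TreeSet<>(upstream.keySet());
        allKeys.addAll(downstream.keySet());
        Map<String, Integer> gaps = new TreeMap<>();
        for (String k : allKeys) {
            int delta = upstream.getOrDefault(k, 0) - downstream.getOrDefault(k, 0);
            if (delta != 0) {
                // positive: records missing downstream; negative: extra records downstream
                gaps.put(k, delta);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Made-up counts: the downstream layer is missing one record with key 2.
        Map<String, Integer> upstream = Map.of(
                "1698913030000,1698913040000,2", 2,
                "1698913030000,1698913040000,3", 2);
        Map<String, Integer> downstream = Map.of(
                "1698913030000,1698913040000,2", 1,
                "1698913030000,1698913040000,3", 2);
        System.out.println(diff(upstream, downstream)); // prints {1698913030000,1698913040000,2=1}
    }
}
```

One caveat for this design: processing-time windows in different jobs are not aligned to the same records, because each job sees a record at a different wall-clock moment. A record that arrives near a window boundary can land in different windows in adjacent layers, so small mismatches of opposite sign in neighbouring windows are expected; comparing totals over several consecutive windows smooths them out.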