提供网站建设备案公司,个性化网站开发,怎么样子做网站,h5制作软件免费版背景
在处理窗口函数时，ProcessWindowFunction处理函数可以定义三个状态： 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState，那么这几个状态之间有什么关系呢？ …背景
在处理窗口函数时，ProcessWindowFunction处理函数可以定义三个状态：富函数的getRuntimeContext().getState()、每个key每个窗口的状态context.windowState()、每个key的状态context.globalState()。那么这几个状态之间有什么关系呢？
ProcessWindowFunction处理函数三种状态之间的关系
1. getRuntimeContext().getState()：这个定义的状态是每个key维度的，也就是可以跨时间窗口并维持状态的。
2. context.windowState()：这个定义的状态是和每个key以及窗口相关的，也就是虽然key相同，但是时间窗口不同，它们的值也不一样。
3. context.globalState()：这个定义的状态是和每个key相关的，也就是和getRuntimeContext().getState()的定义一样，可以跨窗口维护状态。
验证代码如下所示：
package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;//更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n,name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context){ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 
从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);System.out.println(getRuntimeContext() context : (getRuntimeContext() context));ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));ValueStateKeyCount contextGlobalValueState context.globalState().getState(new ValueStateDescriptor(myGlobalState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);KeyCount globalValue contextGlobalValueState.value();if (globalValue null) {globalValue new KeyCount();globalValue.key s;globalValue.count 0;}globalValue.count count;contextGlobalValueState.update(globalValue);ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class));ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState));System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, total : %d, windowStateCount :%s, globalStateCount :%s\n,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 
打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(hh:mm:ss).format(new Date(timeStamp));}}
输出结果：
window, XXX, 08:34:45 - 08:34:50, 3, total : 22, windowStateCount :KeyCount{key=XXX, count=3}, globalStateCount :KeyCount{key=XXX, count=22}
window, YYY, 08:34:45 - 08:34:50, 2, total : 22, windowStateCount :KeyCount{key=YYY, count=2}, globalStateCount :KeyCount{key=YYY, count=22}
从结果可以验证以上的结论。此外，需要特别注意的一点是：context.windowState()的状态需要在clear方法中清理掉，因为一旦时间窗口结束，就再也没有机会清理了。
从这个例子中还发现一个比较有趣的现象：
ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState = context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState = context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
在open中通过getRuntimeContext().getState定义的状态，竟然可以通过context.windowState()/context.globalState()访问到，并且它们指向的都是同一个变量，可以参见代码的输出：
System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (contextWindowSameNameState == contextGlobalSameNameState));
System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));
结果如下：
contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true
参考文献：https://cloud.tencent.com/developer/article/1815079