当前位置: 首页 > news >正文

提供网站建设备案公司个性化网站开发

提供网站建设备案公司,个性化网站开发,怎么样子做网站,h5制作软件免费版背景 在处理窗口函数时#xff0c;ProcessWindowFunction处理函数可以定义三个状态#xff1a; 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState#xff0c;那么这几个状态之间有什么关系呢#xff1f; …背景 在处理窗口函数时ProcessWindowFunction处理函数可以定义三个状态 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState那么这几个状态之间有什么关系呢 ProcessWindowFunction处理函数三种状态之间的关系 1.getRuntimeContext.getState这个定义的状态是每个key维度的也就是可以跨时间窗口并维持状态的 2.context.windowState()这个定义的状态是和每个key以及窗口相关的也就是虽然key相同但是时间窗口不同他们的值也不一样. 3.context.globalState这个定义的状态是和每个key相关的也就是和getRuntimeContext.getState的定义一样可以跨窗口维护状态 验证代码如下所示 package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector; import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;//更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n,name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context){ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);System.out.println(getRuntimeContext() context : (getRuntimeContext() context));ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));ValueStateKeyCount contextGlobalValueState context.globalState().getState(new ValueStateDescriptor(myGlobalState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);KeyCount globalValue contextGlobalValueState.value();if (globalValue null) {globalValue new KeyCount();globalValue.key s;globalValue.count 0;}globalValue.count count;contextGlobalValueState.update(globalValue);ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class));ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState));System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, total : %d, windowStateCount :%s, globalStateCount :%s\n,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(hh:mm:ss).format(new Date(timeStamp));}} 输出结果 window, XXX, 08:34:45 - 08:34:50, 3, total : 22, windowStateCount :KeyCount{keyXXX, count3}, globalStateCount :KeyCount{keyXXX, count22} window, YYY, 08:34:45 - 08:34:50, 2, total : 22, windowStateCount :KeyCount{keyYYY, count2}, globalStateCount :KeyCount{keyYYY, count22}从结果可以验证以上的结论此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉因为一旦时间窗口结束就再也没有机会清理了 从这个例子中还发现一个比较有趣的现象 ValueStateKeyCount state getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class)); ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class)); ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到并且他们指向的都是同一个变量可以参见代码的输出 System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState)); System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));结果如下 contextWindowSameNameState contextGlobalSameNameState :true state contextGlobalSameNameState :true参考文献 https://cloud.tencent.com/developer/article/1815079
http://www.pierceye.com/news/851191/

相关文章:

  • 企业网站制作托管东营高端网站建设
  • 可以推广网站建立网站接受投注是什么意思
  • 微网站制作网站开发创建自己网站的步骤
  • 人才网网站开发手册外链发布平台大全
  • 福州网站备案wordpress打开媒体链接设置
  • 大学网站建设考核办法永春网站设计
  • 哪个设计网站赚钱百度地图网页版进入
  • 网站备案号不存在100m的网站 数据库
  • 网站空间管理平台网站模版 优帮云
  • 网站开发的比较备案期间 需要关闭网站吗
  • 做网站 怎么推广上海市企业服务云十问十答
  • 怎么做一种网站为别人宣传wordpress query_posts()
  • 网站的运营和维护专业做网站官网
  • 详细论述制作网站的步骤做网站需求 后期方便优化
  • 蒙icp备 网站建设学校网站建设管理
  • 做免费外贸网站册域名网站大全免黄
  • 祈网网站建设制作网站如何赚钱
  • 最讨厌网站门户类网站的主页设计
  • 国家建设环保局网站网站做的好赚钱吗
  • 如何设置网站服务器做标签的网站
  • 网站建设高端培训学校做网站交易平台
  • 公司网站建设收费优化网站排名解析推广
  • 昆明快速建站模板汽车网站建设多少钱
  • 网站注销主体注销广州联享网站建设公司怎么样
  • 中山seo建站新手建站教程报价单
  • 台州制作网站软件陈坤做直播在哪个网站
  • 北湖区网站建设公司企业主题wordpress 含演示数据
  • 网站建设简历自我评价做招聘信息的网站有哪些内容
  • 怎么和其它网站做友情链接网络营销师证怎么考
  • 百度推广要自己做网站吗做的视频传到哪个网站好