当前位置: 首页 > news >正文

视频网站会员系统怎么做桂林漓江旅游攻略

视频网站会员系统怎么做,桂林漓江旅游攻略,电商培训班,天津做网站制作公司引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合#xff0c;它比ProcessFunction拥有更多特性#xff0c;例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解#xff0c;KeyedProc…引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合它比ProcessFunction拥有更多特性例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解KeyedProcessFunction的注解如下 /*** A keyed function that processes elements of a stream.** pFor every element in the input stream {link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {link Context}. For firing timers {link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** pbNOTE:/b Access to keyed state and timers (which are also scoped to a key) is only* available if the {code KeyedProcessFunction} is applied on a {code KeyedStream}.** pbNOTE:/b A {code KeyedProcessFunction} is always a {link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {link org.apache.flink.api.common.functions.RichFunction#close()}.*/上面简单来说就是以下四点 Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用通过这个方法的Context参数可以设置定时器在开启定时器后会程序会定时调用onTimer方法由于KeyedProcessFunction实现了RichFunction接口因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放需要注意的是只有在KeyedStream里才能够访问state和定时器通俗点来说就是这个函数要用在keyBy这个函数的后面 processElement方法解析 Flink会调用processElement方法处理输入流中的每一条数据KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据 onTimer方法解析在启用TimerService服务时会定时触发此方法一般会在processElement方法中开启TimerService服务 以上就是这个函数的基本知识接下来就通过实战来熟悉下它的使用 实战简介 本次实战的目标是学习KeyedProcessFunction内容如下 监听本机7777端口读取字符串将每个字符串用空格分隔转成Tuple2实例f0是分隔后的单词f1等于1将Tuple2实例集合通过f0字段分区得到KeyedStreamKeyedSteam通过自定义KeyedProcessFunction处理自定义KeyedProcessFunction的作用是记录每个单词最新一次出现的时间然后建一个十秒的定时器进行触发 使用代码例子 首先定义pojo类 public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return count;}public String getKey() {return key;}public void setKey(String key) {this.key key;}public long getCount() {return count;}public void setCount(long count) {this.count count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp lastQuestTimestamp;} }接着实现KeyedProcessFunction类 public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunctionTuple, Tuple2String, Integer, Tuple2String, Long {private ValueStateCountWithTimestampNew state;Overridepublic void open(Configuration parameters) throws Exception {state getRuntimeContext().getState(new ValueStateDescriptorCountWithTimestampNew(sherlock-state, CountWithTimestampNew.class));}// 实现数据处理逻辑的地方Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();if (countWithTimestampNew null) {countWithTimestampNew new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗推测state对象是跟key绑定针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器十秒后触发long timer countWithTimestampNew.getLastQuestTimestamp()10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息用于确保数据准确性System.out.println(String.format( 触发processElement方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout false;if (timestamp countWithTimestampNew.getLastQuestTimestamp()10000 ) {//out.collect(new Tuple2(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout true;}//打印所有信息用于确保数据准确性System.out.println(String.format( 触发onTimer方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));} }最后是启动类 public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口读取字符串DataStreamString socketDataStream env.socketTextStream(localhost, 7777);// 所有输入的单词如果超过10秒没有再次出现都可以通过CountWithTimeoutFunction得到DataStreamTuple2String, Long timeOutWord socketDataStream// 对收到的字符串用空格做分割得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Integer() {Overridepublic long extractTimestamp(Tuple2String, Integer element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词如果超过10秒没有再次出现就在此打印出来timeOutWord.print();env.execute(ProcessFunction demo : KeyedProcessFunction);} }演示 在启动服务前先通过linux指令监听端口 nc -lk 7777 启动Flink服务后往7777端口里面发送数据 通过IDEA的终端可以看到有日志输出可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志 那么咱们尝试连续发送两条Hello呢可以看到累加器会持续累加并且会触发两次onTimer方法也就是每一条消息都会触发一次。由于连续发送两条因此可以看得到第三行日志的末尾是false说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来 再输入点其他的试试 通过输出可以看到这些单词的计数器又从0开始说明每一个Key都对应一个状态 思考题 open方法会在哪里进行调用KeyedProcessFunction整个类的完整调用逻辑是怎么样的registerProcessingTimeTimer和registerEventTimeTimer的差异是什么 参考资料 https://blog.csdn.net/boling_cavalry/article/details/106299167https://blog.csdn.net/lujisen/article/details/105510532https://blog.csdn.net/qq_31866793/article/details/102831731
http://www.pierceye.com/news/992463/

相关文章:

  • 网站建设要做什么有关网站建设的书
  • 网站前台开发由什么做的seo网络贸易网站推广
  • 网站设计与开发范本wordpress 左图右文
  • 网站后台的搭建seo网站推广教程
  • 长春做网站的公司男生怎么找的小资源
  • 在线课程网站建设规范甘肃手机版建站系统信息
  • 自定义网站模板中国进出口企业名录
  • 晟合建设集团网站高中文凭能学做网站吗
  • 黑色网站模板建筑工程查询网
  • 建设网站jw100有哪些做鞋机设备的网站
  • 模仿网站建设大学生网站设计论文范文
  • 浙江杭州网站建设服务公司哪家好社区推广经验做法
  • 牟平网站制作公司天安云谷网站建设
  • 培训网站建设方案书沈阳定制网站方案
  • 廊坊公司快速建站电子商务网站建设前期规划方案
  • 西安网站建设盈科wordpress 评论模板
  • 网站制作的电话潍坊建立企业网站公司
  • 二级建造师证书查询官方网站21年没封直接可以进的
  • 计科专业毕设做网站傻瓜式做网站程序
  • 创办网站需要怎么做网站的建设方法包括
  • 直邮网站的推广活动怎么做电商美工是做什么的
  • 唐山建设局网站俄罗斯ip地址
  • 贵州省建设厅网站首页旅游seo
  • 郑州网站建设三猫网络新主题 老版本 wordpress
  • 网站 ftp网站首页布局有哪些
  • 3d模型代做网站微分销商城
  • 县区网站建设运行汇报宝塔 wordpress优化
  • 手机网站判断跳转代码怎么写pc网站怎么做自适应
  • 怎样在一个虚拟服务器里做两个网站西安市城乡建设管理局网站
  • 做网站实训总结查看网站建设的特点