当前位置: 首页 > news >正文

商城网站营销系统源码长春网站制作建设

商城网站营销系统源码,长春网站制作建设,郑州网课老师,网站如何在手机上显示引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合#xff0c;它比ProcessFunction拥有更多特性#xff0c;例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解#xff0c;KeyedProc…引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合它比ProcessFunction拥有更多特性例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解KeyedProcessFunction的注解如下 /*** A keyed function that processes elements of a stream.** pFor every element in the input stream {link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {link Context}. For firing timers {link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** pbNOTE:/b Access to keyed state and timers (which are also scoped to a key) is only* available if the {code KeyedProcessFunction} is applied on a {code KeyedStream}.** pbNOTE:/b A {code KeyedProcessFunction} is always a {link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {link org.apache.flink.api.common.functions.RichFunction#close()}.*/上面简单来说就是以下四点 Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用通过这个方法的Context参数可以设置定时器在开启定时器后会程序会定时调用onTimer方法由于KeyedProcessFunction实现了RichFunction接口因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放需要注意的是只有在KeyedStream里才能够访问state和定时器通俗点来说就是这个函数要用在keyBy这个函数的后面 processElement方法解析 Flink会调用processElement方法处理输入流中的每一条数据KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据 onTimer方法解析在启用TimerService服务时会定时触发此方法一般会在processElement方法中开启TimerService服务 以上就是这个函数的基本知识接下来就通过实战来熟悉下它的使用 实战简介 本次实战的目标是学习KeyedProcessFunction内容如下 监听本机7777端口读取字符串将每个字符串用空格分隔转成Tuple2实例f0是分隔后的单词f1等于1将Tuple2实例集合通过f0字段分区得到KeyedStreamKeyedSteam通过自定义KeyedProcessFunction处理自定义KeyedProcessFunction的作用是记录每个单词最新一次出现的时间然后建一个十秒的定时器进行触发 使用代码例子 首先定义pojo类 public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return count;}public String getKey() {return key;}public void setKey(String key) {this.key key;}public long getCount() {return count;}public void setCount(long count) {this.count count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp lastQuestTimestamp;} }接着实现KeyedProcessFunction类 public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunctionTuple, Tuple2String, Integer, Tuple2String, Long {private ValueStateCountWithTimestampNew state;Overridepublic void open(Configuration parameters) throws Exception {state getRuntimeContext().getState(new ValueStateDescriptorCountWithTimestampNew(sherlock-state, CountWithTimestampNew.class));}// 实现数据处理逻辑的地方Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();if (countWithTimestampNew null) {countWithTimestampNew new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗推测state对象是跟key绑定针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器十秒后触发long timer countWithTimestampNew.getLastQuestTimestamp()10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息用于确保数据准确性System.out.println(String.format( 触发processElement方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout false;if (timestamp countWithTimestampNew.getLastQuestTimestamp()10000 ) {//out.collect(new Tuple2(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout true;}//打印所有信息用于确保数据准确性System.out.println(String.format( 触发onTimer方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));} }最后是启动类 public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口读取字符串DataStreamString socketDataStream env.socketTextStream(localhost, 7777);// 所有输入的单词如果超过10秒没有再次出现都可以通过CountWithTimeoutFunction得到DataStreamTuple2String, Long timeOutWord socketDataStream// 对收到的字符串用空格做分割得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Integer() {Overridepublic long extractTimestamp(Tuple2String, Integer element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词如果超过10秒没有再次出现就在此打印出来timeOutWord.print();env.execute(ProcessFunction demo : KeyedProcessFunction);} }演示 在启动服务前先通过linux指令监听端口 nc -lk 7777 启动Flink服务后往7777端口里面发送数据 通过IDEA的终端可以看到有日志输出可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志 那么咱们尝试连续发送两条Hello呢可以看到累加器会持续累加并且会触发两次onTimer方法也就是每一条消息都会触发一次。由于连续发送两条因此可以看得到第三行日志的末尾是false说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来 再输入点其他的试试 通过输出可以看到这些单词的计数器又从0开始说明每一个Key都对应一个状态 思考题 open方法会在哪里进行调用KeyedProcessFunction整个类的完整调用逻辑是怎么样的registerProcessingTimeTimer和registerEventTimeTimer的差异是什么 参考资料 https://blog.csdn.net/boling_cavalry/article/details/106299167https://blog.csdn.net/lujisen/article/details/105510532https://blog.csdn.net/qq_31866793/article/details/102831731
http://www.pierceye.com/news/592874/

相关文章:

  • 找工作的平台seo第三方点击软件
  • 青岛市建设监理协会网站网站开发工作量评估
  • 深圳网站设计与制作公司德州万企互联网站制作
  • 制作一个链接网站400电话网站模板
  • 网站建设网站网站建设网站濮阳网站建设网站
  • 追天网站建设 优帮云网页设计基础入门
  • 北京网站的建立的wordpress mofile
  • 在网上做翻译的网站私募网站建设
  • 网站建设明薇通网络服务好企业官网网站建设免费
  • php开发企业网站教程企业网站怎么建设公司
  • 网站开发 保密期限不用实名认证的网页游戏
  • 网站制作公司的流程网络运营公司经营范围
  • 杭州公司网站制作维护运城建设银行网站
  • 做一个个人主页的网站怎么做网站整合建设方案
  • 做亚马逊运营要看哪些网站专业排名
  • 网站做跳转影响排名吗店面设计多少钱一个平方
  • 中国建设注册中心网站启用中文域名大网站
  • 贵阳市建设城乡规划局网站一个新产品策划方案
  • 做设计什么网站平台好点做私活河南省建设厅53号文
  • 酷网站欣赏mit网站可以做app
  • 网站建设公司哪家好 都来磐石网络推广软件平台排行榜
  • 个人博客网站开发毕业设计东莞建设网站官网住房和城乡
  • 怎么下载网站的视频室内设计专业网站
  • 和幼儿做网站爱wordpress主题简
  • wordpress 大型网站吗怎么做网站超市
  • 太原建设厅官方网站做电商需要准备多少钱
  • 公司网站建设安全的风险wordpress t1主题
  • 手机下载视频网站模板下载失败杭州做网站五
  • 招聘wordpress网站高手兼职wordpress会员付费插件
  • 网站建设综合报告威海网站建设兼职