当前位置: 首页 > news >正文

html做音乐网站树形结构网站案例

html做音乐网站,树形结构网站案例,网站源码 一品资源网,为什么谷歌浏览器打不开网页引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合#xff0c;它比ProcessFunction拥有更多特性#xff0c;例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解#xff0c;KeyedProc…引言 KeyedProcessFunction是Flink用于处理KeyedStream的数据集合它比ProcessFunction拥有更多特性例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧 正文 了解一个函数怎么用最权威的地方就是 官方文档 以及注解KeyedProcessFunction的注解如下 /*** A keyed function that processes elements of a stream.** pFor every element in the input stream {link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {link Context}. For firing timers {link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** pbNOTE:/b Access to keyed state and timers (which are also scoped to a key) is only* available if the {code KeyedProcessFunction} is applied on a {code KeyedStream}.** pbNOTE:/b A {code KeyedProcessFunction} is always a {link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {link org.apache.flink.api.common.functions.RichFunction#close()}.*/上面简单来说就是以下四点 Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用通过这个方法的Context参数可以设置定时器在开启定时器后会程序会定时调用onTimer方法由于KeyedProcessFunction实现了RichFunction接口因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放需要注意的是只有在KeyedStream里才能够访问state和定时器通俗点来说就是这个函数要用在keyBy这个函数的后面 processElement方法解析 Flink会调用processElement方法处理输入流中的每一条数据KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据 onTimer方法解析在启用TimerService服务时会定时触发此方法一般会在processElement方法中开启TimerService服务 以上就是这个函数的基本知识接下来就通过实战来熟悉下它的使用 实战简介 本次实战的目标是学习KeyedProcessFunction内容如下 监听本机7777端口读取字符串将每个字符串用空格分隔转成Tuple2实例f0是分隔后的单词f1等于1将Tuple2实例集合通过f0字段分区得到KeyedStreamKeyedSteam通过自定义KeyedProcessFunction处理自定义KeyedProcessFunction的作用是记录每个单词最新一次出现的时间然后建一个十秒的定时器进行触发 使用代码例子 首先定义pojo类 public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return count;}public String getKey() {return key;}public void setKey(String key) {this.key key;}public long getCount() {return count;}public void setCount(long count) {this.count count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp lastQuestTimestamp;} }接着实现KeyedProcessFunction类 public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunctionTuple, Tuple2String, Integer, Tuple2String, Long {private ValueStateCountWithTimestampNew state;Overridepublic void open(Configuration parameters) throws Exception {state getRuntimeContext().getState(new ValueStateDescriptorCountWithTimestampNew(sherlock-state, CountWithTimestampNew.class));}// 实现数据处理逻辑的地方Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();if (countWithTimestampNew null) {countWithTimestampNew new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗推测state对象是跟key绑定针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器十秒后触发long timer countWithTimestampNew.getLastQuestTimestamp()10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息用于确保数据准确性System.out.println(String.format( 触发processElement方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout false;if (timestamp countWithTimestampNew.getLastQuestTimestamp()10000 ) {//out.collect(new Tuple2(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout true;}//打印所有信息用于确保数据准确性System.out.println(String.format( 触发onTimer方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));} }最后是启动类 public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口读取字符串DataStreamString socketDataStream env.socketTextStream(localhost, 7777);// 所有输入的单词如果超过10秒没有再次出现都可以通过CountWithTimeoutFunction得到DataStreamTuple2String, Long timeOutWord socketDataStream// 对收到的字符串用空格做分割得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Integer() {Overridepublic long extractTimestamp(Tuple2String, Integer element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词如果超过10秒没有再次出现就在此打印出来timeOutWord.print();env.execute(ProcessFunction demo : KeyedProcessFunction);} }演示 在启动服务前先通过linux指令监听端口 nc -lk 7777 启动Flink服务后往7777端口里面发送数据 通过IDEA的终端可以看到有日志输出可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志 那么咱们尝试连续发送两条Hello呢可以看到累加器会持续累加并且会触发两次onTimer方法也就是每一条消息都会触发一次。由于连续发送两条因此可以看得到第三行日志的末尾是false说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来 再输入点其他的试试 通过输出可以看到这些单词的计数器又从0开始说明每一个Key都对应一个状态 思考题 open方法会在哪里进行调用KeyedProcessFunction整个类的完整调用逻辑是怎么样的registerProcessingTimeTimer和registerEventTimeTimer的差异是什么 参考资料 https://blog.csdn.net/boling_cavalry/article/details/106299167https://blog.csdn.net/lujisen/article/details/105510532https://blog.csdn.net/qq_31866793/article/details/102831731
http://www.pierceye.com/news/127170/

相关文章:

  • 做一门户网站价格个人网站制作模板图片
  • 做网站需要审核资质吗wordpress 防恶意注册
  • 怎么不花钱建网站无人售货机
  • 可以做空股票的网站thinkphp网站开发
  • 给别人做网站怎么赚钱吗专业网络推广软件
  • SOHO英文网站制作晋江网站制作
  • 启东住房和城乡建设局网站邢台网站制作报价多少钱
  • 佛山网站建设seo优化做英文的小说网站有哪些
  • 安顺建设局网站官网哪里有响应式网站企业
  • 唯品会一家做特卖的网站国家商标查询官方网站
  • 网站宝搭建网站环境做电商网站一般需要什么流程图
  • 南通网站建设团队wordpress广告产检
  • 做网站刷赞qq怎么赚钱邢台路桥建设总公司没有网站吗
  • 网站仿站教程常用外贸网站
  • 南昌市有帮做网站的吗纵横天下网站开发
  • pc网站直接转换成移动端的网站黑果云免费虚拟主机
  • 网站建设用什么科目wordpress当前分类链接地址
  • 做一万个网站网站做下载功能
  • 佛山建站模板制作wordpress加上live2d
  • 樟木头网站仿做深圳网站开发公司
  • 孙俪做的网站广告微信如何修改wordpress
  • 有什么手机做网站的免费ppt模板下载花
  • 网站建设团队技术介绍县级网站
  • 深圳营销型网站建设价格网站建设文化如何
  • 提交网站的入口地址网站建设灬金手指下拉十五
  • 连云港建设局网站学校网站建设管理相关规定
  • 什么网站做玩具的外贸网站监控系统
  • 从事网站美工建设厦门网站制作企业
  • 网站后台传图片南昌做网站要多少钱
  • 网站包括什么国内最大的域名交易平台