商城网站营销系统源码,长春网站制作建设,郑州网课老师,网站如何在手机上显示引言
KeyedProcessFunction是Flink用于处理KeyedStream的数据集合#xff0c;它比ProcessFunction拥有更多特性#xff0c;例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧
正文
了解一个函数怎么用最权威的地方就是 官方文档 以及注解#xff0c;KeyedProc…引言
KeyedProcessFunction是Flink用于处理KeyedStream的数据集合它比ProcessFunction拥有更多特性例如状态处理和定时器功能等。接下来就一起来了解下这个函数吧
正文
了解一个函数怎么用最权威的地方就是 官方文档 以及注解KeyedProcessFunction的注解如下
/*** A keyed function that processes elements of a stream.** pFor every element in the input stream {link #processElement(Object, Context, Collector)} is* invoked. This can produce zero or more elements as output. Implementations can also query the* time and set timers through the provided {link Context}. For firing timers {link #onTimer(long,* OnTimerContext, Collector)} will be invoked. This can again produce zero or more elements as* output and register further timers.** pbNOTE:/b Access to keyed state and timers (which are also scoped to a key) is only* available if the {code KeyedProcessFunction} is applied on a {code KeyedStream}.** pbNOTE:/b A {code KeyedProcessFunction} is always a {link* org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the {link* org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and teardown* methods can be implemented. See {link* org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}* and {link org.apache.flink.api.common.functions.RichFunction#close()}.*/上面简单来说就是以下四点
Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用通过这个方法的Context参数可以设置定时器在开启定时器后会程序会定时调用onTimer方法由于KeyedProcessFunction实现了RichFunction接口因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放需要注意的是只有在KeyedStream里才能够访问state和定时器通俗点来说就是这个函数要用在keyBy这个函数的后面
processElement方法解析
Flink会调用processElement方法处理输入流中的每一条数据KeyedProcessFunction.Context参数可以用来读取以及更新内部状态state这个KeyedProcessFunction跟其他function一样通过参数中的Collector对象以回写的方式返回数据
onTimer方法解析在启用TimerService服务时会定时触发此方法一般会在processElement方法中开启TimerService服务
以上就是这个函数的基本知识接下来就通过实战来熟悉下它的使用
实战简介
本次实战的目标是学习KeyedProcessFunction内容如下
监听本机7777端口读取字符串将每个字符串用空格分隔转成Tuple2实例f0是分隔后的单词f1等于1将Tuple2实例集合通过f0字段分区得到KeyedStreamKeyedSteam通过自定义KeyedProcessFunction处理自定义KeyedProcessFunction的作用是记录每个单词最新一次出现的时间然后建一个十秒的定时器进行触发
使用代码例子
首先定义pojo类
public class CountWithTimestampNew {private String key;private long count;private long lastQuestTimestamp;public long getAndIncrementCount() {return count;}public String getKey() {return key;}public void setKey(String key) {this.key key;}public long getCount() {return count;}public void setCount(long count) {this.count count;}public long getLastQuestTimestamp() {return lastQuestTimestamp;}public void setLastQuestTimestamp(long lastQuestTimestamp) {this.lastQuestTimestamp lastQuestTimestamp;}
}接着实现KeyedProcessFunction类
public class CountWithTimeoutKeyProcessFunctionNew extends KeyedProcessFunctionTuple, Tuple2String, Integer, Tuple2String, Long {private ValueStateCountWithTimestampNew state;Overridepublic void open(Configuration parameters) throws Exception {state getRuntimeContext().getState(new ValueStateDescriptorCountWithTimestampNew(sherlock-state, CountWithTimestampNew.class));}// 实现数据处理逻辑的地方Overridepublic void processElement(Tuple2String, Integer value, Context ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();if (countWithTimestampNew null) {countWithTimestampNew new CountWithTimestampNew();countWithTimestampNew.setKey(value.f0);}countWithTimestampNew.getAndIncrementCount();//更新这个单词最后一次出现的时间countWithTimestampNew.setLastQuestTimestamp(ctx.timestamp());//单词之间不会互相覆盖吗推测state对象是跟key绑定针对每一个不同的key KeyedProcessFunction会创建其对应的state对象state.update(countWithTimestampNew);//给当前单词创建定时器十秒后触发long timer countWithTimestampNew.getLastQuestTimestamp()10000;//尝试注释掉看看是否还会触发onTimer方法ctx.timerService().registerProcessingTimeTimer(timer);//打印所有信息用于确保数据准确性System.out.println(String.format( 触发processElement方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timer)));}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2String, Long out) throws Exception {Tuple currentKey ctx.getCurrentKey();CountWithTimestampNew countWithTimestampNew state.value();//标记当前元素是否已经连续10s未出现boolean isTimeout false;if (timestamp countWithTimestampNew.getLastQuestTimestamp()10000 ) {//out.collect(new Tuple2(countWithTimestampNew.getKey(), countWithTimestampNew.getCount()));isTimeout true;}//打印所有信息用于确保数据准确性System.out.println(String.format( 触发onTimer方法当前的key是 %s, 这个单词累加次数是 %d, 上次请求的时间是%s, timer的时间是: %s, 当前单词是否已超过10秒没有再请求: %s,currentKey.getField(0),countWithTimestampNew.getCount(),time(countWithTimestampNew.getLastQuestTimestamp()),time(timestamp),String.valueOf(isTimeout)));}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));}
}最后是启动类
public class KeyedProcessFunctionDemo2 {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 并行度1env.setParallelism(1);// 处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 监听本地9999端口读取字符串DataStreamString socketDataStream env.socketTextStream(localhost, 7777);// 所有输入的单词如果超过10秒没有再次出现都可以通过CountWithTimeoutFunction得到DataStreamTuple2String, Long timeOutWord socketDataStream// 对收到的字符串用空格做分割得到多个单词.flatMap(new SplitterFlatMapFunction())// 设置时间戳分配器用当前时间作为时间戳.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Integer() {Overridepublic long extractTimestamp(Tuple2String, Integer element, long previousElementTimestamp) {// 使用当前系统时间作为时间戳return System.currentTimeMillis();}Overridepublic Watermark getCurrentWatermark() {// 本例不需要watermark返回nullreturn null;}})// 将单词作为key分区.keyBy(0)// 按单词分区后的数据交给自定义KeyedProcessFunction处理.process(new CountWithTimeoutKeyProcessFunctionNew());// 所有输入的单词如果超过10秒没有再次出现就在此打印出来timeOutWord.print();env.execute(ProcessFunction demo : KeyedProcessFunction);}
}演示
在启动服务前先通过linux指令监听端口 nc -lk 7777 启动Flink服务后往7777端口里面发送数据 通过IDEA的终端可以看到有日志输出可以看到在发送消息的时候第一条日志立马打印出来并在10秒后输出第二条日志 那么咱们尝试连续发送两条Hello呢可以看到累加器会持续累加并且会触发两次onTimer方法也就是每一条消息都会触发一次。由于连续发送两条因此可以看得到第三行日志的末尾是false说明收到第一条后的10秒内又有相同的消息进来。第二条是ture说明在收到第二条消息后的10秒内没有消息进来 再输入点其他的试试 通过输出可以看到这些单词的计数器又从0开始说明每一个Key都对应一个状态
思考题
open方法会在哪里进行调用KeyedProcessFunction整个类的完整调用逻辑是怎么样的registerProcessingTimeTimer和registerEventTimeTimer的差异是什么
参考资料
https://blog.csdn.net/boling_cavalry/article/details/106299167https://blog.csdn.net/lujisen/article/details/105510532https://blog.csdn.net/qq_31866793/article/details/102831731