沙井网站开发,曲靖网站制作,平凉市市建设局网站,品牌网站建设市场分析1. ProcessFunction
ProcessFunction 是一种底层的流处理操作#xff0c;基于它用户可以访问#xff08;无环#xff09;流应用程序的所有基本构建块
事件#xff08;流元素#xff09;状态#xff08;容错#xff0c;一致性#xff0c;仅在 keyed stream 上#xf…1. ProcessFunction
ProcessFunction 是一种底层的流处理操作基于它用户可以访问无环流应用程序的所有基本构建块
事件流元素状态容错一致性仅在 keyed stream 上定时器事件时间和处理时间仅在 keyed stream 上
可以将 ProcessFunction 视为一种可以访问 keyed state 和定时器的 FlatMapFunction。Flink 为收到的输入流中的每个事件都调用该函数来进行处理。对于容错与其它有状态的函数类似ProcessFunction 可以通过 RuntimeContext 访问 Flink 的keyed state。定时器允许应用程序对处理时间和 事件时间中的更改做出反应。 每次调用 processElement(...) 时参数中都会提供一个 Context 对象该对象可以访问元素的事件时间戳和 TimerService。
TimerService 可用于为将来特定的事件时间/处理时间注册回调。 特定事件时间的 onTimer(...) 回调函数会在当前对齐的 watermark 超过所注册的时间戳时调用。 特定处理时间的 onTimer(...) 回调函数则会在系统物理时间超过所注册的时间戳时调用。 在该调用期间所有状态会被再次绑定到创建定时器时的键上从而允许定时器操作与之对应的 keyed state。如果想要访问 keyed state 和定时器需要在 keyed stream 上使用 ProcessFunction。
stream.keyBy(...).process(new MyProcessFunction());
2. 底层 Join
为了在两个输入上实现底层操作应用程序可以使用 CoProcessFunction 或 KeyedCoProcessFunction。 这些函数绑定两个不同的输入从两个不同的输入中获取元素并分别调用 processElement1(...) 和 processElement2(...) 进行处理。
实现底层 join 一般需要遵循以下模式
为一个输入或两者创建状态对象。从某个输入接收元素时更新状态。从另一个输入接收元素时查询状态并生成 join 结果。
例如你可能会将客户数据与金融交易进行 join同时想要保留客户数据的状态。如果你希望即使在出现乱序事件时仍然可以得到完整且确定的 join 结果你可以通过注册一个定时器在客户数据流的 watermark 已经超过当前这条金融交易记录时计算和发送 join 结果。
在下面的例子中KeyedProcessFunction 维护每个键的计数并且每次超过一分钟事件时间没有更新时输出一次键/计数对。
计数键和最后修改时间存储在 ValueState 中它由键隐式限定范围。对于每条记录KeyedProcessFunction 递增计数器并设置最后修改时间。对于每条记录该函数还会注册了一个一分钟后事件时间的回调函数。在每次回调时它会根据注册的时间和最后修改时间进行比较如果正好差一分钟则 输出键/计数对即在该分钟内没有进一步更新
这个简单的例子本身可以用会话窗口session window实现 这里我们使用 KeyedProcessFunction 来展示使用它的基本模式。
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.Context;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;// 源数据流
DataStreamTuple2String, String stream ...;// 使用 process function 来处理一个 Keyed Stream
DataStreamTuple2String, Long result stream.keyBy(value - value.f0).process(new CountWithTimeoutFunction());/*** 在状态中保存的数据类型*/
public class CountWithTimestamp {public String key;public long count;public long lastModified;
}/*** 用来维护数量和超时的 ProcessFunction 实现*/
public class CountWithTimeoutFunction extends KeyedProcessFunctionTuple, Tuple2String, String, Tuple2String, Long {/** 由 process function 管理的状态 */private ValueStateCountWithTimestamp state;Overridepublic void open(OpenContext openContext) throws Exception {state getRuntimeContext().getState(new ValueStateDescriptor(myState, CountWithTimestamp.class));}Overridepublic void processElement(Tuple2String, String value, Context ctx, CollectorTuple2String, Long out) throws Exception {// 获得当前的数量CountWithTimestamp current state.value();if (current null) {current new CountWithTimestamp();current.key value.f0;}// 更新状态中的数量current.count;// 将状态中的最后修改时间改为记录的事件时间current.lastModified ctx.timestamp();// 将更新后的状态写回state.update(current);// 注册一个 60s 之后的事件时间回调 ctx.timerService().registerEventTimeTimer(current.lastModified 60000);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorTuple2String, Long out) throws Exception {// 获得注册该回调时使用的键对应的状态CountWithTimestamp result state.value();// 检查当前回调时否是最新的回调还是后续注册了新的回调if (timestamp result.lastModified 60000) {// 超时后发送状态out.collect(new Tuple2String, Long(result.key, result.count));}}
}
在 Flink 1.4.0 之前在调用处理时间定时器时ProcessFunction.onTimer() 方法将当前的处理时间设置为事件时间的时间戳。此行为非常不明显用户可能不会注意到。 然而这样做是有害的因为处理时间的时间戳是不确定的并且和 watermark 不一致。此外用户依赖于此错误的时间戳来实现逻辑很有可能导致非预期的错误。 因此我们决定对其进行修复。在 1.4.0 后使用此错误的事件时间时间戳的 Flink 作业将失败用户应将其作业更正为正确的逻辑。
3. KeyedProcessFunction
KeyedProcessFunction 是 ProcessFunction 的一个扩展, 可以在其 onTimer(...) 方法中访问定时器的键。
Override
public void onTimer(long timestamp, OnTimerContext ctx, CollectorOUT out) throws Exception {K key ctx.getCurrentKey();// ...
}
4. Timers
两种定时器处理时间定时器和事件时间定时器都在 TimerService 内部维护并排队等待执行。对于相同的键和时间戳TimerService 会删除重复的定时器即每个键和时间戳最多有一个定时器。如果为同一时间戳注册了多个定时器则只调用一次 onTimer() 方法。Flink 会同步 onTimer() 和 processElement() 的调用因此用户不必担心状态的并发修改。
4.1 Fault Tolerance
定时器支持容错它会和应用程序的状态一起进行 checkpoint。当进行故障恢复或从保存点启动应用程序时定时器也会被恢复。当应用程序从故障中恢复或从保存点启动时可能会发生这种情况。即在恢复之前就应该触发的处理时间定时器会立即触发。
除了使用基于 RocksDB backend 的增量 snapshots 并使用基于 Heap 的定时器的情况外Flink 总是会异步执行计算器的快照操作。 大量定时器会增加 checkpoint 的时间因为定时器是需要 checkpoint 的状态的一部分。
4.2 Timer Coalescing
由于 Flink 中每个键和时间戳只保存一个定时器因此可以通过降低定时器的精度来合并它们从而减少定时器的数量。
对于精度为 1 秒事件或处理时间的定时器可以将目标时间向下舍入为整秒。定时器最多会提前 1 秒但不迟于要求的毫秒精度。 这样每个键在每秒内最多有一个定时器。
long coalescedTime ((ctx.timestamp() timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);
由于事件时间定时器仅在 watermark 到来时才触发因此还可以将下一个 watermark 到达前的定时器与当前定时器合并
long coalescedTime ctx.timerService().currentWatermark() 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);
定时器也可以按照以下方式被停止或者删除
停止处理时间定时器
long timestampOfTimerToStop ...;
ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);
停止事件时间定时器
long timestampOfTimerToStop ...;
ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);
如果没有注册给定时间戳的定时器则停止定时器不会产生影响。