营销型网站建设易网拓,阜新网站制作,网站网站 后缀gr,静态网页怎么做网站flink state原理 1. 状态、状态后端、Checkpoint 三者之间的区别及关系#xff1f;2 算子状态与键控状态的区别2.1 算子状态2.2 键控状态2.3 算子状态api2.4 键控状态api 3 HashMapStateBackend 状态后端4 EmBeddedRocksDbStateBackend 状态后端5 状态数据结构介绍5.1 算子状态… flink state原理 1. 状态、状态后端、Checkpoint 三者之间的区别及关系2 算子状态与键控状态的区别2.1 算子状态2.2 键控状态2.3 算子状态api2.4 键控状态api 3 HashMapStateBackend 状态后端4 EmBeddedRocksDbStateBackend 状态后端5 状态数据结构介绍5.1 算子状态提供的数据结构5.2 键控状态提供的数据结构 6 Reducing 聚合状态7 广播状态8. flink重启时修改并行度state会发生什么变化键值状态分区策略解决数据倾斜9.Flink State TTL 是怎么做到数据过期的首先我们来想想要做到 TTL 的话要具备什么条件呢cleanupStrategies过期数据清理策略目前支持的策略有 10.Flink SQL API State TTL 的过期机制是 onCreateAndUpdate 还是onReadAndWrite5.operator-state 和 keyed-state 两者的区别最大并行度又和它们有什么关系举个生产环境中经常出现的案例当用户停止任务、更新代码逻辑并且改变任务并发度时两种 state 都是怎样进行恢复的⭐ operator-state⭐ keyed-state 11.ValueState 和 MapState 各自适合的应用场景 1. 状态、状态后端、Checkpoint 三者之间的区别及关系
拿五个字做比喻“铁锅炖大鹅”铁锅是状态后端大鹅是状态Checkpoint 是炖的动作。 状态本质来说就是数据在 Flink 中其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapStateValueStateListState。 状态后端Flink 提供的用于管理状态的组件状态后端决定了以什么样数据结构什么样的存储方式去存储和管理我们的状态。Flink 目前官方提供了 memory、filesystemrocksdb 三种状态后端来存储我们的状态。 但flink1.13后 对状态后端做了整合只有这两种了
HashMapStateBackendEmbeddedRocksDBStateBackend
老版本flink-1.12 版及以前 Fsstatebackend MemoryStatebackend RocksdbStateBackend 新版本中Fsstatebackend 和 MemoryStatebackend 整合成了 HashMapStateBackend 而且 HashMapStateBackend 和 EmBeddedRocksDBStateBackend 所生成的快照文件也统一了格式因而 在 job 重新部署或者版本升级时可以任意替换 statebackend
Checkpoint状态管理Flink 提供的用于定时将状态后端中存储的状态同步到远程的存储系统的组件或者能力。为了防止 long run 的 Flink 任务挂了导致状态丢失产生数据质量问题Flink 提供了状态管理CheckpointSavepoint的能力把我们使用的状态给管理起来定时的保存到远程。然后可以在 Flink 任务 failover 时从远程把状态数据恢复到 Flink 任务中保障数据质量。
2 算子状态与键控状态的区别
2.1 算子状态
算子状态是每个 subtask 自己持有一份独立的状态数据但如果在失败恢复后算子并行度发 生变化则状态将在新的 subtask 之间均匀分配 算子函数实现 CheckpointedFunction 后即可使用算子状态 算子状态通常用于 source 算子中其他场景下建议使用 KeyedState键控状态算子状态在逻辑上由算子 task 下所有 subtask 共享 如何理解正常运行时subtask 自己读写自己的状态数据而一旦 job 重启且带状态算子发生了并行度的变化则 之前的状态数据将在新的一批 subtask 间均匀分配
2.2 键控状态
键控状态只能应用于 KeyedStream 的算子中keyby 后的处理算子中 算子为每一个 key 绑定一份独立的状态数据 2.3 算子状态api
要使用算子状态operator state需要让算子函数实现 CheckpointedFunction 接口
/*** * author hunter.d * qq 657270652 * wx haitao-duan * date 2022/4/10 **/
public class OperatorStateTest {public static void main(String[] args) throws Exception {Configuration conf new Configuration();conf.setString(execution.savepoint.path, D:\\ckpt\\27270525e8f166834f2bbf7c617ad6d3\\chk-11);StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(conf);env.enableCheckpointing(2000);env.getCheckpointConfig().setCheckpointStorage(file:///d:/ckpt);DataStreamSourceString source env.socketTextStream(localhost, 9999);//以下两个 map 算子其中一个是带状态的 // 如果修改代码逻辑如调整两个 map 算子顺序且没有设置 uid则从 savepoints 恢复时将失败source.map(new StatefulMapFunc()).uid(stateful-mapfunc-001).setParallelism(2).map(new NoStateMapFunc()).setParallelism(1).print().setParallelism(1);env.execute();}public static class NoStateMapFunc implements MapFunctionString, String {Overridepublic String map(String value) throws Exception {return value.toUpperCase();}}// 带状态的 map 函数将接收的字符串记在状态中以不断拼接新数据返回 public static class StatefulMapFunc extends RichMapFunctionString, String implements CheckpointedFunction {ListStateString lstState;/*** 正常的 map 映射逻辑方法 */Overridepublic String map(String value) throws Exception {lstState.add(value);StringBuilder sb new StringBuilder();for (String s : lstState.get()) {sb.append(s).append(,);}return sb.toString();}/*** checkpoint 触发时会调用的方法 */Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {}/*** 初始化算子任务时会调用的方法以加载、初始化状态数据 */Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {lstState context.getOperatorStateStore().getListState(new ListStateDescriptorString(lst, String.class));int indexOfThisSubtask getRuntimeContext().getIndexOfThisSubtask();IterableString iter lstState.get();IteratorString it iter.iterator(); // 用于观察 task 失败恢复后的状态恢复情况 System.out.println(------- indexOfThisSubtask - 初始化时打印状态-----);while (it.hasNext()) {System.out.println(indexOfThisSubtask : it.next());}System.out.println(------- indexOfThisSubtask -初始化时打印状态-----);}}
}
2.4 键控状态api
要使用键控状态Keyed State,需要在实现 RichFunction 的函数中 public class _15_ChannelEventsCntMapFunc extends RichMapFunctionEventLog, String {ValueStateInteger valueState;Overridepublic void open(Configuration parameters) throws Exception {StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.milliseconds(2000)).cleanupFullSnapshot().neverReturnExpired().useProcessingTime().updateTtlOnReadAndWrite().build();ValueStateDescriptorInteger desc new ValueStateDescriptor(cnt, Integer.class);desc.enableTimeToLive(ttlConfig); // 获取单值状态管理器 valueState getRuntimeContext().getState(desc);}Overridepublic String map(EventLog eventLog) throws Exception {// 来一条数据就对状态更新 valueState.update((valueState.value() null ? 0 : valueState.value()) 1);return eventLog.getChannel() : valueState.value();}
}
3 HashMapStateBackend 状态后端
HashMapStateBackend 状态数据是以 java 对象形式存储在 heap 内存中 内存空间不够时也会溢出一部分数据到本地磁盘文件 可以支撑大规模的状态数据只不过在状态数据规模超出内存空间时读写效率就会明显降低 对于 KeyedState 来说 HashMapStateBackend 在内存中是使用 CopyOnWriteStateMap 结构来存储用户的状态数据 注意此数据结构类名为 Map实非 Map它其实是一个单向链表的数据结构 对于 OperatorState 来说 可以清楚看出它底层直接用一个 Map 集合来存储用户的状态数据状态名称 -- 状态 List
4 EmBeddedRocksDbStateBackend 状态后端
状态数据是交给 rocksdb 来管理 Rocksdb 中的数据是以序列化的 kv 字节进行存储 Rockdb 中的数据有内存缓存的部分也有磁盘文件的部分 Rockdb 的磁盘文件数据读写速度相对还是比较快的所以在支持超大规模状态数据时数据的 读写效率不会有太大的降低 注意上述 2 中状态后端在生成 checkpoint 快照文件时生成的文件格式是完全一致的 所以用户的 flink 程序在更改状态后端后重启时依然可以加载和恢复此前的快照文件数据 老版本中状态与状态后端的关系是 5 状态数据结构介绍
5.1 算子状态提供的数据结构
ListState UnionListState UnionListState 和普通 ListState 的区别
UnionListState 的快照存储数据在系统重启后list 数据的重分配模式为 广播模式 在每个 subtask 上都拥有一份完整的数据ListState 的快照存储数据系统重启后list 数据的重分配模式为 round-robin 轮询平均分配
5.2 键控状态提供的数据结构
ValueState ListState MapState ReducingState AggregateState
6 Reducing 聚合状态
用户传入一个增量聚合函数后状态实现自动增量聚合输入数据与聚合结果类型必须一致
// 获取一个 reduce 聚合状态 reduceState runtimeContext.getReducingState(new ReducingStateDescriptorInteger(reduceState,new ReduceFunctionInteger(){Override public Integer reduce (Integer value1, Integer value2) throws Exception {return value1 value2;}},Integer .class));7 广播状态
/*** * author hunter.d * qq 657270652 * wx haitao-duan * date 2022/4/10 **/
public class OperatorStateTest {// 主数据流 SingleOutputStreamOperatorStudent s1;// 待广播出去的流SingleOutputStreamOperatorStuInfo s2;// 定义广播状态的状态描述对象MapStateDescriptorInteger, StuInfo stateDescriptor new MapStateDescriptor(info, Integer.class, StuInfo.class);// 将 s2 流广播出去 BroadcastStreamStuInfostuInfoBroadcastStream s2.broadcast(stateDescriptor);// 用主数据流 connect 连接 广播数据流并处理s1.connect(stuInfoBroadcastStream).process(new BroadcastProcessFunctionStudent, StuInfo, String() {Override public void processElement (Student student, ReadOnlyContextreadOnlyContext, Collector String collector) throws Exception{// 对 主流 中的元素进行处理 readOnlyContext.getBroadCastState(); // 只读状态}Override public void processBroadcastElement (StuInfo stuInfo, Context context, Collector String collector) throwsException {// 对 广播流 中的元素进行处理context.getBroadCastState(); // 可读可写 }});}
8. flink重启时修改并行度state会发生什么变化键值状态分区策略解决数据倾斜
假设原来是3个并行度
重启之后给两个并行度state会发生什么呢他依然可以加载之前的快照数据 这里面引入一下 subtask 是什么呢相当于 每一个算子就是一个subtask像下面的 4个sink 就是4个subtask4个并行度。 那么说回来3个并行度 改成了两个少了一个这个subtask上存储的state 要怎么办呢。 假设你用的是liststate。重启的时候 会自动做分配到剩余的两个state里 也可能是直接重新分配给某一个state以前 三个并行度分别读kafka的三个分区1分区 10002 分区 500 3分区 800 重新分配后就变成了 1分区 10003分区 800 ; 2 分区 500 这种情况。
在键值状态API的设计思路小节中我们提到键值状态在重分布时要和KeyedStream的哈希数据分区策略保持完全一致。原理理解起来简单但实际上Flink键值状态重分布的机制在此基础上还做了很多的性能优化本节我们就详细剖析键值状态重分布的过程掌握这部分知识将会对我们在生产环境中解决数据倾斜问题有很大的帮助。
我们以电商场景中计算每种商品累计销售额的场景为例逻辑数据流为Source→KeyBy\Map→Sink我们在KeyBy\Map算子中使用键值状态ValueState来保存每种商品的累计销售额。
接下来我们看看ValueState键值状态在KeyBy\Map算子并行度从2变为3时键值状态的重分布过程。如图6-28所示我们用parallelism代表算子并行度假设KeyedStream的哈希数据分区策略的计算公式为SubTask(key)hash(key)%parallelism符号%代表取余计算该计算公式用于计算某个key的数据要被发往KeyBy\Map算子的SubTask下标。
当KeyBy\Map算子的并行度为2时哈希数据分区策略就为SubTask(key)hash(key)%2假设这时经过计算后key为商品3的数据会被发送到KeyBy\Map[1]中那么商品3的累计销售额的状态数据就会存储在KeyBy\Map[1]本地。当用户将KeyBy\Map算子的并行度扩展为3后哈希数据分区策略就变为了SubTask(key)hash(key)%3由于数据分区策略的计算公式变化了因此每一个key的数据要发往的SubTask也会发生改变。假设这时key为商品3的数据会被发送到KeyBy\Map[0]中那么商品3的累计销售额状态数据必然要被重分布到KeyBy\Map[0]中如图所示。 虽然键值状态的重分布策略能够降低用户的开发成本但是这种重分布策略却对键值状态重分布的性能提出了巨大的挑战。如图6-28所示当算子并行度为2时每个SubTask在执行快照时会将本地的状态数据顺序地写入到远程分布式文件系统中SubTask0和SubTask1分别写入文件1和文件2。当算子并行度变为3后根据新的Hash分区策略计算key为商品0、商品3、商品6和商品9的数据要被恢复到SubTask0中那么SubTask0就要同时读取文件1和文件2SubTask1和SubTask2的恢复过程也相同都分别需要从文件1和文件2中恢复一部分key的状态数据。
这时我们发现如果要让每个SubTask都完整且正确的恢复状态数据就需要让每个SubTask都从分布式文件系统中读取到所有的快照文件然后再过滤出属于当前SubTask的key的状态数据但是按照这样的流程执行就会出现以下两个问题。
状态恢复时的性能问题在算子以新的并行度启动并从快照恢复时算子的每个SubTask都会读取大量不属于当前SubTask的key的状态数据同时还需要从中筛选出属于当前SubTask的状态数据而这会导致SubTask的启动过程耗费大量的时间作业的恢复过程很漫长。举例来说如果算子的并行度为500每一个SubTask中都有100万个key的状态数据那么整个作业总计会有5亿个key这时如果我们将算子并行度扩展为1000那么对这1000个SubTask来说每一个SubTask都要读取到这5亿个key的快照文件然后再过滤出属于自己的key的状态数据但是平均下来每个SubTask最终只会保留50万个key的状态数据其余的4.995亿个key的数据都会被过滤。分布式文件系统的稳定性问题在算子从快照时所有的SubTask都会对分布式文件系统发起大量读取相同文件的请求这对分布式文件系统稳定性也会造成影响并且随着算子并行度的增大这种情况会越来越严重。
综上所述使用该方案来恢复状态数据时性能是无法达到预期的。其低效的原因就在于从状态恢复时SubTask不知道分布式文件系统中的每一份快照文件中存储了哪些key的状态数据也不知道这些key的状态数据在快照文件中的偏移量所以只能全量读取后再按照key一个一个的进行过滤。 这里插一句在20240125 此时此刻flink计划做的2.0中最核心的一块就是状态的存算分离解决的就是大状态的性能场景问题与这里引出的状态重分配导致的问题本质上是一个情况所以这个问题正常情况来说flink2.0上线以后可以解决。 9.Flink State TTL 是怎么做到数据过期的首先我们来想想要做到 TTL 的话要具备什么条件呢
想想 Redis 的 TTL 设置如果我们要设置 TTL 则必然需要给一条数据给一个时间戳只有这样才能判断这条数据是否过期了。
在 Flink 中设置 State TTL就会有这样一个时间戳具体实现时Flink 会把时间戳字段和具体数据字段存储作为同级存储到 State 中。
举个例子我要将一个 String 存储到 State 中时
⭐ 没有设置 State TTL 时则直接将 String 存储在 State 中⭐ 如果设置 State TTL 时则 Flink 会将 String, Long 存储在 State 中其中 Long 为时间戳用于判断是否过期。
接下来以 FileSystem 状态后端下的 MapState 作为案例来说
⭐ 如果没有设置 State TTL则生产的 MapState 的字段类型如下可以看到生成的就是 HeapMapState 实例 ⭐ 如果设置了 State TTL则生成的 MapState 的字段类型如下可以看到使用到了装饰器的设计模式生成是 TtlMapState 注意 任务设置了 State TTL 和不设置 State TTL 的状态是不兼容的。这里大家在使用时一定要注意。防止出现任务从 Checkpoint 恢复不了的情况。但是你可以去修改 TTL 时长因为修改时长并不会改变 State 存储结构。 注意 存活时长的计时器可以在数据被读、写时重置 Ttl 存活管理粒度是到元素级的如 liststate 中的每个元素mapstate 中的每个 entry
cleanupStrategies过期数据清理策略目前支持的策略有 cleanupIncrementally : 增量清除 每当访问状态时都会驱动一次过期检查算子注册了很多 key 的 state一次检查只针对其中一部分 由参数 cleanupSize 决定 算子持有一个包含所有 key 的迭代器每次检查后迭代器都会向前 advance 指定的 key 数量 本策略针对“本地状态空间”且只用于 HashMapStateBackend cleanupFullSnapshot 在进行全量快照checkpoint时清理掉过期数据 注意只是在生成的 checkpoint 数据中不包含过期数据在本地状态空间中并没有做清理 本策略针对“快照”生效 cleanupInRocksdbCompactFilter 只针对 rocksdbStateBackend 有效 它是利用 rocksdb 的 compact 功能在 rocksdb 进行 compact 时清除掉过期数据 本策略针对“本地状态空间”且只用于 EmbeddedRocksDbStateBackend
10.Flink SQL API State TTL 的过期机制是 onCreateAndUpdate 还是onReadAndWrite
⭐ 结论Flink SQL API State TTL 的过期机制目前只支持 onCreateAndUpdateDataStream API 两个都支持 ⭐ 剖析
onCreateAndUpdate是在创建 State 和更新 State 时【更新 State TTL】onReadAndWrite是在访问 State 和写入 State 时【更新 State TTL】
⭐ 实际踩坑场景Flink SQL Deduplicate 写法row_number partition by user_id order by proctime asc此 SQL 最后生成的算子只会在第一条数据来的时候更新 state后续访问不会更新 state TTL因此 state 会在用户设置的 state TTL 时间之后过期。
5.operator-state 和 keyed-state 两者的区别最大并行度又和它们有什么关系举个生产环境中经常出现的案例当用户停止任务、更新代码逻辑并且改变任务并发度时两种 state 都是怎样进行恢复的
⭐ 总结如下
⭐ operator-state ⭐ 状态适用算子所有算子都可以使用 operator-state没有限制。 ⭐ 状态的创建方式如果需要使用 operator-state需要实现 CheckpointedFunction 或 ListCheckpointed 接口 ⭐ DataStream API 中operator-state 提供了 ListState、BroadcastState、UnionListState 3 种用户接口 ⭐ 状态的存储粒度以单算子单并行度粒度访问、更新状态 ⭐ 并行度变化时a. ListState均匀划分到算子的每个 sub-task 上比如 Flink Kafka Source 中就使用了 ListState 存储消费 Kafka 的 offset其 rescale 如下图 BroadcastState每个 sub-task 的广播状态都一样 c. UnionListState将原来所有元素合并合并后的数据每个算子都有一份全量状态数据
⭐ keyed-state ⭐ 状态适用算子keyed-stream 后的算子使用。注意这里很多同学会犯一个错误就是大家会认为 keyby 后面跟的所有算子都使用的是 keyed-state但这是错误的 ❌比如有 keyby.process.flatmap其中 flatmap 中使用状态的话是 operator-state⭐ 状态的创建方式从 context 接口获取具体的 keyed-state⭐ DataStream API 中keyed-state 提供了 ValueState、MapState、ListState 等用户接口其中最常用 ValueState、MapState⭐ 状态的存储粒度以单 key 粒度访问、更新状态。举例当我们使用 keyby.process在 process 中处理逻辑时其实每一次 process 的处理 context 都会对应到一个 key所以在 process 中的处理都是以 key 为粒度的。这里很多同学会犯一个错 ❌比如想在 open 方法中访问、更新 state这是不行的因为 open 方法在执行时还没有到正式的数据处理环节上下文中是没有 key 的。⭐ 并行度变化时keyed-state 的重新划分是随着 key-group 进行的。其中 key-group 的个数就是最大并发度的个数。其中一个 key-group 处理一段区间 key 的数据不同 key-group 处理的 key 是完全不同的。当任务并行度变化时会将 key-group 重新划分到算子不同的 sub-task 上任务启动后任务数据在做 keyby 进行数据 shuffle 时依然能够按照当前数据的 key 发到下游能够处理这个 key 的 key-group 中进行处理如下图所示。注意最大并行度和 key-group 的个数绑定所以如果想恢复任务 state最大并行度是不能修改的。大家需要提前预估最大并行度个数。
11.ValueState 和 MapState 各自适合的应用场景
⭐ ValueState
应用场景简单的一个变量存储比如 Long\String 等。如果状态后端为 RocksDB极其不建议在 ValueState 中存储一个大 Map这种场景下序列化和反序列化的成本非常高这种常见适合使用 MapState。其实这种场景也是很多小伙伴一开始使用 State 的误用之痛一定要避免。TTL针对整个 Value 起作用
⭐ MapState
应用场景和 Map 使用方式一样一样的TTL针对 Map 的 key 生效每个 key 一个 TTL