嘉定建站公司,wordpress重置密码代码,深圳市产品设计公司,做网站的公司金坛文章目录 一. 状态后端概述二. StateBackend的整体设计1. 核心功能2. StateBackend的UML3. 小结 三. StateBackend的加载与初始化1. StateBackend创建概述2. StateBackend创建过程 一. 状态后端概述
StateBackend作为状态存储后端#xff0c;提供了创建和获取KeyedStateBacke… 文章目录 一. 状态后端概述二. StateBackend的整体设计1. 核心功能2. StateBackend的UML3. 小结 三. StateBackend的加载与初始化1. StateBackend创建概述2. StateBackend创建过程 一. 状态后端概述
StateBackend作为状态存储后端提供了创建和获取KeyedStateBackend及OperatorStateBackend的方法并通过CheckpointStorage实现了对状态数据的持久化存储。
Flink支持MemoryStateBackend、FsStateBackend和RocksDBStateBackend三种类型的状态存储后端三者的主要区别在于创建的KeyedStateBackend及CheckpointStorage不同。例如MemoryStateBackend和FileStateBackend创建的是HeapKeyedStateBackendRocksDBStateBackend创建的是RocksDBKeyedStateBackend。
本文关注StateBackend的设计与实现。
二. StateBackend的整体设计
1. 核心功能
在StateBackend接口中提供了如下核心功能。 resolveCheckpoint()方法用于获取Checkpoint的Location信息Location信息包含Checkpoint元数据信息createCheckpointStorage()方法为Job创建CheckpointStorage对象CheckpointStorage提供写入Checkpoint数据和元数据信息的能力createKeyedStateBackend()方法用于创建KeyedStateBackendKeyedStateBackend提供创建和管理KeyedState的能力createOperatorStateBackend()方法主要用于创建OperatorStateBackend通过OperatorStateBackend可以创建和管理OperatorState状态数据。 2. StateBackend的UML
StateBackend主要有AbstractStateBackend基本实现类该类中没有提供实质性的方法主要为了向前ing兼容。
AbstractFileStateBackend有MemoryStateBackend和FsStateBackend两种实现类其中MemoryStateBackend主要通过JobManager堆内存存储Checkpoint数据FsStateBackend通过FsCheckpointStorage将Checkpoint数据存储在指定文件系统中。 RockdsDBStateBackend也实现了StateBackend的基本功能 内存状态和其他状态管理后端不同的是它创建的KeyedStateBackend是基于RocksDB实现的RocksDBKeyedStateBackend。KeyedState数据都会存储在RocksDB内存中。持久化对于CheckpointStorage的创建RocksDBStateBackend依赖于FsStateBackend即基于文件系统对Checkpoint中的状态数据进行持久化。 3. 小结
StateBackend提供了创建CheckpointStorage、KeyedStateBackend及OperatorStateBackend的功能。
基于MemoryStateBackend可以实现非常高效的状态数据获取和存储但由于JobManager内存数量有限对比较大的状态数据无法提供更好的支持。对于RocksDBStateBackend而言可以基于RocksDB提供的LSM-TreeLog StructuredMerge-Tree内存数据结构实现更加高效的堆外内存访问支持大数据量的状态数据存储这对生产环境来讲是一个更优的选择。 三. StateBackend的加载与初始化
1. StateBackend创建概述
StateBackend主要通过StateBackendFactory接口创建。StateBackendFactory主要有MemoryStateBackendFactory、FsStateBackendFactory和RocksDBStateBackendFactory三种实现最终通过StateBackendFactory的不同实现类创建相应的StateBackend。
StateBackendFactory主要通过StateBackendLoader进行加载和创建。StateBackendLoader会根据state.backend的名称使用Java SPI技术加载相应类型的StateBackendFactory最终创建StateBackend。 2. StateBackend创建过程
StateBackend会在两个过程中创建 首先在JobMaster根据JobGraph对象创建ExecutionGraph的过程中会创建StateBackend用于CheckpointCoordinator组件管理状态和Checkpoint操作其次在每个Task实例初始化的过程中会创建StateBackend用于管理当前Task中的状态和Checkpoint数据。 接下来我们分步骤看StateBackend的创建过程。
1在StreamTask中初始化StateBackend
前面我们已经知道当StreamTask在TaskManager的Task线程中启动时会调用invoke()抽象方法运行StreamTask中的算子。此时在beforeInvoke()方法中就会调用StreamTask.createStateBackend()方法创建当前Task中使用的StateBackend。
在StreamTask.createStateBackend()方法中可以看出
//
private StateBackend createStateBackend() throws Exception {//1. 从UserCodeClassLoader获取StateBackendfinal StateBackend fromApplication configuration.getStateBackend(getUserCodeClassLoader());//2. 通过应用配置还是通过集群默认配置创建StateBackendreturn StateBackendLoader.fromApplicationOrConfigOrDefault(fromApplication,getEnvironment().getTaskManagerInfo().getConfiguration(),getUserCodeClassLoader(),LOG);//用户在代码中调用StreamExecutionEnvironment.enableCheckpointing()方法时//系统默认配置主要是通过flink-conf.yaml启用StateBackend配置项。
}2StateBackendLoader加载配置的StateBackend
public static StateBackend fromApplicationOrConfigOrDefault(Nullable StateBackend fromApplication,Configuration config,ClassLoader classLoader,Nullable Logger logger) throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {checkNotNull(config, config);checkNotNull(classLoader, classLoader);final StateBackend backend;// 1) 如果应用配置的StateBackend不为空则最高优先级是应用中定义的//StateBackend实现类。if (fromApplication ! null) {if (logger ! null) {logger.info(Using application-defined state backend: {}, fromApplication);}// 向fromApplication中追加额外的参数配置if (fromApplication instanceof ConfigurableStateBackend) {if (logger ! null) {logger.info(Configuring application-defined state backend with job/cluster config);}// 直接从UserClassLoader中反序列化出StateBackendbackend ((ConfigurableStateBackend) fromApplication).configure(config, classLoader);}else {backend fromApplication;}}else {//检查是否开启StateBackend默认配置final StateBackend fromConfig loadStateBackendFromConfig(config, classLoader, logger);if (fromConfig ! null) {backend fromConfig;} else {//2. 如果配置为空则创建默认MemoryStateBackendbackend new MemoryStateBackendFactory().createFromConfig(config, classLoader);if (logger ! null) {logger.info(No state backend has been configured, using default (Memory / JobManager) {}, backend);}}}return backend;
}3通过StateBackendFactory创建StateBackend 这里举例说明MemoryStateBackend的创建过程。从方法中调用了MemoryStateBackend()构造器创建基于堆内存的StateBackend并调用configure()方法对StateBackend进行参数配置。
public MemoryStateBackend createFromConfig(Configuration config, ClassLoader classLoader) {return new MemoryStateBackend().configure(config, classLoader);
}《Flink设计与实现核心原理与源码解析》–张利兵