郑州做网站推广的公司,wordpress英文插件,做门户网站需要具备什么,阿里网站建设需要准备什么Flink的容错机制是确保数据流应用程序在出现故障时能够恢复一致状态的关键组成部分。其核心是通过创建分布式数据流和操作符状态的一致快照来实现#xff0c;这种快照被称为检查点#xff08;Checkpoint#xff09;。
1. 检查点#xff08;Checkpoint#xff09;
保存机…Flink的容错机制是确保数据流应用程序在出现故障时能够恢复一致状态的关键组成部分。其核心是通过创建分布式数据流和操作符状态的一致快照来实现这种快照被称为检查点Checkpoint。
1. 检查点Checkpoint
保存机制 Flink定期对整个Job任务进行快照将快照产生的备份数据保存到指定的StateBackend中。这种保存是周期性的可以根据配置的时间间隔触发。恢复机制 当出现故障时Flink会回退到最后一个成功的检查点并重新启动所有的算子。这样可以确保即使在故障发生后应用程序的状态也只会反映数据流中的每个记录一次实现精确一次exactly-once的语义。控制节点 JobManager中的检查点协调器负责向source节点的数据插入barrier标记以触发检查点的保存。
2. 检查点分界线Barrier
作用barrier标记表示这个标记之前的所有数据已经将状态更改存入当前检查点。后续的算子节点只要遇到它就开始对状态做持久化快照保存。策略 精确一次等待所有并行分区的barrier都到齐才可以开始状态的保存。处理多次的结果是一样的。至少一次对先到的数据进行处理但可能导致从source重复发送已经处理过的数据。
3. 容错机制的配置
启用检查点通过StreamExecutionEnvironment.enableCheckpointing(long interval, CheckpointingMode mode)方法启用检查点并设置时间间隔和模式如EXACTLY_ONCE。其他配置还包括检查点超时、最大并发检查点数、检查点之间的最小暂停时间、检查点目录等。
4. 状态后端State Backend
作用决定状态在Checkpoint时如何持久化以及持久化在哪里。类型 HashMapStateBackend将数据以Java对象的形式存储在堆中适用于有较大状态、较长窗口和较大key/value状态的Job。EmbeddedRocksDBStateBackend将正在运行中的状态数据保存在RocksDB数据库中使用异步方式生成快照。 Flink的容错机制通过检查点和状态后端确保了数据流应用程序在故障发生后的恢复能力。通过合理的配置和使用可以确保应用程序在故障后能够恢复到一致的状态并继续处理数据从而实现精确一次的数据处理语义。 Checkpointing
Checkpointing 机制是 Flink 实现容错Fault Tolerance和状态一致性State Consistency的核心组件。Checkpointing 允许 Flink 在分布式数据流处理过程中捕获操作符operators的状态以便在发生故障时能够恢复并继续处理数据从而确保数据处理的“恰好一次”Exactly-Once语义。
条件
持久化的数据源
Flink Checkpointing 机制需要与持久化的数据源进行交互以确保在发生故障时能够从数据源中重新消费指定时间段的记录。持久化消息队列如 Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub 等或文件系统如 HDFS, S3, GFS, NFS, Ceph 等可以满足这样的需求。
状态的持久化存储
Flink Checkpointing 需要将操作符的状态进行持久化存储以便在发生故障时能够恢复状态。状态通常保存在分布式文件系统中如 HDFS、S3 等。
Flink 集群配置
Flink 集群必须正确配置并运行以支持 Checkpointing 机制。特别是 JobManager 和 TaskManager 的角色需要明确并确保它们之间的通信畅通无阻。
Checkpointing 启用与配置
Flink 默认不启用 Checkpointing需要用户显式地在代码中调用 enableCheckpointing() 方法来启用它。在启用 Checkpointing 时还需要配置一些关键参数如 检查点的时间间隔通常以毫秒为单位。检查点的超时时间超过该时间后检查点将被视为失败。状态后端用于存储和恢复状态。
状态后端的选择
根据应用的需求和集群的配置选择合适的状态后端来存储和恢复状态。不同的状态后端具有不同的性能和一致性保证。
容错级别
Flink 支持不同的容错级别包括“恰好一次”Exactly-Once、“至少一次”At-Least-Once和“最多一次”At-Most-Once。Checkpointing 机制主要用于实现“恰好一次”的容错级别。
网络和存储稳定性
Flink Checkpointing 需要依赖稳定的网络和存储系统来确保检查点的正确生成和恢复。如果网络或存储系统不稳定可能会导致检查点失败或数据丢失。 Flink Checkpointing 的前提条件包括持久化的数据源、状态的持久化存储、正确的 Flink 集群配置、Checkpointing 的启用与配置、合适的状态后端选择、适当的容错级别以及稳定的网络和存储系统。这些条件共同确保了 Flink Checkpointing 机制能够正常运行并提供可靠的数据处理容错能力。 开启配置
开启 Checkpointing 需要首先调用 enableCheckpointing 方法来开启 Checkpointing。这个方法有两个参数
interval检查点的时间间隔以毫秒为单位。例如如果你想每 1000 毫秒即 1 秒生成一个检查点你可以这样设置env.enableCheckpointing(1000);mode可选Checkpointing 模式。目前 Flink 支持 EXACTLY_ONCE 和 AT_LEAST_ONCE 两种模式。对于大多数应用来说选择 EXACTLY_ONCE 模式即可满足需求。例如env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
// 每 1000ms 开始一次 checkpoint
env.enableCheckpointing(1000);
// 高级选项
// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoint 必须在一分钟内完成否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 使用 externalized checkpoints这样 checkpoint 在作业取消后仍就会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();配置 Checkpoint 存储位置 Checkpoint 的数据需要存储在某个位置以便在发生故障时进行恢复。可以通过 CheckpointConfig 来设置存储位置。例如如果把 Checkpoint 存储在 HDFS 上可以这样设置
CheckpointConfig checkpointConfig env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(hdfs:///ip:port/dir);其他参数配置 除了上述两个关键参数外Flink 还提供了一些其他参数来优化 Checkpointing 的性能
checkpointTimeout检查点超时时间以毫秒为单位。如果在这个时间内没有完成 Checkpoint那么该 Checkpoint 将被丢弃并尝试进行下一次 Checkpoint。默认值是 60000 毫秒即 1 分钟。例如checkpointConfig.setCheckpointTimeout(60000);minPauseBetweenCheckpoints两次 Checkpoint 之间的最小暂停时间以毫秒为单位。这个参数可以确保 Flink 不会在很短的时间内连续进行多次 Checkpoint从而避免对系统性能造成过大的影响。例如checkpointConfig.setMinPauseBetweenCheckpoints(500);maxConcurrentCheckpoints最大并发 Checkpoint 数。这个参数用于限制同时进行的 Checkpoint 数量以避免对系统性能造成过大的影响。例如checkpointConfig.setMaxConcurrentCheckpoints(1);
State Backends
State Backends是负责管理和存储Flink应用程序状态的组件。Flink提供了多种不同的State Backends每种都有其特定的用途和优缺点。
1. MemoryStateBackend
存储位置状态数据保存在Java堆内存中。适用场景本地调试或小规模状态数据的场景。限制每个独立的状态默认限制大小为5MB但可以通过构造函数增加容量状态的大小不能超过Akka的framesize大小。配置如果没有明确配置State BackendFlink将默认使用MemoryStateBackend。 全局配置
state.backend: hashmap
state.checkpoint-storage: jobmanager代码配置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());2. FsStateBackend
存储位置状态数据保存在TaskManager的内存中并通过Checkpoint机制将状态快照写入配置好的文件系统或目录中。适用场景状态数据较大需要持久化存储的场景。配置通过配置文件系统路径如HDFS、本地文件系统等来设置FsStateBackend。特点FsStateBackend通过配置一个fileStateThreshold阈值当状态大小超过该阈值时将状态存储在文件系统中否则仍然保存在内存中。 全局配置
state.backend: hashmap
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem代码配置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(file:///checkpoint-dir);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(file:///checkpoint-dir));3. RocksDBStateBackend
存储位置使用RocksDB作为状态存储后端数据保存在磁盘上。适用场景状态数据非常大、需要高可用性和持久性保证的场景。特点RocksDB支持增量快照这对于具有大量变化缓慢状态的应用程序非常有用。状态快照会持久化到分布式文件系统如HDFS中。 全局配置
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
state.checkpoint-storage: filesystem代码配置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage(file:///checkpoint-dir);
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(file:///checkpoint-dir));4. 配置和使用
配置方式 可以通过 StreamExecutionEnvironment.setStateBackend(…) 方法来配置 State Backend。也可以在 Flink 的配置文件如 flink-conf.yaml中设置默认的 State Backend。 选择建议 -对于开发调试或状态量较小的情况可以使用 MemoryStateBackend。 对于生产环境或状态量较大的情况推荐使用 FsStateBackend 或 RocksDBStateBackend或 EmbeddedRocksDBStateBackend。如果对性能有特别高的要求且状态量非常大可以考虑使用 RocksDBStateBackend 或 EmbeddedRocksDBStateBackend。
开箱即用的 state backends
最新版本Flink 内置了以下这些开箱即用的 state backends
HashMapStateBackendEmbeddedRocksDBStateBackend 如果不设置默认使用 HashMapStateBackend。
HashMapStateBackend
在 HashMapStateBackend 内部数据以 Java 对象的形式存储在堆中。 Key/value 形式的状态和窗口算子会持有一个 hash table其中存储着状态值、触发器。 适用场景
有较大 state较长 window 和较大 key/value 状态的 Job。所有的高可用场景。
EmbeddedRocksDBStateBackend
EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。 不同于 HashMapStateBackend 中的 java 对象数据被以序列化字节数组的方式存储这种方式由序列化器决定因此 key 之间的比较是以字节序的形式进行而不是使用 Java 的 hashCode 或 equals() 方法。 EmbeddedRocksDBStateBackend 会使用异步的方式生成 snapshots。
EmbeddedRocksDBStateBackend 的局限
由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 RocksDB 合并操作的状态例如ListState累积数据量大小可以超过 2^31 字节但是会在下一次获取数据时失败。这是当前 RocksDB JNI 的限制。
EmbeddedRocksDBStateBackend 的适用场景
状态非常大、窗口非常长、key/value 状态非常大的 Job。所有高可用的场景。
设置 State Backend
如果没有明确指定将使用 jobmanager 做为默认的 state backend。能在 flink-conf.yaml 中为所有 Job 设置其他默认的 State Backend。 每一个 Job 的 state backend 配置会覆盖默认的 state backend 配置 设置每个 Job 的 State Backend 对每个 Job 的 State Backend 进行设置如下所示
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());若想在 IDE 中使用 EmbeddedRocksDBStateBackend
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion1.18.1/versionscopeprovided/scope
/dependency注意: 由于 RocksDB 是 Flink 默认分发包的一部分所以如果你没在代码中使用 RocksDB则不需要添加此依赖。而且可以在 flink-conf.yaml 文件中通过 state.backend.type 配置 State Backend以及更多的 checkpointing 和 RocksDB 特定的 参数。 设置默认的全局的 State Backend 在 flink-conf.yaml 可以通过键 state.backend.type 设置默认的 State Backend。
# 用于存储 operator state 快照的 State Backend
state.backend: hashmap
# 存储快照的目录
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints