网站建费用,网站增加权重,威海百度seo,成品网站1688入口系列文章目录
Flink1.17实战教程#xff08;第一篇#xff1a;概念、部署、架构#xff09; Flink1.17实战教程#xff08;第二篇#xff1a;DataStream API#xff09; Flink1.17实战教程#xff08;第三篇#xff1a;时间和窗口#xff09; Flink1.17实战教程…系列文章目录
Flink1.17实战教程第一篇概念、部署、架构 Flink1.17实战教程第二篇DataStream API Flink1.17实战教程第三篇时间和窗口 Flink1.17实战教程第四篇处理函数 Flink1.17实战教程第五篇状态管理 Flink1.17实战教程第六篇容错机制 Flink1.17实战教程第七篇Flink SQL 文章目录 系列文章目录1. 检查点Checkpoint1.1 检查点的保存1.2 从检查点恢复状态1.3 检查点算法1.3.1 检查点分界线Barrier1.3.2 分布式快照算法Barrier对齐的精准一次1.3.3 分布式快照算法Barrier对齐的至少一次1.3.4 分布式快照算法非Barrier对齐的精准一次 1.4 检查点配置1.4.1 启用检查点1.4.2 检查点存储1.4.3 其它高级配置1.4.4 通用增量 checkpoint (changelog)1.4.5 最终检查点 1.5 保存点Savepoint1.5.1 保存点的用途1.5.2 使用保存点1.5.3 使用保存点切换状态后端 2. 状态一致性2.1 一致性的概念和级别2.2 端到端的状态一致性 3. 端到端精确一次End-To-End Exactly-Once3.1 输入端保证3.2 输出端保证3.3 Flink和Kafka连接时的精确一次保证 1. 检查点Checkpoint
在Flink中有一套完整的容错机制来保证故障后的恢复其中最重要的就是检查点。 1.1 检查点的保存
1周期性的触发保存 “随时存档”确实恢复起来方便可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存当大量数据同时到来时就会耗费很多资源来频繁做检查点数据处理的速度就会受到影响。所以在Flink中检查点的保存是周期性触发的间隔时间可以进行设置。
2保存的时间点 我们应该在所有任务算子都恰好处理完一个相同的输入数据的时候将它们的状态保存下来。 这样做可以实现一个数据被所有任务算子完整地处理完状态得到了保存。 如果出现故障我们恢复到之前保存的状态故障时正在处理的所有数据都需要重新处理我们只需要让源source任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来而且外部数据源能够重置偏移量kafka就是满足这些要求的一个最好的例子。
3保存的具体流程 检查点的保存最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子来详细描述一下检查点具体的保存过程。 回忆一下我们最初实现的统计词频的程序——word count。这里为了方便我们直接从数据源读入已经分开的一个个单词例如这里输入的是 “hello”“world”“hello”“flink”“hello”“world”“hello”“flink”… 我们所需要的就是每个任务都处理完“hello”之后保存自己的状态。
1.2 从检查点恢复状态 检查点的保存具体流程 处理数据过程发生故障 重启应用 读取检查点重置状态 重置偏移量 继续处理数据 1.3 检查点算法
在Flink中采用了基于Chandy-Lamport算法的分布式快照可以在不暂停整体流处理的前提下将状态备份保存到检查点。
1.3.1 检查点分界线Barrier
借鉴水位线的设计在数据流中插入一个特殊的数据结构专门用来表示触发检查点保存的时间点。收到保存检查点的指令后Source任务可以在当前数据流中插入这个结构之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的因此遇到这个标识就代表之前的数据都处理完了可以保存一个检查点而在它之后的数据引起的状态改变就不会体现在这个检查点中而需要保存到下一个检查点。 这种特殊的数据形式把一条流上的数据按照不同的检查点分隔开所以就叫做检查点的“分界线”Checkpoint Barrier。 1.3.2 分布式快照算法Barrier对齐的精准一次
watermark指示的是“之前的数据全部到齐了”而barrier指示的是“之前所有数据的状态更改保存入当前检查点”它们都是一个“截止时间”的标志。所以在处理多个分区的传递时也要以是否还会有数据到来作为一个判断标准。 具体实现上Flink使用了Chandy-Lamport算法的一种变体被称为“异步分界线快照”算法。算法的核心就是两个原则 当上游任务向多个并行下游任务发送barrier时需要广播出去 而当多个上游任务向同一个下游任务传递分界线时需要在下游任务执行“分界线对齐”操作也就是需要等到所有并行分区的barrier都到齐才可以开始状态的保存。
1场景说明
2检查点保存算法具体过程为 1.触发检查点保存 2.分界线向下游传递 3.分界线对齐 4.有状态算子将状态保存至持久化存储 1触发检查点JobManager向Source发送Barrier 2Barrier发送向下游广播发送 3Barrier对齐下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存 4状态保存有状态的算子将状态保存至持久化。 5先处理缓存数据然后正常继续处理 完成检查点保存之后任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据需要先做处理然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
补充由于分界线对齐要求先到达的分区做缓存等待一定程度上会影响处理的速度当出现背压时下游任务会堆积大量的缓冲数据检查点可能需要很久才可以保存完毕。
为了应对这种场景Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了不对齐的检查点保存方式可以将未处理的缓冲数据也保存进检查点。这样当我们遇到一个分区barrier时就不需等待对齐而是可以直接启动状态的保存了。
1.3.3 分布式快照算法Barrier对齐的至少一次 1.触发检查点保存 2.分界线向下游传递 3.分界线对齐 4.有状态算子将状态保存至持久化存储 1.3.4 分布式快照算法非Barrier对齐的精准一次 1.触发检查点保存 2.分界线向下游传递 3.非Barrier对齐 1.4 检查点配置
检查点的作用是为了故障恢复我们不能因为保存检查点占据了大量时间、导致数据处理性能明显降低。为了兼顾容错性和处理性能我们可以在代码中对检查点进行各种配置。
1.4.1 启用检查点
默认情况下Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能需要在代码中显式地调用执行环境的.enableCheckpointing()方法
StreamExecutionEnvironment env
StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);这里需要传入一个长整型的毫秒数表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点默认的间隔周期为500毫秒这种方式已经被弃用。 检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小可以调大间隔时间而如果希望故障重启后迅速赶上实时的数据处理就需要将间隔时间设小一些。
1.4.2 检查点存储
检查点具体的持久化存储位置取决于“检查点存储”的设置。默认情况下检查点存储在JobManager的堆内存中。而对于大状态的持久化保存Flink也提供了在其他存储位置进行保存的接口。 具体可以通过调用检查点配置的.setCheckpointStorage()来配置需要传入一个CheckpointStorage的实现类。Flink主要提供了两种CheckpointStorage作业管理器的堆内存和文件系统。
// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(hdfs://namenode:40010/flink/checkpoints));对于实际生产应用我们一般会将CheckpointStorage配置为高可用的分布式文件系统HDFSS3等。
1.4.3 其它高级配置
检查点还有很多可以配置的选项可以通过获取检查点配置CheckpointConfig来进行设置。
CheckpointConfig checkpointConfig env.getCheckpointConfig();1常用高级配置
检查点模式CheckpointingMode 设置检查点一致性的保证级别有“精确一次”exactly-once和“至少一次”at-least-once两个选项。默认级别为exactly-once而对于大多数低延迟的流处理程序at-least-once就够用了而且处理效率会更高。超时时间checkpointTimeout 用于指定检查点保存的超时时间超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数表示超时时间。最小间隔时间minPauseBetweenCheckpoints 用于指定在上一个检查点完成之后检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点只要距离上一个检查点完成的间隔不够就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时实际并发为1。最大并发检查点数量maxConcurrentCheckpoints 用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。开启外部持久化存储enableExternalizedCheckpoints 用于开启检查点的外部持久化而且默认在作业失败的时候不会自动清理如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。 DELETE_ON_CANCELLATION在作业取消的时候会自动删除外部检查点但是如果是作业失败退出则会保留检查点。 RETAIN_ON_CANCELLATION作业取消的时候也会保留外部检查点。检查点连续失败次数tolerableCheckpointFailureNumber 用于指定检查点连续失败的次数当达到这个次数作业就失败退出。默认为0这意味着不能容忍检查点失败并且作业将在第一次报告检查点失败时失败。
2开启非对齐检查点
非对齐检查点enableUnalignedCheckpoints 不再执行检查点的分界线对齐操作启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式CheckpointingMode必须为exctly-once并且最大并发的检查点个数为1。对齐检查点超时时间alignedCheckpointTimeout 该参数只有在启用非对齐检查点的时候有效。参数默认是0表示一开始就直接用非对齐检查点。如果设置大于0一开始会使用对齐的检查点当对齐时间超过该参数设定的时间则会自动切换成非对齐检查点。
代码中具体设置如下
public class CheckpointConfigDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);// 代码中用到hdfs需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty(HADOOP_USER_NAME, atguigu);// TODO 检查点配置// 1、启用检查点: 默认是barrier对齐的周期为5s, 精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();// 2、指定检查点的存储位置checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);// 3、checkpoint的超时时间: 默认10分钟checkpointConfig.setCheckpointTimeout(60000);// 4、同时运行中的checkpoint的最大数量checkpointConfig.setMaxConcurrentCheckpoints(1);// 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔设置了0,并发就会变成1checkpointConfig.setMinPauseBetweenCheckpoints(1000);// 6、取消作业时checkpoint的数据 是否保留在外部系统// DELETE_ON_CANCELLATION:主动cancel时删除存在外部系统的chk-xx目录 如果是程序突然挂掉不会删// RETAIN_ON_CANCELLATION:主动cancel时外部系统的chk-xx目录会保存下来checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 7、允许 checkpoint 连续失败的次数默认0--》表示checkpoint一失败job就挂掉checkpointConfig.setTolerableCheckpointFailureNumber(10);// TODO 开启 非对齐检查点barrier非对齐// 开启的要求 Checkpoint模式必须是精准一次最大并发必须设为1checkpointConfig.enableUnalignedCheckpoints();// 开启非对齐检查点才生效 默认0表示一开始就直接用 非对齐的检查点// 如果大于0 一开始用 对齐的检查点barrier对齐 对齐的时间超过这个参数自动切换成 非对齐检查点barrier非对齐checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));env.socketTextStream(hadoop102, 7777).flatMap((String value, CollectorTuple2String, Integer out) - {String[] words value.split( );for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value - value.f0).sum(1).print();env.execute();}
}1.4.4 通用增量 checkpoint (changelog)
在 1.15 之前只有RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份增量快照中只包含自上一次快照完成之后被修改的记录因此可以显著减少快照完成的耗时。 Rocksdb状态后端启用增量checkpoint
EmbeddedRocksDBStateBackend backend new EmbeddedRocksDBStateBackend(true);从 1.15 开始不管hashmap还是rocksdb 状态后端都可以通过开启changelog实现通用的增量checkpoint。
1执行过程 1带状态的算子任务将状态更改写入变更日志记录状态 2状态物化状态表定期保存独立于检查点 3状态物化完成后状态变更日志就可以被截断到相应的点 2注意事项 1目前标记为实验性功能开启后可能会造成资源消耗增大 HDFS上保存的文件数变多 消耗更多的IO带宽用于上传变更日志 更多的CPU用于序列化状态更改 TaskManager使用更多内存来缓存状态更改 2使用限制 Checkpoint的最大并发必须为1 从 Flink 1.15 开始只有文件系统的存储类型实现可用memory测试阶段 不支持 NO_CLAIM 模式 3使用方式 1方式一配置文件指定
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM2方式二在代码中设置 需要引入依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-changelog/artifactIdversion${flink.version}/versionscoperuntime/scope
/dependency开启changelog
env.enableChangelogStateBackend(true);1.4.5 最终检查点
如果数据源是有界的就可能出现部分Task已经处理完所有数据变成finished状态不继续工作。从 Flink 1.14 开始这些finished状态的Task也可以继续执行检查点。自 1.15 起默认启用此功能并且可以通过功能标志禁用它
Configuration config new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(config);1.5 保存点Savepoint
除了检查点外Flink还提供了另一个非常独特的镜像保存功能——保存点savepoint。 从名称就可以看出这也是一个存盘的备份它的原理和算法与检查点完全相同只是多了一些额外的元数据。
1.5.1 保存点的用途
保存点与检查点最大的区别就是触发的时机。检查点是由Flink自动管理的定期创建发生故障之后自动读取进行恢复这是一个“自动存盘”的功能而保存点不会自动创建必须由用户明确地手动触发保存操作所以就是“手动存盘”。 保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点然后停止应用做一些处理调整之后再从保存点重启。它适用的具体场景有
版本管理和归档存储更新Flink版本更新应用程序调整并行度暂停应用程序
需要注意的是保存点能够在程序更改的时候依然兼容前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定
DataStreamString stream env.addSource(new StatefulSource()).uid(source-id).map(new StatefulMapper()).uid(mapper-id).print();对于没有设置ID的算子Flink默认会自动进行设置所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护强烈建议在程序中为每一个算子手动指定ID。
1.5.2 使用保存点
保存点的使用非常简单我们可以使用命令行工具来创建保存点也可以从保存点恢复作业。
1创建保存点 要在命令行中为运行的作业创建一个保存点镜像只需要执行
bin/flink savepoint :jobId [:targetDirectory]这里jobId需要填充要做镜像保存的作业ID目标路径targetDirectory可选表示保存点存储的路径。 对于保存点的默认路径可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定
state.savepoints.dir: hdfs:///flink/savepoints当然对于单独的作业我们也可以在程序代码中通过执行环境来设置
env.setDefaultSavepointDir(hdfs:///flink/savepoints);由于创建保存点一般都是希望更改环境之后重启所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点我们也可以在停掉一个作业时直接创建保存点
bin/flink stop --savepointPath [:targetDirectory] :jobId2从保存点重启应用 我们已经知道提交启动一个Flink作业使用的命令是flink run现在要从保存点重启一个应用其实本质是一样的
bin/flink run -s :savepointPath [:runArgs]这里只要增加一个-s参数指定保存点的路径就可以了其它启动时的参数还是完全一样的如果是基于yarn的运行模式还需要加上 -yid application-id。我们在第三章使用web UI进行作业提交时可以填入的参数除了入口类、并行度和运行参数还有一个“Savepoint Path”这就是从保存点启动应用的配置。
1.5.3 使用保存点切换状态后端
使用savepoint恢复状态的时候也可以更换状态后端。但是有一点需要注意的是不要在代码中指定状态后端了 通过配置文件来配置或者-D 参数配置。 打包时服务器上有的就provided可能遇到依赖问题报错javax.annotation.Nullable找不到可以导入如下依赖 dependencygroupIdcom.google.code.findbugs/groupIdartifactIdjsr305/artifactIdversion1.3.9/version/dependency1提交flink作业
bin/flink run-application -d -t yarn-application -Dstate.backendhashmap -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar2停止flink作业时触发保存点 方式一stop优雅停止并触发保存点要求source实现StoppableFunction接口
bin/flink stop -p savepoint路径 job-id -yid application-id方式二cancel立即停止并触发保存点
bin/flink cancel -s savepoint路径 job-id -yid application-id案例中source是socket不能用stop
bin/flink cancel -s hdfs://hadoop102:8020/sp cffca338509ea04f38f03b4b77c8075c -yid application_1681871196375_00013从savepoint恢复作业同时修改状态后端
bin/flink run-application -d -t yarn-application -s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 -Dstate.backendrocksdb -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar4从保存下来的checkpoint恢复作业
bin/flink run-application -d -t yarn-application -Dstate.backendrocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.atguigu.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar如果停止作业时忘了触发保存点也不用担心现在版本的flink支持从保留在外部系统的checkpoint恢复作业但是恢复时不支持切换状态后端。
2. 状态一致性
2.1 一致性的概念和级别
一致性其实就是结果的正确性一般从数据丢失、数据重复来评估。 流式计算本身就是一个一个来的所以正常处理的过程中结果肯定是正确的但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确所以主要讨论的就是“状态的一致性”。 一般说来状态一致性有三种级别
最多一次At-Most-Once至少一次At-Least-Once精确一次Exactly-Once
2.2 端到端的状态一致性
我们已经知道检查点可以保证Flink内部状态的一致性而且可以做到精确一次。那是不是说只要开启了检查点发生故障进行恢复结果就不会有任何问题呢 没那么简单。在实际应用中一般要保证从用户的角度看来最终消费的数据是正确的。而用户或者外部应用不会直接从Flink内部的状态读取数据往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑Flink内部数据的处理转换还涉及到从外部数据源读取以及写入外部持久化系统整个应用处理流程从头到尾都应该是正确的。 所以完整的流处理应用应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性就叫做“端到端end-to-end的状态一致性”它取决于三个组件中最弱的那一环。一般来说能否达到at-least-once一致性级别主要看数据源能够重放数据而能否达到exactly-once级别流处理器内部、数据源、外部存储都要有相应的保证机制。
3. 端到端精确一次End-To-End Exactly-Once
实际应用中最难做到、也最希望做到的一致性语义无疑就是端到端end-to-end的“精确一次”。我们知道对于Flink内部来说检查点机制可以保证故障恢复后数据不丢在能够重放的前提下并且只处理一次所以已经可以做到exactly-once的一致性语义了。 所以端到端一致性的关键点就在于输入的数据源端和输出的外部存储端。 3.1 输入端保证
输入端主要指的就是Flink读取的外部数据源。对于一些数据源来说并不提供数据的缓冲或是持久化保存数据被消费之后就彻底不存在了例如socket文本流。对于这样的数据源故障后我们即使通过检查点恢复之前的状态可保存检查点之后到发生故障期间的数据已经不能重发了这就会导致数据丢失。所以就只能保证at-most-once的一致性语义相当于没有保证。
想要在故障恢复后不丢数据外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态这样就可以在故障恢复时从检查点中读取出来对数据源重置偏移量重新获取数据。
数据源可重放数据或者说可重置读取数据偏移量加上Flink的Source算子将偏移量作为状态保存进检查点就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求当然也是实现端到端exactly-once的基本要求。
3.2 输出端保证
有了Flink的检查点机制以及可重放数据的外部数据源我们已经能做到at-least-once了。但是想要实现exactly-once却有更大的困难数据有可能重复写入外部系统。
因为检查点保存之后继续到来的数据也会一一处理任务的状态也会更新最终通过Sink任务将计算结果输出到外部系统只是状态改变还没有存到下一个检查点中。这时如果出现故障这些数据都会重新来一遍就计算了两次。我们知道对Flink内部状态来说重复计算的动作是没有影响的因为状态已经回滚最终改变只会发生一次但对于外部系统来说已经写入的结果就是泼出去的水已经无法收回了再次执行写入就会把同一个数据写入两次。
所以这时我们只保证了端到端的at-least-once语义。
为了实现端到端exactly-once我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种
幂等写入事务写入
我们需要外部存储系统对这两种写入方式的支持而Flink也为提供了一些Sink连接器接口。接下来我们进行展开讲解。
1幂等Idempotent写入 所谓“幂等”操作就是说一个操作可以重复执行很多次但只导致一次结果更改。也就是说后面再重复执行就不会对结果起作用了。
这相当于说我们并没有真正解决数据重复计算、写入的问题而是说重复写入也没关系结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入比如Redis中键值存储或者关系型数据库如MySQL中满足查询条件的更新操作。
需要注意对于幂等写入遇到故障进行恢复时有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据其实已经写入了一遍回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据可能会看到奇怪的现象短时间内结果会突然“跳回”到之前的某个值然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候最终的结果还是一致的。
2事务Transactional写入 如果说幂等写入对应用场景限制太多那么事务写入可以说是更一般化的保证一致性的方式。
输出端最大的问题就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。
事务是应用程序中一系列严密的操作所有操作必须成功完成否则在每个操作中所作的所有更改都会被撤消。事务有四个基本特性原子性、一致性、隔离性和持久性这就是著名的ACID。
在Flink流处理的结果写入外部系统时如果能够构建一个事务让写入操作可以随着检查点来提交和回滚那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是用一个事务来进行数据向外部系统的写入这个事务是与检查点绑定在一起的。当Sink任务遇到barrier时开始保存状态的同时就开启一个事务接下来所有数据的写入都在这个事务中待到当前检查点保存完毕时将事务提交所有写入的数据就真正可用了。如果中间过程出现故障状态会回退到上一个检查点而当前事务没有正常关闭因为当前检查点没有保存完所以也会回滚写入到外部的数据就被撤销了。
具体来说又有两种实现方式预写日志WAL和两阶段提交2PC 1预写日志write-ahead-logWAL 我们发现事务提交是需要外部存储系统支持事务的否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统能够实现事务写入呢 预写日志WAL就是一种非常简单的方式。具体步骤是 ①先把结果数据作为日志log状态保存起来 ②进行检查点保存时也会将这些结果数据一并做持久化存储 ③在收到检查点完成的通知时将所有结果一次性写入外部系统。 ④在成功写入所有数据后在内部再次确认相应的检查点将确认信息也进行持久化保存。这才代表着检查点的真正完成。 我们会发现这种方式类似于检查点完成时做一个批处理一次性的写入会带来一些性能上的问题而优点就是比较简单由于数据提前在状态后端中做了缓存所以无论什么外部存储系统理论上都能用这种方式一批搞定。在Flink中DataStream API提供了一个模板类GenericWriteAheadSink用来实现这种事务型的写入方式。
需要注意的是预写日志这种一批写入的方式有可能会写入失败所以在执行写入动作之后必须等待发送成功的返回确认消息。在成功写入所有数据后在内部再次确认相应的检查点这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存在故障恢复时只有存在对应的确认信息才能保证这批数据已经写入可以恢复到对应的检查点位置。
但这种“再次确认”的方式也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统但是最终保存确认信息时出现了故障Flink最终还是会认为没有成功写入。于是发生故障时不会使用这个检查点而是需要回退到上一个这样就会导致这批数据的重复写入。
2两阶段提交two-phase-commit2PC 前面提到的各种实现exactly-once的方式多少都有点缺陷而更好的方法就是传说中的两阶段提交2PC。
顾名思义它的想法是分成两个阶段先做“预提交”等检查点完成之后再正式提交。这种提交方式是真正基于事务的它需要外部系统提供事务支持。 具体的实现步骤为 ①当第一条数据到来时或者收到检查点的分界线时Sink任务都会启动一个事务。 ②接下来接收到的所有数据都通过这个事务写入外部系统这时由于事务没有提交所以数据尽管写入了外部系统但是不可用是“预提交”的状态。 ③当Sink任务收到JobManager发来检查点完成的通知时正式提交事务写入的结果就真正可用了。 当中间发生故障时当前未提交的事务就会回滚于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交2PC的方式充分利用了Flink现有的检查点机制分界线的到来就标志着开始一个新事务而收到来自JobManager的checkpoint成功的消息就是提交事务的指令。每个结果数据的写入依然是流式的不再有预写日志时批处理的性能问题最终提交时也只需要额外发送一个确认信息。所以2PC协议不仅真正意义上实现了exactly-once而且通过搭载Flink的检查点机制来实现事务只给系统增加了很少的开销。 Flink提供了TwoPhaseCommitSinkFunction接口方便我们自定义实现两阶段提交的SinkFunction的实现提供了真正端到端的exactly-once保证。新的Sink架构使用的是TwoPhaseCommittingSink接口。 不过两阶段提交虽然精巧却对外部系统有很高的要求。这里将2PC对外部系统的要求列举如下
外部系统必须提供事务支持或者Sink任务必须能够模拟外部系统上的事务。在检查点的间隔期间里必须能够开启一个事务并接受数据写入。在收到检查点完成的通知之前事务必须是“等待提交”的状态。在故障恢复的情况下这可能需要一些时间。如果这个时候外部系统关闭事务例如超时了那么未提交的数据就会丢失。Sink任务必须能够在进程失败后恢复事务。提交事务必须是幂等操作。也就是说事务的重复提交应该是无效的。
可见2PC在实际应用同样会受到比较大的限制。具体在项目中的选型最终还应该是一致性级别和处理性能的权衡考量。
3.3 Flink和Kafka连接时的精确一次保证
在流处理的应用中最佳的数据源当然就是可重置偏移量的消息队列了它不仅可以提供数据重放的功能而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表Kafka可以说与Flink是天作之合实际项目中也经常会看到以Kafka作为数据源和写入的外部系统的应用。在本小节中我们就来具体讨论一下Flink和Kafka连接时怎样保证端到端的exactly-once状态一致性。 1整体介绍 既然是端到端的exactly-once我们依然可以从三个组件的角度来进行分析 1Flink内部 Flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义。 2输入端 输入数据源端的Kafka可以对数据进行持久化保存并可以重置偏移量offset。所以我们可以在Source任务FlinkKafkaConsumer中将当前读取的偏移量保存为算子状态写入到检查点中当发生故障时从检查点中读取恢复状态并由连接器FlinkKafkaConsumer向Kafka重新提交偏移量就可以重新消费数据、保证结果的一致性了。 3输出端 输出端保证exactly-once的最佳实现当然就是两阶段提交2PC。作为与Flink天生一对的Kafka自然需要用最强有力的一致性保证来证明自己。 也就是说我们写入Kafka的过程实际上是一个两段式的提交处理完毕得到结果写入Kafka时是基于事务的“预提交”等到检查点保存完毕才会提交事务进行“正式提交”。如果中间出现故障事务进行回滚预提交就会被放弃恢复状态之后也只能恢复所有已经确认提交的操作。 2需要的配置 在具体应用中实现真正的端到端exactly-once还需要有一些额外的配置 1必须启用检查点 2指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE 3配置Kafka读取数据的消费者的隔离级别 这里所说的Kafka是写入的外部系统。预提交阶段数据已经写入只是被标记为“未提交”uncommitted而Kafka中默认的隔离级别isolation.level是read_uncommitted也就是可以读取未提交的数据。这样一来外部应用就可以直接消费未提交的数据对于事务性的保证就失效了。所以应该将隔离级别配置 为read_committed表示消费者遇到未提交的消息时会停止从分区中消费数据直到消息被标记为已提交才会再次恢复消费。当然这样做的话外部应用消费数据就会有显著的延迟。 4事务超时配置 Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时有可能出现Kafka已经认为事务超时了丢弃了预提交的数据而Sink任务认为还可以继续等待。如果接下来检查点保存成功发生故障后回滚到这个检查点的状态这部分数据就被真正丢掉了。所以这两个超时时间前者应该小于等于后者。 public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 代码中用到hdfs需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty(HADOOP_USER_NAME, atguigu);// TODO 1、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// TODO 2.读取kafkaKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setGroupId(atguigu).setTopics(topic_1).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString kafkasource env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource);/*** TODO 3.写出到Kafka* 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint* 2、sink设置保证级别为 精准一次* 3、sink设置事务前缀* 4、sink设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// TODO 3.1 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// TODO 3.2 精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// TODO 3.3 精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();kafkasource.sinkTo(kafkaSink);env.execute();}
}后续读取“ws”这个topic的消费者要设置事务的隔离级别为“读已提交”如下
public class KafkaEOSDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用两阶段提交写入的TopicKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setGroupId(atguigu).setTopics(ws).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// TODO 作为 下游的消费者要设置 事务的隔离级别 读已提交.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, read_committed).build();env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource).print();env.execute();}
}