狼们求个没封的免费网站,wordpress 首页 静态,出色的网站,服装设计师月薪多少文章目录 来自于尚硅谷教程1. Flink概述1.1 特点1.2 与SparkStreaming对比 2. Flink部署2.1 集群角色2.2 部署模式2.3 Standalone运行模式2.3.1 本地会话模式部署2.3.2 应用模式 2.4 YARN运行模式2.4.1 会话模式部署2.4.2 应用模式部署 2.5 历史服务 3. 系统架构3.1 并行度3.2 … 文章目录 来自于尚硅谷教程1. Flink概述1.1 特点1.2 与SparkStreaming对比 2. Flink部署2.1 集群角色2.2 部署模式2.3 Standalone运行模式2.3.1 本地会话模式部署2.3.2 应用模式 2.4 YARN运行模式2.4.1 会话模式部署2.4.2 应用模式部署 2.5 历史服务 3. 系统架构3.1 并行度3.2 算子链3.3 任务槽(task slot)3.4 任务槽和并行度的关系3.5 作业提交流程 4. 时间和窗口4.1 窗口分类4.2 窗口分配器4.2.1 时间窗口4.2.2 计数窗口 4.3 窗口函数4.3.1 增量聚合函数(ReduceFunction / AggregateFunction)4.3.2 全窗口函数4.3.3 增量聚合和全窗口函数的结合使用 4.4 触发器和移除器4.5 窗口知识总结4.6 水位线(watermark)4.6.1 什么是水位线4.6.2 水位线生成 4.7 迟到数据的处理4.8 双流联结4.8.1 窗口联结Window Join4.8.2 间隔联结Interval Join 5. 处理函数5.1 按键分区处理函数KeyedProcessFunction5.2 窗口处理函数ProcessWindowFunction ProcessAllWindowFunction5.3 侧输出流Side Output 6. 状态管理6.1 按键分区状态6.1.1 值状态ValueState6.1.2 列表状态ListState6.1.3 Map状态MapState6.1.4 归约状态ReducingState6.1.5 聚合状态AggregatingState6.1.6 状态生存时间TTL 6.2 算子状态Operator State6.2.1 列表状态ListState6.2.2 联合列表状态UnionListState6.2.3 广播状态BroadcastState 7. 容错机制7.1 检查点7.1.1 检查点算法7.1.2 代码配置 7.2 保存点Savepoint7.2.1 使用保存点 问题状态、状态后端、Checkpoint 三者之间的区别及关系7.3 状态一致性7.3.1 端到端精确一次End-To-End Exactly-Once7.3.2 Flink和Kafka连接时的精确一次保证 来自于尚硅谷教程
1. Flink概述
Flink是一个框架和分布式处理引擎用于对无界和有界数据流进行有状态计算 1.1 特点
高吞吐和低延迟精确一次的状态一致性(exactly-once)保证结果准确性提供事件时间和处理时间语义。可以连接到外部系统如Kafka、Hive、JDBC等
1.2 与SparkStreaming对比
FlinkStreaming计算模型流计算微批处理时间语义事件时间处理时间窗口多、灵活少、不灵活(窗口必须是批次的整数倍)状态有无流式SQL有无
2. Flink部署
2.1 集群角色
Flink提交作业和执行任务需要以下组件
客户端: 代码由客户端获取并转换之后提交给JobManagerJobManager: 相当于Master对作业进行中央调度管理获取到执行的作业后分发给众多TaskManagerTaskManager相当于Worker真正执行作业的组件
2.2 部署模式
在一些应用场景中对于集群资源分配和占用的方式可能会有特定的需求。Flink为各种场景提供了不同的部署模式主要有以下三种会话模式Session Mode、单作业模式Per-Job Mode、应用模式Application Mode。
会话模式启动一个集群保持一个会话通过客户端向这个会话提交作业。由于集群启动时所有资源都已经确定(只有一个JobManager)所以提交的作业会竞争集群的资源。单作业模式为了更好地隔离资源可以考虑为每一个作业启动一个集群这就是单作业模式。flink1.16版本及后续版本已将该模式标记为过时不多介绍应用模式上述两种模式应用代码都是在客户端执行然后提交给JobManager。由于客户端会去下载依赖和发送二进制数据给JobManager所以会占用大量网络带宽。解决办法是不需要客户端了直接把应用提交给JobManager这个JobManger只为这个应用存在执行完后就关闭这就是应用模式。
三种模式的区别
会话模式先创建集群后提交作业单作业和应用先提交作业后创建集群会话和单作业由客户端解析提交作业应用模式直接由JobManager执行
这里我们所讲到的部署模式相对是比较抽象的概念。实际应用时一般需要和资源管理平台结合起来选择特定的模式来分配资源、部署应用。
2.3 Standalone运行模式
独立模式是独立运行的不依赖任何外部的资源管理平台当然独立也是有代价的如果资源不足或者出现故障没有自动扩展或重分配资源的保证必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
2.3.1 本地会话模式部署
运行flink下的start-cluster.sh 就是 StandaloneSession
./bin/start-cluster.sh2.3.2 应用模式
在flink的安装目录下
mv test.jar lib/
# 启动jobManager 类名是com.example.test
./bin/standalone-job.sh start --job-classname com.example.test
# 启动taskManager
bin/taskmanager.sh start# stop
bin/taskmanager.sh stop
bin/standalone-job.sh stop2.4 YARN运行模式
YARN上部署的过程是客户端把Flink应用提交给Yarn的ResourceManagerYarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上Flink会部署JobManager和TaskManager的实例从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
相关配置
vi ~/.bashrcHADOOP_HOME/opt/module/hadoop-3.3.4
export PATH$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATHhadoop classpath2.4.1 会话模式部署
启动一个集群
bin/yarn-session.sh -nm test -jm 2048 -tm 2048 -s 2 -d-d分离模式如果你不想让Flink YARN客户端一直前台运行可以使用这个参数即使关掉当前对话窗口YARN session也可以后台运行。-jm–jobManagerMemory配置JobManager所需内存默认单位MB。-nm–name配置在YARN UI界面上显示的任务名。-qu–queue指定YARN队列名。-tm–taskManager配置每个TaskManager所使用内存。-s: 一个taskManager的slot数量
提交作业(可多次提交)
bin/flink run -c com.example.test test.jar停止session
echo stop | ./bin/yarn-session.sh -id {application_id}无论是提交一个作业还是多个作业都是 1个JobManager N个TaskManager
2.4.2 应用模式部署
提交作业
bin/flink run-application -t yarn-application -c com.example.test test.jar查看或取消
# 查看
bin/flink list -t yarn-application -Dyarn.application.idapplication_XXXX_YY
# 取消
bin/flink cancel -t yarn-application -Dyarn.application.idapplication_XXXX_YY jobId推荐通过上传HDFS后提交作业
上传flink的lib和plugins到HDFS上
hadoop fs -mkdir /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist上传自己的jar包到HDFS
hadoop fs -mkdir /flink-jars
hadoop fs -put test.jar /flink-jars提交作业
bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirshdfs://xxx:8020/flink-dist -c com.example.test hdfs://xxx:8020/flink-jars/test.jar这种方式下flink本身的依赖和用户jar可以预先上传到HDFS而不需要单独发送到集群这就使得作业提交更加轻量了。
该模式下每个作业就对应 1个JobManager N个TaskManager
对比会话模式和应用模式
若服务器资源情况比较紧缺可考虑使用会话模式。因为只会启动一个JobManager省去了每个应用一个JobManager的内存消耗但相对的会出现资源竞争并且每个TaskManager的内存都是已经定制好了若服务器资源足够则优先使用应用模式。可为每个应用分配不同的JobManager内存和TaskManager内存
2.5 历史服务
运行 Flink job 的集群一旦停止只能去 yarn 或本地磁盘上查看日志不再可以查看作业挂掉之前的运行的 Web UI。Flink提供了历史服务器用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。 1创建存储目录
hadoop fs -mkdir -p /logs/flink-job2在 flink-config.yaml中添加如下配置
jobmanager.archive.fs.dir: hdfs://hadoop:8020/logs/flink-job
historyserver.web.address: hadoop
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 50003启动历史服务器
bin/historyserver.sh start4停止历史服务器
bin/historyserver.sh stop5在浏览器地址栏输入http://hadoop:8082 查看已经停止的 job 的统计信息
3. 系统架构 1作业管理器JobManager JobManager是一个Flink集群中任务管理和调度的核心是控制应用执行的主进程。也就是说每个应用都应该被唯一的JobManager所控制执行。 JobManger又包含3个不同的组件。 1JobMaster JobMaster是JobManager中最核心的组件负责处理单独的作业Job。所以JobMaster和具体的Job是一一对应的多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster。需要注意在早期版本的Flink中没有JobMaster的概念而JobManager的概念范围较小实际指的就是现在所说的JobMaster。
在作业提交时JobMaster会先接收到要执行的应用。JobMaster会把JobGraph转换成一个物理层面的数据流图这个图被叫作“执行图”ExecutionGraph它包含了所有可以并发执行的任务。JobMaster会向资源管理器ResourceManager发出请求申请执行任务必要的资源。一旦它获取到了足够的资源就会将执行图分发到真正运行它们的TaskManager上。
而在运行过程中JobMaster会负责所有需要中央协调的操作比如说检查点checkpoints的协调。 2资源管理器ResourceManager ResourceManager主要负责资源的分配和管理在Flink 集群中只有一个。所谓“资源”主要是指TaskManager的任务槽task slots。任务槽就是Flink集群中的资源调配单元包含了机器用来执行计算的一组CPU和内存资源。每一个任务Task都需要分配到一个slot上执行。 这里注意要把Flink内置的ResourceManager和其他资源管理平台比如YARN的ResourceManager区分开。 3分发器Dispatcher Dispatcher主要负责提供一个REST接口用来提交应用并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher也会启动一个Web UI用来方便地展示和监控作业执行的信息。Dispatcher在架构中并不是必需的在不同的部署模式下可能会被忽略掉。
2任务管理器TaskManager TaskManager是Flink中的工作进程数据流的具体计算就是它来做的。Flink集群中必须至少有一个TaskManager每一个TaskManager都包含了一定数量的任务槽task slots。Slot是资源调度的最小单位slot的数量限制了TaskManager能够并行处理的任务数量。 启动之后TaskManager会向资源管理器注册它的slots收到资源管理器的指令后TaskManager就会将一个或者多个槽位提供给JobMaster调用JobMaster就可以分配任务来执行了。 在执行过程中TaskManager可以缓冲数据还可以跟其他运行同一应用的TaskManager交换数据。
3.1 并行度
当要处理的数据量非常大时我们可以把一个算子操作“复制”多份到多个节点数据来了之后就可以到其中任意一个执行。这样一来一个算子任务就被拆分成了多个并行的“子任务”subtasks再将它们分发到不同节点就真正实现了并行计算。
一个特定算子的子任务subtask的个数被称之为其并行度parallelism。在Flink执行过程中每一个算子operator可以包含一个或多个子任务operator subtask这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。一般情况下一个流程序的并行度可以认为就是其所有算子中最大的并行度。一个程序中不同的算子可能具有不同的并行度。 并行度设置
代码设置
stream.map(word - Tuple2.of(word, 1L)).setParallelism(2); // 特定算子并行度
env.setParallelism(2); // 代码中的全局并行度提交应用时设置
bin/flink run –p 2 –c test ./test.jar配置文件配置 flink-conf.yaml
parallelism.default: 23.2 算子链 1算子间的数据传输 一个数据流在算子之间传输数据的形式可以是一对一one-to-one的直通forwarding模式也可以是打乱的重分区redistributing模式具体是哪一种形式取决于算子的种类。 1一对一One-to-oneforwarding 这种模式下数据流维护着分区以及元素的顺序。比如图中的source和map算子source算子读取数据之后可以直接发送给map算子做处理它们之间不需要重新分区也不需要调整数据的顺序。这就意味着map 算子的子任务看到的元素个数和顺序跟source 算子的子任务产生的完全一样保证着“一对一”的关系。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。 2重分区Redistributing 在这种模式下数据流的分区会发生改变。比如图中的map和后面的keyBy/window算子之间以及keyBy/window算子和Sink算子之间都是这样的关系。 每一个算子的子任务会根据数据传输的策略把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程这一过程类似于Spark中的shuffle。
2合并算子链 在Flink中并行度相同的一对一one to one算子操作可以直接链接在一起形成一个“大”的任务task这样原来的算子就成为了真正任务里的一部分如下图所示。每个task会被一个线程执行。这样的技术被称为“算子链”Operator Chain。
所以合并算子链需要两个条件1. 并行度相同的算子2. 没有重分区 上图中Source和map之间满足了算子链的要求所以可以直接合并在一起形成了一个任务因为并行度为2所以合并后的任务也有两个并行子任务。这样这个数据流图所表示的作业最终会有5个任务由5个线程并行执行优化前是7个线程。
将算子链接成task是非常有效的优化可以减少线程之间的切换和基于缓存区的数据交换在减少时延的同时提升吞吐量。
Flink默认会按照算子链的原则进行链接合并但有时候在调试时想清晰地看到每个算子的运行状态或定位具体算子的问题也可以在代码中对算子做一些特定的设置
// 禁用算子链
.map(word - Tuple2.of(word, 1L)).disableChaining();// 从当前算子开始新链
.map(word - Tuple2.of(word, 1L)).startNewChain()关于算子链的api1全局禁用算子链env.disableOperatorChaining();2某个算子不参与链化 算子A.disableChaining(), 算子A不会与 前面 和 后面的 算子 串在一起3从某个算子开启新链条 算子A.startNewChain() 算子A不与 前面串在一起从A开始正常链化3.3 任务槽(task slot)
1任务槽Task Slots Flink中每一个TaskManager都是一个JVM进程它可以启动多个独立的线程来并行执行多个子任务subtask。 很显然TaskManager的计算资源是有限的并行的任务越多每个线程的资源就会越少。那一个TaskManager到底能并行处理多少个任务呢为了控制并发量**我们需要在TaskManager上对每个任务运行所占用的资源做出明确的划分这就是所谓的任务槽task slots。**每个任务槽task slot其实表示了TaskManager拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。
2任务槽数量的设置 在Flink的flink-conf.yaml配置文件中可以设置TaskManager的slot数量默认是1个slot。
taskmanager.numberOfTaskSlots: 1需要注意的是slot目前仅仅用来隔离内存不会涉及CPU的隔离。在具体应用时可以将slot数量配置为机器的CPU核心数尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。
3任务对任务槽的共享 默认情况下Flink是允许子任务共享slot的。如果我们保持sink任务并行度为1不变而作业提交时设置全局并行度为6那么前两个任务节点就会各自有6个并行子任务整个流处理程序则有13个子任务13个线程。如上图所示只要属于同一个作业那么对于不同任务节点算子的并行子任务就可以放到同一个slot上执行。所以对于第一个任务节点source→map它的6个并行子任务必须分到不同的slot上而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。 当我们将资源密集型和非密集型的任务同时放到一个slot中它们就可以自行分配对资源占用的比例从而保证最重的活平均分配给所有的TaskManager。 slot共享另一个好处就是允许我们保存完整的作业管道。这样一来即使某个TaskManager出现故障宕机其他节点也可以完全不受影响作业的任务可以继续执行。 当然Flink默认是允许slot共享的如果希望某个算子对应的任务完全独占一个slot或者只有某一部分算子共享slot我们也可以通过设置“slot共享组”手动指定
.map(word - Tuple2.of(word, 1L)).slotSharingGroup(1);这样只有属于同一个slot共享组的子任务才会开启slot共享不同组之间的任务是完全隔离的必须分配到不同的slot上。在这种场景下总共需要的slot数量就是各个slot共享组最大并行度的总和。
3.4 任务槽和并行度的关系
任务槽和并行度都跟程序的并行执行有关但两者是完全不同的概念。
任务槽是静态的概念是指TaskManager具有的并发执行能力可以通过参数taskmanager.numberOfTaskSlots进行配置并行度是动态概念也就是TaskManager运行程序时实际使用的并发能力可以通过参数parallelism.default进行配置。
3.5 作业提交流程
逻辑流图StreamGraph→ 作业图JobGraph→ 执行图ExecutionGraph→ 物理图Physical Graph
逻辑流图StreamGraph这是根据用户通过 DataStream API编写的代码生成的最初的DAG图用来表示程序的拓扑结构。这一步一般在客户端完成。作业图JobGraphStreamGraph经过优化后生成的就是作业图确定了当前作业中所有任务的划分。主要的优化为形成算子链。JobGraph一般也是在客户端生成的在作业提交时传递给JobMaster。执行图ExecutionGraphJobMaster收到JobGraph后会根据它来生成执行图ExecutionGraph是JobGraph的并行化版本是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分并明确了任务间数据传输的方式。物理图Physical GraphJobMaster生成执行图后会将它分发给TaskManager各个TaskManager会根据执行图部署任务最终的物理执行过程也会形成一张“图”一般就叫作物理图Physical Graph
YARN应用模式提交流程
4. 时间和窗口 Flink中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。
4.1 窗口分类
1按照驱动类型分
时间窗口以时间点定义窗口的开始和结束计数窗口基于元素个数来截取数据
2按照窗口分配数据的规则分类 滚动窗口Tumbling Window、滑动窗口Sliding Window、会话窗口Session Window以及全局窗口Global Window
4.2 窗口分配器
定义窗口分配器Window Assigners是构建窗口算子的第一步它的作用就是定义数据应该被“分配”到哪个窗口。所以可以说窗口分配器其实就是在指定窗口的类型。
4.2.1 时间窗口
其中按照时间语义分为不同的API
滚动时间
stream.keyBy(...)// 滚动处理时间(每5s一个窗口)// .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 滚动事件时间(每5s一个窗口).window(TumblingEventTimeWindows.of(Time.seconds(5)))滑动时间
stream.keyBy(...)// 滑动处理时间 长度为10秒、滑动步长为5秒的滑动窗口// .window(SlidingProcessingTimeWindows.of(Time.seconds(10)Time.seconds(5)))// 滑动事件时间 长度为10秒、滑动步长为5秒的滑动窗口.window(SlidingEventTimeWindows.of(Time.seconds(10)Time.seconds(5)))会话时间
stream.keyBy(...)// 处理时间 静态会话超时时间为10秒的会话窗口// .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))// 事件时间 静态会话超时时间为10秒的会话窗口.window(EventTimeSessionWindows.withGap(Time.seconds(10)))4.2.2 计数窗口
滚动窗口
stream.keyBy(...).countWindow(10)滑动窗口
stream.keyBy(...).countWindow(103)全局窗口 需要注意使用全局窗口必须自行定义触发器才能实现窗口计算否则起不到任何作用。
stream.keyBy(...).window(GlobalWindows.create());4.3 窗口函数
定义了窗口分配器知道数据属于哪个窗口可以将数据收集起来但收集起来后要做什么还不知道。所以接下来要定义窗口如何进行计算的操作这就是窗口函数。
窗口函数定义了要对窗口中收集的数据做的计算操作根据处理的方式可以分为两类增量聚合函数和全窗口函数
4.3.1 增量聚合函数(ReduceFunction / AggregateFunction)
每来一个数据就在之前结果上聚合一次这就是“增量聚合”。
1归约函数ReduceFunction
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction()).keyBy(r - r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunctionWaterSensor() {Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println(调用reduce方法之前的结果:value1 ,现在来的数据:value2);return new WaterSensor(value1.getId(), System.currentTimeMillis(),value1.getVc()value2.getVc());}}).print();env.execute();}
}2聚合函数AggregateFunction ReduceFunction可以解决大多数归约聚合的问题但是这个接口有一个限制就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
AggregateFunction可以看作是ReduceFunction的通用版本这里有三种类型输入类型IN、累加器类型ACC和输出类型OUT。输入类型IN就是输入流中元素的数据类型累加器类型ACC则是我们进行聚合的中间状态类型而输出类型就是最终计算结果的类型。
public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString aggregate sensorWS.aggregate(new AggregateFunctionWaterSensor, Integer, String() {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}});aggregate.print();env.execute();}
}4.3.2 全窗口函数
有些场景下我们要做的计算必须基于全部的数据才有效这时做增量聚合就没什么意义了另外输出的结果有可能要包含上下文中的一些信息比如窗口的起始时间这是增量聚合函数做不到的。 1处理窗口函数ProcessWindowFunction
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString process sensorWS.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long count elements.spliterator().estimateSize();long windowStartTs context.window().getStart();long windowEndTs context.window().getEnd();String windowStart DateFormatUtils.format(windowStartTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(windowEndTs, yyyy-MM-dd HH:mm:ss.SSS);out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();env.execute();}
}4.3.3 增量聚合和全窗口函数的结合使用
在实际应用中我们往往希望兼具这两者的优点把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数/*** 增量聚合 Aggregate 全窗口 process* 1、增量聚合函数处理数据 来一条计算一条* 2、窗口触发时 增量聚合的结果只有一条 传递给 全窗口函数* 3、经过全窗口函数的处理包装后输出** 结合两者的优点* 1、增量聚合 来一条计算一条存储中间的计算结果占用的空间少* 2、全窗口函数 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperatorString result sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunctionWaterSensor, Integer, String{Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}}// 全窗口函数的输入类型 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunctionString,String,String,TimeWindow{Overridepublic void process(String s, Context context, IterableString elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}
}4.4 触发器和移除器
对于一个窗口算子而言窗口分配器和窗口函数是必不可少的。除此之外Flink还提供了其他一些可选的API让我们可以更加灵活地控制窗口行为。
触发器Trigger 触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”本质上就是执行窗口函数所以可以认为是计算得到结果并输出的过程。 基于WindowedStream调用.trigger()方法就可以传入一个自定义的窗口触发器Trigger。
stream.keyBy(...).window(...).trigger(new MyTrigger())移除器Evictor 移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法就可以传入一个自定义的移除器Evictor。Evictor是一个接口不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...).window(...).evictor(new MyEvictor())4.5 窗口知识总结
1、窗口什么时候触发 输出 时间进展 窗口的最大时间戳end - 1ms
2、窗口是怎么划分的 start 向下取整取窗口长度的整数倍 end start 窗口长度 窗口左闭右开 本窗口的 最大时间戳 end - 1ms
3、窗口的生命周期 创建 属于本窗口的第一条数据来的时候现new的放入一个singleton单例的集合中 销毁关窗 时间进展 窗口的最大时间戳end - 1ms 允许迟到的时间默认0 所以窗口触发和窗口关窗其实两个动作
那什么是时间进展为什么要-1ms这涉及到水位线(Watermark)
4.6 水位线(watermark)
4.6.1 什么是水位线
在说水位线之前涉及到时间语义
处理时间服务器处理数据的服务器时间事件事件数据发生(产生)的时间
在实际应用中事件时间语义会更为常见。一般情况下业务日志数据中都会记录数据生成的时间戳timestamp它就可以作为事件时间的判断基础。从Flink1.12版本开始Flink已经将事件时间作为默认的时间语义了。
用来衡量事件时间进展的标记就被称作“水位线”Watermark。具体实现上水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点主要内容就是一个时间戳用来指示当前的事件时间。 特性
是数据流中的一个标记可以认为是一个特殊数据数据内容是一个时间戳基于数据的时间戳生成的用来表示当前事件时间的进展水位线时间戳必须单调递增确保时钟一直向前推进水位线可以设置延迟来处理乱序数据水位线t表示当前流中事件时间已经到达时间戳t这代表t之前的所有数据都到齐了不会再出现t’≤t的数据
水位线通常与窗口一起配合完成数据处理
4.6.2 水位线生成
完美的水位线是“绝对正确”的也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确就必须等足够长的时间这会带来更高的延迟。 如果我们希望处理得更快、实时性更强那么可以将水位线延迟设得低一些。这种情况下可能很多迟到数据会在水位线之后才到达就会导致窗口遗漏数据计算结果不准确。当然如果我们对准确性完全不考虑、一味地追求处理速度可以直接使用处理时间语义这在理论上可以得到最低的延迟。 所以Flink中的水位线其实是流处理中对低延迟和结果正确性的一个权衡机制而且把控制的权力交给了程序员我们可以在代码中定义水位线的生成策略。
1有序流中内置水位线设置 对于有序流主要特点就是时间戳单调增长所以永远不会出现迟到数据的问题直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成升序的watermark没有等待时间.WaterSensorforMonotonousTimestamps()// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}
}2乱序流中内置水位线设置 由于乱序流中需要等待迟到数据到齐所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳就是当前数据流中最大的时间戳减去延迟的结果调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数表示“最大乱序程度”它表示数据流中乱序数据时间戳的最大差值
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱序的等待3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}
}3自定义水位线生成器 1周期性水位线生成器Periodic Generator 周期性生成器一般是通过onEvent()观察判断输入的事件而在onPeriodicEmit()里发出水位线。
import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategyEvent {Overridepublic TimestampAssignerEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event elementlong recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}Overridepublic WatermarkGeneratorEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}/*** 自定义水位生成器*/public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳}Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}如果想修改默认周期时间可以通过下面方法修改。例如修改为400ms env.getConfig().setAutoWatermarkInterval(400L);
(2断点式水位线生成器Punctuated Generator 断点式生成器会不停地检测onEvent()中的事件当发现带有水位线信息的事件时就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。
(3在数据源中发送水位线 我们也可以在自定义的数据源中抽取事件时间然后发送水位线。这里要注意的是在自定义数据源中发送了水位线以后就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下 env.fromSource( kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), “kafkasource” )
4总结 Watermark生成原理: 1、都是周期性生成的 默认200ms 2、有序流 watermark 当前最大的事件时间 - 1ms 3、乱序流 watermark 当前最大的事件时间 - 延迟时间 - 1ms 为什么都要 -1ms 假如没有-1ms例如来了一条5s的数据当前最大事件时间是5swatermark5s则会立刻触发窗口输出假如接下来刚好又来了一条5s的数据这条数据实际上并没有迟到但窗口已经触发了该如何是好 所以考虑到同一秒有多条数据生成故意-1ms例如watermark4999则认为4999ms之前的数据都到齐
4.7 迟到数据的处理
主要有三种方法 1. 推迟水印推进 在水印产生时设置一个乱序容忍度推迟系统时间的推进保证窗口计算被延迟执行为乱序的数据争取更多的时间进入窗口。 WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)); 2. 设置窗口延迟关闭 Flink的窗口也允许迟到数据。当触发了窗口计算后会先计算当前的结果但是此时并不会关闭窗口。 以后每来一条迟到数据就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间推迟时间此时窗口会真正关闭。注意允许迟到只能运用在event time上
推迟关窗时间在关窗之前迟到数据来了还能被窗口计算来一条迟到数据触发一次计算但关窗后迟到数据不会被计算
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))使用侧流接收迟到的数据 那对于关窗后的数据该如何处理关窗后的迟到数据可以放入侧输出流
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction());WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) - element.getTs() * 1000L);SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTagWaterSensor lateTag new OutputTag(late-data, Types.POJO(WaterSensor.class));SingleOutputStreamOperatorString process sensorDSwithWatermark.keyBy(sensor - sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据放入侧输出流.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();// 从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);env.execute();}
}问题如果 watermark等待3s窗口允许迟到2s 为什么不直接 watermark等待5s 或者 窗口允许迟到5s
watermark等待时间不会设太大因为会影响的计算延迟 如果设置3s 窗口第一次触发计算和输出是13s的数据来13-310s 如果设置5s 窗口第一次触发计算和输出是15s的数据来15-510s窗口允许迟到是对 大部分迟到数据的 处理 尽量让结果准确但每来一条数据就会触发一次这条数据所在窗口计算(增量计算) 即如果只设置 允许迟到5s 那么 就会导致 频繁 窗口计算
4.8 双流联结
4.8.1 窗口联结Window Join
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子可以定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。 类似于SQL inner join
SELECT * FROM table1 t1, table2 t2 WHERE t1.id t2.id;public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 3),Tuple2.of(c, 4)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer,Integer ds2 env.fromElements(Tuple3.of(a, 1,1),Tuple3.of(a, 11,1),Tuple3.of(b, 2,1),Tuple3.of(b, 12,1),Tuple3.of(c, 14,1),Tuple3.of(d, 15,1)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer,IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key来进行匹配关联// 3. 只能拿到匹配上的数据类似有固定时间范围的inner joinDataStreamString join ds1.join(ds2).where(r1 - r1.f0) // ds1的keyby.equalTo(r2 - r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 关联上的数据调用join方法* param first ds1的数据* param second ds2的数据* return* throws Exception*/Overridepublic String join(Tuple2String, Integer first, Tuple3String, Integer, Integer second) throws Exception {return first ----- second;}});join.print();env.execute();}
}
4.8.2 间隔联结Interval Join
在有些场景下我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧于是窗口内就都没有匹配了。为了应对这样的需求Flink提供了一种叫作“间隔联结”interval join的合流操作。顾名思义间隔联结的思路就是针对一条流的每个数据开辟出其时间戳前后的一段时间间隔看这期间是否有来自另一条流的数据匹配。
间隔联结具体的定义方式是我们给定两个时间点分别叫作间隔的“上界”upperBound和“下界”lowerBound于是对于一条流不妨叫作A中的任意一个数据元素a就可以开辟一段时间间隔[a.timestamp lowerBound, a.timestamp upperBound],即以a的时间戳为中心下至下界点、上至上界点的一个闭区间我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流不妨叫B中的数据元素b如果它的时间戳落在了这个区间范围内a和b就可以成功配对进而进行计算输出结果。所以匹配的条件为 a.timestamp lowerBound b.timestamp a.timestamp upperBound 这里需要注意做间隔联结的两条流A和B也必须基于相同的key下界lowerBound应该小于等于上界upperBound两者都可正可负间隔联结目前只支持事件时间语义。
public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.socketTextStream(localhost, 7777).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] datas value.split(,);return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer, Integer ds2 env.socketTextStream(localhost, 8888).map(new MapFunctionString, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer map(String value) throws Exception {String[] datas value.split(,);return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));/*** TODO Interval join* 1、只支持事件时间* 2、指定上界、下界的偏移负号代表时间往前正号代表时间往后* 3、process中只能处理 join上的数据* 4、两条流关联后的watermark以两条流中最小的为准* 5、如果 当前数据的事件时间 当前的watermark就是迟到数据 主流的process不处理* between后可以指定将 左流 或 右流 的迟到数据 放入侧输出流*///1. 分别做keybykey其实就是关联条件KeyedStreamTuple2String, Integer, String ks1 ds1.keyBy(r1 - r1.f0);KeyedStreamTuple3String, Integer, Integer, String ks2 ds2.keyBy(r2 - r2.f0);//2. 调用 interval joinOutputTagTuple2String, Integer ks1LateTag new OutputTag(ks1-late, Types.TUPLE(Types.STRING, Types.INT));OutputTagTuple3String, Integer, Integer ks2LateTag new OutputTag(ks2-late, Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperatorString process ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据放入侧输出流.process(new ProcessJoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 两条流的数据匹配上才会调用这个方法* param left ks1的数据* param right ks2的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer left, Tuple3String, Integer, Integer right, Context ctx, CollectorString out) throws Exception {// 进入这个方法是关联上的数据out.collect(left ------ right);}});process.print(主流);process.getSideOutput(ks1LateTag).printToErr(ks1迟到数据);process.getSideOutput(ks2LateTag).printToErr(ks2迟到数据);env.execute();}
}5. 处理函数
如果当前的API满足不了需求则可以尝试使用底层的处理函数。 stream.process(new MyProcessFunction()) Flink提供了8个不同的处理函数
ProcessFunction: 最基本的处理函数基于DataStream直接调用.process()时作为参数传入KeyedProcessFunction: 对流按键分区后的处理函数基于KeyedStream调用.process()时作为参数传入ProcessWindowFunction: 开窗之后的处理函数也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入ProcessAllWindowFunction: 同样是开窗之后的处理函数基于AllWindowedStream调用.process()时作为参数传入CoProcessFunction: 合并connect两条流之后的处理函数基于ConnectedStreams调用.process()时作为参数传入ProcessJoinFunction: 间隔连接interval join两条流之后的处理函数基于IntervalJoined调用.process()时作为参数传入BroadcastProcessFunction: 广播连接流处理函数基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接conncet之后的产物。KeyedBroadcastProcessFunction: 按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是这时的广播连接流是一个KeyedStream与广播流BroadcastStream做连接之后的产物
5.1 按键分区处理函数KeyedProcessFunction
KeyedStream中支持使用TimerService设置定时器的操作。 ProcessFunction的上下文Context中提供了.timerService()方法可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口包含以下六个方法
// 获取当前的处理时间
long currentProcessingTime();// 获取当前的水位线事件时间
long currentWatermark();// 注册处理时间定时器当处理时间超过time时触发
void registerProcessingTimeTimer(long time);// 注册事件时间定时器当水位线超过time时触发
void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);代码
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// TODO Process:keyedSingleOutputStreamOperatorString process sensorKS.process(new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {//获取当前数据的keyString currentKey ctx.getCurrentKey();// 定时器注册TimerService timerService ctx.timerService();// 事件时间的案例Long currentEventTime ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println(当前key currentKey ,当前时间 currentEventTime ,注册了一个5s的定时器);// api介绍// 注册定时器 处理时间、事件时间
// timerService.registerProcessingTimeTimer();
// timerService.registerEventTimeTimer();// 删除定时器 处理时间、事件时间
// timerService.deleteEventTimeTimer();
// timerService.deleteProcessingTimeTimer();// 获取当前时间进展 处理时间-当前系统时间 事件时间-当前watermark
// long currentTs timerService.currentProcessingTime();
// long wm timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间调用该方法* param timestamp 当前时间进展就是定时器被触发时的时间* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey ctx.getCurrentKey();System.out.println(key currentKey 现在时间是 timestamp 定时器触发);}});process.print();env.execute();}
}事件时间定时器通过watermark来触发的watermark 注册的时间 注意 watermark 当前最大事件时间 - 等待时间 -1ms 因为 -1ms所以会推迟一条数据 比如 5s的定时器 如果 等待3s watermark 8s - 3s -1ms 4999ms,不会触发5s的定时器 需要 watermark 9s -3s -1ms 5999ms 才能去触发 5s的定时器
5.2 窗口处理函数ProcessWindowFunction ProcessAllWindowFunction
public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Window extends AbstractRichFunction {...public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...}
}ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类它有四个类型参数
INinput数据流中窗口任务的输入数据类型。OUToutput窗口任务进行计算之后的输出数据类型。KEY数据中键key的类型。W窗口的类型是Window的子类型。一般情况下我们定义时间窗口W就是TimeWindow。
ProcessWindowFunction里面处理数据的核心方法.process()。方法包含四个参数
key窗口做统计计算基于的键也就是之前keyBy用来分区的字段。context当前窗口进行计算的上下文它的类型就是ProcessWindowFunction内部定义的抽象类Context。elements窗口收集到用来计算的所有数据这是一个可迭代的集合类型。out用来发送数据输出计算结果的收集器类型为Collector。
5.3 侧输出流Side Output
之前讲到的绝大多数转换算子输出的都是单一流流里的数据类型只能有一种。而侧输出流可以认为是“主流”上分叉出的“支流”所以可以由一条流产生出多条流而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。
DataStreamInteger stream env.fromSource(...);OutputTagString outputTag new OutputTagString(side-output) {};SingleOutputStreamOperatorLong longStream stream.process(new ProcessFunctionInteger, Long() {Overridepublic void processElement( Integer value, Context ctx, CollectorInteger out) throws Exception {// 转换成Long输出到主流中out.collect(Long.valueOf(value));// 转换成String输出到侧输出流中ctx.output(outputTag, side-output: String.valueOf(value));}
});6. 状态管理
1托管状态Managed State和原始状态Raw State Flink的状态有两种托管状态Managed State和原始状态Raw State。托管状态就是由Flink统一管理的状态的存储访问、故障恢复和重组等一系列问题都由Flink实现我们只要调接口就可以而原始状态则是自定义的相当于就是开辟了一块内存需要我们自己管理实现状态的序列化和故障恢复。 通常我们采用Flink托管状态来实现需求。
2算子状态Operator State和按键分区状态Keyed State 我们知道在Flink中一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的所以Flink能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。 而很多有状态的操作比如聚合、窗口都是要先做keyBy进行按键分区的。按键分区之后任务所进行的所有计算都应该只针对当前key有效所以状态也应该按照key彼此隔离。在这种情况下状态的访问方式又会有所不同。 基于这样的想法我们又可以将托管状态分为两类算子状态和按键分区状态。
按键分区状态根据输入流中定义的键(key)来访问和维护所以只能在keyBy之后使用算子状态作用范围为当前算子的任务实例只对并行子任务实例有效
无论是Keyed State还是Operator State它们都是在本地实例上维护的也就是说每个并行子任务维护着对应的状态算子的子任务之间状态不共享。
6.1 按键分区状态
6.1.1 值状态ValueState
案例需求检测每种传感器的水位值如果连续的两个水位值超过10就输出报警。
public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// TODO 1.定义状态ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中初始化状态// 状态描述器两个参数第一个参数起个名字不重复第二个参数存储的类型lastVcState getRuntimeContext().getState(new ValueStateDescriptorInteger(lastVcState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {
// lastVcState.value(); // 取出 本组 值状态 的数据
// lastVcState.update(); // 更新 本组 值状态 的数据
// lastVcState.clear(); // 清除 本组 值状态 的数据// 1. 取出上一条数据的水位值(Integer默认值是null判断)int lastVc lastVcState.value() null ? 0 : lastVcState.value();// 2. 求差值的绝对值判断是否超过10Integer vc value.getVc();if (Math.abs(vc - lastVc) 10) {out.collect(传感器 value.getId() 当前水位值 vc ,与上一条水位值 lastVc ,相差超过10);}// 3. 更新状态里的水位值lastVcState.update(vc);}}).print();env.execute();}
}6.1.2 列表状态ListState
ListStateInteger vcListState;vcListState getRuntimeContext().getListState(new ListStateDescriptorInteger(vcListState, Types.INT));vcListState.get();
vcListState.update(vcList);6.1.3 Map状态MapState
MapStateInteger, Integer vcCountMapState;
vcCountMapState getRuntimeContext().getMapState(new MapStateDescriptorInteger, Integer(vcCountMapState, Types.INT, Types.INT));
...6.1.4 归约状态ReducingState
类似于值状态Value不过需要对添加进来的所有数据进行归约将归约聚合之后的值作为状态保存下来。
.process(new KeyedProcessFunctionString WaterSensor Integer() {private ReducingStateInteger sumVcState;Overridepublic void open(Configuration parameters) throws Exception {sumVcState this.getRuntimeContext().getReducingState(new ReducingStateDescriptorInteger(sumVcStateInteger::sumInteger.class));}Overridepublic void processElement(WaterSensor value Context ctx CollectorInteger out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})6.1.5 聚合状态AggregatingState
与归约状态非常类似聚合状态也是一个值用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的。
案例需求计算每种传感器的平均水位
public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {AggregatingStateInteger, Double vcAvgAggregatingState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState getRuntimeContext().getAggregatingState(new AggregatingStateDescriptorInteger, Tuple2Integer, Integer, Double(vcAvgAggregatingState,new AggregateFunctionInteger, Tuple2Integer, Integer, Double() {Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0);}Overridepublic Tuple2Integer, Integer add(Integer value, Tuple2Integer, Integer accumulator) {return Tuple2.of(accumulator.f0 value, accumulator.f1 1);}Overridepublic Double getResult(Tuple2Integer, Integer accumulator) {return accumulator.f0 * 1D / accumulator.f1;}Overridepublic Tuple2Integer, Integer merge(Tuple2Integer, Integer a, Tuple2Integer, Integer b) {
// return Tuple2.of(a.f0 b.f0, a.f1 b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 将 水位值 添加到 聚合状态中vcAvgAggregatingState.add(value.getVc());// 从 聚合状态中 获取结果Double vcAvg vcAvgAggregatingState.get();out.collect(传感器id为 value.getId() ,平均水位值 vcAvg);// vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果
// vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据会自动进行聚合
// vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据}}).print();env.execute();}
}6.1.6 状态生存时间TTL
配置一个状态的“生存时间”time-to-liveTTL当状态在内存中存在的时间超出这个值时就将它清除。设置 失效时间 当前时间 TTL之后如果有对状态的访问和修改我们可以再对失效时间进行更新当设置的清除条件被触发时比如状态被访问的时候或者每隔一段时间扫描一次失效状态就可以判断状态是否失效、从而进行清除了。
目前的TTL设置只支持处理时间。
案例
public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.创建 StateTtlConfigStateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 更新 过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// TODO 2.状态描述器 启用 TTLValueStateDescriptorInteger stateDescriptor new ValueStateDescriptor(lastVcState, Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 先获取状态值打印 》 读取状态Integer lastVc lastVcState.value();out.collect(key value.getId() ,状态值 lastVc);// 如果水位大于10更新状态值 》 写入状态if (value.getVc() 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}6.2 算子状态Operator State
算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。算子状态跟数据的key无关所以不同key的数据只要被分发到同一个并行子任务就会访问到同一个Operator State。
算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。
6.2.1 列表状态ListState
与Keyed State中的列表状态的区别是在算子状态的上下文中不会按键key分别处理状态所以每一个并行子任务上只会保留一个“列表”list也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度彼此之间完全独立。
当算子并行度进行缩放调整时算子的列表状态中的所有元素项会被统一收集起来相当于把多个分区的列表合并成了一个“大列表”然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”round-robin与之前介绍的rebanlance数据传输方式类似是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”even-split redistribution。
案例在map算子中计算数据的个数。
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream(localhost, 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.实现 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunctionString, Long, CheckpointedFunction {private Long count 0L;private ListStateLong state;Overridepublic Long map(String value) throws Exception {return count;}/*** TODO 2.本地变量持久化将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用** param context* throws Exception*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(snapshotState...);// 2.1 清空算子状态state.clear();// 2.2 将 本地变量 添加到 算子状态 中state.add(count);}/*** TODO 3.初始化本地变量程序启动和恢复时 从状态中 把数据添加到 本地变量每个子任务调用一次** param context* throws Exception*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initializeState...);// 3.1 从 上下文 初始化 算子状态state context.getOperatorStateStore().getListState(new ListStateDescriptorLong(state, Types.LONG));// 3.2 从 算子状态中 把数据 拷贝到 本地变量if (context.isRestored()) {for (Long c : state.get()) {count c;}}}}
}6.2.2 联合列表状态UnionListState
UnionListState的重点就在于“联合”union。在并行度调整时常规列表状态是轮询分配状态项而联合列表状态的算子则会直接广播状态的完整列表。这样并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”union redistribution。如果列表中状态项数量太多为资源和效率考虑一般不建议使用联合重组的方式。
tate context.getOperatorStateStore().getUnionListState(new ListStateDescriptorLong(union-state, Types.LONG));6.2.3 广播状态BroadcastState
有时我们希望算子并行子任务都保持同一份“全局”状态用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态状态就像被“广播”到所有分区一样这种特殊的算子状态就叫作广播状态BroadcastState。
适用场景将配置数据存放于mysqlflink cdc使用广播流读取mysql广播给数据流动态修改配置
案例水位超过指定的阈值发送告警阈值可以动态修改。
public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(localhost, 7777).map(new WaterSensorMapFunction());// 配置流用来广播配置DataStreamSourceString configDS env.socketTextStream(localhost, 8888);// TODO 1. 将 配置流 广播MapStateDescriptorString, Integer broadcastMapState new MapStateDescriptor(broadcast-state, Types.STRING, Types.INT);BroadcastStreamString configBS configDS.broadcast(broadcastMapState);// TODO 2.把 数据流 和 广播后的配置流 connectBroadcastConnectedStreamWaterSensor, String sensorBCS sensorDS.connect(configBS);// TODO 3.调用 processsensorBCS.process(new BroadcastProcessFunctionWaterSensor, String, String() {/*** 数据流的处理方法 数据流 只能 读取 广播状态不能修改* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, CollectorString out) throws Exception {// TODO 5.通过上下文获取广播状态取出里面的值只读不能修改ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);Integer threshold broadcastState.get(threshold);// 判断广播状态里是否有数据因为刚启动时可能是数据流的第一条数据先来threshold (threshold null ? 0 : threshold);if (value.getVc() threshold) {out.collect(value ,水位超过指定的阈值 threshold !!!);}}/*** 广播后的配置流的处理方法: 只有广播流才能修改 广播状态* param value* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorString out) throws Exception {// TODO 4. 通过上下文获取广播状态往里面写数据BroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);broadcastState.put(threshold, Integer.valueOf(value));}}).print();env.execute();}
}7. 容错机制
7.1 检查点 这样做可以实现一个数据被所有任务算子完整地处理完状态得到了保存。 如果出现故障我们恢复到之前保存的状态故障时正在处理的所有数据都需要重新处理我们只需要让源source任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来而且外部数据源能够重置偏移量kafka就是满足这些要求的一个最好的例子。
7.1.1 检查点算法
在Flink中采用了基于Chandy-Lamport算法的分布式快照可以在不暂停整体流处理的前提下将状态备份保存到检查点。
借鉴水位线的设计在数据流中插入一个特殊的数据结构专门用来表示触发检查点保存的时间点。收到保存检查点的指令后Source任务可以在当前数据流中插入这个结构之后的所有任务只要遇到它就开始对状态做持久化快照保存。由于数据流是保持顺序依次处理的因此遇到这个标识就代表之前的数据都处理完了可以保存一个检查点而在它之后的数据引起的状态改变就不会体现在这个检查点中而需要保存到下一个检查点。
这种特殊的数据形式把一条流上的数据按照不同的检查点分隔开所以就叫做检查点的“分界线”Checkpoint Barrier。
分布式快照算法Barrier对齐的精准一次
当上游任务向多个并行下游任务发送barrier时需要广播出去 而当多个上游任务向同一个下游任务传递分界线时需要在下游任务执行“分界线对齐”操作也就是需要等到所有并行分区的barrier都到齐才可以开始状态的保存。
1触发检查点JobManager向Source发送Barrier 2Barrier发送向下游广播发送 3Barrier对齐下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存 4状态保存有状态的算子将状态保存至持久化。 5先处理缓存数据然后正常继续处理
完成检查点保存之后任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据需要先做处理然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。
补充由于分界线对齐要求先到达的分区做缓存等待一定程度上会影响处理的速度当出现背压时下游任务会堆积大量的缓冲数据检查点可能需要很久才可以保存完毕。 为了应对这种场景Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了非Barrier对齐的精准一次的检查点保存方式可以将未处理的缓冲数据也保存进检查点。这样当我们遇到一个分区barrier时就不需等待对齐而是可以直接启动状态的保存了。
总结 1、Barrier对齐 一个Task 收到 所有上游 同一个编号的 barrier之后才会对自己的本地状态做 备份
精准一次 在barrier对齐过程中barrier后面的数据 阻塞等待不会越过barrier至少一次 在barrier对齐过程中先到的barrier其后面的数据 不阻塞 接着计算
2、非Barrier对齐 一个Task 收到 第一个 barrier时就开始 执行备份能保证 精准一次flink 1.11出的新算法 先到的barrier将 本地状态 备份 其后面的数据接着计算输出 未到的barrier其 前面的数据 接着计算输出同时 也保存到 备份中 最后一个barrier到达 该Task时这个Task的备份结束
7.1.2 代码配置
1、常用配置
检查点模式CheckpointingMode 设置检查点一致性的保证级别有“精确一次”exactly-once和“至少一次”at-least-once两个选项。默认级别为exactly-once而对于大多数低延迟的流处理程序at-least-once就够用了而且处理效率会更高。超时时间checkpointTimeout 用于指定检查点保存的超时时间超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数表示超时时间。最小间隔时间minPauseBetweenCheckpoints 用于指定在上一个检查点完成之后检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点只要距离上一个检查点完成的间隔不够就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数0时实际并发为1。最大并发检查点数量maxConcurrentCheckpoints 用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。开启外部持久化存储enableExternalizedCheckpoints 用于开启检查点的外部持久化而且默认在作业失败的时候不会自动清理如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。 DELETE_ON_CANCELLATION在作业取消的时候会自动删除外部检查点但是如果是作业失败退出则会保留检查点。RETAIN_ON_CANCELLATION作业取消的时候也会保留外部检查点。 检查点连续失败次数tolerableCheckpointFailureNumber 用于指定检查点连续失败的次数当达到这个次数作业就失败退出。默认为0这意味着不能容忍检查点失败并且作业将在第一次报告检查点失败时失败。
2、开启非对齐检查点
非对齐检查点enableUnalignedCheckpoints 不再执行检查点的分界线对齐操作启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式CheckpointingMode必须为exctly-once并且最大并发的检查点个数为1。对齐检查点超时时间alignedCheckpointTimeout 该参数只有在启用非对齐检查点的时候有效。参数默认是0表示一开始就直接用非对齐检查点。如果设置大于0一开始会使用对齐的检查点当对齐时间超过该参数设定的时间则会自动切换成非对齐检查点。
public class CheckpointConfigDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);// 代码中用到hdfs需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty(HADOOP_USER_NAME, atguigu);// TODO 检查点配置// 1、启用检查点: 默认是barrier对齐的周期为5s, 精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();// 2、指定检查点的存储位置checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);// 3、checkpoint的超时时间: 默认10分钟checkpointConfig.setCheckpointTimeout(60000);// 4、同时运行中的checkpoint的最大数量checkpointConfig.setMaxConcurrentCheckpoints(1);// 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔设置了0,并发就会变成1checkpointConfig.setMinPauseBetweenCheckpoints(1000);// 6、取消作业时checkpoint的数据 是否保留在外部系统// DELETE_ON_CANCELLATION:主动cancel时删除存在外部系统的chk-xx目录 如果是程序突然挂掉不会删// RETAIN_ON_CANCELLATION:主动cancel时外部系统的chk-xx目录会保存下来checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 7、允许 checkpoint 连续失败的次数默认0--》表示checkpoint一失败job就挂掉checkpointConfig.setTolerableCheckpointFailureNumber(10);// TODO 开启 非对齐检查点barrier非对齐// 开启的要求 Checkpoint模式必须是精准一次最大并发必须设为1checkpointConfig.enableUnalignedCheckpoints();// 开启非对齐检查点才生效 默认0表示一开始就直接用 非对齐的检查点// 如果大于0 一开始用 对齐的检查点barrier对齐 对齐的时间超过这个参数自动切换成 非对齐检查点barrier非对齐checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));env.socketTextStream(hadoop102, 7777).flatMap((String value, CollectorTuple2String, Integer out) - {String[] words value.split( );for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value - value.f0).sum(1).print();env.execute();}
}7.2 保存点Savepoint
除了检查点外Flink还提供了另一个非常独特的镜像保存功能——保存点savepoint。 从名称就可以看出这也是一个存盘的备份它的原理和算法与检查点完全相同只是多了一些额外的元数据。
保存点与检查点最大的区别就是触发的时机。检查点是由Flink自动管理的定期创建发生故障之后自动读取进行恢复这是一个“自动存盘”的功能而保存点不会自动创建必须由用户明确地手动触发保存操作所以就是“手动存盘”。
需要注意的是保存点能够在程序更改的时候依然兼容前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定
DataStreamString stream env.addSource(new StatefulSource()).uid(source-id).map(new StatefulMapper()).uid(mapper-id).print();7.2.1 使用保存点
如果是基于yarn的运行模式还需要加上 -yid application-id
停止时使用
bin/flink stop --savepointPath [:targetDirectory] :jobId从保存点重启应用
bin/flink run -s :savepointPath [:runArgs]问题状态、状态后端、Checkpoint 三者之间的区别及关系
拿五个字做比喻“铁锅炖大鹅”铁锅是状态后端大鹅是状态Checkpoint 是炖的动作。
状态本质来说就是数据在 Flink 中其实就是 Flink 提供给用户的状态编程接口。比如 flink 中的 MapStateValueStateListState。
状态后端Flink 提供的用于管理状态的组件状态后端决定了以什么样数据结构什么样的存储方式去存储和管理我们的状态。Flink 目前官方提供了 memory、filesystemrocksdb 三种状态后端来存储我们的状态。
Checkpoint状态管理Flink 提供的用于定时将状态后端中存储的状态同步到远程的存储系统的组件或者能力。为了防止 long run 的 Flink 任务挂了导致状态丢失产生数据质量问题Flink 提供了状态管理CheckpointSavepoint的能力把我们使用的状态给管理起来定时的保存到远程。然后可以在 Flink 任务 failover 时从远程把状态数据恢复到 Flink 任务中保障数据质量。
7.3 状态一致性
一般说来状态一致性有三种级别
最多一次At-Most-Once至少一次At-Least-Once精确一次Exactly-Once
7.3.1 端到端精确一次End-To-End Exactly-Once
实际应用中最难做到、也最希望做到的一致性语义无疑就是端到端end-to-end的“精确一次”。我们知道对于Flink内部来说检查点机制可以保证故障恢复后数据不丢在能够重放的前提下并且只处理一次所以已经可以做到exactly-once的一致性语义了。
所以端到端一致性的关键点就在于输入的数据源端和输出的外部存储端。 1. 输入端保证
想要在故障恢复后不丢数据外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态这样就可以在故障恢复时从检查点中读取出来对数据源重置偏移量重新获取数据。
数据源可重放数据或者说可重置读取数据偏移量加上Flink的Source算子将偏移量作为状态保存进检查点就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求当然也是实现端到端exactly-once的基本要求。
2. 输出端保证 检查点保存之后继续到来的数据也会一一处理任务的状态也会更新最终通过Sink任务将计算结果输出到外部系统只是状态改变还没有存到下一个检查点中。这时如果出现故障这些数据都会重新来一遍就计算了两次。我们知道对Flink内部状态来说重复计算的动作是没有影响的因为状态已经回滚最终改变只会发生一次但对于外部系统来说已经写入的结果就是泼出去的水已经无法收回了再次执行写入就会把同一个数据写入两次。 所以这时我们只保证了端到端的at-least-once语义。 为了实现端到端exactly-once我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种
幂等写入事务写入
1幂等Idempotent写入 所谓“幂等”操作就是说一个操作可以重复执行很多次但只导致一次结果更改。也就是说后面再重复执行就不会对结果起作用了。
这相当于说我们并没有真正解决数据重复计算、写入的问题而是说重复写入也没关系结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入比如Redis中键值存储或者关系型数据库如MySQL中满足查询条件的更新操作。
需要注意对于幂等写入遇到故障进行恢复时有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据其实已经写入了一遍回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据可能会看到奇怪的现象短时间内结果会突然“跳回”到之前的某个值然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候最终的结果还是一致的。
2事务Transactional写入 输出端最大的问题就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。
在Flink流处理的结果写入外部系统时如果能够构建一个事务让写入操作可以随着检查点来提交和回滚那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是用一个事务来进行数据向外部系统的写入这个事务是与检查点绑定在一起的。当Sink任务遇到barrier时开始保存状态的同时就开启一个事务接下来该检查点之后到下一个检查点之间的所有数据的写入都在这个事务中待到当前检查点保存完毕时将事务提交所有写入的数据就真正可用了。如果中间过程出现故障状态会回退到上一个检查点而当前事务没有正常关闭因为当前检查点没有保存完所以也会回滚写入到外部的数据就被撤销了。
具体来说又有两种实现方式预写日志WAL和两阶段提交2PC 1预写日志write-ahead-logWAL 预写日志WAL就是一种非常简单的方式。具体步骤是 ①先把结果数据作为日志log状态保存起来 ②进行检查点保存时也会将这些结果数据一并做持久化存储 ③在收到检查点完成的通知时将所有结果一次性写入外部系统。 ④在成功写入所有数据后在内部再次确认相应的检查点将确认信息也进行持久化保存。这才代表着检查点的真正完成。
我们会发现这种方式类似于检查点完成时做一个批处理一次性的写入会带来一些性能上的问题而优点就是比较简单由于数据提前在状态后端中做了缓存所以无论什么外部存储系统理论上都能用这种方式一批搞定。
2两阶段提交two-phase-commit2PC 它的想法是分成两个阶段先做“预提交”等检查点完成之后再正式提交。这种提交方式是真正基于事务的它需要外部系统提供事务支持。 具体的实现步骤为 ①当第一条数据到来时或者收到检查点的分界线时Sink任务都会启动一个事务。 ②接下来接收到的所有数据都通过这个事务写入外部系统这时由于事务没有提交所以数据尽管写入了外部系统但是不可用是“预提交”的状态。 ③当Sink任务收到JobManager发来检查点完成的通知时正式提交事务写入的结果就真正可用了。
7.3.2 Flink和Kafka连接时的精确一次保证 需要的配置 在具体应用中实现真正的端到端exactly-once还需要有一些额外的配置 1必须启用检查点 2指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE 3配置Kafka读取数据的消费者的隔离级别 这里所说的Kafka是写入的外部系统。预提交阶段数据已经写入只是被标记为“未提交”uncommitted而Kafka中默认的隔离级别isolation.level是read_uncommitted也就是可以读取未提交的数据。这样一来外部应用就可以直接消费未提交的数据对于事务性的保证就失效了。所以应该将隔离级别配置 为read_committed表示消费者遇到未提交的消息时会停止从分区中消费数据直到消息被标记为已提交才会再次恢复消费。当然这样做的话外部应用消费数据就会有显著的延迟。 4事务超时配置 Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时有可能出现Kafka已经认为事务超时了丢弃了预提交的数据而Sink任务认为还可以继续等待。如果接下来检查点保存成功发生故障后回滚到这个检查点的状态这部分数据就被真正丢掉了。所以这两个超时时间前者应该小于等于后者。
public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 代码中用到hdfs需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty(HADOOP_USER_NAME, atguigu);// TODO 1、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// TODO 2.读取kafkaKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setGroupId(atguigu).setTopics(topic_1).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString kafkasource env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource);/*** TODO 3.写出到Kafka* 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint* 2、sink设置保证级别为 精准一次* 3、sink设置事务前缀* 4、sink设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// TODO 3.1 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// TODO 3.2 精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// TODO 3.3 精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();kafkasource.sinkTo(kafkaSink);env.execute();}
}
// 后续读取“ws”这个topic的消费者要设置事务的隔离级别为“读已提交”如下
public class KafkaEOSDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用两阶段提交写入的TopicKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setGroupId(atguigu).setTopics(ws).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// TODO 作为 下游的消费者要设置 事务的隔离级别 读已提交.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, read_committed).build();env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource).print();env.execute();}
}