网站如何做收录排行,商务网站建设,百度搜索排名机制,漳州台商投资区建设局网站Flink1.17 基础知识
来源#xff1a;B站尚硅谷 目录 Flink1.17 基础知识Flink 概述Flink 是什么Flink特点Flink vs SparkStreamingFlink的应用场景Flink分层API Flink快速上手创建项目WordCount代码编写批处理流处理 Flink部署集群角色部署模式会话模式#xff08;Session …Flink1.17 基础知识
来源B站尚硅谷 目录 Flink1.17 基础知识Flink 概述Flink 是什么Flink特点Flink vs SparkStreamingFlink的应用场景Flink分层API Flink快速上手创建项目WordCount代码编写批处理流处理 Flink部署集群角色部署模式会话模式Session Mode单作业模式Per-Job Mode应用模式Application Mode Standalone运行模式了解会话模式部署单作业模式部署应用模式部署 YARN运行模式重点相关准备和配置会话模式部署单作业模式部署应用模式部署 K8S 运行模式了解历史服务器 Flink运行时架构系统架构核心概念并行度Parallelism算子链Operator Chain任务槽Task Slots任务槽和并行度的关系 作业提交流程Standalone会话模式作业提交流程逻辑流图/作业图/执行图/物理流图Yarn应用模式作业提交流程 DataStream API执行环境Execution Environment创建执行环境执行模式Execution Mode触发程序执行 源算子Source准备工作从集合中读取数据从文件读取数据从Socket读取数据从Kafka读取数据从数据生成器读取数据Flink支持的数据类型 转换算子Transformation基本转换算子map/ filter/ flatMap映射map过滤filter扁平映射flatMap 聚合算子Aggregation按键分区keyBy简单聚合sum/min/max/minBy/maxBy归约聚合reduce 用户自定义函数UDF函数类Function Classes富函数类Rich Function Classes 物理分区算子Physical Partitioning随机分区shuffle轮询分区Round-Robin重缩放分区rescale广播broadcast全局分区global自定义分区Custom 分流简单实现使用侧输出流 基本合流操作联合Union连接Connect 输出算子Sink连接到外部系统输出到文件输出到Kafka输出到MySQLJDBC自定义Sink输出 Flink中的时间和窗口窗口Window窗口的概念窗口的分类窗口API概览窗口分配器时间窗口计数窗口 窗口函数增量聚合函数ReduceFunction / AggregateFunction全窗口函数full window functions增量聚合和全窗口函数的结合使用 其他API触发器Trigger移除器Evictor 时间语义Flink中的时间语义哪种时间语义更重要 水位线Watermark事件时间和窗口什么是水位线水位线和窗口的工作原理生成水位线生成水位线的总体原则水位线生成策略Flink内置水位线自定义水位线生成器 水位线的传递迟到数据的处理推迟水印推进设置窗口延迟关闭使用侧流接收迟到的数据 基于时间的合流——双流联结Join窗口联结Window Join间隔联结Interval Join 处理函数基本处理函数ProcessFunction处理函数的功能和使用ProcessFunction解析处理函数的分类 按键分区处理函数KeyedProcessFunction定时器Timer和定时服务TimerServiceKeyedProcessFunction案例 窗口处理函数窗口处理函数的使用ProcessWindowFunction解析 应用案例——Top N使用ProcessAllWindowFunction使用KeyedProcessFunction 侧输出流Side Output 状态管理Flink中的状态概述状态的分类 按键分区状态Keyed State值状态ValueState列表状态ListStateMap状态MapState归约状态ReducingState聚合状态AggregatingState状态生存时间TTL 算子状态Operator State列表状态ListState联合列表状态UnionListState广播状态BroadcastState 状态后端State Backends状态后端的分类HashMapStateBackend/RocksDB如何选择正确的状态后端状态后端的配置 容错机制检查点Checkpoint检查点的保存从检查点恢复状态检查点算法检查点分界线Barrier分布式快照算法Barrier对齐的精准一次分布式快照算法Barrier对齐的至少一次分布式快照算法非Barrier对齐的精准一次 检查点配置启用检查点检查点存储其它高级配置通用增量 checkpoint (changelog)最终检查点 保存点Savepoint保存点的用途使用保存点使用保存点切换状态后端 状态一致性一致性的概念和级别端到端的状态一致性 端到端精确一次End-To-End Exactly-Once输入端保证输出端保证Flink和Kafka连接时的精确一次保证 Flink 概述
Flink 是什么
Flink的核心目标是“数据流上的有状态计算” Stateful Computations over Data Streams。具体来说Apache Flink是一个框架式和分布式处理引擎用于对无界和有界数据流进行有状态计算。
Flink特点
处理数据的目标是低延迟、高吞吐、结果的准确性和良好的容错性。
高吞吐和低延迟。每秒处理数百万个事件毫秒级延迟。结果的准确性。Flink提供了事件时间event-time和处理时间processing-time语义。对于乱序事件流事件时间语义仍然能提供一致且准确的结果。精准一次exactly-once的状态一致性保证。可以连接到最常用的外部系统如Kafka、Hive、JDBC、HDFS、Redis等。高可用。本身高可用的设置加上K8sYARN和Mesos的紧密集成再加上从故障中快速恢复和动态扩展任务的能力全天候运行。
Flink vs SparkStreaming
FlinkStreaming计算模型流计算微批处理时间语义事件时间、处理时间处理时间窗口多、灵活少、不灵活窗口必须是批次的整数倍状态有没有流式SQL有没有
Flink的应用场景
电商和市场营销 举例实时数据报表、广告投放、实时推荐物联网IOT 举例传感器实时数据采集和显示、实时报警交通运输业物流配送和服务业 举例订单状态实时更新、通知信息推送银行和金融业 举例实时结算和通知推送实时检测异常行为
Flink分层API 有状态流处理通过底层API处理函数对最原始数据加工处理。底层API与DataStream API相集成可以处理复杂的计算。DataStream API流处理和DataSet API批处理封装了底层处理函数提供了通用的模块比如转换transformations包括map、flatmap等连接joins聚合aggregations窗口windows操作等。注意Flink1.12以后DataStream API已经实现真正的流批一体所以DataSet API已经过时。Table API 是以表为中心的声明式编程其中表可能会动态变化。Table API遵循关系模型表有二维数据结构类似于关系数据库中的表同时API提供可比较的操作例如select、project、join、group-by、aggregate等。我们可以在表与 DataStream/DataSet 之间无缝切换以允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。SQL这一层在语法与表达能力上与 Table API 类似但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切同时SQL查询可以直接在Table API定义的表上执行。
Flink快速上手
创建项目
创建一个Maven工程。在项目的pom文件中添加Flink的依赖包括flink-java、flink-streaming-java以及flink-clients客户端也可以省略。
propertiesflink.version1.17.0/flink.version
/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependency
/dependenciesWordCount代码编写
需求统计一段文字中每个单词出现的频次。环境准备在src/main/java目录下新建一个包命名为com.jjm.wc随便起。
批处理
批处理基本思路先逐行读入文件数据然后将每一行文字拆分成单词接着按照单词分组统计每组数据的个数就是对应单词的频次。
1数据准备 1在工程根目录下新建一个input文件夹并在下面创建文本文件words.txt 2在words.txt中输入一些文字例如
hello flink
hello world
hello java2代码编写
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)DataSourceString lineDS env.readTextFile(input/words.txt);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOne lineDS.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word,1L));}}});// 4. 按照 word 进行分组UnsortedGroupingTuple2String, Long wordAndOneUG wordAndOne.groupBy(0);// 5. 分组内聚合统计AggregateOperatorTuple2String, Long sum wordAndOneUG.sum(1);// 6. 打印结果sum.print();}
}3结果
(flink,1)
(world,1)
(hello,3)
(java,1)需要注意的是这种代码的实现方式是基于DataSet API的也就是我们对数据的处理转换是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构批量的数据集本质上也是流没有必要用两套不同的API来实现。所以从Flink 1.12开始官方推荐的做法是直接使用DataStream API在提交任务时通过将执行模式设为BATCH来进行批处理
$ bin/flink run -Dexecution.runtime-modeBATCH BatchWordCount.jar这样DataSet API就没什么用了在实际应用中我们只要维护一套DataStream API就可以。
流处理
对于Flink而言流才是整个处理逻辑的底层核心所以流批统一之后的DataStream API更加强大可以直接处理批处理和流处理的所有场景。
1代码编写
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import java.util.Arrays;public class StreamWordCount {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSourceString lineStream env.readTextFile(input/words.txt);// 3. 转换、分组、求和得到统计结果SingleOutputStreamOperatorTuple2String, Long sum lineStream.flatMap(new FlatMapFunctionString, Tuple2String, Long() {Overridepublic void flatMap(String line, CollectorTuple2String, Long out) throws Exception {String[] words line.split( );for (String word : words) {out.collect(Tuple2.of(word, 1L));}}}).keyBy(data - data.f0).sum(1);// 4. 打印sum.print();// 5. 执行env.execute();}
}2结果
3 (java,1)
5 (hello,1)
5 (hello,2)
5 (hello,3)
13 (flink,1)
9 (world,1)3观察与批处理程序BatchWordCount的不同 创建执行环境的不同流处理程序使用的是StreamExecutionEnvironment。转换处理之后得到的数据对象类型不同。分组操作调用的是keyBy方法可以传入一个匿名函数作为键选择器KeySelector指定当前分组的key是什么。代码末尾需要调用env的execute方法开始执行任务。
Flink部署
集群角色
Flink提交作业和执行任务需要几个关键组件
客户端Client代码由客户端获取并做转换之后提交给JobMangerJobManager就是Flink集群里的“管事人”对作业进行中央调度管理而它获取到要执行的作业后会进一步处理转换然后分发任务给众多的TaskManager。TaskManager就是真正“干活的人”数据的处理操作都是它们来做的。
部署模式
会话模式Session Mode
会话模式其实最符合常规思维。我们需要先启动一个集群保持一个会话在这个会话中通过客户端提交作业。集群启动时所有资源就都已经确定所以所有提交的作业会竞争集群中的资源。会话模式比较适合于单个规模小、执行时间短的大量作业。
单作业模式Per-Job Mode
会话模式因为资源共享会导致很多问题所以为了更好地隔离资源我们可以考虑为每个提交的作业启动一个集群这就是所谓的单作业Per-Job模式。作业完成后集群就会关闭所有资源也会释放。这些特性使得单作业模式在生产环境运行更加稳定所以是实际应用的首选模式。需要注意的是Flink本身无法直接这样运行所以单作业模式一般需要借助一些资源管理框架来启动集群比如YARN、KubernetesK8S。
应用模式Application Mode
前面提到的两种模式下应用代码都是在客户端上执行然后由客户端提交给JobManager的。但是这种方式客户端需要占用大量网络带宽去下载依赖和把二进制数据发送给JobManager加上很多情况下我们提交作业用的是同一个客户端就会加重客户端所在节点的资源消耗。解决办法就是不要客户端了直接把应用提交到JobManger上运行。而这也就代表着需要为每一个提交的应用单独启动一个JobManager也就是创建一个集群。这个JobManager只为执行这一个应用而存在执行结束之后JobManager也就关闭了这就是所谓的应用模式。应用模式与单作业模式都是提交作业之后才创建集群单作业模式是通过客户端来提交的客户端解析出的每一个作业对应一个集群而应用模式下是直接由JobManager执行应用程序的。
Standalone运行模式了解
独立模式是独立运行的不依赖任何外部的资源管理平台当然独立也是有代价的如果资源不足或者出现故障没有自动扩展或重分配资源的保证必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。
会话模式部署
提前启动集群并通过Web页面客户端提交任务可以多个任务但是集群资源固定。
单作业模式部署
Flink的Standalone集群并不支持单作业模式部署。因为单作业模式需要借助一些资源管理平台。
应用模式部署
应用模式下不会提前创建集群所以不能调用start-cluster.sh脚本。我们可以使用同样在bin目录下的standalone-job.sh来创建一个JobManager。具体步骤如下 0环境准备。在hadoop102中执行以下命令启动netcat。
[jjmhadoop102 flink-1.17.0]$ nc -lk 77771进入到Flink的安装路径下将应用程序的jar包放到lib/目录下。
[jjmhadoop102 flink-1.17.0]$ mv FlinkTutorial-1.0-SNAPSHOT.jar lib/2执行以下命令启动JobManager。
[jjmhadoop102 flink-1.17.0]$ bin/standalone-job.sh start --job-classname com.jjm.wc.SocketStreamWordCount这里我们直接指定作业入口类脚本会到lib目录扫描所有的jar包。 3同样是使用bin目录下的脚本启动TaskManager。
[jjmhadoop102 flink-1.17.0]$ bin/taskmanager.sh start4在hadoop102上模拟发送单词数据。
[jjmhadoop102 ~]$ nc -lk 7777
hello5在hadoop102:8081地址中观察输出数据 6如果希望停掉集群同样可以使用脚本命令如下。
[atguiguhadoop102 flink-1.17.0]$ bin/taskmanager.sh stop
[atguiguhadoop102 flink-1.17.0]$ bin/standalone-job.sh stopYARN运行模式重点
YARN上部署的过程是客户端把Flink应用提交给Yarn的ResourceManagerYarn的ResourceManager会向Yarn的NodeManager申请容器。在这些容器上Flink会部署JobManager和TaskManager的实例从而启动集群。Flink会根据运行在JobManger上的作业所需要的Slot数量动态分配TaskManager资源。
相关准备和配置
在将Flink任务部署至YARN集群之前需要确认集群是否安装有Hadoop保证Hadoop版本至少在2.2以上并且集群中安装有HDFS服务。具体配置步骤如下 1配置环境变量增加环境变量配置如下
$ sudo vim /etc/profile.d/my_env.shHADOOP_HOME/opt/module/hadoop-3.3.4
export PATH$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CONF_DIR${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATHhadoop classpath2启动Hadoop集群包括HDFS和YARN。
[jjmhadoop102 hadoop-3.3.4]$ start-dfs.sh
[jjmhadoop103 hadoop-3.3.4]$ start-yarn.sh3在hadoop102中执行以下命令启动netcat。
[jjmhadoop102 flink-1.17.0]$ nc -lk 7777会话模式部署
YARN的会话模式与独立集群略有不同需要首先申请一个YARN会话YARN Session来启动Flink集群。具体步骤如下
1启动集群 1启动Hadoop集群HDFS、YARN。 2执行脚本命令向YARN集群申请资源开启一个YARN会话启动Flink集群。
[jjmhadoop102 flink-1.17.0]$ bin/yarn-session.sh -nm test可用参数解读 -d分离模式如果你不想让Flink YARN客户端一直前台运行可以使用这个参数即使关掉当前对话窗口YARN session也可以后台运行。 -jm–jobManagerMemory配置JobManager所需内存默认单位MB。 -nm–name配置在YARN UI界面上显示的任务名。 -qu–queue指定YARN队列名。 -tm–taskManager配置每个TaskManager所使用内存。 注意Flink1.11.0版本不再使用-n参数和-s参数分别指定TaskManager数量和slot数量YARN会按照需求动态分配TaskManager和slot。所以从这个意义上讲YARN的会话模式也不会把集群资源固定同样是动态分配的。 YARN Session启动之后会给出一个Web UI地址以及一个YARN application ID如下所示用户可以通过Web UI或者命令行两种方式提交作业。
2022-11-17 15:20:52,711 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface hadoop104:40825 of application application_1668668287070_0005.
JobManager Web Interface: http://hadoop104:408252提交作业 1通过Web UI提交作业 这种方式比较简单与上文所述Standalone部署模式基本相同。直接点击Flink界面的左侧菜单栏的‘Submit New Job’提交作业。 2通过命令行提交作业 ① 将FlinkTutorial-1.0-SNAPSHOT.jar任务上传至集群。 ② 执行以下命令将该任务提交到已经开启的Yarn-Session中运行。
[jjmhadoop102 flink-1.17.0]$ bin/flink run
-c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar客户端可以自行确定JobManager的地址也可以通过-m或者-jobmanager参数指定JobManager的地址JobManager的地址在YARN Session的启动页面中可以找到。 ③ 任务提交成功后可在YARN的Web UI界面查看运行情况。hadoop103:8088。 ④也可以通过Flink的Web UI页面查看提交任务的运行情况。
单作业模式部署
在YARN环境中由于有了外部平台做资源调度所以我们也可以直接向YARN提交一个单独的作业从而启动一个Flink集群。
1执行命令提交作业。
[atguiguhadoop102 flink-1.17.0]$ bin/flink run -d -t yarn-per-job -c com.atguigu.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar注意如果启动过程中报如下异常。
Exception in thread “Thread-5” java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration ‘classloader.check-leaked-classloader’.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders解决办法在flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中设置
[jjmhadoop102 conf]$ vim flink-conf.yamlclassloader.check-leaked-classloader: false2在YARN的ResourceManager界面查看执行情况。3可以使用命令行查看或取消作业命令如下。
[atguiguhadoop102 flink-1.17.0]$ bin/flink list -t yarn-per-job -Dyarn.application.idapplication_XXXX_YY[atguiguhadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-per-job -Dyarn.application.idapplication_XXXX_YY jobId这里的application_XXXX_YY是当前应用的ID是作业的ID。注意如果取消作业整个Flink集群也会停掉。
应用模式部署
应用模式同样非常简单与单作业模式类似直接执行flink run-application命令即可。
1命令行提交 1执行命令提交作业。
[jjmhadoop102 flink-1.17.0]$ bin/flink run-application -t yarn-application -c com.jjm.wc.SocketStreamWordCount FlinkTutorial-1.0-SNAPSHOT.jar 2在命令行中查看或取消作业。
[jjmhadoop102 flink-1.17.0]$ bin/flink list -t yarn-application -Dyarn.application.idapplication_XXXX_YY[jjmhadoop102 flink-1.17.0]$ bin/flink cancel -t yarn-application -Dyarn.application.idapplication_XXXX_YY jobId2上传HDFS提交 可以通过yarn.provided.lib.dirs配置选项指定位置将flink的依赖上传到远程。 1上传flink的lib和plugins到HDFS上
[jjmhadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-dist
[jjmhadoop102 flink-1.17.0]$ hadoop fs -put lib/ /flink-dist
[jjmhadoop102 flink-1.17.0]$ hadoop fs -put plugins/ /flink-dist2上传自己的jar包到HDFS
[jjmhadoop102 flink-1.17.0]$ hadoop fs -mkdir /flink-jars
[jjmhadoop102 flink-1.17.0]$ hadoop fs -put FlinkTutorial-1.0-SNAPSHOT.jar /flink-jars3提交作业
[jjmhadoop102 flink-1.17.0]$ bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirshdfs://hadoop102:8020/flink-dist -c com.jjm.wc.SocketStreamWordCount hdfs://hadoop102:8020/flink-jars/FlinkTutorial-1.0-SNAPSHOT.jar这种方式下flink本身的依赖和用户jar可以预先上传到HDFS而不需要单独发送到集群这就使得作业提交更加轻量了。
K8S 运行模式了解
容器化部署是如今业界流行的一项技术基于Docker镜像运行能够让用户更加方便地对应用进行管理和运维。容器管理工具中最为流行的就是Kubernetesk8s而Flink也在最近的版本中支持了k8s部署模式。基本原理与YARN是类似的具体配置可以参见官网说明这里我们就不做过多讲解了。
历史服务器
Flink提供了历史服务器用来在相应的 Flink 集群关闭后查询已完成作业的统计信息。我们都知道只有当作业处于运行中的状态才能够查看到相关的WebUI统计信息。通过 History Server 我们才能查询这些已完成作业的统计信息无论是正常退出还是异常退出。 此外它对外提供了 REST API它接受 HTTP 请求并使用 JSON 数据进行响应。Flink 任务停止后JobManager 会将已经完成任务的统计信息进行存档History Server 进程则在任务停止后可以对任务统计信息进行查询。比如最后一次的 Checkpoint、任务运行时的相关配置。
1创建存储目录
hadoop fs -mkdir -p /logs/flink-job2在 flink-config.yaml中添加如下配置
jobmanager.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job
historyserver.web.address: hadoop102
historyserver.web.port: 8082
historyserver.archive.fs.dir: hdfs://hadoop102:8020/logs/flink-job
historyserver.archive.fs.refresh-interval: 50003启动历史服务器
bin/historyserver.sh start4停止历史服务器
bin/historyserver.sh stop5在浏览器地址栏输入http://hadoop102:8082 查看已经停止的 job 的统计信息
Flink运行时架构
系统架构
Flink运行时架构——Standalone会话模式为例
1作业管理器JobManager JobManager是一个Flink集群中任务管理和调度的核心是控制应用执行的主进程。也就是说每个应用都应该被唯一的JobManager所控制执行。 JobManger又包含3个不同的组件。 1JobMaster JobMaster是JobManager中最核心的组件负责处理单独的作业Job。所以JobMaster和具体的Job是一一对应的多个Job可以同时运行在一个Flink集群中, 每个Job都有一个自己的JobMaster。需要注意在早期版本的Flink中没有JobMaster的概念而JobManager的概念范围较小实际指的就是现在所说的JobMaster。 在作业提交时JobMaster会先接收到要执行的应用。JobMaster会把JobGraph转换成一个物理层面的数据流图这个图被叫作“执行图”ExecutionGraph它包含了所有可以并发执行的任务。JobMaster会向资源管理器ResourceManager发出请求申请执行任务必要的资源。一旦它获取到了足够的资源就会将执行图分发到真正运行它们的TaskManager上。 而在运行过程中JobMaster会负责所有需要中央协调的操作比如说检查点checkpoints的协调。 2资源管理器ResourceManager ResourceManager主要负责资源的分配和管理在Flink 集群中只有一个。所谓“资源”主要是指TaskManager的任务槽task slots。任务槽就是Flink集群中的资源调配单元包含了机器用来执行计算的一组CPU和内存资源。每一个任务Task都需要分配到一个slot上执行。 这里注意要把Flink内置的ResourceManager和其他资源管理平台比如YARN的ResourceManager区分开。 3分发器Dispatcher Dispatcher主要负责提供一个REST接口用来提交应用并且负责为每一个新提交的作业启动一个新的JobMaster 组件。Dispatcher也会启动一个Web UI用来方便地展示和监控作业执行的信息。Dispatcher在架构中并不是必需的在不同的部署模式下可能会被忽略掉。2任务管理器TaskManager TaskManager是Flink中的工作进程数据流的具体计算就是它来做的。Flink集群中必须至少有一个TaskManager每一个TaskManager都包含了一定数量的任务槽task slots。Slot是资源调度的最小单位slot的数量限制了TaskManager能够并行处理的任务数量。 启动之后TaskManager会向资源管理器注册它的slots收到资源管理器的指令后TaskManager就会将一个或者多个槽位提供给JobMaster调用JobMaster就可以分配任务来执行了。 在执行过程中TaskManager可以缓冲数据还可以跟其他运行同一应用的TaskManager交换数据。
核心概念
并行度Parallelism
1并行子任务和并行度 当要处理的数据量非常大时我们可以把一个算子操作“复制”多份到多个节点数据来了之后就可以到其中任意一个执行。这样一来一个算子任务就被拆分成了多个并行的“子任务”subtasks再将它们分发到不同节点就真正实现了并行计算。 在Flink执行过程中每一个算子operator可以包含一个或多个子任务operator subtask这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。 **一个特定算子的子任务subtask的个数被称之为其并行度parallelism。**这样包含并行子任务的数据流就是并行数据流它需要多个分区stream partition来分配并行任务。一般情况下一个流程序的并行度可以认为就是其所有算子中最大的并行度。一个程序中不同的算子可能具有不同的并行度。2并行度的设置 在Flink中可以用不同的方法来设置并行度它们的有效范围和优先级别也是不同的。 1代码中设置 我们在代码中可以很简单地在算子后跟着调用**setParallelism()**方法来设置当前算子的并行度
stream.map(word - Tuple2.of(word, 1L)).setParallelism(2);这种方式设置的并行度只针对当前算子有效。 另外我们也可以直接调用执行环境的setParallelism()方法全局设定并行度
env.setParallelism(2);这样代码中所有算子默认的并行度就都为2了。我们一般不会在程序中设置全局并行度因为如果在程序中对全局并行度进行硬编码会导致无法动态扩容。 这里要注意的是由于keyBy不是算子所以无法对keyBy设置并行度。 2提交应用时设置 在使用flink run命令提交应用时可以增加-p参数来指定当前应用程序执行的并行度它的作用类似于执行环境的全局设置
bin/flink run –p 2 –c com.atguigu.wc.SocketStreamWordCount
./FlinkTutorial-1.0-SNAPSHOT.jar如果我们直接在Web UI上提交作业也可以在对应输入框中直接添加并行度。 3配置文件中设置 我们还可以直接在集群的配置文件flink-conf.yaml中直接更改默认并行度
parallelism.default: 2这个设置对于整个集群上提交的所有作业有效初始值为1。无论在代码中设置、还是提交时的-p参数都不是必须的所以在没有指定并行度的时候就会采用配置文件中的集群默认并行度。在开发环境中没有配置文件默认并行度就是当前机器的CPU核心数。
算子链Operator Chain
1算子间的数据传输 一个数据流在算子之间传输数据的形式可以是一对一one-to-one的直通forwarding模式也可以是打乱的重分区redistributing模式具体是哪一种形式取决于算子的种类。 1一对一One-to-oneforwarding 这种模式下数据流维护着分区以及元素的顺序。比如图中的source和map算子source算子读取数据之后可以直接发送给map算子做处理它们之间不需要重新分区也不需要调整数据的顺序。这就意味着map 算子的子任务看到的元素个数和顺序跟source 算子的子任务产生的完全一样保证着“一对一”的关系。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。 2重分区Redistributing 在这种模式下数据流的分区会发生改变。比如图中的map和后面的keyBy/window算子之间以及keyBy/window算子和Sink算子之间都是这样的关系。 每一个算子的子任务会根据数据传输的策略把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程这一过程类似于Spark中的shuffle。2合并算子链 在Flink中并行度相同的一对一one to one算子操作可以直接链接在一起形成一个“大”的任务task这样原来的算子就成为了真正任务里的一部分如下图所示。每个task会被一个线程执行。这样的技术被称为“算子链”Operator Chain。 上图中Source和map之间满足了算子链的要求所以可以直接合并在一起形成了一个任务因为并行度为2所以合并后的任务也有两个并行子任务。这样这个数据流图所表示的作业最终会有5个任务由5个线程并行执行。 将算子链接成task是非常有效的优化可以减少线程之间的切换和基于缓存区的数据交换在减少时延的同时提升吞吐量。 Flink默认会按照算子链的原则进行链接合并如果我们想要禁止合并或者自行定义也可以在代码中对算子做一些特定的设置
// 禁用算子链
.map(word - Tuple2.of(word, 1L)).disableChaining();// 从当前算子开始新链
.map(word - Tuple2.of(word, 1L)).startNewChain()任务槽Task Slots
1任务槽Task Slots Flink中每一个TaskManager都是一个JVM进程它可以启动多个独立的线程来并行执行多个子任务subtask。 很显然TaskManager的计算资源是有限的并行的任务越多每个线程的资源就会越少。那一个TaskManager到底能并行处理多少个任务呢为了控制并发量我们需要在TaskManager上对每个任务运行所占用的资源做出明确的划分这就是所谓的任务槽task slots。 每个任务槽task slot其实表示了TaskManager拥有计算资源的一个固定大小的子集。这些资源就是用来独立执行一个子任务的。2任务槽数量的设置 在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中可以设置TaskManager的slot数量默认是1个slot。
taskmanager.numberOfTaskSlots: 8需要注意的是slot目前仅仅用来隔离内存不会涉及CPU的隔离。在具体应用时可以将slot数量配置为机器的CPU核心数尽量避免不同任务之间对CPU的竞争。这也是开发环境默认并行度设为机器CPU数量的原因。
3任务对任务槽的共享 在同一个作业中不同任务节点的并行子任务就可以放到同一个slot上执行。 **默认情况下Flink是允许子任务共享slot的。**如果我们保持sink任务并行度为1不变而作业提交时设置全局并行度为6那么前两个任务节点就会各自有6个并行子任务整个流处理程序则有13个子任务。如上图所示只要属于同一个作业那么对于不同任务节点算子的并行子任务就可以放到同一个slot上执行。所以对于第一个任务节点source→map它的6个并行子任务必须分到不同的slot上而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。 当我们将资源密集型和非密集型的任务同时放到一个slot中它们就可以自行分配对资源占用的比例从而保证最重的活平均分配给所有的TaskManager。 **slot共享另一个好处就是允许我们保存完整的作业管道。**这样一来即使某个TaskManager出现故障宕机其他节点也可以完全不受影响作业的任务可以继续执行。 当然Flink默认是允许slot共享的如果希望某个算子对应的任务完全独占一个slot或者只有某一部分算子共享slot我们也可以通过设置“slot共享组”手动指定
.map(word - Tuple2.of(word, 1L)).slotSharingGroup(1);这样只有属于同一个slot共享组的子任务才会开启slot共享不同组之间的任务是完全隔离的必须分配到不同的slot上。在这种场景下总共需要的slot数量就是各个slot共享组最大并行度的总和。
任务槽和并行度的关系
任务槽和并行度都跟程序的并行执行有关但两者是完全不同的概念。简单来说任务槽是静态的概念是指TaskManager具有的并发执行能力可以通过参数taskmanager.numberOfTaskSlots进行配置而并行度是动态概念也就是TaskManager运行程序时实际使用的并发能力可以通过参数parallelism.default进行配置。 举例说明假设一共有3个TaskManager每一个TaskManager中的slot数量设置为3个那么一共有9个task slot表示集群最多能并行执行9个同一算子的子任务。 而我们定义word count程序的处理操作是四个转换算子 source→ flatmap→ reduce→ sink 当所有算子并行度相同时容易看出source和flatmap可以合并算子链于是最终有三个任务节点。
作业提交流程
Standalone会话模式作业提交流程 逻辑流图/作业图/执行图/物理流图
我们已经彻底了解了由代码生成任务的过程现在来做个梳理总结。 逻辑流图StreamGraph→ 作业图JobGraph→ 执行图ExecutionGraph→ 物理图Physical Graph。
辑流图——作业流图——执行流图——物理流图 1逻辑流图StreamGraph 这是根据用户通过 DataStream API编写的代码生成的最初的DAG图用来表示程序的拓扑结构。这一步一般在客户端完成。 2作业图JobGraph StreamGraph经过优化后生成的就是作业图JobGraph这是提交给 JobManager 的数据结构确定了当前作业中所有任务的划分。主要的优化为将多个符合条件的节点链接在一起合并成一个任务节点形成算子链这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的在作业提交时传递给JobMaster。 我们提交作业之后打开Flink自带的Web UI点击作业就能看到对应的作业图。 3执行图ExecutionGraph JobMaster收到JobGraph后会根据它来生成执行图ExecutionGraph。ExecutionGraph是JobGraph的并行化版本是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分并明确了任务间数据传输的方式。 4物理图Physical Graph JobMaster生成执行图后会将它分发给TaskManager各个TaskManager会根据执行图部署任务最终的物理执行过程也会形成一张“图”一般就叫作物理图Physical Graph。这只是具体执行层面的图并不是一个具体的数据结构。 物理图主要就是在执行图的基础上进一步确定数据存放的位置和收发的具体方式。有了物理图TaskManager就可以对传递来的数据进行处理计算了。
Yarn应用模式作业提交流程 DataStream API
DataStream API是Flink的核心层API。一个Flink程序其实就是对DataStream的各种转换。具体来说代码基本上都由以下几部分构成
执行环境Execution Environment
Flink程序可以在各种上下文环境中运行我们可以在本地JVM中执行程序也可以提交到远程集群上运行。 不同的环境代码的提交运行的过程会有所不同。这就要求我们在提交作业执行计算时首先必须获取当前Flink的运行环境从而建立起与Flink框架之间的联系。
创建执行环境
我们要获取的执行环境是StreamExecutionEnvironment类的对象这是所有Flink程序的基础。在代码中创建执行环境的方式就是调用这个类的静态方法具体有以下三种。
1getExecutionEnvironment 最简单的方式就是直接调用getExecutionEnvironment方法。它会根据当前运行的上下文直接得到正确的结果如果程序是独立运行的就返回一个本地执行环境如果是创建了jar包然后从命令行调用它并提交到集群执行那么就返回集群的执行环境。也就是说这个方法会根据当前运行的方式自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();这种方式用起来简单高效是最常用的一种创建执行环境的方式。
2createLocalEnvironment 这个方法返回一个本地执行环境。可以在调用时传入一个参数指定默认的并行度如果不传入则默认并行度就是本地的CPU核心数。
StreamExecutionEnvironment localEnv StreamExecutionEnvironment.createLocalEnvironment();3createRemoteEnvironment 这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号并指定要在集群中运行的Jar包。
StreamExecutionEnvironment remoteEnv StreamExecutionEnvironment.createRemoteEnvironment(host, // JobManager主机名1234, // JobManager进程端口号path/to/jarFile.jar // 提交给JobManager的JAR包); 在获取到程序执行环境后我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链还可以定义程序的时间语义、配置容错机制。
执行模式Execution Mode
从Flink 1.12开始官方推荐的做法是直接使用DataStream API在提交任务时通过将执行模式设为BATCH来进行批处理。不建议使用DataSet API。
// 流处理环境
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStream API执行模式包括流执行模式、批执行模式和自动模式。
流执行模式Streaming 这是DataStream API最经典的模式一般用于需要持续实时处理的无界数据流。默认情况下程序使用的就是Streaming执行模式。批执行模式Batch 专门用于批处理的执行模式。自动模式AutoMatic 在这种模式下将由程序根据输入数据源是否有界来自动选择执行模式。 批执行模式的使用。主要有两种方式 1通过命令行配置
bin/flink run -Dexecution.runtime-modeBATCH ...在提交作业时增加execution.runtime-mode参数指定值为BATCH。 2通过代码配置
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.BATCH);在代码中直接基于执行环境调用setRuntimeMode方法传入BATCH模式。 实际应用中一般不会在代码中配置而是使用命令行这样更加灵活。
触发程序执行
需要注意的是写完输出sink操作并不代表程序已经结束。因为当main()方法被调用时其实只是定义了作业的每个执行操作然后添加到数据流图中这时并没有真正处理数据——因为数据可能还没来。Flink是由事件驱动的只有等到数据到来才会触发真正的计算这也被称为“延迟执行”或“懒执行”。 所以我们需要显式地调用执行环境的execute()方法来触发程序执行。execute()方法将一直等待作业完成然后返回一个执行结果JobExecutionResult。
env.execute();源算子Source
Flink可以从各种来源获取数据然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源data source而读取数据的算子就是源算子source operator。所以source就是我们整个处理程序的输入端。 在Flink1.12以前旧的添加source的方式是调用执行环境的addSource()方法
DataStreamString stream env.addSource(...);方法传入的参数是一个“源函数”source function需要实现SourceFunction接口。 从Flink1.12开始主要使用流批统一的新Source架构
DataStreamSourceString stream env.fromSource(…)Flink直接提供了很多预实现的接口此外还有很多外部连接工具也帮我们实现了对应的Source通常情况下足以应对我们的实际需求。
准备工作
为了方便练习这里使用WaterSensor作为数据模型。
字段名数据类型说明idLong水位传感器类型tsString传感器记录时间戳vcInteger水位记录具体代码如下
public class WaterSensor {public String id;public Long ts;public Integer vc;public WaterSensor() {}public WaterSensor(String id, Long ts, Integer vc) {this.id id;this.ts ts;this.vc vc;}public String getId() {return id;}public void setId(String id) {this.id id;}public Long getTs() {return ts;}public void setTs(Long ts) {this.ts ts;}public Integer getVc() {return vc;}public void setVc(Integer vc) {this.vc vc;}Overridepublic String toString() {return WaterSensor{ id id \ , ts ts , vc vc };}Overridepublic boolean equals(Object o) {if (this o) {return true;}if (o null || getClass() ! o.getClass()) {return false;}WaterSensor that (WaterSensor) o;return Objects.equals(id, that.id) Objects.equals(ts, that.ts) Objects.equals(vc, that.vc);}Overridepublic int hashCode() {return Objects.hash(id, ts, vc);}
}这里需要注意定义的WaterSensor有这样几个特点
类是公有public的有一个无参的构造方法所有属性都是公有public的所有属性的类型都是可以序列化的 Flink会把这样的类作为一种特殊的POJOPlain Ordinary Java Object简单的Java对象实际就是普通JavaBeans数据类型来对待方便数据的解析和序列化。另外在类中还重写了toString方法主要是为了测试输出显示更清晰。 这里自定义的POJO类会在后面的代码中频繁使用所以在后面的代码中碰到把这里的POJO类导入就好了。
从集合中读取数据
最简单的读取数据的方式就是在代码中直接创建一个Java集合然后调用执行环境的fromCollection方法进行读取。这相当于将数据临时存储到内存中形成特殊的数据结构后作为数据源使用一般用于测试。
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();ListInteger data Arrays.asList(1, 22, 3);DataStreamSourceInteger ds env.fromCollection(data);stream.print();env.execute();
}从文件读取数据
真正的实际应用中自然不会直接将数据写在代码中。通常情况下我们会从存储介质中获取数据一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。 读取文件需要添加文件连接器依赖: dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-files/artifactIdversion${flink.version}/version
/dependency示例代码如下:
public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();FileSourceString fileSource FileSource.forRecordStreamFormat(new TextLineInputFormat(), new Path(input/word.txt)).build();env.fromSource(fileSource,WatermarkStrategy.noWatermarks(),file).print();env.execute();
}说明
参数可以是目录也可以是文件还可以从HDFS目录下读取使用路径hdfs://…路径可以是相对路径也可以是绝对路径 相对路径是从系统属性user.dir获取路径idea下是project的根目录standalone模式下是集群节点根目录
从Socket读取数据
不论从集合还是文件我们读取的其实都是有界数据。在流处理的场景中数据往往是无界的。 读取socket文本流就是流处理场景。但是这种方式由于吞吐量小、稳定性较差一般也是用于测试。
DataStreamString stream env.socketTextStream(localhost, 7777);从Kafka读取数据
Flink官方提供了连接工具flink-connector-kafka直接帮我们实现了一个消费者FlinkKafkaConsumer它就是用来读取Kafka数据的SourceFunction。 所以想要以Kafka作为数据源获取数据我们只需要引入Kafka连接器的依赖。Flink官方提供的是一个通用的Kafka连接器它会自动跟踪最新版本的Kafka客户端。目前最新版本只支持0.10.0版本以上的Kafka。这里我们需要导入的依赖如下。
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version
/dependency代码如下
public class SourceKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();KafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092).setTopics(topic_1).setGroupId(atguigu).setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()) .build();DataStreamSourceString stream env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), kafka-source);stream.print(Kafka);env.execute();}
}从数据生成器读取数据
Flink从1.11开始提供了一个内置的DataGen 连接器主要是用于生成一些随机数用于在没有数据源的时候进行流任务的测试以及性能测试等。1.17提供了新的Source写法需要导入依赖 dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-datagen/artifactIdversion${flink.version}/version/dependency代码如下
public class DataGeneratorDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number:value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(10),Types.STRING);env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), datagenerator).print();env.execute();}
}Flink支持的数据类型
1Flink的类型系统 Flink使用“类型信息”TypeInformation来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性并为每个数据类型生成特定的序列化器、反序列化器和比较器。2Flink支持的数据类型 对于常见的Java和Scala数据类型Flink都是支持的。Flink在内部Flink对支持不同的类型进行了划分这些类型可以在Types工具类中找到 1基本类型 所有Java基本类型及其包装类再加上Void、String、Date、BigDecimal和BigInteger。 2数组类型 包括基本类型数组PRIMITIVE_ARRAY和对象数组OBJECT_ARRAY。 3复合数据类型 Java元组类型TUPLE这是Flink内置的元组类型是Java API的一部分。最多25个字段也就是从Tuple0~Tuple25不支持空字段。 Scala 样例类及Scala元组不支持空字段。 行类型ROW可以认为是具有任意个字段的元组并支持空字段。 POJOFlink自定义的类似于Java bean模式的类。 4辅助类型 Option、Either、List、Map等。 5泛型类型GENERIC **Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义就会被Flink当作泛型类来处理。**Flink会把泛型类型当作黑盒无法获取它们内部的属性它们也不是由Flink本身序列化的而是由Kryo序列化的。 在这些类型中元组类型和POJO类型最为灵活因为它们支持创建复杂类型。而相比之下POJO还支持在键key的定义中直接使用字段名这会让我们的代码可读性大大增加。所以在项目实践中往往会将流处理程序中的元素类型定为Flink的POJO类型。 Flink对POJO类型的要求如下 类是公有public的有一个无参的构造方法所有属性都是公有public的所有属性的类型都是可以序列化的 3类型提示Type Hints Flink还具有一个类型提取系统可以分析函数的输入和返回类型自动获取类型信息从而获得对应的序列化器和反序列化器。但是由于Java中泛型擦除的存在在某些特殊情况下比如Lambda表达式中自动提取的信息是不够精细的——只告诉Flink当前的元素由“船头、船身、船尾”构成根本无法重建出“大船”的模样这时就需要显式地提供类型信息才能使应用程序正常工作或提高其性能。 为了解决这类问题Java API提供了专门的“类型提示”type hints。 例如
.map(word - Tuple2.of(word, 1L))
.returns(Types.TUPLE(Types.STRING, Types.LONG));Flink还专门提供了TypeHint类它可以捕获泛型的类型信息并且一直记录下来为运行时提供足够的信息。我们同样可以通过.returns()方法明确地指定转换之后的DataStream里元素的类型。
returns(new TypeHintTuple2Integer, SomeType(){})转换算子Transformation
数据源读入数据之后我们就可以使用各种转换算子将一个或多个DataStream转换为新的DataStream。
基本转换算子map/ filter/ flatMap
映射map
map是大家非常熟悉的大数据操作算子主要用于将数据流中的数据进行转换形成新的数据流。简单来说就是一个“一一映射”消费一个元素就产出一个元素。 我们只需要基于DataStream调用map()方法就可以进行转换处理。方法需要传入的参数是接口MapFunction的实现返回值类型还是DataStream不过泛型流中的元素类型可能改变。 下面的代码用不同的方式实现了提取WaterSensor中的id字段的功能。
public class TransMap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),new WaterSensor(sensor_2, 2, 2));// 方式一传入匿名类实现MapFunctionstream.map(new MapFunctionWaterSensor, String() {Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}).print();// 方式二传入MapFunction的实现类// stream.map(new UserMap()).print();env.execute();}public static class UserMap implements MapFunctionWaterSensor, String {Overridepublic String map(WaterSensor e) throws Exception {return e.id;}}
}上面代码中MapFunction实现类的泛型类型与输入数据类型和输出数据的类型有关。在实现MapFunction接口的时候需要指定两个泛型分别是输入事件和输出事件的类型还需要重写一个map()方法定义从一个输入事件转换为另一个输出事件的具体逻辑。
过滤filter
filter转换操作顾名思义是对数据流执行一个过滤通过一个布尔条件表达式设置过滤条件对于每一个流内元素进行判断若为true则元素正常输出若为false则元素被过滤掉。 进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口而FilterFunction内要实现filter()方法就相当于一个返回布尔类型的条件表达式。 案例需求下面的代码会将数据流中传感器id为sensor_1的数据过滤出来。
public class TransFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));// 方式一传入匿名类实现FilterFunctionstream.filter(new FilterFunctionWaterSensor() {Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals(sensor_1);}}).print();// 方式二传入FilterFunction实现类// stream.filter(new UserFilter()).print();env.execute();}public static class UserFilter implements FilterFunctionWaterSensor {Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals(sensor_1);}}
}扁平映射flatMap
flatMap操作又称为扁平映射主要是将数据流中的整体一般是集合类型拆分成一个一个的个体使用。消费一个元素可以产生0到多个元素。flatMap可以认为是“扁平化”flatten和“映射”map两步操作的结合也就是先按照某种规则对数据进行打散拆分再对拆分后的元素做转换处理。 案例需求如果输入的数据是sensor_1只打印vc如果输入的数据是sensor_2既打印ts又打印vc。 实现代码如下
public class TransFlatmap {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));stream.flatMap(new MyFlatMap()).print();env.execute();}public static class MyFlatMap implements FlatMapFunctionWaterSensor, String {Overridepublic void flatMap(WaterSensor value, CollectorString out) throws Exception {if (value.id.equals(sensor_1)) {out.collect(String.valueOf(value.vc));} else if (value.id.equals(sensor_2)) {out.collect(String.valueOf(value.ts));out.collect(String.valueOf(value.vc));}}}
} 聚合算子Aggregation
按键分区keyBy
在Flink中要做聚合需要先进行分区这个操作就是通过keyBy来完成的。keyBy是聚合前必须要用到的一个算子。keyBy通过指定键key可以将一条流从逻辑上划分成不同的分区partitions。这里所说的分区其实就是并行处理的子任务。基于不同的key流中的数据将被分配到不同的分区中去这样一来所有具有相同的key的数据都将被发往同一个分区。 在内部是通过计算key的哈希值hash code对分区数进行取模运算来实现的。所以这里key如果是POJO的话必须要重写hashCode()方法。 keyBy()方法需要传入一个参数这个参数指定了一个或一组key。有很多不同的方法来指定key比如对于Tuple数据类型可以指定字段的位置或者多个位置的组合对于POJO类型可以指定字段的名称String另外还可以传入Lambda表达式或者实现一个键选择器KeySelector用于说明从数据中提取key的逻辑。 我们可以以id作为key做一个分区操作代码实现如下
public class TransKeyBy {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(
new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));// 方式一使用Lambda表达式KeyedStreamWaterSensor, String keyedStream stream.keyBy(e - e.id);// 方式二使用匿名类实现KeySelectorKeyedStreamWaterSensor, String keyedStream1 stream.keyBy(new KeySelectorWaterSensor, String() {Overridepublic String getKey(WaterSensor e) throws Exception {return e.id;}});env.execute();}
}需要注意的是keyBy得到的结果将不再是DataStream而是会将DataStream转换为KeyedStream。KeyedStream可以认为是“分区流”或者“键控流”它是对DataStream按照key的一个逻辑分区所以泛型有两个类型除去当前流中的元素类型外还需要指定key的类型。 KeyedStream也继承自DataStream所以基于它的操作也都归属于DataStream API。但它跟之前的转换操作得到的SingleOutputStreamOperator不同只是一个流的分区操作并不是一个转换算子。KeyedStream是一个非常重要的数据结构只有基于它才可以做后续的聚合操作比如sumreduce。
简单聚合sum/min/max/minBy/maxBy
有了按键分区的数据流KeyedStream就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API主要有以下几种
sum()在输入流上对指定的字段做叠加求和的操作。min()在输入流上对指定的字段求最小值。max()在输入流上对指定的字段求最大值。minBy()与min()类似在输入流上针对指定字段求最小值。不同的是min()只计算指定字段的最小值其他字段会保留最初第一个数据的值而minBy()则会返回包含字段最小值的整条数据。maxBy()与max()类似在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。 简单聚合算子使用非常方便语义也非常明确。这些聚合方法调用时也需要传入参数但并不像基本转换算子那样需要实现自定义函数只要说明聚合指定的字段就可以了。指定字段的方式有两种指定位置和指定名称。 对于元组类型的数据可以使用这两种方式来指定字段。需要注意的是元组中字段的名称是以f0、f1、f2、…来命名的。 如果数据流的类型是POJO类那么就只能通过字段名称来指定不能通过位置来指定了。
public class TransAggregation {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(
new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));stream.keyBy(e - e.id).max(vc); // 指定字段名称env.execute();}
}简单聚合算子返回的同样是一个SingleOutputStreamOperator也就是从KeyedStream又转换成了常规的DataStream。所以可以这样理解**keyBy和聚合是成对出现的先分区、后聚合得到的依然是一个DataStream。**而且经过简单聚合之后的数据流元素的数据类型保持不变。 一个聚合算子会为每一个key保存一个聚合的值在Flink中我们把它叫作“状态”state。所以每当有一个新的数据输入算子就会更新保存的聚合结果并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说这些状态是永远不会被清除的所以我们使用聚合算子应该只用在含有有限个key的数据流上。
归约聚合reduce
reduce可以对已有的数据进行归约处理把每一个新输入的数据和当前已经归约出来的值再做一个聚合计算。 reduce操作也会将KeyedStream转换为DataStream。它不会改变流的元素数据类型所以输出类型和输入类型是一样的。 调用KeyedStream的reduce方法时需要传入一个参数实现ReduceFunction接口。接口在源码中的定义如下
public interface ReduceFunctionT extends Function, Serializable {T reduce(T value1, T value2) throws Exception;
}ReduceFunction接口里需要实现reduce()方法这个方法接收两个输入事件经过转换处理之后输出一个相同类型的事件。在流处理的底层实现过程中实际上是将中间“合并的结果”作为任务的一个状态保存起来的之后每来一个新的数据就和之前的聚合状态进一步做归约。 我们可以单独定义一个函数类实现ReduceFunction接口也可以直接传入一个匿名类。当然同样也可以通过传入Lambda表达式实现类似的功能。 为了方便后续使用定义一个WaterSensorMapFunction
public class WaterSensorMapFunction implements MapFunctionString,WaterSensor {Overridepublic WaterSensor map(String value) throws Exception {String[] datas value.split(,);return new WaterSensor(datas[0],Long.valueOf(datas[1]) ,Integer.valueOf(datas[2]) );}
}案例使用reduce实现max和maxBy的功能。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).keyBy(WaterSensor::getId).reduce(new ReduceFunctionWaterSensor(){Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println(Demo7_Reduce.reduce);int maxVc Math.max(value1.getVc(), value2.getVc());//实现max(vc)的效果 取最大值其他字段以当前组的第一个为主//value1.setVc(maxVc);//实现maxBy(vc)的效果 取当前最大值的所有字段if (value1.getVc() value2.getVc()){value1.setVc(maxVc);return value1;}else {value2.setVc(maxVc);return value2;}}}).print();
env.execute();reduce同简单聚合算子一样也要针对每一个key保存状态。因为状态不会清空所以我们需要将reduce算子作用在一个有限key的流上。
用户自定义函数UDF
用户自定义函数分为函数类、匿名函数、富函数类。
函数类Function Classes
Flink暴露了所有UDF函数的接口具体实现方式为接口或者抽象类例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类实现对应的接口。 需求用来从用户的点击数据中筛选包含“sensor_1”的内容 方式一实现FilterFunction接口
public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3));DataStreamString filter stream.filter(new UserFilter());filter.print();env.execute();}public static class UserFilter implements FilterFunctionWaterSensor {Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals(sensor_1);}}
}方式二通过匿名类来实现FilterFunction接口
DataStreamString stream stream.filter(new FilterFunction WaterSensor() {Overridepublic boolean filter(WaterSensor e) throws Exception {return e.id.equals(sensor_1);}
});方式二的优化为了类可以更加通用我们还可以将用于过滤的关键字home抽象出来作为类的属性调用构造方法时传进去。
DataStreamSourceWaterSensor stream env.fromElements(
new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3)
);DataStreamString stream stream.filter(new FilterFunctionImpl(sensor_1));public static class FilterFunctionImpl implements FilterFunctionWaterSensor {private String id;FilterFunctionImpl(String id) { this.idid; }Overridepublic boolean filter(WaterSensor value) throws Exception {return thid.id.equals(value.id);}
}方式三采用匿名函数Lambda
public class TransFunctionUDF {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSourceWaterSensor stream env.fromElements(new WaterSensor(sensor_1, 1, 1),
new WaterSensor(sensor_1, 2, 2),
new WaterSensor(sensor_2, 2, 2),
new WaterSensor(sensor_3, 3, 3)); //map函数使用Lambda表达式不需要进行类型声明SingleOutputStreamOperatorString filter stream.filter(sensor - sensor_1.equals(sensor.id));filter.print();env.execute();}
}富函数类Rich Function Classes
“富函数类”也是DataStream API提供的一个函数类的接口所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如RichMapFunction、RichFilterFunction、RichReduceFunction等。 与常规函数类的不同主要在于富函数类可以获取运行环境的上下文并拥有一些生命周期方法所以可以实现更复杂的功能。 Rich Function有生命周期的概念。典型的生命周期方法有
open()方法是Rich Function的初始化方法也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前open()会首先被调用。close()方法是生命周期中的最后一个调用的方法类似于结束方法。一般用来做一些清理工作。 需要注意的是这里的生命周期方法对于一个并行子任务来说只会调用一次而对应的实际工作方法例如RichMapFunction中的map()在每条数据到来后都会触发一次调用。 来看一个例子说明
public class RichFunctionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.fromElements(1,2,3,4).map(new RichMapFunctionInteger, Integer() {Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println(索引是 getRuntimeContext().getIndexOfThisSubtask() 的任务的生命周期开始);}Overridepublic Integer map(Integer integer) throws Exception {return integer 1;}Overridepublic void close() throws Exception {super.close();System.out.println(索引是 getRuntimeContext().getIndexOfThisSubtask() 的任务的生命周期结束);}}).print();env.execute();}
}物理分区算子Physical Partitioning
常见的物理分区策略有随机分配Random、轮询分配Round-Robin、重缩放Rescale和广播Broadcast。
随机分区shuffle
最简单的重分区方式就是直接“洗牌”。通过调用DataStream的.shuffle()方法将数据随机地分配到下游算子的并行任务中去。 随机分区服从均匀分布uniform distribution所以可以把流中的数据随机打乱均匀地传递到下游任务分区。因为是完全随机的所以对于同样的输入数据, 每次执行得到的结果也不会相同。 经过随机分区之后得到的依然是一个DataStream。
stream.shuffle()轮询分区Round-Robin
轮询简单来说就是“发牌”按照先后顺序将数据做依次分发。通过调用DataStream的.rebalance()方法就可以实现轮询重分区。rebalance使用的是Round-Robin负载均衡算法可以将输入流数据平均分配到下游的并行任务中去。
stream.rebalance()重缩放分区rescale
重缩放分区和轮询分区非常相似。当调用rescale()方法时其实底层也是使用Round-Robin算法进行轮询但是只会将数据轮询发送到下游并行任务的一部分中。rescale的做法是分成小团体发牌人只给自己团体内的所有人轮流发牌。
stream.rescale()广播broadcast
这种方式其实不应该叫做“重分区”因为经过广播之后数据会在不同的分区都保留一份可能进行重复处理。可以通过调用DataStream的broadcast()方法将输入数据复制并发送到下游算子的所有并行任务中去。
stream.broadcast()全局分区global
全局分区也是一种特殊的分区方式。这种做法非常极端通过调用.global()方法会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了1所以使用这个操作需要非常谨慎可能对程序造成很大的压力。
stream.global()自定义分区Custom
当Flink提供的所有分区策略都不能满足用户的需求时我们可以通过使用partitionCustom()方法来自定义分区策略。 1自定义分区器
public class MyPartitioner implements PartitionerString {Overridepublic int partition(String key, int numPartitions) {return Integer.parseInt(key) % numPartitions;}
}2使用自定义分区
public class PartitionCustomDemo {public static void main(String[] args) throws Exception {
// StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(2);DataStreamSourceString socketDS env.socketTextStream(hadoop102, 7777);DataStreamString myDS socketDS.partitionCustom(new MyPartitioner(),value - value);myDS.print();env.execute();}
}分流
所谓“分流”就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream定义一些筛选条件将符合条件的数据拣选出来放到对应的流里。
简单实现
其实根据条件筛选数据的需求本身非常容易实现只要针对同一条流多次独立调用.filter()方法进行筛选就可以得到拆分之后的流了。 案例需求读取一个整数数字流将数据流划分为奇数流和偶数流。 代码实现
public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperatorInteger ds env.socketTextStream(hadoop102, 7777).map(Integer::valueOf);//将ds 分为两个流 一个是奇数流一个是偶数流//使用filter 过滤两次SingleOutputStreamOperatorInteger ds1 ds.filter(x - x % 2 0);SingleOutputStreamOperatorInteger ds2 ds.filter(x - x % 2 1);ds1.print(偶数);ds2.print(奇数);env.execute();}
}这种实现非常简单但代码显得有些冗余——处理逻辑对拆分出的三条流其实是一样的却重复写了三次。而且这段代码背后的含义是将原始数据流stream复制三份然后对每一份分别做筛选这明显是不够高效的。能不能不用复制流直接用一个算子就把它们都拆分开呢
使用侧输出流
简单来说只需要调用上下文ctx的.output()方法就可以输出任意类型的数据了。而侧输出流的标记和提取都离不开一个“输出标签”OutputTag指定了侧输出流的id和类型。 代码实现将WaterSensor按照Id类型进行分流。
public class SplitStreamByOutputTag { public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();SingleOutputStreamOperatorWaterSensor ds env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());OutputTagWaterSensor s1 new OutputTag(s1, Types.POJO(WaterSensor.class)){};OutputTagWaterSensor s2 new OutputTag(s2, Types.POJO(WaterSensor.class)){};//返回的都是主流SingleOutputStreamOperatorWaterSensor ds1 ds.process(new ProcessFunctionWaterSensor, WaterSensor(){Overridepublic void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {if (s1.equals(value.getId())) {ctx.output(s1, value);} else if (s2.equals(value.getId())) {ctx.output(s2, value);} else {//主流out.collect(value);}}});ds1.print(主流非s1,s2的传感器);SideOutputDataStreamWaterSensor s1DS ds1.getSideOutput(s1);SideOutputDataStreamWaterSensor s2DS ds1.getSideOutput(s2);s1DS.printToErr(s1);s2DS.printToErr(s2);env.execute();}
}基本合流操作
联合Union
最简单的合流操作就是直接将多条流合在一起叫作流的“联合”union。联合操作要求必须流中的数据类型必须相同合并之后的新流会包括所有流中的元素数据类型不变。 在代码中只要基于DataStream直接调用.union()方法传入其他DataStream作为参数就可以实现流的联合了得到的依然是一个DataStream
stream1.union(stream2, stream3, ...)注意union()的参数可以是多个DataStream所以联合操作可以实现多条流的合并。 代码实现我们可以用下面的代码做一个简单测试
public class UnionExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSourceInteger ds1 env.fromElements(1, 2, 3);DataStreamSourceInteger ds2 env.fromElements(2, 2, 3);DataStreamSourceString ds3 env.fromElements(2, 2, 3);ds1.union(ds2,ds3.map(Integer::valueOf)).print();env.execute();}
}连接Connect
流的联合虽然简单不过受限于数据类型不能改变灵活性大打折扣所以实际应用较少出现。除了联合unionFlink还提供了另外一种方便的合流操作——连接connect。 1连接流ConnectedStreams 为了处理更加灵活连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型所以连接得到的并不是DataStream而是一个“连接流”。连接流可以看成是两条流形式上的“统一”被放在了一个同一个流中事实上内部仍保持各自的数据形式不变彼此之间是相互独立的。要想得到新的DataStream还需要进一步定义一个“同处理”co-process转换操作用来说明对于不同来源、不同类型的数据怎样分别进行处理转换、得到统一的输出类型。 所以整体上来两条流的连接就像是“一国两制”两条流可以保持各自的数据类型、处理方式也可以不同不过最终还是会统一到同一个DataStream中。 代码实现需要分为两步首先基于一条DataStream调用.connect()方法传入另外一条DataStream作为参数将两条流连接起来得到一个ConnectedStreams然后再调用同处理方法得到DataStream。这里可以的调用的同处理方法有.map()/.flatMap()以及.process()方法。
public class ConnectDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// DataStreamSourceInteger source1 env.fromElements(1, 2, 3);
// DataStreamSourceString source2 env.fromElements(a, b, c);SingleOutputStreamOperatorInteger source1 env.socketTextStream(hadoop102, 7777).map(i - Integer.parseInt(i));DataStreamSourceString source2 env.socketTextStream(hadoop102, 8888);/*** TODO 使用 connect 合流* 1、一次只能连接 2条流* 2、流的数据类型可以不一样* 3、 连接后可以调用 map、flatmap、process来处理但是各处理各的*/ConnectedStreamsInteger, String connect source1.connect(source2);SingleOutputStreamOperatorString result connect.map(new CoMapFunctionInteger, String, String() {Overridepublic String map1(Integer value) throws Exception {return 来源于数字流: value.toString();}Overridepublic String map2(String value) throws Exception {return 来源于字母流: value;}});result.print();env.execute(); }
}上面的代码中ConnectedStreams有两个类型参数分别表示内部包含的两条流各自的数据类型由于需要“一国两制”因此调用.map()方法时传入的不再是一个简单的MapFunction而是一个CoMapFunction表示分别对两条流中的数据执行map操作。这个接口有三个类型参数依次表示第一条流、第二条流以及合并后的流中的数据类型。需要实现的方法也非常直白.map1()就是对第一条流中数据的map操作.map2()则是针对第二条流。 2CoProcessFunction 与CoMapFunction类似如果是调用.map()就需要传入一个CoMapFunction需要实现map1()、map2()两个方法而调用.process()时传入的则是一个CoProcessFunction。它也是“处理函数”家族中的一员用法非常相似。它需要实现的就是processElement1()、processElement2()两个方法在每个数据到来时会根据来源的流调用其中的一个方法进行处理。 值得一提的是ConnectedStreams也可以直接调用.keyBy()进行按键分区的操作得到的还是一个ConnectedStreams
connectedStreams.keyBy(keySelector1, keySelector2);这里传入两个参数keySelector1和keySelector2是两条流中各自的键选择器当然也可以直接传入键的位置值keyPosition或者键的字段名field这与普通的keyBy用法完全一致。ConnectedStreams进行keyBy操作其实就是把两条流中key相同的数据放到了一起然后针对来源的流再做各自处理这在一些场景下非常有用。 案例需求连接两条流输出能根据id匹配上的数据类似inner join效果
public class ConnectKeybyDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSourceTuple2Integer, String source1 env.fromElements(Tuple2.of(1, a1),Tuple2.of(1, a2),Tuple2.of(2, b),Tuple2.of(3, c));DataStreamSourceTuple3Integer, String, Integer source2 env.fromElements(Tuple3.of(1, aa1, 1),Tuple3.of(1, aa2, 2),Tuple3.of(2, bb, 1),Tuple3.of(3, cc, 1));ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connect source1.connect(source2);// 多并行度下需要根据 关联条件 进行keyby才能保证key相同的数据到一起去才能匹配上ConnectedStreamsTuple2Integer, String, Tuple3Integer, String, Integer connectKey connect.keyBy(s1 - s1.f0, s2 - s2.f0);SingleOutputStreamOperatorString result connectKey.process(new CoProcessFunctionTuple2Integer, String, Tuple3Integer, String, Integer, String() {// 定义 HashMap缓存来过的数据keyidvaluelist数据MapInteger, ListTuple2Integer, String s1Cache new HashMap();MapInteger, ListTuple3Integer, String, Integer s2Cache new HashMap();Overridepublic void processElement1(Tuple2Integer, String value, Context ctx, CollectorString out) throws Exception {Integer id value.f0;// TODO 1.来过的s1数据都存起来if (!s1Cache.containsKey(id)) {// 1.1 第一条数据初始化 value的list放入 hashmapListTuple2Integer, String s1Values new ArrayList();s1Values.add(value);s1Cache.put(id, s1Values);} else {// 1.2 不是第一条直接添加到 list中s1Cache.get(id).add(value);}//TODO 2.根据id查找s2的数据只输出 匹配上 的数据if (s2Cache.containsKey(id)) {for (Tuple3Integer, String, Integer s2Element : s2Cache.get(id)) {out.collect(s1: value ---------s2: s2Element);}}}Overridepublic void processElement2(Tuple3Integer, String, Integer value, Context ctx, CollectorString out) throws Exception {Integer id value.f0;// TODO 1.来过的s2数据都存起来if (!s2Cache.containsKey(id)) {// 1.1 第一条数据初始化 value的list放入 hashmapListTuple3Integer, String, Integer s2Values new ArrayList();s2Values.add(value);s2Cache.put(id, s2Values);} else {// 1.2 不是第一条直接添加到 list中s2Cache.get(id).add(value);}//TODO 2.根据id查找s1的数据只输出 匹配上 的数据if (s1Cache.containsKey(id)) {for (Tuple2Integer, String s1Element : s1Cache.get(id)) {out.collect(s1: s1Element ---------s2: value);}}}});result.print();env.execute();}
}输出算子Sink
Flink作为数据处理框架最终还是要把计算处理的结果写入外部存储为外部应用提供支持。
连接到外部系统
Flink的DataStream API专门提供了向外部写入数据的方法addSink。与addSource类似addSink方法对应着一个“Sink”算子主要就是用来实现与外部系统连接、并将数据提交写入的Flink程序中所有对外的输出操作一般都是利用Sink算子完成的。 Flink1.12以前Sink算子的创建是通过调用DataStream的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));addSink方法同样需要传入一个参数实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke()用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。 Flink1.12开始同样重构了Sink架构
stream.sinkTo(…)当然Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示列出了Flink官方目前支持的第三方系统连接器 我们可以看到像Kafka之类流式系统Flink提供了完美对接source/sink两端都能连接可读可写而对于Elasticsearch、JDBC等数据存储系统则只提供了输出写入的sink连接器。 除Flink官方之外Apache Bahir框架也实现了一些其他第三方系统与Flink的连接器。 除此以外就需要用户自定义实现sink连接器了。
输出到文件
Flink专门提供了一个流式文件系统的连接器FileSink为批处理和流处理提供了一个统一的Sink它可以将分区文件写入Flink支持的文件系统。 FileSink支持行编码Row-encoded和批量编码Bulk-encoded格式。这两种不同的方式都有各自的构建器builder可以直接调用FileSink的静态方法
行编码 FileSink.forRowFormatbasePathrowEncoder。批量编码 FileSink.forBulkFormatbasePathbulkWriterFactory。 示例:
public class SinkFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 每个目录中都有 并行度个数的 文件在写入env.setParallelism(2);// 必须开启checkpoint否则一直都是 .inprogressenv.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);DataGeneratorSourceString dataGeneratorSource new DataGeneratorSource(new GeneratorFunctionLong, String() {Overridepublic String map(Long value) throws Exception {return Number: value;}},Long.MAX_VALUE,RateLimiterStrategy.perSecond(1000),Types.STRING);DataStreamSourceString dataGen env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), data-generator);// 输出到文件系统FileSinkString fieSink FileSink// 输出行式存储的文件指定路径、指定编码.StringforRowFormat(new Path(f:/tmp), new SimpleStringEncoder(UTF-8))// 输出文件的一些配置 文件名的前缀、后缀.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix(atguigu-).withPartSuffix(.log).build())// 按照目录分桶如下就是每个小时一个目录.withBucketAssigner(new DateTimeBucketAssigner(yyyy-MM-dd HH, ZoneId.systemDefault()))// 文件滚动策略: 1分钟 或 1m.withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(1)).withMaxPartSize(new MemorySize(1024*1024)).build()).build();dataGen.sinkTo(fieSink);env.execute();}
}输出到Kafka
1添加Kafka 连接器依赖2启动Kafka集群3编写输出到Kafka的示例代码 输出无key的record:
public class SinkKafka {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 如果是精准一次必须开启checkpoint后续章节介绍env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** Kafka Sink:* TODO 注意如果要使用 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint后续介绍* 2、设置事务前缀* 3、设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// 写到kafka的一致性级别 精准一次、至少一次.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// 如果是精准一次必须设置 事务的前缀.setTransactionalIdPrefix(atguigu-)// 如果是精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();sensorDS.sinkTo(kafkaSink);env.execute();}
}自定义序列化器实现带key的record:
public class SinkKafkaWithKey {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);env.setRestartStrategy(RestartStrategies.noRestart());SingleOutputStreamOperatorString sensorDS env.socketTextStream(hadoop102, 7777);/*** 如果要指定写入kafka的key可以自定义序列化器* 1、实现 一个接口重写 序列化 方法* 2、指定key转成 字节数组* 3、指定value转成 字节数组* 4、返回一个 ProducerRecord对象把key、value放进去*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setRecordSerializer(new KafkaRecordSerializationSchemaString() {NullableOverridepublic ProducerRecordbyte[], byte[] serialize(String element, KafkaSinkContext context, Long timestamp) {String[] datas element.split(,);byte[] key datas[0].getBytes(StandardCharsets.UTF_8);byte[] value element.getBytes(StandardCharsets.UTF_8);return new ProducerRecord(ws, key, value);}}).setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE).setTransactionalIdPrefix(atguigu-).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 ).build();sensorDS.sinkTo(kafkaSink);env.execute();}
}4运行代码在Linux主机启动一个消费者查看是否收到数据
[jjmhadoop102 ~]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ws输出到MySQLJDBC
写入数据的MySQL的测试步骤如下。
1添加依赖 添加MySQL驱动
dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.27/version
/dependency如果官方还未提供flink-connector-jdbc的1.17.0的正式依赖暂时从apache snapshot仓库下载pom文件中指定仓库路径
repositoriesrepositoryidapache-snapshots/idnameapache snapshots/name
urlhttps://repository.apache.org/content/repositories/snapshots//url/repository
/repositories添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion1.17-SNAPSHOT/version
/dependency如果不生效还需要修改本地maven的配置文件mirrorOf中添加!apache-snapshots内容 mirroridaliyunmaven/idmirrorOf*,!apache-snapshots/mirrorOfname阿里云公共仓库/nameurlhttps://maven.aliyun.com/repository/public/url/mirror如果官方已提供flink-connector-jdbc的1.17.0的正式依赖则只需添加以下依赖就可
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.0-1.17/version
/dependency
2启动MySQL在test库下建表ws
mysql
CREATE TABLE ws (id varchar(100) NOT NULL,ts bigint(20) DEFAULT NULL,vc int(11) DEFAULT NULL,PRIMARY KEY (id)
) ENGINEInnoDB DEFAULT CHARSETutf83编写输出到MySQL的示例代码
public class SinkMySQL {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());/*** TODO 写入mysql* 1、只能用老的sink写法 addsink* 2、JDBCSink的4个参数:* 第一个参数 执行的sql一般就是 insert into* 第二个参数 预编译sql 对占位符填充值* 第三个参数 执行选项 ---》 攒批、重试* 第四个参数 连接选项 ---》 url、用户名、密码*/SinkFunctionWaterSensor jdbcSink JdbcSink.sink(insert into ws values(?,?,?),new JdbcStatementBuilderWaterSensor() {Overridepublic void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {//每收到一条WaterSensor如何去填充占位符preparedStatement.setString(1, waterSensor.getId());preparedStatement.setLong(2, waterSensor.getTs());preparedStatement.setInt(3, waterSensor.getVc());}},JdbcExecutionOptions.builder().withMaxRetries(3) // 重试次数.withBatchSize(100) // 批次的大小条数.withBatchIntervalMs(3000) // 批次的时间.build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl(jdbc:mysql://hadoop102:3306/test?serverTimezoneAsia/ShanghaiuseUnicodetruecharacterEncodingUTF-8).withUsername(root).withPassword(123456).withConnectionCheckTimeoutSeconds(60) // 重试的超时时间.build());sensorDS.addSink(jdbcSink);env.execute();}
}(4) 运行代码用客户端连接MySQL查看是否成功写入数据。
自定义Sink输出
如果我们想将数据存储到我们自己的存储设备中而Flink并没有提供可以直接使用的连接器就只能自定义Sink进行输出了。与Source类似Flink为我们提供了通用的SinkFunction接口和对应的RichSinkDunction抽象类只要实现它通过简单地调用DataStream的.addSink()方法就可以自定义写入任何外部存储。
stream.addSink(new MySinkFunctionString());在实现SinkFunction的时候需要重写的一个关键方法invoke()在这个方法中我们就可以实现将流里的数据发送出去的逻辑。 这种方式比较通用对于任何外部存储系统都有效不过自定义Sink想要实现状态一致性并不容易所以一般只在没有其它选择时使用。实际项目中用到的外部连接器Flink官方基本都已实现而且在不断地扩充因此自定义的场景并不常见。
Flink中的时间和窗口
窗口Window
窗口的概念
Flink是一种流式计算引擎主要是来处理无界数据流的数据源源不断、无穷无尽。想要更加方便高效地处理无界流一种方式就是将无限数据切割成有限的“数据块”进行处理这就是所谓的“窗口”Window。 在Flink中窗口其实并不是一个“框”应该把窗口理解成一个“桶”。在Flink中窗口可以把流切割成有限大小的多个“存储桶”bucket)每个数据都会分发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理。 注意Flink中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。另外这里我们认为到达窗口结束时间时窗口就触发计算并关闭事实上“触发计算”和“窗口关闭”两个行为也可以分开
窗口的分类
1按照驱动类型分 窗口本身是截取有界数据的一种方式所以窗口一个非常重要的信息其实就是“怎样截取数据”。换句话说就是以什么标准来开始和结束数据的截取我们把它叫作窗口的“驱动类型”。 1时间窗口Time Window 时间窗口以时间点来定义窗口的开始start和结束end所以截取出的就是某一时间段的数据。到达结束时间时窗口不再收集数据触发计算输出结果并将窗口关闭销毁。所以可以说基本思路就是**“定点发车”**。2计数窗口Count Window 计数窗口基于元素的个数来截取数据到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数就是窗口的大小。基本思路是“人齐发车”。 2按照窗口分配数据的规则分类 根据分配数据的规则窗口的具体实现可以分为4类滚动窗口Tumbling Window、滑动窗口Sliding Window、会话窗口Session Window以及全局窗口Global Window。1滚动窗口Tumbling Windows 滚动窗口有固定的大小是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠也不会有间隔是“首尾相接”的状态。这是最简单的窗口形式每个数据都会被分配到一个窗口而且只会属于一个窗口。 滚动窗口可以基于时间定义也可以基于数据个数定义需要的参数只有一个就是窗口的大小window size。滚动窗口应用非常广泛它可以对每个时间段做聚合统计很多BI分析指标都可以用它来实现。2滑动窗口Sliding Windows 滑动窗口的大小也是固定的。但是窗口之间并不是首尾相接的而是可以“错开”一定的位置。 定义滑动窗口的参数有两个除去窗口大小window size之外还有一个“滑动步长”window slide它其实就代表了窗口计算的频率。窗口在结束时间触发计算输出结果那么滑动步长就代表了计算频率。 当滑动步长小于窗口大小时滑动窗口就会出现重叠这时数据也可能会被同时分配到多个窗口中。而具体的个数就由窗口大小和滑动步长的比值size/slide来决定。 滚动窗口也可以看作是一种特殊的滑动窗口——窗口大小等于滑动步长size slide。 滑动窗口适合计算结果更新频率非常高的场景3会话窗口Session Windows 会话窗口是基于“会话”session来来对数据进行分组的。会话窗口只能基于时间来定义。 会话窗口中最重要的参数就是会话的超时时间也就是两个会话窗口之间的最小距离。如果相邻两个数据到来的时间间隔Gap小于指定的大小size那说明还在保持会话它们就属于同一个窗口如果gap大于size那么新来的数据就应该属于新的会话窗口而前一个窗口就应该关闭了。 会话窗口的长度不固定起始和结束时间也是不确定的各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的而且会留有至少为size的间隔session gap。在一些类似保持会话的场景下可以使用会话窗口来进行数据的处理统计。4全局窗口Global Windows “全局窗口”这种窗口全局有效会把相同key的所有数据都分配到同一个窗口中。这种窗口没有结束的时候默认是不会做触发计算的。如果希望它能对数据进行计算处理还需要自定义“触发器”Trigger。 全局窗口没有结束的时间点所以一般在希望做更加灵活的窗口处理时自定义使用。Flink中的计数窗口Count Window底层就是用全局窗口实现的。
窗口API概览
1按键分区Keyed和非按键分区Non-Keyed 在定义窗口操作之前首先需要确定到底是基于按键分区Keyed的数据流KeyedStream来开窗还是直接在没有按键分区的DataStream上开窗。也就是说在调用窗口算子之前是否有keyBy操作。 1按键分区窗口Keyed Windows 经过按键分区keyBy操作后数据流会按照key被分为多条逻辑流logical streams这就是KeyedStream。基于KeyedStream进行窗口操作时窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务而窗口操作会基于每个key进行单独的处理。所以可以认为每个key上都定义了一组窗口各自独立地进行统计计算。 在代码实现上我们需要先对DataStream调用.keyBy()进行按键分区然后再调用.window()定义窗口。
stream.keyBy(...).window(...)2非按键分区Non-Keyed Windows 如果没有进行keyBy那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务task上执行就相当于并行度变成了1。 在代码中直接基于DataStream调用.windowAll()定义窗口。
stream.windowAll(...)注意对于非按键分区的窗口操作手动调大窗口算子的并行度也是无效的windowAll本身就是一个非并行的操作。
2代码中窗口API的调用 窗口操作主要有两个部分窗口分配器Window Assigners和窗口函数Window Functions。
stream.keyBy(key selector).window(window assigner).aggregate(window function)其中.window()方法需要传入一个窗口分配器它指明了窗口的类型而后面的.aggregate()方法传入一个窗口函数作为参数它用来定义窗口具体的处理逻辑。窗口分配器有各种形式而窗口函数的调用方法也不只.aggregate()一种我们接下来就详细展开讲解。
窗口分配器
时间窗口
时间窗口是最常用的窗口类型又可以细分为滚动、滑动和会话三种。
1滚动处理时间窗口 窗口分配器由类TumblingProcessingTimeWindows提供需要调用它的静态方法.of()。
stream.keyBy(...).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).aggregate(...)这里.of()方法需要传入一个Time类型的参数size表示滚动窗口的大小我们这里创建了一个长度为5秒的滚动窗口。 另外.of()还有一个重载方法可以传入两个Time类型的参数size和offset。第一个参数当然还是窗口大小第二个参数则表示窗口起始点的偏移量。
2滑动处理时间窗口 窗口分配器由类SlidingProcessingTimeWindows提供同样需要调用它的静态方法.of()。
stream.keyBy(...).window(SlidingProcessingTimeWindows.of(Time.seconds(10)Time.seconds(5))).aggregate(...)这里.of()方法需要传入两个Time类型的参数size和slide前者表示滑动窗口的大小后者表示滑动窗口的滑动步长。我们这里创建了一个长度为10秒、滑动步长为5秒的滑动窗口。 滑动窗口同样可以追加第三个参数用于指定窗口起始点的偏移量用法与滚动窗口完全一致。
3处理时间会话窗口 窗口分配器由类ProcessingTimeSessionWindows提供需要调用它的静态方法.withGap()或者.withDynamicGap()。
stream.keyBy(...).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)这里.withGap()方法需要传入一个Time类型的参数size表示会话的超时时间也就是最小间隔session gap。我们这里创建了静态会话超时时间为10秒的会话窗口。 另外还可以调用withDynamicGap()方法定义session gap的动态提取逻辑。
4滚动事件时间窗口 窗口分配器由类TumblingEventTimeWindows提供用法与滚动处理事件窗口完全一致。
stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).aggregate(...)5滑动事件时间窗口 窗口分配器由类SlidingEventTimeWindows提供用法与滑动处理事件窗口完全一致。
stream.keyBy(...).window(SlidingEventTimeWindows.of(Time.seconds(10)Time.seconds(5))).aggregate(...)6事件时间会话窗口 窗口分配器由类EventTimeSessionWindows提供用法与处理事件会话窗口完全一致。
stream.keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(10))).aggregate(...)计数窗口
计数窗口概念非常简单本身底层是基于全局窗口Global Window实现的。Flink为我们提供了非常方便的接口直接调用.countWindow()方法。根据分配规则的不同又可以分为滚动计数窗口和滑动计数窗口两类下面我们就来看它们的具体实现。
1滚动计数窗口 滚动计数窗口只需要传入一个长整型的参数size表示窗口的大小。
stream.keyBy(...).countWindow(10)我们定义了一个长度为10的滚动计数窗口当窗口中元素数量达到10的时候就会触发计算执行并关闭窗口。
2滑动计数窗口 与滚动计数窗口类似不过需要在.countWindow()调用时传入两个参数size和slide前者表示窗口大小后者表示滑动步长。
stream.keyBy(...).countWindow(103)我们定义了一个长度为10、滑动步长为3的滑动计数窗口。每个窗口统计10个数据每隔3个数据就统计输出一次结果。
3全局窗口 全局窗口是计数窗口的底层实现一般在需要自定义窗口时使用。它的定义同样是直接调用.window()分配器由GlobalWindows类提供。
stream.keyBy(...).window(GlobalWindows.create());需要注意使用全局窗口必须自行定义触发器才能实现窗口计算否则起不到任何作用。
窗口函数
定义了窗口分配器我们只是知道了数据属于哪个窗口可以将数据收集起来了至于收集起来到底要做什么其实还完全没有头绪。所以在窗口分配器之后必须再接上一个定义窗口如何进行计算的操作这就是所谓的“窗口函数”window functions。 窗口函数定义了要对窗口中收集的数据做的计算操作根据处理的方式可以分为两类增量聚合函数和全窗口函数。
增量聚合函数ReduceFunction / AggregateFunction
窗口将数据收集起来最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次这就是“增量聚合”。 典型的增量聚合函数有两个ReduceFunction和AggregateFunction。 1归约函数ReduceFunction 代码示例
public class WindowReduceDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).keyBy(r - r.getId())// 设置滚动事件时间窗口.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunctionWaterSensor() {Overridepublic WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {System.out.println(调用reduce方法之前的结果:value1 ,现在来的数据:value2);return new WaterSensor(value1.getId(), System.currentTimeMillis(),value1.getVc()value2.getVc());}}).print();env.execute();}
}2聚合函数AggregateFunction ReduceFunction可以解决大多数归约聚合的问题但是这个接口有一个限制就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。 Flink Window API中的aggregate就突破了这个限制可以定义更加灵活的窗口聚合操作。这个方法需要传入一个AggregateFunction的实现类作为参数。 AggregateFunction可以看作是ReduceFunction的通用版本这里有三种类型输入类型IN、累加器类型ACC和输出类型OUT。输入类型IN就是输入流中元素的数据类型累加器类型ACC则是我们进行聚合的中间状态类型而输出类型当然就是最终计算结果的类型了。 接口中有四个方法
createAccumulator()创建一个累加器这就是为聚合创建了一个初始状态每个聚合任务只会调用一次。add()将输入的元素添加到累加器中。getResult()从累加器中提取聚合的输出结果。merge()合并两个累加器并将合并后的状态作为一个累加器返回。 所以可以看到AggregateFunction的工作原理是首先调用createAccumulator()为任务初始化一个状态累加器而后每来一个数据就调用一次add()方法对数据进行聚合得到的结果保存在状态中等到了窗口需要输出时再调用getResult()方法得到计算结果。很明显与ReduceFunction相同AggregateFunction也是增量式的聚合而由于输入、中间状态、输出的类型可以不同使得应用更加灵活方便。 代码实现如下
public class WindowAggregateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString aggregate sensorWS.aggregate(new AggregateFunctionWaterSensor, Integer, String() {Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}});aggregate.print();env.execute();}
}另外Flink也为窗口的聚合提供了一系列预定义的简单聚合方法可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy()与KeyedStream的简单聚合非常相似。它们的底层其实都是通过AggregateFunction来实现的。
全窗口函数full window functions
有些场景下我们要做的计算必须基于全部的数据才有效这时做增量聚合就没什么意义了另外输出的结果有可能要包含上下文中的一些信息比如窗口的起始时间这是增量聚合函数做不到的。 所以我们还需要有更丰富的窗口计算方式。窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同全窗口函数需要先收集窗口中的数据并在内部缓存起来等到窗口要输出结果的时候再取出数据进行计算。 在Flink中全窗口函数也有两种WindowFunction和ProcessWindowFunction。
1窗口函数WindowFunction WindowFunction字面上就是“窗口函数”它其实是老版本的通用窗口函数接口。我们可以基于WindowedStream调用.apply()方法传入一个WindowFunction的实现类。
stream.keyBy(key selector).window(window assigner).apply(new MyWindowFunction());这个类中可以获取到包含窗口所有数据的可迭代集合Iterable还可以拿到窗口Window本身的信息。 不过WindowFunction能提供的上下文信息较少也没有更高级的功能。事实上它的作用可以被ProcessWindowFunction全覆盖所以之后可能会逐渐弃用。
2处理窗口函数ProcessWindowFunction ProcessWindowFunction是Window API中最底层的通用窗口函数接口。之所以说它“最底层”是因为除了可以拿到窗口中的所有数据之外ProcessWindowFunction还可以获取到一个“上下文对象”Context。这个上下文对象非常强大不仅能够获取窗口信息还可以访问当前的时间和状态信息。这里的时间就包括了处理时间processing time和事件时间水位线event time watermark。这就使得ProcessWindowFunction更加灵活、功能更加丰富其实就是一个增强版的WindowFunction。 事实上ProcessWindowFunction是Flink底层API——处理函数process function中的一员。 代码实现如下
public class WindowProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));SingleOutputStreamOperatorString process sensorWS.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long count elements.spliterator().estimateSize();long windowStartTs context.window().getStart();long windowEndTs context.window().getEnd();String windowStart DateFormatUtils.format(windowStartTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(windowEndTs, yyyy-MM-dd HH:mm:ss.SSS);out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();env.execute();}
}增量聚合和全窗口函数的结合使用
在实际应用中我们往往希望兼具这两者的优点把它们结合在一起使用。Flink的Window API就给我们实现了这样的用法。 我们之前在调用WindowedStream的.reduce()和.aggregate()方法时只是简单地直接传入了一个ReduceFunction或AggregateFunction进行增量聚合。除此之外其实还可以传入第二个参数一个全窗口函数可以是WindowFunction或者ProcessWindowFunction。
// ReduceFunction与WindowFunction结合
public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionWindowFunctionTRKW function) // ReduceFunction与ProcessWindowFunction结合
public R SingleOutputStreamOperatorR reduce(ReduceFunctionT reduceFunctionProcessWindowFunctionTRKW function)// AggregateFunction与WindowFunction结合
public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunctionWindowFunctionVRKW windowFunction)// AggregateFunction与ProcessWindowFunction结合
public ACCVR SingleOutputStreamOperatorR aggregate(AggregateFunctionTACCV aggFunction,ProcessWindowFunctionVRKW windowFunction)这样调用的处理机制是基于第一个参数增量聚合函数来处理窗口数据每来一个数据就做一次聚合等到窗口需要触发计算时则调用第二个参数全窗口函数的处理逻辑输出结果。需要注意的是这里的全窗口函数就不再缓存所有数据了而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。 具体实现代码如下
public class WindowAggregateAndProcessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// 1. 窗口分配器WindowedStreamWaterSensor, String, TimeWindow sensorWS sensorKS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));// 2. 窗口函数/*** 增量聚合 Aggregate 全窗口 process* 1、增量聚合函数处理数据 来一条计算一条* 2、窗口触发时 增量聚合的结果只有一条 传递给 全窗口函数* 3、经过全窗口函数的处理包装后输出** 结合两者的优点* 1、增量聚合 来一条计算一条存储中间的计算结果占用的空间少* 2、全窗口函数 可以通过 上下文 实现灵活的功能*/// sensorWS.reduce() //也可以传两个SingleOutputStreamOperatorString result sensorWS.aggregate(new MyAgg(),new MyProcess());result.print();env.execute();}public static class MyAgg implements AggregateFunctionWaterSensor, Integer, String{Overridepublic Integer createAccumulator() {System.out.println(创建累加器);return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {System.out.println(调用add方法,valuevalue);return accumulator value.getVc();}Overridepublic String getResult(Integer accumulator) {System.out.println(调用getResult方法);return accumulator.toString();}Overridepublic Integer merge(Integer a, Integer b) {System.out.println(调用merge方法);return null;}}// 全窗口函数的输入类型 增量聚合函数的输出类型public static class MyProcess extends ProcessWindowFunctionString,String,String,TimeWindow{Overridepublic void process(String s, Context context, IterableString elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}
}这里我们为了方便处理单独定义了一个POJO类UrlViewCount来表示聚合输出结果的数据类型包含了url、浏览量以及窗口的起始结束时间。用一个AggregateFunction来实现增量聚合每来一个数据就计数加一得到的结果交给ProcessWindowFunction结合窗口信息包装成我们想要的UrlViewCount最终输出统计结果。
其他API
触发器Trigger
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”本质上就是执行窗口函数所以可以认为是计算得到结果并输出的过程。 基于WindowedStream调用.trigger()方法就可以传入一个自定义的窗口触发器Trigger。
stream.keyBy(...).window(...).trigger(new MyTrigger())移除器Evictor
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法就可以传入一个自定义的移除器Evictor。Evictor是一个接口不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...).window(...).evictor(new MyEvictor())时间语义
Flink中的时间语义
事件时间一个是数据产生的时间时间戳Timestamp。 处理时间数据真正被处理的时刻 到底是以那种时间作为衡量标准就是所谓的“时间语义”。
哪种时间语义更重要
在实际应用中事件时间语义会更为常见。一般情况下业务日志数据中都会记录数据生成的时间戳timestamp它就可以作为事件时间的判断基础。 在Flink中由于处理时间比较简单早期版本默认的时间语义是处理时间而考虑到事件时间在实际应用中更为广泛从Flink1.12版本开始Flink已经将事件时间作为默认的时间语义了。
水位线Watermark
事件时间和窗口 在窗口的处理过程中我们可以基于数据的时间戳自定义一个“逻辑时钟”。这个时钟的时间不会自动流逝它的时间进展就是靠着新到数据的时间戳来推动的。 这样的好处在于计算的过程可以完全不依赖处理时间系统时间不论什么时候进行统计处理得到的结果都是正确的。而一般实时流处理的场景中事件时间可以基本与处理时间保持同步只是略微有一点延迟同时保证了窗口计算的正确性。
什么是水位线
在Flink中用来衡量事件时间进展的标记就被称作“水位线”Watermark。
1有序流中的水位线 1理想状态数据量小数据应该按照生成的先后顺序进入流中每条数据产生一个水位线 2实际应用中如果当前数据量非常大且同时涌来的数据时间差会非常小比如几毫秒往往对处理计算也没什么影响。所以为了提高效率一般会每隔一段时间生成一个水位线。2乱序流中的水位线 在分布式系统中数据在节点间传输会因为网络传输延迟的不确定性导致顺序发生改变这就是所谓的“乱序数据”。 乱序 数据量小我们还是靠数据来驱动每来一个数据就提取它的时间戳、插入一个水位线。不过现在的情况是数据乱序所以插入新的水位线时要先判断一下时间戳是否比之前的大否则就不再生成新的水位线。也就是说只有数据的时间戳比当前时钟大才能推动时钟前进这时才插入水位线。 乱序 数据量大如果考虑到大量数据同时到来的处理效率我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳需要插入水位线时就直接以它作为时间戳生成新的水位线。 乱序 迟到数据我们无法正确处理“迟到”的数据。为了让窗口能够正确收集到迟到的数据我们也可以等上一段时间比如2秒也就是用当前已有数据的最大时间戳减去2秒就是要插入的水位线的时间戳。这样的话9秒的数据到来之后事件时钟不会直接推进到9秒而是进展到了7秒必须等到11秒的数据到来之后事件时钟才会进展到9秒这时迟到数据也都已收集齐0~9秒的窗口就可以正确计算结果了。 现在我们可以知道水位线就代表了当前的事件时间时钟而且可以在数据的时间戳基础上加一些延迟来保证不丢数据这一点对于乱序流的正确处理非常重要。3水位线特性 水位线是插入到数据流中的一个标记可以认为是一个特殊的数据水位线主要的内容是一个时间戳用来表示当前事件时间的进展水位线是基于数据的时间戳生成的水位线的时间戳必须单调递增以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟来保证正确处理乱序数据一个水位线Watermark(t)表示在当前流中事件时间已经达到了时间戳t这代表t之前的所有数据都到齐了之后流中不会出现时间戳t’ ≤ t的数据
水位线和窗口的工作原理
窗口 我们定义一个时间窗口每10秒统计一次数据那么就相当于把窗口放在那里从0秒开始收集数据到10秒时处理当前窗口内所有数据输出一个结果然后清空窗口继续收集数据到20秒时再对窗口内所有数据进行计算处理输出结果依次类推。 注意为了明确数据划分到哪一个窗口定义窗口都是包含起始时间、不包含结束时间的用数学符号表示就是一个左闭右开的区间例如0~10秒的窗口可以表示为[0, 10)这里单位为秒。 正确理解在Flink中窗口其实并不是一个“框”应该把窗口理解成一个“桶”。在Flink中窗口可以把流切割成有限大小的多个“存储桶”bucket)每个数据都会分发到对应的桶中当到达窗口结束时间时就对每个桶中收集的数据进行计算处理。 注意Flink中窗口并不是静态准备好的而是动态创建——当有落在这个窗口区间范围的数据达到时才创建对应的窗口。另外这里我们认为到达窗口结束时间时窗口就触发计算并关闭事实上“触发计算”和“窗口关闭”两个行为也可以分开。
生成水位线
生成水位线的总体原则
完美的水位线是“绝对正确”的也就是一个水位线一旦出现就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。不过如果要保证绝对正确就必须等足够长的时间这会带来更高的延迟。 如果我们希望处理得更快、实时性更强那么可以将水位线延迟设得低一些。这种情况下可能很多迟到数据会在水位线之后才到达就会导致窗口遗漏数据计算结果不准确。当然如果我们对准确性完全不考虑、一味地追求处理速度可以直接使用处理时间语义这在理论上可以得到最低的延迟。 所以Flink中的水位线其实是流处理中对低延迟和结果正确性的一个权衡机制而且把控制的权力交给了程序员我们可以在代码中定义水位线的生成策略。
水位线生成策略
在Flink的DataStream API中有一个单独用于生成水位线的方法.assignTimestampsAndWatermarks()它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间。具体使用如下
DataStreamEvent stream env.addSource(new ClickSource());DataStreamEvent withTimestampsAndWatermarks
stream.assignTimestampsAndWatermarks(watermark strategy);说明WatermarkStrategy作为参数这就是所谓的“水位线生成策略”。WatermarkStrategy是一个接口该接口中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator。
public interface WatermarkStrategyT extends TimestampAssignerSupplierT,WatermarkGeneratorSupplierT{// 负责从流中数据元素的某个字段中提取时间戳并分配给元素。时间戳的分配是生成水位线的基础。OverrideTimestampAssignerT createTimestampAssigner(TimestampAssignerSupplier.Context context);// 主要负责按照既定的方式基于时间戳生成水位线OverrideWatermarkGeneratorT createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}Flink内置水位线
1有序流中内置水位线设置 对于有序流主要特点就是时间戳单调增长所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。
public class WatermarkMonoDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成升序的watermark没有等待时间.WaterSensorforMonotonousTimestamps()// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner(new SerializableTimestampAssignerWaterSensor() {Overridepublic long extractTimestamp(WaterSensor element, long recordTimestamp) {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;}});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}
}2乱序流中内置水位线设置 由于乱序流中需要等待迟到数据到齐所以必须设置一个固定量的延迟时间。这时生成水位线的时间戳就是当前数据流中最大的时间戳减去延迟的结果相当于把表调慢当前时钟会滞后于数据的最大时间戳。调用WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个maxOutOfOrderness参数表示“最大乱序程度”它表示数据流中乱序数据时间戳的最大差值如果我们能确定乱序程度那么设置对应时间长度的延迟就可以等到所有的乱序数据了。
public class WatermarkOutOfOrdernessDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// TODO 1.定义Watermark策略WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy// 1.1 指定watermark生成乱序的等待3s.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3))// 1.2 指定 时间戳分配器从数据中提取.withTimestampAssigner((element, recordTimestamp) - {// 返回的时间戳要 毫秒System.out.println(数据 element ,recordTs recordTimestamp);return element.getTs() * 1000L;});// TODO 2. 指定 watermark策略SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);sensorDSwithWatermark.keyBy(sensor - sensor.getId())// TODO 3.使用 事件时间语义 的窗口.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}}).print();env.execute();}
}自定义水位线生成器
1周期性水位线生成器Periodic Generator 周期性生成器一般是通过onEvent()观察判断输入的事件而在onPeriodicEmit()里发出水位线。 下面是一段自定义周期性生成水位线的代码
import com.atguigu.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategyEvent {Overridepublic TimestampAssignerEvent createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssignerEvent() {Overridepublic long extractTimestamp(Event elementlong recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}Overridepublic WatermarkGeneratorEvent createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGeneratorEvent {private Long delayTime 5000L; // 延迟时间private Long maxTs -Long.MAX_VALUE delayTime 1L; // 观察到的最大时间戳Overridepublic void onEvent(Event eventlong eventTimestampWatermarkOutput output) {// 每来一条数据就调用一次maxTs Math.max(event.timestampmaxTs); // 更新最大时间戳}Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}我们在onPeriodicEmit()里调用output.emitWatermark()就可以发出水位线了这个方法由系统框架周期性地调用默认200ms一次。 如果想修改默认周期时间可以通过下面方法修改。例如修改为400ms
env.getConfig().setAutoWatermarkInterval(400L);2断点式水位线生成器Punctuated Generator 断点式生成器会不停地检测onEvent()中的事件当发现带有水位线信息的事件时就立即发出水位线。我们把发射水位线的逻辑写在onEvent方法当中即可。3在数据源中发送水位线 我们也可以在自定义的数据源中抽取事件时间然后发送水位线。这里要注意的是在自定义数据源中发送了水位线以后就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。示例程序如下
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource
)水位线的传递 在流处理中上游任务处理完水位线、时钟改变之后要把当前的水位线再次发出广播给所有的下游子任务。而当一个任务接收到多个上游并行任务传递来的水位线时应该以最小的那个作为当前任务的事件时钟。水位线在上下游任务之间的传递非常巧妙地避免了分布式系统中没有统一时钟的问题每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
迟到数据的处理
推迟水印推进
在水印产生时设置一个乱序容忍度推迟系统时间的推进保证窗口计算被延迟执行为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));设置窗口延迟关闭
Flink的窗口也允许迟到数据。当触发了窗口计算后会先计算当前的结果但是此时并不会关闭窗口。 以后每来一条迟到数据就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间推迟时间此时窗口会真正关闭。
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))注意: 允许迟到只能运用在event time上
使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
.sideOutputLateData(lateWS)完整案例代码如下
public class WatermarkLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());WatermarkStrategyWaterSensor watermarkStrategy WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, recordTimestamp) - element.getTs() * 1000L);SingleOutputStreamOperatorWaterSensor sensorDSwithWatermark sensorDS.assignTimestampsAndWatermarks(watermarkStrategy);OutputTagWaterSensor lateTag new OutputTag(late-data, Types.POJO(WaterSensor.class));SingleOutputStreamOperatorString process sensorDSwithWatermark.keyBy(sensor - sensor.getId()).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2)) // 推迟2s关窗.sideOutputLateData(lateTag) // 关窗后的迟到数据放入侧输出流.process(new ProcessWindowFunctionWaterSensor, String, String, TimeWindow() {Overridepublic void process(String s, Context context, IterableWaterSensor elements, CollectorString out) throws Exception {long startTs context.window().getStart();long endTs context.window().getEnd();String windowStart DateFormatUtils.format(startTs, yyyy-MM-dd HH:mm:ss.SSS);String windowEnd DateFormatUtils.format(endTs, yyyy-MM-dd HH:mm:ss.SSS);long count elements.spliterator().estimateSize();out.collect(key s 的窗口[ windowStart , windowEnd )包含 count 条数据 elements.toString());}});process.print();// 从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);env.execute();}
}基于时间的合流——双流联结Join
为了更方便地实现基于时间的合流操作Flink的DataStrema API提供了内置的join算子。
窗口联结Window Join
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子可以定义时间窗口并将两条流中共享一个公共键key的数据放在窗口中进行配对处理。
1窗口联结的调用 窗口联结在代码中的实现首先需要调用DataStream的.join()方法来合并两条流得到一个JoinedStreams接着通过.where()和.equalTo()方法指定两条流中联结的key然后通过.window()开窗口并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下
stream1.join(stream2).where(KeySelector).equalTo(KeySelector).window(WindowAssigner).apply(JoinFunction)上面代码中.where()的参数是键选择器KeySelector用来指定第一条流中的key而.equalTo()传入的KeySelector则指定了第二条流中的key。两者相同的元素如果在同一窗口中就可以匹配起来并通过一个“联结函数”JoinFunction进行处理了。 这里.window()传入的就是窗口分配器之前讲到的三种时间窗口都可以用在这里滚动窗口tumbling window、滑动窗口sliding window和会话窗口session window。 而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply()没有其他替代的方法。 传入的JoinFunction也是一个函数类接口使用时需要实现内部的.join()方法。这个方法有两个参数分别表示两条流中成对匹配的数据。 其实仔细观察可以发现窗口join的调用语法和我们熟悉的SQL中表的join非常相似
SELECT * FROM table1 t1, table2 t2 WHERE t1.id t2.id; 这句SQL中where子句的表达等价于inner join … on所以本身表示的是两张表基于id的“内连接”inner join。而Flink中的window join同样类似于inner join。也就是说最后处理输出的只有两条流中数据按key配对成功的那些如果某个窗口中一条流的数据没有任何另一条流的数据匹配那么就不会调用JoinFunction的.join()方法也就没有任何输出了。
2窗口联结实例 代码实现
public class WindowJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 3),Tuple2.of(c, 4)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer,Integer ds2 env.fromElements(Tuple3.of(a, 1,1),Tuple3.of(a, 11,1),Tuple3.of(b, 2,1),Tuple3.of(b, 12,1),Tuple3.of(c, 14,1),Tuple3.of(d, 15,1)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer,IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// TODO window join// 1. 落在同一个时间窗口范围内才能匹配// 2. 根据keyby的key来进行匹配关联// 3. 只能拿到匹配上的数据类似有固定时间范围的inner joinDataStreamString join ds1.join(ds2).where(r1 - r1.f0) // ds1的keyby.equalTo(r2 - r2.f0) // ds2的keyby.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new JoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 关联上的数据调用join方法* param first ds1的数据* param second ds2的数据* return* throws Exception*/Overridepublic String join(Tuple2String, Integer first, Tuple3String, Integer, Integer second) throws Exception {return first ----- second;}});join.print();env.execute();}
}间隔联结Interval Join
在有些场景下我们要处理的时间间隔可能并不是固定的。这时显然不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧于是窗口内就都没有匹配了会话窗口虽然时间不固定但也明显不适合这个场景。基于时间的窗口联结已经无能为力了。 为了应对这样的需求Flink提供了一种叫作“间隔联结”interval join的合流操作。顾名思义间隔联结的思路就是针对一条流的每个数据开辟出其时间戳前后的一段时间间隔看这期间是否有来自另一条流的数据匹配。
1间隔联结的原理 间隔联结具体的定义方式是我们给定两个时间点分别叫作间隔的“上界”upperBound和“下界”lowerBound于是对于一条流不妨叫作A中的任意一个数据元素a就可以开辟一段时间间隔[a.timestamp lowerBound, a.timestamp upperBound]即以a的时间戳为中心下至下界点、上至上界点的一个闭区间我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流不妨叫B中的数据元素b如果它的时间戳落在了这个区间范围内a和b就可以成功配对进而进行计算输出结果。所以匹配的条件为 a.timestamp lowerBound b.timestamp a.timestamp upperBound 这里需要注意做间隔联结的两条流A和B也必须基于相同的key下界lowerBound应该小于等于上界upperBound两者都可正可负间隔联结目前只支持事件时间语义。 如下图所示我们可以清楚地看到间隔联结的方式 下方的流A去间隔联结上方的流B所以基于A的每个数据元素都可以开辟一个间隔区间。我们这里设置下界为-2毫秒上界为1毫秒。于是对于时间戳为2的A中元素它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内所以就可以得到匹配数据对2, 0和2, 1。同样地A中时间戳为3的元素可匹配区间为[1, 4]B中只有时间戳为1的一个数据可以匹配于是得到匹配数据对3, 1。 所以我们可以看到间隔联结同样是一种内连接inner join。与窗口联结不同的是interval join做匹配的时间段是基于流中数据的所以并不确定而且流B中的数据可以不只在一个区间内被匹配。2间隔联结的调用 间隔联结在代码中是基于KeyedStream的联结join操作。DataStream在keyBy得到KeyedStream之后可以调用.intervalJoin()来合并两条流传入的参数同样是一个KeyedStream两者的key类型应该一致得到的是一个IntervalJoin类型。后续的操作同样是完全固定的先通过.between()方法指定间隔的上下界再调用.process()方法定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数这是处理函数家族的最后一员“处理联结函数”ProcessJoinFunction。 通用调用形式如下
stream1.keyBy(KeySelector).intervalJoin(stream2.keyBy(KeySelector)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunctionInteger, Integer, String(){Overridepublic void processElement(Integer left, Integer right, Context ctx, CollectorString out) {out.collect(left , right);}});可以看到抽象类ProcessJoinFunction就像是ProcessFunction和JoinFunction的结合内部同样有一个抽象方法.processElement()。与其他处理函数不同的是它多了一个参数这自然是因为有来自两条流的数据。参数中left指的就是第一条流中的数据right则是第二条流中与它匹配的数据。每当检测到一组匹配就会调用这里的.processElement()方法经处理转换之后输出结果。
3间隔联结实例 案例需求在电商网站中某些用户行为往往会有短时间内的强关联。我们这里举一个例子我们有两条流一条是下订单的流一条是浏览数据的流。我们可以针对同一个用户来做这样一个联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。 1代码实现正常使用
public class IntervalJoinDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.fromElements(Tuple2.of(a, 1),Tuple2.of(a, 2),Tuple2.of(b, 3),Tuple2.of(c, 4)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer, Integer ds2 env.fromElements(Tuple3.of(a, 1, 1),Tuple3.of(a, 11, 1),Tuple3.of(b, 2, 1),Tuple3.of(b, 12, 1),Tuple3.of(c, 14, 1),Tuple3.of(d, 15, 1)).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, IntegerforMonotonousTimestamps().withTimestampAssigner((value, ts) - value.f1 * 1000L));// TODO interval join//1. 分别做keybykey其实就是关联条件KeyedStreamTuple2String, Integer, String ks1 ds1.keyBy(r1 - r1.f0);KeyedStreamTuple3String, Integer, Integer, String ks2 ds2.keyBy(r2 - r2.f0);//2. 调用 interval joinks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).process(new ProcessJoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 两条流的数据匹配上才会调用这个方法* param left ks1的数据* param right ks2的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer left, Tuple3String, Integer, Integer right, Context ctx, CollectorString out) throws Exception {// 进入这个方法是关联上的数据out.collect(left ------ right);}}).print();env.execute();}
}2代码实现处理迟到数据
public class IntervalJoinWithLateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorTuple2String, Integer ds1 env.socketTextStream(hadoop102, 7777).map(new MapFunctionString, Tuple2String, Integer() {Overridepublic Tuple2String, Integer map(String value) throws Exception {String[] datas value.split(,);return Tuple2.of(datas[0], Integer.valueOf(datas[1]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));SingleOutputStreamOperatorTuple3String, Integer, Integer ds2 env.socketTextStream(hadoop102, 8888).map(new MapFunctionString, Tuple3String, Integer, Integer() {Overridepublic Tuple3String, Integer, Integer map(String value) throws Exception {String[] datas value.split(,);return Tuple3.of(datas[0], Integer.valueOf(datas[1]), Integer.valueOf(datas[2]));}}).assignTimestampsAndWatermarks(WatermarkStrategy.Tuple3String, Integer, IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((value, ts) - value.f1 * 1000L));/*** TODO Interval join* 1、只支持事件时间* 2、指定上界、下界的偏移负号代表时间往前正号代表时间往后* 3、process中只能处理 join上的数据* 4、两条流关联后的watermark以两条流中最小的为准* 5、如果 当前数据的事件时间 当前的watermark就是迟到数据 主流的process不处理* between后可以指定将 左流 或 右流 的迟到数据 放入侧输出流*///1. 分别做keybykey其实就是关联条件KeyedStreamTuple2String, Integer, String ks1 ds1.keyBy(r1 - r1.f0);KeyedStreamTuple3String, Integer, Integer, String ks2 ds2.keyBy(r2 - r2.f0);//2. 调用 interval joinOutputTagTuple2String, Integer ks1LateTag new OutputTag(ks1-late, Types.TUPLE(Types.STRING, Types.INT));OutputTagTuple3String, Integer, Integer ks2LateTag new OutputTag(ks2-late, Types.TUPLE(Types.STRING, Types.INT, Types.INT));SingleOutputStreamOperatorString process ks1.intervalJoin(ks2).between(Time.seconds(-2), Time.seconds(2)).sideOutputLeftLateData(ks1LateTag) // 将 ks1的迟到数据放入侧输出流.sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据放入侧输出流.process(new ProcessJoinFunctionTuple2String, Integer, Tuple3String, Integer, Integer, String() {/*** 两条流的数据匹配上才会调用这个方法* param left ks1的数据* param right ks2的数据* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void processElement(Tuple2String, Integer left, Tuple3String, Integer, Integer right, Context ctx, CollectorString out) throws Exception {// 进入这个方法是关联上的数据out.collect(left ------ right);}});process.print(主流);process.getSideOutput(ks1LateTag).printToErr(ks1迟到数据);process.getSideOutput(ks2LateTag).printToErr(ks2迟到数据);env.execute();}
}处理函数
之前所介绍的流处理API无论是基本的转换、聚合还是更为复杂的窗口操作其实都是基于DataStream进行转换的所以可以统称为DataStream API。 在Flink更底层我们可以不定义任何具体的算子比如mapfilter或者window而只是提炼出一个统一的“处理”process操作——它是所有转换算子的一个概括性的表达可以自定义处理逻辑所以这一层接口就被叫作“处理函数”process function。
基本处理函数ProcessFunction
处理函数的功能和使用
在很多应用需求中要求我们对时间有更精细的控制需要能够获取水位线甚至要“把控时间”、定义什么时候做什么事这就不是基本的时间窗口能够实现的了。 这时就需要使用底层的处理函数。处理函数提供了一个“定时服务”TimerService我们可以通过它访问流中的事件event、时间戳timestamp、水位线watermark甚至可以注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类所以拥有富函数类的所有特性同样可以访问状态state和其他运行时信息。此外处理函数还可以直接将数据输出到侧输出流side output中。所以处理函数是最为灵活的处理方法可以实现各种自定义的业务逻辑。 处理函数的使用与基本的转换操作类似只需要直接基于DataStream调用.process()方法就可以了。方法需要传入一个ProcessFunction作为参数用来定义处理逻辑。
stream.process(new MyProcessFunction())这里ProcessFunction不是接口而是一个抽象类继承了AbstractRichFunctionMyProcessFunction是它的一个具体实现。所以所有的处理函数都是富函数RichFunction富函数可以调用的东西这里同样都可以调用。
ProcessFunction解析
在源码中我们可以看到抽象类ProcessFunction继承了AbstractRichFunction有两个泛型类型参数I表示Input也就是输入的数据类型O表示Output也就是处理完成之后输出的数据类型。 内部单独定义了两个方法一个是必须要实现的抽象方法.processElement()另一个是非抽象方法.onTimer()。
public abstract class ProcessFunctionI, O extends AbstractRichFunction {...public abstract void processElement(I value, Context ctx, CollectorO out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, CollectorO out) throws Exception {}...}1抽象方法.processElement() 用于“处理元素”定义了处理的核心逻辑。这个方法对于流中的每个元素都会调用一次参数包括三个输入数据值value上下文ctx以及“收集器”Collectorout。方法没有返回值处理之后的输出数据是通过收集器out来定义的。 value当前流中的输入元素也就是正在处理的数据类型与流中数据类型一致。ctx类型是ProcessFunction中定义的内部抽象类Context表示当前运行的上下文可以获取到当前的时间戳并提供了用于查询时间和注册定时器的“定时服务”TimerService以及可以将数据发送到“侧输出流”side output的方法.output()。out“收集器”类型为Collector用于返回输出数据。使用方式与flatMap算子中的收集器完全一样直接调用out.collect()方法就可以向下游发出一个数据。这个方法可以多次调用也可以不调用。 通过几个参数的分析不难发现ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能而通过富函数提供的获取上下文方法.getRuntimeContext()也可以自定义状态state进行处理这也就能实现聚合操作的功能了。 2非抽象方法.onTimer() 这个方法只有在注册好的定时器触发的时候才会调用而定时器是通过“定时服务”TimerService来注册的。打个比方注册定时器timer就是设了一个闹钟到了设定时间就会响而.onTimer()中定义的就是闹钟响的时候要做的事。所以它本质上是一个基于时间的“回调”callback方法通过时间的进展来触发在事件时间语义下就是由水位线watermark来触发了。 定时方法.onTimer()也有三个参数时间戳timestamp上下文ctx以及收集器out。这里的timestamp是指设定好的触发时间事件时间语义下当然就是水位线了。另外这里同样有上下文和收集器所以也可以调用定时服务TimerService以及任意输出处理之后的数据。 既然有.onTimer()方法做定时触发我们用ProcessFunction也可以自定义数据按照时间分组、定时触发计算输出结果这其实就实现了窗口window的功能。所以说ProcessFunction其实可以实现一切功能。 注意在Flink中只有“按键分区流”KeyedStream才支持设置定时器的操作。
处理函数的分类
我们知道DataStream在调用一些转换方法之后有可能生成新的流类型例如调用.keyBy()之后得到KeyedStream进而再调用.window()之后得到WindowedStream。对于不同类型的流其实都可以直接调用.process()方法进行自定义处理这时传入的参数就都叫作处理函数。当然它们尽管本质相同都是可以访问状态和时间信息的底层API可彼此之间也会有所差异。 Flink提供了8个不同的处理函数
1ProcessFunction 最基本的处理函数基于DataStream直接调用.process()时作为参数传入。2KeyedProcessFunction 对流按键分区后的处理函数基于KeyedStream调用.process()时作为参数传入。要想使用定时器必须基于KeyedStream。3ProcessWindowFunction 开窗之后的处理函数也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。4ProcessAllWindowFunction 同样是开窗之后的处理函数基于AllWindowedStream调用.process()时作为参数传入。5CoProcessFunction 合并connect两条流之后的处理函数基于ConnectedStreams调用.process()时作为参数传入。6ProcessJoinFunction 间隔连接interval join两条流之后的处理函数基于IntervalJoined调用.process()时作为参数传入。 -7BroadcastProcessFunction 广播连接流处理函数基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream是一个未keyBy的普通DataStream与一个广播流BroadcastStream做连接conncet之后的产物。 -8KeyedBroadcastProcessFunction 按键分区的广播连接流处理函数同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是这时的广播连接流是一个KeyedStream与广播流BroadcastStream做连接之后的产物。
按键分区处理函数KeyedProcessFunction
在上节中提到只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以一般情况下我们都是先做了keyBy分区之后再去定义处理操作代码中更加常见的处理函数是KeyedProcessFunction。
定时器Timer和定时服务TimerService
在.onTimer()方法中可以实现定时处理的逻辑而它能触发的前提就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能是通过上下文中提供的“定时服务”来实现的。 定时服务与当前运行的环境有关。前面已经介绍过ProcessFunction的上下文Context中提供了.timerService()方法可以直接返回一个TimerService对象。TimerService是Flink关于时间和定时器的基础服务接口包含以下六个方法
// 获取当前的处理时间
long currentProcessingTime();// 获取当前的水位线事件时间
long currentWatermark();// 注册处理时间定时器当处理时间超过time时触发
void registerProcessingTimeTimer(long time);// 注册事件时间定时器当水位线超过time时触发
void registerEventTimeTimer(long time);// 删除触发时间为time的处理时间定时器
void deleteProcessingTimeTimer(long time);// 删除触发时间为time的处理时间定时器
void deleteEventTimeTimer(long time);六个方法可以分成两大类基于处理时间和基于事件时间。而对应的操作主要有三个获取当前时间注册定时器以及删除定时器。需要注意尽管处理函数中都可以直接访问TimerService不过只有基于KeyedStream的处理函数才能去调用注册和删除定时器的方法未作按键分区的DataStream不支持定时器操作只能获取当前时间。 TimerService会以键key和时间戳为标准对定时器进行去重也就是说对于每个key和时间戳最多只有一个定时器如果注册了多次onTimer()方法也将只被调用一次。
KeyedProcessFunction案例
基于keyBy之后的KeyedStream直接调用.process()方法这时需要传入的参数就是KeyedProcessFunction的实现类。 stream.keyBy( t - t.f0 ) .process(new MyKeyedProcessFunction()) 类似地KeyedProcessFunction也是继承自AbstractRichFunction的一个抽象类与ProcessFunction的定义几乎完全一样区别只是在于类型参数多了一个K这是当前按键分区的key的类型。同样地我们必须实现一个.processElement()抽象方法用来处理流中的每一个数据另外还有一个非抽象方法.onTimer()用来定义定时器触发时的回调操作。 代码如下
public class KeyedProcessTimerDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));KeyedStreamWaterSensor, String sensorKS sensorDS.keyBy(sensor - sensor.getId());// TODO Process:keyedSingleOutputStreamOperatorString process sensorKS.process(new KeyedProcessFunctionString, WaterSensor, String() {/*** 来一条数据调用一次* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {//获取当前数据的keyString currentKey ctx.getCurrentKey();// TODO 1.定时器注册TimerService timerService ctx.timerService();// 1、事件时间的案例Long currentEventTime ctx.timestamp(); // 数据中提取出来的事件时间timerService.registerEventTimeTimer(5000L);System.out.println(当前key currentKey ,当前时间 currentEventTime ,注册了一个5s的定时器);// 2、处理时间的案例
// long currentTs timerService.currentProcessingTime();
// timerService.registerProcessingTimeTimer(currentTs 5000L);
// System.out.println(当前key currentKey ,当前时间 currentTs ,注册了一个5s后的定时器);// 3、获取 process的 当前watermark
// long currentWatermark timerService.currentWatermark();
// System.out.println(当前数据 value ,当前watermark currentWatermark);// 注册定时器 处理时间、事件时间
// timerService.registerProcessingTimeTimer();
// timerService.registerEventTimeTimer();// 删除定时器 处理时间、事件时间
// timerService.deleteEventTimeTimer();
// timerService.deleteProcessingTimeTimer();// 获取当前时间进展 处理时间-当前系统时间 事件时间-当前watermark
// long currentTs timerService.currentProcessingTime();
// long wm timerService.currentWatermark();}/*** TODO 2.时间进展到定时器注册的时间调用该方法* param timestamp 当前时间进展就是定时器被触发时的时间* param ctx 上下文* param out 采集器* throws Exception*/Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);String currentKey ctx.getCurrentKey();System.out.println(key currentKey 现在时间是 timestamp 定时器触发);}});process.print();env.execute();}
}窗口处理函数
除了KeyedProcessFunction另外一大类常用的处理函数就是基于窗口的ProcessWindowFunction和ProcessAllWindowFunction了。
窗口处理函数的使用
进行窗口计算我们可以直接调用现成的简单聚合方法sum/max/min也可以通过调用.reduce()或.aggregate()来自定义一般的增量聚合函数ReduceFunction/AggregateFucntion而对于更加复杂、需要窗口信息和额外状态的一些场景我们还可以直接使用全窗口函数、把数据全部收集保存在窗口内等到触发窗口计算时再统一处理。窗口处理函数就是一种典型的全窗口函数。 窗口处理函数ProcessWindowFunction的使用与其他窗口函数类似也是基于WindowedStream直接调用方法就可以只不过这时调用的是.process()。
stream.keyBy( t - t.f0 ).window( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessWindowFunction())ProcessWindowFunction解析
ProcessWindowFunction既是处理函数又是全窗口函数。从名字上也可以推测出它的本质似乎更倾向于“窗口函数”一些。事实上它的用法也确实跟其他处理函数有很大不同。我们可以从源码中的定义看到这一点
public abstract class ProcessWindowFunctionIN, OUT, KEY, W extends Window extends AbstractRichFunction {...public abstract void process(KEY key, Context context, IterableIN elements, CollectorOUT out) throws Exception;public void clear(Context context) throws Exception {}public abstract class Context implements java.io.Serializable {...}
}ProcessWindowFunction依然是一个继承了AbstractRichFunction的抽象类它有四个类型参数
INinput数据流中窗口任务的输入数据类型。OUToutput窗口任务进行计算之后的输出数据类型。KEY数据中键key的类型。W窗口的类型是Window的子类型。一般情况下我们定义时间窗口W就是TimeWindow。 ProcessWindowFunction里面处理数据的核心方法.process()。方法包含四个参数。key窗口做统计计算基于的键也就是之前keyBy用来分区的字段。context当前窗口进行计算的上下文它的类型就是ProcessWindowFunction内部定义的抽象类Context。elements窗口收集到用来计算的所有数据这是一个可迭代的集合类型。out用来发送数据输出计算结果的收集器类型为Collector。 可以明显看出这里的参数不再是一个输入数据而是窗口中所有数据的集合。而上下文context所包含的内容也跟其他处理函数有所差别
public abstract class Context implements java.io.Serializable {public abstract W window();public abstract long currentProcessingTime();public abstract long currentWatermark();public abstract KeyedStateStore windowState();public abstract KeyedStateStore globalState();public abstract X void output(OutputTagX outputTag, X value);}除了可以通过.output()方法定义侧输出流不变外其他部分都有所变化。这里不再持有TimerService对象只能通过currentProcessingTime()和currentWatermark()来获取当前时间所以失去了设置定时器的功能另外由于当前不是只处理一个数据所以也不再提供.timestamp()方法。与此同时也增加了一些获取其他信息的方法比如可以通过.window()直接获取到当前的窗口对象也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的不包括窗口本身已经有的状态针对当前key、当前窗口有效而“全局状态”同样是自定义的状态针对当前key的所有窗口有效。 所以我们会发现ProcessWindowFunction中除了.process()方法外并没有.onTimer()方法而是多出了一个.clear()方法。从名字就可以看出这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态那么必须在.clear()方法中进行显式地清除避免内存溢出。 至于另一种窗口处理函数ProcessAllWindowFunction它的用法非常类似。区别在于它基于的是AllWindowedStream相当于对没有keyBy的数据流直接开窗并调用.process()方法
stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) ).process(new MyProcessAllWindowFunction())应用案例——Top N
案例需求实时统计一段时间内的出现次数最多的水位。例如统计最近10秒钟内出现次数最多的两个水位并且每5秒钟更新一次。我们知道这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据按照不同的水位进行统计而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
使用ProcessAllWindowFunction
思路一一种最简单的想法是我们干脆不区分不同水位而是将所有访问数据都收集起来统一进行统计计算。所以可以不做keyBy直接基于DataStream开窗然后使用全窗口函数ProcessAllWindowFunction来进行处理。 在窗口中可以用一个HashMap来保存每个水位的出现次数只要遍历窗口中的所有数据自然就能得到所有水位的出现次数。最后把HashMap转成一个列表ArrayList然后进行排序、取出前两名输出就可以了。 代码具体实现如下
public class ProcessAllWindowTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 最近10秒 窗口长度 每5秒输出 滑动步长// TODO 思路一 所有数据到一起 用hashmap存 keyvcvaluecount值sensorDS.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).process(new MyTopNPAWF()).print();env.execute();}public static class MyTopNPAWF extends ProcessAllWindowFunctionWaterSensor, String, TimeWindow {Overridepublic void process(Context context, IterableWaterSensor elements, CollectorString out) throws Exception {// 定义一个hashmap用来存keyvcvaluecount值MapInteger, Integer vcCountMap new HashMap();// 1.遍历数据, 统计 各个vc出现的次数for (WaterSensor element : elements) {Integer vc element.getVc();if (vcCountMap.containsKey(vc)) {// 1.1 key存在不是这个key的第一条数据直接累加vcCountMap.put(vc, vcCountMap.get(vc) 1);} else {// 1.2 key不存在初始化vcCountMap.put(vc, 1);}}// 2.对 count值进行排序: 利用List来实现排序ListTuple2Integer, Integer datas new ArrayList();for (Integer vc : vcCountMap.keySet()) {datas.add(Tuple2.of(vc, vcCountMap.get(vc)));}// 对List进行排序根据count值 降序datas.sort(new ComparatorTuple2Integer, Integer() {Overridepublic int compare(Tuple2Integer, Integer o1, Tuple2Integer, Integer o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});// 3.取出 count最大的2个 vcStringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前2个 考虑可能List不够2个的情况 》 List中元素的个数 和 2 取最小值for (int i 0; i Math.min(2, datas.size()); i) {Tuple2Integer, Integer vcCount datas.get(i);outStr.append(Top (i 1) \n);outStr.append(vc vcCount.f0 \n);outStr.append(count vcCount.f1 \n);outStr.append(窗口结束时间 DateFormatUtils.format(context.window().getEnd(), yyyy-MM-dd HH:mm:ss.SSS) \n);outStr.append(\n);}out.collect(outStr.toString());}}
}使用KeyedProcessFunction
思路二在上一小节的实现过程中我们没有进行按键分区直接将所有数据放在一个分区上进行了开窗操作。这相当于将并行度强行设置为1在实际应用中是要尽量避免的所以Flink官方也并不推荐使用AllWindowedStream进行处理。另外我们在全窗口函数中定义了HashMap来统计vc的出现次数计算过程是要先收集齐所有数据、然后再逐一遍历更新HashMap这显然不够高效。 基于这样的想法我们可以从两个方面去做优化一是对数据进行按键分区分别统计vc的出现次数二是进行增量聚合得到结果最后再做排序输出。所以我们可以使用增量聚合函数AggregateFunction进行浏览量的统计然后结合ProcessWindowFunction排序输出来实现Top N的需求。 具体实现可以分成两步先对每个vc统计出现次数然后再将统计结果收集起来排序输出最终结果。由于最后的排序还是基于每个时间窗口的输出的统计结果中要包含窗口信息我们可以输出包含了vc、出现次数count以及窗口结束时间的Tuple3。之后先按窗口结束时间分区然后用KeyedProcessFunction来实现。 用KeyedProcessFunction来收集数据做排序这时面对的是窗口聚合之后的数据流而窗口已经不存在了我们需要确保能够收集齐所有数据所以应该在窗口结束时间基础上再“多等一会儿”。具体实现上可以采用一个延迟触发的事件时间定时器。基于窗口的结束时间来设定延迟其实并不需要等太久——因为我们是靠水位线的推进来触发定时器而水位线的含义就是“之前的数据都到齐了”。所以我们只需要设置1毫秒的延迟就一定可以保证这一点。 而在等待过程中之前已经到达的数据应该缓存起来我们这里用一个自定义的HashMap来进行存储key为窗口的标记value为List。之后每来一条数据就把它添加到当前的HashMap中并注册一个触发时间为窗口结束时间加1毫秒windowEnd 1的定时器。待到水位线到达这个时间定时器触发我们可以保证当前窗口所有vc的统计结果Tuple3都到齐了于是从HashMap中取出进行排序输出。 具体代码实现如下
public class KeyedProcessFunctionTopNDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));// 最近10秒 窗口长度 每5秒输出 滑动步长/*** TODO 思路二 使用 KeyedProcessFunction实现* 1、按照vc做keyby开窗分别count* 》 增量聚合计算 count* 》 全窗口对计算结果 count值封装 带上 窗口结束时间的 标签* 》 为了让同一个窗口时间范围的计算结果到一起去** 2、对同一个窗口范围的count值进行处理 排序、取前N个* 》 按照 windowEnd做keyby* 》 使用process 来一条调用一次需要先存分开存用HashMap,keywindowEnd,valueList* 》 使用定时器对 存起来的结果 进行 排序、取前N个*/// 1. 按照 vc 分组、开窗、聚合增量计算全量打标签// 开窗聚合后就是普通的流没有了窗口信息需要自己打上窗口的标记 windowEndSingleOutputStreamOperatorTuple3Integer, Integer, Long windowAgg sensorDS.keyBy(sensor - sensor.getVc()).window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))).aggregate(new VcCountAgg(),new WindowResult());// 2. 按照窗口标签窗口结束时间keyby保证同一个窗口时间范围的结果到一起去。排序、取TopNwindowAgg.keyBy(r - r.f2).process(new TopN(2)).print();env.execute();}public static class VcCountAgg implements AggregateFunctionWaterSensor, Integer, Integer {Overridepublic Integer createAccumulator() {return 0;}Overridepublic Integer add(WaterSensor value, Integer accumulator) {return accumulator 1;}Overridepublic Integer getResult(Integer accumulator) {return accumulator;}Overridepublic Integer merge(Integer a, Integer b) {return null;}}/*** 泛型如下* 第一个输入类型 增量函数的输出 count值Integer* 第二个输出类型 Tuple3(vccountwindowEnd) ,带上 窗口结束时间 的标签* 第三个key类型 vcInteger* 第四个窗口类型*/public static class WindowResult extends ProcessWindowFunctionInteger, Tuple3Integer, Integer, Long, Integer, TimeWindow {Overridepublic void process(Integer key, Context context, IterableInteger elements, CollectorTuple3Integer, Integer, Long out) throws Exception {// 迭代器里面只有一条数据next一次即可Integer count elements.iterator().next();long windowEnd context.window().getEnd();out.collect(Tuple3.of(key, count, windowEnd));}}public static class TopN extends KeyedProcessFunctionLong, Tuple3Integer, Integer, Long, String {// 存不同窗口的 统计结果keywindowEndvaluelist数据private MapLong, ListTuple3Integer, Integer, Long dataListMap;// 要取的Top数量private int threshold;public TopN(int threshold) {this.threshold threshold;dataListMap new HashMap();}Overridepublic void processElement(Tuple3Integer, Integer, Long value, Context ctx, CollectorString out) throws Exception {// 进入这个方法只是一条数据要排序得到齐才行 》 存起来不同窗口分开存// 1. 存到HashMap中Long windowEnd value.f2;if (dataListMap.containsKey(windowEnd)) {// 1.1 包含vc不是该vc的第一条直接添加到List中ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.add(value);} else {// 1.1 不包含vc是该vc的第一条需要初始化listListTuple3Integer, Integer, Long dataList new ArrayList();dataList.add(value);dataListMap.put(windowEnd, dataList);}// 2. 注册一个定时器 windowEnd1ms即可// 同一个窗口范围应该同时输出只不过是一条一条调用processElement方法只需要延迟1ms即可ctx.timerService().registerEventTimeTimer(windowEnd 1);}Overridepublic void onTimer(long timestamp, OnTimerContext ctx, CollectorString out) throws Exception {super.onTimer(timestamp, ctx, out);// 定时器触发同一个窗口范围的计算结果攒齐了开始 排序、取TopNLong windowEnd ctx.getCurrentKey();// 1. 排序ListTuple3Integer, Integer, Long dataList dataListMap.get(windowEnd);dataList.sort(new ComparatorTuple3Integer, Integer, Long() {Overridepublic int compare(Tuple3Integer, Integer, Long o1, Tuple3Integer, Integer, Long o2) {// 降序 后 减 前return o2.f1 - o1.f1;}});// 2. 取TopNStringBuilder outStr new StringBuilder();outStr.append(\n);// 遍历 排序后的 List取出前 threshold 个 考虑可能List不够2个的情况 》 List中元素的个数 和 2 取最小值for (int i 0; i Math.min(threshold, dataList.size()); i) {Tuple3Integer, Integer, Long vcCount dataList.get(i);outStr.append(Top (i 1) \n);outStr.append(vc vcCount.f0 \n);outStr.append(count vcCount.f1 \n);outStr.append(窗口结束时间 vcCount.f2 \n);outStr.append(\n);}// 用完的List及时清理节省资源dataList.clear();out.collect(outStr.toString());}}
}侧输出流Side Output
处理函数还有另外一个特有功能就是将自定义的数据放入“侧输出流”side output输出。这个概念我们并不陌生之前在讲到窗口处理迟到数据时最后一招就是输出到侧输出流。而这种处理方式的本质其实就是处理函数的侧输出流功能。 我们之前讲到的绝大多数转换算子输出的都是单一流流里的数据类型只能有一种。而侧输出流可以认为是“主流”上分叉出的“支流”所以可以由一条流产生出多条流而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。 具体应用时只要在处理函数的.processElement()或者.onTimer()方法中调用上下文的.output()方法就可以了。
DataStreamInteger stream env.fromSource(...);OutputTagString outputTag new OutputTagString(side-output) {};SingleOutputStreamOperatorLong longStream stream.process(new ProcessFunctionInteger, Long() {Overridepublic void processElement( Integer value, Context ctx, CollectorInteger out) throws Exception {// 转换成Long输出到主流中out.collect(Long.valueOf(value));// 转换成String输出到侧输出流中ctx.output(outputTag, side-output: String.valueOf(value));}
});这里output()方法需要传入两个参数第一个是一个“输出标签”OutputTag用来标识侧输出流一般会在外部统一声明第二个就是要输出的数据。 我们可以在外部先将OutputTag声明出来
OutputTagString outputTag new OutputTagString(side-output) {};如果想要获取这个侧输出流可以基于处理之后的DataStream直接调用.getSideOutput()方法传入对应的OutputTag这个方式与窗口API中获取侧输出流是完全一样的。
DataStreamString stringStream longStream.getSideOutput(outputTag);案例需求对每个传感器水位超过10的输出告警信息 代码如下
public class SideOutputDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));OutputTagString warnTag new OutputTag(warn, Types.STRING);SingleOutputStreamOperatorWaterSensor process sensorDS.keyBy(sensor - sensor.getId()).process(new KeyedProcessFunctionString, WaterSensor, WaterSensor() {Overridepublic void processElement(WaterSensor value, Context ctx, CollectorWaterSensor out) throws Exception {// 使用侧输出流告警if (value.getVc() 10) {ctx.output(warnTag, 当前水位 value.getVc() ,大于阈值10);}// 主流正常 发送数据out.collect(value);}});process.print(主流);process.getSideOutput(warnTag).printToErr(warn);env.execute();}
}状态管理
Flink中的状态
概述
在Flink中算子任务可以分为无状态和有状态两种情况。 无状态的算子任务只需要观察每个独立事件根据当前输入的数据直接转换输出结果。我们之前讲到的基本转换算子如map、filter、flatMap计算时不依赖其他数据就都属于无状态的算子。 而有状态的算子任务则除当前数据之外还需要一些其他数据来得到计算结果。这里的“其他数据”就是所谓的状态state。我们之前讲到的算子中聚合算子、窗口算子都属于有状态的算子。 有状态算子的一般处理流程具体步骤如下:
1算子任务接收到上游发来的数据2获取当前状态3根据业务逻辑进行计算更新状态4得到计算结果输出发送到下游任务。
状态的分类
1托管状态Managed State和原始状态Raw State Flink的状态有两种托管状态Managed State和原始状态Raw State。托管状态就是由Flink统一管理的状态的存储访问、故障恢复和重组等一系列问题都由Flink实现我们只要调接口就可以而原始状态则是自定义的相当于就是开辟了一块内存需要我们自己管理实现状态的序列化和故障恢复。 通常我们采用Flink托管状态来实现需求。2算子状态Operator State和按键分区状态Keyed State 接下来我们的重点就是托管状态Managed State。 我们知道在Flink中一个算子任务会按照并行度分为多个并行子任务执行而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的所以Flink能管理的状态在并行任务间是无法共享的每个状态只能针对当前子任务的实例有效。 而很多有状态的操作比如聚合、窗口都是要先做keyBy进行按键分区的。按键分区之后任务所进行的所有计算都应该只针对当前key有效所以状态也应该按照key彼此隔离。在这种情况下状态的访问方式又会有所不同。 基于这样的想法我们又可以将托管状态分为两类算子状态和按键分区状态。
按键分区状态Keyed State
按键分区状态Keyed State顾名思义是任务按照键key来访问和维护的状态。它的特点非常鲜明就是以key为作用范围进行隔离。 需要注意使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream即使转换算子实现了对应的富函数类也不能通过运行时上下文访问Keyed State。
值状态ValueState
顾名思义状态中只保存一个“值”value。ValueState本身是一个接口源码中定义如下
public interface ValueStateT extends State {T value() throws IOException;void update(T value) throws IOException;
}这里的T是泛型表示状态的数据内容可以是任何具体的数据类型。如果想要保存一个长整型值作为状态那么类型就是ValueState。 我们可以在代码中读写值状态实现对于状态的访问和更新。
T value()获取当前状态的值update(T value)对状态进行更新传入的参数value就是要覆写的状态值。 在具体使用时为了让运行时上下文清楚到底是哪个状态我们还需要创建一个“状态描述器”StateDescriptor来提供状态的基本信息。例如源码中ValueState的状态描述器构造方法如下
public ValueStateDescriptor(String name, ClassT typeClass) {super(name, typeClass, null);
}这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。 案例需求检测每种传感器的水位值如果连续的两个水位值超过10就输出报警。
public class KeyedValueStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {// TODO 1.定义状态ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 2.在open方法中初始化状态// 状态描述器两个参数第一个参数起个名字不重复第二个参数存储的类型lastVcState getRuntimeContext().getState(new ValueStateDescriptorInteger(lastVcState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {
// lastVcState.value(); // 取出 本组 值状态 的数据
// lastVcState.update(); // 更新 本组 值状态 的数据
// lastVcState.clear(); // 清除 本组 值状态 的数据// 1. 取出上一条数据的水位值(Integer默认值是null判断)int lastVc lastVcState.value() null ? 0 : lastVcState.value();// 2. 求差值的绝对值判断是否超过10Integer vc value.getVc();if (Math.abs(vc - lastVc) 10) {out.collect(传感器 value.getId() 当前水位值 vc ,与上一条水位值 lastVc ,相差超过10);}// 3. 更新状态里的水位值lastVcState.update(vc);}}).print();env.execute();}
}列表状态ListState
将需要保存的数据以列表List的形式组织起来。在ListState接口中同样有一个类型参数T表示列表中数据的类型。ListState也提供了一系列的方法来操作状态使用方式与一般的List非常相似。
Iterable get()获取当前的列表状态返回的是一个可迭代类型Iterableupdate(List values)传入一个列表values直接对状态进行覆盖add(T value)在状态列表中添加一个元素valueaddAll(List values)向列表中添加多个元素以列表values形式传入。 类似地ListState的状态描述器就叫作ListStateDescriptor用法跟ValueStateDescriptor完全一致。 案例:针对每种传感器输出最高的3个水位值
public class KeyedListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ListStateInteger vcListState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcListState getRuntimeContext().getListState(new ListStateDescriptorInteger(vcListState, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 1.来一条存到list状态里vcListState.add(value.getVc());// 2.从list状态拿出来(Iterable) 拷贝到一个List中排序 只留3个最大的IterableInteger vcListIt vcListState.get();// 2.1 拷贝到List中ListInteger vcList new ArrayList();for (Integer vc : vcListIt) {vcList.add(vc);}// 2.2 对List进行降序排序vcList.sort((o1, o2) - o2 - o1);// 2.3 只保留最大的3个(list中的个数一定是连续变大一超过3就立即清理即可)if (vcList.size() 3) {// 将最后一个元素清除第4个vcList.remove(3);}out.collect(传感器id为 value.getId() ,最大的3个水位值 vcList.toString());// 3.更新list状态vcListState.update(vcList);// vcListState.get(); //取出 list状态 本组的数据是一个Iterable
// vcListState.add(); // 向 list状态 本组 添加一个元素
// vcListState.addAll(); // 向 list状态 本组 添加多个元素
// vcListState.update(); // 更新 list状态 本组数据覆盖
// vcListState.clear(); // 清空List状态 本组数据}}).print();env.execute();}
}Map状态MapState
把一些键值对key-value作为状态整体保存起来可以认为就是一组key-value映射的列表。对应的MapStateUK, UV接口中就会有UK、UV两个泛型分别表示保存的key和value的类型。同样MapState提供了操作映射状态的方法与Map的使用非常类似。
UV get(UK key)传入一个key作为参数查询对应的value值put(UK key, UV value)传入一个键值对更新key对应的value值putAll(MapUK, UV map)将传入的映射map中所有的键值对全部添加到映射状态中remove(UK key)将指定key对应的键值对删除boolean contains(UK key)判断是否存在指定的key返回一个boolean值。 另外MapState也提供了获取整个映射相关信息的方法IterableMap.EntryUK, UV entries()获取映射状态中所有的键值对Iterable keys()获取映射状态中所有的键key返回一个可迭代Iterable类型Iterable values()获取映射状态中所有的值value返回一个可迭代Iterable类型boolean isEmpty()判断映射是否为空返回一个boolean值。 案例需求统计每种传感器每种水位值出现的次数。
public class KeyedMapStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {MapStateInteger, Integer vcCountMapState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcCountMapState getRuntimeContext().getMapState(new MapStateDescriptorInteger, Integer(vcCountMapState, Types.INT, Types.INT));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 1.判断是否存在vc对应的keyInteger vc value.getVc();if (vcCountMapState.contains(vc)) {// 1.1 如果包含这个vc的key直接对value1Integer count vcCountMapState.get(vc);vcCountMapState.put(vc, count);} else {// 1.2 如果不包含这个vc的key初始化put进去vcCountMapState.put(vc, 1);}// 2.遍历Map状态输出每个k-v的值StringBuilder outStr new StringBuilder();outStr.append(\n);outStr.append(传感器id为 value.getId() \n);for (Map.EntryInteger, Integer vcCount : vcCountMapState.entries()) {outStr.append(vcCount.toString() \n);}outStr.append(\n);out.collect(outStr.toString());// vcCountMapState.get(); // 对本组的Map状态根据key获取value
// vcCountMapState.contains(); // 对本组的Map状态判断key是否存在
// vcCountMapState.put(, ); // 对本组的Map状态添加一个 键值对
// vcCountMapState.putAll(); // 对本组的Map状态添加多个 键值对
// vcCountMapState.entries(); // 对本组的Map状态获取所有键值对
// vcCountMapState.keys(); // 对本组的Map状态获取所有键
// vcCountMapState.values(); // 对本组的Map状态获取所有值
// vcCountMapState.remove(); // 对本组的Map状态根据指定key移除键值对
// vcCountMapState.isEmpty(); // 对本组的Map状态判断是否为空
// vcCountMapState.iterator(); // 对本组的Map状态获取迭代器
// vcCountMapState.clear(); // 对本组的Map状态清空}}).print();env.execute();}
}归约状态ReducingState
类似于值状态Value不过需要对添加进来的所有数据进行归约将归约聚合之后的值作为状态保存下来。ReducingState这个接口调用的方法类似于ListState只不过它保存的只是一个聚合值所以调用.add()方法时不是在状态列表里添加元素而是直接把新数据和之前的状态进行归约并用得到的结果更新状态。 归约逻辑的定义是在归约状态描述器ReducingStateDescriptor中通过传入一个归约函数ReduceFunction来实现的。这里的归约函数就是我们之前介绍reduce聚合算子时讲到的ReduceFunction所以状态类型跟输入的数据类型是一样的。
public ReducingStateDescriptor(String name, ReduceFunctionT reduceFunction, ClassT typeClass) {...}这里的描述器有三个参数其中第二个参数就是定义了归约聚合逻辑的ReduceFunction另外两个参数则是状态的名称和类型。 案例计算每种传感器的水位和
......
.process(new KeyedProcessFunctionString WaterSensor Integer() {private ReducingStateInteger sumVcState;Overridepublic void open(Configuration parameters) throws Exception {sumVcState this.getRuntimeContext().getReducingState(new ReducingStateDescriptorInteger(sumVcStateInteger::sumInteger.class));}Overridepublic void processElement(WaterSensor value Context ctx CollectorInteger out) throws Exception {sumVcState.add(value.getVc());out.collect(sumVcState.get());}
})聚合状态AggregatingState
与归约状态非常类似聚合状态也是一个值用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的这也就是之前我们讲过的AggregateFunction里面通过一个累加器Accumulator来表示状态所以聚合的状态类型可以跟添加进来的数据类型完全不同使用更加灵活。 同样地AggregatingState接口调用方法也与ReducingState相同调用.add()方法添加元素时会直接使用指定的AggregateFunction进行聚合并更新状态。 案例需求计算每种传感器的平均水位
public class KeyedAggregatingStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {AggregatingStateInteger, Double vcAvgAggregatingState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);vcAvgAggregatingState getRuntimeContext().getAggregatingState(new AggregatingStateDescriptorInteger, Tuple2Integer, Integer, Double(vcAvgAggregatingState,new AggregateFunctionInteger, Tuple2Integer, Integer, Double() {Overridepublic Tuple2Integer, Integer createAccumulator() {return Tuple2.of(0, 0);}Overridepublic Tuple2Integer, Integer add(Integer value, Tuple2Integer, Integer accumulator) {return Tuple2.of(accumulator.f0 value, accumulator.f1 1);}Overridepublic Double getResult(Tuple2Integer, Integer accumulator) {return accumulator.f0 * 1D / accumulator.f1;}Overridepublic Tuple2Integer, Integer merge(Tuple2Integer, Integer a, Tuple2Integer, Integer b) {
// return Tuple2.of(a.f0 b.f0, a.f1 b.f1);return null;}},Types.TUPLE(Types.INT, Types.INT)));}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 将 水位值 添加到 聚合状态中vcAvgAggregatingState.add(value.getVc());// 从 聚合状态中 获取结果Double vcAvg vcAvgAggregatingState.get();out.collect(传感器id为 value.getId() ,平均水位值 vcAvg);// vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果
// vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据会自动进行聚合
// vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据}}).print();env.execute();}
}状态生存时间TTL
在实际应用中很多状态会随着时间的推移逐渐增长如果不加以限制最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”time-to-liveTTL当状态在内存中存在的时间超出这个值时就将它清除。 具体实现上如果用一个进程不停地扫描所有状态看是否过期显然会占用大量资源做无用功。状态的失效其实不需要立即删除所以我们可以给状态附加一个属性也就是状态的“失效时间”。状态创建的时候设置 失效时间 当前时间 TTL之后如果有对状态的访问和修改我们可以再对失效时间进行更新当设置的清除条件被触发时比如状态被访问的时候或者每隔一段时间扫描一次失效状态就可以判断状态是否失效、从而进行清除了。 配置状态的TTL时需要创建一个StateTtlConfig配置对象然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。
StateTtlConfig ttlConfig StateTtlConfig.newBuilder(Time.seconds(10)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();ValueStateDescriptorString stateDescriptor new ValueStateDescriptor(my state, String.class);stateDescriptor.enableTimeToLive(ttlConfig);这里用到了几个配置项
.newBuilder() 状态TTL配置的构造器方法必须调用返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数这就是设定的状态生存时间。.setUpdateType() 设置更新类型。更新类型指定了什么时候更新状态失效时间这里的OnCreateAndWrite表示只有创建状态和更改状态写操作时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间也就是只要对状态进行了访问就表明它是活跃的从而延长生存时间。这个配置默认为OnCreateAndWrite。.setStateVisibility() 设置状态的可见性。所谓的“状态可见性”是指因为清除操作并不是实时的所以当状态过期之后还有可能继续存在这时如果对它进行访问能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为表示从不返回过期值也就是只要过期就认为它已经被清除了应用不能继续读取这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp就是如果过期状态还存在就返回它的值。 除此之外TTL配置还可以设置在保存检查点checkpoint时触发清除操作或者配置增量的清理incremental cleanup还可以针对RocksDB状态后端使用压缩过滤器compaction filter进行后台清理。这里需要注意目前的TTL设置只支持处理时间。
public class StateTTLDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction()).assignTimestampsAndWatermarks(WatermarkStrategy.WaterSensorforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element, ts) - element.getTs() * 1000L));sensorDS.keyBy(r - r.getId()).process(new KeyedProcessFunctionString, WaterSensor, String() {ValueStateInteger lastVcState;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);// TODO 1.创建 StateTtlConfigStateTtlConfig stateTtlConfig StateTtlConfig.newBuilder(Time.seconds(5)) // 过期时间5s
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 更新 过期时间.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 更新 过期时间.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值.build();// TODO 2.状态描述器 启用 TTLValueStateDescriptorInteger stateDescriptor new ValueStateDescriptor(lastVcState, Types.INT);stateDescriptor.enableTimeToLive(stateTtlConfig);this.lastVcState getRuntimeContext().getState(stateDescriptor);}Overridepublic void processElement(WaterSensor value, Context ctx, CollectorString out) throws Exception {// 先获取状态值打印 》 读取状态Integer lastVc lastVcState.value();out.collect(key value.getId() ,状态值 lastVc);// 如果水位大于10更新状态值 》 写入状态if (value.getVc() 10) {lastVcState.update(value.getVc());}}}).print();env.execute();}
}算子状态Operator State
算子状态Operator State就是一个算子并行实例上定义的状态作用范围被限定为当前算子任务。算子状态跟数据的key无关所以不同key的数据只要被分发到同一个并行子任务就会访问到同一个Operator State。 算子状态的实际应用场景不如Keyed State多一般用在Source或Sink等与外部系统连接的算子上或者完全没有key定义的场景。比如Flink的Kafka连接器中就用到了算子状态。 当算子的并行度发生变化时算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同重组分配的方案也会不同。 算子状态也支持不同的结构类型主要有三种ListState、UnionListState和BroadcastState。
列表状态ListState
与Keyed State中的ListState一样将状态表示为一组数据的列表。 与Keyed State中的列表状态的区别是在算子状态的上下文中不会按键key分别处理状态所以每一个并行子任务上只会保留一个“列表”list也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度彼此之间完全独立。 当算子并行度进行缩放调整时算子的列表状态中的所有元素项会被统一收集起来相当于把多个分区的列表合并成了一个“大列表”然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”round-robin与之前介绍的rebanlance数据传输方式类似是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”even-split redistribution。 算子状态中不会存在“键组”key group这样的结构所以为了方便重组分配就把它直接定义成了“列表”list。这也就解释了为什么算子状态中没有最简单的值状态ValueState。 案例实操在map算子中计算数据的个数。
public class OperatorListStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);env.socketTextStream(hadoop102, 7777).map(new MyCountMapFunction()).print();env.execute();}// TODO 1.实现 CheckpointedFunction 接口public static class MyCountMapFunction implements MapFunctionString, Long, CheckpointedFunction {private Long count 0L;private ListStateLong state;Overridepublic Long map(String value) throws Exception {return count;}/*** TODO 2.本地变量持久化将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用** param context* throws Exception*/Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {System.out.println(snapshotState...);// 2.1 清空算子状态state.clear();// 2.2 将 本地变量 添加到 算子状态 中state.add(count);}/*** TODO 3.初始化本地变量程序启动和恢复时 从状态中 把数据添加到 本地变量每个子任务调用一次** param context* throws Exception*/Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {System.out.println(initializeState...);// 3.1 从 上下文 初始化 算子状态state context.getOperatorStateStore().getListState(new ListStateDescriptorLong(state, Types.LONG));// 3.2 从 算子状态中 把数据 拷贝到 本地变量if (context.isRestored()) {for (Long c : state.get()) {count c;}}}}
}联合列表状态UnionListState
与ListState类似联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于算子并行度进行缩放调整时对于状态的分配方式不同。 UnionListState的重点就在于“联合”union。在并行度调整时常规列表状态是轮询分配状态项而联合列表状态的算子则会直接广播状态的完整列表。这样并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”union redistribution。如果列表中状态项数量太多为资源和效率考虑一般不建议使用联合重组的方式。 使用方式同ListState区别在如下加粗部分 state context .getOperatorStateStore() .getUnionListState(new ListStateDescriptor(“union-state”, Types.LONG));
广播状态BroadcastState
有时我们希望算子并行子任务都保持同一份“全局”状态用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态状态就像被“广播”到所有分区一样这种特殊的算子状态就叫作广播状态BroadcastState。 因为广播状态在每个并行子任务上的实例都一样所以在并行度调整的时候就比较简单只要复制一份到新的并行任务就可以实现扩展而对于并行度缩小的情况可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的并不会丢失。 案例实操水位超过指定的阈值发送告警阈值可以动态修改。
public class OperatorBroadcastStateDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);// 数据流SingleOutputStreamOperatorWaterSensor sensorDS env.socketTextStream(hadoop102, 7777).map(new WaterSensorMapFunction());// 配置流用来广播配置DataStreamSourceString configDS env.socketTextStream(hadoop102, 8888);// TODO 1. 将 配置流 广播MapStateDescriptorString, Integer broadcastMapState new MapStateDescriptor(broadcast-state, Types.STRING, Types.INT);BroadcastStreamString configBS configDS.broadcast(broadcastMapState);// TODO 2.把 数据流 和 广播后的配置流 connectBroadcastConnectedStreamWaterSensor, String sensorBCS sensorDS.connect(configBS);// TODO 3.调用 processsensorBCS.process(new BroadcastProcessFunctionWaterSensor, String, String() {/*** 数据流的处理方法 数据流 只能 读取 广播状态不能修改* param value* param ctx* param out* throws Exception*/Overridepublic void processElement(WaterSensor value, ReadOnlyContext ctx, CollectorString out) throws Exception {// TODO 5.通过上下文获取广播状态取出里面的值只读不能修改ReadOnlyBroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);Integer threshold broadcastState.get(threshold);// 判断广播状态里是否有数据因为刚启动时可能是数据流的第一条数据先来threshold (threshold null ? 0 : threshold);if (value.getVc() threshold) {out.collect(value ,水位超过指定的阈值 threshold !!!);}}/*** 广播后的配置流的处理方法: 只有广播流才能修改 广播状态* param value* param ctx* param out* throws Exception*/Overridepublic void processBroadcastElement(String value, Context ctx, CollectorString out) throws Exception {// TODO 4. 通过上下文获取广播状态往里面写数据BroadcastStateString, Integer broadcastState ctx.getBroadcastState(broadcastMapState);broadcastState.put(threshold, Integer.valueOf(value));}}).print();env.execute();}
}状态后端State Backends
在Flink中状态的存储、访问以及维护都是由一个可插拔的组件决定的这个组件就叫作状态后端state backend。状态后端主要负责管理本地状态的存储方式和位置。
状态后端的分类HashMapStateBackend/RocksDB
状态后端是一个“开箱即用”的组件可以在不改变应用程序逻辑的情况下独立配置。Flink中提供了两类不同的状态后端一种是“哈希表状态后端”HashMapStateBackend另一种是“内嵌RocksDB状态后端”EmbeddedRocksDBStateBackend。如果没有特别配置系统默认的状态后端是HashMapStateBackend。
1哈希表状态后端HashMapStateBackend HashMapStateBackend是把状态存放在内存里。具体实现上哈希表状态后端在内部会直接把状态当作对象objects保存在Taskmanager的JVM堆上。普通的状态以及窗口中收集的数据和触发器都会以键值对的形式存储起来所以底层是一个哈希表HashMap这种状态后端也因此得名。2内嵌RocksDB状态后端EmbeddedRocksDBStateBackend RocksDB是一种内嵌的key-value存储介质可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后会将处理中的数据全部放入RocksDB数据库中RocksDB默认存储在TaskManager的本地数据目录里。 RocksDB的状态数据被存储为序列化的字节数组读写操作需要序列化/反序列化因此状态的访问性能要差一些。另外因为做了序列化key的比较也会按照字节进行而不是直接调用.hashCode()和.equals()方法。 EmbeddedRocksDBStateBackend始终执行的是异步快照所以不会因为保存检查点而阻塞数据的处理而且它还提供了增量式保存检查点的机制这在很多情况下可以大大提升保存效率。
如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别就在于本地状态存放在哪里。 HashMapStateBackend是内存计算读写速度非常快但是状态的大小会受到集群可用内存的限制如果应用的状态随着时间不停地增长就会耗尽内存资源。 而RocksDB是硬盘存储所以可以根据可用的磁盘空间进行扩展所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化而且可能需要直接从磁盘读取数据这就会导致性能的降低平均读写性能要比HashMapStateBackend慢一个数量级。
状态后端的配置
在不做配置的时候应用程序使用的默认状态后端是由集群配置文件flink-conf.yaml中指定的配置的键名称为state.backend。这个默认配置对集群上运行的所有作业都有效我们可以通过更改配置值来改变默认的状态后端。另外我们还可以在代码中为当前作业单独配置状态后端这个配置会覆盖掉集群配置文件的默认值。 1配置默认的状态后端 在flink-conf.yaml中可以使用state.backend来配置默认状态后端。 配置项的可能值为hashmap这样配置的就是HashMapStateBackend如果配置项的值是rocksdb这样配置的就是EmbeddedRocksDBStateBackend。 下面是一个配置HashMapStateBackend的例子
# 默认状态后端
state.backend: hashmap# 存放检查点的文件路径
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints这里的state.checkpoints.dir配置项定义了检查点和元数据写入的目录。 2为每个作业Per-job/Application单独配置状态后端 通过执行环境设置HashMapStateBackend。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new HashMapStateBackend());通过执行环境设置EmbeddedRocksDBStateBackend。
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setStateBackend(new EmbeddedRocksDBStateBackend());需要注意如果想在IDE中使用EmbeddedRocksDBStateBackend需要为Flink项目添加依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb/artifactIdversion${flink.version}/version
/dependency而由于Flink发行版中默认就包含了RocksDB(服务器上解压的Flink)所以只要我们的代码中没有使用RocksDB的相关内容就不需要引入这个依赖。
容错机制
在Flink中有一套完整的容错机制来保证故障后的恢复其中最重要的就是检查点。
检查点Checkpoint
在流处理中我们可以用存档读档的思路就是将之前某个时间点所有的状态保存下来这份“存档”就是所谓的“检查点”checkpoint。 遇到故障重启的时候我们可以从检查点中“读档”恢复出之前的状态这样就可以回到当时保存的一刻接着处理数据了。 这里所谓的“检查”其实是针对故障恢复的结果而言的故障恢复之后继续处理的结果应该与发生故障前完全一致我们需要“检查”结果的正确性。所以有时又会把checkpoint叫做“一致性检查点”。
检查点的保存
1周期性的触发保存 “随时存档”确实恢复起来方便可是需要我们不停地做存档操作。如果每处理一条数据就进行检查点的保存当大量数据同时到来时就会耗费很多资源来频繁做检查点数据处理的速度就会受到影响。所以在Flink中检查点的保存是周期性触发的间隔时间可以进行设置。2保存的时间点 我们应该在所有任务算子都恰好处理完一个相同的输入数据的时候将它们的状态保存下来。 这样做可以实现一个数据被所有任务算子完整地处理完状态得到了保存。 如果出现故障我们恢复到之前保存的状态故障时正在处理的所有数据都需要重新处理我们只需要让源source任务向数据源重新提交偏移量、请求重放数据就可以了。当然这需要源任务可以把偏移量作为算子状态保存下来而且外部数据源能够重置偏移量kafka就是满足这些要求的一个最好的例子。3保存的具体流程 检查点的保存最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子来详细描述一下检查点具体的保存过程。 回忆一下我们最初实现的统计词频的程序——word count。这里为了方便我们直接从数据源读入已经分开的一个个单词例如这里输入的是 “hello”“world”“hello”“flink”“hello”“world”“hello”“flink”… 我们所需要的就是每个任务都处理完“hello”之后保存自己的状态。
从检查点恢复状态 检查点算法
在Flink中采用了基于Chandy-Lamport算法的分布式快照可以在不暂停整体流处理的前提下将状态备份保存到检查点。
检查点分界线Barrier
把一条流上的数据按照不同的检查点分隔开所以就叫做检查点的“分界线”Checkpoint Barrier。 在JobManager中有一个“检查点协调器”专门用来协调处理检查点的相关工作。检查点协调器会定期向TaskManager发出指令要求保存检查点带着检查点IDTaskManager会让所有的Source任务把自己的偏移量算子状态保存起来并将带有检查点ID的分界线插入到当前的数据流中然后像正常的数据一样像下游传递之后Source任务就可以继续读入新的数据了。
分布式快照算法Barrier对齐的精准一次
watermark指示的是“之前的数据全部到齐了”而barrier指示的是“之前所有数据的状态更改保存入当前检查点”它们都是一个“截止时间”的标志。所以在处理多个分区的传递时也要以是否还会有数据到来作为一个判断标准。 具体实现上Flink使用了Chandy-Lamport算法的一种变体被称为“异步分界线快照”算法。算法的核心就是两个原则 当上游任务向多个并行下游任务发送barrier时需要广播出去 而当多个上游任务向同一个下游任务传递分界线时需要在下游任务执行“分界线对齐”操作也就是需要等到所有并行分区的barrier都到齐才可以开始状态的保存。 1JobManager发送指令触发检查点的保存Source 任务中插入一个分界线并将偏移量保存到远程的持久化存储中。(同分布式快照算法Barrier对齐的至少一次一样) 2状态快照保存完成分界线向下游传递 状态存入持久化存储之后会返回通知给 Source 任务Source 任务就会向 JobManager 确认检查点完成然后跟数据一样把分界线向下游任务传递。(同分布式快照算法Barrier对齐的至少一次一样) 3向下游多个并行子任务广播分界线执行分界线对齐 Map任务没有状态所以直接将barrier继续向下游传递。这时由于进行了keyBy分区所以需要将barrier广播到下游并行的两个Sum任务。同时Sum任务可能收到来自上游两个并行Map任务的barrier所以需要执行“分界线对齐”操作。 而Sum 1只收到了来自Map 2的barrier所以这时需要等待分界线对齐。在等待的过程中如果分界线尚未到达的分区任务Map 1又传来了数据hello, 1说明这是需要保存到检查点的Sum任务应该正常继续处理数据状态更新为3而如果分界线已经到达的分区任务Map 2又传来数据这已经是下一个检查点要保存的内容了就不应立即处理而是要缓存起来等到状态保存之后再做处理。 4分界线对齐后保存状态到持久化存储(同分布式快照算法Barrier对齐的至少一次一样) 1触发检查点JobManager向Source发送Barrier 2Barrier发送向下游广播发送 3Barrier对齐下游需要收到上游所有并行度传递过来的Barrier才做自身状态的保存 4状态保存有状态的算子将状态保存至持久化。 5先处理缓存数据然后正常继续处理 完成检查点保存之后任务就可以继续正常处理数据了。这时如果有等待分界线对齐时缓存的数据需要先做处理然后再按照顺序依次处理新到的数据。当JobManager收到所有任务成功保存状态的信息就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。 补充由于分界线对齐要求先到达的分区做缓存等待一定程度上会影响处理的速度当出现背压时下游任务会堆积大量的缓冲数据检查点可能需要很久才可以保存完毕。 为了应对这种场景Barrier对齐中提供了至少一次语义以及Flink 1.11之后提供了不对齐的检查点保存方式可以将未处理的缓冲数据也保存进检查点。这样当我们遇到一个分区barrier时就不需等待对齐而是可以直接启动状态的保存了。
分布式快照算法Barrier对齐的至少一次
1JobManager发送指令触发检查点的保存Source 任务中插入一个分界线并将偏移量保存到远程的持久化存储中。 说明并行的Source任务保存的状态为3和1表示当前的1号检查点应该包含第一条流中截至第三个数据、第二条流中截至第一个数据的所有状态更改。可以发现Source任务做这些的时候并不影响后面任务的处理Sum任务已经处理完了第一条流中传来的world, 1对应的状态也有了更改。 2状态快照保存完成分界线向下游传递 状态存入持久化存储之后会返回通知给 Source 任务Source 任务就会向 JobManager 确认检查点完成然后跟数据一样把分界线向下游任务传递。 说明由于Source和Map之间是一对一forward的传输关系这里没有考虑算子链所以barrier可以直接传递给对应的Map任务。之后Source任务就可以继续读取新的数据了。与此同时Sum 1已经将第二条流传来的hello1处理完毕更新了状态。 3向下游多个并行子任务广播分界线执行分界线对齐 Map任务没有状态所以直接将barrier继续向下游传递。这时由于进行了keyBy分区所以需要将barrier广播到下游并行的两个Sum任务。同时Sum任务可能收到来自上游两个并行Map任务的barrier所以需要执行“分界线对齐”操作。 而Sum 1只收到了来自Map 2的barrier所以这时需要等待分界线对齐。而如果分界线已经到达的分区任务Map 2又传来数据直接计算等到下一个Barrier到达时做状态的保存。重新启动时介于两个Barrier之间分界线已经到达的分区任务Map 2传过来的数据会再次计算至少一次。 4分界线对齐后保存状态到持久化存储 各个分区的分界线都对齐后就可以对当前状态做快照保存到持久化存储了。存储完成之后同样将barrier向下游继续传递并通知JobManager保存完毕。 这个过程中每个任务保存自己的状态都是相对独立的互不影响。我们可以看到当Sum将当前状态保存完毕时Source 1任务已经读取到第一条流的第五个数据了。
分布式快照算法非Barrier对齐的精准一次
1JobManager发送指令触发检查点的保存Source 任务中插入一个分界线并将偏移量保存到远程的持久化存储中。(同分布式快照算法Barrier对齐的至少一次一样) 2状态快照保存完成分界线向下游传递 状态存入持久化存储之后会返回通知给 Source 任务Source 任务就会向 JobManager 确认检查点完成然后跟数据一样把分界线向下游任务传递。(同分布式快照算法Barrier对齐的至少一次一样) 3向下游多个并行子任务广播分界线执行非Barrier对齐 Map任务没有状态所以直接将barrier继续向下游传递。这时由于进行了keyBy分区所以需要将barrier广播到下游并行的两个Sum任务。同时Sum任务可能收到来自上游两个并行Map任务的barrier执行“非Barrier对齐”操作。 这里只关注Sum 1的细节Sum 1在第一个barrier到达时就开始执行非对齐检查点。 核心思想只要in-flight的数据也存到状态里barrier就可以越过所有in-flight的数据继续往下游传递。 此时的Sum 1任务在第一个Barrier到达输入缓冲区时 ① 直接将barrier放到输出缓冲区末端向下游传递。 ② 标记数据图中标颜色部分 一是被第一个barrier越过的输入缓冲区和输出缓冲区的数据 二是在其他barrier之前的所有数据 ③ 把标记数据和状态一起保存到checkpoint中从checkpoint恢复时这些数据也会一起恢复到对应位置
检查点配置
启用检查点
默认情况下Flink程序是禁用检查点的。如果想要为Flink应用开启自动保存快照的功能需要在代码中显式地调用执行环境的.enableCheckpointing()方法
StreamExecutionEnvironment env
StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1秒启动一次检查点保存
env.enableCheckpointing(1000);这里需要传入一个长整型的毫秒数表示周期性保存检查点的间隔时间。如果不传参数直接启用检查点默认的间隔周期为500毫秒这种方式已经被弃用。 检查点的间隔时间是对处理性能和故障恢复速度的一个权衡。如果我们希望对性能的影响更小可以调大间隔时间而如果希望故障重启后迅速赶上实时的数据处理就需要将间隔时间设小一些。
检查点存储
检查点具体的持久化存储位置取决于“检查点存储”的设置。默认情况下检查点存储在JobManager的堆内存中。而对于大状态的持久化保存Flink也提供了在其他存储位置进行保存的接口。 具体可以通过调用检查点配置的.setCheckpointStorage()来配置需要传入一个CheckpointStorage的实现类。Flink主要提供了两种CheckpointStorage作业管理器的堆内存和文件系统。
// 配置存储检查点到JobManager堆内存
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());// 配置存储检查点到文件系统
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage(hdfs://namenode:40010/flink/checkpoints));对于实际生产应用我们一般会将CheckpointStorage配置为高可用的分布式文件系统HDFSS3等。
其它高级配置
检查点还有很多可以配置的选项可以通过获取检查点配置CheckpointConfig来进行设置。
CheckpointConfig checkpointConfig env.getCheckpointConfig();1常用高级配置
检查点模式CheckpointingMode 设置检查点一致性的保证级别有“精确一次”exactly-once和“至少一次”at-least-once两个选项。默认级别为exactly-once而对于大多数低延迟的流处理程序at-least-once就够用了而且处理效率会更高。超时时间checkpointTimeout 用于指定检查点保存的超时时间超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数表示超时时间。最小间隔时间minPauseBetweenCheckpoints 用于指定在上一个检查点完成之后检查点协调器最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点只要距离上一个检查点完成的间隔不够就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时实际并发为1。最大并发检查点数量maxConcurrentCheckpoints 用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。开启外部持久化存储enableExternalizedCheckpoints 用于开启检查点的外部持久化而且默认在作业失败的时候不会自动清理如果想释放空间需要自己手工清理。里面传入的参数ExternalizedCheckpointCleanup指定了当作业取消的时候外部的检查点该如何清理。 DELETE_ON_CANCELLATION在作业取消的时候会自动删除外部检查点但是如果是作业失败退出则会保留检查点。 RETAIN_ON_CANCELLATION作业取消的时候也会保留外部检查点。检查点连续失败次数tolerableCheckpointFailureNumber 用于指定检查点连续失败的次数当达到这个次数作业就失败退出。默认为0这意味着不能容忍检查点失败并且作业将在第一次报告检查点失败时失败。
2开启非对齐检查点
非对齐检查点enableUnalignedCheckpoints 不再执行检查点的分界线对齐操作启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式CheckpointingMode必须为exctly-once并且最大并发的检查点个数为1。对齐检查点超时时间alignedCheckpointTimeout 该参数只有在启用非对齐检查点的时候有效。参数默认是0表示一开始就直接用非对齐检查点。如果设置大于0一开始会使用对齐的检查点当对齐时间超过该参数设定的时间则会自动切换成非对齐检查点。 代码中具体设置如下
public class CheckpointConfigDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());env.setParallelism(1);// 代码中用到hdfs需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty(HADOOP_USER_NAME, jjm);// TODO 检查点配置// 1、启用检查点: 默认是barrier对齐的周期为5s, 精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();// 2、指定检查点的存储位置checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);// 3、checkpoint的超时时间: 默认10分钟checkpointConfig.setCheckpointTimeout(60000);// 4、同时运行中的checkpoint的最大数量checkpointConfig.setMaxConcurrentCheckpoints(1);// 5、最小等待间隔: 上一轮checkpoint结束 到 下一轮checkpoint开始 之间的间隔设置了0,并发就会变成1checkpointConfig.setMinPauseBetweenCheckpoints(1000);// 6、取消作业时checkpoint的数据 是否保留在外部系统// DELETE_ON_CANCELLATION:主动cancel时删除存在外部系统的chk-xx目录 如果是程序突然挂掉不会删// RETAIN_ON_CANCELLATION:主动cancel时外部系统的chk-xx目录会保存下来checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 7、允许 checkpoint 连续失败的次数默认0--》表示checkpoint一失败job就挂掉checkpointConfig.setTolerableCheckpointFailureNumber(10);// TODO 开启 非对齐检查点barrier非对齐// 开启的要求 Checkpoint模式必须是精准一次最大并发必须设为1checkpointConfig.enableUnalignedCheckpoints();// 开启非对齐检查点才生效 默认0表示一开始就直接用 非对齐的检查点// 如果大于0 一开始用 对齐的检查点barrier对齐 对齐的时间超过这个参数自动切换成 非对齐检查点barrier非对齐checkpointConfig.setAlignedCheckpointTimeout(Duration.ofSeconds(1));env.socketTextStream(hadoop102, 7777).flatMap((String value, CollectorTuple2String, Integer out) - {String[] words value.split( );for (String word : words) {out.collect(Tuple2.of(word, 1));}}).returns(Types.TUPLE(Types.STRING, Types.INT)).keyBy(value - value.f0).sum(1).print();env.execute();}
}通用增量 checkpoint (changelog)
1.15 之前只有RocksDB 支持增量快照。不同于产生一个包含所有数据的全量备份增量快照中只包含自上一次快照完成之后被修改的记录因此可以显著减少快照完成的耗时。 Rocksdb状态后端启用增量checkpoint
EmbeddedRocksDBStateBackend backend new EmbeddedRocksDBStateBackend(true);从 1.15 开始不管hashmap还是rocksdb 状态后端都可以通过开启changelog实现通用的增量checkpoint。 1执行过程 1带状态的算子任务将状态更改写入变更日志记录状态 2状态物化状态表定期保存独立于检查点 3状态物化完成后状态变更日志就可以被截断到相应的点 2注意事项 1目前标记为实验性功能开启后可能会造成资源消耗增大
HDFS上保存的文件数变多消耗更多的IO带宽用于上传变更日志更多的CPU用于序列化状态更改TaskManager使用更多内存来缓存状态更改 2使用限制Checkpoint的最大并发必须为1从 Flink 1.15 开始只有文件系统的存储类型实现可用memory测试阶段不支持 NO_CLAIM 模式 3使用方式 1方式一配置文件指定
state.backend.changelog.enabled: true
state.backend.changelog.storage: filesystem
# 存储 changelog 数据
dstl.dfs.base-path: hdfs://hadoop102:8020/changelog
execution.checkpointing.max-concurrent-checkpoints: 1
execution.savepoint-restore-mode: CLAIM2方式二在代码中设置 需要引入依赖
dependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-changelog/artifactIdversion${flink.version}/versionscoperuntime/scope
/dependency开启changelog:
env.enableChangelogStateBackend(true);最终检查点
如果数据源是有界的就可能出现部分Task已经处理完所有数据变成finished状态不继续工作。从 Flink 1.14 开始这些finished状态的Task也可以继续执行检查点。自 1.15 起默认启用此功能并且可以通过功能标志禁用它
Configuration config new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, false);
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(config);保存点Savepoint
除了检查点外Flink还提供了另一个非常独特的镜像保存功能——保存点savepoint。 从名称就可以看出这也是一个存盘的备份它的原理和算法与检查点完全相同只是多了一些额外的元数据。
保存点的用途
保存点与检查点最大的区别就是触发的时机。检查点是由Flink自动管理的定期创建发生故障之后自动读取进行恢复这是一个“自动存盘”的功能而保存点不会自动创建必须由用户明确地手动触发保存操作所以就是“手动存盘”。 保存点可以当作一个强大的运维工具来使用。我们可以在需要的时候创建一个保存点然后停止应用做一些处理调整之后再从保存点重启。它适用的具体场景有
版本管理和归档存储更新Flink版本更新应用程序调整并行度暂停应用程序 需要注意的是保存点能够在程序更改的时候依然兼容前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子ID-状态名称这样的key-value组织起来的算子ID可以在代码中直接调用SingleOutputStreamOperator的.uid()方法来进行指定
DataStreamString stream env.addSource(new StatefulSource()).uid(source-id).map(new StatefulMapper()).uid(mapper-id).print();对于没有设置ID的算子Flink默认会自动进行设置所以在重新启动应用后可能会导致ID不同而无法兼容以前的状态。所以为了方便后续的维护强烈建议在程序中为每一个算子手动指定ID。
使用保存点
保存点的使用非常简单我们可以使用命令行工具来创建保存点也可以从保存点恢复作业。 1创建保存点 要在命令行中为运行的作业创建一个保存点镜像只需要执行
bin/flink savepoint :jobId [:targetDirectory]这里jobId需要填充要做镜像保存的作业ID目标路径targetDirectory可选表示保存点存储的路径。 对于保存点的默认路径可以通过配置文件flink-conf.yaml中的state.savepoints.dir项来设定
state.savepoints.dir: hdfs:///flink/savepoints当然对于单独的作业我们也可以在程序代码中通过执行环境来设置
env.setDefaultSavepointDir(hdfs:///flink/savepoints);由于创建保存点一般都是希望更改环境之后重启所以创建之后往往紧接着就是停掉作业的操作。除了对运行的作业创建保存点我们也可以在停掉一个作业时直接创建保存点
bin/flink stop --savepointPath [:targetDirectory] :jobId2从保存点重启应用 我们已经知道提交启动一个Flink作业使用的命令是flink run现在要从保存点重启一个应用其实本质是一样的
bin/flink run -s :savepointPath [:runArgs]这里只要增加一个-s参数指定保存点的路径就可以了其它启动时的参数还是完全一样的如果是基于yarn的运行模式还需要加上 -yid application-id。使用web UI进行作业提交时可以填入的参数除了入口类、并行度和运行参数还有一个“Savepoint Path”这就是从保存点启动应用的配置。
使用保存点切换状态后端
使用savepoint恢复状态的时候也可以更换状态后端。但是有一点需要注意的是不要在代码中指定状态后端了 通过配置文件来配置或者-D 参数配置。 打包时服务器上有的就provided可能遇到依赖问题报错javax.annotation.Nullable找不到可以导入如下依赖 dependencygroupIdcom.google.code.findbugs/groupIdartifactIdjsr305/artifactIdversion1.3.9/version/dependency1提交flink作业
bin/flink run-application -d -t yarn-application -Dstate.backendhashmap -c com.jjm.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar2停止flink作业时触发保存点
方式一stop优雅停止并触发保存点要求source实现StoppableFunction接口
bin/flink stop -p savepoint路径 job-id -yid application-id方式二cancel立即停止并触发保存点
bin/flink cancel -s savepoint路径 job-id -yid application-id案例中source是socket不能用stop
bin/flink cancel -s hdfs://hadoop102:8020/sp cffca338509ea04f38f03b4b77c8075c -yid application_1681871196375_00013从savepoint恢复作业同时修改状态后端
bin/flink run-application -d -t yarn-application -s hdfs://hadoop102:8020/sp/savepoint-267cc0-47a214b019d5 -Dstate.backendrocksdb -c com.atguigu.checkpoint.SavepointDemo FlinkTutorial-1.0-SNAPSHOT.jar 4从保存下来的checkpoint恢复作业
bin/flink run-application -d -t yarn-application -Dstate.backendrocksdb -s hdfs://hadoop102:8020/chk/532f87ef4146b2a2968a1c137d33d4a6/chk-175 -c com.atguigu.checkpoint.SavepointDemo ./FlinkTutorial-1.0-SNAPSHOT.jar如果停止作业时忘了触发保存点也不用担心现在版本的flink支持从保留在外部系统的checkpoint恢复作业但是恢复时不支持切换状态后端。
状态一致性
一致性的概念和级别
一致性其实就是结果的正确性一般从数据丢失、数据重复来评估。 流式计算本身就是一个一个来的所以正常处理的过程中结果肯定是正确的但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确所以主要讨论的就是“状态的一致性”。 一般说来状态一致性有三种级别
最多一次At-Most-Once至少一次At-Least-Once精确一次Exactly-Once
端到端的状态一致性
在实际应用中一般要保证从用户的角度看来最终消费的数据是正确的。而用户或者外部应用不会直接从Flink内部的状态读取数据往往需要我们将处理结果写入外部存储中。这就要求我们不仅要考虑Flink内部数据的处理转换还涉及到从外部数据源读取以及写入外部持久化系统整个应用处理流程从头到尾都应该是正确的。 所以完整的流处理应用应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性就叫做“端到端end-to-end的状态一致性”它取决于三个组件中最弱的那一环。一般来说能否达到at-least-once一致性级别主要看数据源能够重放数据而能否达到exactly-once级别流处理器内部、数据源、外部存储都要有相应的保证机制。
端到端精确一次End-To-End Exactly-Once 输入端保证
输入端主要指的就是Flink读取的外部数据源。对于一些数据源来说并不提供数据的缓冲或是持久化保存数据被消费之后就彻底不存在了例如socket文本流。对于这样的数据源故障后我们即使通过检查点恢复之前的状态可保存检查点之后到发生故障期间的数据已经不能重发了这就会导致数据丢失。所以就只能保证at-most-once的一致性语义相当于没有保证。 想要在故障恢复后不丢数据外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存并且可以重设数据的读取位置。一个最经典的应用就是Kafka。在Flink的Source任务中将数据读取的偏移量保存为状态这样就可以在故障恢复时从检查点中读取出来对数据源重置偏移量重新获取数据。 数据源可重放数据或者说可重置读取数据偏移量加上Flink的Source算子将偏移量作为状态保存进检查点就可以保证数据不丢。这是达到at-least-once一致性语义的基本要求当然也是实现端到端exactly-once的基本要求。
输出端保证
为了实现端到端exactly-once我们还需要对外部存储系统、以及Sink连接器有额外的要求。能够保证exactly-once一致性的写入方式有两种
幂等写入事务写入 我们需要外部存储系统对这两种写入方式的支持而Flink也为提供了一些Sink连接器接口。接下来我们进行展开讲解。1幂等Idempotent写入 所谓“幂等”操作就是说一个操作可以重复执行很多次但只导致一次结果更改。也就是说后面再重复执行就不会对结果起作用了。 这相当于说我们并没有真正解决数据重复计算、写入的问题而是说重复写入也没关系结果不会改变。所以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入比如Redis中键值存储或者关系型数据库如MySQL中满足查询条件的更新操作。 需要注意对于幂等写入遇到故障进行恢复时有可能会出现短暂的不一致。因为保存点完成之后到发生故障之间的数据其实已经写入了一遍回滚的时候并不能消除它们。如果有一个外部应用读取写入的数据可能会看到奇怪的现象短时间内结果会突然“跳回”到之前的某个值然后“重播”一段之前的数据。不过当数据的重放逐渐超过发生故障的点的时候最终的结果还是一致的。2事务Transactional写入 如果说幂等写入对应用场景限制太多那么事务写入可以说是更一般化的保证一致性的方式。 输出端最大的问题就是写入到外部系统的数据难以撤回。而利用事务就可以实现对已写入数据的撤回。 事务是应用程序中一系列严密的操作所有操作必须成功完成否则在每个操作中所作的所有更改都会被撤消。事务有四个基本特性原子性、一致性、隔离性和持久性这就是著名的ACID。 在Flink流处理的结果写入外部系统时如果能够构建一个事务让写入操作可以随着检查点来提交和回滚那么自然就可以解决重复写入的问题了。所以事务写入的基本思想就是用一个事务来进行数据向外部系统的写入这个事务是与检查点绑定在一起的。当Sink任务遇到barrier时开始保存状态的同时就开启一个事务接下来所有数据的写入都在这个事务中待到当前检查点保存完毕时将事务提交所有写入的数据就真正可用了。如果中间过程出现故障状态会回退到上一个检查点而当前事务没有正常关闭因为当前检查点没有保存完所以也会回滚写入到外部的数据就被撤销了。 具体来说又有两种实现方式预写日志WAL和两阶段提交2PC 1预写日志write-ahead-logWAL 我们发现事务提交是需要外部存储系统支持事务的否则没有办法真正实现写入的回撤。那对于一般不支持事务的存储系统能够实现事务写入呢 预写日志WAL就是一种非常简单的方式。具体步骤是 ①先把结果数据作为日志log状态保存起来 ②进行检查点保存时也会将这些结果数据一并做持久化存储 ③在收到检查点完成的通知时将所有结果一次性写入外部系统。 ④在成功写入所有数据后在内部再次确认相应的检查点将确认信息也进行持久化保存。这才代表着检查点的真正完成。 我们会发现这种方式类似于检查点完成时做一个批处理一次性的写入会带来一些性能上的问题而优点就是比较简单由于数据提前在状态后端中做了缓存所以无论什么外部存储系统理论上都能用这种方式一批搞定。在Flink中DataStream API提供了一个模板类GenericWriteAheadSink用来实现这种事务型的写入方式。 需要注意的是预写日志这种一批写入的方式有可能会写入失败所以在执行写入动作之后必须等待发送成功的返回确认消息。在成功写入所有数据后在内部再次确认相应的检查点这才代表着检查点的真正完成。这里需要将确认信息也进行持久化保存在故障恢复时只有存在对应的确认信息才能保证这批数据已经写入可以恢复到对应的检查点位置。 但这种“再次确认”的方式也会有一些缺陷。如果我们的检查点已经成功保存、数据也成功地一批写入到了外部系统但是最终保存确认信息时出现了故障Flink最终还是会认为没有成功写入。于是发生故障时不会使用这个检查点而是需要回退到上一个这样就会导致这批数据的重复写入。 2两阶段提交two-phase-commit2PC 前面提到的各种实现exactly-once的方式多少都有点缺陷而更好的方法就是传说中的两阶段提交2PC。 顾名思义它的想法是分成两个阶段先做“预提交”等检查点完成之后再正式提交。这种提交方式是真正基于事务的它需要外部系统提供事务支持。 具体的实现步骤为 ①当第一条数据到来时或者收到检查点的分界线时Sink任务都会启动一个事务。 ②接下来接收到的所有数据都通过这个事务写入外部系统这时由于事务没有提交所以数据尽管写入了外部系统但是不可用是“预提交”的状态。 ③当Sink任务收到JobManager发来检查点完成的通知时正式提交事务写入的结果就真正可用了。 当中间发生故障时当前未提交的事务就会回滚于是所有写入外部系统的数据也就实现了撤回。这种两阶段提交2PC的方式充分利用了Flink现有的检查点机制分界线的到来就标志着开始一个新事务而收到来自JobManager的checkpoint成功的消息就是提交事务的指令。每个结果数据的写入依然是流式的不再有预写日志时批处理的性能问题最终提交时也只需要额外发送一个确认信息。所以2PC协议不仅真正意义上实现了exactly-once而且通过搭载Flink的检查点机制来实现事务只给系统增加了很少的开销。 Flink提供了TwoPhaseCommitSinkFunction接口方便我们自定义实现两阶段提交的SinkFunction的实现提供了真正端到端的exactly-once保证。新的Sink架构使用的是TwoPhaseCommittingSink接口。 不过两阶段提交虽然精巧却对外部系统有很高的要求。这里将2PC对外部系统的要求列举如下外部系统必须提供事务支持或者Sink任务必须能够模拟外部系统上的事务。在检查点的间隔期间里必须能够开启一个事务并接受数据写入。在收到检查点完成的通知之前事务必须是“等待提交”的状态。在故障恢复的情况下这可能需要一些时间。如果这个时候外部系统关闭事务例如超时了那么未提交的数据就会丢失。Sink任务必须能够在进程失败后恢复事务。提交事务必须是幂等操作。也就是说事务的重复提交应该是无效的。 可见2PC在实际应用同样会受到比较大的限制。具体在项目中的选型最终还应该是一致性级别和处理性能的权衡考量
Flink和Kafka连接时的精确一次保证
在流处理的应用中最佳的数据源当然就是可重置偏移量的消息队列了它不仅可以提供数据重放的功能而且天生就是以流的方式存储和处理数据的。所以作为大数据工具中消息队列的代表Kafka可以说与Flink是天作之合实际项目中也经常会看到以Kafka作为数据源和写入的外部系统的应用。 Flink写入Kafka两阶段提交 ①JobManager发送指令触发检查点的保存 所有Source 节点插入一个id1的barrier,触发source节点将偏移量保存到远程的持久化存储中 ②sink节点接收到Flink启动后的第一条数据负责开启Kafka的第一次事务预提交开始。同时会将事务的状态保存到状态里 ③预提交阶段到达sink的数据会调用kafka producer的send(),数据写入缓冲区再flush()。此时数据写到kafka中标记为”未提交”状态 如果任意一个sink节点预提交过程中出现失败整个预提交会放弃 ④id1的barrier到达sink节点触发barrier节点的本地状态保存到hdfs本地状态包含自身的状态和事务快照。同时开启一个新的Kafka事务用于该barrier后面数据的预提交如分区0的b分区1的b、c。只有第一个事务由第一条数据开启后面都是由barrier开启事务 ⑤全部节点做完本地checkpointjobmanager向所有节点发送一个本轮成功的回调消息预提交结束。 ⑥sink收到checkpoint完成的通知进行事务正式提交将写入kafka数据的标记修改成“已提交”如果发生故障回滚到上次成功完成快照的时间点 ⑦成功正式提交后kafka会返回一个消息给sink节点sink节点会将存在状态里的事务状态修改为finished状态 1整体介绍 既然是端到端的exactly-once我们依然可以从三个组件的角度来进行分析 1Flink内部 Flink内部可以通过检查点机制保证状态和处理结果的exactly-once语义。 2输入端 输入数据源端的Kafka可以对数据进行持久化保存并可以重置偏移量offset。所以我们可以在Source任务FlinkKafkaConsumer中将当前读取的偏移量保存为算子状态写入到检查点中当发生故障时从检查点中读取恢复状态并由连接器FlinkKafkaConsumer向Kafka重新提交偏移量就可以重新消费数据、保证结果的一致性了。 3输出端 输出端保证exactly-once的最佳实现当然就是两阶段提交2PC。作为与Flink天生一对的Kafka自然需要用最强有力的一致性保证来证明自己。 也就是说我们写入Kafka的过程实际上是一个两段式的提交处理完毕得到结果写入Kafka时是基于事务的“预提交”等到检查点保存完毕才会提交事务进行“正式提交”。如果中间出现故障事务进行回滚预提交就会被放弃恢复状态之后也只能恢复所有已经确认提交的操作。 2需要的配置 在具体应用中实现真正的端到端exactly-once还需要有一些额外的配置 1必须启用检查点 2指定KafkaSink的发送级别为DeliveryGuarantee.EXACTLY_ONCE 3配置Kafka读取数据的消费者的隔离级别 这里所说的Kafka是写入的外部系统。预提交阶段数据已经写入只是被标记为“未提交”uncommitted而Kafka中默认的隔离级别isolation.level是read_uncommitted也就是可以读取未提交的数据。这样一来外部应用就可以直接消费未提交的数据对于事务性的保证就失效了。所以应该将隔离级别配置 为read_committed表示消费者遇到未提交的消息时会停止从分区中消费数据直到消息被标记为已提交才会再次恢复消费。当然这样做的话外部应用消费数据就会有显著的延迟。 4事务超时配置 Flink的Kafka连接器中配置的事务超时时间transaction.timeout.ms默认是1小时而Kafka集群配置的事务最大超时时间transaction.max.timeout.ms默认是15分钟。所以在检查点保存时间很长时有可能出现Kafka已经认为事务超时了丢弃了预提交的数据而Sink任务认为还可以继续等待。如果接下来检查点保存成功发生故障后回滚到这个检查点的状态这部分数据就被真正丢掉了。所以这两个超时时间前者应该小于等于后者。
public class KafkaEOSDemo {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 代码中用到hdfs需要导入hadoop依赖、指定访问hdfs的用户名System.setProperty(HADOOP_USER_NAME, jjm);// TODO 1、启用检查点,设置为精准一次env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointStorage(hdfs://hadoop102:8020/chk);checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// TODO 2.读取kafkaKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setGroupId(jjm).setTopics(topic_1).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString kafkasource env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource);/*** TODO 3.写出到Kafka* 精准一次 写入Kafka需要满足以下条件缺一不可* 1、开启checkpoint* 2、sink设置保证级别为 精准一次* 3、sink设置事务前缀* 4、sink设置事务超时时间 checkpoint间隔 事务超时时间 max的15分钟*/KafkaSinkString kafkaSink KafkaSink.Stringbuilder()// 指定 kafka 的地址和端口.setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092)// 指定序列化器指定Topic名称、具体的序列化.setRecordSerializer(KafkaRecordSerializationSchema.Stringbuilder().setTopic(ws).setValueSerializationSchema(new SimpleStringSchema()).build())// TODO 3.1 精准一次,开启 2pc.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)// TODO 3.2 精准一次必须设置 事务的前缀.setTransactionalIdPrefix(jjm-)// TODO 3.3 精准一次必须设置 事务超时时间: 大于checkpoint间隔小于 max 15分钟.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000).build();kafkasource.sinkTo(kafkaSink);env.execute();}
}后续读取“ws”这个topic的消费者要设置事务的隔离级别为“读已提交”如下
public class KafkaEOSDemo2 {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 消费 在前面使用两阶段提交写入的TopicKafkaSourceString kafkaSource KafkaSource.Stringbuilder().setBootstrapServers(hadoop102:9092,hadoop103:9092,hadoop104:9092).setGroupId(jjm).setTopics(ws).setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())// TODO 作为 下游的消费者要设置 事务的隔离级别 读已提交.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, read_committed).build();env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), kafkasource).print();env.execute();}
}