网站的维护及建设,深圳住房和建设局网站融悦居,阿里万网站建设,辽宁住房建设厅网站首页DataStream API #xff08;基础篇#xff09; 注#xff1a; 本文只涉及DataStream 原因#xff1a;随着大数据和流式计算需求的增长#xff0c;处理实时数据流变得越来越重要。因此#xff0c;DataStream由于其处理实时数据流的特性和能力#xff0c;逐渐替代了DataSe…DataStream API 基础篇 注 本文只涉及DataStream 原因随着大数据和流式计算需求的增长处理实时数据流变得越来越重要。因此DataStream由于其处理实时数据流的特性和能力逐渐替代了DataSet成为了主流的数据处理方式。 目录
DataStream API 基础篇
前摘
一、执行环境
1. 创建执行环境
2. 执行模式
3. 触发程序执行
二、源算子source
三、转换算子Transformation
四、输出算子sink 前摘
一个 Flink 程序其实就是对 DataStream 的各种转换。具体来说代码基本上都由以下几 部分构成如图所示
获取执行环境Execution Environment读取数据源Source定义基于数据的转换操作Transformations定义计算结果的输出位置Sink触发程序执行Execute
其中获取环境和触发执行都可以认为是针对执行环境的操作。所以本章我们就从执行 环境、数据源source、转换操作Transformation、输出Sink四大部分对常用的 DataStream API 做基本介绍。 一、执行环境
1. 创建执行环境
编写Flink程序的第一步就是创建执行环境。我 们 要 获 取 的 执 行 环 境 是 StreamExecutionEnvironment 类的对象这是所有 Flink 程序的基础在代码中创建执行环境的 方式就是调用这个类的静态方法具体有以下三种。
getExecutionEnvironment 最简单的方式就是直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文 直接得到正确的结果 //此处的 env 是 StreamExecutionEnvironment 对象
val env StreamExecutionEnvironment.getExecutionEnvironment createLocalEnvironment 这个方法返回一个本地执行环境。可以在调用时传入一个参数指定默认的并行度如果 不传入则默认并行度就是本地的 CPU 核心数。 //此处的 localEnvironment 是 StreamExecutionEnvironment 对象
val localEnvironment StreamExecutionEnvironment.createLocalEnvironment()createRemoteEnvironment 这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号并指定 要在集群中运行的 Jar 包。 //此处的 remoteEnv 是 StreamExecutionEnvironment 对象
val remoteEnv StreamExecutionEnvironment.createRemoteEnvironment(host, // JobManager 主机名1234, // JobManager 进程端口号path/to/jarFile.jar // 提交给 JobManager 的 JAR 包
)2. 执行模式 而从 1.12.0 版本起Flink 实现了 API 上的流批统一。DataStream API 新增了一个重要特 性可以支持不同的“执行模式”execution mode通过简单的设置就可以让一段 Flink 程序 在流处理和批处理之间切换。这样一来DataSet API 也就没有存在的必要了。 流执行模式STREAMING 这是 DataStream API 最经典的模式一般用于需要持续实时处理的无界数据流。默认情 况下程序使用的就是 STREAMING 执行模式。批执行模式BATCH 专门用于批处理的执行模式, 这种模式下Flink 处理作业的方式类似于 MapReduce 框架。 对于不会持续计算的有界数据我们用这种模式处理会更方便。自动模式AUTOMATIC 在这种模式下将由程序根据输入数据源是否有界来自动选择执行模式
由于 Flink 程序默认是 STREAMING 模式我们这里重点介绍一下 BATCH 模式的配置。 主要有两种方式
1通过命令行配置
bin/flink run -Dexecution.runtime-modeBATCH ...在提交作业时增加 execution.runtime-mode 参数指定值为 BATCH。
2通过代码配置
val env StreamExecutionEnvironment.getExecutionEnvironment
env.setRuntimeMode(RuntimeExecutionMode.BATCH)
3. 触发程序执行
我们需要显式地调用执行环境的 execute()方法来触发程序执行。execute()方法将一直等 待作业完成然后返回一个执行结果JobExecutionResult。
env.execute()二、源算子source
Source源算子基础篇二
三、转换算子Transformation 持续更新中 四、输出算子sink 持续更新中