什么网站做电脑系统好,wordpress调用自定义菜单,微信建公众号怎么建,做网站还有用吗一、Flink 专栏
Flink 专栏系统介绍某一知识点#xff0c;并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分#xff0c;比如术语、架构、编程模型、编程指南、基本的datastream api用…一、Flink 专栏
Flink 专栏系统介绍某一知识点并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、Flik Table API和SQL基础系列 本部分介绍Flink Table Api和SQL的基本用法比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。 4、Flik Table API和SQL提高与应用系列 本部分是table api 和sql的应用部分和实际的生产应用联系更为密切以及有一定开发难度的内容。 5、Flink 监控系列 本部分和实际的运维、监控工作相关。
二、Flink 示例专栏
Flink 示例专栏是 Flink 专栏的辅助说明一般不会介绍知识点的信息更多的是提供一个一个可以具体使用的示例。本专栏不再分目录通过链接即可看出介绍的内容。
两专栏的所有文章入口点击Flink 系列文章汇总索引 文章目录 一、DataStream 和 Table集成-数据管道1、maven依赖2、Adding Table API Pipelines to DataStream API 示例 本文介绍了将table api管道加入datastream。
如果需要了解更多内容可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外没有其他依赖。
更多详细内容参考文章
21、Flink 的table API与DataStream API 集成完整版
一、DataStream 和 Table集成-数据管道
1、maven依赖
propertiesencodingUTF-8/encodingproject.build.sourceEncodingUTF-8/project.build.sourceEncodingmaven.compiler.source1.8/maven.compiler.sourcemaven.compiler.target1.8/maven.compiler.targetjava.version1.8/java.versionscala.version2.12/scala.versionflink.version1.17.0/flink.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-common/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-bridge/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-sql-gateway/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-csv/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-json/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.12/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-api-java-uber/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-jdbc/artifactIdversion3.1.0-1.17/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.38/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-hive_2.12/artifactIdversion1.17.0/version/dependencydependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion3.1.2/version/dependency!-- flink连接器 --!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependency!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka --dependencygroupIdorg.apache.flink/groupIdartifactIdflink-sql-connector-kafka/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --dependencygroupIdorg.apache.commons/groupIdartifactIdcommons-compress/artifactIdversion1.24.0/version/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdversion1.18.2/version!-- scopeprovided/scope --/dependency/dependencies2、Adding Table API Pipelines to DataStream API 示例
单个Flink作业可以由多个相邻运行的断开连接的管道组成。
Table API中定义的Source-to-sink管道可以作为一个整体附加到StreamExecutionEnvironment并在调用DataStream API中的某个执行方法时提交。
源不一定是table source也可以是以前转换为Table API的另一个DataStream管道。因此可以将 table sinks用于DataStream API程序。
通过使用StreamTableEnvironment.createStatementSet()创建的专用StreamStatementSet实例可以使用该功能。通过使用语句集planner 可以一起优化所有添加的语句并在调用StreamStatement set.attachAsDataStream()时提供一个或多个添加到StreamExecutionEnvironment的端到端管道( end-to-end pipelines)。
下面的示例演示如何将表程序添加到一个作业中的DataStream API程序。 import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamStatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** author alanchan**/
public class TestTablePipelinesToDataStreamDemo {/*** param args* throws Exception */public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);StreamStatementSet statementSet tenv.createStatementSet();// 建立数据源TableDescriptor sourceDescriptor TableDescriptor.forConnector(datagen).option(number-of-rows, 3).schema(Schema.newBuilder().column(myCol, DataTypes.INT()).column(myOtherCol, DataTypes.BOOLEAN()).build()).build();// 建立sinkTableDescriptor sinkDescriptor TableDescriptor.forConnector(print).build();// add a pure Table API pipelineTable tableFromSource tenv.from(sourceDescriptor);statementSet.add(tableFromSource.insertInto(sinkDescriptor));// use table sinks for the DataStream API pipelineDataStreamInteger dataStream env.fromElements(1, 2, 3);Table tableFromStream tenv.fromDataStream(dataStream);statementSet.add(tableFromStream.insertInto(sinkDescriptor));// attach both pipelines to StreamExecutionEnvironment (the statement set will be cleared after calling this method)statementSet.attachAsDataStream();// define other DataStream API partsenv.fromElements(4, 5, 6).addSink(new DiscardingSink());// use DataStream API to submit the pipelinesenv.execute();// 1 I[287849559, true]
// I[1]
// I[2]
// I[3]
// 3 I[-1058230612, false]
// 2 I[-995481497, false]}}以上本文介绍了将table api管道加入datastream。