网站备案号找回密码,网站后台无法上传本地图片,房子装修效果图,优化网站费用Flink 系列文章
Flink#xff08;一#xff09;1.12.7或1.13.5详细介绍及本地安装部署、验证 Flink#xff08;二#xff09;1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式#xff08;前两种及session和per-job#xff09;验证详细步骤 Flink…Flink 系列文章
Flink一1.12.7或1.13.5详细介绍及本地安装部署、验证 Flink二1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式前两种及session和per-job验证详细步骤 Flink三flink重要概念api分层、角色、执行流程、执行图和编程模型及dataset、datastream详细示例入门和提交任务至on yarn运行 Flink四介绍Flink的流批一体、transformations的18种算子详细介绍、Flink与Kafka的source、sink介绍 Flink五source、transformations、sink的详细示例一 Flink五source、transformations、sink的详细示例二-source和transformation示例 Flink五source、transformations、sink的详细示例三-sink示例 Flink六Flink四大基石之Window详解与详细示例一 Flink六Flink四大基石之Window详解与详细示例二 Flink七Flink四大基石之Time和WaterMaker详解与详细示例watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现 Flink八Flink四大基石之State概念、使用场景、持久化、批处理的详解与keyed state和operator state、broadcast state使用和详细示例 Flink九Flink四大基石之Checkpoint容错机制详解及示例checkpoint配置、重启策略、手动恢复checkpoint和savepoint Flink十source、transformations、sink的详细示例二-source和transformation示例【补充示例】 Flink十一Flink配置flink-conf.yaml详细说明HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg Flink十二Flink source和sink 的 clickhouse 详细示例 Flink十三Flink 的table api与sql的基本概念、通用api介绍 Flink十四Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 Flink十五Flink 的table api与sql之流式概念-配置时间属性和如何处理更新结果 Flink十六Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式 Flink十七Flink 的table api与sql之Table API: Table API 支持的操作 Flink十八Flink 的table api与sql之SQL: SQL 支持的操作和语法 Flink十九Flink 的table api与sql之内置函数: Table API 和 SQL 中的内置函数 Flink二十Flink 的table api与sql之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上 Flink二十一Flink 的table api与sql之table api与sql使用示例 Flink二十二Flink 的table api与sql之创建表的DDL 文章目录 Flink 系列文章一、DDL概述二、执行 CREATE 语句1、java2、SQL Cli 三、CREATE TABLE语法1、Columns1、Physical / Regular Columns2、Metadata Columns3、Computed Columns 2、WATERMARK3、PRIMARY KEY4、PARTITIONED BY5、WITH Options6、LIKE7、AS select_statement 三、CREATE CATALOG四、CREATE DATABASE五、CREATE VIEW六、CREATE FUNCTION 本文介绍了Flink 的table api和sql中的DDL操作与示例。 本文比较简单仅仅是介绍Flink 的DDL。
一、DDL概述
CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。 目前 Flink SQL 支持下列 CREATE 语句
CREATE TABLECREATE CATALOGCREATE DATABASECREATE VIEWCREATE FUNCTION
二、执行 CREATE 语句
可以使用 TableEnvironment 中的 executeSql() 方法执行 CREATE 语句。 若 CREATE 操作执行成功executeSql() 方法返回 ‘OK’否则会抛出异常。
1、java
以下的例子展示了如何在 TableEnvironment 中执行一个 CREATE 语句。
EnvironmentSettings settings EnvironmentSettings.newInstance()...
TableEnvironment tableEnv TableEnvironment.create(settings);// 对已注册的表进行 SQL 查询
// 注册名为 “Orders” 的表
tableEnv.executeSql(CREATE TABLE Orders (user BIGINT, product STRING, amount INT) WITH (...));
// 在表上执行 SQL 查询并把得到的结果作为一个新的表
Table result tableEnv.sqlQuery(SELECT product, amount FROM Orders WHERE product LIKE %Rubber%);// 对已注册的表进行 INSERT 操作
// 注册 TableSink
tableEnv.executeSql(CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...));
// 在表上执行 INSERT 语句并向 TableSink 发出结果
tableEnv.executeSql(INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE %Rubber%);2、SQL Cli
Flink SQL CREATE TABLE Orders (user BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.Flink SQL CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.Flink SQL INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE %Rubber%;
[INFO] Submitting SQL update statement to the cluster...三、CREATE TABLE语法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name({ physical_column_definition | metadata_column_definition | computed_column_definition }[ , ...n][ watermark_definition ][ table_constraint ][ , ...n])[COMMENT table_comment][PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]WITH (key1val1, key2val2, ...)[ LIKE source_table [( like_options )] | AS select_query ]physical_column_definition:column_name column_type [ column_constraint ] [COMMENT column_comment]column_constraint:[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCEDtable_constraint:[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCEDmetadata_column_definition:column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]computed_column_definition:column_name AS computed_column_expression [COMMENT column_comment]watermark_definition:WATERMARK FOR rowtime_column_name AS watermark_strategy_expressionsource_table:[catalog_name.][db_name.]table_namelike_options:
{{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]根据指定的表名创建一个表如果同名表已经在 catalog 中存在了则无法注册。
1、Columns
1、Physical / Regular Columns
物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此物理列表示从外部系统读取和写入的有效负载。连接器和格式使用这些列按定义的顺序来配置自身。可以在物理列之间声明其他类型的列但不会影响最终的physical schema。
以下语句创建一个仅包含常规列的表
CREATE TABLE MyTable (user_id BIGINT,name STRING
) WITH (...
);2、Metadata Columns
元数据列是SQL标准的扩展允许访问连接器和/或格式化表的每一行的特定字段。元数据列由元数据关键字指示。例如元数据列可用于从 Kafka 记录读取和写入时间戳以便进行基于时间的操作。连接器和格式文档列出了每个组件的可用元数据字段。但是在table’s schema中声明元数据列是可选的。
以下语句创建一个表其中包含引用元数据字段时间戳的附加元数据列
CREATE TABLE MyTable (user_id BIGINT,name STRING,record_time TIMESTAMP_LTZ(3) METADATA FROM timestamp -- reads and writes a Kafka records timestamp
) WITH (connector kafka...
);每个元数据字段都由基于字符串的键标识并具有记录的数据类型。例如Kafka 连接器公开一个元数据字段其中包含可用于读取和写入记录的键时间戳和数据类型 TIMESTAMP_LTZ3。
在上面的示例中元数据列record_time成为table’s schema的一部分并且可以像常规列一样进行转换和存储
INSERT INTO MyTable SELECT user_id, name, record_time INTERVAL 1 SECOND FROM MyTable;为方便起见如果列名应用作标识元数据键则可以省略 FROM 子句
CREATE TABLE MyTable (user_id BIGINT,name STRING,timestamp TIMESTAMP_LTZ(3) METADATA -- use column name as metadata key
) WITH (connector kafka...
);为方便起见如果列的数据类型与元数据字段的数据类型不同运行时将执行显式强制转换。当然这要求两种数据类型兼容。
CREATE TABLE MyTable (user_id BIGINT,name STRING,timestamp BIGINT METADATA -- cast the timestamp as BIGINT
) WITH (connector kafka...
);默认情况下计划器假定元数据列可用于读取和写入。但是在许多情况下外部系统提供的只读元数据字段多于可写字段。因此可以使用 VIRTUAL 关键字从持久保留中排除元数据列。
CREATE TABLE MyTable (timestamp BIGINT METADATA, -- part of the query-to-sink schemaoffset BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schemauser_id BIGINT,name STRING,
) WITH (connector kafka...
);在上面的示例中偏移量是只读元数据列 在query-to-sink schema中排除。因此source-to-query schema对于 SELECT和query-to-sink 对于INSERT INTO架构不同
source-to-query schema:
MyTable(timestamp BIGINT, offset BIGINT, user_id BIGINT, name STRING)query-to-sink schema:
MyTable(timestamp BIGINT, user_id BIGINT, name STRING)3、Computed Columns
计算列是使用 AS computed_column_expression语法column_name生成的虚拟列。
计算列计算可引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不以物理方式存储在表中。列的数据类型是从给定表达式自动派生的不必手动声明。
计划器将在源之后将计算列转换为常规投影。对于优化或水印策略下推评估可能会分布在运算符之间、多次执行或在给定查询不需要时跳过。
例如计算列可以定义为
CREATE TABLE MyTable (user_id BIGINT,price DOUBLE,quantity DOUBLE,cost AS price * quanitity, -- evaluate expression and supply the result to queries
) WITH (connector kafka...
);表达式可以包含列、常量或函数的任意组合。表达式不能包含子查询。
计算列在 Flink 中通常用于定义 CREATE TABLE 语句中的时间属性。
处理时间属性可以通过 proc AS PROCTIME 使用系统的 PROCTIME 函数轻松定义。 可以在 WATERMARK 声明之前预处理事件时间属性时间戳。例如如果原始字段不是 TIMESTAMP3 类型或嵌套在 JSON 字符串中则可以使用计算列。
与虚拟元数据列类似计算列从持久化中排除。因此计算列不能是 INSERT INTO 语句的目标。因此source-to-query schema对于 SELECT和query-to-sink (for INSERT INTO) schema不同
source-to-query schema:
MyTable(user_id BIGINT, price DOUBLE, quantity DOUBLE, cost DOUBLE)query-to-sink schema:
MyTable(user_id BIGINT, price DOUBLE, quantity DOUBLE)2、WATERMARK
WATERMARK 定义了表的事件时间属性其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。
rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)且是 schema 中的顶层列它也可以是一个计算列。
watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark 表达式的返回类型必须是 TIMESTAMP(3)表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出以保证 watermark 递增。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark 如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark 则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms 那么每条记录都会产生一个 watermark且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
使用事件时间语义时表必须包含事件时间属性和 watermark 策略。
Flink 提供了三种常用的 watermark 策略。 严格递增时间戳 WATERMARK FOR rowtime_column AS rowtime_column。 发出到目前为止已观察到的最大时间戳的 watermark 时间戳大于最大时间戳的行被认为没有迟到。 递增时间戳 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。 发出到目前为止已观察到的最大时间戳减 1 的 watermark 时间戳大于或等于最大时间戳的行被认为没有迟到。 有界乱序时间戳 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark 例如 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是一个 5 秒延迟的 watermark 策略。
CREATE TABLE Orders (user BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND
) WITH ( . . . );3、PRIMARY KEY
主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个些列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。
主键可以和列的定义一起声明也可以独立声明为表的限制属性不管是哪种方式主键都不可以重复定义否则 Flink 会报错。
SQL 标准主键限制可以有两种模式ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查是否唯一。Flink 不存储数据因此只支持 NOT ENFORCED 模式即不做检查用户需要自己保证唯一性。
Flink 假设声明了主键的列都是不包含 Null 值的Connector 在处理数据时需要自己保证语义正确。 在 CREATE TABLE 语句中创建主键会修改列的 nullable 属性主键声明的列默认都是非 Nullable 的。 4、PARTITIONED BY
根据指定的列对已经创建的表进行分区。若表使用 filesystem sink 则将会为每个分区创建一个目录。
5、WITH Options
表属性用于创建 table source/sink 一般用于寻找和创建底层的连接器。
表达式 key1val1 的键和值必须为字符串文本常量。请参考 连接外部系统Flink十六Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式
表名可以为以下三种格式
catalog_name.db_name.table_name 使用catalog_name.db_name.table_name 的表将会与名为 “catalog_name” 的 catalog 和名为 “db_name” 的数据库一起注册到 metastore 中db_name.table_name 使用 db_name.table_name 的表将会被注册到当前执行的 table environment 中的 catalog 且数据库会被命名为 “db_name”table_name对于 table_name, 数据表将会被注册到当前正在运行的catalog和数据库中
使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前我们无法决定其实际用于 source 抑或是 sink。6、LIKE
LIKE 子句来源于两种 SQL 特性的变体/组合Feature T171“表定义中的 LIKE 语法” 和 Feature T173“表定义中的 LIKE 语法扩展”。LIKE 子句可以基于现有表的定义去创建新表并且可以扩展或排除原始表中的某些部分。与 SQL 标准相反LIKE 子句必须在 CREATE 语句中定义并且是基于 CREATE 语句的更上层定义这是因为 LIKE 子句可以用于定义表的多个部分而不仅仅是 schema 部分。
可以使用该子句重用或改写指定的连接器配置属性或者可以向外部表添加 watermark 定义例如可以向 Apache Hive 中定义的表添加 watermark 定义。
示例如下
CREATE TABLE Orders (user BIGINT,product STRING,order_time TIMESTAMP(3)
) WITH ( connector kafka,scan.startup.mode earliest-offset
);CREATE TABLE Orders_with_watermark (-- 添加 watermark 定义WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND
) WITH (-- 改写 startup-mode 属性scan.startup.mode latest-offset
)
LIKE Orders;结果表 Orders_with_watermark 等效于使用以下语句创建的表
CREATE TABLE Orders_with_watermark (user BIGINT,product STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND
) WITH (connector kafka,scan.startup.mode latest-offset
);表属性的合并逻辑可以用 like options 来控制。
可以控制合并的表属性如下
CONSTRAINTS - 主键和唯一键约束GENERATED - 计算列OPTIONS - 连接器信息、格式化方式等配置项PARTITIONS - 表分区信息WATERMARKS - watermark 定义
并且有三种不同的表属性合并策略
INCLUDING - 新表包含源表source table所有的表属性如果和源表的表属性重复则会直接失败例如新表和源表存在相同 key 的属性EXCLUDING - 新表不包含源表指定的任何表属性OVERWRITING - 新表包含源表的表属性但如果出现重复项则会用新表的表属性覆盖源表中的重复表属性例如两个表中都存在相同 key 的属性则会使用当前语句中定义的 key 的属性值
并且你可以使用 INCLUDING/EXCLUDING ALL 这种声明方式来指定使用怎样的合并策略例如使用 EXCLUDING ALL INCLUDING WATERMARKS那么代表只有源表的 WATERMARKS 属性才会被包含进新表。
示例如下
-- 存储在文件系统的源表
CREATE TABLE Orders_in_file (user BIGINT,product STRING,order_time_string STRING,order_time AS to_timestamp(order_time)
)
PARTITIONED BY (user)
WITH ( connector filesystem,path ...
);-- 对应存储在 kafka 的源表
CREATE TABLE Orders_in_kafka (-- 添加 watermark 定义WATERMARK FOR order_time AS order_time - INTERVAL 5 SECOND
) WITH (connector kafka,...
)
LIKE Orders_in_file (-- 排除需要生成 watermark 的计算列之外的所有内容。-- 去除不适用于 kafka 的所有分区和文件系统的相关属性。EXCLUDING ALLINCLUDING GENERATED
);如果未提供 like 配置项like options默认将使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。 您无法选择物理列的合并策略当物理列进行合并时就如使用了 INCLUDING 策略。 源表 source_table 可以是一个组合 ID。您可以指定不同 catalog 或者 DB 的表作为源表: 例如my_catalog.my_db.MyTable 指定了源表 MyTable 来源于名为 MyCatalog 的 catalog 和名为 my_db 的 DB my_db.MyTable 指定了源表 MyTable 来源于当前 catalog 和名为 my_db 的 DB。 7、AS select_statement
表也可以通过一个 CTAS 语句中的查询结果来创建并填充数据CTAS 是一种简单、快捷的创建表并插入数据的方法。
CTAS 有两个部分SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查询。 CREATE 部分从 SELECT 查询中获取列信息并创建目标表。 与 CREATE TABLE 类似CTAS 要求必须在目标表的 WITH 子句中指定必填的表属性。
CTAS 的建表操作需要依赖目标 Catalog。比如Hive Catalog 会自动在 Hive 中创建物理表。但是基于内存的 Catalog 只会将表的元信息注册在执行 SQL 的 Client 的内存中。
示例如下:
CREATE TABLE my_ctas_table
WITH (connector kafka,...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) 0;结果表 my_ctas_table 等效于使用以下语句创建表并写入数据:
CREATE TABLE my_ctas_table (id BIGINT,name STRING,age INT
) WITH (connector kafka,...
);INSERT INTO my_ctas_table SELECT id, name, age FROM source_table WHERE mod(id, 10) 0;注意 CTAS 有如下约束
暂不支持创建临时表。暂不支持指定列信息。暂不支持指定 Watermark。暂不支持创建分区表。暂不支持主键约束。 目前CTAS 创建的目标表是非原子性的如果在向表中插入数据时发生错误该表不会被自动删除。 三、CREATE CATALOG
CREATE CATALOG catalog_nameWITH (key1val1, key2val2, ...)使用给定的目录属性创建目录。如果已存在同名目录则会引发异常。
用于存储与此目录相关的额外信息的目录属性。表达式 key1val1 的键和值都应该是字符串文本。
关于Catalogs请参考Flink二十四Flink 的table api与sql之Catalogs
四、CREATE DATABASE
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name[COMMENT database_comment]WITH (key1val1, key2val2, ...)根据给定的表属性创建数据库。若数据库中已存在同名表会抛出异常。
1、IF NOT EXISTS
若数据库已经存在则不会进行任何操作。
2、WITH OPTIONS
数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1val1 中的键和值都需要是字符串文本常量。
五、CREATE VIEW
CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name[{columnName [, columnName ]* }] [COMMENT view_comment]AS query_expression根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常.
1、TEMPORARY
创建一个有 catalog 和数据库命名空间的临时视图并覆盖原有的视图。
2、IF NOT EXISTS
若该视图已经存在则不会进行任何操作。
六、CREATE FUNCTION
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION[IF NOT EXISTS] [[catalog_name.]db_name.]function_nameAS identifier [LANGUAGE JAVA|SCALA|PYTHON][USING JAR path_to_filename.jar [, JAR path_to_filename.jar]* ]创建一个有 catalog 和数据库命名空间的 catalog function 需要指定一个 identifier 可指定 language tag 。 若 catalog 中已经有同名的函数注册了则无法注册。
如果 language tag 是 JAVA 或者 SCALA 则 identifier 是 UDF 实现类的全限定名。关于 JAVA/SCALA UDF 的实现请参考 Flink二十五Flink 的table api与sql之函数。
TEMPORARY
创建一个有 catalog 和数据库命名空间的临时 catalog function 并覆盖原有的 catalog function 。
TEMPORARY SYSTEM
创建一个没有数据库命名空间的临时系统 catalog function 并覆盖系统内置的函数。
IF NOT EXISTS
若该函数已经存在则不会进行任何操作。
LANGUAGE JAVA|SCALA|PYTHON
Language tag 用于指定 Flink runtime 如何执行这个函数。目前只支持 JAVA, SCALA 和 PYTHON且函数的默认语言为 JAVA。
USING
指定包含该函数的实现及其依赖的 jar 资源列表。该 jar 应该位于 Flink 当前支持的本地或远程文件系统 中比如 hdfs/s3/oss。
注意 目前只有 JAVA、SCALA 语言支持 USING 子句。
以上介绍了Flink 的table api和sql中的DDL操作与示例。