标准物质网站建设,wordpress手动添加媒体,wordpress多国语言版本,遂昌网站建设Flink SQL 基础概念#xff1a;数据类型 1.原子数据类型1.1 字符串类型1.2 二进制字符串类型1.3 精确数值类型1.4 有损精度数值类型1.5 布尔类型#xff1a;BOOLEAN1.6 日期、时间类型 2.复合数据类型3.用户自定义数据类型 Flink SQL 内置了很多常见的数据类型#xff0c;并… Flink SQL 基础概念数据类型 1.原子数据类型1.1 字符串类型1.2 二进制字符串类型1.3 精确数值类型1.4 有损精度数值类型1.5 布尔类型BOOLEAN1.6 日期、时间类型 2.复合数据类型3.用户自定义数据类型 Flink SQL 内置了很多常见的数据类型并且也为用户提供了自定义数据类型的能力。
总共包含 3 部分
原子 数据类型复合 数据类型用户自定义 数据类型
1.原子数据类型
1.1 字符串类型
CHAR、CHAR(n)定长字符串就和 Java 中的 Char 一样n 代表字符的定长取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n则默认为 1。VARCHAR、VARCHAR(n)、STRING可变长字符串就和 Java 中的 String 一样n 代表字符的最大长度取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n则默认为 1。STRING 等同于 VARCHAR(2147483647)。
1.2 二进制字符串类型
BINARY、BINARY(n)定长二进制字符串n 代表定长取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n则默认为 1。VARBINARY、VARBINARY(n)、BYTES可变长二进制字符串n 代表字符的最大长度取值范围 [ 1 , 2147483647 ] [1, 2147483647] [1,2147483647]。如果不指定 n则默认为 1。BYTES 等同于 VARBINARY(2147483647)。
1.3 精确数值类型
DECIMAL、DECIMAL(p)、DECIMAL(p, s)、DEC、DEC(p)、DEC(p, s)、NUMERIC、NUMERIC(p)、NUMERIC(p, s)固定长度和精度的数值类型就和 Java 中的 BigDecimal 一样p 代表 数值位数长度取值范围 [ 1 , 38 ] [1, 38] [1,38]s 代表 小数点后的位数精度取值范围 [ 0 , p ] [0, p] [0,p]。如果不指定p 默认为 10s 默认为 0。TINYINT − 128 -128 −128 到 127 127 127 的 1 字节大小的有符号整数就和 Java 中的 byte 一样。SMALLINT − 32768 -32768 −32768 到 32767 32767 32767 的 2 字节大小的有符号整数就和 Java 中的 short 一样。INT、INTEGER − 2147483648 -2147483648 −2147483648 到 2147483647 2147483647 2147483647 的 4 字节大小的有符号整数就和 Java 中的 int 一样。BIGINT − 9223372036854775808 -9223372036854775808 −9223372036854775808 到 9223372036854775807 9223372036854775807 9223372036854775807 的 8 字节大小的有符号整数就和 Java 中的 long 一样。
1.4 有损精度数值类型
FLOAT4 字节大小的单精度浮点数值就和 Java 中的 float 一样。DOUBLE、DOUBLE PRECISION8 字节大小的双精度浮点数值就和 Java 中的 double 一样。
关于 FLOAT 和 DOUBLE 的区别可见 https://www.runoob.com/w3cnote/float-and-double-different.html
1.5 布尔类型BOOLEAN
NULL 类型NULL。Raw 类型RAW(class, snapshot)。只会在数据发生网络传输时进行序列化反序列化操作可以保留其原始数据。以 Java 举例class 参数代表具体对应的 Java 类型snapshot 代表类型在发生网络传输时的序列化器。
1.6 日期、时间类型
DATE由 年-月-日 组成的 不带时区含义 的日期类型取值范围 [0000-01-01, 9999-12-31]TIME、TIME(p)由 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间数据类型精度高达纳秒取值范围 [00:00:00.000000000, 23:59:59.9999999]。其中 p 代表小数秒的位数取值范围 [ 0 , 9 ] [0, 9] [0,9]如果不指定 p默认为 0。TIMESTAMP、TIMESTAMP(p)、TIMESTAMP WITHOUT TIME ZONE、TIMESTAMP(p) WITHOUT TIME ZONE由 年-月-日 小时:分钟:秒[.小数秒] 组成的 不带时区含义 的时间类型取值范围 [0000-01-01 00:00:00.000000000, 9999-12-31 23:59:59.999999999]。其中 p 代表小数秒的位数取值范围 [ 0 , 9 ] [0, 9] [0,9]如果不指定 p默认为 6。TIMESTAMP WITH TIME ZONE、TIMESTAMP(p) WITH TIME ZONE由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型取值范围 [0000-01-01 00:00:00.000000000 14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数取值范围 [ 0 , 9 ] [0, 9] [0,9]如果不指定 p默认为 6。TIMESTAMP_LTZ、TIMESTAMP_LTZ(p)由 年-月-日 小时:分钟:秒[.小数秒] 时区 组成的 带时区含义 的时间类型取值范围 [0000-01-01 00:00:00.000000000 14:59, 9999-12-31 23:59:59.999999999 -14:59]。其中 p 代表小数秒的位数取值范围 [ 0 , 9 ] [0, 9] [0,9]如果不指定 p默认为 6。 TIMESTAMP_LTZ 与 TIMESTAMP WITH TIME ZONE 的区别在于TIMESTAMP WITH TIME ZONE 的时区信息是携带在数据中的举例其输入数据应该是 2022-01-01 00:00:00.000000000 08:00TIMESTAMP_LTZ 的时区信息不是携带在数据中的而是由 Flink SQL 任务的全局配置决定的我们可以由 table.local-time-zone 参数来设置时区。 INTERVAL YEAR TO MONTH、INTERVAL DAY TO SECONDINTERVAL 的涉及到的种类比较多。INTERVAL 主要是用于给 TIMESTAMP、TIMESTAMP_LTZ 添加偏移量的。举例比如给 TIMESTAMP 加、减几天、几个月、几年。INTERVAL 子句总共涉及到的语法种类如下 Flink SQL 案例所示。
CREATE TABLE sink_table (result_interval_year TIMESTAMP(3),result_interval_year_p TIMESTAMP(3),result_interval_year_p_to_month TIMESTAMP(3),result_interval_month TIMESTAMP(3),result_interval_day TIMESTAMP(3),result_interval_day_p1 TIMESTAMP(3),result_interval_day_p1_to_hour TIMESTAMP(3),result_interval_day_p1_to_minute TIMESTAMP(3),result_interval_day_p1_to_second_p2 TIMESTAMP(3),result_interval_hour TIMESTAMP(3),result_interval_hour_to_minute TIMESTAMP(3),result_interval_hour_to_second TIMESTAMP(3),result_interval_minute TIMESTAMP(3),result_interval_minute_to_second_p2 TIMESTAMP(3),result_interval_second TIMESTAMP(3),result_interval_second_p2 TIMESTAMP(3)
) WITH (connector print
);
INSERT INTO sink_table
SELECT-- Flink SQL 支持的所有 INTERVAL 子句如下总体可以分为 年-月、日-小时-秒 两种-- 1. 年-月。取值范围为 [-9999-11, 9999-11]-- 其中 p 是指有效位数取值范围 [1, 4]默认值为 2。比如如果值为 1000但是 p 2则会直接报错。-- INTERVAL YEARf1 INTERVAL 10 YEAR as result_interval_year,-- INTERVAL YEAR(p)f1 INTERVAL 100 YEAR(3) as result_interval_year_p,-- INTERVAL YEAR(p) TO MONTHf1 INTERVAL 10-03 YEAR(3) TO MONTH as result_interval_year_p_to_month,-- INTERVAL MONTHf1 INTERVAL 13 MONTH as result_interval_month,-- 2. 日-小时-秒。取值范围为 [-999999 23:59:59.999999999, 999999 23:59:59.999999999]-- 其中 p1/p2 都是有效位数p1 取值范围 [1, 6]默认值为 2p2 取值范围 [0, 9]默认值为 6-- INTERVAL DAYf1 INTERVAL 10 DAY as result_interval_day,-- INTERVAL DAY(p1)f1 INTERVAL 100 DAY(3) as result_interval_day_p1,-- INTERVAL DAY(p1) TO HOURf1 INTERVAL 10 03 DAY(3) TO HOUR as result_interval_day_p1_to_hour,-- INTERVAL DAY(p1) TO MINUTEf1 INTERVAL 10 03:12 DAY(3) TO MINUTE as result_interval_day_p1_to_minute,-- INTERVAL DAY(p1) TO SECOND(p2)f1 INTERVAL 10 00:00:00.004 DAY TO SECOND(3) as result_interval_day_p1_to_second_p2,-- INTERVAL HOURf1 INTERVAL 10 HOUR as result_interval_hour,-- INTERVAL HOUR TO MINUTEf1 INTERVAL 10:03 HOUR TO MINUTE as result_interval_hour_to_minute,-- INTERVAL HOUR TO SECOND(p2)f1 INTERVAL 00:00:00.004 HOUR TO SECOND(3) as result_interval_hour_to_second,-- INTERVAL MINUTEf1 INTERVAL 10 MINUTE as result_interval_minute,-- INTERVAL MINUTE TO SECOND(p2)f1 INTERVAL 05:05.006 MINUTE TO SECOND(3) as result_interval_minute_to_second_p2,-- INTERVAL SECONDf1 INTERVAL 3 SECOND as result_interval_second,-- INTERVAL SECOND(p2)f1 INTERVAL 300 SECOND(3) as result_interval_second_p2
FROM (SELECT TO_TIMESTAMP_LTZ(1640966476500, 3) as f1)2.复合数据类型
数组类型ARRAYt、t ARRAY。数组最大长度为 2147483647 2147483647 2147483647。t 代表数组内的数据类型。举例 ARRAYINT、ARRAYSTRING其等同于 INT ARRAY、STRING ARRAY。Map 类型MAPkt, vt。Map 类型就和 Java 中的 Map 类型一样key 是没有重复的。举例 MapSTRING, INT、MapBIGINT, STRING。集合类型MULTISETt、t MULTISET。就和 Java 中的 List 类型一样运行重复的数据。举例 MULTISETINT其等同于 INT MULTISET。对象类型ROWn0 t0, n1 t1, ...、ROWn0 t0 d0, n1 t1 d1, ...、ROW(n0 t0, n1 t1, ...)、ROW(n0 t0 d0, n1 t1 d1, ...)。其中n 是字段的唯一名称t 是字段的逻辑类型d 是字段的描述。就和 Java 中的自定义对象一样。举例ROW(myField INT, myOtherField BOOLEAN)其等同于 ROWmyField INT, myOtherField BOOLEAN
3.用户自定义数据类型
用户自定义类型就是运行用户使用 Java 等语言自定义一个数据类型出来。但是目前数据类型不支持使用 CREATE TABLE 的 DDL 进行定义只支持作为函数的输入输出参数。如下案例
第一步自定义数据类型
public class User {// 1. 基础类型Flink 可以通过反射类型信息自动把数据类型获取到// 关于 SQL 类型和 Java 类型之间的映射见https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/types/#data-type-extractionpublic int age;public String name;// 2. 复杂类型用户可以通过 DataTypeHint(DECIMAL(10, 2)) 注解标注此字段的数据类型public DataTypeHint(DECIMAL(10, 2)) BigDecimal totalBalance;
}第二步在 UDF 中使用此数据类型
public class UserScalarFunction extends ScalarFunction {// 1. 自定义数据类型作为输出参数public User eval(long i) {if (i 0 i 5) {User u new User();u.age (int) i;u.name name1;u.totalBalance new BigDecimal(1.1d);return u;} else {User u new User();u.age (int) i;u.name name2;u.totalBalance new BigDecimal(2.2d);return u;}}// 2. 自定义数据类型作为输入参数public String eval(User i) {if (i.age 0 i.age 5) {User u new User();u.age 1;u.name name1;u.totalBalance new BigDecimal(1.1d);return u.name;} else {User u new User();u.age 2;u.name name2;u.totalBalance new BigDecimal(2.2d);return u.name;}}
}第三步在 Flink SQL 中使用
-- 1. 创建 UDF
CREATE FUNCTION user_scalar_func AS flink.examples.sql._12_data_type._02_user_defined.UserScalarFunction;-- 2. 创建数据源表
CREATE TABLE source_table (user_id BIGINT NOT NULL COMMENT 用户 id
) WITH (connector datagen,rows-per-second 1,fields.user_id.min 1,fields.user_id.max 10
);-- 3. 创建数据汇表
CREATE TABLE sink_table (result_row_1 ROWage INT, name STRING, totalBalance DECIMAL(10, 2),result_row_2 STRING
) WITH (connector print
);-- 4. SQL 查询语句
INSERT INTO sink_table
select-- 4.a. 用户自定义类型作为输出user_scalar_func(user_id) as result_row_1,-- 4.b. 用户自定义类型作为输出及输入user_scalar_func(user_scalar_func(user_id)) as result_row_2
from source_table;-- 5. 查询结果
I[I[9, name2, 2.20], name2]
I[I[1, name1, 1.10], name1]
I[I[5, name1, 1.10], name1]参考《Data Types | Apache Flink》