建个网站在哪备案,做科研交流常用的网站,wordpress自动压缩图,厦门电商培训1.创建 Catalog 管理的 Tables 
在Paimon Catalog中创建的Tables由Catalog管理#xff0c;当Tables从Catalog中删除时#xff0c;其table files也将被删除。 
当使用Paimon Catalog#xff0c;创建一个名为MyTable的managed table#xff0c;在Catalog的default数据库中有五…1.创建 Catalog 管理的 Tables 
在Paimon Catalog中创建的Tables由Catalog管理当Tables从Catalog中删除时其table files也将被删除。 
当使用Paimon Catalog创建一个名为MyTable的managed table在Catalog的default数据库中有五列其中dt、hh和user_id是primary keys。 
Flink 引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);Spark3引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) TBLPROPERTIES (primary-key  dt,hh,user_id
);Hive引擎 
SET hive.metastore.warehouse.dirwarehouse_path;CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
)
STORED BY org.apache.paimon.hive.PaimonStorageHandler
TBLPROPERTIES (primary-key  dt,hh,user_id
);注意在删除表之前应停止在表上的插入Job否则无法完全删除表文件。 
I分区表 
创建一个名为MyTable的表其中dt和hh是分区列dt、hh和user_id是主键。 
Flink引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh);Spark3引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (primary-key  dt,hh,user_id
);Hive引擎 
SET hive.metastore.warehouse.dirwarehouse_path;CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING
) PARTITIONED BY ( dt STRING,hh STRING
)
STORED BY org.apache.paimon.hive.PaimonStorageHandler
TBLPROPERTIES (primary-key  dt,hh,user_id
);注意通过配置partition.expiration-time可以自动删除过期的分区。 
ii选择分区字段 
以下三种类型的字段可以定义为数仓中的分区字段 
Creation Time推荐创建时间通常是不可变的因此可以将其视为分区字段并将其添加到主键中。Event Time事件时间是原始表中的一个字段对于CDC数据例如从MySQL CDC同步的表或Paimon生成的Changelogs都是完整的CDC数据包括UPDATE_BEFORE Records即使声明了包含主键的分区字段也可以实现唯一性需要changelog-producerinput。CDC op_ts它不能被定义为分区字段无法知道以前的record timestamp。 
iii指定统计模式 
Paimon将自动收集数据文件的统计数据以加快查询过程。支持四种模式 
full收集完整的指标null_count, min, max。truncate(length)长度可以是任何正数默认模式是truncate(16)这意味着收集空数最小/最大值截断长度为16。主要是为了避免过大的列会放大清单文件。counts只收集空计数。none禁用元数据统计信息收集。 
统计收集器模式可以通过metadata.stats-mode配置默认为truncate(16)可以通过设置fields.{field_name}.stats-mode来配置字段级别。 
iiii字段默认值 
Paimon表目前支持为表属性中的字段设置默认值请注意无法指定分区字段和主键字段。 
Flink引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh)
with(fields.item_id.default-value0
);Spark3引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (primary-key  dt,hh,user_id,fields.item_id.default-value0
);Hive引擎 
SET hive.metastore.warehouse.dirwarehouse_path;CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
)
STORED BY org.apache.paimon.hive.PaimonStorageHandler
TBLPROPERTIES (primary-key  dt,hh,user_id,partitiondt,hh,fields.item_id.default-value0
);2.Create Table As 
表可以通过查询结果创建例如CREATE TABLE table_b AS SELECT id, name FORM table_a生成的表 table_b 将等同于创建表并插入带有以下语句的数据CREATE TABLE table_b (id INT, name STRING); INSERT INTO table_b SELECT id, name FROM table_a; 
当使用CREATE TABLE AS SELECT可以指定主键或分区。 
Flink 引擎 
/* For streaming mode, you need to enable the checkpoint. */CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT
);
CREATE TABLE MyTableAs AS SELECT * FROM MyTable;/* partitioned table */
CREATE TABLE MyTablePartition (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTablePartitionAs WITH (partition  dt) AS SELECT * FROM MyTablePartition;/* change options */
CREATE TABLE MyTableOptions (user_id BIGINT,item_id BIGINT
) WITH (file.format  orc);
CREATE TABLE MyTableOptionsAs WITH (file.format  parquet) AS SELECT * FROM MyTableOptions;/* primary key */
CREATE TABLE MyTablePk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);
CREATE TABLE MyTablePkAs WITH (primary-key  dt,hh) AS SELECT * FROM MyTablePk;/* primary key  partition */
CREATE TABLE MyTableAll (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED 
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTableAllAs WITH (primary-key  dt,hh, partition  dt) AS SELECT * FROM MyTableAll;Spark3引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT
);
CREATE TABLE MyTableAs AS SELECT * FROM MyTable;/* partitioned table*/
CREATE TABLE MyTablePartition (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh);
CREATE TABLE MyTablePartitionAs PARTITIONED BY (dt) AS SELECT * FROM MyTablePartition;/* change TBLPROPERTIES */
CREATE TABLE MyTableOptions (user_id BIGINT,item_id BIGINT
) TBLPROPERTIES (file.format  orc);
CREATE TABLE MyTableOptionsAs TBLPROPERTIES (file.format  parquet) AS SELECT * FROM MyTableOptions;/* primary key */
CREATE TABLE MyTablePk (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) TBLPROPERTIES (primary-key  dt,hh,user_id
);
CREATE TABLE MyTablePkAs TBLPROPERTIES (primary-key  dt) AS SELECT * FROM MyTablePk;/* primary key  partition */
CREATE TABLE MyTableAll (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (primary-key  dt,hh,user_id
);
CREATE TABLE MyTableAllAs PARTITIONED BY (dt) TBLPROPERTIES (primary-key  dt,hh) AS SELECT * FROM MyTableAll;iCreate Table Like 
要创建与另一个表相同的schema、分区和表属性的表请使用CREATE TABLE LIKE。 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
);CREATE TABLE MyTableLike LIKE MyTable;-- Create Paimon Table like other connector table
CREATE TABLE MyTableLike WITH (connector  paimon) LIKE MyTable;iiTable Properties 
可以指定表属性来启用功能或提高Paimon的性能。 
以下SQL创建一个名为MyTable的表其五列由dt和hh分区其中dt、hh和user_id是主键此表有两个属性bucket  2和bucket-key  user_id。 
Flink引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) PARTITIONED BY (dt, hh) WITH (bucket  2,bucket-key  user_id
);Spark引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
) PARTITIONED BY (dt, hh) TBLPROPERTIES (primary-key  dt,hh,user_id,bucket  2,bucket-key  user_id
);Hive引擎 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING
)
STORED BY org.apache.paimon.hive.PaimonStorageHandler
TBLPROPERTIES (primary-key  dt,hh,user_id,partitiondt,hh,bucket  2,bucket-key  user_id
);3.Creating External Tables 
外部表被记录但不由catalog管理如果外部表被删除其表文件不会被删除。 
Paimon外部表可以在任何catalog中使用如果不想创建Paimon Catalog只想读/写Table可以考虑创建外部表。 
注意如果表已经存在options 不会像dynamic options一样更新到表的metadata中。 
Flink引擎-已弃用建议使用Paimon Catalog 
Flink SQL支持读取和写入外部表外部Paimon表是通过指定connector和path表属性创建的以下SQL创建了一个名为MyTable的外部表有五列其中表文件的基本路径是hdfs:///path/to/table。 
CREATE TABLE MyTable (user_id BIGINT,item_id BIGINT,behavior STRING,dt STRING,hh STRING,PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
) WITH (connector  paimon,path  hdfs:///path/to/table,auto-create  true -- this table property creates table files for an empty table if table path does not exist-- currently only supported by Flink
);注意Flink SQL必须声明所有字段。 
Spark引擎 
Spark3仅支持通过Scala API创建外部表以下Scala代码将位于hdfs:///path/to/table的表加载到DataSet。 
val dataset  spark.read.format(paimon).load(hdfs:///path/to/table)Hive引擎 
要访问现有的paimon表可以在Hive中将它们注册为外部表以下SQL创建了一个名为my_table的外部表其中表文件的基本路径是hdfs:///path/to/table由于schema存储在表文件中所以用户无需定义列。 
CREATE EXTERNAL TABLE my_table
STORED BY org.apache.paimon.hive.PaimonStorageHandler
LOCATION hdfs:///path/to/table;4.Creating Temporary Tables 
临时表仅支持Flink引擎与外部表一样临时表只是记录但不由当前的Flink SQL session管理。 
如果临时表被删除其resources将不会被删除当Flink SQL session关闭时临时表会被丢弃。 
如果想将Paimon Catalog与其它表一起使用但不想将它们存储在其它Catalog中可以创建一个临时表。 
以下Flink SQL创建了一个Paimon Catalog和一个临时表 
CREATE CATALOG my_catalog WITH (type  paimon,warehouse  hdfs:///path/to/warehouse
);USE CATALOG my_catalog;-- Assume that there is already a table named my_table in my_catalogCREATE TEMPORARY TABLE temp_table (k INT,v STRING
) WITH (connector  filesystem,path  hdfs:///path/to/temp_table.csv,format  csv
);SELECT my_table.k, my_table.v, temp_table.v FROM my_table JOIN temp_table ON my_table.k  temp_table.k;