专业外贸网站开发,辽宁省网站制作,汽车租赁网站建设内容,wordpress4.6 nodejs教程来自尚硅谷 目录 教程来自尚硅谷1. 概述1.1 简介1.2 核心特性1.3 文件布局1.3.1 LSM Trees 2. 集成Flink2.1 安装配置2.2 Catalog 3. 进阶使用3.1 写入性能3.1.1 并行度3.1.2 Compaction3.1.3 触发Compaction的Sorted Run数3.1.4 写入初始化3.1.5 内存 3.2 读取性能3.…教程来自尚硅谷 目录 教程来自尚硅谷1. 概述1.1 简介1.2 核心特性1.3 文件布局1.3.1 LSM Trees 2. 集成Flink2.1 安装配置2.2 Catalog 3. 进阶使用3.1 写入性能3.1.1 并行度3.1.2 Compaction3.1.3 触发Compaction的Sorted Run数3.1.4 写入初始化3.1.5 内存 3.2 读取性能3.2.1 Full Compaction3.2.2 主键表3.2.3 仅追加表3.2.4 格式 3.3 多Writer并发写入3.4 表管理3.4.1 管理快照3.4.2 管理分区3.4.3 管理小文件 1. 概述
1.1 简介
Flink 社区希望能够将 Flink 的 Streaming 实时计算能力和 Lakehouse 新架构优势进一步结合推出新一代的 Streaming Lakehouse 技术促进数据在数据湖上真正实时流动起来并为用户提供实时离线一体化的开发体验。Flink 社区内部孵化了 Flink Table Store (简称 FTS) 子项目一个真正面向 Streaming 以及 Realtime的数据湖存储项目。2023年3月12日FTS进入 Apache 软件基金会 (ASF) 的孵化器改名为 Apache Paimon (incubating)。 Apache Paimon是一个流数据湖平台具有高速数据摄取、变更日志跟踪和高效的实时分析的能力。
1.2 核心特性
1统一批处理和流处理 批量写入和读取、流式更新、变更日志生成全部支持。 2数据湖能力 低成本、高可靠性、可扩展的元数据。 Apache Paimon 具有作为数据湖存储的所有优势。 3各种合并引擎 按照您喜欢的方式更新记录。保留最后一条记录、进行部分更新或将记录聚合在一起由您决定。 4变更日志生成 Apache Paimon 可以从任何数据源生成正确且完整的变更日志从而简化您的流分析。 5丰富的表类型 除了主键表之外Apache Paimon还支持append-only表提供有序的流式读取来替代消息队列。 6模式演化 Apache Paimon 支持完整的模式演化。您可以重命名列并重新排序。
1.3 文件布局 Snapshot Files: 所有快照文件都存储在快照目录中。快照文件是一个 JSON 文件包含有关此快照的信息包括正在使用的Schema文件、包含此快照的所有更改的清单列表manifest listManifest Files: 所有清单列表manifest list和清单文件manifest file都存储在清单manifest目录中。清单列表manifest list是清单文件名manifest file的列表。清单文件manifest file是包含有关 LSM 数据文件和更改日志文件的文件信息。例如对应快照中创建了哪个LSM数据文件、删除了哪个文件。Data Files: 数据文件按分区和存储桶分组。每个存储桶目录都包含一个 LSM 树及其变更日志文件。
1.3.1 LSM Trees
Paimon 采用 LSM 树日志结构合并树作为文件存储的数据结构。 1) Sorted Runs LSM 树将文件组织成多个Sorted Run。Sorted Run由一个或多个数据文件组成并且每个数据文件恰好属于一个Sorted Run。 不同的Sorted Run可能具有重叠的主键范围甚至可能包含相同的主键。查询LSM树时必须合并所有Sorted Run并且必须根据用户指定的合并引擎和每条记录的时间戳来合并具有相同主键的所有记录。 写入LSM树的新记录将首先缓存在内存中。当内存缓冲区满时内存中的所有记录将被排序并刷新到磁盘。
2) Compaction 当越来越多的记录写入LSM树时Sorted Run的数量将会增加。由于查询LSM树需要将所有Sorted Run合并起来太多Sorted Run将导致查询性能较差甚至内存不足。
为了限制Sorted Run的数量我们必须偶尔将多个Sorted Run合并为一个大的Sorted Run。这个过程称为Compaction。
然而Compaction是一个资源密集型过程会消耗一定的CPU时间和磁盘IO因此过于频繁的Compaction可能会导致写入速度变慢。这是查询和写入性能之间的权衡。 Paimon 目前采用了类似于 Rocksdb 通用压缩的Compaction策略。
默认情况下当Paimon将记录追加到LSM树时它也会根据需要执行Compaction。用户还可以选择在“专用Compaction作业”中独立执行所有Compaction。
2. 集成Flink
2.1 安装配置
官网地址paimon-flink下载
1修改flink-conf.yaml配置
#解决中文乱码1.17之前参数是env.java.opts
env.java.opts.all: -Dfile.encodingUTF-8
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 3execution.checkpointing.interval: 10s
state.checkpoints.dir: hdfs://hadoop:9000/ckps2启动 Flink集群 1解决依赖问题
cp /opt/module/hadoop-3.3.4/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.4.jar /opt/module/flink-1.17.0/lib/2这里以 Yarn-Session模式为例
/opt/module/flink-1.17.0/bin/yarn-session.sh -d3启动Flink的sql-client
/opt/module/flink-1.17.0/bin/sql-client.sh -s yarn-session2.2 Catalog
Paimon Catalog可以持久化元数据当前支持两种类型的metastore
文件系统默认将元数据和表文件存储在文件系统中。hive在 hive metastore中存储元数据。用户可以直接从 Hive 访问表。
通常都是使用hive类型。
1上传 hive-connector 将flink-sql-connector-hive-3.1.3_2.12-1.17.0.jar上川到Flink的lib目录下 2重启yarn-session集群 3启动hive的metastore服务
nohup hive --service metastore 4创建Hive Catalog
CREATE CATALOG hive_catalog WITH (type paimon,metastore hive,uri thrift://hadoop:9083,hive-conf-dir /opt/module/hive/conf,warehouse hdfs://hadoop:9000/paimon/hive
);USE CATALOG hive_catalog;5注意事项 使用hive Catalog通过alter table更改不兼容的列类型时参见 HIVE-17832。需要配置 vim /opt/module/hive/conf/hive-site.xml;
propertynamehive.metastore.disallow.incompatible.col.type.changes/namevaluefalse/value
/property上述配置需要在hive-site.xml中配置且hive metastore服务需要重启。 如果使用的是 Hive3请禁用 Hive ACID
hive.strict.managed.tablesfalse
hive.create.as.insert.onlyfalse
metastore.create.as.acidfalse6Sql初始化文件 1创建初始化sql文件 vim conf/sql-client-init.sql
CREATE CATALOG hive_catalog WITH (type paimon,metastore hive,uri thrift://hadoop:9083,hive-conf-dir /opt/module/hive/conf,warehouse hdfs://hadoop:9000/paimon/hive
);USE CATALOG hive_catalog;SET sql-client.execution.result-mode tableau;2启动sql-client时指定该sql初始化文件
bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql3查看catalog
show catalogs;
show current catalog;3. 进阶使用
3.1 写入性能
Paimon的写入性能与检查点密切相关因此需要更大的写入吞吐量
增加检查点间隔或者仅使用批处理模式。增加写入缓冲区大小(write-buffer-size)启用写缓冲区溢出(write-buffer-spillable)如果您使用固定存储桶模式请重新调整存储桶数量。
3.1.1 并行度
建议sink的并行度小于等于bucket的数量最好相等。
3.1.2 Compaction
当Sorted Run数量较少时Paimon writer 将在单独的线程中异步执行压缩因此记录可以连续写入表中。然而为了避免Sorted Runs的无限增长当Sorted Run的数量达到阈值时writer将不得不暂停写入。下表属性确定阈值。
选项必需的默认类型描述num-sorted-run.stop-triggerNo(none)Integer触发停止写入的Sorted Runs次数默认值为 ‘num-sorted-run.compaction-trigger’ 1
当 num-sorted-run.stop-trigger 变大时写入停顿将变得不那么频繁从而提高写入性能。但是如果该值变得太大则查询表时将需要更多内存和 CPU 时间。如果您担心内存 OOM请配置sort-spill-threshold。它的值取决于你的内存大小。
如果希望某种模式具有最大写入吞吐量则可以缓慢而不是匆忙地进行Compaction。可以对表使用以下策略
num-sorted-run.stop-trigger 2147483647
sort-spill-threshold 103.1.3 触发Compaction的Sorted Run数
Paimon使用LSM树支持大量更新。 LSM 在多次Sorted Runs中组织文件。从 LSM 树查询记录时必须组合所有Sorted Runs以生成所有记录的完整视图。 过多的Sorted Run会导致查询性能不佳。为了将Sorted Run的数量保持在合理的范围内Paimon writers 将自动执行Compaction。下表属性确定触发Compaction的最小Sorted Run数。
选项必需的默认类型描述num-sorted-run.compaction-triggerNo5Integer触发Compaction的Sorted Run数。包括 0 级文件一个文件一级排序运行和高级运行一个一级排序运行
3.1.4 写入初始化
在write初始化时bucket的writer需要读取所有历史文件。如果这里出现瓶颈例如同时写入大量分区可以使用write-manifest-cache缓存读取的manifest数据以加速初始化。
3.1.5 内存
Paimon writer中主要占用内存的地方有3个
Writer的内存缓冲区由单个任务的所有Writer共享和抢占。该内存值可以通过 write-buffer-size 表属性进行调整。合并多个Sorted Run以进行Compaction时会消耗内存。可以通过 num-sorted-run.compaction-trigger 选项进行调整以更改要合并的Sorted Run的数量。如果行非常大在进行Compaction时一次读取太多行数据可能会消耗大量内存。减少 read.batch-size 选项可以减轻这种情况的影响。写入列式ORC、Parquet等文件所消耗的内存不可调。
3.2 读取性能
3.2.1 Full Compaction
配置“full-compaction.delta-commits”在Flink写入中定期执行full-compaction。并且可以确保在写入结束之前分区被完全Compaction。 注意Paimon 默认处理小文件并提供良好的读取性能。请不要在没有任何要求的情况下配置此Full Compaction选项因为它会对性能产生重大影响。
3.2.2 主键表
对于主键表来说这是一种“MergeOnRead”技术。读取数据时会合并多层LSM数据并行数会受到桶数的限制。虽然Paimon的merge会高效但是还是赶不上普通的AppendOnly表。 如果你想在某些场景下查询得足够快但只能找到较旧的数据你可以
配置full-compaction.delta-commits写入数据时目前只有Flink会定期进行full Compaction。配置“scan.mode”为“compacted-full”读取数据时选择full-compaction的快照。读取性能良好。
3.2.3 仅追加表
小文件会降低读取速度并影响 DFS 稳定性。默认情况下当单个存储桶中的小文件超过“compaction.max.file-num”默认50个时就会触发compaction。但是当有多个桶时就会产生很多小文件。 您可以使用full-compaction来减少小文件。full-compaction将消除大多数小文件。
3.2.4 格式
Paimon 对 parquet 读取进行了一些查询优化因此 parquet 会比 orc 稍快一些。
3.3 多Writer并发写入
Paimon的快照管理支持向多个writer写入。 默认情况下Paimon支持对不同分区的并发写入。推荐的方式是streaming job将记录写入Paimon的最新分区同时批处理作业覆盖将记录写入历史分区。
如果需要多个Writer写到同一个分区事情就会变得有点复杂。例如不想使用 UNION ALL那就需要有多个流作业来写入“partial-update”表。参考如下的“Dedicated Compaction Job”。
默认情况下Paimon writer 在写入记录时会根据需要执行Compaction。这对于大多数用例来说已经足够了但有两个缺点
这可能会导致写入吞吐量不稳定因为执行压缩时吞吐量可能会暂时下降。Compaction会将某些数据文件标记为“已删除”并未真正删除。如果多个writer标记同一个文件则在提交更改时会发生冲突。 Paimon 会自动解决冲突但这可能会导致作业重新启动。
为了避免这些缺点用户还可以选择在writer中跳过Compaction并仅运行专门的作业来进行Compaction。由于Compaction仅由专用作业执行因此writer可以连续写入记录而无需暂停并且不会发生冲突。
设置表属性 write-onlytrue如果设置为 true将跳过Compaction和快照过期。此选项与独立Compaction一起使用。
FLINK_HOME/bin/flink run \/path/to/paimon-flink-action-0.7-SNAPSHOT.jar \compact \--warehouse warehouse-path \--database database-name \ --table table-name \[--partition partition-name] \[--catalog-conf paimon-catalog-conf [--catalog-conf paimon-catalog-conf ...]] \如果提交一个批处理作业execution.runtime-mode:batch当前所有的表文件都会被Compaction。如果您提交一个流作业execution.runtime-mode: Streaming该作业将持续监视表的新更改并根据需要执行Compaction。
3.4 表管理
3.4.1 管理快照
Paimon Writer每次提交都会生成一个或两个快照。每个快照可能会添加一些新的数据文件或将一些旧的数据文件标记为已删除。然而标记的数据文件并没有真正被删除因为Paimon还支持时间旅行到更早的快照。它们仅在快照过期时被删除。 目前Paimon Writer在提交新更改时会自动执行过期操作。通过使旧快照过期可以删除不再使用的旧数据文件和元数据文件以释放磁盘空间。
选项必需的默认类型描述snapshot.time-retainedNo1 hDuration已完成快照的最长时间保留。snapshot.num-retained.minNo10Integer要保留的已完成快照的最小数量。snapshot.num-retained.maxNoInteger.MAX_VALUEInteger要保留的已完成快照的最大数量。
注意保留时间太短或保留数量太少可能会导致如下问题
批量查询找不到该文件。例如表比较大批量查询需要10分钟才能读取但是10分钟前的快照过期了此时批量查询会读取到已删除的快照。表文件上的流式读取作业没有外部日志系统无法重新启动。当作业重新启动时它记录的快照可能已过期。 此时可以使用Consumer Id来保护快照过期的小保留时间内的流式读取。
3.4.2 管理分区
创建分区表时可以设置partition.expiration-time。 Paimon会定期检查分区的状态并根据时间删除过期的分区。 判断分区是否过期将分区中提取的时间与当前时间进行比较看生存时间是否超过partition.expiration-time。比如
CREATE TABLE T (...) PARTITIONED BY (dt) WITH (partition.expiration-time 7 d,partition.expiration-check-interval 1 d,partition.timestamp-formatter yyyy-MM-dd
);3.4.3 管理小文件
小文件可能会导致
稳定性问题HDFS中小文件过多NameNode会承受过大的压力。成本问题HDFS中的小文件会暂时使用最小1个Block的大小例如128MB。查询效率小文件过多查询效率会受到影响。
1Flink Checkpoint的影响
使用Flink Writer每个checkpoint会生成 1-2 个快照并且checkpoint会强制在 DFS 上生成文件因此checkpoint间隔越小会生成越多的小文件。
默认情况下不仅checkpoint会导致文件生成writer的内存write-buffer-size耗尽也会将数据flush到DFS并生成相应的文件。可以启用 write-buffer-spillable 在 writer 中生成溢出文件从而在 DFS 中生成更大的文件。 所以可以设置如下
增大checkpoint间隔增加 write-buffer-size 或启用 write-buffer-spillable
2快照的影响
Paimon维护文件的多个版本文件的Compaction和删除是逻辑上的并没有真正删除文件。文件只有在 Snapshot 过期后才会被真正删除因此减少文件的第一个方法就是减少 Snapshot 过期的时间。 Flink writer 会自动使快照过期。
3分区和分桶的影响
表数据会被物理分片到不同的分区里面有不同的桶所以如果整体数据量太小单个桶中至少有一个文件建议你配置较少的桶数否则会出现也有很多小文件。
4主键表LSM的影响
LSM 树将文件组织成Sorted Runs的运行。Sorted Runs由一个或多个数据文件组成并且每个数据文件恰好属于一个Sorted Runs。
默认情况下Sorted Runs数取决于 num-sorted-run.compaction-trigger这意味着一个桶中至少有 5 个文件。如果要减少此数量可以保留更少的文件但写入性能可能会受到影响。
5仅追加表的文件的影响
默认情况下Append-Only 还会进行自动Compaction以减少小文件的数量
对于分桶的 Append-only 表为了排序会对bucket内的文件行Compaction可能会保留更多的小文件。
6Full-Compaction的影响 主键表是5个文件但是Append-Only表桶可能单个桶里有50个小文件这是很难接受的。更糟糕的是不再活动的分区还保留了如此多的小文件。
建议配置Full-Compaction在Flink写入时配置‘full-compaction.delta-commits’定期进行full-compaction。并且可以确保在写入结束之前分区被full-compaction。