当前位置: 首页 > news >正文

加盟网站开发费用中高风险区域最新名单

加盟网站开发费用,中高风险区域最新名单,专业网站制,域名是干嘛的简介#xff1a; 本文由同城艺龙大数据开发工程师张军分享#xff0c;主要介绍同城艺龙 Flink 集成 Iceberg 的生产实践。 本文由同城艺龙大数据开发工程师张军分享#xff0c;主要介绍同城艺龙 Flink 集成 Iiceberg 的生产实践。内容包括#xff1a; 背景及痛点Flink Ice…简介 本文由同城艺龙大数据开发工程师张军分享主要介绍同城艺龙 Flink 集成 Iceberg 的生产实践。 本文由同城艺龙大数据开发工程师张军分享主要介绍同城艺龙 Flink 集成 Iiceberg 的生产实践。内容包括 背景及痛点Flink Iceberg 的落地Iceberg 优化实践后续工作收益及总结一、背景及痛点 业务背景 同程艺龙是一个提供机票、住宿、交通等服务的在线旅游服务平台目前我所在的部门属于公司的研发部门主要职责是为公司内其他业务部门提供一些基础服务我们的大数据系统主要承接的业务是部门内的一些大数据相关的数据统计、分析工作等。数据来源有网关日志数据、服务器监控数据、K8s 容器的相关日志数据App 的打点日志, MySQL 的 binlog 日志等。我们主要的大数据任务是基于上述日志构建实时报表提供基于 Presto 的报表展示和即时查询服务同时也会基于 Flink 开发一些实时、批处理任务为业务方提供准确及时的数据支撑。 原架构方案 由于我们所有的原始数据都是存储在 Kafka 的所以原来的技术架构就是首先是 Flink 任务消费 Kafka 的数据经过 Flink SQL 或者 Flink jar 的各种处理之后实时写入 Hive其中绝大部分任务都是 Flink SQL 任务因为我认为 SQL 开发相对代码要简单的多并且维护方便、好理解所以能用 SQL 写的都尽量用 SQL 来写。 提交 Flink 的平台使用的是 Zeppelin其中提交 Flink SQL 任务是 Zeppelin 自带的功能提交 jar 包任务是我自己基于 Application 模式开发的 Zeppelin 插件。 对于落地到 Hive 的数据使用开源的报表系统 metabase (底层使用 Presto) 提供实时报表展示、定时发送邮件报表以及自定义 SQL 查询服务。由于业务对数据的实时性要求比较高希望数据能尽快的展示出来所以我们很多的 Flink 流式任务的 checkpoint 设置为 1 分钟数据格式采用的是 orc 格式。 痛点 由于采用的是列式存储格式 ORC无法像行式存储格式那样进行追加操作所以不可避免的产生了一个大数据领域非常常见且非常棘手的问题即 HDFS 小文件问题。 开始的时候我们的小文件解决方案是自己写的一个小文件压缩工具定期去合并我们的 Hive 分区一般都是天级别的所以这个工具的原理就是每天凌晨启动一个定时任务去压缩昨天的数据首先把昨天的数据写入一个临时文件夹压缩完和原来的数据进行记录数的比对检验数据条数一致之后用压缩后的数据覆盖原来的数据但是由于无法保证事务所以出现了很多问题 压缩的同时由于延迟数据的到来导致昨天的 Hive 分区又有数据写入了检验就会失败导致合并小文件失败。替换旧数据的操作是没有事务保证的如果替换的过程中旧分区有新的数据写入就会覆盖新写入的数据造成数据丢失。没有事务的支持无法实时合并当前分区的数据只能合并压缩前一个分区的最新的分区数据仍然有小文件的问题导致最新数据查询性能提高不了。 二、FlinkIceberg 的落地 Iceberg 技术调研 所以基于以上的 HDFS 小文件、查询慢等问题结合我们的现状我调研了目前市面上的数据湖技术Delta、Apache Iceberg 和 Apache Hudi考虑了目前数据湖框架支持的功能和以后的社区规划最终我们是选择了 Iceberg其中考虑的原因有以下几方面 ■ Iceberg 深度集成 Flink 前面讲到我们的绝大部分任务都是 Flink 任务包括批处理任务和流处理任务目前这三个数据湖框架Iceberg 是集成 Flink 做的最完善的如果采用 Iceberg 替代 Hive 之后迁移的成本非常小对用户几乎是无感知的 比如我们原来的 SQL 是这样的 INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table 迁移到 Iceberg 以后只需要修改 catalog 就行。 INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table Presto 查询也是和这个类似只需要修改 catalog 就行了。 ■Iceberg 的设计架构使得查询更快 在 Iceberg 的设计架构中manifest 文件存储了分区相关信息、data files 的相关统计信息max/min等去查询一些大的分区的数据就可以直接定位到所要的数据而不是像 Hive 一样去 list 整个 HDFS 文件夹时间复杂度从 O(n) 降到了 O(1)使得一些大的查询速度有了明显的提升在 Iceberg PMC Chair Ryan Blue 的演讲中我们看到命中 filter 的任务执行时间从 61.5 小时降到了 22 分钟。 ■使用 Flink SQL 将 CDC 数据写入 Iceberg Flink CDC 提供了直接读取 MySQL binlog 的方式相对以前需要使用 canal 读取 binlog 写入 Iceberg然后再去消费 Iceberg 数据。少了两个组件的维护链路减少了节省了维护的成本和出错的概率。并且可以实现导入全量数据和增量数据的完美对接所以使用 Flink SQL 将 MySQL binlog 数据导入 Iceberg 来做 MySQL-Iceberg 的导入将会是一件非常有意义的事情。 此外对于我们最初的压缩小文件的需求虽然 Iceberg 目前还无法实现自动压缩但是它提供了一个批处理任务已经能满足我们的需求。 ■Hive 表迁移 Iceberg 表 迁移准备工作 目前我们的所有数据都是存储在 Hive 表的在验证完 Iceberg 之后我们决定将 Hive 的数据迁移到 Iceberg所以我写了一个工具可以使用 Hive 的数据然后新建一个 Iceberg 表为其建立相应的元数据但是测试的时候发现如果采用这种方式需要把写入 Hive 的程序停止因为如果 Iceberg 和 Hive 使用同一个数据文件而压缩程序会不断地压缩 Iceberg 表的小文件压缩完之后不会马上删除旧数据所以 Hive 表就会查到双份的数据故我们采用双写的策略原来写入 Hive 的程序不动新启动一套程序写入 Iceberg这样能对 Iceberg 表观察一段时间。还能和原来 Hive 中的数据进行比对来验证程序的正确性。 经过一段时间观察每天将近几十亿条数据、压缩后几个 T 大小的 Hive 表和 Iceberg 表一条数据也不差。所以在最终对比数据没有问题之后把 Hive 表停止写入使用新的 Iceberg 表。 迁移工具 我将这个 Hive 表迁移 Iceberg 表的工具做成了一个基于 Flink batch job 的 Iceberg Action提交了社区不过目前还没合并https://github.com/apache/iceberg/pull/2217。这个功能的思路是使用 Hive 原始的数据不动然后新建一个 Iceberg table再为这个新的 Iceberg table 生成对应的元数据大家有需要的话可以先看看。 此外Iceberg 社区还有一个把现有的数据迁移到已存在的 Iceberg table 的工具类似 Hive 的 LOAD DATA INPATH ... INTO TABLE 是用 Spark 的存储过程做的大家也可以关注下https://github.com/apache/iceberg/pull/2210 三、Iceberg 优化实践 压缩小文件 目前压缩小文件是采用的一个额外批任务来进行的Iceberg 提供了一个 Spark 版本的 action我在做功能测试的时候发现了一些问题此外我对 Spark 也不是非常熟悉担心出了问题不好排查所以参照 Spark 版本的自己实现了一个 Flink 版本并修复了一些 bug进行了一些功能的优化。 由于我们的 Iceberg 的元数据都是存储在 Hive 中的也就是我们使用了 HiveCatalog所以压缩程序的逻辑是把 Hive 中所有的 Iceberg 表全部都查出来依次压缩。压缩没有过滤条件不管是分区表还是非分区表都进行全表的压缩这样做是为了处理某些使用 eventtime 的 Flink 任务。如果有延迟的数据的到来就会把数据写入以前的分区如果不是全表压缩只压缩当天分区的话新写入的其他天的数据就不会被压缩。 之所以没有开启定时任务来压缩是因为比如定时五分钟压缩一个表如果五分钟之内这个压缩任务没完成没有提交新的 snapshot下一个定时任务又开启了就会把上一个没有完成的压缩任务中的数据重新压缩一次所以每个表依次压缩的策略可以保证某一时刻一个表只有一个任务在压缩。 代码示例参考 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal(day, day)) //.targetSizeInBytes(targetSizeInBytes) .execute(); 目前系统运行稳定已经完成了几万次任务的压缩。 注意 不过目前对于新发布的 Iceberg 0.11 来说还有一个已知的 bug即当压缩前的文件大小大于要压缩的大小targetSizeInBytes时会造成数据丢失其实这个问题我在最开始测试小文件压缩的时候就发现了并且提了一个 pr我的策略是大于目标文件的数据文件不参与压缩不过这个 pr 没有合并到 0.11 版本中后来社区另外一个兄弟也发现了相同的问题提交了一个 pr https://github.com/apache/iceberg/pull/2196  策略是将这个大文件拆分到目标文件大小目前已经合并到 master会在下一个 bug fix 版本 0.11.1 中发布。 查询优化 ■ 批处理定时任务 目前对于定时调度中的批处理任务Flink 的 SQL 客户端还没 Hive 那样做的很完善比如执行 hive-f 来执行一个文件。而且不同的任务需要不同的资源并行度等。 所以我自己封装了一个 Flink 程序通过调用这个程序来进行处理读取一个指定文件里面的 SQL来提交批任务。在命令行控制任务的资源和并行度等。 /home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql ■ 优化 批任务的查询这块我做了一些优化工作比如 limit 下推filter 下推查询并行度推断等可以大大提高查询的速度这些优化都已经推回给社区并且在 Iceberg 0.11 版本中发布。 运维管理 ■ 清理 orphan 文件 定时任务删除 在使用 Iceberg 的过程中有时候会有这样的情况我提交了一个 Flink 任务由于各种原因把它停了这个时候 Iceberg 还没提交相应的快照。此外由于一些异常导致程序失败会产生一些不在 Iceberg 元数据里面的孤立的数据文件这些文件对 Iceberg 来说是不可达的也是没用的。所以我们需要像 jvm 的垃圾回收一样来清理这些文件。 目前 Iceberg 提供了一个 Spark 版本的 action 来处理这些没用的文件我们采取的策略和压缩小文件一样获取 Hive 中的所有的 Iceberg 表。每隔一个小时执行一次定时任务来删除这些没用的文件。 SparkSession spark ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute(); 踩坑 我们在程序运行过程中出现了正常的数据文件被删除的问题经过调研由于快照保留设置是一小时这个清理程序清理时间也是设置一个小时通过日志发现是这个清理程序删除了正常的数据。查了查代码应该是设置了一样的时间在清理孤立文件的时候有其他程序正在读取要 expired 的 snapshot导致删除了正常的数据。最后把这个清理程序的清理时间改成默认的三天没有再出现删除数据文件的问题。 当然为了保险起见我们可以覆盖原来的删除文件的方法改成将文件到一个备份文件夹检查没有问题之后手工删除。 ■ 快照过期处理 我们的快照过期策略是和压缩小文件的批处理任务写在一起的压缩完小文件之后进行表的快照过期处理目前保留的时间是一个小时。这是因为对于有一些比较大的表分区比较多而且 checkpoint 比较短如果保留的快照过长的话还是会保留过多小文件我们暂时没有查询历史快照的需求所以我将快照的保留时间设置了一个小时。 long olderThanTimestamp System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit(); ■ 数据管理 写入了数据之后当想查看相应的快照有多少数据文件时直接查询 Spark 无法知道哪个是有用的哪个是没用的。所以需要有对应的管理工具。目前 Flink 这块还不太成熟我们可以使用 Spark3 提供的工具来查看。 DDL 目前 create table 这些操作我们是通过 Flink SQL Client 来做的。其他相关的 DDL 的操作可以使用 Spark 来做https://iceberg.apache.org/spark/#ddl-commands DML 一些相关的数据的操作比如删除数据等可以通过 MySQL 来实现Presto 目前只支持分区级别的删除功能。 show partitions show create table 在我们操作 Hive 的时候有一些很常用的操作比如 show partitions、 show create table 等这些目前 Flink 还没有支持所以在操作 Iceberg 的时候就很不方便我们自己基于 Flink 1.12 做 了修改不过目前还没有完全提交到社区后续有时间会提交到 Flink 和 Iceberg 社区。 四、后续工作 Flink SQL 接入 CDC 数据到 Iceberg 目前在我们内部的版本中我已经测试通过可以使用 Flink SQL 将 CDC 数据比如 MySQL binlog写入 Iceberg社区的版本中实现该功能还需要做一些工作我也提交了一些相关的 PR 来推进这个工作。 使用 SQL 进行删除和更新 对于 copy-on-write 表我们可以使用 Spark SQL 来进行行级的删除和更新。具体的支持的语法可以参考源码中的测试类 org.apache.iceberg.spark.extensions.TestDelete org.apache.iceberg.spark.extensions.TestUpdate这些功能我在测试环境测试是可以的但是还没有来得及更新到生产。 使用 Flink SQL 进行 streaming read 在工作中会有一些这样的场景由于数据比较大Iceberg 的数据只存了较短的时间如果很不幸因为程序写错了等原因想从更早的时间来消费就无能为力了。 当引入了 Iceberg 的 streaming read 之后这些问题就可以解决了因为 Iceberg 存储了所有的数据当然这里有一个前提就是对于数据没有要求特别精确比如达到秒级别因为目前 Flink 写入 Iceberg 的事务提交是基于 Flink Checkpoint 间隔的。 五、收益及总结 经过对 Iceberg 大概一个季度的调研测试优化和 bug 修复我们将现有的 Hive 表都迁移到了 Iceberg完美解决了原来的所有的痛点问题目前系统稳定运行而且相对 Hive 得到了很多的收益 Flink 写入的资源减少 举一个例子默认配置下原来一个 flink 读取 kafka 写入 hive 的任务需要60个并行度才不会让 Kafka 产生积压。改成写入 iceberg 之后只需要20个并行度就够了。 查询速度变快 前面我们讲到 Iceberg 查询的时候不会像 Hive 一样去 list 整个文件夹来获取分区数据而是先从 manifest 文件中获取相关数据查询的性能得到了显著的提升一些大的报表的查询速度从 50 秒提高到 30 秒。 并发读写 由于 Iceberg 的事务支持我们可以实现对一个表进行并发读写Flink 流式数据实时入湖压缩程序同时压缩小文件清理过期文件和快照的程序同时清理无用的文件这样就能更及时的提供数据做到分钟级的延迟查询最新分区数据的速度大大加快了并且由于 Iceberg 的 ACID 特性可以保证数据的准确性。 time travel 可以回溯查询以前某一时刻的数据。 总结一下我们目前可以实现使用 Flink SQL 对 Iceberg 进行批、流的读写并可以对小文件进行实时的压缩使用 Spark SQL 做一些 delete 和 update 工作以及一些 DDL 操作后续可以使用 Flink SQL 将 CDC 的数据写入 Iceberg。目前对 Iceberg 的所有的优化和 bug fix我已经贡献给社区。由于笔者水平有限有时候也难免有错误还请大家不吝赐教。 作者介绍 张军同程艺龙大数据开发工程师 原文链接 本文为阿里云原创内容未经允许不得转载。
http://www.pierceye.com/news/540578/

相关文章:

  • 企业查询网站wordpress注册没反应
  • 如何建立自已的购物网站长沙网站制作主要公司
  • 深圳 电子政务网站建设方案WordPress的login在哪里改
  • 网站快速网站推广怎么制作图片视频和配音乐
  • 河南网站制作团队湖南网址大全
  • 2019为网站网站做代理被判缓刑网站信息化建设建议
  • 部署推进网站建设网站域名费用
  • 企业信息门户网站建设方案seo网站模版
  • 谷歌有做网站建设快速建站哪里好
  • 坤和建设 网站深圳高端网站设计开发
  • 怎么做网站策划的模板如何注册咨询公司
  • 做婚恋网站投入多少钱php注册网站源码带数据库
  • 苏州网站建设制作方案手机上做app的软件
  • 青岛营销型网站html网页制作期末作业
  • 加强网站微信公众号平台建设php 5.4 wordpress
  • 比价网站开发东莞微客巴巴做网站
  • 怎么免费搭建自己的网站交互网站建设
  • 网站架构 规划考研网站做刷词
  • 昆山网站建设kshuituo适合seo优化的站点
  • 免费十八种禁用网站圣诞网站怎么做
  • 做网站排名赚钱吗安卓开发快速入门
  • 南宁百度网站建设求个网站或者软件
  • 岳阳网站项目建设报道网站建设色调的
  • 站长平台怎么添加网站南京市高淳县建设厅网站
  • 广州市住房和城乡建设厅网站首页一键制作自己的app软件
  • 设一个网站链接为安全怎么做微博内容放到wordpress
  • 好的网站设计培训学校wordpress主题 表白
  • 做网站服务器系统模板网站的建设方式与方法
  • 网站建设需要的公司市住房城乡建设部网站
  • 网站备案 厦门怎样做自己的购物网站