常见的网站推广方式,网架制作厂,网站建设与管理专业的行业发展,绿色风格的网站简介#xff1a; 58 同城的实时 SQL 建设以及如何从 Storm 迁移至 Flink。 本文整理自 58 同城实时计算平台负责人冯海涛在 Flink Forward Asia 2020 分享的议题《Flink 在 58 同城应用与实践》#xff0c;内容包括#xff1a; 实时计算平台架构实时 SQL 建设Storm 迁移 Fli…简介 58 同城的实时 SQL 建设以及如何从 Storm 迁移至 Flink。 本文整理自 58 同城实时计算平台负责人冯海涛在 Flink Forward Asia 2020 分享的议题《Flink 在 58 同城应用与实践》内容包括 实时计算平台架构实时 SQL 建设Storm 迁移 Flink 实践一站式实时计算平台后续规划一、实时计算平台架构
实时计算平台的定位是为 58 集团海量数据提供高效、稳定的实时计算一站式服务。一站式服务主要分为三个方向
第一个方向是实时数据存储主要负责为线上业务接入提供高速度的实时存储能力。第二是实时数据计算主要为海量数据的处理提供分布式计算框架。第三是实时数据分发主要负责将计算后的数据分发到后续的实时存储供上层应用。平台建设主要分为两个部分
第一部分是基础能力建设目前主要包括 Kafka 集群、storm 集群、 Flink 集群、SparkStreaming 集群。 另一部分是平台化建设主要是包括两点 第一个是数据分发我们的数据分发是基于 Kafka Connect 打造的一个平台目标是实现异构数据源的集成与分发。在实际使用数据场景过程中经常需要将不同的数据源汇聚到一起进行计算分析。 传统方式可能需要针对不同的存储采用不同的数据同步方案。我们的数据分发是通过提供一套完整的架构实现不同数据源的集成和分发。 第二个是我们基于 Flink 打造的一站式实时计算平台后文会有详细的介绍。上图是我们的实时计算平台的架构。
在实时数据接入这部分我们采用的是 Kafkabinlog 提供 canal 和 debezium 两种方式进行接入。在业务日志这部分我们主要采用 flume 进行线上业务的 log 的采集。在实时计算引擎这部分根据开源社区发展以及用户的需求从最早的 Storm 到后来引入 SparkStreaming以及现在主流的 Flink。在实时存储这部分为了满足多元化的实时需求我们支持 Kafka、Druid、Hbase、ES、ClickHouse。同时在计算架构之上我们建设了一些管理平台比如集群管理它主要负责集群的扩容稳定性的管理。另一个是 Nightfury主要负责集群治理包括数据接入、权限治理、资源管理等等。
我们在业务发展过程中引入了 Flink 计算框架。首先从业务来说58 是一个一站式生活服务平台包含很多业务线。随着业务的发展数据量越来越大场景越来越丰富需要一个更加强大的计算框架来满足用户的需求。
第一个场景是实时 ETL主要是针对原始日志进行信息转化结构化处理运用于后续计算需要高吞吐低延迟的计算能力。第二块是实时数仓它作为离线数仓的一个补充主要是提升一些实时指标的时效性。第三种场景是实时监控它需要比较灵活的时间窗口支持。最后一种场景是实时数据流分析比如说数据乱序的处理、中间状态的管理、Exactly once 语义保障。
我们前期基于 Storm 和 SparkStreaming 构建的计算集群在很大程度上并不能满足这些场景需求。于是对 Flink 进行了调研发现 Flink 不论是在计算性能还是流数据特性支持上都体现出了非常大的优势。因此我们决定采用 Flink 作为主流的计算框架。 上图是我们 Flink 集群的建设情况。Flink 作为实时计算框架经常需要 7×24 小时的可用性。我们在建设底层集群的时候需要考虑高可用的架构。
首先在部署模式上主要是采用 Flink On YARN实现集群的高可用。在底层的 HDFS 上采用 HDFS federation 机制既可以避免离线集群的抖动对实时这边造成影响同时也减少了维护的 HDFS 数量。在集群隔离上主要是采用 Node Labe 机制就可以实现把重要业务运行在一些指定节点上。同时在这个基础之上引入了 Cgroup对 CPU 进行隔离避免任务间的 CPU 抢占。在管理层面不同的业务提交到不同的队列进行管理避免业务间的资源抢占。在计算场景上根据不同的计算场景比如说计算型、IO 型会提交到不同的节点从而提升整个集群的资源利用率。
Flink 计算框架在 58 经历了大概两年多的发展。目前我们的集群有 900 多台机器2000 多个实时任务每天处理大概 2.5 万亿的实时数据数据量峰值达到了 3000 万每秒。
二、实时 SQL 建设
1. 实时 SQL 演进
SQL 编程具有低门槛、自动优化、版本统一等特点。同时 Flink SQL 作为实时数仓的主要工具是我们在建设 Flink 平台时考虑的一个主要方向。
我们最早上线的 Flink 是基于 1.6 版本的当时这个版本只支持 DML我们在当时的版本基础上进行了一些扩展主要是在 DDL 语法上的扩展支持。在用户使用层面为了简化 DDL 的定义也通过一个配置化的方式来实现自动生成 DDL。在开发的时候提供可视化开发的功能和在线调试的功能。
随着社区的开源我们将 Flink SQL 切换到了社区版本之后也升级相关的版本以及合并比较多的社区版本特性比如说 Blink 相关、批流合一、对 Hive 的支持。
最后针对 Flink SQL 这块的实时数仓也做了一些数仓化的工作主要包括元数据管理、血缘关系、数仓分层、权限管理等等。 2. 存储扩展
关于存储扩展这一块最开始我们是基于 Flink 自己实现的一套 DDL。随着社区开源切换到社区的 Flink SQL 版本然后在上面做了一些扩展主要有几个方面
第一打通了主流存储和内部的实时存储。比如说在源表上支持了内部的 wmb它是一个分布式消息队列。在维表上支持这种 redis内部的 wtable。在结果表上支持了 ClickHouseredis以及我们内部的 wtable第二定制 format 支持。因为在实际业务中很多数据格式并不是标准的没法通过 DDL 来定义一个表。我们提供了一种通用的方式可以采用一个字段来代表一条日志让用户可以通过 udf 去自定义并解析一条日志。最后在 source 和 sink DDL 定义基础上增加了并发度的设置。这样用户就可以更灵活地控制任务的并发。3. 性能优化
关于性能优化主要是两方面
第一个是对 Blink 特性的引进Blink 提供了大量的特性比如通过 mini batch 的处理方式提高任务的吞吐。通过 local global 两阶段聚合缓解数据热点问题。还有通过 emit增强窗口的功能。把这些功能集成到我们的计算平台用户通过一些按钮可以直接打开。 另一个是对异步 lO 的应用。在实时数仓化建设过程中维表之间的关联是比较大的应用场景经常因为维表的性能导致整个任务的吞吐不高。因此我们增加了一个异步 IO 的机制主要有两种实现 一种针对目标存储支持异步 client直接基于异步 client 来实现。比如 MySQL 和 redis。另一种不支持异步 client 的我们就借助现成的机制来模拟同时在这个基础之上增加了一套缓存的机制避免所有的数据直接查询到目标存储减少目标存储的压力。同时在缓存基础上也增加 LRU 机制更加灵活的控制整个缓存。 同样数据写入这一块遇到大并发量写入的时候尽量提高并发来解决写入性的问题这样就会导致整个任务的 CPU 利用率比较低所以就采用单并发度多线程的写入机制它的实现是在 sink 算子里面增加一个 buffer数据流入到 sink 之后会首先写入到 buffer然后会启动多线程机制去消费这个 buffer最终写到存储里面。 4. 数仓化建设
实时数仓作为 Flink 的一个比较典型的应用场景相较于离线数仓它可能存在一些平台化不完善的方面
首先元数据管理功能不完善。然后Flink SQL 这一块对于每个任务我们都可能需要重新定义一个数据表。并且由于数据没有分层的概念导致任务比较独立烟囱式开发数据和资源使用率比较低下。另外也缺乏数据血缘信息。
为了提升实时数仓建设的效率我们提供了面向数仓化实时 SQL 能力在数仓设计任务开发平台化管理方面全面对齐离线数仓的建设模式。 4.1 数仓化
数仓化主要是参考离线数仓的模型对我们实时数仓这一块进行模型建设。
比如说最原始的数据会进入ODS 层经过一些清洗落入到行为明细层之后会拆分到具体的主题明细层然后再将一些相关的维表信息进行计算再到汇总层最终提供给最上层的应用包括一些实时报表Ad-hoc 查询等。 4.2 数仓平台
实时数仓目前主要还是基于这种 Lambda 架构来进行平台化的建设。
首先在元数据管理这一块Flink 默认采用内存对元数据进行管理我们就采用了 HiveCatalog 机制对库表进行持久化。同时我们在数据库的权限管理上借助 Hive ACL 来进行权限管理。有了元数据持久化之后就可以提供全局的元数据检索。同时任务模式就可以由传统的 DDLDML 简化为 DML。最后我们也做了血缘关系主要是在 Flink SQL 提交过程中自动发现 SQL 任务血缘依赖关系。三、Storm 迁移 Flink 实践
1. Flink 与 Storm 对比
Flink 相对于 Storm 来说有比较多的优势。
在数据保障上Flink 支持 Exactly once 语义在吞吐量、资源管理、状态管理用户越来越多的基于 Flink 进行开发。而 Storm 对用户来说编程模型简单开发成本高流式计算特性缺乏吞吐低无法满足性能。在平台侧独立集群多、运维困难、任务缺少平台化管理、用户体验差。
因此我们决定迁移到 Flink。 2. Flink-Storm 工具
在 Storm 迁移到 Flink 的时候如果让用户重新基于 Flink 进行逻辑开发可能需要比较大的工作量。因此我们对 Flink 进行了调研发现有个 Flink-Storm 工具。它实现了将 Storm Topology 转到 Flink Topology。比如说把 spout 转换到 Flink 的 source function把 bolt 转换到 Transform 和 sink function。
在使用的过程中我们也发现一些问题Flink-Storm 工具无法支持 Yarn 模式 缺少 Storm 引擎功能最后还有一个比较大的问题我们的 storm 在发展过程中维护了很多版本但是 Flink-Storm 工具只支持基于一个版本进行开发。于是我们做了一些改进。 3. 对 Flink-Storm 的改进
3.1 消息保障
Storm 有三个特点
第一ack 机制第二依赖 zookeeper第三at least once 语义保障。
我们做了四点改进
第一Flink-Storm 去掉 ack 支持第二KafkaSpout 实现 CheckpointListener第三KafkaSpout 实现 CheckpointedFunction第四Flink-Storm 打开 checkpoint。3.2 对 Storm 定时器的支持
在早期版本里面其实是没有窗口机制的我们借助 Storm 定时机制来实现窗口计算。它的机制是这样的Storm 引擎会定时向 bolt 里面发送一个系统信号用户就可以通过这个系统信号进行一个切分模拟窗口操作。
同样Flink 也没有这样一个定时器的机制于是我们就考虑从 Flink-Storm 层面来实现改造了 BoltWrapper 类它作为 bolt 类的一个封装实现机制跟 bolt 是一样的包括 5 点
初始化 open 方式启动异步线程。模拟构造 tick 的 StreamRecord调用 processeElement 函数发送 tuple频率由外部参数全局控制close 中关闭线程。3.3 Storm on Yarn
Storm on yarn 并不是直接提交到 YARN 集群它只是提交到 local 或者 stand alone 的模式。Flink on yarn 主要是提供了 ClusterClient 这样一个代理实现方式有三个步骤
初始化 YarnClusterConfiguration Flink 配置 执行 jar 包 / 资源配置 加载 classpath启动 yarn client复用 Flink on yarn 机制 deploy 转换后的 jobGraph。4. 任务迁移
在完善上述的一些改进之后迁移就比较容易了。首先我们会把改造后的版本打包上传到公司的私服上。然后用户在他的工程里面只需要引入 jar 包。在代码这一块只需要将原来基于 storm 的提交方式改造成基于 Flink 的提交方式逻辑是完全不用动的。在任务部署模式这一块也提供了 Flink 提交的模式这样一个脚本可以实现 Flink Perjob 模式。 总结一下除了一些比较极端的复杂情况基本上做到了无缝迁移所有的任务。迁移到 Flink 之后大部分任务的延迟都降低到毫秒级别整个吞吐提升 3~5 倍。同时整体资源节省了大概 40%约等于 80 台机器。完成了 5 个 storm 集群完全下线实现了任务平台化管理。 四、一站式实时计算平台
1. Wstream 平台
我们为了提升管理效率而打造了 Wstream 平台它构建在底层引擎和上层应用之间对用户可以屏蔽底层的集群信息比如跨机房多集群的一些信息。
在任务接入方式上支持 Flink JarFlink SQLFlink-StormPyFlink 这 4 种方式来满足多元化的用户需求。在产品功能上主要支持了任务管理、任务的创建、启动删除等。另外为了更好的让用户管理自己的任务和对任务进行问题定位我们也提供了一个监控告警和任务诊断的系统。针对数仓提供了一些数仓平台化的功能包括权限管理、血缘关系等等。针对 Flink SQL 也提供了调试探查的功能。
用户可以在 Wstream 平台之上很好的去构建他们的应用。 2. 状态管理
状态作为 Flink 一个比较重要的特性在实际场景中有大量的应用。用户在使用平台的时候没法跟底层的 Flink 工具进行交互于是我们就将底层的一些能力进行了集成。
在任务保存方面支持 CheckpointSavepointCancel With Savepoint。在容错方面支持 allowNonRestoredState跳过无法恢复的状态。在分析方面支持 Queryable State 实时查询基于离线的 State Processor 的分析方式我们会帮用户把这个状态下载进行分析。
对于整个任务状态管理来说我们通过 jobgraph 设置定向到指定 Hdfs 目录进行统一目录管理。在状态小文件这块控制并发度jobgraph 优化checkpoint 间隔时间保留版本数量。 3. SQL 调试
针对 Flink SQL我们也提供了一些调试功能。这里主要包括两块 第一语法层面的功能包括 智能提示语法校验转换 graph 逻辑校验。 第二逻辑层面的功能包括 模拟输入DataGen 自定义数据源结果输出Print 重定向到标准输出。
这样我们可以更方便的对整个业务逻辑进行调试。 4. 任务监控
关于任务监控对于 Flink 实时计算任务来说我们主要关心的是任务的稳定性、性能方面、以及业务逻辑是否符合预期。对于如何监控这些指标主要包括 4 个层面
第一个是 Flink 自带的 Flink-metrics提供大量的信息比如流量信息、状态信息、反压、检查点、CPU、网络等等第二个是 yarn 层面提供运行时长、任务状态第三从 kafka 层面提供消息堆积最后通过用户自定义的一些 metrics我们可以了解业务逻辑是否符合预期。5. 监控体系
为了采集这些指标我们也基于 Prometheus 搭建了一套监控体系。对于所有的 Flink 任务会实时将 metrics 推到 pushgateway然后会将收集到的指标推到 Prometheus这一块我们主要是采用的 federation 的机制。所有子节点负责指标采集之后汇聚到一个中心节点由中心节点统一对外提供服务。最终可以实现整个指标的计算和告警。 6. 监控告警
有了上面这些指标之后我们在告警这一块就可以比较方便。针对实时计算比较关注的任务稳定性方面我们可以从 Topic 消息消费堆积、任务计算 qps 波动、Flink task Restart、Flink Checkpoint failed、任务失败、延迟等信息来观察整个任务的运行情况。 7. 指标可视化
在指标可视化这一块主要是两个层面
第一个层面是 Job 层面这一块主要是把一些比较核心的指标汇聚到我们的实时计算平台。比如说qps 信息、输入输出的信息、延迟的信息等等对于更底层的 task 级别的 metrics通过 Grafana 可以了解具体的一些task信息比如流量信息、反压信息等。五、后续规划
我们的后续规划主要包括 4 个方面
第一个是社区比较流行的批流合一。因为我们当前这个实时架构大部分还是基于 Lambda 架构这种架构会带来很大的维护工作量所以我们也希望借助批流合一的能力来简化架构第二个是资源调优因为作为流式计算来说缺少一些动态资源管理的机制因此我们也希望有手段来进行这样一些调优第三个是智能监控我们当前的监控和告警是事后的希望有某种方式在任务出现问题之前进行预警最后是拥抱社区的新能力包括对新场景的探索。原文链接 本文为阿里云原创内容未经允许不得转载。