什么网站可以做网站,wordpress主页制作,免费logo设计在线生成器u钙网,网银网站建设银行简介#xff1a;阿里云 EMR OLAP 与 Flink 团队深度合作#xff0c;支持了 Flink 到 ClickHouse 的 Exactly-Once写入来保证整个实时数仓数据的准确性。本文介绍了基于 EMR OLAP 的开源实时数仓解决方案。 作者简介#xff1a;阿里云 EMR-OLAP 团队#xff1b;主要负责开源…简介阿里云 EMR OLAP 与 Flink 团队深度合作支持了 Flink 到 ClickHouse 的 Exactly-Once写入来保证整个实时数仓数据的准确性。本文介绍了基于 EMR OLAP 的开源实时数仓解决方案。
作者简介阿里云 EMR-OLAP 团队主要负责开源大数据 OLAP 引擎的研发例如 ClickHouseStarrocksTrino 等。通过 EMR 产品向阿里云用户提供一站式的大数据 OLAP 解决方案。内容框架 背景机制梳理技术方案测试结果未来规划一、背景
Flink 和 ClickHouse 分别是实时流式计算和 OLAP 领域的翘楚很多互联网、广告、游戏等客户都将两者联合使用于构建用户画像、实时 BI 报表、应用监控指标查询、监控等业务形成了实时数仓解决方案如图-1。这些业务对数据的准确性要求都十分严格所以实时数仓整个链路需要保证端到端的 Exactly-Once。
通常来说 Flink 的上游是可以重复读取或者消费的 pull-based 持久化存储例如Kafka要实现 Source 端的 Exactly-Once 只需要回溯 Source 端的读取进度即可。Sink 端的 Exactly-Once 则比较复杂因为 Sink 是 push-based 的需要依赖目标输出系统的事务保证但社区 ClickHouse 对事务并不支持。
所以针对此情况阿里云 EMR ClickHouse 与 Flink 团队一起深度研发支持了 Flink 到 ClickHouse 的 Exactly-Once写入来保证整个实时数仓数据的准确性。本文将分别介绍下现有机制以及实现方案。 图-1 实时数仓架构
二 机制梳理
ClickHouse 写入机制
ClickHouse 是一个 MPP 架构的列式 OLAP 系统如图-2各个节点是对等的通过 Zookeeper 协同数据可以通过并发对各个节点写本地表的方式进行大批量的数据导入。
ClickHouse 的 data part 是数据存储的最小单元ClickHouse 接收到的数据 Block 在写入时会按照 partition 粒度进行拆分形成一个或多个 data part。data part 在写入磁盘后会通过后台merge线程不断的合并将小块的 data part 合并成大块的 data part以此降低存储和读取的开销。
在向本地表写入数据时ClickHouse 首先会写入一个临时的 data part这个临时 data part 的数据对客户端不可见之后会直接进行 rename 操作使这个临时 data part 成为正式 data part此时数据对客户端可见。几乎所有的临时 data part 都会快速地成功被 rename 成正式 data part没有被 rename 成功的临时 data part 最终将被 ClickHouse 清理策略从磁盘上删除。
通过上述分析可以看出 ClickHouse 的数据写入有一个从临时 data part 转为正式 data part 的机制加以修改可以符合两阶段提交协议这是实现分布式系统中事务提交一致性的重要协议。 图-2 Flink 作业写入 ClickHouse
注多个 Flink Task 可以写入同一个 shard 或 replica
Flink 写机制
Flink 作为一个分布式处理引擎提供了基于事务的 Sink 机制该机制可以保障写入的 Exactly-Once相应的数据接收方需要提供遵守 XA 规范的 JDBC 。由于完整的 XA 规范相当复杂因此我们先对 Flink 的处理机制进行梳理结合 ClickHouse 的实际情况确定需要实现的接口范围。
为了实现分布式写入时的事务提交统一Flink 借助了 checkpoint 机制。该机制能够周期性地将各个 Operator 中的状态生成快照并进行持久化存储。在 checkpoint 机制中有一个 Coordinator 角色用来协调所有 Operator 的行为。从 Operator 的角度来看一次 checkpoint 有三个阶段初始化--生成快照--完成/废弃 checkpoint。从Coordinator的角度来看需要定时触发 checkpoint以及在所有 Operator 完成快照后触发 complete 通知。参考附录1
接下来介绍 Flink 中的 Operator 是如何借助事务和 checkpoint 机制来保障 Exactly-OnceOperator 的完整执行需要经过 initial、writeData、snapshot、commit 和 close 阶段。
initial 阶段
从快照中取出上次任务执行时持久化的 xid 记录。快照中主要存储两种 xid一组是未完成 snapshot 阶段的 xid一组是已经完成了 snapshot 的 xid。接下来对上次未完成 snapshot 的 xid 进行 rollback 操作对上次已经完成了 snapshot 但 commit 未成功的 xid 进行 commit 重试操作。若上述操作失败则任务初始化失败任务中止进入 close 阶段若上述操作成功则继续。创建一个新的唯一的 xid作为本次事务ID将其记录到快照中。使用新生成的 xid调用 JDBC 提供的 start() 接口。
writeData 阶段
事务开启后进入写数据的阶段Operator 的大部分时间都会处于这个阶段。在与 ClickHouse 的交互中此阶段为调用 JDBC 提供的 preparedStatement 的 addBatch() 和 executeBatch() 接口每次写数据时都会在报文中携带当前 xid。在写数据阶段首先将数据写到 Operator 内存中向 ClickHouse 提交内存中的批量数据有三种触发方式内存中的数据条数达到batchsize的阈值后台定时线程每隔一段时间触发自动flush在 snapshot 阶段调用end() 和 prepare() 接口之前会调用flush清空缓存。
snapshot 阶段
当前事务会调用 end() 和 prepare() 接口等待 commit并更新快照中的状态。接下来会开启一个新的事务作为本 Task 的下一次 xid将新事务记录到快照中并调用 JDBC 提供的start() 接口开启新事务。将快照持久化存储。
complete阶段
在所有 Operator 的 snapshot 阶段全部正常完成后Coordinator 会通知所有 Operator 对已经成功的checkpoint 进行 complete 操作在与 ClickHouse 的交互中此阶段为 Operator 调用 JDBC 提供的 commit() 接口对事务进行提交。
close 阶段
若当前事务尚未进行到 snapshot 阶段则对当前事务进行 rollback 操作。关闭所有资源。
从上述流程可以总结出Flink 通过 checkpoint 和事务机制将上游数据按 checkpoint 周期分割成批保障每一批数据在全部写入完成后再由 Coordinator 通知所有 Operator 共同完成 commit 操作。当有 Operator 写入失败时将会退回到上次成功的 checkpoint 的状态并根据快照记录的 xid 对这一批 checkpoint 的所有 xid 进行 rollback 操作。在有 commit 操作失败时将会重试 commit 操作仍然失败将会交由人工介入处理。
三、技术方案
整体方案
根据 Flink 和 ClickHouse 的写入机制可以描绘出一个Flink 到 ClickHouse 的事务写入的时序图如图-3。由于写的是 ClickHouse 的本地表并且事务的统一提交由 Coordinator 保障因此 ClickHouse 无需实现 XA 规范中标准的分布式事务只需实现两阶段提交协议中的少数关键接口其他接口在 JDBC 侧进行缺省即可。 图-3 Flink 到 ClickHouse 事务写入的时序图
ClickHouse-Server
状态机
为了实现 ClickHouse 的事务我们首先定义一下所要实现的事务允许的几种操作
Begin开启一个事务。Write Data在一个事务内写数据。Commit提交一个事务。Rollback回滚一个未提交的事务。事务状态Unknown事务未开启此时执行任何操作都是非法的。Initialized事务已开启此时允许所有操作。Committing事务正在被提交不再允许 Begin/Write Data 两种操作。Committed事务已经被提交不再允许任何操作。Aborting事务正在被回滚不再允许任何操作。Aborted事务已经被回滚不再允许任何操作。
完整的状态机如下图-4所示 图-4 ClickHouse Server支持事务的状态机
图中所有操作均是幂等的。其中Committing 到 Committed 和 Aborting 到 Aborted 是不需要执行任何操作的在开始执行 Commit 或 Rollback 时事务的状态即转成 Committing 或 Aborting在执行完 Commit 或 Rollback 之后事务的状态会被设置成 Committed 或 Aborted。
事务处理
Client 通过 HTTP Restful API 访问 ClickHouse ServerClient 与 ClickHouse Server 间一次完整事务的交互过程如图-5所示 图-5 Clickhouse事务处理的时序图
正常流程
Client 向 ClickHouse 集群任意一个 ClickHouse Server 发送 Begin Transaction 请求并携带由 Client 生成的全局唯一的 Transaction ID。ClickHouse Server 收到 Begin Transaction 请求时会向 Zookeeper 注册该Transaction ID包括创建 Transaction ID 及子 Znode 节点并初始化该 Transaction 的状态为 Initialized。Client 接收到 Begin Transaction 成功响应时可以开始写入数据。当 ClickHouse Server 收到来自 Client 发送的数据时会生成临时 data part但不会将其转为正式 data partClickHouse Server 会将写入的临时 data part 信息以 JSON 的形式记录到 Zookeeper 上该 Transaction 的信息中。Client 完成数据的写入后会向 ClickHouse Server 发送 Commit Transaction 请求。ClickHouse Server 在收到 Commit Transaction 请求后根据 ZooKeeper 上对应的Transaction的 data part 信息将 ClickHouse Server 本地临时 data part 数据转为正式的 data part 数据并更新Transaction 状态为Committed。Rollback 的过程与 Commit 类似。
异常处理
如果创建 Transaction ID 过程中发现 Zookeeper 中已经存在相同 Transaction ID根据 Zookeeper 中记录的 Transaction 状态进行处理如果状态是 Unknown 则继续进行处理如果状态是 Initialized则直接返回否则会抛异常。目前实现的事务还不支持分布式事务只支持单机事务所以 Client 只能往记录该 Transaction ID 的 ClickHouse Server 节点写数据如果 ClickHouse Server 接收到到非该节点事务的数据ClickHouse Server 会直接返回错误信息。与写入数据不同如果 Commit 阶段 Client 向未记录该 Transaction ID 的 ClickHouse Server 发送了 Commit Transaction 请求ClickHouse Server 不会返回错误信息而是返回记录该 Transaction ID 的 ClickHouse Server 地址给 Client让 Client 端重定向到正确的 ClickHouse Server。Rollback 的过程与 Commit 类似。
ClickHouse-JDBC
根据 XA 规范完整的分布式事务机制需要实现大量的标准接口参考附录2。在本设计中实际上只需要实现少量关键接口因此采用了基于组合的适配器模式向 Flink 提供基于标准 XA 接口的 XAResource 实现同时对 ClickHouse Server 屏蔽了不需要支持的接口。
对于 XADataSource 的实现采用了基于继承的适配器模式并针对 Exactly-Once 的特性修改了部分默认配置如发送失败的重试次数等参数。
另外在生产环境中通常不会通过分布式表而是通过 SLB 进行数据写入时的负载均衡。在 Exactly-Once 场景中Flink 侧的 Task 需要保持针对某一 ClickHouse Server 节点的连接因此不能使用 SLB 的方式进行负载均衡。针对这一问题我们借鉴了 BalanceClickHouseDataSource 的思路通过在 URL 中配置多个IP并在 properties 配置中将 write_mode 设置为 Random 可以使 XADataSource 在保障 Exactly-Once 的同时具有负载均衡的能力。
Flink-Connector-ClickHouse
Flink 作为一个流式数据处理引擎支持向多种数据接收端写入的能力每种接收端都需要实现特定的Connector。针对 Exactly-OnceClickHouse Connector 增加了对于 XADataSource 的选项配置根据客户端的配置提供 Exactly-Once 功能。
四、测试结果
ClickHouse 事务性能测试
写入 ClickHouse 单批次数据量和总批次相同Client端并发写线程不同性能比较。由图-6可以看出无论 ClickHouse 是否开启事务, ClickHouse 的吞吐量都与 Client 端并发写的线程数成正比。开启事务时ClickHouse 中临时 data part 不会立刻被转为正式 data part所以在事务完成前大量临时 data part 不会参与 ClickHouse merge 过程降低磁盘IO对写性能的影响所以开启事务写性能较未开启事务写性能更好但事务内包含的批次变多临时 data part 在磁盘上的增多导致了合并时 CPU 的压力增大从而影响了写入的性能开启事务的写性能也会降低。图-6 ClickHouse写入性能压测一
写入 ClickHouse 总批次 和 Client 端并发写线程相同单批次写入 ClickHouse 数据量不同性能比较。 由图-7可以看出无论ClickHouse 是否开启事务, ClickHouse 的吞吐量都与单批次数据量大小成正比。开启事务时每批次数据越小ClickHouse 的吞吐量受事务是否开启的影响就越大这是因为每批次写入的时间在事务处理的占比较小事务会对此产生一定的影响因此一次事务包含的批次数量越多越能够减少事务对写入性能的影响当事务包含批次的增大事务处理时间在写入中的占比逐渐降低ClickHouse merge 产生的影响越来越大从而影响了写入的性能开启事务较不开启事务写性能更好。图-7 ClickHouse写入性能压测二
总体来说开启事务对写入性能几乎没有影响这个结论是符合我们预期的。
Flink 写入 ClickHouse 性能比较
对于相同数据量和不同 checkpoint 周期Flink 写入 ClickHouse 总耗时如图-8所示。可以看出checkpoint 周期对于不开启 Exactly-Once 的任务耗时没有影响。对于开启 Exactly-Once 的任务在5s 到60s的范围内耗时呈现一个先降低后增长的趋势。原因是在 checkpoint 周期较短时开启 Exactly-Once 的 Operator 与 Clickhouse 之间有关事务的交互过于频繁在 checkpoint 周期较长时开启 Exactly-Once 的 Operator 需要等待 checkpoint 周期结束才能提交最后一次事务使数据可见。在本测试中checkpoint周期数据仅作为一个参考生产环境中需要根据机器规格和数据写入速度进行调整。总体来说Flink写入Clickhouse时开启 Exactly-Once 特性性能会稍有影响这个结论是符合我们预期的。 图-8 Flink 写入 ClickHouse 测试
五、未来规划
该版本 EMR ClickHouse 实现的事务还不是很完善只支持单机事务不支持分布式事务。分布式系统一般都是通过 Meta Server 来做统一元数据管理来支持分布式事务机制。当前我们也正在规划设计 ClickHouse MetaServer 来支持分布式事务同时可以移除 ClickHouse 对 ZooKeeper 的依赖。
原文链接 本文为阿里云原创内容未经允许不得转载。