东莞网站建没,asp网站用什么软件,上海公司注册流程及需要的材料,许昌网站建设背景 在数据仓库建模中#xff0c;未经任何加工处理的原始业务层数据#xff0c;我们称之为ODS#xff08;Operational Data Store#xff09;数据。在互联网企业中#xff0c;常见的ODS数据有业务日志数据#xff08;Log#xff09;和业务DB数据#xff08;DB#xf…背景 在数据仓库建模中未经任何加工处理的原始业务层数据我们称之为ODSOperational Data Store数据。在互联网企业中常见的ODS数据有业务日志数据Log和业务DB数据DB两类。对于业务DB数据来说从MySQL等关系型数据库的业务数据进行采集然后导入到Hive中是进行数据仓库生产的重要环节。 如何准确、高效地把MySQL数据同步到Hive中一般常用的解决方案是批量取数并Load直连MySQL去Select表中的数据然后存到本地文件作为中间存储最后把文件Load到Hive表中。这种方案的优点是实现简单但是随着业务的发展缺点也逐渐暴露出来 性能瓶颈随着业务规模的增长Select From MySQL - Save to Localfile - Load to Hive这种数据流花费的时间越来越长无法满足下游数仓生产的时间要求。 直接从MySQL中Select大量数据对MySQL的影响非常大容易造成慢查询影响业务线上的正常服务。 由于Hive本身的语法不支持更新、删除等SQL原语对于MySQL中发生Update/Delete的数据无法很好地进行支持。 为了彻底解决这些问题我们逐步转向CDCChange Data Capture Merge的技术方案即实时Binlog采集 离线处理Binlog还原业务数据这样一套解决方案。Binlog是MySQL的二进制日志记录了MySQL中发生的所有数据变更MySQL集群自身的主从同步就是基于Binlog做的。 本文主要从Binlog实时采集和离线处理Binlog还原业务数据两个方面来介绍如何实现DB数据准确、高效地进入数仓。 整体架构 整体的架构如上图所示。在Binlog实时采集方面我们采用了阿里巴巴的开源项目Canal负责从MySQL实时拉取Binlog并完成适当解析。Binlog采集后会暂存到Kafka上供下游消费。整体实时采集部分如图中红色箭头所示。 离线处理Binlog的部分如图中黑色箭头所示通过下面的步骤在Hive上还原一张MySQL表 采用Linkedin的开源项目Camus负责每小时把Kafka上的Binlog数据拉取到Hive上。 对每张ODS表首先需要一次性制作快照Snapshot把MySQL里的存量数据读取到Hive上这一过程底层采用直连MySQL去Select数据的方式。 对每张ODS表每天基于存量数据和当天增量产生的Binlog做Merge从而还原出业务数据。 我们回过头来看看背景中介绍的批量取数并Load方案遇到的各种问题为什么用这种方案能解决上面的问题呢 首先Binlog是流式产生的通过对Binlog的实时采集把部分数据处理需求由每天一次的批处理分摊到实时流上。无论从性能上还是对MySQL的访问压力上都会有明显地改善。 第二Binlog本身记录了数据变更的类型Insert/Update/Delete通过一些语义方面的处理完全能够做到精准的数据还原。 Binlog实时采集 对Binlog的实时采集包含两个主要模块一是CanalManager主要负责采集任务的分配、监控报警、元数据管理以及和外部依赖系统的对接二是真正执行采集任务的Canal和CanalClient。 当用户提交某个DB的Binlog采集请求时CanalManager首先会调用DBA平台的相关接口获取这一DB所在MySQL实例的相关信息目的是从中选出最适合Binlog采集的机器。然后把采集实例Canal Instance分发到合适的Canal服务器上即CanalServer上。在选择具体的CanalServer时CanalManager会考虑负载均衡、跨机房传输等因素优先选择负载较低且同地域传输的机器。 CanalServer收到采集请求后会在ZooKeeper上对收集信息进行注册。注册的内容包括 以Instance名称命名的永久节点。 在该永久节点下注册以自身ip:port命名的临时节点。 这样做的目的有两个 高可用CanalManager对Instance进行分发时会选择两台CanalServer一台是Running节点另一台作为Standby节点。Standby节点会对该Instance进行监听当Running节点出现故障后临时节点消失然后Standby节点进行抢占。这样就达到了容灾的目的。 与CanalClient交互CanalClient检测到自己负责的Instance所在的Running CanalServer后便会进行连接从而接收到CanalServer发来的Binlog数据。 对Binlog的订阅以MySQL的DB为粒度一个DB的Binlog对应了一个Kafka Topic。底层实现时一个MySQL实例下所有订阅的DB都由同一个Canal Instance进行处理。这是因为Binlog的产生是以MySQL实例为粒度的。CanalServer会抛弃掉未订阅的Binlog数据然后CanalClient将接收到的Binlog按DB粒度分发到Kafka上。 离线还原MySQL数据 完成Binlog采集后下一步就是利用Binlog来还原业务数据。首先要解决的第一个问题是把Binlog从Kafka同步到Hive上。 Kafka2Hive 整个Kafka2Hive任务的管理在美团数据平台的ETL框架下进行包括任务原语的表达和调度机制等都同其他ETL类似。而底层采用LinkedIn的开源项目Camus并进行了有针对性的二次开发来完成真正的Kafka2Hive数据传输工作。 对Camus的二次开发 Kafka上存储的Binlog未带Schema而Hive表必须有Schema并且其分区、字段等的设计都要便于下游的高效消费。对Camus做的第一个改造便是将Kafka上的Binlog解析成符合目标Schema的格式。 对Camus做的第二个改造由美团的ETL框架所决定。在我们的任务调度系统中目前只对同调度队列的任务做上下游依赖关系的解析跨调度队列是不能建立依赖关系的。而在MySQL2Hive的整个流程中Kafka2Hive的任务需要每小时执行一次小时队列Merge任务每天执行一次天队列。而Merge任务的启动必须要严格依赖小时Kafka2Hive任务的完成。 为了解决这一问题我们引入了Checkdone任务。Checkdone任务是天任务主要负责检测前一天的Kafka2Hive是否成功完成。如果成功完成了则Checkdone任务执行成功这样下游的Merge任务就可以正确启动了。 Checkdone的检测逻辑 Checkdone是怎样检测的呢每个Kafka2Hive任务成功完成数据传输后由Camus负责在相应的HDFS目录下记录该任务的启动时间。Checkdone会扫描前一天的所有时间戳如果最大的时间戳已经超过了0点就说明前一天的Kafka2Hive任务都成功完成了这样Checkdone就完成了检测。 此外由于Camus本身只是完成了读Kafka然后写HDFS文件的过程还必须完成对Hive分区的加载才能使下游查询到。因此整个Kafka2Hive任务的最后一步是加载Hive分区。这样整个任务才算成功执行。 每个Kafka2Hive任务负责读取一个特定的Topic把Binlog数据写入original_binlog库下的一张表中即前面图中的original_binlog.db其中存储的是对应到一个MySQL DB的全部Binlog。 上图说明了一个Kafka2Hive完成后文件在HDFS上的目录结构。假如一个MySQL DB叫做user对应的Binlog存储在original_binlog.user表中。ready目录中按天存储了当天所有成功执行的Kafka2Hive任务的启动时间供Checkdone使用。每张表的Binlog被组织到一个分区中例如userinfo表的Binlog存储在table_nameuserinfo这一分区中。每个table_name一级分区下按dt组织二级分区。图中的xxx.lzo和xxx.lzo.index文件存储的是经过lzo压缩的Binlog数据。 Merge Binlog成功入仓后下一步要做的就是基于Binlog对MySQL数据进行还原。Merge流程做了两件事首先把当天生成的Binlog数据存放到Delta表中然后和已有的存量数据做一个基于主键的Merge。Delta表中的数据是当天的最新数据当一条数据在一天内发生多次变更时Delta表中只存储最后一次变更后的数据。 把Delta数据和存量数据进行Merge的过程中需要有唯一键来判定是否是同一条数据。如果同一条数据既出现在存量表中又出现在Delta表中说明这一条数据发生了更新则选取Delta表的数据作为最终结果否则说明没有发生任何变动保留原来存量表中的数据作为最终结果。Merge的结果数据会Insert Overwrite到原表中即图中的origindb.table。 Merge流程举例 下面用一个例子来具体说明Merge的流程。 数据表共id、value两列其中id是主键。在提取Delta数据时对同一条数据的多次更新只选择最后更新的一条。所以对id1的数据Delta表中记录最后一条更新后的值value120。Delta数据和存量数据做Merge后最终结果中新插入一条数据id4两条数据发生了更新id1和id2一条数据未变id3。 默认情况下我们采用MySQL表的主键作为这一判重的唯一键业务也可以根据实际情况配置不同于MySQL的唯一键。 上面介绍了基于Binlog的数据采集和ODS数据还原的整体架构。下面主要从两个方面介绍我们解决的实际业务问题。 实践一分库分表的支持 随着业务规模的扩大MySQL的分库分表情况越来越多很多业务的分表数目都在几千个这样的量级。而一般数据开发同学需要把这些数据聚合到一起进行分析。如果对每个分表都进行手动同步再在Hive上进行聚合这个成本很难被我们接受。因此我们需要在ODS层就完成分表的聚合。 首先在Binlog实时采集时我们支持把不同DB的Binlog写入到同一个Kafka Topic。用户可以在申请Binlog采集时同时勾选同一个业务逻辑下的多个物理DB。通过在Binlog采集层的汇集所有分库的Binlog会写入到同一张Hive表中这样下游在进行Merge时依然只需要读取一张Hive表。 第二Merge任务的配置支持正则匹配。通过配置符合业务分表命名规则的正则表达式Merge任务就能了解自己需要聚合哪些MySQL表的Binlog从而选取相应分区的数据来执行。 这样通过两个层面的工作就完成了分库分表在ODS层的合并。 这里面有一个技术上的优化在进行Kafka2Hive时我们按业务分表规则对表名进行了处理把物理表名转换成了逻辑表名。例如userinfo123这张表名会被转换为userinfo其Binlog数据存储在original_binlog.user表的table_nameuserinfo分区中。这样做的目的是防止过多的HDFS小文件和Hive分区造成的底层压力。 实践二删除事件的支持 Delete操作在MySQL中非常常见由于Hive不支持Delete如果想把MySQL中删除的数据在Hive中删掉需要采用“迂回”的方式进行。 对需要处理Delete事件的Merge流程采用如下两个步骤 首先提取出发生了Delete事件的数据由于Binlog本身记录了事件类型这一步很容易做到。将存量数据表A与被删掉的数据表B在主键上做左外连接Left outer join如果能够全部join到双方的数据说明该条数据被删掉了。因此选择结果中表B对应的记录为NULL的数据即是应当被保留的数据。 然后对上面得到的被保留下来的数据按照前面描述的流程做常规的Merge。 总结与展望 作为数据仓库生产的基础美团数据平台提供的基于Binlog的MySQL2Hive服务基本覆盖了美团内部的各个业务线目前已经能够满足绝大部分业务的数据同步需求实现DB数据准确、高效地入仓。在后面的发展中我们会集中解决CanalManager的单点问题并构建跨机房容灾的架构从而更加稳定地支撑业务的发展。 本文主要从Binlog流式采集和基于Binlog的ODS数据还原两方面介绍了这一服务的架构并介绍了我们在实践中遇到的一些典型问题和解决方案。希望能够给其他开发者一些参考价值同时也欢迎大家和我们一起交流。转载于:https://www.cnblogs.com/dadadechengzi/p/10100319.html