网站都有后台吗,网页空间和数据库的区别,深圳网站建设建设,南昌seo排名收费1、FlinkCDC是什么
1.1 CDC是什么
CDC是Chanage Data Capture#xff08;数据变更捕获#xff09;的简称。其核心原理就是监测并捕获数据库的变动#xff08;例如增删改#xff09;#xff0c;将这些变更按照发生顺序捕获#xff0c;将捕获到的数据#xff0c;写入数据…1、FlinkCDC是什么
1.1 CDC是什么
CDC是Chanage Data Capture数据变更捕获的简称。其核心原理就是监测并捕获数据库的变动例如增删改将这些变更按照发生顺序捕获将捕获到的数据写入数据库种如神策数据的核心kudu、doris、mysql、kakfa等。
1.2 CDC的实现方式
1.2.1 基于查询的CDC
离线调度查询作业批处理。把一张表同步到其他系统每次通过查询去获取表中最新的数据无法保障数据一致性查的过程中有可能数据已经发生了多次变更不保障实时性基于离线调度存在天然的延迟。
1.2.2 基于日志的CDC
实时消费日志流处理例如 MySQL 的 binlog 日志完整记录了数据库中的变更可以把 binlog 文件当作流的数据源保障数据一致性因为 binlog 文件包含了所有历史变更明细保障实时性因为类似 binlog 的日志文件是可以流式消费的提供的是实时数据。
1.2.3 常见的开源的CDC方案比较 1.2.4 个人对于CDC领域的一些浅见
其实对于CDC领域在数仓行业中很常见无论是离线数仓也好还是实时数仓也好或者说是业务系统也好例如京东就是使用CDC方案来同步优惠卷的。其实在很多的CDC的同步方案中大部分公司其实选用的是第一种查询同步方案为什么这么做呢很多人可能会问实时同步不好吗我想说的是实时的CDC太复杂虽然一致性不高但是其实运营或者其他人员并不需要这么高的实时性可能某些领域需要当然也有很多的表结构设计没有update_time字段这样的话如果同步一张表可能会有点麻烦但是并非是不能同步如果数据量不大的话或者有其他自增键的话会很方便但是如果没有的话就会很麻烦也可以做可以做整行的md5这里我就不一一赘述了在进行查询cdc同步的一些情况。日志cdc呢其实根本原理就是监控类似于mysql的binlog。可以让整个数据的增删改进行捕获从而可以达到两个数据的一致性当然这个一致性并不是实时的哪怕是mysql的主从都有可能延迟更别提咱们监控binlog了当然这种延迟几乎很少见业务也不会发现这种CDC虽然听上去很好但是实现较为困难限制比较大例如下游的数据源要支持改不像离线可以用拉链表来解决。但是这种方式真的很好如果开发人员和架构设计人员以及数据设计人员的设计比较好这种方式效果是最棒的我司的mysql同步器就支持这两种方式根据使用人员的喜好来进行选择。
2、Flink CDC的原理
2.1 1.x Flink CDC
Flink1.x的cdc依赖于Debezium组件debezium为了保证数据的一致性在全量读取时会加锁。 此时呢会分为全局锁权限和无全局锁权限。
那么为什么debezium为什么要这么做呢要加上全局锁呢因为数据一致性问题这就涉及到数据库的全局锁和表锁了数据库的全局锁以mysql为例全局锁就是对整个数据库实例加锁。MySQL 提供了一个加全局读锁的方法命令是Flush tables with read lock (FTWRL)。 当你需要让整个库处于只读状态的时候可以使用这个命令之后其他线程的以下语句会被阻塞数据更新语句数据的增删改、数据定义语句包括建表、修改表结构等和更新类事务的提交语句。一般全局锁的使用场景在数据库备份上当然如果主库加锁的话会导致一些问题。例如加锁后这个数据库实例无法更新业务基本就停止了。从库呢也不能从binlog拉取数据这就导致了主从延迟假如有的业务使用的是从库的话就会出现问题。当然全局锁有问题那么不加锁会导致什么问题呢数据不一致问题 比如手机卡购买套餐信息
这里分为两张表 u_acount (用于余额表)u_pricing (资费套餐表)
步骤:
1. u_account 表中数据 用户A 余额300u_pricing 表中数据 用户A 套餐空
2. 发起备份备份过程中先备份u_account表备份完了这个表这个时候u_account 用户余额是300
3. 这个时候套用户购买了一个资费套餐100餐购买完成写入到u_print套餐表购买成功备份期间的数据。
4. 备份完成可以看到备份的结果是u_account 表中的数据没有变 u_pricing 表中的数据 已近购买了资费套餐100.
哪这时候用这个备份文件来恢复数据的话用户A 赚了100 用户是不是很舒服啊。但是你得想想公司利益啊。 也就是说不加锁的话备份系统备份的得到的库不是一个逻辑时间点这个数据是逻辑不一致的。
当然mysql的备份工具mysqldump可以在备份的时候支持更新基于MVCC的机制。MVCC Multiversion Concurrency Control多版本并发控制。顾名思义MVCC 是通过数据行的 多个版本 管理来实现数据库的 并发控制。这项技术使得在InnoDB的事务隔离级别下执行 一致性读操 作有了保证。换言之就是为了查询一些正在被另一个事务更新的行并且可以看到它们被更新之前的值这样在做查询的时候就不用等待另一个事务释放锁。 不再深入解释mysql的核心机制了。 表锁是什么呢顾名思义就是锁住了整张表。在加表锁的表上无法进行DDL、DML操作。当然在mysql5.5以后有一个表锁是MDLMDL不需要显示的使用在访问一个表的时候会被自动加上。MDL 的作用是保证读写的正确性。你可以想象一下如果一个查询正在遍历一个表中的数据而执行期间另一个线程对这个表结构做变更删了一列那么查询线程拿到的结果跟表结构对不上肯定是不行的。因此在 MySQL 5.5 版本中引入了 MDL当对一个表做增删改查操作的时候加 MDL读锁当要对表做结构变更操作的时候加 MDL 写锁。
读锁之间不互斥因此你可以有多个线程同时对一张表增删改查。读写锁之间、写锁之间是互斥的用来保证变更表结构操作的安全性。因此如果有两个线程要同时给一个表加字段其中一个要等另一个执行完才能开始执行。
MDL锁有一些问题假如在多个读session中进行更改表结构操作的话可能会卡死。
这个就是debezium在flink1.x中的应用。
2.2 2.x Flink CDC
Flink 2.x不仅引入了增量快照读取机制还带来了一些其他功能的改进。以下是对Flink 2.x的主要功能的介绍
增量快照读取Flink 2.x引入了增量快照读取机制这是一种全新的数据读取方式。该机制支持并发读取和以chunk为粒度进行checkpoint。在增量快照读取过程中Flink首先根据表的主键将其划分为多个块chunk然后将这些块分配给多个读取器并行读取数据。这一机制极大地提高了数据读取的效率。 精确一次性处理Flink 2.x引入了Exactly-Once语义确保数据处理结果的精确一次性。MySQL CDC 连接器是Flink的Source连接器可以利用Flink的checkpoint机制来确保精确一次性处理。 动态加表Flink 2.x支持动态加表通过使用savepoint来复用之前作业的状态解决了动态加表的问题。 无主键表的处理Flink 2.x对无主键表的读取和处理进行了优化。在无主键表中Flink可以通过一些额外的字段来识别数据记录的唯一性从而实现准确的数据读取和处理。
对于Flink 2.x的CDC方案呢可以理解为全量读取时在划分chunk块的时候采用了查询读他是将主键进行切分的。默认一个chunk8096条数据知道这些就可以了。 2.x的 Flink cdc实现较为复杂这里就不一一赘述了。
3、FlinkCDC的使用
3.1 导入依赖
dependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion1.12.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_2.12/artifactIdversion1.12.0/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_2.12/artifactIdversion1.12.0/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.49/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_2.12/artifactIdversion1.12.0/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion2.0.0/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.75/version/dependency
/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupId!-- 可以将依赖打到jar包中 --artifactIdmaven-assembly-plugin/artifactIdversion3.0.0/versionconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins
/build3.2 代码实操
import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkCDC {public static void main(String[] args) throws Exception {//1.获取Flink执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//通过FlinkCDC构建SourceFunctionDebeziumSourceFunctionString sourceFunction MySqlSource.Stringbuilder().hostname(hadoop102).port(3306).username(root).password(123456).databaseList(cdc_test) //监控的数据库.tableList(cdc_test.user_info) //监控的数据库下的表.deserializer(new StringDebeziumDeserializationSchema())//反序列化.startupOptions(StartupOptions.initial()).build();DataStreamSourceString dataStreamSource env.addSource(sourceFunction);//3.数据打印dataStreamSource.print();//4.启动任务env.execute(FlinkCDC);}
}4、Flink CDC输出数据解析
4.1 数据的数据结构
flink cdc的输出结果大概可以分为 before、after、 before代表变更前数据after代表变更后数据。
还有个op这个op代表的是事务的操作 r读取历史 d删除 c创建 u更新