1000M双线网站空间,北京网站建设兴田德润官网多少,合肥建设工程网,响应式个人网站psd博主历时三年精心创作的《大数据平台架构与原型实现#xff1a;数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行#xff0c;点击《重磅推荐#xff1a;建大数据平台太难了#xff01;给我发个工程原型吧#xff01;》了解图书详情#xff0c;…博主历时三年精心创作的《大数据平台架构与原型实现数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行点击《重磅推荐建大数据平台太难了给我发个工程原型吧》了解图书详情京东购书链接https://item.jd.com/12677623.html扫描左侧二维码进入京东手机购书页面。
CDC 数据中的记录变更时间标记着这条记录在数据库中执行对应操作创建/更新/删除的时间可以说是天然的“事件时间”特别是对于那些本身没有记录时间字段的表来说就更加合适了。Flink 官方文档 也建议在使用 CDC 的情况下优先使用 CDC 中的这个时间字段这个时间更加精准。
与此同时在定义 Hudi 表时precombine.field 也是一个非常重要的配置显然 CDC 数据中的记录变更时间是最合适的没有之一。
CDC 数据中的记录变更时间属于元数据范畴以 Flink CDC 的 MySQL 数据库为例它提供四种元数据的抽取
KeyDataTypeDescriptiontable_nameSTRING NOT NULLName of the table that contain the row.database_nameSTRING NOT NULLName of the database that contain the row.op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0.row_kindSTRING NOT NULLIt indicates the row kind of the changelog,Note: The downstream SQL operator may fail to compare due to this new added column when processing the row retraction if the source operator chooses to output the ‘row_kind’ column for each record. It is recommended to use this metadata column only in simple synchronization jobs. ‘I’ means INSERT message, ‘-D’ means DELETE message, ‘-U’ means UPDATE_BEFORE message and ‘U’ means UPDATE_AFTER message.
其中的 op_ts 就是我们想要的也就是CDC 数据中的记录变更时间。我们可以在定义数据表时声明这个列Flink CDC 可以将其提取出来作为普通字段供下游使用就像下表中这样
CREATE TABLE IF NOT EXISTS orders_mysql_cdc (order_number INT NOT NULL,order_date DATE NOT NULL,purchaser INT NOT NULL,quantity INT NOT NULL,product_id INT NOT NULL,op_ts TIMESTAMP_LTZ(3) METADATA FROM op_ts VIRTUAL,PRIMARY KEY (order_number) NOT ENFORCED
) WITH (connector mysql-cdc,...
);注意在定义 Flink CDC 源表时op_ts 的数据类型是 TIMESTAMP_LTZ(3)不是 TIMESTAMP(3)写入下游表时可以是 TIMESTAMP(3)。
当我们初次使用这个 op_ts 字段时你会发现写入到的数据库的数据全部都是 1970-01-01 00:00:00.000就像下面这样 你可能会认为是哪里出错了实际上这是 Flink CDC 特别设计的也是合理的Flink CDC 官方文档的解释是 If the record is read from snapshot of the table instead of the binlog, the value is always 0. 我们知道Flink CDC ( 2.0 ) 的一个显著特征是它是全量 增量的一体化读取全量就是经常说的历史数据增量就是实时的数据控制 Flink CDC 是从全部历史数据开始同步整个数据库还是从只当下的 binlog 中同步近期增量数据的配置项是scan.startup.mode ( 官方文档 )该配置项支持 5 种配置而默认配置initial就是以当前分界点数据中的现有数据使用全量方式读取也叫快照读取此后的数据从 binlog 中读取这样就和上面描述的 op_ts 字段的取值吻合上了
当 Flink CDC 使用全量方式读取表中的历史数据时op_ts 字段全部取值为 0即 1970-01-01 00:00:00.000当 Flink CDC 使用增量方式读取 binlog 数据时op_ts 字段的取值为数据发生变更的实际时间。
这种设计还是非常合理的因为Flink CDC 本身在使用快照方式读取时就没有任何变更时间可以读取这个时间只在 binlog 中才有而这对下游也不会造成太大的影响因为此时的数据都是 insert-only 的数据同一主键也不会出现两条记录至少对 Hudi 表是没有影响的。
此外作为一个“额外收获”你会发现op_ts 这个字段本身恰好标记了一条记录是通过全量同步进来的还是增量同步进来的 补充以下是 Flink CDC 官方文档对 scan.startup.mode 5 种同步模式的解释
The config option scan.startup.mode specifies the startup mode for MySQL CDC consumer. The valid enumerations are:
initial (default): Performs an initial snapshot on the monitored database tables upon first startup, and continue to read the latest binlog.earliest-offset: Skip snapshot phase and start reading binlog events from the earliest accessible binlog offset.latest-offset: Never to perform snapshot on the monitored database tables upon first startup, just read from the end of the binlog which means only have the changes since the connector was started.specific-offset: Skip snapshot phase and start reading binlog events from a specific offset. The offset could be specified with binlog filename and position, or a GTID set if GTID is enabled on server.timestamp: Skip snapshot phase and start reading binlog events from a specific timestamp.