This article walks through how Flink CDC does full and incremental capture of a SQL Server data source. If you are getting ready to adapt a SQL Server source, this article should be a useful reference; I hope it helps.

1. Installing SQL Server and enabling the transaction log
If you have no SQL Server environment but still want to learn this material, your only option is to roll up your sleeves and install your own SQL Server via Docker for study purposes. If you do have an existing environment, just check whether the SQL Server Agent service (sqlagent.enabled) is running and whether CDC is enabled.
1.1 Pull the image with Docker
According to the Flink CDC docs on GitHub, the supported SQL Server versions are 2012, 2014, 2016, 2017 and 2019, but I wanted to pull the newest anyway. As it turns out, 2022-latest and latest are identical (they share the same image ID) and caused no problems in later testing, so I pulled the image directly with the following command:
```bash
docker pull mcr.microsoft.com/mssql/server:latest
```

1.2 Run SQL Server and enable the agent
The standard startup mode needs little explanation; the main thing is to set a password (the password policy is fairly strict, so the easiest route is an online random password generator).

```bash
docker run -e ACCEPT_EULA=Y -e SA_PASSWORD=${your_password} \
  -p 1433:1433 --name sqlserver \
  -d mcr.microsoft.com/mssql/server:latest
```

Then enable the agent (sqlagent.enabled). Once the agent is configured, SQL Server must be restarted; since we installed it with Docker, a plain docker restart sqlserver is enough.
```
[root@hdp-01 ~]# docker exec -it --user root sqlserver bash
root@0274812d0c10:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true
SQL Server needs to be restarted in order to apply this setting. Please run
'systemctl restart mssql-server.service'.
root@0274812d0c10:/# exit
exit
[root@hdp-01 ~]# docker restart sqlserver
sqlserver
```

1.3 Enable CDC
Run the commands below step by step; if you see is_cdc_enabled = 1, CDC is now enabled on the current database.
```
root@0274812d0c10:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P ${your_password}
1> create database test;
2> go
1> use test;
2> go
Changed database context to 'test'.
1> EXEC sys.sp_cdc_enable_db;
2> go
1> SELECT is_cdc_enabled FROM sys.databases WHERE name = 'test';
2> go
is_cdc_enabled
--------------
             1

(1 rows affected)
1> CREATE TABLE t_info (id int, order_date date, purchaser int, quantity int, product_id int, PRIMARY KEY ([id]))
2> go
1>
2>
3> EXEC sys.sp_cdc_enable_table
4> @source_schema = 'dbo',
5> @source_name = 't_info',
6> @role_name = 'cdc_role';
7> go
Update mask evaluation will be disabled in net_changes_function because the CLR configuration option is disabled.
Job 'cdc.zeus_capture' started successfully.
Job 'cdc.zeus_cleanup' started successfully.
1> select * from t_info;
2> go
id          order_date       purchaser   quantity    product_id
----------- ---------------- ----------- ----------- -----------

(0 rows affected)
```

1.4 Check that CDC is enabled
Connect to SQL Server with a client and look at INFORMATION_SCHEMA.TABLES in the test database. If tables with TABLE_SCHEMA = 'cdc' appear, SQL Server is installed correctly and CDC is enabled.
```
1> use test;
2> go
Changed database context to 'test'.
1> select * from INFORMATION_SCHEMA.TABLES;
2> go
TABLE_CATALOG  TABLE_SCHEMA  TABLE_NAME        TABLE_TYPE
test           dbo           user_info         BASE TABLE
test           dbo           systranschemas    BASE TABLE
test           cdc           change_tables     BASE TABLE
test           cdc           ddl_history       BASE TABLE
test           cdc           lsn_time_mapping  BASE TABLE
test           cdc           captured_columns  BASE TABLE
test           cdc           index_columns     BASE TABLE
test           dbo           orders            BASE TABLE
test           cdc           dbo_orders_CT     BASE TABLE
```
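You can also run this check from Java instead of a SQL client. The sketch below is only illustrative: it assumes the Microsoft mssql-jdbc driver is on the classpath, and the URL, user and password are placeholders for your own environment.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CdcCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; encrypt=false avoids TLS setup on a local test instance
        String url = "jdbc:sqlserver://localhost:1433;databaseName=test;encrypt=false";
        try (Connection conn = DriverManager.getConnection(url, "SA", "${your_password}");
             Statement stmt = conn.createStatement();
             // Same check as in section 1.3: 1 means CDC is enabled on the database
             ResultSet rs = stmt.executeQuery("SELECT is_cdc_enabled FROM sys.databases WHERE name = 'test'")) {
            while (rs.next()) {
                System.out.println("is_cdc_enabled = " + rs.getInt(1));
            }
        }
    }
}
```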
2. Implementation

2.1 The Flink CDC job that captures SQL Server
Add the dependency:

```xml
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-sqlserver-cdc</artifactId>
    <version>3.0.0</version>
</dependency>
```

Write the main function:

```java
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Global parallelism
    env.setParallelism(1);
    // Processing-time semantics: disable automatic watermark emission
    env.getConfig().setAutoWatermarkInterval(0);
    // Trigger a checkpoint every 60s
    env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
    // Minimum pause between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    // Checkpoint timeout
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // Allow only one checkpoint at a time
    // env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // Keep checkpoint data after the job is cancelled
    // env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

    SourceFunction<String> sqlServerSource = SqlServerSource.<String>builder()
            .hostname("localhost")
            .port(1433)
            .username("SA")
            .password("${your_password}")
            .database("test")
            .tableList("dbo.t_info")
            .startupOptions(StartupOptions.initial())
            .debeziumProperties(getDebeziumProperties())
            .deserializer(new CustomerDeserializationSchemaSqlserver())
            .build();

    DataStreamSource<String> dataStreamSource = env.addSource(sqlServerSource, "_transaction_log_source");
    dataStreamSource.print().setParallelism(1);
    env.execute("sqlserver-cdc-test");
}

public static Properties getDebeziumProperties() {
    Properties properties = new Properties();
    properties.put("converters", "sqlserverDebeziumConverter");
    properties.put("sqlserverDebeziumConverter.type", "SqlserverDebeziumConverter");
    properties.put("sqlserverDebeziumConverter.database.type", "sqlserver");
    // Custom output formats (optional)
    properties.put("sqlserverDebeziumConverter.format.datetime", "yyyy-MM-dd HH:mm:ss");
    properties.put("sqlserverDebeziumConverter.format.date", "yyyy-MM-dd");
    properties.put("sqlserverDebeziumConverter.format.time", "HH:mm:ss");
    return properties;
}
```

2.2 A custom SQL Server deserialization format
Flink CDC is built on Debezium, and the change events (CRUD) Debezium captures from SQL Server look like this:
```
# Initial snapshot (READ)
Struct{after=Struct{id=1,order_date=2024-01-30,purchaser=1,quantity=100,product_id=1},source=Struct{version=1.9.7.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1706574924473,snapshot=true,db=zeus,schema=dbo,table=orders,commit_lsn=0000002b:00002280:0003},op=r,ts_ms=1706603724432}

# Insert (CREATE)
Struct{after=Struct{id=12,order_date=2024-01-11,purchaser=6,quantity=233,product_id=63},source=Struct{version=1.9.7.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1706603786187,db=zeus,schema=dbo,table=orders,change_lsn=0000002b:00002480:0002,commit_lsn=0000002b:00002480:0003,event_serial_no=1},op=c,ts_ms=1706603788461}

# Update (UPDATE)
Struct{before=Struct{id=12,order_date=2024-01-11,purchaser=6,quantity=233,product_id=63},after=Struct{id=12,order_date=2024-01-11,purchaser=8,quantity=233,product_id=63},source=Struct{version=1.9.7.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1706603845603,db=zeus,schema=dbo,table=orders,change_lsn=0000002b:00002500:0002,commit_lsn=0000002b:00002500:0003,event_serial_no=2},op=u,ts_ms=1706603850134}

# Delete (DELETE)
Struct{before=Struct{id=11,order_date=2024-01-11,purchaser=6,quantity=233,product_id=63},source=Struct{version=1.9.7.Final,connector=sqlserver,name=sqlserver_transaction_log_source,ts_ms=1706603973023,db=zeus,schema=dbo,table=orders,change_lsn=0000002b:000025e8:0002,commit_lsn=0000002b:000025e8:0005,event_serial_no=1},op=d,ts_ms=1706603973859}
```

You can therefore define your own deserialization format to normalize these events into a single standard output shape. Below is the format I use, for reference:
```java
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.alibaba.fastjson2.JSONWriter;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.HashMap;
import java.util.Map;

public class CustomerDeserializationSchemaSqlserver implements DebeziumDeserializationSchema<String> {

    private static final long serialVersionUID = -1L;

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) {
        Map<String, Object> resultMap = new HashMap<>();

        String topic = sourceRecord.topic();
        String[] split = topic.split("[.]");
        String database = split[1];
        String table = split[2];
        resultMap.put("db", database);
        resultMap.put("tableName", table);

        // Operation type
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        // The record itself
        Struct struct = (Struct) sourceRecord.value();
        Struct after = struct.getStruct("after");
        Struct before = struct.getStruct("before");
        String op = operation.name();
        resultMap.put("op", op);

        // Insert, update, or initial snapshot read
        if (op.equals(Envelope.Operation.CREATE.name()) || op.equals(Envelope.Operation.READ.name()) || op.equals(Envelope.Operation.UPDATE.name())) {
            JSONObject afterJson = new JSONObject();
            if (after != null) {
                Schema schema = after.schema();
                for (Field field : schema.fields()) {
                    afterJson.put(field.name(), after.get(field.name()));
                }
                resultMap.put("after", afterJson);
            }
        }

        // Delete
        if (op.equals(Envelope.Operation.DELETE.name())) {
            JSONObject beforeJson = new JSONObject();
            if (before != null) {
                Schema schema = before.schema();
                for (Field field : schema.fields()) {
                    beforeJson.put(field.name(), before.get(field.name()));
                }
                resultMap.put("before", beforeJson);
            }
        }

        collector.collect(JSON.toJSONString(resultMap, JSONWriter.Feature.FieldBased, JSONWriter.Feature.LargeObject));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
```
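For intuition, the update event shown earlier would be emitted by this deserializer roughly as the JSON below. This is illustrative only: it assumes the record topic has the form server-name.schema.table (which is what makes split[1] and split[2] resolve to the schema and table here), and that the date converter from section 2.3 is active, so order_date arrives as a formatted string.

```
{"db":"dbo","tableName":"orders","op":"UPDATE","after":{"id":12,"order_date":"2024-01-11","purchaser":8,"quantity":233,"product_id":63}}
```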
2.3 A custom date format converter

Debezium delivers date columns as a 5-digit number and date-time columns as a 13-digit number, so we need to convert them back to standard date or time strings according to the SQL Server column type. SQL Server's date and time types map as follows:

| Column type | Snapshot type (JDBC type) | CDC type (JDBC type) |
| --- | --- | --- |
| DATE | java.sql.Date (91) | java.sql.Date (91) |
| TIME | java.sql.Timestamp (92) | java.sql.Time (92) |
| DATETIME | java.sql.Timestamp (93) | java.sql.Timestamp (93) |
| DATETIME2 | java.sql.Timestamp (93) | java.sql.Timestamp (93) |
| DATETIMEOFFSET | microsoft.sql.DateTimeOffset (-155) | microsoft.sql.DateTimeOffset (-155) |
| SMALLDATETIME | java.sql.Timestamp (93) | java.sql.Timestamp (93) |
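To see what those raw numbers actually encode, here is a small self-contained sketch (plain JDK, no Debezium required; the sample values mirror the snapshot event from section 2.2, where order_date 2024-01-30 corresponds to day number 19752 and 1706574924473 is a millisecond timestamp):

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochValueDemo {
    public static void main(String[] args) {
        // A DATE column arrives as days since 1970-01-01: 19752 -> 2024-01-30
        LocalDate date = LocalDate.ofEpochDay(19752);
        // A date-time column arrives as epoch milliseconds: 1706574924473 -> 2024-01-30T00:35:24.473 (UTC)
        LocalDateTime datetime = LocalDateTime.ofInstant(Instant.ofEpochMilli(1706574924473L), ZoneOffset.UTC);
        System.out.println(date + " / " + datetime);
    }
}
```

The converter below performs this mapping for real, keyed off the declared SQL Server column type: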
```java
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

@Slf4j
public class SqlserverDebeziumConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private static final String DATE_FORMAT = "yyyy-MM-dd";
    private static final String TIME_FORMAT = "HH:mm:ss";
    private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private DateTimeFormatter dateFormatter;
    private DateTimeFormatter timeFormatter;
    private DateTimeFormatter datetimeFormatter;
    private SchemaBuilder schemaBuilder;
    private String databaseType;
    private String schemaNamePrefix;

    @Override
    public void configure(Properties properties) {
        // Required parameter database.type; only sqlserver is supported
        this.databaseType = properties.getProperty("database.type");
        // Throw if it is unset or set to anything other than sqlserver
        if (this.databaseType == null || !this.databaseType.equals("sqlserver")) {
            throw new IllegalArgumentException("database.type must be set to 'sqlserver'");
        }
        // Optional parameters format.date, format.time, format.datetime for the output patterns
        String dateFormat = properties.getProperty("format.date", DATE_FORMAT);
        String timeFormat = properties.getProperty("format.time", TIME_FORMAT);
        String datetimeFormat = properties.getProperty("format.datetime", DATETIME_FORMAT);
        // Default schema name: this class's name plus the database type
        String className = this.getClass().getName();
        // Honor schema.name.prefix if set
        this.schemaNamePrefix = properties.getProperty("schema.name.prefix", className + "." + this.databaseType);
        // Initialize the formatters
        dateFormatter = DateTimeFormatter.ofPattern(dateFormat);
        timeFormatter = DateTimeFormatter.ofPattern(timeFormat);
        datetimeFormatter = DateTimeFormatter.ofPattern(datetimeFormat);
    }

    // Converter registration for SQL Server column types
    public void registerSqlserverConverter(String columnType, ConverterRegistration<SchemaBuilder> converterRegistration) {
        String schemaName = this.schemaNamePrefix + "." + columnType.toLowerCase();
        schemaBuilder = SchemaBuilder.string().name(schemaName);
        switch (columnType) {
            case "DATE":
                converterRegistration.register(schemaBuilder, value -> {
                    if (value == null) {
                        return null;
                    } else if (value instanceof java.sql.Date) {
                        return dateFormatter.format(((java.sql.Date) value).toLocalDate());
                    } else {
                        return this.failConvert(value, schemaName);
                    }
                });
                break;
            case "TIME":
                converterRegistration.register(schemaBuilder, value -> {
                    if (value == null) {
                        return null;
                    } else if (value instanceof java.sql.Time) {
                        return timeFormatter.format(((java.sql.Time) value).toLocalTime());
                    } else if (value instanceof java.sql.Timestamp) {
                        return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());
                    } else {
                        return this.failConvert(value, schemaName);
                    }
                });
                break;
            case "DATETIME":
            case "DATETIME2":
            case "SMALLDATETIME":
            case "DATETIMEOFFSET":
                converterRegistration.register(schemaBuilder, value -> {
                    if (value == null) {
                        return null;
                    } else if (value instanceof java.sql.Timestamp) {
                        return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());
                    } else if (value instanceof microsoft.sql.DateTimeOffset) {
                        microsoft.sql.DateTimeOffset dateTimeOffset = (microsoft.sql.DateTimeOffset) value;
                        return datetimeFormatter.format(dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime());
                    } else {
                        return this.failConvert(value, schemaName);
                    }
                });
                break;
            default:
                schemaBuilder = null;
                break;
        }
    }

    @Override
    public void converterFor(RelationalColumn relationalColumn, ConverterRegistration<SchemaBuilder> converterRegistration) {
        // Column type of the field
        String columnType = relationalColumn.typeName().toUpperCase();
        // Dispatch to the converter for this database type
        if (this.databaseType.equals("sqlserver")) {
            this.registerSqlserverConverter(columnType, converterRegistration);
        } else {
            log.warn("Unsupported database type: {}", this.databaseType);
            schemaBuilder = null;
        }
    }

    private String getClassName(Object value) {
        if (value == null) {
            return null;
        }
        return value.getClass().getName();
    }

    // Log and fall back to toString() when a value cannot be converted
    private String failConvert(Object value, String type) {
        String valueClass = this.getClassName(value);
        String valueString = valueClass == null ? null : value.toString();
        log.warn("Failed to convert value of class {} for schema {}, falling back to '{}'", valueClass, type, valueString);
        return valueString;
    }
}
```
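One wiring detail worth calling out: Debezium locates a custom converter through the converters and name.type properties set in getDebeziumProperties() in section 2.1, and name.type must be the converter's fully qualified class name. So if this class lives in a package (the package name below is a hypothetical example), the property would look like:

```java
// Assuming the converter class is in the (hypothetical) package com.example.cdc
properties.put("sqlserverDebeziumConverter.type", "com.example.cdc.SqlserverDebeziumConverter");
```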
3. Summary

Flink CDC already packages this kind of incremental capture from traditional databases very well, and the official docs include detailed walkthroughs. Even so, if you want to learn a skill in depth, I think it is worth going through the whole process yourself at least once: it speeds up your progress, and when problems appear you can think about solutions from more angles. I hope this article has been of some help.