当前位置: 首页 > news >正文

网站下载app连接怎么做的含山建设局网站

网站下载app连接怎么做的,含山建设局网站,湘潭关键词优化服务,重庆谷歌seo关键词优化​ 本文将详细介绍Flink-CDC如何全量及增量采集Sqlserver数据源#xff0c;准备适配Sqlserver数据源的小伙伴们可以参考本文#xff0c;希望本文能给你带来一定的帮助。 一、Sqlserver的安装及开启事务日志 如果没有Sqlserver环境#xff0c;但你又想学习这块的内容#x…​ 本文将详细介绍Flink-CDC如何全量及增量采集Sqlserver数据源准备适配Sqlserver数据源的小伙伴们可以参考本文希望本文能给你带来一定的帮助。 一、Sqlserver的安装及开启事务日志 如果没有Sqlserver环境但你又想学习这块的内容那你只能自己动手通过docker安装一个 myself sqlserver来用作学习当然如果你有现成环境那就检查一下Sqlserver是否开启了代理(sqlagent.enabled)服务和CDC功能。 1.1 docker拉取镜像 看Github上写Flink-CDC 目前支持的Sqlserver版本为2012, 2014, 2016, 2017, 2019但我想全部拉到最新事实证明2022-latest 和latest是一样的因为imagId都是一致的且在后续测试也是没有问题的所以我在docker上拉取镜像时直接采用如下命令 docker pull mcr.microsoft.com/mssql/server:latest1.2 运行Sqlserver并设置代理 标准启动模式没什么好说的主要设置一下密码(密码要求比较严格建议直接在网上搜个随机密码生成器来搞一下)。 docker run -e ACCEPT_EULAY -e SA_PASSWORD${your_password} \-p 1433:1433 --name sqlserver \-d mcr.microsoft.com/mssql/server:latest设置代理sqlagent.enabled,代理设置完成后需要重启Sqlserver因为我们是docker安装的直接用docker restart sqlserver就行了。 [roothdp-01 ~]# docker exec -it --user root sqlserver bash root0274812d0c10:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true SQL Server needs to be restarted in order to apply this setting. Please run systemctl restart mssql-server.service. root0274812d0c10:/# exit exit [roothdp-01 ~]# docker restart sqlserver sqlserver1.3 启用CDC功能 按照如下步骤执行命令如果看到is_cdc_enabled 1则说明当前数据库 root0274812d0c10:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P ${your_password} 1 create databases test; 2 go 1 use test; 2 go Changed database context to test. 1 EXEC sys.sp_cdc_enable_db; 2 go 1 SELECT is_cdc_enabled FROM sys.databases WHERE name test; 2 go is_cdc_enabled --------------1(1 rows affected) 1 CREATE TABLE t_info (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id])) 2 go 1 2 3 EXEC sys.sp_cdc_enable_table 4 source_schema dbo, 5 source_name t_info, 6 role_name cdc_role; 7 go Update mask evaluation will be disabled in net_changes_function because the CLR configuration option is disabled. Job cdc.zeus_capture started successfully. Job cdc.zeus_cleanup started successfully. 1 select * from t_info; 2 go id order_date purchaser quantity product_id ----------- ---------------- ----------- ----------- -----------(0 rows affected)1.4 检查CDC是否正常开启 用客户端连接Sqlserver查看test库下的INFORMATION_SCHEMA.TABLES中是否出现TABLE_SCHEMA cdc的表如果出现说明已经成功安装Sqlserver并启用了CDC。 1 use test; 2 go Changed database context to test. 1 select * from INFORMATION_SCHEMA.TABLES; 2 goTABLE_CATALOG TABLE_SCHEMA TABLE_NAME TABLE_TYPE test dbo user_info BASE TABLE test dbo systranschemas BASE TABLE test cdc change_tables BASE TABLE test cdc ddl_history BASE TABLE test cdc lsn_time_mapping BASE TABLE test cdc captured_columns BASE TABLE test cdc index_columns BASE TABLE test dbo orders BASE TABLE test cdc dbo_orders_CT BASE TABLE二、具体实现 2.1 Flik-CDC采集SqlServer主程序 添加依赖包 dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-sqlserver-cdc/artifactIdversion3.0.0/version/dependency编写主函数 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置全局并行度env.setParallelism(1);// 设置时间语义为ProcessingTimeenv.getConfig().setAutoWatermarkInterval(0);// 每隔60s启动一个检查点env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);// checkpoint最小间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);// checkpoint超时时间env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许一个checkpoint// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// Flink处理程序被cancel后会保留Checkpoint数据// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);SourceFunctionString sqlServerSource SqlServerSource.Stringbuilder().hostname(localhost).port(1433).username(SA).password().database(test).tableList(dbo.t_info).startupOptions(StartupOptions.initial()).debeziumProperties(getDebeziumProperties()).deserializer(new CustomerDeserializationSchemaSqlserver()).build();DataStreamSourceString dataStreamSource env.addSource(sqlServerSource, _transaction_log_source);dataStreamSource.print().setParallelism(1);env.execute(sqlserver-cdc-test);}public static Properties getDebeziumProperties() {Properties properties new Properties();properties.put(converters, sqlserverDebeziumConverter);properties.put(sqlserverDebeziumConverter.type, SqlserverDebeziumConverter);properties.put(sqlserverDebeziumConverter.database.type, sqlserver);// 自定义格式可选properties.put(sqlserverDebeziumConverter.format.datetime, yyyy-MM-dd HH:mm:ss);properties.put(sqlserverDebeziumConverter.format.date, yyyy-MM-dd);properties.put(sqlserverDebeziumConverter.format.time, HH:mm:ss);return properties;}2.2 自定义Sqlserver反序列化格式 Flink-CDC底层技术为debezium它捕获到Sqlserver数据变更(CRUD)的数据格式如下 #初始化 Struct{afterStruct{id1,order_date2024-01-30,purchaser1,quantity100,product_id1},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706574924473,snapshottrue,dbzeus,schemadbo,tableorders,commit_lsn0000002b:00002280:0003},opr,ts_ms1706603724432}#新增 Struct{afterStruct{id12,order_date2024-01-11,purchaser6,quantity233,product_id63},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706603786187,dbzeus,schemadbo,tableorders,change_lsn0000002b:00002480:0002,commit_lsn0000002b:00002480:0003,event_serial_no1},opc,ts_ms1706603788461}#更新 Struct{beforeStruct{id12,order_date2024-01-11,purchaser6,quantity233,product_id63},afterStruct{id12,order_date2024-01-11,purchaser8,quantity233,product_id63},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706603845603,dbzeus,schemadbo,tableorders,change_lsn0000002b:00002500:0002,commit_lsn0000002b:00002500:0003,event_serial_no2},opu,ts_ms1706603850134}#删除 Struct{beforeStruct{id11,order_date2024-01-11,purchaser6,quantity233,product_id63},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706603973023,dbzeus,schemadbo,tableorders,change_lsn0000002b:000025e8:0002,commit_lsn0000002b:000025e8:0005,event_serial_no1},opd,ts_ms1706603973859}因此可以根据自己需要自定义反序列化格式将数据按照标准统一数据输出下面是我自定义的格式供大家参考 import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONWriter; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord;import java.util.HashMap; import java.util.Map;public class CustomerDeserializationSchemaSqlserver implements DebeziumDeserializationSchemaString {private static final long serialVersionUID -1L;Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) {MapString, Object resultMap new HashMap();String topic sourceRecord.topic();String[] split topic.split([.]);String database split[1];String table split[2];resultMap.put(db, database);resultMap.put(tableName, table);//获取操作类型Envelope.Operation operation Envelope.operationFor(sourceRecord);//获取数据本身Struct struct (Struct) sourceRecord.value();Struct after struct.getStruct(after);Struct before struct.getStruct(before);String op operation.name();resultMap.put(op, op);//新增,更新或者初始化if (op.equals(Envelope.Operation.CREATE.name()) || op.equals(Envelope.Operation.READ.name()) || op.equals(Envelope.Operation.UPDATE.name())) {JSONObject afterJson new JSONObject();if (after ! null) {Schema schema after.schema();for (Field field : schema.fields()) {afterJson.put(field.name(), after.get(field.name()));}resultMap.put(after, afterJson);}}if (op.equals(Envelope.Operation.DELETE.name())) {JSONObject beforeJson new JSONObject();if (before ! null) {Schema schema before.schema();for (Field field : schema.fields()) {beforeJson.put(field.name(), before.get(field.name()));}resultMap.put(before, beforeJson);}}collector.collect(JSON.toJSONString(resultMap, JSONWriter.Feature.FieldBased, JSONWriter.Feature.LargeObject));}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}2.3 自定义日期格式转换器 debezium会将日期转为5位数字日期时间转为13位的数字因此我们需要根据Sqlserver的日期类型转换成标准的时期或者时间格式。Sqlserver的日期类型主要包含以下几种 字段类型快照类型(jdbc type)cdc类型(jdbc type)DATEjava.sql.Date(91)java.sql.Date(91)TIMEjava.sql.Timestamp(92)java.sql.Time(92)DATETIMEjava.sql.Timestamp(93)java.sql.Timestamp(93)DATETIME2java.sql.Timestamp(93)java.sql.Timestamp(93)DATETIMEOFFSETmicrosoft.sql.DateTimeOffset(-155)microsoft.sql.DateTimeOffset(-155)SMALLDATETIMEjava.sql.Timestamp(93)java.sql.Timestamp(93) import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.apache.kafka.connect.data.SchemaBuilder; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Properties;Sl4j public class SqlserverDebeziumConverter implements CustomConverterSchemaBuilder, RelationalColumn {private static final String DATE_FORMAT yyyy-MM-dd;private static final String TIME_FORMAT HH:mm:ss;private static final String DATETIME_FORMAT yyyy-MM-dd HH:mm:ss;private DateTimeFormatter dateFormatter;private DateTimeFormatter timeFormatter;private DateTimeFormatter datetimeFormatter;private SchemaBuilder schemaBuilder;private String databaseType;private String schemaNamePrefix;Overridepublic void configure(Properties properties) {// 必填参数database.type只支持sqlserverthis.databaseType properties.getProperty(database.type);// 如果未设置或者设置的不是mysql、sqlserver则抛出异常。if (this.databaseType null || !this.databaseType.equals(sqlserver))) {throw new IllegalArgumentException(database.type 必须设置为sqlserver);}// 选填参数format.date、format.time、format.datetime。获取时间格式化的格式String dateFormat properties.getProperty(format.date, DATE_FORMAT);String timeFormat properties.getProperty(format.time, TIME_FORMAT);String datetimeFormat properties.getProperty(format.datetime, DATETIME_FORMAT);// 获取自身类的包名数据库类型为默认schema.nameString className this.getClass().getName();// 查看是否设置schema.name.prefixthis.schemaNamePrefix properties.getProperty(schema.name.prefix, className . this.databaseType);// 初始化时间格式化器dateFormatter DateTimeFormatter.ofPattern(dateFormat);timeFormatter DateTimeFormatter.ofPattern(timeFormat);datetimeFormatter DateTimeFormatter.ofPattern(datetimeFormat);}// sqlserver的转换器public void registerSqlserverConverter(String columnType, ConverterRegistrationSchemaBuilder converterRegistration) {String schemaName this.schemaNamePrefix . columnType.toLowerCase();schemaBuilder SchemaBuilder.string().name(schemaName);switch (columnType) {case DATE:converterRegistration.register(schemaBuilder, value - {if (value null) {return null;} else if (value instanceof java.sql.Date) {return dateFormatter.format(((java.sql.Date) value).toLocalDate());} else {return this.failConvert(value, schemaName);}});break;case TIME:converterRegistration.register(schemaBuilder, value - {if (value null) {return null;} else if (value instanceof java.sql.Time) {return timeFormatter.format(((java.sql.Time) value).toLocalTime());} else if (value instanceof java.sql.Timestamp) {return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());} else {return this.failConvert(value, schemaName);}});break;case DATETIME:case DATETIME2:case SMALLDATETIME:case DATETIMEOFFSET:converterRegistration.register(schemaBuilder, value - {if (value null) {return null;} else if (value instanceof java.sql.Timestamp) {return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());} else if (value instanceof microsoft.sql.DateTimeOffset) {microsoft.sql.DateTimeOffset dateTimeOffset (microsoft.sql.DateTimeOffset) value;return datetimeFormatter.format(dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime());} else {return this.failConvert(value, schemaName);}});break;default:schemaBuilder null;break;}}Overridepublic void converterFor(RelationalColumn relationalColumn, ConverterRegistrationSchemaBuilder converterRegistration) {// 获取字段类型String columnType relationalColumn.typeName().toUpperCase();// 根据数据库类型调用不同的转换器if (this.databaseType.equals(sqlserver)) {this.registerSqlserverConverter(columnType, converterRegistration);} else {log.warn(不支持的数据库类型: {}, this.databaseType);schemaBuilder null;}}private String getClassName(Object value) {if (value null) {return null;}return value.getClass().getName();}// 类型转换失败时的日志打印private String failConvert(Object value, String type) {String valueClass this.getClassName(value);String valueString valueClass null ? null : value.toString();return valueString;} } 三、总计 目前Fink-CDC对这种增量采集传统数据库的技术已经封装的很好了并且官方也给了详细的操作教程但如果想要深入的学习一项技能个人觉得还是要从头到尾操作一遍一方面能够快速的提升自己另一方面发现问题时也能从不同的角度来思考解决方案希望本篇文章能够给大家带来一点帮助。
http://www.pierceye.com/news/917284/

相关文章:

  • 天津大学生专业做网站建设网站价格
  • 携程网站建设进度及实施过程文具电子商务网站开发内容
  • 怎么查看网站打开速度网站源码整站下载
  • 北京城乡住房建设部网站常见的网络营销推广方式有哪些
  • 做网站的成本费用钱宝网站怎么做任务
  • 网站上的格式用html怎么做部队网站设计
  • 帮客户做网站内容社交网站有哪些如何做
  • 网站开发与设计实训总结两千字公众号制作的网站开发
  • 一个公司做2个产品网站怎么做的用html5做的网站素材
  • 内乡网站建设咸阳网站建设报价
  • 企业网站多少钱扶余手机网站开发
  • 做外汇网站卖判刑多少年如何找回网站后台密码
  • 怎么做优惠券网站asp.net mvc 5网站开发之美
  • 网站底部浮动电话广告福建住房和城乡建设部网站
  • 建站之星破解版wordpress 置顶排序
  • c2c网站代表和网址涟源市建设局网站
  • 哪个网站有免费的模板免费网上商城系统
  • 一个网站的建设需要什么东西前十强排名家装公司
  • 广州网站建设报价表石家庄搜索排名提升
  • 网站备案步骤企业网站手机版模板免费下载
  • 郑州高端品牌网站建设镇江网站营销推广
  • 网站开发简单的框架南昌手机网站
  • 网站分析与优化百度新闻源网站有哪些
  • 直播网站开发秀色上海综合新闻
  • 电子商务网站建设与管理课后题答案企业网站推广哪家好
  • 网站被挂黑链怎么删除石家庄企业网站建设
  • 网站模板怎么连接域名可视化网页设计在线
  • 美术馆网站建设要求开发软件多少钱一个月
  • 直播网站开发核心技术wordpress访问次数插件
  • wap网站 劣势微信小程序怎么写