做宠物网站心得,wordpress 红色主题,网站注册人查询,c 2015 做网站写在前面好久没更新Blog了#xff0c;从CRUD Boy转型大数据开发#xff0c;拉宽了不少的知识面#xff0c;从今年年初开始筹备、组建、招兵买马#xff0c;到现在稳定开搞中#xff0c;期间踏过无数的火坑#xff0c;也许除了这篇还很写上三四篇。进入主题#xff0c;通… 写在前面 好久没更新Blog了从CRUD Boy转型大数据开发拉宽了不少的知识面从今年年初开始筹备、组建、招兵买马到现在稳定开搞中期间踏过无数的火坑也许除了这篇还很写上三四篇。 进入主题通常企业为了实现数据统计、数据分析、数据挖掘、解决信息孤岛等全局数据的系统化运作管理 为BI、经营分析、决策支持系统等深度开发应用奠定基础挖掘数据价值 企业会开始着手建立数据仓库数据中台。而这些数据来源则来自于企业的各个业务系统的数据或爬取外部的数据从业务系统数据到数据仓库的过程就是一个ETLExtract-Transform-Load行为包括了采集、清洗、数据转换等主要过程通常异构数据抽取转换使用Sqoop、DataX等日志采集Flume、Logstash、Filebeat等。 数据抽取分为全量抽取和增量抽取全量抽取类似于数据迁移或数据复制全量抽取很好理解增量抽取在全量的基础上做增量只监听、捕捉动态变化的数据。如何捕捉数据的变化是增量抽取的关键一是准确性必须保证准确的捕捉到数据的动态变化二是性能不能对业务系统造成太大的压力。增量抽取方式 通常增量抽取有几种方式各有优缺点。1. 触发器 在源数据库上的目标表创建触发器监听增、删、改操作捕捉到数据的变更写入临时表。优点操作简单、规则清晰对源表不影响缺点对源数据库有侵入对业务系统有一定的影响2. 全表比对 在ETL过程中抽取方建立临时表待全量抽取存储然后在进行比对数据。优点对源数据库、源表都无需改动完全交付ETL过程处理统一管理缺点ETL效率低、设计复杂数据量越大速度越慢时效性不确定3. 全表删除后再插入 在抽取数据之前先将表中数据清空然后全量抽取。优点ETL 操作简单速度快。缺点全量抽取一般采取T1的形式抽取数据量大的表容易对数据库造成压力4. 时间戳 时间戳的方式即在源表上增加时间戳列对发生变更的表进行更新然后根据时间戳进行提取。优点操作简单ELT逻辑清晰性能比较好缺点对业务系统有侵入数据库表也需要额外增加字段。对于老的业务系统可能不容易做变更。5. CDC方式 变更数据捕获Change Data Capture简称CDCSQLServer为实时更新数据同步提供了CDC机制类似于Mysql的binlog将数据更新操作维护到一张CDC表中。开启CDC的源表在插入INSERT、更新UPDATE和删除DELETE活动时会插入数据到日志表中。cdc通过捕获进程将变更数据捕获到变更表中通过cdc提供的查询函数可以捕获这部分数据。详情可以查看官方介绍关于变更数据捕获 (SQL Server)优点提供易于使用的API 来设置CDC 环境缩短ETL 的时间无需修改业务系统表结构。缺点受数据库版本的限制实现过程相对复杂。CDC增量抽取先决条件1. 已搭建好Kafka集群Zookeeper集群2. 源数据库支持CDC版本采用开发版或企业版。案例环境Ubuntu 20.04Kafka 2.13-2.7.0Zookeeper 3.6.2SQL Server 2012步骤 除了数据库开启CDC支持以外主要还是要将变更的数据通过Kafka Connect传输数据Debezium是目前官方推荐的连接器它支持绝大多数主流数据库MySQL、PostgreSQL、SQL Server、Oracle等等详情查看Connectors。1. 数据库步骤开启数据库CDC支持 在源数据库执行以下命令EXEC sys.sp_cdc_enable_db GO
附上关闭语句exec sys.sp_cdc_disable_db
查询是否启用select * from sys.databases where is_cdc_enabled 1
创建测试数据表已有表则跳过此步骤create table T_LioCDC
(ID int identity(1,1) primary key ,Name nvarchar(16),Sex bit,CreateTime datetime,UpdateTime datetime
);
对源表开启CDC支持exec sp_cdc_enable_table
source_schemadbo,
source_nameT_LioCDC,
role_namenull,
supports_net_changes 1;
确认是否有权限访问CDC TableEXEC sys.sp_cdc_help_change_data_capture确认SQL Server Agent已开启EXEC master.dbo.xp_servicecontrol NQUERYSTATE,NSQLSERVERAGENT以上则完成对数据库的CDC操作。2. Kafka步骤 Kafka Connect的工作模式分为两种分别是standalone模式和distributed模式。standalone用于单机测试本文用distributed模式用于生产环境。Kafka必须先运行启动再进行以下步骤进行配置。下载Sql Server Connector 下载连接器后创建一个文件夹来存放解压到该目录下即可例子路径 /usr/soft/kafka/kafka_2.13_2.7.0/plugins 记住这个路径配置中要用到。 下载地址debezium-connector-sqlserver-1.5.0.Final-plugin.tar.gz编辑connect-distributed.properties配置 修改Kafka connect配置文件$KAFKA_HOME/config/connect-distributed.properties变更内容如下//kafka集群ipport
bootstrap.servers172.192.10.210:9092,172.192.10.211:9092,172.192.10.212:9092key.converter.schemas.enablefalse
value.converter.schemas.enablefalseoffset.storage.topicconnect-offsets
offset.storage.replication.factor1
offset.storage.partitions3
offset.storage.cleanup.policycompactconfig.storage.topicconnect-configs
config.storage.replication.factor1status.storage.topicconnect-status
status.storage.replication.factor1
status.storage.partitions3
//刚刚下载连接器解压的路径
plugin.path/usr/soft/kafka/kafka_2.13_2.7.0/plugins
看到配置中有三个Topic分别是config.storage.topic用以保存connector和task的配置信息需要注意的是这个主题的分区数只能是1而且是有多副本的。
offset.storage.topic用以保存offset信息。
status.storage.topic用以保存connetor的状态信息。这些Topic可以不用创建启动后会默认创建。启动Kafka集群 保存配置之后将connect-distributed.properties分发到集群中然后启动bin/connect-distributed.sh config/connect-distributed.properties
检查是否启动 connector支持REST API的方式进行管理所以用Post man或者Fiddler可以调用相关接口进行管理。检查是否启动 不用奇怪上面配置集群的IP是172段这里的192.168.1.177仍是我的集群中的一个服务器因为服务器都使用了双网卡。因为还没有连接器相关配置所以接口返回是一个空数组接下来将新增一个连接器。编写sqlserver-cdc-source.json{name: sqlserver-cdc-source,config: {connector.class : io.debezium.connector.sqlserver.SqlServerConnector,database.server.name : JnServer,database.hostname : 172.192.20.2, --目标数据库的ipdatabase.port : 1433, --目标数据库的端口database.user : sa, --目标数据库的账号database.password : 123456, --密码database.dbname : Dis, --目标数据库的数据库名称table.whitelist: dbo.T_LioCDC, --监听表名schemas.enable : false, mode:incrementing, --增量模式incrementing.column.name: ID, --增量列名database.history.kafka.bootstrap.servers : 172.192.10.210:9092,172.192.10.211:9092,172.192.10.212, --kafka集群database.history.kafka.topic: TopicTLioCDC, --kafka topic内部使用不是由消费者使用value.converter.schemas.enable:false,value.converter:org.apache.kafka.connect.json.JsonConverter}
}
//源文地址https://www.cnblogs.com/EminemJK/p/14688907.html
还有其他额外的配置可以参考官方文档。然后执行继续执行检查就发现连接器已经成功配置了其他APIGET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段name是connector的名字config是json格式必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。
GET /connectors/{name}/status – 获取指定connector的状态包括它是否在运行、停止、或者失败如果发生错误还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
PUT /connectors/{name}/pause – 暂停connector和它的task停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
POST /connectors/{name}/restart – 重启一个connector尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector停止它的所有task并删除配置。//源文地址 https://www.cnblogs.com/EminemJK/p/14688907.html
查看Topic/usr/soft/kafka/kafka_2.13_2.7.0# bin/kafka-topics.sh --list --zookeeper localhost:2000
Topic JnServer.dbo.T_LioCDC 则是供我们消费的主题启动一个消费者进行监听测试bin/kafka-console-consumer.sh --bootstrap-server 172.192.10.210:9092 --consumer-property group.idgroup1 --consumer-property client.idconsumer-1 --topic JnServer.dbo.T_LioCDC
然后再源表进行一些列增删改操作--测试代码
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (A,1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (B,0,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (C,1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (D,0,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (E,1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (F,1,getdate(),getdate())
insert into T_LioCDC(name, sex, createtime,UpdateTime) values (G,0,getdate(),getdate())update T_LioCDC
set NameLio.Huang,UpdateTimegetdate()
where ID7
已经成功捕捉到数据的变更对比几个操作Json依次是insert、update、delete 最后 下班注文中的指定链接可在阅读原文中获取