济宁做网站的,南昌网站开发公司电话,南宁建站公司模板,wordpress 点赞函数Debezium日常分享系列之#xff1a;Debezium 通知 一、概论二、Debezium通知格式三、Debezium 有关初始快照状态的通知四、Debezium 有关增量快照进度的通知五、启用 Debezium 通知六、访问 Debezium JMX 通知七、自定义通知渠道八、应用案例 一、概论
Debezium 通知提供了一… Debezium日常分享系列之Debezium 通知 一、概论二、Debezium通知格式三、Debezium 有关初始快照状态的通知四、Debezium 有关增量快照进度的通知五、启用 Debezium 通知六、访问 Debezium JMX 通知七、自定义通知渠道八、应用案例 一、概论
Debezium 通知提供了一种获取有关连接器状态信息的机制。通知可以发送到以下渠道
接收器通知通道通过 Connect API 将通知发送到配置的主题。日志通知通道通知会附加到日志中。JmxNotificationChannel通知作为 JMX bean 中的属性公开。定制通知将发送到您实施的自定义渠道。
二、Debezium通知格式
通知消息包含以下信息
属性描述id分配给通知的唯一标识符。对于增量快照通知id 与使用执行快照信号发送的相同。aggregate_type快照类型type提供有关在aggregate_type 字段中指定的事件的状态信息。additional_data包含有关通知的详细信息的 MapString,String。timestamp创建通知的时间。 Epoch unix 时间戳以毫秒为单位
三、Debezium 有关初始快照状态的通知
以下示例显示了提供初始快照状态的典型通知
{id: 5563ae14-49f8-4579-9641-c1bbc2d76f99,aggregate_type: Initial Snapshot,type: COMPLETED, additional_data : {connector_name: myConnector},timestamp: 1695817046353
}类型字段可以包含以下值之一
STARTEDIN_PROGRESSTABLE_SCAN_COMPLETEDCOMPLETEDABORTEDSKIPPED
下表显示了报告初始快照状态的通知中可能存在的不同负载的示例
STARTED
{id:ff81ba59-15ea-42ae-b5d0-4d74f1f4038f,aggregate_type:Initial Snapshot,type:STARTED,additional_data:{connector_name:my-connector},timestamp: 1695817046353
}IN_PROGRESS
{id:6d82a3ec-ba86-4b36-9168-7423b0dd5c1d,aggregate_type:Initial Snapshot,type:IN_PROGRESS,additional_data:{connector_name:my-connector,data_collections:table1, table2,current_collection_in_progress:table1},timestamp: 1695817046353
}Mongo 连接器当前不支持字段 data_collection
TABLE_SCAN_COMPLETED id:6d82a3ec-ba86-4b36-9168-7423b0dd5c1d,aggregate_type:Initial Snapshot,type:TABLE_SCAN_COMPLETED,additional_data:{connector_name:my-connector,data_collection:table1, table2,scanned_collection:table1,total_rows_scanned:100,status:SUCCEEDED},timestamp: 1695817046353
}在前面的示例中additional_data.status 字段可以包含以下值之一
SQL_异常执行快照时发生 SQL 异常。
成功了快照成功完成。
Mongo 连接器当前不支持字段total_rows_scanned 和data_collection
COMPLETED
{id:ff81ba59-15ea-42ae-b5d0-4d74f1f4038f,aggregate_type:Initial Snapshot,type:COMPLETED,additional_data:{connector_name:my-connector},timestamp: 1695817046353
}ABORTED
{id:ff81ba59-15ea-42ae-b5d0-4d74f1f4038f,aggregate_type:Initial Snapshot,type:ABORTED,additional_data:{connector_name:my-connector},timestamp: 1695817046353
}SKIPPED
{id:ff81ba59-15ea-42ae-b5d0-4d74f1f4038f,aggregate_type:Initial Snapshot,type:SKIPPED,additional_data:{connector_name:my-connector},timestamp: 1695817046353
}四、Debezium 有关增量快照进度的通知
下表显示了报告增量快照状态的通知中可能存在的不同负载的示例
Start
{id:ff81ba59-15ea-42ae-b5d0-4d74f1f4038f,aggregate_type:Incremental Snapshot,type:STARTED,additional_data:{connector_name:my-connector,data_collections:table1, table2},timestamp: 1695817046353
}Paused
{id:068d07a5-d16b-4c4a-b95f-8ad061a69d51,aggregate_type:Incremental Snapshot,type:PAUSED,additional_data:{connector_name:my-connector,data_collections:table1, table2},timestamp: 1695817046353
}Resumed
{id:a9468204-769d-430f-96d2-b0933d4839f3,aggregate_type:Incremental Snapshot,type:RESUMED,additional_data:{connector_name:my-connector,data_collections:table1, table2},timestamp: 1695817046353
}Stopped
{id:83fb3d6c-190b-4e40-96eb-f8f427bf482c,aggregate_type:Incremental Snapshot,type:ABORTED,additional_data:{connector_name:my-connector},timestamp: 1695817046353
}Processing chunk
{id:d02047d6-377f-4a21-a4e9-cb6e817cf744,aggregate_type:Incremental Snapshot,type:IN_PROGRESS,additional_data:{connector_name:my-connector,data_collections:table1, table2,current_collection_in_progress:table1,maximum_key:100,last_processed_key:50},timestamp: 1695817046353
}Snapshot completed for a table
{id:6d82a3ec-ba86-4b36-9168-7423b0dd5c1d,aggregate_type:Incremental Snapshot,type:TABLE_SCAN_COMPLETED,additional_data:{connector_name:my-connector,data_collection:table1, table2,scanned_collection:table1,total_rows_scanned:100,status:SUCCEEDED},timestamp: 1695817046353
}在前面的示例中additional_data.status 字段可以包含以下值之一
EMPTY该表不包含任何值。
NO_PRIMARY_KEY无法完成快照表没有主键。
SKIPPED无法完成此类表的快照。有关详细信息请参阅日志。
SQL_EXCEPTION执行快照时发生 SQL 异常。
SUCCEEDED快照成功完成。
UNKNOWN_SCHEMA找不到该表的架构。检查日志中已知表的列表。
Completed
{id:6d82a3ec-ba86-4b36-9168-7423b0dd5c1d,aggregate_type:Incremental Snapshot,type:COMPLETED,additional_data:{connector_name:my-connector},timestamp: 1695817046353
}五、启用 Debezium 通知
要使 Debezium 能够发出通知请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下以下通知渠道可用
sinklogjmx
重要的要使用接收器通知通道还必须将 notification.sink.topic.name 配置属性设置为希望 Debezium 发送通知的主题的名称。
六、访问 Debezium JMX 通知
要使 Debezium 能够报告通过 JMX beans 公开的事件请完成以下配置步骤
启用 JMX MBean 服务器以公开通知 bean。将 jmx 添加到连接器配置中的 notification.enabled.channels 属性中。将首选的 JMX 客户端连接到 MBean 服务器。
通知通过名称为 debezium..management.notifications. 的 bean 的“Notifications”属性公开。
下图显示了报告增量快照开始的通知 要放弃通知请对 bean 调用重置操作。
通知还公开为 debezium.notification 类型的 JMX 通知。要使应用程序能够侦听 MBean 发出的 JMX 通知请为应用程序订阅通知。
七、自定义通知渠道
通知机制被设计为可扩展的。可以根据需要实施渠道以最适合的环境的方式传递通知。添加通知通道涉及几个步骤
为通道创建一个Java项目来实现通道并添加Debezium Core作为依赖项。部署通知通道。通过修改连接器配置使连接器能够使用自定义通知通道。
配置自定义通知渠道
自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供者接口 (SPI) 的 Java 类。例如
public interface NotificationChannel {String name(); 1void init(CommonConnectorConfig config); 2void send(Notification notification); 3void close(); 4
}频道的名称。要使 Debezium 能够使用该通道请在连接器的 notification.enabled.channels 属性中指定此名称。初始化通道所需的特定配置、变量或连接。在频道上发送通知。 Debezium 调用此方法来报告其状态。关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。
Debezium 核心模块依赖项
自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中如以下示例所示
dependencygroupIdio.debezium/groupIdartifactIddebezium-core/artifactIdversion${version.debezium}/version
/dependency${version.debezium} 表示 Debezium 连接器的版本。
在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。
部署自定义通知渠道
先决条件有一个自定义通知通道 Java 程序。
程序要将通知通道与 Debezium 连接器结合使用请将 Java 项目导出到 JAR 文件然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
例如在典型部署中Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等。要将信号通道与连接器一起使用请将转换器 JAR 文件添加到连接器的子目录中。
注意要将自定义通知通道与多个连接器一起使用必须将通知通道 JAR 文件的副本放置在每个连接器子目录中。
配置连接器以使用自定义通知通道在连接器配置中将自定义通知通道的名称添加到 notification.enabled.channels 属性中。
八、应用案例
Debezium系列之实现增量快照incremental技术的详细步骤Debezium系列之基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤Debezium系列之深入理解临时阻塞快照
更多Debezium实战应用可以参考博主Debezium专栏
Debezium专栏Debezium实战应用详细总结