网站页面设计主要包括,动漫网页制作源代码,服装行业网站建设,深圳单位名称和单位地址1、canal是什么#xff0c;可以用来作什么 canal是阿里开源的一个用于监听数据库binlog#xff0c;从而实现数据同步的工具。 2、安装 我使用的是1.1.5版本#xff0c;太高的版本需要的jdk版本和mysql的驱动版本会更高#xff0c;可以根据自己的环境选择。 如果是自己玩的话…1、canal是什么可以用来作什么
canal是阿里开源的一个用于监听数据库binlog从而实现数据同步的工具。 2、安装
我使用的是1.1.5版本太高的版本需要的jdk版本和mysql的驱动版本会更高可以根据自己的环境选择。 如果是自己玩的话安装 canal.deployer-1.1.5.tar.gz就可以了 地址 Release v1.1.5 · alibaba/canal · GitHub 3、springbootmysqlcanal实现数据同步可以在网上找到很多博客不在赘述 4、源码梳理
1、既然用到springboot肯定有一个自动注入的autoconfigure的start。 可以看到spring.factories会自动注入几个client。 2、找到一个看着顺眼的client进去看看 我选择的是SimpleClientAutoConfiguration Configuration
EnableConfigurationProperties({CanalSimpleProperties.class})
ConditionalOnBean({EntryHandler.class})
ConditionalOnProperty(value {canal.mode},havingValue simple,matchIfMissing true
)
Import({ThreadPoolAutoConfiguration.class})
public class SimpleClientAutoConfiguration {private CanalSimpleProperties canalSimpleProperties;public SimpleClientAutoConfiguration(CanalSimpleProperties canalSimpleProperties) {this.canalSimpleProperties canalSimpleProperties;}Beanpublic RowDataHandlerRowData rowDataHandler() {return new RowDataHandlerImpl(new EntryColumnModelFactory());}BeanConditionalOnProperty(value {canal.async},havingValue true,matchIfMissing true)public MessageHandler messageHandler(RowDataHandlerRowData rowDataHandler, ListEntryHandler entryHandlers, ExecutorService executorService) {return new AsyncMessageHandlerImpl(entryHandlers, rowDataHandler, executorService);}BeanConditionalOnProperty(value {canal.async},havingValue false)public MessageHandler messageHandler(RowDataHandlerRowData rowDataHandler, ListEntryHandler entryHandlers) {return new SyncMessageHandlerImpl(entryHandlers, rowDataHandler);}Bean(initMethod start,destroyMethod stop)public SimpleCanalClient simpleCanalClient(MessageHandler messageHandler) {String server this.canalSimpleProperties.getServer();String[] array server.split(:);return SimpleCanalClient.builder().hostname(array[0]).port(Integer.parseInt(array[1])).destination(this.canalSimpleProperties.getDestination()).userName(this.canalSimpleProperties.getUserName()).password(this.canalSimpleProperties.getPassword()).messageHandler(messageHandler).batchSize(this.canalSimpleProperties.getBatchSize()).filter(this.canalSimpleProperties.getFilter()).timeout(this.canalSimpleProperties.getTimeout()).unit(this.canalSimpleProperties.getUnit()).build();}
} 看到会注入SimpleCanalClient。并且指明了初始化方法和销毁的方法。进去看看。发现是继承了一个抽象的client这个类是关键内部有start和stop的具体实现。 很明显start就是启动一个线程 while(true)的去循环执行binlog的获取和处理。 如何获取的代码没有跟进但是可以猜到应该是通过连接然后去获取数据。 这里着重看一下处理数据的代码 public abstract class AbstractMessageHandler implements MessageHandlerMessage {private MapString, EntryHandler tableHandlerMap;private RowDataHandlerCanalEntry.RowData rowDataHandler;public AbstractMessageHandler(List? extends EntryHandler entryHandlers, RowDataHandlerCanalEntry.RowData rowDataHandler) {this.tableHandlerMap HandlerUtil.getTableHandlerMap(entryHandlers);this.rowDataHandler rowDataHandler;}Overridepublic void handleMessage(Message message) {ListCanalEntry.Entry entries message.getEntries(); 第一步 for (CanalEntry.Entry entry : entries) {if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) { 第二步try {EntryHandler? entryHandler HandlerUtil.getEntryHandler(tableHandlerMap, entry.getHeader().getTableName()); 第三步if(entryHandler!null){CanalModel model CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName()).executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();CanalContext.setModel(model);CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());ListCanalEntry.RowData rowDataList rowChange.getRowDatasList(); 第四步CanalEntry.EventType eventType rowChange.getEventType();for (CanalEntry.RowData rowData : rowDataList) {rowDataHandler.handlerRowData(rowData,entryHandler,eventType);}}} catch (Exception e) {throw new RuntimeException(parse event has an error , data: entry.toString(), e);}finally {CanalContext.removeModel();}}}}} 进入rowDataHandler.handlerRowData(maps, entryHandler, eventType);实现类选择的是RowDataHandlerImpl。 public class RowDataHandlerImpl implements RowDataHandlerCanalEntry.RowData {private IModelFactoryListCanalEntry.Column modelFactory;public RowDataHandlerImpl(IModelFactory modelFactory) {this.modelFactory modelFactory;}Overridepublic R void handlerRowData(CanalEntry.RowData rowData, EntryHandlerR entryHandler, CanalEntry.EventType eventType) throws Exception {if (entryHandler ! null) {switch (eventType) {case INSERT:R object modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());entryHandler.insert(object);break;case UPDATE:SetString updateColumnSet rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated).map(CanalEntry.Column::getName).collect(Collectors.toSet());R before modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);R after modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());entryHandler.update(before, after);break;case DELETE:R o modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());entryHandler.delete(o);break;default:break;}}}
} 回想一下springboot中使用canal的时候会有一个注解CanalTable和一个实现类EntryHandler。 这里的代码要做的就是1、匹配合适的语句类型insert、delete、update。2、insert和delete只需要记录一下操作的值update需要记录一下修改前和修改后的值。也很好理解insert和delete回滚只需要反向重放代码就行而update需要知道之前的数据采集重新update。 进入newInstance方法选择AbstractModelFactory public abstract class AbstractModelFactoryT implements IModelFactoryT {Overridepublic R R newInstance(EntryHandler entryHandler, T t) throws Exception {String canalTableName HandlerUtil.getCanalTableName(entryHandler);if (TableNameEnum.ALL.name().toLowerCase().equals(canalTableName)) {return (R) t;}ClassR tableClass GenericUtil.getTableClass(entryHandler);if (tableClass ! null) {return newInstance(tableClass, t);}return null;}abstract R R newInstance(ClassR c, T t) throws Exception;
} 重点来了有两个HandlerUtil.getCanalTableName和GenericUtil.getTableClass。还记得咱们再springboot中的代码会指定 CanalTable 处理的是那个表和EntryHandler泛型吗。 第一步判断这个EntryHandler实现类有没有指定要处理那个表如果指定了All。那么就要就走自定义的返回值这个返回值通常不是我们需要的。所以在使用中一定尽量指定要处理的表。 第二步需要匹配EntryHandler中的泛型类进行赋值操作了。 最后进入newInstance方法 public class EntryColumnModelFactory extends AbstractModelFactoryListCanalEntry.Column {......OverrideR R newInstance(ClassR c, ListCanalEntry.Column columns) throws Exception {R object c.newInstance();MapString, String columnNames EntryUtil.getFieldName(object.getClass());for (CanalEntry.Column column : columns) {String fieldName columnNames.get(column.getName());if (StringUtils.isNotEmpty(fieldName)) {FieldUtil.setFieldValue(object, fieldName, column.getValue());}}return object;}} 代码比较简单通过反射给对象赋值。如果不太清楚这里是怎么把数据解析出来的可以自己搭建起来服务执行一下看看canal返回的结构体我下边也提出来我的返回并且我也会将上边代码中和数据解析的地方标红。 获取消息 Message[id14,entries[header {version: 1logfileName: mysql-bin.000004logfileOffset: 19806serverId: 1serverenCode: UTF-8executeTime: 1706838103000sourceType: MYSQLschemaName: tableName: eventLength: 80
}
entryType: TRANSACTIONBEGIN
storeValue: 9
, header {version: 1logfileName: mysql-bin.000004logfileOffset: 19939serverId: 1serverenCode: UTF-8executeTime: 1706838103000sourceType: MYSQLschemaName: testtableName: firsteventLength: 53eventType: INSERTprops {key: rowsCountvalue: 1}
}
entryType: ROWDATA
storeValue: \b\341\001\020\001P\000b\203\001\022\\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\0016R\006bigint\022%\b\001\020\f\032\aaddress \000(\0010\000B\003333R\vvarchar(10)\0226\b\002\020]\032\vcreate_time \000(\0010\000B\0232024-02-02 09:41:43R\bdatetime
, header {version: 1logfileName: mysql-bin.000004logfileOffset: 19992serverId: 1serverenCode: UTF-8executeTime: 1706838103000sourceType: MYSQLschemaName: tableName: eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: \022\006381841
],rawfalse,rawEntries[]] 至此在springboot中通过canal获取binlog的日志并且解析为自定义的entry对象的流程就已经分析、梳理完了。至于后续要怎么处理就有很多的方式了。 最后在分享一个idea跟踪源码的小技巧 比如我们看到一个比较重要的注解但是不知道这个注解具体实现在哪里可以进入注解中选中注解名称然后选择Find Usages。就可以看到哪里使用了。