苏州手机网站,绍兴seo,百度信息流广告代理,项目管理系统软件开发一、前言
这部分主要记录 datax 实现增量同步的方案。
二、核心思路
结合datax 提供的preSql、 postSql以及占位符#xff0c;外加另外一张表同步日志表来记录相关同步信息。
三、版本迭代
3.1 初版本
where tbq.opera_date cast(date_format(DATE_SUB(NOW(), inte…一、前言
这部分主要记录 datax 实现增量同步的方案。
二、核心思路
结合datax 提供的preSql、 postSql以及占位符外加另外一张表同步日志表来记录相关同步信息。
三、版本迭代
3.1 初版本
where tbq.opera_date cast(date_format(DATE_SUB(NOW(), interval 5 minute), %Y%m%d%H%i%s000) as unsigned)这个版本是直接以执行时时间为时间戳。 缺点显而易见。当同步时间比较久的时候5 分钟就远远不够。
3.2 版本
阅读 datax 的使用说明里对于 mysql 的写支持 presql 和 postsql 的方式。 因此考虑新建一个表
在同步之前利用 preSql往该表中插入一条数据记录记录同步开始时间。同步完成后利用 postSql 更新当前同步的这条记录记录同步结束时间读取时从该表中获取上次同步开始时间的数据作为同步时间戳。 最终 json 脚本变成如下
{job: {content: [{reader: {name: mysqlreader,parameter: {column: [xxxx],connection: [{xxxx}],where: update_date (select l.sync_start_date from sys_sync_log l where l.sync_business_type gongdan and l.sync_result 1 order by l.sync_start_date desc limit 1),}},writer: {name: mysqlwriter,parameter: {column: [xxx],connection: [{xxxx}],preSql:[insert into sys_sync_log(sync_start_date,sync_result) values(now(),2)],postSql:[update sys_sync_log l set l.sync_end_date now(),l.sync_result 1 where l.id ( select t.id from (select l1.id from sys_sync_log l1 where l1.sync_result 2 order by l1.sync_start_date desc limit 1) t )],writerMode:replace}}}],setting: {speed: {channel: 5}}}
}
此版本相对于上个版本时间戳的获取上比较固定能避免因为同步代码问题导致时间戳获取不准。
3.3 版本
上述版本写的相对复杂需要先查询当前同步记录之后再更新同步结束时间。无法保证一致性即preSql 的插入的记录和 postSql 更新记录可能不是同一个记录。 再结合 datax 的占位符特性可以将记录的主键由外部传入。 因此 json 脚本变成
{preSql:[insert into sys_sync_log(id,sync_start_date,sync_result) values(${logId},now(),2)],postSql:[update sys_sync_log l set l.sync_end_date now(),l.sync_result 1 where l.id ${logId}],
}其中 ${logId}为占位符 liunx 中通过 uuidgen 命令可以获取 uuid。 因此执行同步脚本时参考如下命令执行即可
python ../bin/datax.py -p -DlogIduuidgen ./ssss.json其中-p “-DlogIduuidgen” 为获取 uuid并传给 sss.json中 这个版本可以保证 preSql 和 postSql 处理的记录是同一条。
四、扩展
应该还有更优方案还需继续研究。