副业做网站程序,网站报错 500,app应用商店,全国室内设计学校1.环境搭建
canal可以用来监听mysql数据库的变化#xff0c;用来同步数据
先下载最新的部署版本#xff0c;release地址:Releases alibaba/canal GitHub
包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz 下载…1.环境搭建
canal可以用来监听mysql数据库的变化用来同步数据
先下载最新的部署版本release地址:Releases · alibaba/canal · GitHub
包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz 下载完后在linux上新建一个canal文件夹放入tar包解压: tar -zxvf canal.xxx.tar.gz
解压完后修改配置文件
查看conf/canal.properties,其中canal.port是客户端连接的端口需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号 再打开conf/example/ instance.properties, master.address填数据库地址,dbUsername和dbPassword是数据库账号,flter.regex可以用来过滤数据库默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔 修改完成后进入bin目录执行./startup.sh是启动,./stop.sh是关闭
进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了 2.客户端代码
引入依赖 dependenciesdependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.7/version/dependencydependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.protocol/artifactIdversion1.1.7/version/dependency/dependencies
代码实现:
package cn.hollycloud.iplatform;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;/*** Unit test for simple App.*/
Slf4j
public class CanalTest {private MapString, String errorMap new HashMap();Testpublic void testCanal() {initThread();}private void initThread() {new Thread(new Runnable() {Overridepublic void run() {while (true) {try {initConnect();} catch (Exception e) {String key canal_connection_error;if (!hasSameError(key, e.getMessage())) {log.error(canal连接出错: {}, e);}}try {Thread.sleep(10000);} catch (InterruptedException e) {}}}}).start();}private void initConnect() {String canalIp localhost;int canalPort 11111;String canalDestination example;String canalUsername admin;String canalPassword 123456;CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,canalPort), canalDestination, canalUsername, canalPassword);int batchSize 200;try {connector.connect(); // 连接到canal serverconnector.subscribe(db_.*\\..*); // 订阅指定的消息connector.rollback(); // 回滚到未进行ack 的地方log.info(canal连接成功);while (true) {Message message connector.getWithoutAck(batchSize); // 获取指定数量的数据long batchId message.getId();int size message.getEntries().size();if (batchId -1 || size 0) {try {//未获取到消息则睡眠Thread.sleep(2000);} catch (InterruptedException e) {}} else {try {//处理消息log.info(从canal接收到: {} 条消息,消息批次: {}开始处理, size, message.getId());handleMessage(message.getEntries());} catch (Exception e) {connector.rollback(batchId); // 处理失败, 回滚数据String key canal_sync_data_error;String errMsg e.getMessage();if (StringUtils.isEmpty(errMsg)) errMsg e.toString();if (!hasSameError(key, errMsg)) {log.error(同步数据出错: {}, e);}//休眠一段时间继续获取数据try {Thread.sleep(10000);} catch (InterruptedException ex) {ex.printStackTrace();}continue;}}connector.ack(batchId); // 提交确认}} finally {connector.disconnect();}}private boolean hasSameError(String key, String error) {String lastError errorMap.get(key);if (Objects.equals(lastError, error)) {return true;}errorMap.put(key, error);return false;}private void handleMessage(ListCanalEntry.Entry entrys) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND) {continue;}//根据数据库名获取租户名String databaseName entry.getHeader().getSchemaName();String tableName entry.getHeader().getTableName();log.info(数据库: {}, 表名: {}, databaseName, tableName);// 获取类型CanalEntry.EntryType entryType entry.getEntryType();// 获取序列化后的数据ByteString storeValue entry.getStoreValue();if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 反序列化数据CanalEntry.RowChange rowChange CanalEntry.RowChange.parseFrom(storeValue);// 获取当前事件的操作类型CanalEntry.EventType eventType rowChange.getEventType();if (eventType CanalEntry.EventType.INSERT || eventType CanalEntry.EventType.UPDATE|| eventType CanalEntry.EventType.DELETE) {// 获取数据集ListCanalEntry.RowData rowDataList rowChange.getRowDatasList();// 遍历rowDataList并打印数据集for (CanalEntry.RowData rowData : rowDataList) {ListCanalEntry.Column afterColumnsList rowData.getAfterColumnsList();ListCanalEntry.Column beforeColumnsList rowData.getBeforeColumnsList();// 变更前数据for (CanalEntry.Column column : beforeColumnsList) {log.info(变更前数据: name: {}, value: {}, column.getName(), column.getValue());}// 变更后数据for (CanalEntry.Column column : afterColumnsList) {log.info(变更后数据: name: {}, value: {}, column.getName(), column.getValue());}}}}}}
}