专业做二手房的网站有哪些,网站开发选定制还是模板,海外电商怎么做如何从零开始,建网站能挣钱吗前言 今天来学习一个新的大数据小工具 Maxwell #xff0c;它和 Sqoop 很像。Sqoop主要用于在 Hadoop #xff08;比如 HDFS、Hive、HBase 等#xff09;和关系型数据库之间进行数据的批量导入和导出#xff0c;而 Maxwell 则主要用于监控数据库的变化#xff08;通过监控…前言 今天来学习一个新的大数据小工具 Maxwell 它和 Sqoop 很像。Sqoop主要用于在 Hadoop 比如 HDFS、Hive、HBase 等和关系型数据库之间进行数据的批量导入和导出而 Maxwell 则主要用于监控数据库的变化通过监控 binlog 并将变化的数据以JSON格式发布到消息队列一般是 Kafka或 Redis 中。 Sqoop 一般都是在特定时间比如用 Azkaban 调度在每天0点进行作业可以用于离线数仓的数据同步问题但是对于实时数据分析Sqoop 并不方便。所以这里就引出了 Maxwell 这个工具它的特点包括轻量级、能够捕获完整的历史数据通过bootstrap功能以及支持断点还原即在错误解决后可以从上次的位置继续读取数据。然而Maxwell只支持 json 格式并且不能直接支持HA高可用性。 这里说起 Sqoop 我又想到了 DataX 它也是一款全量数据采集工具和 Sqoop 很相似但是还是有所区别
Sqoop是基于Hadoop生态系统的充分利用了MapReduce计算框架进行数据的导入导出。而DataX仅仅在运行DataX的单台机器上进行数据的抽取和加载。Sqoop采用MapReduce框架具有良好的并发性和容错性可以在多个节点同时进行import或export操作。而DataX则是单进程的没有并发性和容错性。Sqoop主要支持在关系型数据库和Hadoop组件如HDFS、Hive、HBase之间进行数据迁移但不支持在Hadoop组件之间或关系型数据库之间进行数据迁移。相比之下DataX支持更广泛的数据迁移场景包括关系型数据库、Hadoop组件以及其他数据源之间的数据迁移。Sqoop在导入导出数据时会生成一个MapReduce作业在Hadoop集群中运行可以处理大规模的数据但不具备统计和校验能力。而DataX在传输过程中可以进行数据过滤并且可以统计传输数据的信息对于业务场景复杂、表结构变更的情况更为适用。Sqoop采用命令行的方式调用容易与现有的调度监控方案相结合。而DataX采用XML配置文件的方式开发运维上相对不太方便。
接下来就来学习 Maxwell 内容不多过两天再把 DataX 搞定。
1、MaxWell 简介
1.1 Maxwell概述 Maxwell 是由美国 Zendesk 公司开源用 Java 编写的 MySQL 变更数据抓取软件。它会实时监控Mysql数据库的数据变更操作包括insert、update、delete并将变更数据以 JSON 格式发送给 Kafka、Kinesi、RabbitMQ、Redis 或者其它平台的应用程序。 官网地址Maxwells Daemon
1.2 Maxwell 输出数据格式 我们知道 Maxwell 是以 JSON 格式传输数据的上面显示了不同数据库数据操作对应的 json 格式这里是这些 json 字段的说明 字段 解释 database 变更数据所属的数据库 table 表更数据所属的表 type 数据变更类型 ts 数据变更发生的时间 xid 事务id commit 事务提交标志可用于重新组装事务 data 对于insert类型表示插入的数据对于update类型标识修改之后的数据对于delete类型表示删除的数据 old 对于update类型表示修改之前的数据只包含变更字段
2、Maxwell 原理 Maxwell的工作原理是实时读取MySQL数据库的二进制日志Binlog从中获取变更数据再将变更数据以JSON格式发送至Kafka等流处理平台。
2.1 MySQL二进制日志
2.1.1、什么是 binlog 二进制日志Binlog是MySQL服务端非常重要的一种日志它记录了所有的 DDL 和 DML 除了查询语句语句以事件的形式记录还包含所执行的消耗的时间MySQL 的二进制日志是事务安全类型的。Maxwell的工作原理和主从复制密切相关。 一般开启二进制日志会有 1% 的性能损耗。二进制日志有两个最重要的场景
MySQL Replication 在 Master 端开启 binlogMaster 把它的二进制日志传递给其它 salves 来达到 master-salve 数据一致的目的。通过使用 musqlbinlog 工具来进行数据恢复。 MySQL二进制日志的存储机制很像我们之前学的Kafka 文件存储机制它也包含两类文件二进制索引文件存储二进制日志文件索引后缀为 .index和二进制日志文件用于记录所有 DDL 和 DML 语句后缀为 .00000*。
2.1.2、binlog 的开启
找到 MySQL 配置文件的位置linux/etc/my.cnf 可以通过 locate my.cnf 查找位置window\my.ini修改配置在 [mysqlId] 区块设置添加 log-binmysql-bin 这个表示 binlog 的前缀是 mysql-bin也可以换成别的也就是说以后生成的日志文件就是 mysq-bin.000001 格式的文件每次 mysql 重启或者达到单个文件大小的阈值时生成一个新的日志文件、按顺序编号
2.1.3、binlog 的分类设置
mysql binlog 的格式共有三种STATEMENTMINEDROW。
在配置文件中可以选择配置binllog_format statment|mixed|row
statement语句级它会记录每一次执行写操作的雨具。相比较 row 模式节省空间但是可能产生不一致比如 update student enroll_time now(); 这种 SQL 如果进行数据恢复那一定是有问题的。 优点节省空间缺点有可能造成数据不一致row行级binlog 记录每次操作后每行记录的变化相当于它记录的是我们每次操作后表格的内容。 优点保持数据的绝对一致性不管什么sql它只记录执行后的结果缺点占用磁盘较大mixed混合级别statement 的升级版一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题。比如对于 SQL 中包含随机数或者时间的语句它保存的是 SQL 执行的结果对于 SQL 不包含随机数或者时间的语句它保存的是 SQL 语句本身。 优点节省空间缺点极个别情况下仍然存在不一致
综上Maxwell 做监控分析选择 row 模式比较合适必须选择 row因为我们的下游比如 Kafka 是无法执行 sql 语句的
2.1.4、Maxwell 和 Canal 对比
对比CanalMaxwell语言 java java数据格式自由json采集模式增量全量/增量数据落地定制支持 kafka 等多种平台HA支持支持
Canal 的原理和 Maxwell 都是通过监控 binlog 实现的。
2.2 MySQL主从复制 MySQL的主从复制就是用来建立一个和主数据库完全一样的数据库环境这个数据库称为从数据库。
1主从复制的应用场景如下
1做数据库的热备主数据库服务器故障后可切换到从数据库继续工作。
2读写分离主数据库只负责业务数据的写入操作而多个从数据库只负责业务数据的查询工作在读多写少场景下可以提高数据库工作效率。
2主从复制的工作原理如下
1Master主库将数据变更记录写到二进制日志(binary log)中
2Slave从库向mysql master发送dump协议将master主库的binary log events拷贝到它的中继日志(relay log)
3Slave从库读取并执行中继日志中的事件将改变的数据同步到自己的数据库。 2.3 Maxwell原理 Maxwell 的原理很简单就是将自己伪装成slave并遵循MySQL主从复制的协议从master同步数据。
3、Maxwell 部署
3.1、启用 MySQL binlog
MySQL服务器的Binlog默认是未开启的如需进行同步需要先进行开启。
1修改MySQL配置文件/etc/my.cnf sudo vim /etc/my.cnf
2增加如下配置
[mysqld]#数据库id
server-id 1
#启动binlog该参数的值会作为binlog的文件名
log-binmysql-bin
#binlog类型maxwell要求为row类型
binlog_formatrow
#启用binlog的数据库需根据实际情况作出修改
binlog-do-dbgmall
#如果需要监控多个数据库 不要加逗号 直接新开一行
binlog-do-dbtest
如果我们要监控除了个别数据库之外的其它所有数据库我们可以使用 binlog-ignore-db 属性。
3重启MySQL服务
sudo systemctl restart mysqld
4查看是否修改完成
show variables like %binlog%; 我们看到 binlog_format 的 Value 现在是 ROW 说明 binlog 配置成功。
5查看 binlog 文件
进入 /var/lib/mysql 目录查看 MySQL 生成的 binlog 文件
cd /var/lib/mysql
sudo ls -l | grep mysql-bin 注MySQL 生成的 binlog 文件初始大小一定是 514 字节。
3.2、初始化 maxwell 元数据库
1创建元数据库 maxwell 用于存储 maxwell 的元数据
create database maxwell; 2设置 mysql 用户密码安全级别
set global validate_password_policy0;
set global validate_password_length4; 如果上面的命令报错去 /etc/my.cnf 的 [mysqld] 下添加下面两行
plugin-load-addvalidate_password.so
validate-passwordFORCE_PLUS_PERMANENT
然后重启 mysqld 服务
sudo systemctl restart mysqld
3分配一个用户账号可以操作该元数据库
GRANT ALL ON maxwell.* TO maxwell% IDENTIFIED BY 123456;
4分配这个账号可以监控其他数据库的权限
GRANT SELECT ,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO maxwell%;5刷新 mysql 表权限
flush privileges;
3.3、Maxwell 进程启动
3.3.1、使用命令行参数启动
bin/maxwell --usermaxwell --password123456 --hosthadoop102 --producerstdout
user连接 mysql 的用户上面我们已经创建了password连接 mysql 的用户密码host mysql 安装的主机名producer生产者模式stdout控制台kafkakafka 集群
查看 maxwell 输出 我们可以看到我们的 insert 语句被 maxwell 转为了 json 被输出到了控制台。 3.3.2、定制化配置文件启动 编辑配置文件 config.properties
log_levelinfoproducerstdout
kafka.bootstrap.serverslocalhost:9092# mysql login info
hosthadoop102
usermaxwell
password123456启动 Maxwell 4、Maxwell 使用
4.1、监控 MySQL 数据并在控制台打印 上面我们已经演示过了也就是首先在 mysql 的配置文件中指定需要打印 binlog 的数据库然后通过命令或者配置文件来开启 Maxwell 进程
bin/maxwell --usermaxwell --password123456 --hosthadoop102 --producerstdout
接着我们对被监听的数据库的操作信息就会以 json 的格式打印出来 4.2、监控 MySQL 数据并输出到 Kafka 监控 MySQL 数据并输出到 Kafka这种需求是我们实际工作用的最多的这也是我们离线数仓和实时数仓所必须的。
4.2.1、实例
1启动 Zookeeper 和 Kafka
2启动 Maxwell 监控 binlog
bin/maxwell --usermaxwell --password123456 --hosthadoop102 --producerkafka --kafka.bootstrap.servershadoop102:9092 --kafka_topicmaxwell
注这里我们制定了 Kafka 集群地址为 hadoop102:9092 主题为 maxwell虽然主题并不存在但是 kafka 会自动创建我们指定的主题
3在 hadoop103 消费 maxwell 主题
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic maxwell
4在 MySQL 中增加一条数据再删除 可以看到我们的数据成功被 Kafka 消费说明业务数据日志成功传输到 Kafka 。
4.2.2、Kafak 主题数据的分区控制 上面的案例中我们不论 MySQL 的那个数据库哪张表都被保存到了 Kafka 的 maxwell 主题这样显然很乱。所以这里我们需要解决一下 在生产环境中我们一般都会用 maxwell 监控多个 mysql 库的数据然后将这些数据发往 Kafka 的一个主题并且这个主题肯定是多分区的为了提高并发度。那么如何控制这些数据的分区问题那就变得至关重要实现步骤如下 1修改 maxwell 的配置文件定制化启动 maxwell 进程
log_levelinfoproducerkafka
kafka.bootstrap.servershadoop102:9092# mysql login info
hosthadoop102
usermaxwell
password123456# kafka 主题
kafka_topicmaxwell3# 默认配好的
kafka.compression.typesnappy
kafka.retries0
kafka.acks1#producer_partition_bydatabase # [database, table, primary_key, transaction_id, thread_id, column]
# 常用的按照库分区、按照表分区这里我们测试按照库分区
producer_partition_bydatabase2手动创建 Kafka 多分区主题
kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic maxwell3 查看主题信息 这样我们不同数据库的操作记录就会保存到 Kafka 不同的分区当中 分区可以使得集群负载均衡还可以提高提高读写效率。
4.3、监控 MySQL 指定表输出控制台
1运行 maxwell 来监控 mysql 指定表数据更新
监控 test 库下的 staff 表
bin/maxwell --usermaxwell --password123456 --hosthadoop102 --producerstdout --filter exclude: *.*,include:test.staff
2向 staff 表中插入数据查看 maxwell 控制台输出 3向 ws2 表中插入一条数据查看
注表 ws2 并不是我们指定监控的表 结果maxwell 控制台并没有任何输出
注还可以设置 include.test.* 来表示监控 test 库下的所有表
4.4、监控 MySQL 指定表全量输出到控制台 Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增以及变化的但是 Maxwell 是支持数据初始化的可以通过修改 Maxwell 的元数据来对 MySQL 中的某张表进行数据初始化也就是我们常说的全量同步。具体操作如下 需求将 test 库下的 staff 表中的数据全量导入到 maxwell 控制台打印。
我们可以在数据库 maxwell 这个数据库是我们初始化 maxwell 元数据库时创建的中查看元数据表 positions 的内容 这里的 server_id 是我们在 mysql 的配置文件 /etc/my.cnf 中配置的这里的 binlog_file 代表的是最新的 binlog 文件 mysql-bin.000003 这里的 binlog_position 的值为 2802 代表目前已经读到了第 2802 个字节的位置意思就说我们现在是从这个偏移量之后开始监听的。
注也可以通过 SQLshow master status; 来查看当前的 binlog 状态。 1. 向元数据表 maxwell.bootstrap 中插入数据
insert into maxwell.bootstrap(database_name,table_name) values(test,staff);
这就相当于告诉 maxwell下一次启动 maxwell 作业时就会调用一个初始化作业把我们指定的这个 test 数据库下的 staff 表进行一次增量同步。 我们可以看到通过 bootstrap 表插入数据来完成初始化这样插入的数据是 json 中 type 字段的值是 bootstrap-insert 而不是 insert。
再次查看 bootstrap 表 我们可以发现is_complete 字段的值从默认值 0 变成了 1inserted_rows 从 0 变成了 6后面的 started_at 和 completed_at 字段的值也不再为空了。
2. 使用 maxwell-bootstrap 脚本工具
除了上面的直接向 maxwell 元数据库中的 bootstrap 表插入数据也可以使用 maxwell 提供的 maxwell-bootstrap 脚本本质其实是一样的具体看你觉得哪个省事。
bin/maxwell-bootstrap --database gmall --table user_info --config ./config.properties
这样这个初始化作业就算完成了当我们下次打开 maxwell 进程时它并不会再次执行这个初始化任务因为它已经完成过了。
总结 至此Maxwell 的学习也已经结束了待会吧离线数仓项目里的 maxwell 部分完成。总体来说这个工具还是很好用且简单的希望有一天自己也可以用屎山堆砌出这么一个自己的框架