北京注销网站备案,环球资源的服务种类,wordpress 官方文档,接技术标做网站每一个成功人士的背后#xff0c;必定曾经做出过勇敢而又孤独的决定。放弃不难#xff0c;但坚持很酷~最近有一个将 mysql 数据导入到 MongoDB 中的需求#xff0c;打算使用 Kettle 工具实现。本文章记录了数据导入从 0 到 1 的过程#xff0c;最终实现了每秒钟快速导入约 … 每一个成功人士的背后必定曾经做出过勇敢而又孤独的决定。放弃不难但坚持很酷~最近有一个将 mysql 数据导入到 MongoDB 中的需求打算使用 Kettle 工具实现。本文章记录了数据导入从 0 到 1 的过程最终实现了每秒钟快速导入约 1200 条数据。一起来看吧~一、Kettle 连接图简单说下该转换流程增量导入数据1)根据 source 和 db 字段来获取 MongoDB 集合内 business_time 最大值。2)设置 mysql 语句3)对查询的字段进行改名4)过滤数据只往 MongoDB 里面导入 person_idaddressbusiness_time 字段均不为空的数据。符合过滤条件的数据增加常量并将其导入到 mongoDB 中。不符合过滤条件的数据增加常量将其导入到 Excel 表中记录。二、流程组件解析1、MongoDB input1)Configure connectionHost name(s) or IP address(es)网络名称或者地址。可以输入多个主机名或IP地址用逗号分隔。还可以通过将主机名和端口号与冒号分隔开为每个主机名指定不同的端口号并将主机名和端口号的组合与逗号分隔开。例如要为两个不同的MongoDB实例包含主机名和端口号您将输入localhost 1:27017localhost 2:27018并使 Port 字段为空。Port端口号Username用户名Password密码Authenticate using Kerberos指示是否使用Kerberos服务来管理身份验证过程。Connection timeout连接超时时间(毫秒)Socket timeout等待写操作(以毫秒为单位)的时间2)Input optionsDatabase检索数据的数据库的名称。点击 “Get DBs” 按钮以获取数据库列表。Collection集合名称。点击 “Get collections” 按钮获取集合列表。Read preference表示要先读取哪个节点。Tag set specification/#/Tag Set标签允许您自定义写关注和读取副本的首选项。3)query根据 source 和 db 字段来获取 bussiness_time 的最大值Kettle 的 MongoDB 查询语句如下图所示对应的 MongDB 的写法为记得勾选 Query is aggregation pipeline 选项4)Fields取消选中 Output single JSON field 表示下一组件接收到的结果是一个 Number 类型的单值否则就是一个 json 对象。2、表输入设置 mysql 数据库 jdbc 连接后填好 SQL 语句之后在下方的“从步骤插入数据”下拉列表中选中“MongoDB input”。“MongoDB input” 中的变量在 SQL 语句中用 ? 表示如下图所示如果导数的时候发生中文乱码可以点击 编辑 选择 数据库连接 的 选项添加配置项characterEncoding utf8即可解决。如下图所示3、字段选择如果查询出来的列名需要更改则可以使用“字段选择”组件该组件还可以移除某字段本次应用中主要使用该组件将字段名进行修改。如下图所示4、过滤选择只保留 person_idaddressbusiness_time 字段都不为空的数据5、增加常量很简单在“增加常量”组件内设置好要增加常量的类型和值即可。6、Excel 输出添加“Excel 输出”设置好文件名如果有必要的话还可以设置 Excel 字段格式如下图所示7、MongoDB output1)Configure connection如下图所示由于一开始就介绍了 MongoDB 的连接方式所以在这里不在赘述。2)Output optionsBatch insert size每次批量插入的条数。Truncate collection执行操作前先清空集合Update更新数据Upsert选择 Upsert 选项将写入模式从 insert 更改为 upsert(即如果找到匹配项则更新否则插入新记录)。使用前提是 勾选 Update 选项。Muli-update多次更新可以更新所有匹配的文档而不仅仅是第一个。3)Mongo document fields根据 id、source、db 字段插入更新数据如下图所示更多 MongoDB output 可参考https://wiki.pentaho.com/display/EAI/MongoDBOutput三、索引优化1、mysql为 mysql 查询字段添加索引。(略)2、MongoDB对 MongoDB 查询做优化创建复合索引对于 MongoDB input 组件来说会关联查询出 business_time 最大值所以要创建复合索引创建复合索引时要注意字段顺序按照查询顺序创建db.trajectory_data.createIndex({source: 1, db: 1, business_time: 1})对于 MongoDB output 组件来说因为已经设置了 插入或更新 数据的规则也会涉及到查询所以再设置一个复合索引db.trajectory_data.createIndex({id: 1, source: 1, db: 1})四、运行运行前需要在集合内插入一条含 business_time 字段的 demo 数据否则 MongoDB input 会因为查不到数据而报错db.trajectory_data.insert({ id: 0, source: xx数据, db: 17-db2, business_time: 0})成功插入数据后执行该转换可视化操作命令行操作${KETTLE_HOME}/pan.sh -filexxx.ktr可通过点击 “执行结果” -- “步骤度量” 来查看各组件运行状态如下图所示24 分钟共导了 172 万的数据每秒钟约导入 1200 条数据。这样子这个转换基本就算完成了。可以在 linux 上写一个定时任务去执行这个转换每次转换 mysql 都会将大于 mongoDB 集合中 business_time 字段最大值的数据增量导入到 MongoDB 中。五、不足像上述的 Kettle 流程也是有不足的。假如一次性拉取的数据量过大很有可能导致 Mysql 或 Kettle 内存溢出而报错。所以上述流程只适合小数据量导入。大数据量导入的话还是建议分批次导入或者分页导入大家可以关注我我会持续更新技术干货哦 ~ 热 文 推 荐 ☞ 【实战】Kettle自定义jar包供JavaScript使用☞ HBase原理(一)架构理解☞ Kafka消费者 之 指定位移消费☞ 都快2020年了ambari自定义服务集成你还没掌握吗文末有福利☞Ambari2.6.1集成Apache Kylin服务☞Elasticsearch 6.x 配置详解☞看完您如果还不明白 Kerberos 原理算我输☞ 用心整理 | Spring AOP 干货文章图文并茂附带 AOP 示例 ~☞Spring IOC看完这篇文章我才算是懂了欢迎大家留言讨论? ? ?你点的每个“在看”我都认真当成了喜欢