seatunnel数据集成（一）简介与安装
seatunnel数据集成（二）数据同步
seatunnel数据集成（三）多表同步
seatunnel数据集成（四）连接器使用

1、Connector类型
seatunnel连接器类型丰富，支持以下类型：

| Source | Sink |
| --- | --- |
| Clickhouse | Clickhouse |
| Elasticsearch | Elasticsearch |
| FakeSource | FakeSource |
| Ftp | Ftp |
| Github/Gitlab | Github/Gitlab |
| Greenplum | Greenplum |
| Hdfs file | Hdfs file |
| Hive | Hive |
| Http | Http |
| Hudi/Iceberg | Hudi/Iceberg |
| JDBC | JDBC |
| Kudu | Kudu |
| MongoDB | MongoDB |
| Mysql / MySQL CDC | Mysql / MySQL CDC |
| Redis | Redis |
| Kafka | Kafka |
| StarRocks | StarRocks |
| Phoenix | Phoenix |
| ... | ... |
2、mysql to mysql
必备参数：
mysql source：url、driver、query
mysql sink：url、driver（另需写入语句 query，或者设置 generate_sink_sql = true 并配合 database、table 自动生成写入语句）
样例
# Batch copy of base_region from the "test" database to the "dw" database.
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # Select exactly the columns the sink INSERT binds, in the same order:
    # with "select *" any extra or reordered column in base_region no longer
    # lines up with the two "?" placeholders below.
    # "order by id" makes "limit 4" pick a deterministic set of rows.
    query = "select id, region_name from base_region order by id limit 4"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    # Placeholders are bound positionally from the source row (id, region_name).
    query = "insert into base_region(id,region_name) values(?,?)"
  }
}
脚本执行
# Submit the mysql-to-mysql batch job (job.mode = BATCH) from $SEATUNNEL_HOME
./bin/seatunnel.sh --config ./config/mysql2mysql_batch.conf
3、mysql to hive
如果使用的是 Spark/Flink 引擎，需要 Spark/Flink 已经集成好 Hive。
如果使用 SeaTunnel Zeta 引擎，需要将 seatunnel-hadoop3-3.1.4-uber.jar 和 hive-exec-2.3.9.jar 放到 $SEATUNNEL_HOME/lib/ 目录下（hive-exec 的版本应与所用 Hive 版本一致）。
样例
# Batch load of dw.source_user from MySQL into the Hive table ods.sink_user.
env {
  job.mode = "BATCH"
}

source {
  Jdbc {
    # Exactly two slashes after "jdbc:mysql:". With "jdbc:mysql:///..." Connector/J
    # treats the host list as empty (default host) and reads "127.0.0.1:3306/dw"
    # as the database name.
    url = "jdbc:mysql://127.0.0.1:3306/dw?allowMultiQueries=true&characterEncoding=utf-8"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "user"
    password = "password"
    query = "select * from source_user"
  }
}

transform {
}

sink {
  Hive {
    # Target table as <database>.<table>.
    table_name = "ods.sink_user"
    metastore_uri = "thrift://bigdata101:9083"
  }
}
执行脚本
./bin/seatunnel.sh --config ./config/mysql2hive.conf
4、增量同步参数
需求：根据创建时间（create_time）每天增量抽取。
表结构：
-- db (source database): order detail table
CREATE TABLE t_order_detail (
    id          bigint        NOT NULL AUTO_INCREMENT COMMENT '编号',
    order_id    bigint        DEFAULT NULL COMMENT '订单编号',
    sku_id      bigint        DEFAULT NULL COMMENT 'sku_id',
    sku_name    varchar(200)  DEFAULT NULL COMMENT 'sku名称',
    img_url     varchar(200)  DEFAULT NULL COMMENT '图片名称',
    order_price decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
    -- NOTE(review): a count stored as varchar; kept as-is to mirror the source schema
    sku_num     varchar(200)  DEFAULT NULL COMMENT '购买个数',
    create_time datetime      DEFAULT NULL COMMENT '创建时间',
    PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=863 DEFAULT CHARSET=utf8mb3 COMMENT='订单明细表';

-- dw (target database): daily-increment ODS copy of t_order_detail.
-- id is not AUTO_INCREMENT here (values come from the source), so the
-- AUTO_INCREMENT table option is omitted.
-- NOTE(review): no primary key, so re-running a day's load appends duplicate rows.
CREATE TABLE ods_t_order_detail_di (
    id          bigint        NOT NULL COMMENT '编号',
    order_id    bigint        DEFAULT NULL COMMENT '订单编号',
    sku_id      bigint        DEFAULT NULL COMMENT 'sku_id',
    sku_name    varchar(200)  DEFAULT NULL COMMENT 'sku名称',
    img_url     varchar(200)  DEFAULT NULL COMMENT '图片名称',
    order_price decimal(10,2) DEFAULT NULL COMMENT '购买价格(下单时sku价格)',
    sku_num     varchar(200)  DEFAULT NULL COMMENT '购买个数',
    create_time datetime      DEFAULT NULL COMMENT '创建时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='ODS订单明细表';

-- One day's rows, half-open range [2024-02-05, 2024-02-06): no row is lost or
-- double-counted at midnight, and the bare create_time comparison stays index-friendly.
SELECT
    id,
    order_id,
    sku_id,
    sku_name,
    img_url,
    order_price,
    sku_num,
    create_time
FROM t_order_detail
WHERE create_time >= '2024-02-05'
  AND create_time < DATE_ADD('2024-02-05', INTERVAL 1 DAY);
样例
# Daily incremental copy of t_order_detail (test) into ods_t_order_detail_di (dw).
# The day to load is passed at submit time: -i etl_dt=YYYY-MM-DD
env {
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/test"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # HOCON does not substitute ${...} inside a quoted string, so the string is
    # closed around the variable ('"${etl_dt}"'). This is the form shown in the
    # SeaTunnel config-variable docs.
    # NOTE(review): newer SeaTunnel releases may also expand ${etl_dt} written
    # directly inside the quotes; confirm for your version.
    # REPLACE(..., 'T', ' ') turns an ISO-style value such as 2024-02-05T00:00:00
    # into MySQL datetime syntax; a plain date passes through unchanged.
    # The half-open range [etl_dt, etl_dt + 1 day) selects exactly one day.
    # Columns are listed in the order the sink INSERT binds them. There is no
    # trailing ";" because the connector may embed the statement in a larger query.
    query = "select id, order_id, sku_id, sku_name, img_url, order_price, sku_num, create_time from t_order_detail where create_time >= REPLACE('"${etl_dt}"', 'T', ' ') and create_time < date_add(REPLACE('"${etl_dt}"', 'T', ' '), interval 1 day)"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
  jdbc {
    # ods_t_order_detail_di is created in the dw database (see the "-- dw" DDL).
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 100
    user = "user"
    password = "password"
    # NOTE(review): plain INSERT into a table without a primary key, so re-running
    # the same etl_dt appends duplicates. Delete that day's rows first if reruns are expected.
    query = "insert into ods_t_order_detail_di (id,order_id,sku_id,sku_name,img_url,order_price,sku_num,create_time) values(?,?,?,?,?,?,?,?)"
  }
}
脚本执行
# -i sets the config variable ${etl_dt} (the day to load); the "=" is required
./bin/seatunnel.sh --config ./config/mysql2mysql_ods_t_order_detail_di.conf -i etl_dt=2024-02-05
5、实时
在 env 中指定作业模式为 STREAMING：
job.mode = "STREAMING"
基于 MySQL CDC 的实时同步示例：
# Real-time sync of test.source_user into dw.source_user_01 via MySQL CDC.
env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  # Checkpoint interval in milliseconds.
  checkpoint.interval = 10000
  # Alternative checkpoint keys, left disabled (presumably for the Flink engine):
  #execution.checkpoint.interval = 10000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  MySQL-CDC {
    # The MySQL-CDC connector names its credential option "username".
    username = "user"
    password = "password"
    table-names = ["test.source_user"]
    base-url = "jdbc:mysql://127.0.0.1:3306/test"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/dw"
    driver = "com.mysql.cj.jdbc.Driver"
    # The JDBC connector's option is "user", not "username" (that key belongs
    # to MySQL-CDC and is not read by the JDBC sink).
    user = "user"
    password = "password"
    # Let SeaTunnel generate the write statement for dw.source_user_01;
    # primary_keys identifies the row for upserts/deletes coming from CDC.
    generate_sink_sql = true
    database = "dw"
    table = "source_user_01"
    primary_keys = ["userid"]
  }
}
执行脚本
# Submit the MySQL CDC streaming job (job.mode = STREAMING)
./bin/seatunnel.sh --config ./config/mysql2mysql_rt.conf