网站上传用什么软件做视频,wordpress 跳转 计数,济南网站制作价格,海淀深圳网站建设公司第五章 实战案例
5.1. 案例一
5.1.1. 案例介绍
MySQL数据库中有两张表#xff1a;用户表(users)#xff0c;订单表(orders)。其中用户表中存储的是所有的用户的信息#xff0c;订单表中存储的是所有的订单的信息。表结构如下#xff1a; 用户表 users: id#xff1a;用…第五章 实战案例
5.1. 案例一
5.1.1. 案例介绍
MySQL数据库中有两张表用户表(users)订单表(orders)。其中用户表中存储的是所有的用户的信息订单表中存储的是所有的订单的信息。表结构如下 用户表 users: id用户idusername用户名password用户密码email用户邮箱phone用户手机号码real_name用户的真实姓名registration_time用户的注册时间last_login_time用户的上次登录时间status用户的状态活跃、不活跃、冻结 订单表 orders: id订单IDuser_id用户IDseller_id卖家IDproduct_id商品IDproduct_name商品名称product_price商品单价quantity购买数量total_price订单总价order_time订单时间
业务系统中每天都有新的用户注册每天也都在产生大量的订单。今天公司刚刚搭建了数据仓库需要将已有的数据导入到Hive表中此时需要将已有的数据全量的导入到Hive的表中。后续每天产生的新用户注册和新的订单增量的导入到对应的Hive表中。
5.1.2. 数据准备
MySQL中初始数据
# 创建数据库
CREATE DATABASE datax_shop;
USE datax_shop;# 创建用户表
DROP TABLE IF EXISTS users;
CREATE TABLE users (id INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,username VARCHAR(50) NOT NULL,password VARCHAR(255) NOT NULL,email VARCHAR(255) NOT NULL,phone VARCHAR(20) NOT NULL,real_name VARCHAR(50) NOT NULL,registration_time DATE NOT NULL,last_login_time DATE NULL DEFAULT NULL,status ENUM(active, inactive, frozen) NOT NULL DEFAULT active,PRIMARY KEY (id),UNIQUE KEY (username),UNIQUE KEY (email),UNIQUE KEY (phone)
);# 插入一些数据
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
(johndoe,123456,johndoe163.com,17767827612,John Doe,2020-12-12,2022-09-12),
(janedoe,123123,janedoeqq.com,18922783392,Jane Doe,2021-02-12,2022-12-10),
(bobsmith,121212,bobsmith126.com,17122811292,Bob Smith,2020-10-11,2022-01-15),
(sarahlee,11111,sarahleeqq.com,17122810911,Sarah Lee,2019-03-15,2022-02-15),
(jimmychang,123121,jimmychangqq.com,155514442134,Jimmy Chang,2022-12-11, NULL),
(alexjohnson,121212,alexjohnson126.com,15522427212,Alex Johnson,2021-09-01, NULL);# 创建订单表
DROP TABLE IF EXISTS orders;
CREATE TABLE orders (id INT PRIMARY KEY AUTO_INCREMENT,user_id INT NOT NULL,seller_id INT NOT NULL,product_id INT NOT NULL,product_name VARCHAR(255) NOT NULL,product_price DECIMAL(10, 2) NOT NULL,quantity INT NOT NULL,total_price DECIMAL(10, 2) NOT NULL,order_time DATE NOT NULL
);# 插入一些数据
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(1, 1, 12, 电动牙刷, 90, 1, 90, 2020-12-20),
(1, 2, 15, 洗面奶, 45, 1, 45, 2020-12-20),
(1, 3, 17, 面膜, 110, 2, 220, 2020-12-20),
(2, 1, 11, iPad, 5990, 1, 5990, 2021-12-20),
(2, 2, 19, iPhone数据线, 18, 1, 18, 2021-11-20),
(3, 1, 20, iPhone手机壳, 80, 1, 80, 2020-12-20),
(3, 2, 22, 榴莲, 45, 4, 180, 2021-09-12),
(3, 3, 23, 西瓜, 12, 5, 60, 2021-11-11),
(4, 1, 4, 洗地机, 2990, 1, 2990, 2020-06-18),
(4, 2, 7, 油污清洁剂, 78, 2, 156, 2020-07-11),
(4, 3, 11, 镜子, 10, 1, 10, 2020-06-20),
(5, 1, 9, 健力宝, 48, 2, 96, 2022-12-20);Hive表的创建
-- 创建数据库
create database datax_shop;-- 创建用户表
drop table if exists datax_shop.users;
create table datax_shop.users (id int,username string,password string,email string,phone string,real_name string,registration_time string,last_login_time string,status string
)
row format delimited
fields terminated by \t
lines terminated by \n
stored as orcfile;-- 创建订单表
drop table if exists datax_shop.orders;
create table datax_shop.orders (id int,user_id int,seller_id int,product_id int,product_name string,product_price double,quantity int,total_price double,order_time string
)
partitioned by (year string, month string)
row format delimited
fields terminated by \t
lines terminated by \n
stored as orcfile;5.1.3. 数据全量导入
用户表全量导入
{job: {setting: {speed: {channel: 1}},content: [{reader: {name: mysqlreader,parameter: {username: root,password: 123456,column: [id,username,password,email,phone,real_name,registration_time,last_login_time,status],connection: [{jdbcUrl: [jdbc:mysql://qianfeng01:3306/datax_shop],table: [users]}]}},writer: {name: hdfswriter,parameter: {defaultFS: hdfs://qianfeng01:9820,path: /user/hive/warehouse/datax_shop.db/users,fileName: original,writeMode: append,fieldDelimiter: \t,fileType: orc,column: [{name: id, type: int},{name: username, type: string},{name: password, type: string},{name: email, type: string},{name: phone, type: string},{name: real_name, type: string},{name: registration_time, type: string},{name: last_login_time, type: string},{name: status, type: string}]}}}]}
}订单表全量导入
订单表在全量导入的时候因为要按照订单创建时候的日期作为分区的字段。所以需要创建一张临时表先将MySQL中的订单数据全量的导入到这个临时表中然后再从这个临时表加载到订单表的指定分区中。
-- 创建临时表用来承接全量导入的订单信息
drop table if exists datax_shop.orders_origin;
create table datax_shop.orders_origin (id int,user_id int,seller_id int,product_id int,product_name string,product_price double,quantity int,total_price double,order_time string,year string,month string
)
row format delimited
fields terminated by \t
lines terminated by \n;创建数据同步方案同步MySQL的订单数据到这个临时表中
{job: {setting: {speed: {channel: 1}},content: [{reader: {name: mysqlreader,parameter: {username: root,password: 123456,connection: [{jdbcUrl: [jdbc:mysql://qianfeng01:3306/datax_shop],table: [orders]}],column: [id,user_id,seller_id,product_id,product_name,product_price,quantity,total_price,order_time,year(order_time),lpad(month(order_time), 2, 0)]}},writer: {name: hdfswriter,parameter: {defaultFS: hdfs://qianfeng01:9820,path: /user/hive/warehouse/datax_shop.db/orders_origin/,fileName: orders_origin,writeMode: append,fieldDelimiter: \t,fileType: text,column: [{name: id, type: int},{name: user_id, type: int},{name: seller_id, type: int},{name: product_id, type: int},{name: product_name, type: string},{name: product_price, type: double},{name: quantity, type: double},{name: total_price, type: double},{name: order_time, type: string},{name: year, type: string},{name: month, type: string}]}}}]}
}加载数据到orders表的对应分区中
-- 关闭严格模式
set hive.exec.dynamic.partition.modenonstrict;-- 导入数据到订单表中
insert into datax_shop.orders partition(year, month) select * from datax_shop.orders_origin;5.1.4. 增量数据导入
用户表增量导入
在现有数据全量导入到Hive表中之后每日新增的数据只需要增量导入到Hive即可。此时我们就可以按照用户注册的时间来确定需要将什么数据导入到Hive的用户表中。
首先我们在将现有的数据全量的导入到Hive之后模拟新用户的注册。
INSERT INTO users (username, password, email, phone, real_name, registration_time, last_login_time) VALUES
(natalielin,121212,natalielinqq.com,17788889999,Natalie Lin,2023-01-01, NULL),
(harrytran,123123,harrytran126.com,17666228192,Harry Tran,2023-01-01, NULL),
(gracewang,313131,gracewang163.com,18872631272,Grace Wang,2023-01-01, NULL),
(peterlee,123123,peterleeqq.com,19822781829,Peter Lee,2023-01-01,NULL);现在我们需要将在 2023-01-01 注册的用户增量导入到Hive的用户表中。
{job: {setting: {speed: {channel: 1}},content: [{reader: {name: mysqlreader,parameter: {username: root,password: 123456,column: [id,username,password,email,phone,real_name,registration_time,last_login_time,status],connection: [{jdbcUrl: [jdbc:mysql://qianfeng01:3306/datax_shop],table: [users]}],where: registration_time $date}},writer: {name: hdfswriter,parameter: {defaultFS: hdfs://qianfeng01:9820,path: /user/hive/warehouse/datax_shop.db/users,fileName: append,writeMode: append,fieldDelimiter: \t,fileType: orc,column: [{name: id, type: int},{name: username, type: string},{name: password, type: string},{name: email, type: string},{name: phone, type: string},{name: real_name, type: string},{name: registration_time, type: string},{name: last_login_time, type: string},{name: status, type: string}]}}}]}
}在上述的数据同步的方案中我们使用到了变量 date 用来表示需要增量导入用户的注册时间。在使用这个数据同步方案的时候需要指定变量 date 的值
python3 /usr/local/datax/bin/datax.py -p -Ddate2023-01-01 incr-users.json订单表增量导入
在现有的所有订单数据全量导入到Hive的订单表后每天仍然会有大量的订单数据生成。此时我们只需要按照天为单位将某一天产生的所有的数据增量导入到Hive的订单表中并放置在指定的分区位置即可。
首先在现有的订单数据全量导入到Hive的订单表之后我们来模拟一些新增的订单信息
INSERT INTO orders (user_id, seller_id, product_id, product_name, product_price, quantity, total_price, order_time) VALUES
(6, 1, 110, 大米, 90, 1, 90, 2023-01-01),
(6, 2, 120, 护手霜, 20, 2, 40, 2023-01-01),
(6, 3, 130, 地板, 120, 5, 600, 2023-01-01),
(7, 1, 140, 筒灯, 100, 10, 1000, 2023-01-01),
(7, 2, 150, 假发, 2000, 1, 2000, 2023-01-01),
(7, 3, 160, 牛奶, 100, 2, 200, 2023-01-01),
(8, 1, 170, 百褶裙, 1000, 2, 2000, 2023-01-01),
(8, 2, 180, 真丝丝巾, 300, 2, 600, 2023-01-01),
(8, 3, 190, 太阳镜, 250, 1, 250, 2023-01-01),
(9, 1, 200, 遮阳伞, 120, 1, 120, 2023-01-01),
(9, 2, 210, 盆栽, 220, 5, 1100, 2023-01-01),
(10, 1, 220, 口琴, 50, 1, 50, 2023-01-01),
(10, 2, 230, RIO, 12, 10, 120, 2023-01-01);现在我们需要将某一天的数据增量的导入到Hive对应的分区中其实这个过程就是使用hdfswriter将增量的数据直接写入到HDFS的指定分区目录下即可。但是需要保证这个分区已经被创建出来了。
-- 检查指定分区是否存在
show partitions datax_shop.orders partition(year2023, month01);-- 如果这个分区不存在就创建这个分区
alter table datax_shop.orders add partition(year2023, month01);分区创建完成之后就可以将某天新增的数据同步到对应的分区目录了
{job: {setting: {speed: {channel: 1}},content: [{reader: {name: mysqlreader,parameter: {username: root,password: 123456,column: [id,user_id,seller_id,product_id,product_name,product_price,quantity,total_price,order_time],connection: [{jdbcUrl: [jdbc:mysql://qianfeng01:3306/datax_shop],table: [orders]}],where: order_time $date}},writer: {name: hdfswriter,parameter: {defaultFS: hdfs://qianfeng01:9820,path: /user/hive/warehouse/datax_shop.db/orders/year$year/month$month,fileName: append,writeMode: append,fieldDelimiter: \t,fileType: orc,column: [{name: id, type: int},{name: user_id, type: int},{name: seller_id, type: int},{name: product_id, type: int},{name: product_name, type: string},{name: product_price, type: double},{name: quantity, type: double},{name: total_price, type: double},{name: order_time, type: string}]}}}]}
}在上述的数据同步方案中我们定义了三个变量date、year、month用来控制从MySQL数据库导入的数据和存放到Hive的对应的分区。因此我们在使用这个配置同步数据的时候需要指定这三个变量值
python3 /usr/local/datax/bin/datax.py -p -Ddate2023-01-01 -Dyear2023 -Dmonth01 incr-orders.json5.1.5. 脚本调度
我们已经实现了将指定日期2023-01-01的增量的数据导入到Hive对应的数据表中但是这样做不够灵活我们不能每一次在需要增量导入的时候都去执行上述的一个个命令。为了能够更加方便的同步数据以及可以定时的进行调度我们可以将其做成一个脚本在需要的时候直接调用即可。
shell脚本
#!/bin/bash# 获取需要同步的数据的日期为昨天
# dtdate -d yesterday %Y-%m-%d
dt2023-01-01# 提取年
year${dt:0:4}
month${dt:5:2}# 设置DataX路径
DATAX_HOME/usr/local/datax# 设置jobs路径
JOBS_HOME/root/datax-example# 增量导入用户数据
python $DATAX_HOME/bin/datax.py -p -Ddate$dt $JOBS_HOME/incr-users.json# 增量导入订单数据
# 1. 检查Hive表目标分区是否存在如果目标分区不存在创建分区
if [ ! hive -e show partitions datax_shop.orders partition(year$year, month$month) ]; thenhive -e alter table datax_shop.orders add partition(year$year, month$month)
fi# 3. 执行增量导入订单
python $DATAX_HOME/bin/datax.py -p -Ddate$dt -Dyear$year -Dmonth$month $JOBS_HOME/incr-orders.jsonpython脚本
# Author :
# Company : import datetime
import os
from pyhive import hive# 在脚本中需要和Hive进行交互查询Hive表的分区是否存在、创建分区等操作因此需要使用到PyHive
# 如果没有安装的话需要手动安装一下PyHive
# yum install cyrus-sasl-plain cyrus-sasl-devel cyrus-sasl-gssapi
# pip3 install thrift
# pip3 install sasl
# pip3 install thrift-sasl
# pip3 install pyhive# PyHive需要使用Hive的ThriftServer服务因此需要保证你的Hive对应的服务是打开的
# nohup hive --service hiveserver2 /var/log/hiveserver2 21 # 设置 DataX 和 Jobs 的Home路径
DATAX_HOME /usr/local/datax
JOBS_HOME /root/datax-example# 设置同步任务的JSON配置文件名
INCR_USERS incr-users.json
INCR_ORDERS incr-orders.json# 获取当前时间
# now datetime.datetime(2023, 1, 1)
now datetime.datetime.now()
dt str(now.date())
year f{now.year:04}
month f{now.month:02}# 增量导入用户数据
os.system(fpython {DATAX_HOME}/bin/datax.py -p -Ddate{dt} {JOBS_HOME}/{INCR_USERS})# 查看Hive的指定分区是否存在如果不存在创建分区
with hive.Connection(host192.168.10.111, port10000, usernameroot, databasedatax_shop) as conn:with conn.cursor() as cursor:cursor.execute(fshow partitions orders partition(year{year}, month{month}))partitions cursor.fetchone()if partitions is None:# 说明这个分区不存在创建cursor.execute(falter table orders add partition(year{year}, month{month}))# 增量导入订单数据
os.system(fpython {DATAX_HOME}/bin/datax.py -p -Ddate{dt} -Dyear{year} -Dmonth{month} {JOBS_HOME}/{INCR_ORDERS})