网站的基础知识,网站备案注意事项,郑州建站费用,四川省建设厅注册管理中心网站首页数据仓库VS数据库数据仓库的定义:数据仓库是将多个数据源的数据经过ETL(Extract(抽取)、Transform(转换)、Load(加载))理之后#xff0c;按照一定的主题集成起来提供决策支持和联机分析应用的结构化数据环境数据仓库VS数据库#xff1a;数据库是面向事务的设计#xff0c;数…数据仓库VS数据库数据仓库的定义:数据仓库是将多个数据源的数据经过ETL(Extract(抽取)、Transform(转换)、Load(加载))理之后按照一定的主题集成起来提供决策支持和联机分析应用的结构化数据环境数据仓库VS数据库数据库是面向事务的设计数据仓库是面向主题设计的数据库一般存储在线交易数据数据仓库存储的一般是历史数据数据库设计是避免冗余采用三范式的规则来设计数据仓库在设计是有意引入冗余采用反范式的方式来设计OLTP VS OLAP联机事务处理OLTP是传统的关系型数据库的主要应用主要是基本的、日常的事务处理例如银行交易联机分析处理OLAP是数据仓库系统的主要应用支持复杂的分析操作侧重决策支持并且提供直观易懂的查询结果常规的数仓架构为什么建设数据仓库各个业务数据存在不一致数据关系混乱业务系统一般针对于OLTP而数据仓库可以实现OLAP分析数据仓库是多源的复杂环境可以对多个业务的数据进行统一分析数据仓库建设目标集成多源数据数据来源和去向可追溯梳理血缘关系减少重复开发保存通用型中间数据避免重复计算屏蔽底层业务逻辑对外提供一致的、 结构清晰的数据如何实现实现通用型数据ETL工具根据业务建立合理的数据分层模型数据仓库分层建设数仓建设背景数据建设刚起步大部分数据经过粗暴的数据接入后直接对接业务数据建设发展到一定阶段发现数据的使用杂乱无章各种业务都是从原始数据直接计算而得。各种重复计算严重浪费了计算资源需要优化性能为什么进行数仓分层清晰数据结构每个数据分层都有对应的作用域数据血缘追踪对各层之间的数据表转换进行跟踪建立血缘关系减少重复开发规范数据分层开发通用的中间层数据屏蔽原始数据的异常通过数据分层管控数据质量屏蔽业务的影响不必改一次业务就需要重新接入数据复杂问题简单化将复杂的数仓架构分解成多个数据层来完成常见的分层含义STG层原始数据层存储原始数据数据结构与采集数据一致存储周期保存全部数据表命名规范stg_主题_表内容_分表规则ODS层数据操作层对STG层数据进行初步处理如去除脏数据去除无用字段.存储周期默认保留近30天数据表命名规范ods_主题_表内容_分表规则DWD层数据明细层数据处理后的宽表目标为满足80%的业务需求存储周期保留历史至今所有的数据表命名规范dwd_业务描述时间粒度DWS层数据汇总层汇总数据解决数据汇总计算和数据完整度问题存储周期保留历史至今所有的数据表命名规范dws_业务描述_时间粒度_sumDIM层公共维度层存储公共的信息数据用于DWD、DWS的数据关联存储周期按需存储一般保留历史至今所有的数据表命名规范dim_维度描述DM层数据集市层用于BI、多维分析、标签、数据挖掘等存储周期按需存储--般保留历史至今所有的数据表命名规范dm_主题_表内容_分表规则分层之间的数据流转Hive是什么Hive简介Hive是基于Hadoop的数据仓库工具提供类SQL语法(HiveQL)默认以MR作为计算引擎(也支持其他计算引擎例如tez)、HDFS 作为存储系统提供超大数据集的计算/扩展能力Hive是将数据映射成数据库和一张张的表库和表的元数据信息一般存在关系型数据库Hive的简单架构图Hive VS HadoopHive数据存储Hive的数据是存储在HDFS.上的Hive的库和表是对HDFS.上数据的映射Hive元数据存储元数据存储一般在外部关系库( Mysql )与Presto Impala等共享Hive语句的执行过程将HQL转换为MapReduce任务运行Hive与关系数据库Mysql的区别产品定位Hive是数据仓库为海量数据的离线分析设计的不支持OLTP(联机事务处理所需的关键功能ACID而更接近于OLAP(联机分析技术))适给离线处理大数据集。而MySQL是关系型数据库是为实时业务设计的。可扩展性Hive中的数据存储在HDFS(Hadoop的分布式文件系统)metastore元数据一 般存储在独立的关系型数据库中而MySQL则是服务器本地的文件系统。因此Hive具有良好的可扩展性数据库由于ACID语义的严格限制扩展性十分有限。读写模式Hive为读时模式数据的验证则是在查询时进行的这有利于大数据集的导入读时模式使数据的加载非常迅速数据的加载仅是文件复制或移动。MySQL为写时模式数据在写入数据库时对照模式检查。写时模式有利于提升查询性能因为数据库可以对列进行索引。数据更新Hive是针对数据仓库应用设计的而数仓的内容是读多写少的Hive中不支持对数据进行改写所有数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行修改的。索引Hive支持索引但是Hive的索引与关系型数据库中的索引并不相同比如Hive不支持主键或者外键。Hive提供了有限的索引功能可以为-些字段建立索引一张表的索引数据存储在另外一张表中。由于数据的访问延迟较高Hive不适合在线数据查询。数据库在少星的特定条件的数据访问中索引可以提供较低的延迟。计算模型Hive默认使用的模型是MapReduce(也可以on spark、on tez)而MySQL使用的是自己设计的Executor计算模型Hive安装部署参考Hive基本使用(上)Hive数据类型/分区/基础语法Hive数据类型基本数据类型int、 float、 double、 string、 boolean、 bigint等复杂类型array、map、 structHive分区Hive将海量数据按某几个字段进行分区查询时不必加载全部数据分区对应到HDFS就是HDFS的目录.分区分为静态分区和动态分区两种Hive常用基础语法USE DATABASE_NAMECREATE DATABASE IF NOT EXISTS DB NAMEDESC DATABASE DB NAMECREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY \t STORE AS TEXTFILESELECT * FROM TABLE NAMEALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME写个Python脚本生成一些测试数据import jsonimport randomimport uuidname (Tom, Jerry, Jim, Angela, Ann, Bella, Bonnie, Caroline)hobby (reading, play, dancing, sing)subject (math, chinese, english, computer)data []for item in name:scores {key: random.randint(60, 100) for key in subject}data.append(|.join([uuid.uuid4().hex, item, ,.join(random.sample(set(hobby), 2)), ,.join([{0}:{1}.format(k, v) for k, v in scores.items()])]))with open(test.csv, w) as f:f.write(\n.join(data))执行该脚本生成测试数据文件[roothadoop01 ~/py-script]# python3 gen_data.py[roothadoop01 ~/py-script]# ll -h...-rw-r--r--. 1 root root 745 11月 9 11:09 test.csv[roothadoop01 ~/py-script]#我们可以看一下生成的数据[roothadoop01 ~/py-script]# cat test.csvf4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77...数据以 | 符进行分割前两个字段都是string类型第三个字段是array类型第四个字段是map类型创建测试用的数据库0: jdbc:hive2://localhost:10000 create database hive_test;No rows affected (0.051 seconds)0: jdbc:hive2://localhost:10000 use hive_test;No rows affected (0.06 seconds)0: jdbc:hive2://localhost:10000创建测试表CREATE TABLE test(user_id string,user_name string,hobby array,scores map)ROW FORMAT DELIMITEDFIELDS TERMINATED BY |COLLECTION ITEMS TERMINATED BY ,MAP KEYS TERMINATED BY :LINES TERMINATED BY \n;将本地数据加载到Hive中0: jdbc:hive2://localhost:10000 load data local inpath /root/py-script/test.csv overwrite into table test;No rows affected (0.785 seconds)0: jdbc:hive2://localhost:10000查询数据Hive将HQL转换为MapReduce的流程了解了Hive中的SQL基本操作之后我们来看看Hive是如何将SQL转换为MapReduce任务的整个转换过程分为六个阶段Antr定义SQL的语法规则完成SQL词法语法解析将SQL 转化为抽象语法树AST Tree遍历AST Tree抽象出查询的基本组成单元QueryBlock遍历QueryBlock翻译为执行操作树OperatorTree逻辑层优化器进行OperatorTree变换合并不必要的ReduceSinkOperator减少shufle数据量遍历OperatorTree翻译为MapReduce任务物理层优化器进行MapReduce任务的变换生成最终的执行计划与普通SQL一样我们可以通过在HQL前面加上explain关键字查看HQL的执行计划explain select * from test where id 10 limit 1000Hive会将这条语句解析成一个个的OperatorOperator就是Hive解析之后的最小单元每个Operator其实都是对应一个MapReduce任务。例如上面这条语句被Hive解析后就是由如下Operator组成同时Hive实现了优化器对这些Operator的顺序进行优化帮助我们提升查询效率。Hive中的优化器主要分为三类RBO(Rule-Based Optimizer)基于规则的优化器CBO(Cost-Based Optimizer)基于代价的优化器这是默认的优化器动态CBO在执行计划生成的过程中动态优化的方式Hive基本使用(中)内部表/外部表/分区表/分桶表内部表和传统数据库的Table概念类似对应HDFS上存储目录删除表时删除元数据和表数据。内部表的数据会存放在HDFS中的特定的位置中可以通过配置文件指定。当删除表时数据文件也会一并删除。适用于临时创建的中间表。外部表指向已经存在的HDFS数据删除时只删除元数据信息。适用于想要在Hive之外使用表的数据的情况当你删除External Table时只是删除了表的元数据它的数据并没有被删除。适用于数据多部门共享。建表时使用create external table。指定external关键字即可。分区表Partition对应普通数据库对Partition列的密集索引将数据按照Partition列存储到不同目录便于并行分析减少数据量。分区表创建表的时候需要指定分区字段。分区字段与普通字段的区别分区字段会在HDFS表目录下生成一个分区字段名称的目录而普通字段则不会查询的时候可以当成普通字段来使用一般不直接和业务直接相关。分桶表对数据进行hash放到不同文件存储方便抽样和join查询。可以将内部表外部表和分区表进一步组织成桶表可以将表的列通过Hash算法进一步分解成不同的文件存储。对于内部表和外部表的概念和应用场景我们很容易理解我们需要重点关注一下分区表和分桶表。 我们为什么要建立分区表和分桶表呢HQL通过where子句来限制条件提取数据那么与其遍历一张大表不如将这张大表拆分成多个小表并通过合适的索引来扫描表中的一小部分分区和分桶都是采用了这种理念。分区会创建物理目录并且可以具有子目录(通常会按照时间、地区分区)目录名以 分区名值 形式命名例如create_time202011。分区名会作为表中的伪列这样通过where字句中加入分区的限制可以在仅扫描对应子目录下的数据。通过 partitioned by (feld1 type, ...) 创建分区列。分桶可以继续在分区的基础上再划分小表分桶根据哈希值来确定数据的分布(即MapReducer中的分区)比如分区下的一部分数据可以根据分桶再分为多个桶这样在查询时先计算对应列的哈希值并计算桶号只需要扫描对应桶中的数据即可。分桶通过clustered by(field) into n buckets创建。接下来简单演示下这几种表的操作首先将上一小节生成的测试数据文件上传到hdfs中[roothadoop01 ~]# hdfs dfs -mkdir /test[roothadoop01 ~]# hdfs dfs -put py-script/test.csv /test[roothadoop01 ~]# hdfs dfs -ls /testFound 1 items-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /test/test.csv[roothadoop01 ~]#内部表建表SQLCREATE TABLE test_table(user_id string,user_name string,hobby array,scores map)ROW FORMAT DELIMITEDFIELDS TERMINATED BY |COLLECTION ITEMS TERMINATED BY ,MAP KEYS TERMINATED BY :LINES TERMINATED BY \n;将hdfs数据加载到Hive中0: jdbc:hive2://localhost:10000 load data inpath /test/test.csv overwrite into table test_table;No rows affected (0.169 seconds)0: jdbc:hive2://localhost:10000查看创建的表存储在hdfs的哪个目录下0: jdbc:hive2://localhost:10000 show create table test_table;----------------------------------------------------| createtab_stmt |----------------------------------------------------| CREATE TABLE test_table( || user_id string, || user_name string, || hobby array, || scores map) || ROW FORMAT SERDE || org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe || WITH SERDEPROPERTIES ( || collection.delim,, || field.delim|, || line.delim\n, || mapkey.delim:, || serialization.format|) || STORED AS INPUTFORMAT || org.apache.hadoop.mapred.TextInputFormat || OUTPUTFORMAT || org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat || LOCATION || hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table || TBLPROPERTIES ( || bucketing_version2, || transient_lastDdlTime1604893190) |----------------------------------------------------22 rows selected (0.115 seconds)0: jdbc:hive2://localhost:10000在hdfs中可以查看到数据文件[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_tableFound 1 items-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv[roothadoop01 ~]#删除表0: jdbc:hive2://localhost:10000 drop table test_table;No rows affected (0.107 seconds)0: jdbc:hive2://localhost:10000查看hdfs会发现该表所对应的存储目录也一并被删除了[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/Found 2 itemsdrwxr-xr-x - root supergroup 0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_tabledrwxr-xr-x - root supergroup 0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test[roothadoop01 ~]#外部表建表SQL与内部表的区别就在于external关键字CREATE external TABLE external_table(user_id string,user_name string,hobby array,scores map)ROW FORMAT DELIMITEDFIELDS TERMINATED BY |COLLECTION ITEMS TERMINATED BY ,MAP KEYS TERMINATED BY :LINES TERMINATED BY \n;将数据文件加载到Hive中0: jdbc:hive2://localhost:10000 load data inpath /test/test.csv overwrite into table external_table;No rows affected (0.182 seconds)0: jdbc:hive2://localhost:10000此时会发现hdfs中的数据文件会被移动到hive的目录下[roothadoop01 ~]# hdfs dfs -ls /test[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_tableFound 1 items-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv[roothadoop01 ~]#删除表0: jdbc:hive2://localhost:10000 drop table external_table;No rows affected (0.112 seconds)0: jdbc:hive2://localhost:10000查看hdfs会发现该表所对应的存储目录仍然存在[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_tableFound 1 items-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv[roothadoop01 ~]#分区表建表语句CREATE TABLE partition_table(user_id string,user_name string,hobby array,scores map)PARTITIONED BY (create_time string)ROW FORMAT DELIMITEDFIELDS TERMINATED BY |COLLECTION ITEMS TERMINATED BY ,MAP KEYS TERMINATED BY :LINES TERMINATED BY \n;将数据文件加载到Hive中并指定分区0: jdbc:hive2://localhost:10000 load data local inpath /root/py-script/test.csv overwrite into table partition_table partition (create_time202011);No rows affected (0.747 seconds)0: jdbc:hive2://localhost:10000 load data local inpath /root/py-script/test.csv overwrite into table partition_table partition (create_time202012);No rows affected (0.347 seconds)0: jdbc:hive2://localhost:10000执行如下sql可以从不同的分区统计结果0: jdbc:hive2://localhost:10000 select count(*) from partition_table;------| _c0 |------| 16 |------1 row selected (15.881 seconds)0: jdbc:hive2://localhost:10000 select count(*) from partition_table where create_time202011;------| _c0 |------| 8 |------1 row selected (14.639 seconds)0: jdbc:hive2://localhost:10000 select count(*) from partition_table where create_time202012;------| _c0 |------| 8 |------1 row selected (15.555 seconds)0: jdbc:hive2://localhost:10000分区表在hdfs中的存储结构[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_tableFound 2 itemsdrwxr-xr-x - root supergroup 0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time202011drwxr-xr-x - root supergroup 0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time202012[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time202011Found 1 items-rw-r--r-- 1 root supergroup 745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time202011/test.csv[roothadoop01 ~]#分桶表建表语句CREATE TABLE bucket_table(user_id string,user_name string,hobby array,scores map)clustered by (user_name) sorted by (user_name) into 2 bucketsROW FORMAT DELIMITEDFIELDS TERMINATED BY |COLLECTION ITEMS TERMINATED BY ,MAP KEYS TERMINATED BY :LINES TERMINATED BY \n;将test表中的数据插入到bucket_table中0: jdbc:hive2://localhost:10000 insert into bucket_table select * from test;No rows affected (17.393 seconds)0: jdbc:hive2://localhost:10000抽样查询分桶表在hdfs的存储目录如下[roothadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_tableFound 2 items-rw-r--r-- 1 root supergroup 465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0-rw-r--r-- 1 root supergroup 281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0[roothadoop01 ~]#Hive基本使用(下)内置函数/自定义函数/实现UDFHive常见内置函数字符串类型concat、substr、 upper、 lower时间类型year、month、 day复杂类型size、 get_json_object查询引擎都自带了一部分函数来帮助我们解决查询过程当中一些复杂的数据计算或者数据转换操作但是有时候自带的函数功能不能满足业务的需要。这时候就需要我们自己开发自定义的函数来辅助完成了这就是所谓的用户自定义函数UDF(User-Defined Functions)。Hive支持三类自定义函数UDF普通的用户自定义函数。用来处理输入一行输出一行的操作类似Map操作。如转换字符串大小写,获取字符串长度等UDAF用户自定义聚合函数(User-defined aggregate function)用来处理输入多行输出一行的操作类似Reduce操作。比如MAX、COUNT函数。UDTF用户自定义表产生函数(User defined table-generating function)用来处理输入一行输出多行(即一个表)的操作 不是特别常用UDF函数其实就是一段遵循一定接口规范的程序。在执行过程中Hive将SQL转换为MapReduce程序在执行过程当中在执行我们的UDF函数。本小节简单演示下自定义UDF函数首先创建一个空的Maven项目然后添加hive-exec依赖版本与你安装的Hive版本需对应上。完整的pom文件内容如下xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd4.0.0org.examplehive-udf-test1.0-SNAPSHOTorg.apache.hivehive-exec3.1.2org.apache.maven.pluginsmaven-compiler-plugin88首先创建一个继承UDF的类我们实现的这个自定义函数功能就是简单的获取字段的长度package com.example.hive.udf;import org.apache.hadoop.hive.ql.exec.UDF;import org.apache.hadoop.io.Text;public class StrLen extends UDF {public int evaluate(final Text col) {return col.getLength();}}以上这种自定义函数只能支持处理普通类型的数据如果要对复杂类型的数据做处理则需要继承GenericUDF并实现其抽象方法。例如我们实现一个对测试数据中的scores字段求平均值的函数package com.example.hive.udf;import org.apache.hadoop.hive.ql.exec.UDFArgumentException;import org.apache.hadoop.hive.ql.metadata.HiveException;import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;import java.text.DecimalFormat;public class AvgScore extends GenericUDF {/*** 函数的名称*/private static final String FUNC_NAME AVG_SCORE;/*** 函数所作用的字段类型这里是map类型*/private transient MapObjectInspector mapOi;/*** 控制精度只返回两位小数*/DecimalFormat df new DecimalFormat(#.##);Overridepublic ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {// 在此方法中可以做一些前置的校验例如检测函数参数个数、检测函数参数类型mapOi (MapObjectInspector) objectInspectors[0];// 指定函数的输出类型return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;}Overridepublic Object evaluate(DeferredObject[] deferredObjects) throws HiveException {// 函数的核心逻辑取出map中的value进行求平均值并返回一个Double类型的结果值Object o deferredObjects[0].get();double v mapOi.getMap(o).values().stream().mapToDouble(a - Double.parseDouble(a.toString())).average().orElse(0.0);return Double.parseDouble(df.format(v));}Overridepublic String getDisplayString(String[] strings) {return func(map);}}对项目进行打包并上传到服务器中[roothadoop01 ~/jars]# lshive-udf-test-1.0-SNAPSHOT.jar[roothadoop01 ~/jars]#将jar包上传到hdfs中[roothadoop01 ~/jars]# hdfs dfs -mkdir /udfs[roothadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs[roothadoop01 ~/jars]# hdfs dfs -ls /udfsFound 1 items-rw-r--r-- 1 root supergroup 4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar[roothadoop01 ~/jars]#在Hive中添加该jar包0: jdbc:hive2://localhost:10000 add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;No rows affected (0.022 seconds)0: jdbc:hive2://localhost:10000然后注册临时函数临时函数只会在当前的session中生效0: jdbc:hive2://localhost:10000 CREATE TEMPORARY FUNCTION strlen as com.example.hive.udf.StrLen;No rows affected (0.026 seconds)0: jdbc:hive2://localhost:10000 CREATE TEMPORARY FUNCTION avg_score as com.example.hive.udf.AvgScore;No rows affected (0.008 seconds)0: jdbc:hive2://localhost:10000使用自定义函数处理0: jdbc:hive2://localhost:10000 select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;---------------------------------| user_name | length | avg_score |---------------------------------| Tom | 3 | 80.25 || Jerry | 5 | 77.5 || Jim | 3 | 83.75 || Angela | 6 | 84.5 || Ann | 3 | 90.0 || Bella | 5 | 69.25 || Bonnie | 6 | 76.5 || Caroline | 8 | 84.5 |---------------------------------8 rows selected (0.083 seconds)0: jdbc:hive2://localhost:10000删除已注册的临时函数0: jdbc:hive2://localhost:10000 drop temporary function strlen;No rows affected (0.01 seconds)0: jdbc:hive2://localhost:10000 drop temporary function avg_score;No rows affected (0.009 seconds)0: jdbc:hive2://localhost:10000临时函数只会在当前的session中生效如果需要注册成永久函数则只需要把TEMPORARY关键字给去掉即可。如下所示0: jdbc:hive2://localhost:10000 create function strlen as com.example.hive.udf.StrLen using jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;No rows affected (0.049 seconds)0: jdbc:hive2://localhost:10000 create function avg_score as com.example.hive.udf.AvgScore using jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;No rows affected (0.026 seconds)0: jdbc:hive2://localhost:10000删除永久函数也是把TEMPORARY关键字给去掉即可。如下所示0: jdbc:hive2://localhost:10000 drop function strlen;No rows affected (0.031 seconds)0: jdbc:hive2://localhost:10000 drop function avg_score;No rows affected (0.026 seconds)0: jdbc:hive2://localhost:10000Hive存储结构 - OrcFileHive支持的存储格式TextFile是默认的存储格式通过简单的分隔符可以对csv等类型的文件进行解析。但实际应用中通常都是使用OrcFile格式因为ORCFile是列式存储格式更加适合大数据查询的场景。我们都知道关系型数据库基本是使用行式存储作为存储格式而大数据领域更多的是采用列式存储因为大数据分析场景中通常需要读取大量行但是只需要少数的几个列。这也是为什么通常使用OrcFile作为Hive的存储格式的原因。由此可见大数据的绝大部分应用场景都是OLAP场景。OLAP场景的特点读多于写不同于事务处理(OLTP)的场景比如电商场景中加购物车、下单、支付等需要在原地进行大量insert、update、delete操作数据分析(OLAP)场景通常是将数据批量导入后进行任意维度的灵活探索、BI工具洞察、报表制作等。数据一次性写入后分析师需要尝试从各个角度对数据做挖掘、分析直到发现其中的商业价值、业务变化趋势等信息。这是一个需要反复试错、不断调整、持续优化的过程其中数据的读取次数远多于写入次数。这就要求底层数据库为这个特点做专门设计而不是盲目采用传统数据库的技术架构。大宽表读大量行但是少量列结果集较小在OLAP场景中通常存在一张或是几张多列的大宽表列数高达数百甚至数千列。对数据分析处理时选择其中的少数几列作为维度列、其他少数几列作为指标列然后对全表或某一个较大范围内的数据做聚合计算。这个过程会扫描大量的行数据但是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据也明显小得多。数据批量写入且数据不更新或少更新OLTP类业务对于延时(Latency)要求更高要避免让客户等待造成业务损失而OLAP类业务由于数据量非常大通常更加关注写入吞吐(Throughput)要求海量数据能够尽快导入完成。一旦导入完成历史数据往往作为存档不会再做更新、删除操作。无需事务数据一致性要求低OLAP类业务对于事务需求较少通常是导入历史日志数据或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。灵活多变不适合预先建模分析场景下随着业务变化要及时调整分析维度、挖掘方法以尽快发现数据价值、更新业务指标。而数据仓库中通常存储着海量的历史数据调整代价十分高昂。预先建模技术虽然可以在特定场景中加速计算但是无法满足业务灵活多变的发展需求维护成本过高。行式存储和列式存储行式存储和列式存储的对比图与行式存储将每一行的数据连续存储不同列式存储将每一列的数据连续存储。相比于行式存储列式存储在分析场景下有着许多优良的特性如前所述分析场景中往往需要读大量行但是少数几个列。在行存模式下数据按行连续存储所有列的数据都存储在一个block中不参与计算的列在IO时也要全部读出读取操作被严重放大。而列存模式下只需要读取参与计算的列即可极大的减低了IO cost加速了查询。同一列中的数据属于同一类型压缩效果显著。列存往往有着高达十倍甚至更高的压缩比节省了大量的存储空间降低了存储成本。更高的压缩比意味着更小的data size从磁盘中读取相应数据耗时更短。自由的压缩算法选择。不同列的数据具有不同的数据类型适用的压缩算法也就不尽相同。可以针对不同列类型选择最合适的压缩算法。高压缩比意味着同等大小的内存能够存放更多数据系统cache效果更好。OrcFileOrcFile存储格式Orc列式存储优点查询时只需要读取查询所涉及的列降低IO消耗同时保存每一列统计信息实现部分谓词下推每列数据类型一致可针对不同的数据类型采用其高效的压缩算法列式存储格式假设数据不会发生改变支持分片、流式读取更好的适应分布式文件存储的特性除了Orc外Parquet也是常用的列式存储格式。Orc VS ParquetOrcFile和Parquet都是Apache的顶级项目Parquet不支持ACID、不支持更新Orc支持有限的ACID和更新Parquet的压缩能力较高Orc的查询效率较高离线数仓VS实时数仓离线数仓离线数据仓库主要基于Hive等技术来构建T1的离线数据通过定时任务每天拉取增量数据导入到Hive表中创建各个业务相关的主题维度数据对外提供T1的数据查询接口离线数仓架构数据源通过离线的方式导入到离线数仓中数据分层架构ODS、DWD、 DM下游应用根据业务需求选择直接读取DM实时数仓实时数仓基于数据采集工具将原始数据写入到Kafka等数据通道数据最终写入到类似于HBase这样支持快速读写的存储系统对外提供分钟级别、甚至秒级别的查询方案实时数仓架构业务实时性要求的不断提高实时处理从次要部分变成了主要部分Lambda架构在离线大数据架构基础上加了一个加速层使用流处理技术完成实时性较高的指标计算Kappa架构以实时事件处理为核心统一数据处理图解Lambda架构数据流程Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(Nathan Marz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验。Lambda 架构使开发人员能够构建大规模分布式数据处理系统。它具有很好的灵活性和可扩展性也对硬件故障和人为失误有很好的容错性。Lambda 架构总共由三层系统组成批处理层(Batch Layer)速度处理层(Speed Layer)以及用于响应查询的服务层(Serving Layer)。在 Lambda 架构中每层都有自己所肩负的任务。批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图。批处理层使用可处理大量数据的分布式处理系统预先计算结果。它通过处理所有的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来重新计算的能够修复任何错误然后更新现有的数据视图。输出通常存储在只读数据库中更新则完全取代现有的预先计算好的视图。速度处理层会实时处理新来的数据。速度层通过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整但它们几乎在收到数据后立即可用。而当同样的数据在批处理层处理完成后在速度层的数据就可以被替代掉了。本质上速度层弥补了批处理层所导致的数据视图滞后。比如说批处理层的每个任务都需要 1 个小时才能完成而在这 1 个小时里我们是无法获取批处理层中最新任务给出的数据视图的。而速度层因为能够实时处理数据给出结果就弥补了这 1 个小时的滞后。所有在批处理层和速度层处理完的结果都输出存储在服务层中服务层通过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询。所有的新用户行为数据都可以同时流入批处理层和速度层。批处理层会永久保存数据并且对数据进行预处理得到我们想要的用户行为模型并写入服务层。而速度层也同时对新用户行为数据进行处理得到实时的用户行为模型。而当“应该对用户投放什么样的广告”作为一个查询(Query)来到时我们从服务层既查询服务层中保存好的批处理输出模型也对速度层中处理的实时行为进行查询这样我们就可以得到一个完整的用户行为历史了。一个查询就如下图所示既通过批处理层兼顾了数据的完整性也可以通过速度层弥补批处理层的高延时性让整个查询具有实时性。Kappa 架构 VS LambdaLambda 架构的不足虽然 Lambda 架构使用起来十分灵活并且可以适用于很多的应用场景但在实际应用的时候Lambda 架构也存在着一些不足主要表现在它的维护很复杂。使用 Lambda 架构时架构师需要维护两个复杂的分布式系统并且保证他们逻辑上产生相同的结果输出到服务层中。举个例子吧我们在部署 Lambda 架构的时候可以部署 Apache Hadoop 到批处理层上同时部署 Apache Flink 到速度层上。我们都知道在分布式框架中进行编程其实是十分复杂的尤其是我们还会针对不同的框架进行专门的优化。所以几乎每一个架构师都认同Lambda 架构在实战中维护起来具有一定的复杂性。那要怎么解决这个问题呢我们先来思考一下造成这个架构维护起来如此复杂的根本原因是什么呢维护 Lambda 架构的复杂性在于我们要同时维护两套系统架构批处理层和速度层。我们已经说过了在架构中加入批处理层是因为从批处理层得到的结果具有高准确性而加入速度层是因为它在处理大规模数据时具有低延时性。那我们能不能改进其中某一层的架构让它具有另外一层架构的特性呢例如改进批处理层的系统让它具有更低的延时性又或者是改进速度层的系统让它产生的数据视图更具准确性和更加接近历史数据呢另外一种在大规模数据处理中常用的架构——Kappa 架构(Kappa Architecture)便是在这样的思考下诞生的。Kappa 架构Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的作者之一也是现在 Confluent 大数据公司的 CEO。克雷普斯提出了一个改进 Lambda 架构的观点我们能不能改进 Lambda 架构中速度层的系统性能使得它也可以处理好数据的完整性和准确性问题呢我们能不能改进 Lambda 架构中的速度层使它既能够进行实时数据处理同时也有能力在业务逻辑更新的情况下重新处理以前处理过的历史数据呢他根据自身多年的架构经验发现我们是可以做到这样的改进的。我们知道像 Apache Kafka 这样的流处理平台是具有永久保存数据日志的功能的。通过Kafka的这一特性我们可以重新处理部署于速度层架构中的历史数据。下面我就以 Kafka 为例来介绍整个全新架构的过程。第一步部署 Kafka并设置数据日志的保留期(Retention Period)。这里的保留期指的是你希望能够重新处理的历史数据的时间区间。例如如果你希望重新处理最多一年的历史数据那就可以把 Apache Kafka 中的保留期设置为 365 天。如果你希望能够处理所有的历史数据那就可以把 Apache Kafka 中的保留期设置为“永久(Forever)”。第二步如果我们需要改进现有的逻辑算法那就表示我们需要对历史数据进行重新处理。我们需要做的就是重新启动一个 Kafka 作业实例(Instance)。这个作业实例将重头开始重新计算保留好的历史数据并将结果输出到一个新的数据视图中。我们知道 Kafka 的底层是使用 Log Offset 来判断现在已经处理到哪个数据块了所以只需要将 Log Offset 设置为 0新的作业实例就会重头开始处理历史数据。第三步当这个新的数据视图处理过的数据进度赶上了旧的数据视图时我们的应用便可以切换到从新的数据视图中读取。第四步停止旧版本的作业实例并删除旧的数据视图。这个架构就如同下图所示。与 Lambda 架构不同的是Kappa 架构去掉了批处理层这一体系结构而只保留了速度层。你只需要在业务逻辑改变又或者是代码更改的时候进行数据的重新处理。Kappa 架构统一了数据的处理方式不再维护离线和实时两套代码逻辑。Kappa 架构的不足Kappa 架构也是有着它自身的不足的。因为 Kappa 架构只保留了速度层而缺少批处理层在速度层上处理大规模数据可能会有数据更新出错的情况发生这就需要我们花费更多的时间在处理这些错误异常上面。如果需求发生变化或历史数据需要重新处理都得通过上游重放来完成。并且重新处理历史的吞吐能力会低于批处理。还有一点Kappa 架构的批处理和流处理都放在了速度层上这导致了这种架构是使用同一套代码来处理算法逻辑的。所以 Kappa 架构并不适用于批处理和流处理代码逻辑不一致的场景。Lambda VS Kappa主流大公司的实时数仓架构阿里菜鸟实时数仓美团实时数仓实时数仓建设特征整体架构设计通过分层设计为OLAP查询分担压力复杂的计算统一在实时计算层做避免给OLAP查询带来过大的压力汇总计算通过OLAP数据查询引擎进行整个架构中实时计算一般 是SparkFlink配合消息队列Kafka一家独大配合HBase、ES、 Mysq|进行数据落盘OLAP领域Presto、Druid、 Clickhouse、 Greenplum等等层出不穷