数据仓库之Hive快速入门 - 离线&实时数仓架构
数据仓库VS数据库
数据仓库的定义:
- 数据仓库是将多个数据源的数据经过ETL(Extract(抽取)、Transform(转换)、Load(加载))理之后,按照一定的主题集成起来提供决策支持和联机分析应用的结构化数据环境
数据仓库VS数据库:
- 数据库是面向事务的设计,数据仓库是面向主题设计的
- 数据库一般存储在线交易数据,数据仓库存储的一般是历史数据
- 数据库设计是避免冗余,采用三范式的规则来设计,数据仓库在设计是有意引入冗余,采用反范式的方式来设计
OLTP VS OLAP:
- 联机事务处理OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易
- 联机分析处理OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果
常规的数仓架构:
image.png
为什么建设数据仓库:
- 各个业务数据存在不一致,数据关系混乱
- 业务系统一般针对于OLTP,而数据仓库可以实现OLAP分析
- 数据仓库是多源的复杂环境,可以对多个业务的数据进行统一分析
数据仓库建设目标:
- 集成多源数据,数据来源和去向可追溯,梳理血缘关系
- 减少重复开发,保存通用型中间数据,避免重复计算
- 屏蔽底层业务逻辑,对外提供一致的、 结构清晰的数据
如何实现:
- 实现通用型数据ETL工具
- 根据业务建立合理的数据分层模型
数据仓库分层建设
数仓建设背景:
- 数据建设刚起步,大部分数据经过粗暴的数据接入后直接对接业务
- 数据建设发展到一定阶段,发现数据的使用杂乱无章,各种业务都是从原始数据直接计算而得。
- 各种重复计算,严重浪费了计算资源,需要优化性能
为什么进行数仓分层:
- 清晰数据结构:每个数据分层都有对应的作用域
- 数据血缘追踪:对各层之间的数据表转换进行跟踪,建立血缘关系
- 减少重复开发:规范数据分层,开发通用的中间层数据
- 屏蔽原始数据的异常:通过数据分层管控数据质量
- 屏蔽业务的影响:不必改一次业务就需要重新接入数据
- 复杂问题简单化:将复杂的数仓架构分解成多个数据层来完成
常见的分层含义:
image.png
STG层
- 原始数据层:存储原始数据,数据结构与采集数据一致
- 存储周期:保存全部数据
- 表命名规范:
stg_主题_表内容_分表规则
ODS层
- 数据操作层:对STG层数据进行初步处理,如去除脏数据,去除无用字段.
- 存储周期:默认保留近30天数据
- 表命名规范:
ods_主题_表内容_分表规则
DWD层
- 数据明细层:数据处理后的宽表,目标为满足80%的业务需求
- 存储周期:保留历史至今所有的数据
- 表命名规范:
dwd_业务描述时间粒度
DWS层
- 数据汇总层:汇总数据,解决数据汇总计算和数据完整度问题
- 存储周期:保留历史至今所有的数据
- 表命名规范:
dws_业务描述_时间粒度_sum
DIM层
- 公共维度层:存储公共的信息数据,用于DWD、DWS的数据关联
- 存储周期:按需存储,一般保留历史至今所有的数据
- 表命名规范:
dim_维度描述
DM层
- 数据集市层:用于BI、多维分析、标签、数据挖掘等
- 存储周期:按需存储,--般保留历史至今所有的数据
- 表命名规范:
dm_主题_表内容_分表规则
分层之间的数据流转:
image.png
Hive是什么
Hive简介:
- Hive是基于Hadoop的数据仓库工具,提供类SQL语法(HiveQL)
- 默认以MR作为计算引擎(也支持其他计算引擎,例如tez)、HDFS 作为存储系统,提供超大数据集的计算/扩展能力
- Hive是将数据映射成数据库和一张张的表,库和表的元数据信息一般存在关系型数据库
Hive的简单架构图:
image.png
Hive VS Hadoop:
- Hive数据存储:Hive的数据是存储在HDFS.上的,Hive的库和表是对HDFS.上数据的映射
- Hive元数据存储:元数据存储一般在外部关系库( Mysql )与Presto Impala等共享
- Hive语句的执行过程:将HQL转换为MapReduce任务运行
Hive与关系数据库Mysql的区别
产品定位
Hive是数据仓库,为海量数据的离线分析设计的,不支持OLTP(联机事务处理所需的关键功能ACID,而更接近于OLAP(联机分析技术)),适给离线处理大数据集。而MySQL是关系型数据库,是为实时业务设计的。
可扩展性
Hive中的数据存储在HDFS(Hadoop的分布式文件系统),metastore元数据一 般存储在独立的关系型数据库中,而MySQL则是服务器本地的文件系统。因此Hive具有良好的可扩展性,数据库由于ACID语义的严格限制,扩展性十分有限。
读写模式
Hive为读时模式,数据的验证则是在查询时进行的,这有利于大数据集的导入,读时模式使数据的加载非常迅速,数据的加载仅是文件复制或移动。MySQL为写时模式,数据在写入数据库时对照模式检查。写时模式有利于提升查询性能,因为数据库可以对列进行索引。
数据更新
Hive是针对数据仓库应用设计的,而数仓的内容是读多写少的,Hive中不支持对数据进行改写,所有数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行修改的。
索引
Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。Hive提供了有限的索引功能,可以为-些字段建立索引,一张表的索引数据存储在另外一张表中。由于数据的访问延迟较高,Hive不适合在线数据查询。数据库在少星的特定条件的数据访问中,索引可以提供较低的延迟。
计算模型
image.pngHive默认使用的模型是MapReduce(也可以on spark、on tez),而MySQL使用的是自己设计的Executor计算模型
Hive安装部署
参考:
Hive基本使用(上)Hive数据类型/分区/基础语法
Hive数据类型:
- 基本数据类型:int、 float、 double、 string、 boolean、 bigint等
- 复杂类型:array、map、 struct
Hive分区:
- Hive将海量数据按某几个字段进行分区,查询时不必加载全部数据
- 分区对应到HDFS就是HDFS的目录.
- 分区分为静态分区和动态分区两种
Hive常用基础语法:
USE DATABASE_NAME
CREATE DATABASE IF NOT EXISTS DB NAME
DESC DATABASE DB NAME
CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
SELECT * FROM TABLE NAME
ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME
写个Python脚本生成一些测试数据:
import json
import random
import uuid
name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')
data = []
for item in name:
scores = {key: random.randint(60, 100) for key in subject}
data.append("|".join([uuid.uuid4().hex, item, ','.join(
random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))
with open('test.csv', 'w') as f:
f.write('\n'.join(data))
执行该脚本,生成测试数据文件:
[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root 745 11月 9 11:09 test.csv
[root@hadoop01 ~/py-script]#
我们可以看一下生成的数据:
[root@hadoop01 ~/py-script]# cat test.csv
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77
...
- 数据以
|
符进行分割,前两个字段都是string
类型,第三个字段是array
类型,第四个字段是map
类型
创建测试用的数据库:
0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000>
创建测试表:
CREATE TABLE test(
user_id string,
user_name string,
hobby array<string>,
scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
将本地数据加载到Hive中:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000>
查询数据:
image.png
Hive将HQL转换为MapReduce的流程
了解了Hive中的SQL基本操作之后,我们来看看Hive是如何将SQL转换为MapReduce任务的,整个转换过程分为六个阶段:
- Antr定义SQL的语法规则,完成SQL词法,语法解析,将SQL 转化为抽象语法树AST Tree
- 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
- 遍历QueryBlock,翻译为执行操作树OperatorTree
- 逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shufle数据量
- 遍历OperatorTree,翻译为MapReduce任务
- 物理层优化器进行MapReduce任务的变换,生成最终的执行计划
与普通SQL一样,我们可以通过在HQL前面加上explain
关键字查看HQL的执行计划:
explain select * from test where id > 10 limit 1000
Hive会将这条语句解析成一个个的Operator,Operator就是Hive解析之后的最小单元,每个Operator其实都是对应一个MapReduce任务。例如,上面这条语句被Hive解析后,就是由如下Operator组成:
image.png
同时,Hive实现了优化器对这些Operator的顺序进行优化,帮助我们提升查询效率。Hive中的优化器主要分为三类:
- RBO(Rule-Based Optimizer):基于规则的优化器
- CBO(Cost-Based Optimizer):基于代价的优化器,这是默认的优化器
- 动态CBO:在执行计划生成的过程中动态优化的方式
Hive基本使用(中)内部表/外部表/分区表/分桶表
内部表:
和传统数据库的Table概念类似,对应HDFS上存储目录,删除表时,删除元数据和表数据。内部表的数据,会存放在HDFS中的特定的位置中,可以通过配置文件指定。当删除表时,数据文件也会一并删除。适用于临时创建的中间表。
外部表:
指向已经存在的HDFS数据,删除时只删除元数据信息。适用于想要在Hive之外使用表的数据的情况,当你删除External Table时,只是删除了表的元数据,它的数据并没有被删除。适用于数据多部门共享。建表时使用
create external table
。指定external
关键字即可。
分区表:
Partition对应普通数据库对Partition列的密集索引,将数据按照Partition列存储到不同目录,便于并行分析,减少数据量。分区表创建表的时候需要指定分区字段。
分区字段与普通字段的区别:分区字段会在HDFS表目录下生成一个分区字段名称的目录,而普通字段则不会,查询的时候可以当成普通字段来使用,一般不直接和业务直接相关。
分桶表:
对数据进行hash,放到不同文件存储,方便抽样和join查询。可以将内部表,外部表和分区表进一步组织成桶表,可以将表的列通过Hash算法进一步分解成不同的文件存储。
对于内部表和外部表的概念和应用场景我们很容易理解,我们需要重点关注一下分区表和分桶表。 我们为什么要建立分区表和分桶表呢?HQL通过where
子句来限制条件提取数据,那么与其遍历一张大表,不如将这张大表拆分成多个小表,并通过合适的索引来扫描表中的一小部分,分区和分桶都是采用了这种理念。
分区会创建物理目录,并且可以具有子目录(通常会按照时间、地区分区),目录名以 分区名=值
形式命名,例如:create_time=202011
。分区名会作为表中的伪列,这样通过where
字句中加入分区的限制可以在仅扫描对应子目录下的数据。通过 partitioned by (feld1 type, ...)
创建分区列。
分桶可以继续在分区的基础上再划分小表,分桶根据哈希值来确定数据的分布(即MapReducer中的分区),比如分区下的一部分数据可以根据分桶再分为多个桶,这样在查询时先计算对应列的哈希值并计算桶号,只需要扫描对应桶中的数据即可。分桶通过clustered by(field) into n buckets
创建。
接下来简单演示下这几种表的操作,首先将上一小节生成的测试数据文件上传到hdfs中:
[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]#
内部表
建表SQL:
CREATE TABLE test_table(
user_id string,
user_name string,
hobby array<string>,
scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
将hdfs数据加载到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000>
查看创建的表存储在hdfs的哪个目录下:
0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
| createtab_stmt |
+----------------------------------------------------+
| CREATE TABLE `test_table`( |
| `user_id` string, |
| `user_name` string, |
| `hobby` array<string>, |
| `scores` map<string,int>) |
| ROW FORMAT SERDE |
| 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' |
| WITH SERDEPROPERTIES ( |
| 'collection.delim'=',', |
| 'field.delim'='|', |
| 'line.delim'='\n', |
| 'mapkey.delim'=':', |
| 'serialization.format'='|') |
| STORED AS INPUTFORMAT |
| 'org.apache.hadoop.mapred.TextInputFormat' |
| OUTPUTFORMAT |
| 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION |
| 'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES ( |
| 'bucketing_version'='2', |
| 'transient_lastDdlTime'='1604893190') |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000>
在hdfs中可以查看到数据文件:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]#
删除表:
0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000>
查看hdfs会发现该表所对应的存储目录也一并被删除了:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x - root supergroup 0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x - root supergroup 0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]#
外部表
建表SQL,与内部表的区别就在于external
关键字:
CREATE external TABLE external_table(
user_id string,
user_name string,
hobby array<string>,
scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
将数据文件加载到Hive中:
0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000>
此时会发现hdfs中的数据文件会被移动到hive的目录下:
[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#
删除表:
0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000>
查看hdfs会发现该表所对应的存储目录仍然存在:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]#
分区表
建表语句:
CREATE TABLE partition_table(
user_id string,
user_name string,
hobby array<string>,
scores map<string,integer>
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
将数据文件加载到Hive中,并指定分区:
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000>
执行如下sql,可以从不同的分区统计结果:
0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0 |
+------+
| 16 |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0 |
+------+
| 8 |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0 |
+------+
| 8 |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000>
分区表在hdfs中的存储结构:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x - root supergroup 0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x - root supergroup 0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r-- 1 root supergroup 745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]#
分桶表
建表语句:
CREATE TABLE bucket_table(
user_id string,
user_name string,
hobby array<string>,
scores map<string,integer>
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';
将test
表中的数据插入到bucket_table
中:
0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000>
抽样查询:
image.png
分桶表在hdfs的存储目录如下:
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r-- 1 root supergroup 465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r-- 1 root supergroup 281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]#
Hive基本使用(下)内置函数/自定义函数/实现UDF
Hive常见内置函数:
- 字符串类型:concat、substr、 upper、 lower
- 时间类型:year、month、 day
- 复杂类型:size、 get_json_object
查询引擎都自带了一部分函数来帮助我们解决查询过程当中一些复杂的数据计算或者数据转换操作,但是有时候自带的函数功能不能满足业务的需要。这时候就需要我们自己开发自定义的函数来辅助完成了,这就是所谓的用户自定义函数UDF(User-Defined Functions)。Hive支持三类自定义函数:
- UDF:普通的用户自定义函数。用来处理输入一行,输出一行的操作,类似Map操作。如转换字符串大小写,获取字符串长度等
- UDAF:用户自定义聚合函数(User-defined aggregate function),用来处理输入多行,输出一行的操作,类似Reduce操作。比如MAX、COUNT函数。
- UDTF:用户自定义表产生函数(User defined table-generating function),用来处理输入一行,输出多行(即一个表)的操作, 不是特别常用
UDF函数其实就是一段遵循一定接口规范的程序。在执行过程中Hive将SQL转换为MapReduce程序,在执行过程当中在执行我们的UDF函数。
本小节简单演示下自定义UDF函数,首先创建一个空的Maven项目,然后添加hive-exec
依赖,版本与你安装的Hive版本需对应上。完整的pom
文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>hive-udf-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
首先创建一个继承UDF
的类,我们实现的这个自定义函数功能就是简单的获取字段的长度:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public class StrLen extends UDF {
public int evaluate(final Text col) {
return col.getLength();
}
}
以上这种自定义函数只能支持处理普通类型的数据,如果要对复杂类型的数据做处理则需要继承GenericUDF
,并实现其抽象方法。例如,我们实现一个对测试数据中的scores
字段求平均值的函数:
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import java.text.DecimalFormat;
public class AvgScore extends GenericUDF {
/**
* 函数的名称
*/
private static final String FUNC_NAME = "AVG_SCORE";
/**
* 函数所作用的字段类型,这里是map类型
*/
private transient MapObjectInspector mapOi;
/**
* 控制精度只返回两位小数
*/
DecimalFormat df = new DecimalFormat("#.##");
@Override
public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
// 在此方法中可以做一些前置的校验,例如检测函数参数个数、检测函数参数类型
mapOi = (MapObjectInspector) objectInspectors[0];
// 指定函数的输出类型
return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
// 函数的核心逻辑,取出map中的value进行求平均值,并返回一个Double类型的结果值
Object o = deferredObjects[0].get();
double v = mapOi.getMap(o).values().stream()
.mapToDouble(a -> Double.parseDouble(a.toString()))
.average()
.orElse(0.0);
return Double.parseDouble(df.format(v));
}
@Override
public String getDisplayString(String[] strings) {
return "func(map)";
}
}
对项目进行打包,并上传到服务器中:
[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#
将jar包上传到hdfs中:
[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r-- 1 root supergroup 4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]#
在Hive中添加该jar包:
0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000>
然后注册临时函数,临时函数只会在当前的session中生效:
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000>
使用自定义函数处理:
0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name | length | avg_score |
+------------+---------+------------+
| Tom | 3 | 80.25 |
| Jerry | 5 | 77.5 |
| Jim | 3 | 83.75 |
| Angela | 6 | 84.5 |
| Ann | 3 | 90.0 |
| Bella | 5 | 69.25 |
| Bonnie | 6 | 76.5 |
| Caroline | 8 | 84.5 |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000>
删除已注册的临时函数:
0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000>
临时函数只会在当前的session中生效,如果需要注册成永久函数则只需要把TEMPORARY
关键字给去掉即可。如下所示:
0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>
删除永久函数也是把TEMPORARY
关键字给去掉即可。如下所示:
0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000>
Hive存储结构 - OrcFile
Hive支持的存储格式:
image.png
- TextFile是默认的存储格式,通过简单的分隔符可以对csv等类型的文件进行解析。但实际应用中通常都是使用OrcFile格式,因为ORCFile是列式存储格式,更加适合大数据查询的场景。
我们都知道关系型数据库基本是使用行式存储作为存储格式,而大数据领域更多的是采用列式存储,因为大数据分析场景中通常需要读取大量行,但是只需要少数的几个列。这也是为什么通常使用OrcFile作为Hive的存储格式的原因。由此可见,大数据的绝大部分应用场景都是OLAP场景。
OLAP场景的特点
读多于写
不同于事务处理(OLTP)的场景,比如电商场景中加购物车、下单、支付等需要在原地进行大量insert、update、delete操作,数据分析(OLAP)场景通常是将数据批量导入后,进行任意维度的灵活探索、BI工具洞察、报表制作等。
数据一次性写入后,分析师需要尝试从各个角度对数据做挖掘、分析,直到发现其中的商业价值、业务变化趋势等信息。这是一个需要反复试错、不断调整、持续优化的过程,其中数据的读取次数远多于写入次数。这就要求底层数据库为这个特点做专门设计,而不是盲目采用传统数据库的技术架构。
大宽表,读大量行但是少量列,结果集较小
在OLAP场景中,通常存在一张或是几张多列的大宽表,列数高达数百甚至数千列。对数据分析处理时,选择其中的少数几列作为维度列、其他少数几列作为指标列,然后对全表或某一个较大范围内的数据做聚合计算。这个过程会扫描大量的行数据,但是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据,也明显小得多。
数据批量写入,且数据不更新或少更新
OLTP类业务对于延时(Latency)要求更高,要避免让客户等待造成业务损失;而OLAP类业务,由于数据量非常大,通常更加关注写入吞吐(Throughput),要求海量数据能够尽快导入完成。一旦导入完成,历史数据往往作为存档,不会再做更新、删除操作。
无需事务,数据一致性要求低
OLAP类业务对于事务需求较少,通常是导入历史日志数据,或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。
灵活多变,不适合预先建模
分析场景下,随着业务变化要及时调整分析维度、挖掘方法,以尽快发现数据价值、更新业务指标。而数据仓库中通常存储着海量的历史数据,调整代价十分高昂。预先建模技术虽然可以在特定场景中加速计算,但是无法满足业务灵活多变的发展需求,维护成本过高。
行式存储和列式存储
行式存储和列式存储的对比图:
image.png
与行式存储将每一行的数据连续存储不同,列式存储将每一列的数据连续存储。相比于行式存储,列式存储在分析场景下有着许多优良的特性:
- 如前所述,分析场景中往往需要读大量行但是少数几个列。在行存模式下,数据按行连续存储,所有列的数据都存储在一个block中,不参与计算的列在IO时也要全部读出,读取操作被严重放大。而列存模式下,只需要读取参与计算的列即可,极大的减低了IO cost,加速了查询。
- 同一列中的数据属于同一类型,压缩效果显著。列存往往有着高达十倍甚至更高的压缩比,节省了大量的存储空间,降低了存储成本。
- 更高的压缩比意味着更小的data size,从磁盘中读取相应数据耗时更短。
- 自由的压缩算法选择。不同列的数据具有不同的数据类型,适用的压缩算法也就不尽相同。可以针对不同列类型,选择最合适的压缩算法。
- 高压缩比,意味着同等大小的内存能够存放更多数据,系统cache效果更好。
OrcFile
OrcFile存储格式:
image.png
Orc列式存储优点:
- 查询时只需要读取查询所涉及的列,降低IO消耗,同时保存每一列统计信息,实现部分谓词下推
- 每列数据类型一致,可针对不同的数据类型采用其高效的压缩算法
- 列式存储格式假设数据不会发生改变,支持分片、流式读取,更好的适应分布式文件存储的特性
除了Orc外,Parquet也是常用的列式存储格式。Orc VS Parquet:
- OrcFile和Parquet都是Apache的顶级项目
- Parquet不支持ACID、不支持更新,Orc支持有限的ACID和更新
- Parquet的压缩能力较高,Orc的查询效率较高
离线数仓VS实时数仓
image.png离线数仓:
- 离线数据仓库主要基于Hive等技术来构建T+1的离线数据
- 通过定时任务每天拉取增量数据导入到Hive表中
- 创建各个业务相关的主题维度数据,对外提供T+1的数据查询接口
离线数仓架构:
- 数据源通过离线的方式导入到离线数仓中
- 数据分层架构:ODS、DWD、 DM
- 下游应用根据业务需求选择直接读取DM
实时数仓:
- 实时数仓基于数据采集工具,将原始数据写入到Kafka等数据通道
- 数据最终写入到类似于HBase这样支持快速读写的存储系统
- 对外提供分钟级别、甚至秒级别的查询方案
实时数仓架构:
- 业务实时性要求的不断提高,实时处理从次要部分变成了主要部分
- Lambda架构:在离线大数据架构基础上加了一个加速层,使用流处理技术完成实时性较高的指标计算
- Kappa架构:以实时事件处理为核心,统一数据处理
图解Lambda架构数据流程
Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(Nathan Marz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验。
Lambda 架构使开发人员能够构建大规模分布式数据处理系统。它具有很好的灵活性和可扩展性,也对硬件故障和人为失误有很好的容错性。
Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。
image.png
在 Lambda 架构中,每层都有自己所肩负的任务。批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图。批处理层使用可处理大量数据的分布式处理系统预先计算结果。它通过处理所有的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来重新计算的,能够修复任何错误,然后更新现有的数据视图。输出通常存储在只读数据库中,更新则完全取代现有的预先计算好的视图。
速度处理层会实时处理新来的数据。速度层通过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整,但它们几乎在收到数据后立即可用。而当同样的数据在批处理层处理完成后,在速度层的数据就可以被替代掉了。
本质上,速度层弥补了批处理层所导致的数据视图滞后。比如说,批处理层的每个任务都需要 1 个小时才能完成,而在这 1 个小时里,我们是无法获取批处理层中最新任务给出的数据视图的。而速度层因为能够实时处理数据给出结果,就弥补了这 1 个小时的滞后。
所有在批处理层和速度层处理完的结果都输出存储在服务层中,服务层通过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询。
所有的新用户行为数据都可以同时流入批处理层和速度层。批处理层会永久保存数据并且对数据进行预处理,得到我们想要的用户行为模型并写入服务层。而速度层也同时对新用户行为数据进行处理,得到实时的用户行为模型。
而当“应该对用户投放什么样的广告”作为一个查询(Query)来到时,我们从服务层既查询服务层中保存好的批处理输出模型,也对速度层中处理的实时行为进行查询,这样我们就可以得到一个完整的用户行为历史了。
一个查询就如下图所示,既通过批处理层兼顾了数据的完整性,也可以通过速度层弥补批处理层的高延时性,让整个查询具有实时性。
image.png
Kappa 架构 VS Lambda
Lambda 架构的不足
虽然 Lambda 架构使用起来十分灵活,并且可以适用于很多的应用场景,但在实际应用的时候,Lambda 架构也存在着一些不足,主要表现在它的维护很复杂。
使用 Lambda 架构时,架构师需要维护两个复杂的分布式系统,并且保证他们逻辑上产生相同的结果输出到服务层中。举个例子吧,我们在部署 Lambda 架构的时候,可以部署 Apache Hadoop 到批处理层上,同时部署 Apache Flink 到速度层上。
我们都知道,在分布式框架中进行编程其实是十分复杂的,尤其是我们还会针对不同的框架进行专门的优化。所以几乎每一个架构师都认同,Lambda 架构在实战中维护起来具有一定的复杂性。
那要怎么解决这个问题呢?我们先来思考一下,造成这个架构维护起来如此复杂的根本原因是什么呢?
维护 Lambda 架构的复杂性在于我们要同时维护两套系统架构:批处理层和速度层。我们已经说过了,在架构中加入批处理层是因为从批处理层得到的结果具有高准确性,而加入速度层是因为它在处理大规模数据时具有低延时性。
那我们能不能改进其中某一层的架构,让它具有另外一层架构的特性呢?例如,改进批处理层的系统让它具有更低的延时性,又或者是改进速度层的系统,让它产生的数据视图更具准确性和更加接近历史数据呢?
另外一种在大规模数据处理中常用的架构——Kappa 架构(Kappa Architecture),便是在这样的思考下诞生的。
Kappa 架构
Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的作者之一,也是现在 Confluent 大数据公司的 CEO。
克雷普斯提出了一个改进 Lambda 架构的观点:
我们能不能改进 Lambda 架构中速度层的系统性能,使得它也可以处理好数据的完整性和准确性问题呢?我们能不能改进 Lambda 架构中的速度层,使它既能够进行实时数据处理,同时也有能力在业务逻辑更新的情况下重新处理以前处理过的历史数据呢?
他根据自身多年的架构经验发现,我们是可以做到这样的改进的。我们知道像 Apache Kafka 这样的流处理平台是具有永久保存数据日志的功能的。通过Kafka的这一特性,我们可以重新处理部署于速度层架构中的历史数据。
下面我就以 Kafka 为例来介绍整个全新架构的过程。
第一步,部署 Kafka,并设置数据日志的保留期(Retention Period)。
这里的保留期指的是你希望能够重新处理的历史数据的时间区间。例如,如果你希望重新处理最多一年的历史数据,那就可以把 Apache Kafka 中的保留期设置为 365 天。如果你希望能够处理所有的历史数据,那就可以把 Apache Kafka 中的保留期设置为“永久(Forever)”。
第二步,如果我们需要改进现有的逻辑算法,那就表示我们需要对历史数据进行重新处理。我们需要做的就是重新启动一个 Kafka 作业实例(Instance)。这个作业实例将重头开始,重新计算保留好的历史数据,并将结果输出到一个新的数据视图中。
我们知道 Kafka 的底层是使用 Log Offset 来判断现在已经处理到哪个数据块了,所以只需要将 Log Offset 设置为 0,新的作业实例就会重头开始处理历史数据。
第三步,当这个新的数据视图处理过的数据进度赶上了旧的数据视图时,我们的应用便可以切换到从新的数据视图中读取。
第四步,停止旧版本的作业实例,并删除旧的数据视图。
这个架构就如同下图所示。
image.png
与 Lambda 架构不同的是,Kappa 架构去掉了批处理层这一体系结构,而只保留了速度层。你只需要在业务逻辑改变又或者是代码更改的时候进行数据的重新处理。Kappa 架构统一了数据的处理方式,不再维护离线和实时两套代码逻辑。
Kappa 架构的不足
Kappa 架构也是有着它自身的不足的。因为 Kappa 架构只保留了速度层而缺少批处理层,在速度层上处理大规模数据可能会有数据更新出错的情况发生,这就需要我们花费更多的时间在处理这些错误异常上面。如果需求发生变化或历史数据需要重新处理都得通过上游重放来完成。并且重新处理历史的吞吐能力会低于批处理。
还有一点,Kappa 架构的批处理和流处理都放在了速度层上,这导致了这种架构是使用同一套代码来处理算法逻辑的。所以 Kappa 架构并不适用于批处理和流处理代码逻辑不一致的场景。
Lambda VS Kappa
image.png主流大公司的实时数仓架构
阿里菜鸟实时数仓
image.pngimage.png
美团实时数仓
image.png实时数仓建设特征
- 整体架构设计通过分层设计为OLAP查询分担压力
- 复杂的计算统一在实时计算层做,避免给OLAP查询带来过大的压力
- 汇总计算通过OLAP数据查询引擎进行
- 整个架构中实时计算一般 是Spark+Flink配合
- 消息队列Kafka一家独大,配合HBase、ES、 Mysq|进行数据落盘
- OLAP领域Presto、Druid、 Clickhouse、 Greenplum等等层出不穷