初识Hive
Hive是Facebook开源的用于解决海量结构化日志的数据统计工具。它是基于Hadoop的一个数据仓库工具,可以将结构化数据文件映射成一张表,并提供类SQL查询功能。本质是将HQL转化成MapReduce程序
优点
- 操作接口类SQL,提高开发效率
- 避免写MapReduce
- 延迟高,常用于数据分析,对实时性要求不高的场合
- 善于处理大数据,对处理小数据没有优势,因为延迟高
- 支持自定义函数
缺点
- 表达能力有限,无法表达迭代式算法,不善于数据挖掘
- 效率低,生成的MapReduce作业不够智能化
- 调优困难,粒度较粗
架构
hive-overview默认的MetaStore是derby,它只支持单用户模式,不能支持多个客户端同时访问,所以一般把它替换成Msql之类的数据库,通过JDBC对接
Hive只是Hadoop的一个客户端,它只做了SQL的分析和转黄成MR和表元数据的管理
元数据和HDFS存储数据分离,通过表名来关联,所以即使metastore有两个,如果他们的表一致,那么他们也可以访问到同一个HDFS文件
与数据库比较
-
查询语言
HQL与SQL类似
-
存储位置
Hive存于HDFS上,数据库存于本地
-
数据更新
Hive是一个数据仓库,数据仓库的内容是读多写少,因此Hive不建议对数据修改。数据库通常支持频繁修改
-
索引
Hive没有索引,所有查询都是整个数据扫描,因此延迟高,通过MapReduce来并发访问数据,对于大数据来说,即使没有索引,通过并发读,性能也不会太差。数据库一般会建立索引,实时性高
-
执行
Hive通过Hadoop的MapReduce来执行;数据库由自己的引擎执行
-
延迟
Hive延迟高,数据库延迟低
-
可扩展性
Hive是客户端,hadoop集群可以非常高,2009年最大的规模是4000台;数据库扩展性差
-
数据规模
Hive支持大数据;数据库的支持数据规模小
交互命令
hive -e "sql"
hive -f fileName
数据类型
DDL
创建数据库和表,实际是跟HDFS的目录做关联,如果目录不存在,则创建,如果存在,则关联
create database dbName;
create database dbName location 'pathInHdfs';
create table dbName.tableName;
create database if not exists dbName;
show databases;
show databases like 'hive*';
desc database <extended> dbName;
#只能修改额外属性
alter database dbName set dbproperties(attrName=value)
drop database dbName <cascade>;
create [external] table [if not exists] tableName
[like tableName]
[(colName type [comment])]
[comment table]
[partitioned by (colName type [comment])] #分区表
[clustered by (colName, colName...) ino num_buckets buckets] #分桶表
[sorted by (colName [ASC|DESC], ...)]
[row format row_format]
[stroed as file_format]
[location hdfs_path]
客户端
开启hiveserver2可以使用JDBC连接hive,例如beeline
内部表
也称管理表,如果删除表,HDFS上的数据也会删除
外部表
删除外部表,只会删除元数据,不会删除HDFS上的数据
内部表和内部表的转换
alter table tableName set tblproperties('ETERNAL'='TRUE/FALSE');
desc formatted tableName
分区表
创建分区表的新字段,会以文件夹名出现,并可以使用where来查询,可以提高查询效率,如果只需要查特定的分区
可以使用多个字段来创建分区,建立二级分区表
在插入数据时,需要指定分区字段的值
load data local inpath 'path' into table databaseName.tableName partition(colName='value')
如果手工上传文件到HDFS,则需要对齐元数据与分区或者手工添加分区元数据
msck repair table tableName
修改表
alter table tableName rename to newTableName;
alter table tableName change [column] colOldName colNewName colType [comment string] [first|after colName]
alter table tableName add|replace colums (colName colTYpe [comment])
DML
数据载入
load data [local] inpath 'path' overwrite|into table tableName [partition(col1=value1,...)]
#这相当于把文件直接put到hdfs上,不经过mr,所以在插入分桶分区的数据时,要使用insert,而不是load,走mr把数据插入
insert overwrite|into table tableName partition(col=v) values (...);
from tableName
insert overwirte|into tableName2 partition(..)
select * where month='202001'
insert overwirte|into tableName2 partition(..)
select * where month='202002'
create table tableName as select ... from tableName2
import table tableName from 'hdfs dir'
数据导出
insert overwrite|into [local] directory 'path'
row format delimited fields terminated by '\t'
select * from tableName
dfs -get 'hdfsPath' localPath
bin/hive -e 'sql'> localFileName
export table databaseName.tableName to 'hdfsPath'
#会导出元数据和数据文件
清空数据
truncate table tableName;
查询
select [all|distinct] ...
from tableName
[where condition]
[group by ..] [having ...]
[order by ...ASC|DESC]
[limit number]
MapReduce内部排序sort by
set mapreduce.job.reduce=3
insert overwrite local directory 'path' select from tableName sort by colName asc|desc
#可以看到结果会分成3个文件,文件内部排序输出
#实际是起了三个reducer,map通过shuffer到三个R中,然后R排序并分别输出
分区排序(Distribute by)
set mapreduce.job.reduce=3
insert overwrite local directory 'path' select from tableName distribute by colName1 sort by colName2 asc|desc
#流程与上面类似,只是在shuffer的时候,通过指定的col来hash
Cluster By
select * from tableName cluster by sameColName
=
select * from tableName distribute by sameColName sort by sameColName;
分桶表
create table tableName (cols)
clustered by (colName)
into 4 buckets
row format delimited fields terminated by '\t';
set hive.enforce.bucketing=true;
set mapreduce.job.reduces=1
分桶表和分区表的区别
分桶表通过自身字段来hash分文件
分区表通过新增额外字段来hash分目录
都是用来解决文件太大的问题
分桶表抽样查询
select * from table tabesample(bucket x out of y on id)
#table分y份,从第x个bucked开始取,一共取(x/y)*total个样本
#x必须小于y
NVL
select nvl(colName, value|colName2) from table;
case when
select dept,
sum(case sex when 'male' then 1 else 0 end) maleCount,
sum(case sex when 'female' then 1 else 0 end) femaleCount
from table
group by dept;
行转列
select dept, concat_ws(",", collect_set(name))
from table
group by dept
列转行
select user_id,order_value,order_id
from tableName
lateral view explode(split(order_value,',')) num as order_id
窗口查询
over: 指定窗口大小
current row: 当前行
n preceding: 往前n行
n following: 往后n行
unbounded preceding: 第一行
unbounded following: 最后一行
lag(col, n): 往前第n行的col的值
lead(col,n): 往后第n行col的值
select name,count(*) over()
from tableName
where orderDate='2020-05'
group by name;
count输出groupby后的行数
select *, sum(cost) over(distribute|partition by month(orderDate)) from tableName
distribute相当于groupby,同一个月的cost的总数
select *,sum(cost) over (sort by orderdate rows between unbounded preceding and current row)
from tableName;
#排序后,输出第一行到当前行的总数
select *,lag(orderdate,1) over(distribute by name sort by orderdate) from tableName;
#先groupby再排序,取上一条的orderdate
select * from (
select *,ntile(5) over(sort by orderdate) gid
from table
) t
where gid=1
把数据先排序,再增加一列gid,从1到5设置
rank
select name,subject,
rank() over(partition by subject order by score desc),#1,1,3,4
dense_rank() over(partition by subject order by score desc),#1,1,2,3
row_number() over(partition by subject order by score desc) #1,2,3,4
from table
snappy压缩
编译hadoop,把snappy加上
开启Map输出阶段压缩
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
开启reduce输出阶段压缩
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
set mapreduct.output.fileoutputformat.compress.type=BLOCK
文件存储格式
支持textfile/sequencefile/orc/parguet
hive-store- 行存储:数据按一行行存储,一行的数据连续存储,方便查询的时候,多列同时查
- 列存储:数据先按行做切分成多块,块内按列存储,同一列的数据连续存储,方便查询的时候,单列查询
textfile/sequencefile是行存储
orc/parguet是列存储
textfile格式
默认存储格式,数据不压缩,磁盘开销大,可以结合Gzip来使用
ORC格式
hive-orc数据默认压缩,块的大小为256m,块内按列存储。压缩率好,读取速度没差别
create table name (...)
row format delimited fields terminated by '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY")
#默认的orc压缩率(zlib)会更高,但压缩速率没snappy好
parquet格式
与orc类似,但是压缩率没有orc好
优化
Fetch
set hive.fetch.task.conversion=none/minmal/more
none: 所有sql都走mr
minimal:select */filter on partition/limit不走mr,其他都走mr
more:select/filter/limit不走mr,其他都走mr
本地模式
hadoop的Job一般是跑在集群上的,但是对于数据量小的任务,触发执行的时间可能比执行时间多更多,此时,可以启动本地模式,把所有操作都在本地运行,加快执行速度
set hive.exec.mode.local.auto=true #开启本地模式
#当输入小于此字节数时,采用本地mr
set hive.exec.mode.local.auto.inputbytes.max=500000;
#当输入文件个数小于此数时,采用本地mr
set hive.exec.mode.local.auto.input.files.max=10;
小表join大表
一般小表在左边,大表在右边。但是hive的高版本已经做了优化。
set hive.auto.convert.join=true
#开启时,会先缓存小表,并在map端join
#不开启时,不缓存,所有的join会先shuffer到reduce上,再join
hive-mapjoin
空Key过滤
有时join时,某些key对应的数据大多,而相同的key会hash到同一个reducer上,导致集群中的某个reducer承受过多的数据,而很多这样的key是null
select * from (select * from t1 where id is not null) t left join t2 on t.id=t2.id;
select * from t left join t2 on case when t.id is null then (rand()) else t.id end)=t2.id;
#随机生成虽然解决了reducer的数据倾斜,但有可能有些业务无法处理
group by
hive.map.aggr=true #开启本地combinar
hive.groupby.mapaggr.checkinterval=10000 #combinar的条数
#默认为false,true用来解决数据倾斜,所以会导致某些业务无法处理
hive.groupby.skewindata=true
count(distinct)
#会只能用一个reduce来排重,得出最后的结果
select count(distinct id) from bigtable;
#可以分散到多个reducer来计算
select count(1) from (select id from bigtable group by id) t;
笛卡儿积
当join不加on时或者无效的on条件时,hive只能用1个reducer来完成笛卡儿积
行列过滤
在join表前,先把表用where来过滤,以减少join的数据量
分区
分桶
动态分区
当插入分区表时,一般需要在sql静态指明分区字段的值,否者插入会失败
可以通过设置动态设置分区字段通过select的方法
hive.exec.dynamic.partition=true#开启动态分区
hive.exec.dynamic.partition.mode=nonstrict#开启非严格模式,严格模式为至少指定一个分区为非静态分区
hive.exec.max.dynamic.partitions=1000#在所有mr节点最多可以创建多少个动态分区
hive.exec.max.dynamic.partitions.pernode=100#在每个mr节点,最多可以创建多少个动态分区
hive.exec.max.created.file=10000#在mr job中最大可以创建多少个hdfs文件
hive.error.on.empty.partition=false#当分区为空时,是否报错
insert overwrite table tableName partition(p_time)
select id,name,p_time from tableName2;
Map数量
作业一般通过input目录产生一个或多个Map,主要决定因素有input文件总个数,文件大小,集群设置的文件块大小
是不是map越多越好
不是,如果一个任务处理很多小文件,启动job的时间都大于处理文件的时间,则会造成很大的浪费
是不是保证每个map处理128m的文件块就好
不是,如果文件每行需要的计算量非常大,如果还是按照128m来分的话,则有可能job的运行时间会超长
mapreduce.input.fileinputformat.split.maxsize=100;
mapreduce.input.fileinputformat.split.minsize=10;
#通过设置maxsize小于blocksize就可以把文件拆开
reduce数
reduce处理的数据量默认时256m
hive.exec.reducers.bytes.per.reducer=256000000;
每个任务最大的reduce数默认1009
hive.exec.reducers.max=1009
设置reducer数
mapreduce.job.reducer=10;
有多少个reducer就会生成多少个结果文件,太多的reducer会消耗时间和资源,过多的结果文件会造成小文件
小文件合并
通过设置,可以在map前先把小文件合并,以减少map的数量
hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
并行执行
hive的一个查询有可能有很多个阶段,而某些阶段有可能并没有依赖关系,此时可以开启并行模式来执行任务
hive.exec.parallel=true;
hive.exec.parallel.thread.number=16;
严格模式
hive.mapred.mode=strict/nonstrict
#不能查询笛卡儿积
#不能查询分区表而没有过滤分区条件
#不能比较整形和字符串
#不能比较整形和浮点
#不能使用orderby而没有limit
JVM重用
Hadoop可以用一个JVM在同一个Job内重用N次,JVM启动的开销比较大,所以重用可以很好的提高性能
mapreduce.job.jvm.numtasks
10
推测执行
由于数据倾斜的问题,会导致hadoop的负载不均衡,某些job要很久也完成不了,造成任务都阻塞在最后一个job那里,hadoop通过推测机制,为这样的任务启动一个备份任务,让备份任务执行同一个数据,两个任务谁先完成计算,谁的结果就会成为最终结果
mapreduce.map.speculative
true/false