初识Hive

2020-08-15  本文已影响0人  谭英智

Hive是Facebook开源的用于解决海量结构化日志的数据统计工具。它是基于Hadoop的一个数据仓库工具,可以将结构化数据文件映射成一张表,并提供类SQL查询功能。本质是将HQL转化成MapReduce程序

优点

缺点

架构

hive-overview

默认的MetaStore是derby,它只支持单用户模式,不能支持多个客户端同时访问,所以一般把它替换成Msql之类的数据库,通过JDBC对接

Hive只是Hadoop的一个客户端,它只做了SQL的分析和转黄成MR和表元数据的管理

元数据和HDFS存储数据分离,通过表名来关联,所以即使metastore有两个,如果他们的表一致,那么他们也可以访问到同一个HDFS文件

与数据库比较

交互命令

hive -e "sql"
hive -f fileName

数据类型

DDL

创建数据库和表,实际是跟HDFS的目录做关联,如果目录不存在,则创建,如果存在,则关联

create database dbName;
create database dbName location 'pathInHdfs';
create table dbName.tableName;
create database if not exists dbName;
show databases;
show databases like 'hive*';
desc database <extended> dbName;
#只能修改额外属性
alter database dbName set dbproperties(attrName=value)
drop database dbName <cascade>;
create [external] table [if not exists] tableName
[like tableName]
[(colName type [comment])]
[comment table]
[partitioned by (colName type [comment])] #分区表
[clustered by (colName, colName...) ino num_buckets buckets] #分桶表
[sorted by (colName [ASC|DESC], ...)] 
[row format row_format]
[stroed as file_format]
[location hdfs_path]

客户端

开启hiveserver2可以使用JDBC连接hive,例如beeline

内部表

也称管理表,如果删除表,HDFS上的数据也会删除

外部表

删除外部表,只会删除元数据,不会删除HDFS上的数据

内部表和内部表的转换

alter table tableName set tblproperties('ETERNAL'='TRUE/FALSE');

desc formatted tableName

分区表

创建分区表的新字段,会以文件夹名出现,并可以使用where来查询,可以提高查询效率,如果只需要查特定的分区

可以使用多个字段来创建分区,建立二级分区表

在插入数据时,需要指定分区字段的值

load data local inpath 'path' into table databaseName.tableName partition(colName='value')

如果手工上传文件到HDFS,则需要对齐元数据与分区或者手工添加分区元数据

msck repair table tableName

修改表

alter table tableName rename to newTableName;
alter table tableName change [column] colOldName colNewName colType [comment string] [first|after colName]
alter table tableName add|replace colums (colName colTYpe [comment])

DML

数据载入

load data [local] inpath 'path' overwrite|into table tableName [partition(col1=value1,...)]
#这相当于把文件直接put到hdfs上,不经过mr,所以在插入分桶分区的数据时,要使用insert,而不是load,走mr把数据插入
insert overwrite|into table tableName partition(col=v) values (...);
from tableName
insert overwirte|into tableName2 partition(..)
select * where month='202001'
insert overwirte|into tableName2 partition(..)
select * where month='202002'
create table tableName as select ... from tableName2
import table tableName from 'hdfs dir'

数据导出

insert overwrite|into [local] directory 'path'
row format delimited fields terminated  by '\t'
select * from tableName 
dfs -get 'hdfsPath' localPath
bin/hive -e 'sql'> localFileName
export table databaseName.tableName to 'hdfsPath'
#会导出元数据和数据文件

清空数据

truncate table tableName;

查询

select  [all|distinct] ...
from tableName
[where condition]
[group by ..] [having ...]
[order by ...ASC|DESC]
[limit number]

MapReduce内部排序sort by

set mapreduce.job.reduce=3
insert overwrite local directory 'path' select from tableName sort by colName asc|desc
#可以看到结果会分成3个文件,文件内部排序输出
#实际是起了三个reducer,map通过shuffer到三个R中,然后R排序并分别输出

分区排序(Distribute by)

set mapreduce.job.reduce=3
insert overwrite local directory 'path' select from tableName distribute by colName1 sort by colName2 asc|desc
#流程与上面类似,只是在shuffer的时候,通过指定的col来hash

Cluster By

select * from tableName cluster by sameColName
=
select * from tableName distribute by sameColName sort by sameColName;

分桶表

create table tableName (cols)
clustered by (colName)
into 4 buckets
row format delimited fields terminated by '\t';
set hive.enforce.bucketing=true;
set mapreduce.job.reduces=1
分桶表和分区表的区别

分桶表通过自身字段来hash分文件

分区表通过新增额外字段来hash分目录

都是用来解决文件太大的问题

分桶表抽样查询
select * from table tabesample(bucket x out of y on id)
#table分y份,从第x个bucked开始取,一共取(x/y)*total个样本
#x必须小于y

NVL

select nvl(colName, value|colName2) from table;

case when

select dept, 
sum(case sex when 'male' then 1 else 0 end) maleCount,
sum(case sex when 'female' then 1 else 0 end) femaleCount
from table
group by dept;

行转列

select dept, concat_ws(",", collect_set(name))
from table
group by dept

列转行

select user_id,order_value,order_id
from tableName
lateral view explode(split(order_value,',')) num as order_id

窗口查询

over: 指定窗口大小

current row: 当前行

n preceding: 往前n行

n following: 往后n行

unbounded preceding: 第一行

unbounded following: 最后一行

lag(col, n): 往前第n行的col的值

lead(col,n): 往后第n行col的值

select name,count(*) over()
from tableName
where orderDate='2020-05'
group by name;
count输出groupby后的行数
select *, sum(cost) over(distribute|partition by month(orderDate)) from tableName
distribute相当于groupby,同一个月的cost的总数
select *,sum(cost) over (sort by orderdate rows between unbounded preceding and current row)
from tableName;
#排序后,输出第一行到当前行的总数
select *,lag(orderdate,1) over(distribute by name sort by orderdate) from tableName;
#先groupby再排序,取上一条的orderdate
select * from (
select *,ntile(5) over(sort by orderdate) gid
from table
) t
where gid=1
把数据先排序,再增加一列gid,从1到5设置

rank

select name,subject,
rank() over(partition by subject order by score desc),#1,1,3,4
dense_rank() over(partition by subject order by score desc),#1,1,2,3
row_number() over(partition by subject order by score desc) #1,2,3,4
from table

snappy压缩

编译hadoop,把snappy加上

开启Map输出阶段压缩
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
开启reduce输出阶段压缩
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec
set mapreduct.output.fileoutputformat.compress.type=BLOCK

文件存储格式

支持textfile/sequencefile/orc/parguet

hive-store

textfile/sequencefile是行存储

orc/parguet是列存储

textfile格式

默认存储格式,数据不压缩,磁盘开销大,可以结合Gzip来使用

ORC格式

hive-orc

数据默认压缩,块的大小为256m,块内按列存储。压缩率好,读取速度没差别

create table name (...)
row format delimited fields terminated by '\t'
stored as orc tblproperties ("orc.compress"="SNAPPY")
#默认的orc压缩率(zlib)会更高,但压缩速率没snappy好

parquet格式

与orc类似,但是压缩率没有orc好

优化

Fetch

set hive.fetch.task.conversion=none/minmal/more
none: 所有sql都走mr
minimal:select */filter on partition/limit不走mr,其他都走mr
more:select/filter/limit不走mr,其他都走mr

本地模式

hadoop的Job一般是跑在集群上的,但是对于数据量小的任务,触发执行的时间可能比执行时间多更多,此时,可以启动本地模式,把所有操作都在本地运行,加快执行速度

set hive.exec.mode.local.auto=true #开启本地模式
#当输入小于此字节数时,采用本地mr
set hive.exec.mode.local.auto.inputbytes.max=500000;
#当输入文件个数小于此数时,采用本地mr
set hive.exec.mode.local.auto.input.files.max=10;

小表join大表

一般小表在左边,大表在右边。但是hive的高版本已经做了优化。

set hive.auto.convert.join=true
#开启时,会先缓存小表,并在map端join
#不开启时,不缓存,所有的join会先shuffer到reduce上,再join
hive-mapjoin

空Key过滤

有时join时,某些key对应的数据大多,而相同的key会hash到同一个reducer上,导致集群中的某个reducer承受过多的数据,而很多这样的key是null

select * from (select * from t1 where id is not null) t left join t2 on t.id=t2.id;
select * from t left join t2 on case when t.id is null then (rand()) else t.id end)=t2.id;
#随机生成虽然解决了reducer的数据倾斜,但有可能有些业务无法处理

group by

hive.map.aggr=true #开启本地combinar
hive.groupby.mapaggr.checkinterval=10000 #combinar的条数
#默认为false,true用来解决数据倾斜,所以会导致某些业务无法处理
hive.groupby.skewindata=true

count(distinct)

#会只能用一个reduce来排重,得出最后的结果
select count(distinct id) from bigtable;
#可以分散到多个reducer来计算
select count(1) from (select id from bigtable group by id) t;

笛卡儿积

当join不加on时或者无效的on条件时,hive只能用1个reducer来完成笛卡儿积

行列过滤

在join表前,先把表用where来过滤,以减少join的数据量

分区

分桶

动态分区

当插入分区表时,一般需要在sql静态指明分区字段的值,否者插入会失败

可以通过设置动态设置分区字段通过select的方法

hive.exec.dynamic.partition=true#开启动态分区
hive.exec.dynamic.partition.mode=nonstrict#开启非严格模式,严格模式为至少指定一个分区为非静态分区
hive.exec.max.dynamic.partitions=1000#在所有mr节点最多可以创建多少个动态分区
hive.exec.max.dynamic.partitions.pernode=100#在每个mr节点,最多可以创建多少个动态分区
hive.exec.max.created.file=10000#在mr job中最大可以创建多少个hdfs文件
hive.error.on.empty.partition=false#当分区为空时,是否报错

insert overwrite table tableName partition(p_time)
select id,name,p_time from tableName2;

Map数量

作业一般通过input目录产生一个或多个Map,主要决定因素有input文件总个数,文件大小,集群设置的文件块大小

是不是map越多越好

不是,如果一个任务处理很多小文件,启动job的时间都大于处理文件的时间,则会造成很大的浪费

是不是保证每个map处理128m的文件块就好

不是,如果文件每行需要的计算量非常大,如果还是按照128m来分的话,则有可能job的运行时间会超长

mapreduce.input.fileinputformat.split.maxsize=100;
mapreduce.input.fileinputformat.split.minsize=10;
#通过设置maxsize小于blocksize就可以把文件拆开

reduce数

reduce处理的数据量默认时256m

hive.exec.reducers.bytes.per.reducer=256000000;

每个任务最大的reduce数默认1009

hive.exec.reducers.max=1009

设置reducer数

mapreduce.job.reducer=10;

有多少个reducer就会生成多少个结果文件,太多的reducer会消耗时间和资源,过多的结果文件会造成小文件

小文件合并

通过设置,可以在map前先把小文件合并,以减少map的数量

hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat

并行执行

hive的一个查询有可能有很多个阶段,而某些阶段有可能并没有依赖关系,此时可以开启并行模式来执行任务

hive.exec.parallel=true;
hive.exec.parallel.thread.number=16;

严格模式

hive.mapred.mode=strict/nonstrict
#不能查询笛卡儿积
#不能查询分区表而没有过滤分区条件
#不能比较整形和字符串
#不能比较整形和浮点
#不能使用orderby而没有limit

JVM重用

Hadoop可以用一个JVM在同一个Job内重用N次,JVM启动的开销比较大,所以重用可以很好的提高性能

mapreduce.job.jvm.numtasks
10

推测执行

由于数据倾斜的问题,会导致hadoop的负载不均衡,某些job要很久也完成不了,造成任务都阻塞在最后一个job那里,hadoop通过推测机制,为这样的任务启动一个备份任务,让备份任务执行同一个数据,两个任务谁先完成计算,谁的结果就会成为最终结果

mapreduce.map.speculative
true/false
上一篇下一篇

猜你喜欢

热点阅读