hive
Hive 是一个SQL 解析引擎,将SQL语句转译成MR Job,然后再hadoop上运行,达到快速
mysql是存放数据的,而hive是不存放数据的,hive的表是纯逻辑表,只是表的定义,即表的元数据,实际数据在hadoop的磁盘上
Hive的内容是读多写少,不支持对数据的改写和删除,要删除只能把整个表drop掉
当需要导入到hive中的数据,文本中包含'\n',就会以'\n'换行,导致数据串行。
怎么办?
hive的mapreduce
select word, count(*)
from (
select explode(split(sentence,' ')) as word from article_1
) t
group by word
解释:
select explode(split(sentence,' ')) as word from article: 做map操作
explode():这个函数的功能就是行转列
split(sentence,' '):将sentence这个字段里面的内容以空格分割开,返回的是单词的数组
as word 表示新生成的列名字叫做word
t: 新生成的表的别名,新生成的表是临时表【语法是from后面要接一个表】
select word, count(*)
from () t
group by word
--
group by word: 对word做聚合,reduce 的过程
count(*): 求和
测试:
select explode(split(sentence,' ')) as word from article limit 30
select word, count(1) as cnt
from (
select explode(split(sentence,' ')) as word from article
) t
group by word
Hive体系架构
数据存储:
hive数据以文件形式存储在HDFS的指定目录下
hive语句生成查询计划,由mapreduce调用执行
语句转换
解析器:生成抽象语法树
语法分析器:验证查询语句
逻辑计划生成器(包括优化器):生成操作符树
查询计划生成器:转换为map-reduce任务
用户接口
CLI:启动的时候,会同时启动一个Hive的副本
JDBC:Hive的客户端,用户连接至Hive Server
WUI:通过浏览器访问Hive
hive的表的本质就是hadoop的目录
hive创建表的方式:
创建内部表:create table 内部表
创建外部表:create external table location 'hdfs_path' 必须是文件夹路径
在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表的数据并不是由它自己来管理的,而内部表不一样
在删除表的时候,hive将会把属于表的元数据和数据全部删除;而删除外部表的时候,hive仅仅删除外部表的元数据,数据是不会删除的
- ============================
实战部分
查看数据库
show databases;
查看表
show tables;
创建数据库 user_base_1:
CREATE DATABASE IF NOT EXISTS user_base_1;
hive的mapreduce:
代码:
select word, count(1) as cnt
from (
select explode(split(sentence,' ')) as word from article
) t
group by word
order by cnt desc
limit 100
说明:
1. order by 排序,因为是全局排序,所以只能在一个reduce里面跑
2. order by 是一个任务,所以上面的代码会启动两个Job,第一个Job有一个map一个reduce,第二个Job只有一个reduce
3. 而且会有依赖,必须等第一个Job结束之后才有第二个Job执行
SQL的成本很低,而且在大公司一般都有一个内部使用的web界面,直接在上面写SQL语句就可以了,而且还带提示的,特别方便,用习惯了hive之后,再写python的mapreduce表示回不去了。
SQL是锻炼数据思维、数据处理的能力,需要经常练习。
Hive的SQL可扩展性高,支持UDF/UDAF/UDTF,支持用户自定义的函数方法。
hive的架构:
类比于执行一个C程序
首先编译检查语法是否有问题,检查hive需要调取的那些元数据是否有问题,然后将hive的代码转化为mapreduce的任务,然后在hadoop执行任务,最后生成结果数据。
-
分区 partition
hive表名就是文件夹,好处:根据时间、日期做partition,每天一个partition,每天的数据会存放到一个文件夹里面,相当于将数据按日期划分。
如果只想要查询昨天的数据,只需用对应查询昨天日期的文件夹下的数据 -
分桶 bucket
10bucket 把数据划分10份, 1/10 只需要拿一份,但是因为通过shuffle过程分的,所以可能数量上不是很准
建表,只是建立元数据信息+hdfs目录下给一个表名文件夹,里面是没有数据的
create table article(sentence string)
row format delimited fields terminated by '\n';
从本地导入数据,相当于将path数据 类似于 hadoop fs -put /hive/warehouse/badou.db
load data local inpath 'localpath' into table article;
查看数据:
select * from article limit 3;
查看hadoop中的数据:
hadoop fs -ls /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/article_1
-rwxr-xr-x 3 root supergroup 632207 2019-03-15 22:27 /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/article_1/The_Man_of_Property.txt
外部表
create external table article_2(sentence string)
row format delimited fields terminated by '\n'
stored as textfile #存储成为文本形式
location '/data/ext';
badou.db目录下没有新建的外部表数据(因为是外部表数据)
外部数据源数据未发生变化
drop table article_1;
--发现数据原信息被删除了,但是在hdfs路径下的/data/ext的数据还存在,类似于软链接
partition 建表
create table art_dt(sentence string)
partitioned by (dt string)
row format delimited fields terminated by '\n';
从hive表中的数据插入到新表(分区表)中:从article表中取100条数据插入到art_dt表中
insert overwrite table art_dt partition(dt='20190329')
select * from article limit 100;
hdfs的hive目录下对应数据库中:badou.db/art_dt/dt_20190329
select * from art_dt limit 10;
分析:这个查找是一个全量的查找,相当于查找这个表下面的全量的分区,举个例子:如果只有两个分区的话,等价于:
select * from art_dt where dt between '20190328' and '20190329' limit 10;
如果表的分区数特别多的话,查找就会很慢很慢。
如果知道在哪个分区,直接去那个分区找,查询的效率就会特别高。
select * from art_dt where dt between '20190328' and '20190329' limit 10;
-
partition实际是怎么产生的?用在什么数据上?
每天都会产生用户浏览、点击、收藏、购买的记录。
按照每天的方式去存储数据,按天做partition
--
根据数据来源区分,app/m/pc
例如:logs/dt=20190329/type=app
logs这张表,在20190329这个日期,app端的log数据存放路径
logs/dt=20190329/type=app
logs/dt=20190329/type=m
logs/dt=20190329/type=pc
--
数据量太大的情况下,除了按照天划分数据,还可以按照三端的方式划分数据 -
数据库 存放数据:用户的属性,年龄,性别,blog等等
每天都会有新增用户,修改信息 dt=20190328 dt=20190329 大量信息太冗余了
解决方法:
overwrite 7 每天做overwrite dt=20190328 这天中的信息包含这天之前的所有用户信息(当天之前所有的全量数据)
存7个分区,冗余7份,防止丢失(不是防止机器挂掉了丢失数据,而是防止误操作导致的数据丢失,这个锅很大,背不起),也会有冗余,但是只冗余7份,每天删除7天前的数据。
- 分桶 bucket
create table udata(
user_id string,
item_id string,
rating string,
`timestamp` string
) row format delimited fields terminated by '\t';
load data local inpath '/home/badou/Documents/u.data' into table udata;
# 设置打印列名
set hive.cli.print.header=true;
- bucket
hive中的table可以拆分成partition,table和partition可以通过‘CLUSTERED BY’ 进一步分bucket, bucket中的数据可以通过‘sort by’排序。
sort by 是分桶内的排序,order by 是全局排序。
作用:数据sampling 数据采样
#建表
create table bucket_users (
user_id int,
item_id string,
rating string,
`timestamp` string
) clustered by(user_id) into 4 buckets;
#插入数据
#因为需要分成4个桶,需要设置强制分桶,否则会根据处理的数据量,只会启用一个reduce
set hive.enforce.bucketing = true;
insert overwrite table bucket_users
select cast(user_id as int ) as user_id, item_id, rating, `timestamp` from udata;
#查看结果:可以看到4个分桶的表
$ hadoop fs -ls /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/bucket_users
-rwxr-xr-x 3 root supergroup 466998 2019-03-29 09:06 /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/bucket_users/000000_0
-rwxr-xr-x 3 root supergroup 497952 2019-03-29 09:06 /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/bucket_users/000001_0
-rwxr-xr-x 3 root supergroup 522246 2019-03-29 09:06 /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/bucket_users/000002_0
-rwxr-xr-x 3 root supergroup 491977 2019-03-29 09:06 /usr/local/src/apache-hive-1.2.2-bin/warehouse/badou.db/bucket_users/000003_0
#采样 sampling
tablesample() 函数
格式:tablesample(bucket x out of y)
比如:有32个桶,bucket 3 out of 16,意思就是32/16=2,取两个桶的数据,从第三个桶开始算起,3%16=3,19%16=3,最终结果就是取第3个桶和第19个桶的数据,这样就达到了采样的目的。
#查看任意一个bucket的数据
select * from bucket_users tablesample(bucket 1 out of 4 on user_id);
#计算任意一个bucket有多少数据
select count(*) from bucket_users tablesample(bucket 1 out of 4 on user_id);
结果:23572 (总数是100000条)
select count(*) from bucket_users tablesample(bucket 2 out of 4 on user_id);
结果:25159 (总数是100000条)
分桶是进行了partition的过程,分的不是特别精确。
#采样数据,插入到新创建表中
$ create table tmp as select * from bucket_users tablesample(bucket 1 out of 4 on user_id);
- hive join in MR
# 订单商品的历史行为数据
create table order_product_prior(
order_id string,
product_id string,
add_to_cart string, #加购物车
reordered string #重复购买
) row format delimited fields terminated by ',';
load data local inpath '/home/badou/Documents/data/order_data/order_products__prior.csv' into table order_product_prior;
#订单表
# order_number 订单购买顺序
# eval_set 标志是训练集还是测试集
# order_dow dow day of week 那天买的
# order_hour_of_day 一天中什么时候下的订单
# days_since_prior_order 距离上一个订单多久了
create table orders (
order_id string,
user_id string,
eval_set string,
order_number string,
order_dow string,
order_hour_of_day string,
days_since_prior_order string
) row format delimited fields terminated by ',';
load data local inpath '/home/badou/Documents/data/order_data/orders.csv' into table orders;
$ select * from order_product_prior limit 10;
order_id product_id add_to_cart_order reordered
2 33120 1 1
2 28985 2 1
2 9327 3 0
2 45918 4 1
2 30035 5 0
2 17794 6 1
2 40141 7 1
2 1819 8 1
2 43668 9 0
$ select * from orders limit 10;
order_id user_id eval_set order_number order_dow order_hour_of_day days_since_prior_order
2539329 1 prior 1 2 08
2398795 1 prior 2 3 07 15.0
473747 1 prior 3 3 12 21.0
2254736 1 prior 4 4 07 29.0
431534 1 prior 5 4 15 28.0
3367565 1 prior 6 2 07 19.0
550135 1 prior 7 1 09 20.0
3108588 1 prior 8 1 14 14.0
2295261 1 prior 9 1 16 0.0
需求:统计每个用户购买过多少商品
1. 每个订单的商品数量【订单中的商品数量】
select order_id, count(1) as prod_cnt
from order_product_prior
group by order_id
order by prod_cnt desc
limit 30;
2. user - 产品数量的关系
将每个订单的数量带给user join
table1: order_id prod_cnt
table2: order_id user_id
table1 + table2 => order_id, user_id, prod_cnt
-- 这个用户在这个订单中购买了多少商品prod_cnt
select
t2.order_id as order_id,
t2.user_id as user_id,
t1.prod_cnt as prod_cnt
from orders t2
join
(select order_id, count(1) as prod_cnt
from order_product_prior
group by order_id) t1
on t2.order_id=t1.order_id
limit 30;
3. 这个用户所有订单的商品总和
select
user_id,
sum(prod_cnt) as sum_prod_cnt
from
(select
t2.order_id as order_id,
t2.user_id as user_id,
t1.prod_cnt as prod _cnt
from orders t2
join
(select order_id, count(1) as prod_cnt
from order_prodct_prior
group by order_id) t1
on t2.order_id=t1.order_id) t12
group by user_id
order by sum_prod_cnt desc
limit 30;
简写:
select x from (select x from t1) join (select x from t2) on x
group by x
order by x
limit n
写sql,上千行的都有😕
这才哪到哪🙃
- hive优化
合并小文件,减少map数?
适当增加map数?
set mapred.map.tasks = 10;
map的优化主要是在文件数量上的优化,遇到的比较少,主要还是在reduce上的优化,比如最重要的数据倾斜。
- 设置reduce任务处理的数据量
hive.exec.reduceers.bytes.per.reducer - 调整reduce的个数
设置reducer处理的数量
set mapred.reduce.tasks=10 - 一个reduce的情况
全局排序的话,在一个reduce里面进行
笛卡尔积:
select
t1.u1 as u1,
t2.u2 as u2
from
(select user_id as u1 from tmp) t1
join
(select user_id as u2 from tmp) t2;
笛卡尔积会使得数据增加得特别快,需要尽量避免,笛卡尔积是在一个reduce里面进行的。