Hive学习笔记

2018-08-21  本文已影响0人  卡卡xx

hive简介

解释一:Hive是一个数据仓库基础工具在Hadoop中用来处理结构化数据。它架构在Hadoop之上,总归为大数据,并使得查询和分析方便。并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行

解释二:Hive 是建立在 Hadoop 上的数据仓库基础构架。它提供了一系列的工具,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。Hive 定义了简单的类 SQL 查询语言,称为 QL,它允许熟悉 SQL 的用户查询数据。同时,这个语言也允许熟悉 MapReduce 开发者的开发自定义的 mapper 和 reducer 来处理内建的 mapper 和 reducer 无法完成的复杂的分析工作。

通俗来讲就是传统mysql无法处理大数据,而大数据文件系统HDFS不能使用SQL,Hive就是一种可以用类SQL语句对大数据文件系统中的结构化数据进行操作的工具

Hive的系统架构

Hive系统架构图

(JobTracker是Hadoop1.x中的ResouceManager + AppMaster
TaskTracker相当于NodeManager + YarnChild)

Hive和普通关系数据库的异同

  1. 查询语言。由于SQL被广泛的应用在数据仓库中,因此,专门针对Hive的特性设计了类SQL的查询语言HQL。熟悉SQL开发就很容易入手Hive开发。
  2. 数据仓库位置。Hive是建立在Hadoop之上的,所有Hive的数据都是存储在HDFS中的。而数据库则可以将数据保存在块设备或者本地文件系统中。
  3. 数据格式。 Hive中没有定义专门的数据格式,数据格式可以由用户指定,用户定义数据格式需要指定三个属性,列分隔符(通常为空格)、行分隔符(“\n”)以及读取文件数据的方法(Hive中默认有三个文件格式TextFile,SequenceFile以及RCFile)。由于在加载数据的过程中,不需要从用户数据格式到Hive定义的数据格式的转换,因此,Hive在加载的过程中不会对数据本身进行任何修改,而只是将数据内容复制或者移动到相应的HDFS目录中。
  4. 数据更新。由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不支持对数据的改写和添加,所有的数据都是在加载的时候中确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用 INSERT INTO … VALUES 添加数据,使用 UPDATE … SET修改数据。
  5. 索引。Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于 MapReduce 的引入, Hive 可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive 仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了 Hive 不适合在线数据查询。
  6. 执行。Hive中大多数查询的执行是通过 Hadoop 提供的 MapReduce 来实现的(类似 select * from tbl的查询不需要MapReduce)。而数据库通常有自己的执行引擎。
  7. 执行延迟。Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致 Hive 执行延迟高的因素是 MapReduce框架。由于MapReduce 本身具有较高的延迟,因此在利用MapReduce 执行Hive查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时 候,Hive的并行计算显然能体现出优势。
  8. 可扩展性。由于Hive是建立在Hadoop之上的,因此Hive的可扩展性是和Hadoop的可扩展性是一致的。而数据库由于 ACID 语义的严格限制,扩展行非常有限。目前最先进的并行数据库 Oracle 在理论上的扩展能力也只有100台左右。
  9. 规模。由于Hive建立在集群上并可以利用MapReduce进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小。

Hive数据类型

在对Hive进行操作之前,首先要明白Hive数据类型有哪些。

Hive实践操作

配置Hive

上一阶段配置CDH的时候就一起配置了Hive,也可以使用Apache Hadoop单独配置
这里出现一个问题,hive启动不起来,一直等待在界面没反应,去master:10002查看了一下hive服务,发现无响应,说明hive没有启动起来,所以去管理界面查看情况,运行状态不良

想了一下,这个问题可能是有几次强制关机造成的,只有重新安装CDH。所以千万不能强制关掉CDH集群,因为这样会导致各种各样的玄学问题。。。

建立内表、外表

  1. 在导入数据到外部表,数据并没有移动到自己的数据仓库目录下(如果指定了location的话),也就是说外部表中的数据并不是由它自己来管理的!而内部表则不一样;
  2. 在删除内部表的时候,Hive将会把属于表的元数据和数据全部删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的!
  3. 在创建内部表或外部表时加上location 的效果是一样的,只不过表目录的位置不同而已,加上partition用法也一样,只不过表目录下会有分区目录而已,load data local inpath直接把本地文件系统的数据上传到hdfs上,有location上传到location指定的位置上,没有的话上传到hive默认配置的数据仓库中。

数据选择Sogou用户查询语料信息,下载之后由于编码有问题,还需要进行编码转为为utf8。
iconv -f gbk -t utf8 -c SogouQ.sample > SogouQ.sample2
因为外表建立需要明确行列分隔符,而数据当中存在列分隔符不一致的情况,需要利用sed命令进行转换。

替换前 使用命令cat SogouQ.sample2 | sed -e 's/ /\t/g' >>Sogou可以将空格替换为\t,替换规则为sed 's/要被取代的字串/新的字串/g'再配合管道和追加到Sogou文件中就完成了替换。
替换后

这里指定了外部表的具体存储位置为/tmp/sogou文件夹,方便以后查看。由于原始数据里行分隔符为\t,所以创建时要匹配为\t。

外表和内表都导入成功了,那么来看一下两个表在hdfs中存储位置有什么区别。

hive sql

hive的大多数查询都会转化为MapReduce程序,可以导master:8088查看相应的MapReduce程序。


  1. select后面的非聚合列必须出现在group by中(比如上面的rank)
  2. 除了普通列就是一些聚合操作(比如上面的count(rank))
select col,col2...  
from tableName  
where condition  
order by col1,col2 [asc|desc] 
  1. order by后面可以有多列进行排序,默认按字典排序。
  2. order by为全局排序。
  3. order by需要reduce操作,且只有一个reduce,无法配置(因为多个reduce无法完成全局排序)。
    select * from sogou order by rank desc limit 5;获取排名前5的所有信息

表示将rank哈希后(这里并不是简单的把rank相同的数据划分为一个cluster,而是对rank值哈希后再对cluster个数进行求余)相同的数据全部送到一个reducer进行处理,这就是因为指定了distribute by rank,再配合sort by这样的话就可以统计出每个rank排名中所有数据的时间顺序了。这里需要注意的是distribute by必须要写在sort by之前。

注意被cluster by指定的列只能是升序,不能指定asc和desc。

Java、Python调用Hive

和下面Python调用一样,只需要下载相应的jar,跟着官方文档即可。

pyhive

首先安装连接hive需要的依赖

pip install sasl
pip install thrift
pip install thrift-sasl
pip install PyHive

然后就可以按照固定的格式开启连接了

from pyhive import hive

conn = hive.Connection(host='192.168.233.100',port=10000,username='root',database='mytest')
cursor = conn.cursor()
cursor.execute('select * from sogou limit 10')
for result in cursor.fetchall():
    print(result)

但是报错了
thrift.transport.TTransport.TTransportException: Could not start SASL: b'Error in sasl_client_start (-4) SASL(-4): no mechanism available: Unable to find a callback: 2'
查看hive的日志
java.lang.RuntimeException: org.apache.thrift.transport.TSaslTransportException: No data or no sasl data in the stream
网上找了一下也没什么好的解决方案,看这个报错信息应该是windows系统底层的问题,所以换到centos里执行刚刚的python程序。
成功连接hive并得到查询结果。

Java Hive

UDF、UDAF、UDTF

Hive中,除了提供丰富的内置函数之外,还允许用户使用Java开发自定义的UDF函数。UDF有几种:

开发自定义UDF函数有两种方式
继承org.apache.hadoop.hive.ql.exec.UDF
继承org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
如果是针对简单的数据类型(比如String、Integer等)可以使用UDF,如果是针对复杂的数据类型(比如Array、Map、Struct等),可以使用GenericUDF,另外,GenericUDF还可以在函数开始之前和结束之后做一些初始化和关闭的处理操作。

UDF

使用自定义UDF时,只需要继承org.apache.hadoop.hive.ql.exec.UDF,并定义public Object evaluate(Object args) {} 方法即可。例如:

package li;


import org.apache.hadoop.hive.ql.exec.UDF;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashMd5 {
    public String evaluate(String password) {
        try {
            // 得到一个信息摘要器
            MessageDigest digest = MessageDigest.getInstance("md5");
            byte[] result = digest.digest(password.getBytes());
            StringBuffer buffer = new StringBuffer();
            // 把每一个byte 做一个与运算 0xff;
            for (byte b : result) {
                // 与运算
                int number = b & 0xff;// 加盐
                String str = Integer.toHexString(number);
                if (str.length() == 1) {
                    buffer.append("0");
                }
                buffer.append(str);
            }

            // 标准的md5加密后的结果
            return buffer.toString();
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
            return "";
        }

    }
}

evaluate方法就是要进行处理的函数内容。这个函数的意思是将输入的内容转为它的MD5值。
用IDEA把刚刚写好的Java文件打成jar包(这里特别需要注意编译成jar包的环境JDK要和hive平台的JDK一致,所以我把JDK变为了1.6)


然后就可以在hive交互界面使用命令add jar /usr/tmp/myudf.jar
这里的路径是把jar包存放在linux文件下的路径。
接着生成函数方法
create temporary function md5 as 'li.HashMd5';
因为我只是为了测试,所以用的temporary,表示该方法的作用时间只是这次session,as后面跟着的是你写入自定义函数的类的完整类名
随后就可以写测试hive sql来看看是否成功了。
select rank,md5(rank) from sogou limit 10;
这里第一次报错了
Permission denied: user=root, access=WRITE, inode="/user":hdfs:supergroup:dr
错误显示想要访问/user下的内容,但是权限不够,因为我是在root用户下进入的hive交互界面,所以有些文件不能执行操作,所以切换至hdfs用户就可以了。

这里可以看到同样的值得到了同样的MD5值,所以验证成功。

UDAF

UDAF 接受多个输入数据行,并产生一个输出数据行。
一个计算函数必须实现以下5个方法:

package li;


import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.FloatWritable;

@SuppressWarnings("deprecation")
public class Mean extends UDAF {
    public static class MeanFloatUDAFEvaluator implements UDAFEvaluator {
        public static class PartialResult {
            float sum;
            long count;
        }

        private PartialResult partialresult;

        @Override
        public void init() {
            // TODO Auto-generated method stub
            partialresult = null;
        }

        public boolean iterate(FloatWritable value) {
            if (value == null) {
                return true;
            }
            if (partialresult == null) {
                partialresult = new PartialResult();
            }

            partialresult.sum += value.get();
            ++partialresult.count;
            return true;
        }

        public PartialResult terminatePartial() {
            return partialresult;
        }

        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            if (partialresult == null) {
                partialresult = new PartialResult();
            }

            partialresult.sum += other.sum;
            partialresult.count += other.count;

            return true;
        }

        public FloatWritable terminate() {
            if (partialresult == null) {
                return null;
            }
            return new FloatWritable(partialresult.sum / partialresult.count);
        }
    }
}

然后添加jar和临时方法
add jar /usr/tmp/myudf.jar
create temporary function my_mean as 'li.Mean';
可以再show functions里面看到自己的方法了。

分区、分桶

Hive使用select语句进行查询的时候一般会扫描整个表内容,会消耗很多时间做没必要的工作。Hive可以在创建表的时候指定分区空间,这样在做查询的时候就可以很好的提高查询的效率。
Hive中的每个分区对应数据库中相应分区列的一个索引,每个分区对应着表下的一个目录,在HDFS上的表现形式与表在HDFS上的表现形式相同,都是以子目录的形式存在。

 create table t1(
    > id int,
    > name string
    > )
    > partitioned by (pt varchar(8))
    > ;

这里表示将字段pt作为分区的目标字段

1,a
2,b
3,c

load data local inpath '/usr/tmp/data' overwrite into table t1 partition (pt='001');
所有数据都被装载到分区001中去了;再次将数据装载进分区002中去。
load data local inpath '/usr/tmp/data' overwrite into table t1 partition (pt='002');


通过这里可以看出成功将数据装载进了不同的分区,下面再看一下在HDFS中分区是如何实现的。
进入到hive默认存储地址/user/hive/warehouse

在这里我们可以清晰的看到hive首先为这个数据库创建了一个文件夹,然后为每一个分区创建一个子目录,将属于各自分区的数据就存放在各自分区的目录下。

也可以查看有哪些分区和删除指定分区(会同时删除分区中数据)

hive>partitions t1;
OK
pt=001
pt=002
hive> alter table t1 drop partition(pt=001);
Dropped the partition pt=001
OK

对于每一个表或者是分区,Hive可以进一步组织成,也就是说桶是更为细粒度的数据范围划分。Hive是针对某一列进行分桶。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶中。分桶的好处是可以获得更高的查询处理效率。使取样更高效。分桶比分区,更高的查询效率。

create table bucket_test(
     id int,
     name string
     )
     clustered by(id) into 4 buckets
     row format delimited fields terminated by '\t'
     sorted as textfile;

装入数据
load data local inpath '/usr/tmp/data' overwrite into table bucket_test;


数据成功装入后,就去目录中看一看桶的结构。发现分桶没有成功。查了一下资料,说是设置hive分桶为true
set hive.enforce.bucketing = true;
但是还是不行。仔细思考了一下,应该是导入数据的方式有问题,我这里用load导入数据使得hive可能不会逐一检索数据并根据哈希值存入桶,应该用insert方式导入数据,hive才会逐一根据数据指定列的哈希值存入不同的桶。
insert overwrite table bucketed_user
    > select id,name from t1;

这样就将数据库分为了4个桶。仔细观察会发现每个桶不是文件夹,而是文件,这点和分区不一样。当从桶表中进行查询时,hive会根据分桶的字段进行计算分析出数据存放的桶中,然后直接到对应的桶中去取数据,这样做就很好的提高了效率。

上一篇 下一篇

猜你喜欢

热点阅读