大数据

使用Python操作Hive

2018-10-28  本文已影响59人  9c46ece5b7bd

Python操作Hive

注意:想要使用hive,必须要有一个可用的hive集群,同时为了保证可用使用API操作hive,我们需要要求提供hiveserver2服务

假设我们的hiveserver2地址为10.0.1.18:10000,且用户为hdfs.使用PyHive库链接Hive.

安装pyhive模块

# 过程中可能需要依赖sasl,thrift等相关服务,如有需要可以使用系统的包管理器安装(apt-get或yum)
pip install sasl thrift thrift-sasl PyHive

Python链接Hive以及基本使用

$ cat pytest_hive.py
# 导入hive模块
from pyhive import hive

# 获取一个hive链接对象(链接到HiveServer2上)
## Connection类的__init__方法:__init__(self, host=None, port=None, username=None, database=u'default', auth=None, configuration=None, kerberos_service_name=None, password=None, thrift_transport=None)
hiveconn = hive.Connection(host='10.0.1.18', port=10000, username='hdfs', database='aiops')

# 使用连接的cursor()方法获取一个游标对象
hivecur = hiveconn.cursor()

# 使用游标对象的execute()方法进行执行hivesql语句
## execute(self, operation, parameters=None, **kwargs)
hivecur.execute("show databases")
## executemany(self, operation, seq_of_parameters) method of pyhive.hive.Cursor instance 参数是一个序列
hivecur.executemany()

# 使用游标对象的fetch类方法获取执行结果(fetchone和fetchall以及fetchmany)
onedata = hivecur.fetchone()
alldata = hivecur.fetchall()
## fetchmany(self, size=None) method of pyhive.hive.Cursor instance
manydata = hivecur.fetchmany()

# 关闭cursor游标对象和conn连接对象
hivecur.close()
hiveconn.close()

# hive的回滚操作
hiveconn.rollback()

尝试用python脚本进行数据库操作

1. 数据库查询操作

# 首先我们使用pyhive库链接hive并查看指定数据库下的表
$ cat pyhive_test.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyhive import hive

class hiveObj:
    def __init__(self,host,user,dbname=u'default',port=10000):
        self.host = host
        self.dbname = dbname
        self.user = user
        self.port = port
    def hiveConIns(self):
        conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
        return conn
    #通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
    def querydata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        cur.execute(sql,args)
        alldata = cur.fetchall()
        cur.close()
        #cur.fetch类方法返回一个[tuple,tuple]
        for data in alldata:
            print(data)
        conn.close()

if __name__ == '__main__':
    #默认database为default,默认port为10000
    hiveobj = hiveObj("10.0.1.18","hdfs")
    #查询数据
    sql = '''show tables'''
    hiveobj.querydata(sql)

$ python pyhive_test.py
(u'asset',)


2. 数据库更新操作

思考:其实数据库可以分为两种操作(读和写),一种为单纯的查询操作,不会对库表结构和数据造成变更,也即为读操作;另外一种为写操作,会对库表结构和数据造成的变更操作,也即写操作.

# 给我们的hiveObj类增加一个写数据操作方法
$ cat pyhive_test.py
....
....
    def changedata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        try:
            #做一个粗暴的判断当args是list时就进行批量插入
            if isinstance(args,list):
                #executemany(sql,args)方法args支持tuple或者list类型
                cur.executemany(sql,args)
            else:
                #execute(sql,args)方法args支持string,tuple,list,dict
                cur.execute(sql,args)
            conn.commit()
        except Exception as e:
            #因为hive不支持事务,因此虽然提供了rollback()但是是没用的
            #conn.rollback()
            print(e)
        finally:
            cur.close()
            conn.close()



# 使用创建表来模拟库表变更(实际上库的变更操作应该由专业的管理员进行审核后操作)
if __name__ == '__main__':
    #默认database为default,默认port为10000
    hiveobj = hiveObj("10.0.1.18","hdfs")
    #查询数据
    sql = '''show tables'''
    hiveobj.querydata(sql)

    #hive库表变更操作
    tabledesc = '''
     create table appinfo (
        appname string,
        level string,
        leader string,
        dep string,
        ips  array<string>)
     ROW FORMAT DELIMITED
     FIELDS TERMINATED BY '|'
     COLLECTION ITEMS TERMINATED BY ','
    '''
    print("creating a table....")
    hiveobj.changedata(tabledesc)
    hiveobj.querydata(sql)

$ python pyhive_test.py
(u'asset',)
creating a table....
(u'appinfo',)
(u'asset',)

3. 进行数据加载和读取操作

注意:上面其实我们已经封装了两个抽象的读写方法,可以对hive表进行数据加载和读取操作了

# 假如我们的hdfs上已经存在一份如下结构化的数据
$ hdfs dfs -cat /ips.txt;
data-web|p0|bgbiao|ops1|10.0.0.1,10.0.0.2
data-api|p0|biaoge|sre1|192.168.0.1,192.168.0.2
data-models|p1|xxbandy|sre1|10.0.0.3,192.168.0.3

$ cat pyhive_test.py
...
...
if __name__ == '__main__':
        #首先进行将hdfs中的数据加载到appinfo表中,加载完成后查询appinfo表
    sql1 = "load data  inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
    hiveobj.changedata(sql1)
    hiveobj.querydata('select * from appinfo')

$ python pyhive_test.py
(u'data-web', u'p0', u'bgbiao', u'ops1', u'["10.0.0.1","10.0.0.2"]')
(u'data-api', u'p0', u'biaoge', u'sre1', u'["192.168.0.1","192.168.0.2"]')
(u'data-models', u'p1', u'xxbandy', u'sre1', u'["10.0.0.3","192.168.0.3"]')

# 接下来我们对上述表进行一个拆分查询
$ cat pyhive_test.py
...
...
if __name__ == '__main__':
    #对array对象中的元素进行遍历查询
    sql = "select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    hiveobj.querydata(sql)

# 这样子我们就知道每个ip对应的关联关系了
$ python pyhive_test.py
(u'10.0.0.1', u'data-web', u'bgbiao', u'ops1')
(u'10.0.0.2', u'data-web', u'bgbiao', u'ops1')
(u'192.168.0.1', u'data-api', u'biaoge', u'sre1')
(u'192.168.0.2', u'data-api', u'biaoge', u'sre1')
(u'10.0.0.3', u'data-models', u'xxbandy', u'sre1')
(u'192.168.0.3', u'data-models', u'xxbandy', u'sre1')

# 临时表的创建和使用
    #对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
    #sql = "create  table tmpapp as select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    #sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    hiveobj.changedata(sql)
    hiveobj.querydata('select * from tmpapp limit 1')

4. 源码文件

$ cat pyhive_test.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyhive import hive

class hiveObj:
    def __init__(self,host,user,dbname=u'default',port=10000):
        self.host = host
        self.dbname = dbname
        self.user = user
        self.port = port
    def hiveConIns(self):
        conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
        return conn
    #通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
    def querydata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        cur.execute(sql,args)
        alldata = cur.fetchall()
        cur.close()
        #cur.fetch类方法返回一个[tuple,tuple]
        for data in alldata:
            print(data)
        conn.close()
    #注意:hivesql的execute类方法的args是执行过程的参数,而不是sql的参数.比如cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async=True)表示异步执行
    def changedata(self,sql,args=None):
        conn = self.hiveConIns()
        cur = conn.cursor()
        try:
            #做一个粗暴的判断当args是list时就进行批量插入
            if isinstance(args,list):
                #executemany(sql,args)方法args支持tuple或者list类型
                cur.executemany(sql,args)
            else:
                #execute(sql,args)方法args支持string,tuple,list,dict
                cur.execute(sql,args)
            conn.commit()
        except Exception as e:
            #因为hive不支持事务,因此虽然提供了rollback()但是是没用的
            #conn.rollback()
            print(e)
        finally:
            cur.close()
            conn.close()

if __name__ == '__main__':
    #默认database为default,默认port为10000
    hiveobj = hiveObj("10.0.1.18","hdfs")
    '''
    #查询数据
    sql = "show tables"
    hiveobj.querydata(sql)

    #hive创建表
    tabledesc = "create table appinfo (appname string,level string,leader string,dep string,ips  array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' "
    print("creating a table....")
    hiveobj.changedata(tabledesc)
    hiveobj.querydata(sql)

    #插入数据
    sql1 = "load data  inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
    hiveobj.changedata(sql1)
    hiveobj.querydata('select * from appinfo')
    '''
    #对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
    #sql = "create  table tmpapp as select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    #sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo  LATERAL VIEW explode(ips) appinfo  AS ip"
    hiveobj.changedata(sql)
    hiveobj.querydata('select * from tmpapp limit 1')
上一篇下一篇

猜你喜欢

热点阅读