使用Python操作Hive
2018-10-28 本文已影响59人
9c46ece5b7bd
Python操作Hive
注意:想要使用hive,必须要有一个可用的hive集群,同时为了保证可用使用API操作hive,我们需要要求提供hiveserver2服务
假设我们的hiveserver2地址为10.0.1.18:10000
,且用户为hdfs
.使用PyHive库链接Hive.
安装pyhive模块
# 过程中可能需要依赖sasl,thrift等相关服务,如有需要可以使用系统的包管理器安装(apt-get或yum)
pip install sasl thrift thrift-sasl PyHive
Python链接Hive以及基本使用
$ cat pytest_hive.py
# 导入hive模块
from pyhive import hive
# 获取一个hive链接对象(链接到HiveServer2上)
## Connection类的__init__方法:__init__(self, host=None, port=None, username=None, database=u'default', auth=None, configuration=None, kerberos_service_name=None, password=None, thrift_transport=None)
hiveconn = hive.Connection(host='10.0.1.18', port=10000, username='hdfs', database='aiops')
# 使用连接的cursor()方法获取一个游标对象
hivecur = hiveconn.cursor()
# 使用游标对象的execute()方法进行执行hivesql语句
## execute(self, operation, parameters=None, **kwargs)
hivecur.execute("show databases")
## executemany(self, operation, seq_of_parameters) method of pyhive.hive.Cursor instance 参数是一个序列
hivecur.executemany()
# 使用游标对象的fetch类方法获取执行结果(fetchone和fetchall以及fetchmany)
onedata = hivecur.fetchone()
alldata = hivecur.fetchall()
## fetchmany(self, size=None) method of pyhive.hive.Cursor instance
manydata = hivecur.fetchmany()
# 关闭cursor游标对象和conn连接对象
hivecur.close()
hiveconn.close()
# hive的回滚操作
hiveconn.rollback()
尝试用python脚本进行数据库操作
1. 数据库查询操作
# 首先我们使用pyhive库链接hive并查看指定数据库下的表
$ cat pyhive_test.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyhive import hive
class hiveObj:
def __init__(self,host,user,dbname=u'default',port=10000):
self.host = host
self.dbname = dbname
self.user = user
self.port = port
def hiveConIns(self):
conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
return conn
#通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
def querydata(self,sql,args=None):
conn = self.hiveConIns()
cur = conn.cursor()
cur.execute(sql,args)
alldata = cur.fetchall()
cur.close()
#cur.fetch类方法返回一个[tuple,tuple]
for data in alldata:
print(data)
conn.close()
if __name__ == '__main__':
#默认database为default,默认port为10000
hiveobj = hiveObj("10.0.1.18","hdfs")
#查询数据
sql = '''show tables'''
hiveobj.querydata(sql)
$ python pyhive_test.py
(u'asset',)
2. 数据库更新操作
思考:其实数据库可以分为两种操作(读和写),一种为单纯的查询操作,不会对库表结构和数据造成变更,也即为读操作;另外一种为写操作,会对库表结构和数据造成的变更操作,也即写操作.
# 给我们的hiveObj类增加一个写数据操作方法
$ cat pyhive_test.py
....
....
def changedata(self,sql,args=None):
conn = self.hiveConIns()
cur = conn.cursor()
try:
#做一个粗暴的判断当args是list时就进行批量插入
if isinstance(args,list):
#executemany(sql,args)方法args支持tuple或者list类型
cur.executemany(sql,args)
else:
#execute(sql,args)方法args支持string,tuple,list,dict
cur.execute(sql,args)
conn.commit()
except Exception as e:
#因为hive不支持事务,因此虽然提供了rollback()但是是没用的
#conn.rollback()
print(e)
finally:
cur.close()
conn.close()
# 使用创建表来模拟库表变更(实际上库的变更操作应该由专业的管理员进行审核后操作)
if __name__ == '__main__':
#默认database为default,默认port为10000
hiveobj = hiveObj("10.0.1.18","hdfs")
#查询数据
sql = '''show tables'''
hiveobj.querydata(sql)
#hive库表变更操作
tabledesc = '''
create table appinfo (
appname string,
level string,
leader string,
dep string,
ips array<string>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
'''
print("creating a table....")
hiveobj.changedata(tabledesc)
hiveobj.querydata(sql)
$ python pyhive_test.py
(u'asset',)
creating a table....
(u'appinfo',)
(u'asset',)
3. 进行数据加载和读取操作
注意:上面其实我们已经封装了两个抽象的读写方法,可以对hive表进行数据加载和读取操作了
# 假如我们的hdfs上已经存在一份如下结构化的数据
$ hdfs dfs -cat /ips.txt;
data-web|p0|bgbiao|ops1|10.0.0.1,10.0.0.2
data-api|p0|biaoge|sre1|192.168.0.1,192.168.0.2
data-models|p1|xxbandy|sre1|10.0.0.3,192.168.0.3
$ cat pyhive_test.py
...
...
if __name__ == '__main__':
#首先进行将hdfs中的数据加载到appinfo表中,加载完成后查询appinfo表
sql1 = "load data inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
hiveobj.changedata(sql1)
hiveobj.querydata('select * from appinfo')
$ python pyhive_test.py
(u'data-web', u'p0', u'bgbiao', u'ops1', u'["10.0.0.1","10.0.0.2"]')
(u'data-api', u'p0', u'biaoge', u'sre1', u'["192.168.0.1","192.168.0.2"]')
(u'data-models', u'p1', u'xxbandy', u'sre1', u'["10.0.0.3","192.168.0.3"]')
# 接下来我们对上述表进行一个拆分查询
$ cat pyhive_test.py
...
...
if __name__ == '__main__':
#对array对象中的元素进行遍历查询
sql = "select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
hiveobj.querydata(sql)
# 这样子我们就知道每个ip对应的关联关系了
$ python pyhive_test.py
(u'10.0.0.1', u'data-web', u'bgbiao', u'ops1')
(u'10.0.0.2', u'data-web', u'bgbiao', u'ops1')
(u'192.168.0.1', u'data-api', u'biaoge', u'sre1')
(u'192.168.0.2', u'data-api', u'biaoge', u'sre1')
(u'10.0.0.3', u'data-models', u'xxbandy', u'sre1')
(u'192.168.0.3', u'data-models', u'xxbandy', u'sre1')
# 临时表的创建和使用
#对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
#sql = "create table tmpapp as select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
#sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
hiveobj.changedata(sql)
hiveobj.querydata('select * from tmpapp limit 1')
4. 源码文件
$ cat pyhive_test.py
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyhive import hive
class hiveObj:
def __init__(self,host,user,dbname=u'default',port=10000):
self.host = host
self.dbname = dbname
self.user = user
self.port = port
def hiveConIns(self):
conn = hive.Connection(host=self.host, port=self.port, username=self.user, database=self.dbname)
return conn
#通常查询个别数量的数据建议在sql中进行优化,可以仅使用cursor的fetchall()方法进行批量操作
def querydata(self,sql,args=None):
conn = self.hiveConIns()
cur = conn.cursor()
cur.execute(sql,args)
alldata = cur.fetchall()
cur.close()
#cur.fetch类方法返回一个[tuple,tuple]
for data in alldata:
print(data)
conn.close()
#注意:hivesql的execute类方法的args是执行过程的参数,而不是sql的参数.比如cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async=True)表示异步执行
def changedata(self,sql,args=None):
conn = self.hiveConIns()
cur = conn.cursor()
try:
#做一个粗暴的判断当args是list时就进行批量插入
if isinstance(args,list):
#executemany(sql,args)方法args支持tuple或者list类型
cur.executemany(sql,args)
else:
#execute(sql,args)方法args支持string,tuple,list,dict
cur.execute(sql,args)
conn.commit()
except Exception as e:
#因为hive不支持事务,因此虽然提供了rollback()但是是没用的
#conn.rollback()
print(e)
finally:
cur.close()
conn.close()
if __name__ == '__main__':
#默认database为default,默认port为10000
hiveobj = hiveObj("10.0.1.18","hdfs")
'''
#查询数据
sql = "show tables"
hiveobj.querydata(sql)
#hive创建表
tabledesc = "create table appinfo (appname string,level string,leader string,dep string,ips array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' COLLECTION ITEMS TERMINATED BY ',' "
print("creating a table....")
hiveobj.changedata(tabledesc)
hiveobj.querydata(sql)
#插入数据
sql1 = "load data inpath 'hdfs://hdfs-name/ips.txt' overwrite into table appinfo"
hiveobj.changedata(sql1)
hiveobj.querydata('select * from appinfo')
'''
#对array对象中的元素进行遍历查询[临时表的创建第一次必须使用create table name as select ],更新数据需要使用[insert into|overwrite table name select] into是追加数据,overwrite是覆盖数据
#sql = "create table tmpapp as select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
#sql = "insert into table tmpapp select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
sql = "insert overwrite table tmpapp select ip,appname,leader,dep from appinfo LATERAL VIEW explode(ips) appinfo AS ip"
hiveobj.changedata(sql)
hiveobj.querydata('select * from tmpapp limit 1')