python读写hdfs四种不同格式文件

2022-02-19  本文已影响0人  王文强Python

从hdfs读写 parquet 、textfile 、 csv 、 xlsx四种文件格式代码简写后如下:

import io
import os
import uuid
import hdfs
import pandas as pd
from hdfs.ext.kerberos import KerberosClient
from krbcontext import krbcontext
from hdfs import HdfsError


def Myhdfs():
    def __init__(self,url):
        self.client = self.get_client(url)

    def get_client(url):
        with krbcontext(using_keytab=True,keytab_file='keytab文件地址',principal='用户名',ccache_file='缓存文件'):
            hclient = KerberosClient(url,session='')
            return hclient

    def df_writeto_parquet(self,df,path):
        if df.empty:
            return False
        path = os.path.join(path,str(uuid.uuid4()))
        byt = io.BytesIO()
        df.to_parquet(byt)
        self.client.write(path,byt.getvalue(),True)
        return True

    def df_writeto_textfile(self,df,path):
        path = os.path.join(path,str(uuid.uuid4()))
        rest = '\n'.join([','.join(row) for n,row in df.iterrows()])
        self.client.write(path,rest)
        return True

    def df_readfrom_hdfs(self,path,format = 'textfile',columns = None):
        if format =='textfile':
            if columns:
                with self.client.read(path) as reader:
                    be = str(reader.read(),'utf-8')
                    data = map(lambda s:s.split('\001'),be.split('\n'))
                    df = pd.DataFrame(data=data,columns=columns)
                    return df
            else:
                raise Exception('columns 不能为空')
        elif format =='parquet':
            with self.client.read(path) as reader:
                be = io.BytesIO(reader.read())
                df = pd.read_parquet(be)
                return df
        elif format =='csv':
            for code in ['gbk','utf-8','gb18030']:
                try:
                    with self.client.read(path,encoding = code) as reader:
                        df = pd.read_csv(reader)
                        return df
                except UnicodeEncodeError:
                    continue
                except Exception as e:
                    print(e)
                    continue
        elif format == 'xls':
            with self.client.read(path) as reader:
                df = pd.read_excel(reader.read())
                return df

    def show_files(self,path,filelist):
        files = self.client.list(path)
        for file in files:
            curpath = os.path.join(path,file)
            try:
                self.show_files(curpath,filelist)
            except HdfsError as e:
                filelist.append(curpath)
            except Exception as e:
                print(e)
        return filelist

上一篇下一篇

猜你喜欢

热点阅读