Spark分析fsimage

2021-02-23  本文已影响0人  至垚

"""
-- Hive table that receives the parsed fsimage rows. The column list matches
-- the select issued by the PySpark job further down in this file.
-- "if not exists" keeps the statement safe to re-run.
create table if not exists dev.bj5_hadoop_fsimage(
    path string,
    replication int,
    -- DATE keeps only the calendar day; any time-of-day in the input is dropped.
    modificationtime date,
    accesstime date,
    -- Byte counts and quotas are widened to bigint: values above 2^31-1
    -- (for example any file larger than 2 GiB) do not fit in int.
    preferredblocksize bigint,
    blockscount int,
    filesize bigint,
    nsquota bigint,
    dsquota bigint,
    permission string,
    username string,
    groupname string,
    -- path0..path4 hold the first five "/"-separated components of path.
    path0 string,
    path1 string,
    path2 string,
    path3 string,
    path4 string
)
PARTITIONED BY (pt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
"""

from pyspark import SparkContext,HiveContext
import time
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import split,concat_ws

def load_file_to_spark(x):
    """Filter predicate for the raw text lines of the fsimage dump.

    Intended for use with ``RDD.filter``: the return value is evaluated for
    truthiness.

    Args:
        x: One line of the input file, as a string.

    Returns:
        ``None`` when the line contains the substring ``'Replication'``
        (this is how the column-header line is skipped); otherwise the line
        itself. An empty line is returned unchanged and is therefore falsy,
        so a filter using this predicate drops blank lines as well.

    Note:
        The check is a plain substring test, so a data line whose path
        happens to contain ``'Replication'`` is dropped too.
    """
    if 'Replication' in x:
        return None
    return x
if __name__ == '__main__':
    # Destination Hive table, matching the DDL in the string at the top of
    # this file. An empty name would render the statement below as
    # "insert overwrite table  partition (...)", which is not valid SQL.
    table_name = 'dev.bj5_hadoop_fsimage'

    # Partition value: current local year and month, e.g. "2021-02".
    s_time = time.strftime("%Y-%m", time.localtime())

    # Build the Hive-enabled session first and take the SparkContext from it,
    # so the builder options are in place when the context is created.
    spark = SparkSession.builder.enableHiveSupport().appName('pyspark').getOrCreate()
    sc = spark.sparkContext

    # NOTE(review): the input is presumably the text output of the HDFS
    # offline image viewer with '[<' as field delimiter -- confirm.
    # lines = sc.textFile("/Users/pausky/Downloads/xml_file1")
    lines = sc.textFile("/tmp/xml_file")

    # Drop the header line (and blank lines), then split each record into
    # its '[<'-separated fields.
    filt_lines = lines.filter(load_file_to_spark)
    parts = filt_lines.map(lambda x: x.split('[<'))

    # Every field is kept as a string here; a record with fewer than
    # 12 fields raises IndexError when the job runs.
    hdfs_file_imager = parts.map(lambda p: Row(
        path=p[0],
        replication=p[1],
        modificationtime=p[2],
        accesstime=p[3],
        preferredblocksize=p[4],
        blockscount=p[5],
        filesize=p[6],
        nsquota=p[7],
        dsquota=p[8],
        permission=p[9],
        username=p[10],
        groupname=p[11],
    ))
    df_fsimage = spark.createDataFrame(hdfs_file_imager)

    # Splitting an absolute path on "/" yields an empty first element, so
    # items 1..5 are the first five path components; missing levels are null.
    split_col = split(df_fsimage['path'], "/")
    df_fsimage_with_muti_c = (
        df_fsimage
        .withColumn('path0', split_col.getItem(1))
        .withColumn('path1', split_col.getItem(2))
        .withColumn('path2', split_col.getItem(3))
        .withColumn('path3', split_col.getItem(4))
        .withColumn('path4', split_col.getItem(5))
    )

    # Earlier experiments (joining path0..path4 into a single column with
    # concat_ws, writing ORC/parquet files or saveAsTable directly) were
    # replaced by the partition insert below.
    df_fsimage_with_muti_c.createOrReplaceTempView("bj5_hdfs_fsimage_table")

    # Overwrite only this month's partition. The statement runs on the
    # Hive-enabled session itself rather than on a separate HiveContext.
    insert_sql = (
        'insert overwrite table {table} partition (pt = "{pt}") '
        'select path,replication,modificationtime,accesstime,preferredblocksize,blockscount,'
        'filesize,nsquota,dsquota,permission,username,groupname,path0,path1,path2,path3,path4 '
        'from bj5_hdfs_fsimage_table'
    ).format(table=table_name, pt=s_time)
    spark.sql(insert_sql)

上一篇 下一篇

猜你喜欢

热点阅读