spark SQL 中 rdd与dataframe相互转换
2019-05-28 本文已影响0人
枫隐_5f5f
将 RDD 转换成 DataFrame 的方法:
spark.createDataFrame(rdd, colname_list)
将 DataFrame 转换成 RDD 的方法:
df.rdd
from pyspark.sql import SparkSession
import pandas as pd
import sys
from datetime import datetime
from pyspark.sql.types import *
def parse_babies_feas(line):
    """Flatten a Row into a plain (uid, deviceid, sex, age) tuple.

    Age is the difference in calendar years between now and the
    birthday field; month and day are ignored, so the result can be
    off by one relative to the true age.
    """
    this_year = datetime.now().year
    years_old = this_year - line.birthday.year
    return (line.uid, line.deviceid, line.sex, years_old)
if __name__ == "__main__":
    # Build (or reuse) a SparkSession for this job.
    builder = SparkSession.builder
    builder = builder.appName('test Name')
    builder = builder.config("config", "value")
    spark = builder.getOrCreate()

    # Read the partitioned parquet files backing the babies view.
    base = "file:///home/devops/warehouseForTest/warehouse/babies_view/"
    reader = spark.read.option("basePath", base)
    parquet_df = reader.parquet(
        base + "part-0000*-43289174-77b6-4b9a-9ea7-482b96a395c9-c000.snappy.parquet"
    )
    parquet_df.createOrReplaceTempView("parquetFile")
    sql_df = spark.sql("select uid,deviceid,sex,birthday from parquetFile")

    # Method 1: DataFrame -> RDD (via .rdd + map), then back to a
    # DataFrame by passing createDataFrame a plain column-name list.
    row_rdd = sql_df.rdd.map(parse_babies_feas)
    converted = spark.createDataFrame(row_rdd, ["uid", "deviceid", "sex", "age"])
    converted.show()
    converted.describe().show()
    sys.exit()
#方法一输出结果
+----------+----------+------+---+
| uid| deviceid| sex|age|
+----------+----------+------+---+
|1032102272|1032102272| male| 3|
|1025067146|1025067146|female| 5|
|1013552618|1013552618| male| 2|
|1022235109|1022235109| male| 3|
|1015684809|1015684809| male| 3|
|1028259548|1028259548|female| 1|
|1028340936|1028340936| male| 5|
|1018877744|1018877744| male| 3|
|1024934655|1024934655|female| 3|
|1026171919|1026171919|female| 3|
|1031382119|1031382119| male| 1|
|1028992823|1028992823|female| 2|
|1022601617|1022601617| male| 8|
|1019934707|1019934707| male| 7|
|1030378799|1030378799|female| 1|
|1034937560|1034937560|female| 1|
|1017226113|1017226113|female| 3|
|1032474875|1032474875|female| 3|
|1027753017|1027753017| male| 8|
|1033458667|1033458667| male| 2|
+----------+----------+------+---+
+-------+--------------------+--------------------+------+------------------+
|summary| uid| deviceid| sex| age|
+-------+--------------------+--------------------+------+------------------+
| count| 386728| 386728|386723| 386728|
| mean|1.1025241785576813E9| 9.689338621435919E8| null|4.5120756707556735|
| stddev| 2.744051701295991E8|2.2159358549030557E8| null|21.669835512465433|
| min| 88888811| 0|female| -542|
| max| 2000043668| 1041729209| male| 2018|
+-------+--------------------+--------------------+------+------------------+
# Method 2: convert the RDD back to a DataFrame with an explicit
# StructType schema instead of a bare column-name list, so each
# column gets a declared type (all nullable).
schema = StructType([
    StructField('uid', IntegerType(), True),
    StructField('deviceid', IntegerType(), True),
    StructField('sex', StringType(), True),
    StructField('age', IntegerType(), True),
])
outs = spark.createDataFrame(outs, schema)
ages = outs.select(['age'])
ages = ages.toPandas()
print (ages.describe())
#方法二输出结果
age
count 386728.000000
mean 4.512076
std 21.669836
min -542.000000
25% 2.000000
50% 4.000000
75% 6.000000
max 2018.000000