Spark SQL 中 RDD 与 DataFrame 相互转换

2019-05-28  本文已影响0人  枫隐_5f5f

将 RDD 转换成 DataFrame 的方法:
spark.createDataFrame(rdds, colname_list)

将 DataFrame 转换成 RDD 的方法:
df.rdd

from pyspark.sql import SparkSession
import pandas as pd
import sys
from datetime import datetime
from pyspark.sql.types import *


def parse_babies_feas(line):
    """Flatten a Row with (uid, deviceid, sex, birthday) into a plain tuple,
    replacing the birthday timestamp with an age in whole calendar years.

    Note: age is a simple year difference (current year - birth year); it does
    not account for whether the birthday has occurred yet this year.
    """
    age_in_years = datetime.now().year - line.birthday.year
    return line.uid, line.deviceid, line.sex, age_in_years


if __name__ == "__main__":
    # One SparkSession per application; getOrCreate() reuses an existing one.
    spark = (SparkSession.builder
             .appName('test Name')
             .config("config", "value")
             .getOrCreate())

    base_path = "file:///home/devops/warehouseForTest/warehouse/babies_view/"
    # Read the matching snappy parquet part-files beneath the warehouse view;
    # "basePath" keeps partition columns resolvable when globbing part files.
    parquet_df = (spark.read
                  .option("basePath", base_path)
                  .parquet(base_path + "part-0000*-43289174-77b6-4b9a-9ea7-482b96a395c9-c000.snappy.parquet"))
    parquet_df.createOrReplaceTempView("parquetFile")
    sql_df = spark.sql("select uid,deviceid,sex,birthday from parquetFile")

    # Method 1: DataFrame -> RDD (.rdd), transform, then RDD -> DataFrame
    # via createDataFrame(rdd, column_name_list).
    outs = sql_df.rdd.map(parse_babies_feas)
    babies_df = spark.createDataFrame(outs, ["uid", "deviceid", "sex", "age"])
    babies_df.show()
    babies_df.describe().show()
    sys.exit()

#方法一输出结果
+----------+----------+------+---+
|       uid|  deviceid|   sex|age|
+----------+----------+------+---+
|1032102272|1032102272|  male|  3|
|1025067146|1025067146|female|  5|
|1013552618|1013552618|  male|  2|
|1022235109|1022235109|  male|  3|
|1015684809|1015684809|  male|  3|
|1028259548|1028259548|female|  1|
|1028340936|1028340936|  male|  5|
|1018877744|1018877744|  male|  3|
|1024934655|1024934655|female|  3|
|1026171919|1026171919|female|  3|
|1031382119|1031382119|  male|  1|
|1028992823|1028992823|female|  2|
|1022601617|1022601617|  male|  8|
|1019934707|1019934707|  male|  7|
|1030378799|1030378799|female|  1|
|1034937560|1034937560|female|  1|
|1017226113|1017226113|female|  3|
|1032474875|1032474875|female|  3|
|1027753017|1027753017|  male|  8|
|1033458667|1033458667|  male|  2|
+----------+----------+------+---+

+-------+--------------------+--------------------+------+------------------+
|summary|                 uid|            deviceid|   sex|               age|
+-------+--------------------+--------------------+------+------------------+
|  count|              386728|              386728|386723|            386728|
|   mean|1.1025241785576813E9| 9.689338621435919E8|  null|4.5120756707556735|
| stddev| 2.744051701295991E8|2.2159358549030557E8|  null|21.669835512465433|
|    min|            88888811|                   0|female|              -542|
|    max|          2000043668|          1041729209|  male|              2018|
+-------+--------------------+--------------------+------+------------------+

    # Method 2: supply an explicit schema (names + types + nullability)
    # instead of a bare column-name list.
    schema = StructType([
        StructField('uid', IntegerType(), True),
        StructField('deviceid', IntegerType(), True),
        StructField('sex', StringType(), True),
        StructField('age', IntegerType(), True),
    ])
    outs = spark.createDataFrame(outs, schema)
    # Pull only the age column down to pandas for local summary statistics.
    ages = outs.select(['age']).toPandas()
    print (ages.describe())

#方法二输出结果
                 age
count  386728.000000
mean        4.512076
std        21.669836
min      -542.000000
25%         2.000000
50%         4.000000
75%         6.000000
max      2018.000000
上一篇下一篇

猜你喜欢

热点阅读