Spark

PySpark之DataFrame的创建与转换

2020-11-17  本文已影响0人  HaloZhang

简介

DataFrame 结构代表的是数据的一个不可变分布式集合,其数据都被组织到有名字的列中,就像关系型数据库中的表一样。DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。
本文将着重介绍PySpark中DataFrame的各种创建方式,以及与RDD、Pandas之间的转换。


DataFrame的创建

1. 从RDD中创建

为了从存在的RDD结构中创建出DataFrame,我们先定义一些测试数据,如下:

data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")] #list中的每一个元素都是元祖

接着先创建一个SparkSession,并通过调用SparkContext的parallelize()函数构造出一个RDD对象,代码如下:

import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
rdd = spark.sparkContext.parallelize(data)

1.1 使用toDF()函数

RDD的toDF()方法是用来从一个存在的RDD结构中创建一个DataFrame对象,因为RDD是一个分布式的 Java对象的集合,故它没有包含列的信息,因此DataFrame采用的是默认的列。上面列举的测试数据一共有2列,分别用"_1"和“_2”来表示。

dfFromRDD1 = rdd.toDF()
dfFromRDD1.printSchema()
printSchema函数用于输出DataFrame的结构,即包含了哪些列,以及每一列的名称和类型等等,输出如下:

如果想给DataFrame中的每一列指定名称的话,我们可以在toDF函数中传入列的名称,如下:

columns = ["language","users_count"]
dfFromRDD1 = rdd.toDF(columns)
dfFromRDD1.printSchema()
这样输出DataFrame的结构信息的时候,就会包含列名称以及类型了,如下:

1.2 使用SparkSession中的createDataFrame()函数

我们可以直接使用createDataFrame函数来在一个原始list数据上创建一个DataFrame,并且叠加上toDF()操作,为每一列指定名称,代码如下:

dfFromRDD2 = spark.createDataFrame(rdd).toDF(*columns)
dfFromRDD2.printSchema()

输出与上图是一样的。

2. 从list对象中创建

2.1 使用createDataFrame函数并且指定行类型来创建

先将list中的每个元素都转换成一个PySpark中的row对象,接着使用createDataFrame函数来创建DataFram,代码如下:

rowData = map(lambda x: Row(*x), data)
dfFromData3 = spark.createDataFrame(rowData, columns)
dfFromData3.printSchema()
dfFromData3.show()

2.2 创建DataFrame时指定格式

如果在创建DataFrame的时候,同时想指定每一列的名称以及对应的类型,我们可以先创建一个StructType结构,然后再调用createDataFrame传入。

StructType会在下面的内容中讲述,这里就简单理解它指定了这两列的名称和类型,以及每个字段是否能为空。

schema = StructType([ \
    StructField("language",StringType(),True), \
    StructField("user_count",StringType(),True),
  ])

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(truncate=False)

3. 从数据源文件中创建

大部分情况下,我们都是从CSV,文本,JSON,XML等数据源文件中实时创建DataFrame。PySpark默认就支持许多数据格式,因此并不需要再单独导入其他库,我们可以从DataFrameReader类中选择合适的方法来创建DataFrame。

3.1 从CSV文件中创建DataFrame

使用csv()方法从CSV文件中读取并创建一个DataFrame对象。(这里采用的是MovieLens数据集中的用户评分文件)。

df2 = spark.read.csv("/Downloads/ml-latest-small/ratings.csv")
df2.printSchema()
df2.show(truncate=False)
输出如下:

同理,也可以使用text(),json()等方法来读取TXT、Json等文件。

4. 创建带格式的空的DataFrame

有的时候我们并不是直接打开文件来进行处理,而是从网络或者其他地方获取到数据流,那此时创建一个空的DataFrame就很有必要。
一般有两种方式来创建空的DataFrame:

schema = StructType([
  StructField('firstname', StringType(), True),
  StructField('middlename', StringType(), True),
  StructField('lastname', StringType(), True)
  ])

df = spark.createDataFrame(spark.sparkContext.emptyRDD(),schema)
df.printSchema()
df1 = spark.sparkContext.parallelize([]).toDF(schema)
df1.printSchema()

df2 = spark.createDataFrame([], schema)
df2.printSchema()
输出均为:

DataFrame与Pandas、RDD的转换

RDD转DataFrame

这个上文已经提及了,使用toDF()函数便可以完成。

dept = [("Finance",10), 
        ("Marketing",20), 
        ("Sales",30), 
        ("IT",40) 
      ]
rdd = spark.sparkContext.parallelize(dept)
deptColumns = ["dept_name","dept_id"]
df = rdd.toDF(deptColumns)
df.printSchema()
df.show(truncate=False)

DataFrame转RDD

最简单的可以直接使用rdd函数:

rdd1 = df.rdd

或者使用:

rdd2 = df.rdd.map(tuple)

DataFrame转Pandas

PySpark中的DataFrame可以通过toPandas()函数转换成Python的Pandas DataFrame结构。这两者的主要区别是,pandas的操作都是在单个结点上执行的,而PySpark运行在多台机器上,因此在处理大量数据时,PySpark会比Pandas快数倍以上。

df.show()
pandas = df.toPandas()
pandas
结果如下:

注意,Pandas给数据添加了序号。


使用StructType和StructField来指定DataFrame的结构

在上面的例子中,其实我们已经使用过StructType和StructField了,这里再详细介绍一下。PySpark中的StructType和StructField是用来指定DataFrame的结构,并且可以用来创建一些复杂的列项,比如嵌套的结构体、数组等。 StructType是一系列StructField’s的集合,而StructField定义了列的名称,数据类型,以及通过布尔值来指定字段是否可以为空以及元数据等。
下面用一个例子来演示一下如何使用StructType和StructField来创建一个DataFrame。

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

spark = SparkSession.builder.master("local[1]") \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)
输出:

定义嵌套结构

对于上面的DataFrame,其实我们很容易发现它有一些不合理的地方。比如前三列都是在表示名称,它们同属与一个名叫“name”的列才算是比较合理的。因此,我们可以重新定义一下结构,将"firstname"、“middlename”、“lastname”这三个字段合并为一个"name"字段,代码如下:

structureData = [
    (("James","","Smith"),"36636","M",3100),
    (("Michael","Rose",""),"40288","M",4300),
    (("Robert","","Williams"),"42114","M",1400),
    (("Maria","Anne","Jones"),"39192","F",5500),
    (("Jen","Mary","Brown"),"","F",-1)
  ]
structureSchema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('id', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])

df2 = spark.createDataFrame(data=structureData,schema=structureSchema)
df2.printSchema()
df2.show(truncate=False)
输出:

这里要注意一下,如果修改了StructType的结构,那么原始的list中也需要做相应的修改。

参考

上一篇下一篇

猜你喜欢

热点阅读