人工智能 大数据 计算机科学

PySpark之select、collect操作

2020-11-18  本文已影响0人  HaloZhang

Select操作

在PySpark中,select()函数是用来从DataFrame结构中选择一个或多个列,同样可以选择嵌套的列。select()在PySpark中是一个transformation函数,它返回一个包含指定列的新的DataFrame。

首先,我们先创建一个DataFrame。

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
data = [("James","Smith","USA","CA"),
    ("Michael","Rose","USA","NY"),
    ("Robert","Williams","USA","CA"),
    ("Maria","Jones","USA","FL")
  ]
columns = ["firstname","lastname","country","state"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)
输出如下:

选取单列

我们可以通过给select()函数传入列的名称来筛选指定的列。由于DataFrame是不可变的,因此此函数会返回一个包含选定列的新DataFrame。

df.select("lastname").show()
结果:

选取多行

如果要选取多列的话,只需要把每个列的名称传入select()函数即可。

df.select("firstname", "lastname").show()
结果:

除此之外,还可以使用DataFrame对象名称来筛选:

df.select(df.firstname, df.lastname).show()

也可以使用col函数来指定列:

from pyspark.sql.functions import col
df.select(col("firstname"), col("lastname")).show()

结果与上面是一样的。

选取嵌套的列

我们先创建一个带有嵌套列结构的DataFrame,如果你不清楚怎么创建,可以先看下这篇文章

data = [
        (("James",None,"Smith"),"OH","M"),
        (("Anna","Rose",""),"NY","F"),
        (("Julia","","Williams"),"OH","F"),
        (("Maria","Anne","Jones"),"NY","M"),
        (("Jen","Mary","Brown"),"NY","M"),
        (("Mike","Mary","Williams"),"OH","M")
        ]

from pyspark.sql.types import StructType,StructField, StringType        
schema = StructType([
    StructField('name', StructType([
         StructField('firstname', StringType(), True),
         StructField('middlename', StringType(), True),
         StructField('lastname', StringType(), True)
         ])),
     StructField('state', StringType(), True),
     StructField('gender', StringType(), True)
     ])
df2 = spark.createDataFrame(data = data, schema = schema)
df2.printSchema()
df2.show(truncate=False) # shows all columns
输出:

在第一列“name”中,又包含了3个子列“firstname”、“middlename”、“lastname”。我们先选择"name"列,看一下输出:

df2.select("name").show(truncate=False)
结果:

可以看到,它直接显示出了第一列,那么如果我们想选取嵌套在第一列中的某一列,可以使用以下方式:

df2.select("name.firstname","name.lastname").show(truncate=False)
输出:

当然也可以获取嵌套在第一列中的所有列:

df2.select("name.*").show(truncate=False)
输出:

Collect操作

PySpark的collect()操作是用来将所有结点中的数据收集到驱动结点上(PySpark基于分布式架构)。因此collect()操作一般用于小型数据及上,在大型数据及上使用可能会导致内存不足。
还是使用前一节定义的数据:

df.show()
结果:

使用collect()函数来收集数据:

datacollect = df.collect()
print(type(datacollect))
print(datacollect)

结果:


可以看到collect()返回的是一个列表,列表的元素是Row类型,一个Row对象代表的就是DataFrame中的一行。
对于Row对象,我们可以使用列名称来访问其中的数据,如下:
for row in datacollect:
    print(row['firstname'] + ': ' + row['state'])
结果:

Select() VS Collect()

  1. select()返回一个包含指定列的新的DataFrame,而collect()以列表形式返回整个数据集。
  2. select()是一个transformation操作,而collect()是一个action操作。

参考

上一篇下一篇

猜你喜欢

热点阅读