中文文档 pyspark.sql.DataFrame

2020-03-28  本文已影响0人  cassie_xs

class pyspark.sql.DataFrame(jdf, sql_ctx)

分布式的收集数据分组到命名列中。

一个DataFrame相当于在Spark SQL中一个相关的表,可在SQLContext使用各种方法创建,

2.1 agg(*exprs)

没有组的情况下聚集整个DataFrame (df.groupBy.agg()的简写)。

>>>l=[('cassie',5),('beiwang',4),('xs',2)]

>>>df = sqlContext.createDataFrame(l,['name','age'])

>>>df.agg({"age": "max"}).collect()[Row(max(age)=5)]

>>>from pyspark.sql importfunctions as F

>>> df.agg(F.min(df.age)).collect()

[Row(min(age)=2)]

2.2 alias(alias)

In [57]: l = [('cassie',2), ('beiwang',3)]

In [58]: df = sqlContext.createDataFrame(l,['name', 'age'])

In [59]: from pyspark.sql.functions import *

In [60]: df1 = df.alias('df1')

In [61]: df2 = df.alias('df2')

In [62]: join_df = df1.join(df2, col("df1.name")==col("df2.name"), 'inner')

In [63]: join_df.select("df1.name")

Out[63]: DataFrame[name: string]

In [64]: join_df.select(col("df1.name"))

Out[64]: DataFrame[name: string]

In [65]: join_df.select(col("df1.name")).collect()

Out[65]: [Row(name=u'beiwang'), Row(name=u'cassie')]

In [66]: join_df.select("df1.name").collect()

Out[66]: [Row(name=u'beiwang'), Row(name=u'cassie')]

2.3 cache()

用默认的存储级别缓存数据(MEMORY_ONLY_SER).

2.4 coalesce(numPartitions)

返回一个有确切的分区数的分区的新的DataFrame。

与在一个RDD上定义的合并类似, 这个操作产生一个窄依赖。 如果从1000个分区到100个分区,不会有shuffle过程, 而是每100个新分区会需要当前分区的10个。

2.5 collect()

返回所有的记录数为行的列表。

>>> df.collect()

[Row(name=u'cassie', age=2), Row(name=u'beiwang', age=3)]

2.6 columns

返回所有列名的列表。

>>> df.columns

['age','name']

2.7 corr(col1, col2, method=None)

计算一个DataFrame相关的两列为double值。通常只支持皮尔森相关系数。DataFrame.corr()和DataFrameStatFunctions.corr()类似。

参数:●  col1 – 第一列的名称

    ●  col2 – 第二列的名称

    ●  method – 相关方法.当前只支持皮尔森相关系数

df.stat.corr('age','hobby')

2.8 count()

返回DataFrame的行数。

>>> df.count()2

2.9 cov(col1, col2)

计算由列名指定列的样本协方差为double值。DataFrame.cov()和DataFrameStatFunctions.cov()类似。

参数:●  col1 – 第一列的名称

●  col2 – 第二列的名称

df.stat.cov('hobby','age')

2.10 crosstab(col1, col2)

计算给定列的分组频数表,也称为相关表。每一列的去重值的个数应该小于1e4.最多返回1e6个非零对.每一行的第一列会是col1的去重值,列名称是col2的去重值。第一列的名称是$col1_$col2. 没有出现的配对将以零作为计数。DataFrame.crosstab() and DataFrameStatFunctions.crosstab()类似。

参数:●  col1 – 第一列的名称. 去重项作为每行的第一项。

●  col2 – 第二列的名称. 去重项作为DataFrame的列名称。

df.stat.crosstab("hobby", "age").show()

+---------+---+---+

|hobby_age|  2|  3|

+---------+---+---+

|        5|  0|  1|

|      10|  1|  0|

+---------+---+---+

2.11 cube(*cols)

创建使用指定列的当前DataFrame的多维立方体,这样可以聚合这些数据。

 df.cube('hobby', df.age).count().show()

+-----+----+-----+

|hobby| age|count|

+-----+----+-----+

|    5|null|    1|

| null|null|    2|

|  10|null|    1|

|  10|  2|    1|

| null|  2|    1|

|    5|  3|    1|

| null|  3|    1|

+-----+----+-----+

2.12 describe(*cols)

计算数值列的统计信息。

包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列。

2.13 distinct()

返回行去重的新的DataFrame。

df.distinct().count()

2.14 drop(col)

返回删除指定列的新的DataFrame

df.drop('hobby').collect()

2.15 dropDuplicates(subset=None)

返回去掉重复行的一个新的DataFrame,通常只考虑某几列。

drop_duplicates()和dropDuplicates()类似。

df.dropDuplicates().show()

>>>df.dropDuplicates(['name','height']).show()

2.16 drop_duplicates(subset=None)

与以上相同。

2.17 dropna(how='any', thresh=None, subset=None)

返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。

参数:●  how – 'any'或者'all'。如果'any',删除包含任何空值的行。如果'all',删除所有值为null的行。

● thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写'how'参数。

●  subset – 选择的列名称列表。

df.na.drop().show()

dfnew.na.drop(how='all',thresh=2).show()

2.18 dtypes

返回所有列名及类型的列表。

>>> df.dtypes

[('age','int'), ('name','string')]

2.19 explain(extended=False)

将(逻辑和物理)计划打印到控制台以进行调试。

参数:●  extended – boolean类型,默认为False。如果为False,只打印物理计划。

df.explain(True)

2.20 fillna(value, subset=None)

替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。

参数:●  value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.

●  subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列,这个非字符串的值会被忽略。

dfnew.na.fill(50).show()

dfnew.na.fill({'age': 50, 'name': 'unknown'}).show()

2.21 filter(condition)

用给定的条件过滤行。

where()和filter()类似。

参数:●  条件 - 一个列的bool类型或字符串的SQL表达式。

df.where(df.age == 2).collect()

df.filter(df.age == 2).collect()

2.22 first()

返回第一行。

>>> df.first()

Row(age=2, name=u'Alice')

2.23 flatMap(f)

返回在每行应用F函数后的新的RDD,然后将结果压扁。

是df.rdd.flatMap()的简写。

>>>df.rdd.flatMap(lambda p: p.name).collect()

[u'A', u'l', u'i', u'c', u'e', u'B', u'o', u'b']

2.24 foreach(f)

应用f函数到DataFrame的所有行。

是df.rdd.foreach()的简写。

def f(person):

     print(person.name)

>>> df.foreach(f)

Alice

2.25 foreachPartition(f)

应用f函数到DataFrame的每一个分区。

是 df.rdd.foreachPartition()的缩写。

>>>def f(people):

...    forpersonin people:

...        print(person.name)>>> df.foreachPartition(f)

2.26  freqItems(cols, support=None)

参数:●  cols – 要计算重复项的列名,为字符串类型的列表或者元祖。

●  support – 要计算频率项的频率值。默认是1%。参数必须大于1e-4.

df.stat.freqItems(['name']).collect()

2.27 groupBy(*cols)

使用指定的列分组DataFrame,这样可以聚合计算。可以从GroupedData查看所有可用的聚合方法。

groupby()和groupBy()类似。

参数:●  cols – 分组依据的列。每一项应该是一个字符串的列名或者列的表达式。

df.groupBy(['name', df.age]).count().collect()

2.28 groupby(*cols)

和以上一致

2.29 head(n=None)

返回前n行

参数:●  n – int类型,默认为1,要返回的行数。

返回值: 如果n大于1,返回行列表,如果n为1,返回单独的一行。

2.30 insertInto(tableName, overwrite=False)

插入DataFrame内容到指定表。

注:在1.4中已过时,使用DataFrameWriter.insertInto()代替。

2.31 intersect(other)

返回新的DataFrame,包含仅同时在当前框和另一个框的行。

相当于SQL中的交集。

df.intersect(df8)

如果collect()和take()方法可以运行在本地(不需要Spark executors)那么返回True

2.32 join(other, on=None, how=None)

使用给定的关联表达式,关联另一个DataFrame。

以下执行df1和df2之间完整的外连接。

参数:● other – 连接的右侧

● on – 一个连接的列名称字符串, 列名称列表,一个连接表达式(列)或者列的列表。如果on参数是一个字符串或者字符串列表,表示连接列的名称,这些名称必须同时存在join的两个表中, 这样执行的是一个等价连接。

● how – 字符串,默认'inner'。inner,outer,left_outer,right_outer,leftsemi之一。

df.join(df8,on='name',how='inner').show()

2.33 limit(num)

将结果计数限制为指定的数字。

df.limit(1).collect()

2.34 map(f)

通过每行应用f函数返回新的RDD。

是 df.rdd.map()的缩写。

>>>df.rdd.map(lambda p: p.name).collect()

2.35 mapPartitions(f, preservesPartitioning=False)

通过每个分区应用f函数返回新的RDD

是df.rdd.mapPartitions()的缩写。

>>>rdd = sc.parallelize([1, 2, 3, 4], 4)

>>>def f(iterator):yield 1

>>> rdd.mapPartitions(f).sum() 4

2.36 na

返回DataFrameNaFunctions用于处理缺失值

df.na.drop(how='all').show()

2.37 orderBy(*cols, **kwargs)

返回按照指定列排序的新的DataFrame。

参数:● cols – 用来排序的列或列名称的列表。

● ascending – 布尔值或布尔值列表(默认 True). 升序排序与降序排序。指定多个排序顺序的列表。如果指定列表, 列表的长度必须等于列的长度。

df.orderBy('age',ascending=False).show()

df.orderBy(['age','height'],ascending=[1,0]).show()

2.38 persist(storageLevel=StorageLevel(False, True, False, False, 1))

设置存储级别以在第一次操作运行完成后保存其值。这只能用来分配新的存储级别,如果RDD没有设置存储级别的话。如果没有指定存储级别,默认为(memory_only_ser)。

from pyspark import StorageLevel

df.persist(StorageLevel.MEMORY_ONLY)

print StorageLevel.MEMORY_ONLY

2.39 printSchema()

打印schema以树的格式

df.printSchema()

splits = rdd.randomSplit([0.7, 0.3],42)

 splits[0].collect(): [0, 1, 2, 3, 4, 5, 7, 9, 10, 12, 13, 14, 16, 18, 19]

splits[1].collect(): [6, 8, 11, 15, 17]

2.40 rdd:返回内容为行的RDD。

2.41 registerAsTable(name): 在1.4中已过时,使用registerTempTable()代替。

2.42 registerTempTable(name)

使用给定的名字注册该RDD为临时表

这个临时表的有效期与用来创建这个DataFrame的SQLContext相关

df.registerTempTable("people")

 df2 = sqlContext.sql("select * from people")

 df2.show()

+---+------+-----+

|age|height| name|

+---+------+-----+

|  5|    80|Alice|

|  5|    80|Alice|

| 10|    80|Alice|

+---+------+-----+

2.43 repartition(numPartitions, *cols)

按照给定的分区表达式分区,返回新的DataFrame。产生的DataFrame是哈希分区。

numPartitions参数可以是一个整数来指定分区数,或者是一个列。如果是一个列,这个列会作为第一个分区列。如果没有指定,将使用默认的分区数。

1.6版本修改: 添加可选参数可以指定分区列。如果分区列指定的话,numPartitions也是可选的。

2.44 replace(to_replace, value, subset=None)

返回用另外一个值替换了一个值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace() 类似。

参数:● to_replace – 整形,长整形,浮点型,字符串,或者列表。要替换的值。如果值是字典,那么值会被忽略,to_replace必须是一个从列名(字符串)到要替换的值的映射。要替换的值必须是一个整形,长整形,浮点型,或者字符串。

● value – 整形,长整形,浮点型,字符串或者列表。要替换为的值。要替换为的值必须是一个整形,长整形,浮点型,或者字符串。如果值是列表或者元组,值应该和to_replace有相同的长度。

● subset – 要考虑替换的列名的可选列表。在subset指定的列如果没有匹配的数据类型那么将被忽略。例如,如果值是字符串,并且subset参数包含一个非字符串的列,那么非字符串的列被忽略。

l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]

df4 = sqlContext.createDataFrame(l4,['name','age','height'])

 df4.replace(10, 20).show() #把 10替换20 加不加na都可以

df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()+----+----+------+

2.45 rollup(*cols)

使用指定的列为当前的DataFrame创建一个多维汇总, 这样可以聚合这些数据。

In [240]: l=[('Alice',2,80),('Bob',5,None)]

In [241]: df = sqlContext.createDataFrame(l,['name','age','height'])

In [242]: df.show()

+-----+---+------+

| name|age|height|

+-----+---+------+

|Alice|  2|    80|

|  Bob|  5|  null|

+-----+---+------+

In [243]:  df.rollup('name', df.age).count().show()

+-----+----+-----+

| name| age|count|

+-----+----+-----+

| null|null|    2|

|  Bob|  5|    1|

|Alice|  2|    1|

|  Bob|null|    1|

|Alice|null|    1|

+-----+----+-----+

2.46 sample(withReplacement, fraction, seed=None)

sample(是否放回, fraction, seed)

withReplacement:true抽取放回,false抽取不放回。

fraction:1)false抽取不放回的情况下,抽取的概率(0-1)。0-全不抽1-全抽2)true抽取放回的情况下,抽取的次数。seed:随机数种子。

返回DataFrame的子集采样。

>>>df.sample(False, 0.5, 42).count()

2.47 sampleBy(col, fractions, seed=None)

根据每个层次上给出的分数,返回没有替换的分层样本。

返回没有替换的分层抽样 基于每层给定的一小部分 在给定的每层的片段

参数:● col – 定义层的列

● fractions – 每层的抽样数。如果没有指定层, 将其数目视为0.

● seed – 随机数

返回值: 返回代表分层样本的新的DataFrame

sampled = dataset.sampleBy("key", fractions={0: 1, 1: 0}, seed=0) 表示key取的比例

2.48 save(path=None, source=None, mode='error', **options)

保存DataFrame的数据到数据源。

注:在1.4中已过时,使用DataFrameWriter.save()代替。

2.49 saveAsParquetFile(path)

保存内容为一个Parquet文件,代表这个schema。

注:在1.4中已过时,使用DataFrameWriter.parquet() 代替。

2.50 saveAsTable(tableName, source=None, mode='error', **options)

将此DataFrame的内容作为表保存到数据源。

注:在1.4中已过时,使用DataFrameWriter.saveAsTable() 代替。

2.51  schema

返回DataFrame的schema为types.StructType。

>>>l=[('Alice',2),('Bob',5)]>>>df = sqlContext.createDataFrame(l,['name','age'])>>> df.schema

StructType(List(StructField(name,StringType,true),StructField(age,LongType,true)))

2.52  select(*cols)

提供一组表达式并返回一个新的DataFrame。

参数:●cols – 列名(字符串)或表达式(列)列表。 如果其中一列的名称为“*”,那么该列将被扩展为包括当前DataFrame中的所有列。

In [275]: df.select('name','age').show()

+-----+---+

| name|age|

+-----+---+

|Alice|  2|

|  Bob|  5|

+-----+---+

2.53 show(n=20, truncate=True) 

将前n行打印到控制台。

参数:●n – 要显示的行数。

● truncate – 是否截断长字符串并对齐单元格。

2.54 sort(*cols, **kwargs)

返回按指定列排序的新DataFrame。

参数:●cols – 要排序的列或列名称列表。

● ascending – 布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

df.sort(df.age.desc()).collect()

2.55 sortWithinPartitions(*cols, **kwargs)

返回一个新的DataFrame,每个分区按照指定的列排序。

参数:●cols – 要排序的列或列名称列表。

ascending – 布尔值或布尔值列表(默认为True)。 排序升序降序。 指定多个排序顺序的列表。 如果指定了列表,列表的长度必须等于列的长度。

df.sortWithinPartitions("age", ascending=False).show()

2.56 stat 

返回统计功能的DataFrameStatFunctions。

2.57 subtract(other)

返回一个新的DataFrame,这个DataFrame中包含的行不在另一个DataFrame中。

这相当于SQL中的EXCEPT。

In [278]: df.subtract(df4).show()

+-----+---+------+

| name|age|height|

+-----+---+------+

|Alice|  2|    80|

+-----+---+------+

2.58 take(num)

返回前num行的行列表

>>>df.take(2)

2.59 toDF(*cols)

返回一个新类:具有新的指定列名称的DataFrame。

参数:● cols – 新列名列表(字符串)。

In [280]: df.toDF('a','b','c').show()

+-----+---+----+

|    a|  b|  c|

+-----+---+----+

|Alice|  2|  80|

|  Bob|  5|null|

+-----+---+----+

In [281]: df.show()

+-----+---+------+

| name|age|height|

+-----+---+------+

|Alice|  2|    80|

|  Bob|  5|  null|

+-----+---+------+

2.60 toJSON(use_unicode=True)

将DataFrame转换为字符串的RDD。

每行都将转换为JSON格式作为返回的RDD中的一个元素。

In [284]: df.toJSON().take(2)

Out[284]: [u'{"name":"Alice","age":2,"height":80}', u'{"name":"Bob","age":5}']

2.61 toPandas()

将此DataFrame的内容返回为Pandas pandas.DataFrame。

这只有在pandas安装和可用的情况下才可用。

表:In [286]: df.toPandas()

Out[286]:

    name  age  height

0  Alice    2    80.0

1    Bob    5    NaN

2.62 unionAll(other)

返回包含在这个frame和另一个frame的行的联合的新DataFrame。

这相当于SQL中的UNION ALL。

In [297]: df.unionAll(df4).show()

+-----+----+------+

| name| age|height|

+-----+----+------+

|Alice|  2|    80|

|  Bob|  5|  null|

|Alice|  10|    80|

|  Bob|  5|  null|

|  Tom|null|  null|

| null|null|  null|

+-----+----+------+

2.63 unpersist(blocking=True)

将DataFrame标记为非持久性,并从内存和磁盘中删除所有的块。

2.64 where(condition)

使用给定表达式过滤行。

where()是filter()的别名。

2.65 withColumn(colName, col)

通过添加列或替换具有相同名称的现有列来返回新的DataFrame。

参数:● colName – 字符串,新列的名称

● col – 新列的列表达式

>>>df.withColumn('age2', df.age + 2).collect()

[Row(name=u'Alice', age=2, age2=4), Row(name=u'Bob', age=5, age2=7)]

2.66 withColumnRenamed(existing, new)

通过重命名现有列来返回新的DataFrame。

参数:● existing – 字符串,要重命名的现有列的名称

● col – 字符串,列的新名称

>>>df.withColumnRenamed('age', 'age2').collect()

[Row(name=u'Alice', age2=2), Row(name=u'Bob', age2=5)]

2.67 write

用于将DataFrame的内容保存到外部存储的接口。

返回:DataFrameWriter

上一篇 下一篇

猜你喜欢

热点阅读