pyspark pandas udf

2019-07-10  本文已影响0人  AsdilFibrizo
  • 下面介绍pandas_udf的三种函数使用
# 引入spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('learnspark').getOrCreate()
sc = spark.sparkContext

Scalar UDFs函数

通常做一些简单计算使用该函数,输出长度与输入长度不变

# pandans udf 使用说明
# Scalar UDFs
# 使用PandasUDFType.SCALAR,当使用select和withColumn是使用该函数
# 输入pandas.Series输出pandas.Series,输入和输出长度相同
# 在函数内部,spark通过将列拆分来批量执行pandas udf,将每个批的函数作为数据的子集调用,然后合并
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import LongType,FloatType
from pyspark.sql.types import StructField, StructType
# 形式1
def multiply_func(a, b):
    return a * b
multiply = pandas_udf(multiply_func, returnType=FloatType(),functionType=PandasUDFType.SCALAR) # 这里使用PandasUDFType.SCALAR作为functiontype
# 形式2
@pandas_udf(returnType=FloatType(), functionType=PandasUDFType.SCALAR)
def multiply2(a,b):
  return a+b

# 现造一个数据集
df = spark.createDataFrame(
    [(1,1.0, 1.0, 1), (1,1.2, 2.0, 1), (1,5.6, 3.0, 2), (1,24.4, 5.0, 2), (1,22.3, 10.0, 2)],
    ('id',"X", "Y","Z"))
df.show()
# 运行函数
df = (df
       .withColumn('M', multiply(df.X,df.Y))
       .withColumn('N', multiply2(df.X,df.Y))
      )
df.show()
[out]:
+---+----+----+---+
| id|   X|   Y|  Z|
+---+----+----+---+
|  1| 1.0| 1.0|  1|
|  1| 1.2| 2.0|  1|
|  1| 5.6| 3.0|  2|
|  1|24.4| 5.0|  2|
|  1|22.3|10.0|  2|
+---+----+----+---+

+---+----+----+---+-----+----+
| id|   X|   Y|  Z|    M|   N|
+---+----+----+---+-----+----+
|  1| 1.0| 1.0|  1|  1.0| 2.0|
|  1| 1.2| 2.0|  1|  2.4| 3.2|
|  1| 5.6| 3.0|  2| 16.8| 8.6|
|  1|24.4| 5.0|  2|122.0|29.4|
|  1|22.3|10.0|  2|223.0|32.3|
+---+----+----+---+-----+----+

Grouped map UDFs函数

当需要对每行做统计计算但不改变其长度,比如减均值等操作

# Grouped map UDFs
# 这个函数适用于groupBy().apply() 实现“拆分应用组合”模式,它分为三个步骤
# 1.使用DataFrame.groupBy分割数据
# 2.对每个group使用函数,输入输出都是pandas.DataFrame,输入数据包含每个groupy的所有行和列
# 3.合并结果称为一个新的DataFrame
# 使用groupBy().apply()你必须做以下定义
# 1.定义每个group的计算的python函数
# 2.一个StructType对象或者string定义输出DataFrame的schema
### 在应用函数之前,组的所有数据都将加载到内存中。这可能导致内存不足异常,特别是不同group是倾斜的

# 这里将每行减去goup的均值
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# 形式1
schema = StructType([
  StructField("id", LongType(),True),
  StructField("v", FloatType(),True)
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()

# 形式2
def _subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())
subtract_mean2 = pandas_udf(_subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP) # 这里使用PandasUDFType.GROUPED_MAP作为functiontype
df.groupby("id").apply(subtract_mean2).show()

Grouped aggregate UDFs函数

一些统计计算,返回与输入长度不同

# Grouped aggregate UDFs
# 使用groupBy().agg() 时使用,作为统计函数使用
from sklearn.metrics import silhouette_score # 计算轮廓系数
import pandas as pd
df = spark.createDataFrame(
    [(1,1.0, 1.0, 1), (1,1.2, 2.0, 1), (1,5.6, 3.0, 2), (1,24.4, 5.0, 2), (1,22.3, 10.0, 2)],
    ('id',"X", "Y","Z"))
@pandas_udf(returnType='float',functionType=PandasUDFType.GROUPED_AGG)
def sc(call, Y, X):
    merge_df = pd.concat([X, Y], axis=1)
    try:
        return silhouette_score(merge_df.values, call.values)
    except:
        return None
df.groupBy('id').agg(sc(df.Z,df.X,df.Y)).show()
[out]:
+---+-----------+
| id|sc(Z, X, Y)|
+---+-----------+
|  1|  0.4213251|
+---+-----------+
上一篇下一篇

猜你喜欢

热点阅读