Spark

Learning Spark [5] - UDF(User-de

2021-01-27  本文已影响0人  屹然1ran

UDF

为了满足用户的不同的分析需求,Spark允许使用者自己定义函数,供用户在Spark SQL中使用。例如数据科学家可以将一个机器学习模型封装在一个函数内,提供给数据分析师在无需知道模型内部复杂的知识下,直接使用。

例子:创建一个返回立方的函数

# in Python
from pyspark.sql.types import LongType

# create function
def cubed(num):
    return num ** 3

# register UDF
spark.udf.register('cubed', cubed, LongType())

# generate a temp view
spark.range(1,9).createOrReplaceTempView('udf_test')

# query
spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()
+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+

Pandas-UDF

为了提升UDF的计算效率,可以使用Python中的Pandas包来创建Pandas UDF(或者叫向量化(Vectorized)UDF)。

关于向量化函数,在Pandas包以及R中的dply族函数,都是很好的例子。

# In Python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

def cubed(a: pd.Series) -> pd.Series:
    return a ** 3

cubed_udf = pandas_udf(cubed, returnType = LongType())

spark.range(1,9).createOrReplaceTempView('udf_test')

spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()
+---+--------+
| id|id_cubed|
+---+--------+
|  1|       1|
|  2|       8|
|  3|      27|
|  4|      64|
|  5|     125|
|  6|     216|
|  7|     343|
|  8|     512|
+---+--------+
上一篇下一篇

猜你喜欢

热点阅读