Spark自定义函数(2)
2018-10-12 本文已影响5人
利伊奥克儿
使用SparkSQL UDFs在Apache Spark中创建日期时间
UDFs 或用户定义函数是向SparkSQL语言中添加函数的一种简单方法。这个函数操作分布式数据 DataFrames ,并逐行工作(除非您正在创建一个用户定义的聚合函数)。您可以在各种SQL环境中找到udf,当然也可以在Apache Hive中找到。
这样想,SQL中执行求和函数是很常见的。这里有一个例子,在这个查询中,我们有一个函数makeDateTime,这个函数是一个用户定义的函数,是我们编写并添加到SQL语言中的一个函数,它可以在不同的字段中创建一个日期。
SELECT customer_id, makeDateTime(date_field, time_field, timezone) as datetime, amount
FROM purchases;
Use Cases
那么重点是什么呢?好吧,这使得在你能理解并在你的领域中使用的语言中工作变得容易多了。这是一种简单的方法(就像编程中的函数一样)来抽象一个常见的操作。
让我们来看看定义自己的UDF的过程。我们将从上面的示例中定义UDF,并在Spark DataFrame上使用它。
定义udf非常简单,我们只需创建一个匿名函数,然后通过sqlcontext在rg.apache.spark.sql.functions中通过udf函数注册它。udf取决于您想如何使用它这是设置。设想 purchases 是一个DataFrame在布局:
purchases:
customer_id
purchase_id
date
time
tz
amount
我们的目标是得到一个可以使用的datetime域,让我们来试一下。
case class Purchase(customer_id: Int, purchase_id: Int, date: String, time: String, tz: String, amount:Double)
val x = sc.parallelize(Array(
Purchase(123, 234, "2007-12-12", "20:50", "UTC", 500.99),
Purchase(123, 247, "2007-12-12", "15:30", "PST", 300.22),
Purchase(189, 254, "2007-12-13", "00:50", "EST", 122.19),
Purchase(187, 299, "2007-12-12", "07:30", "UTC", 524.37)
))
val df = sqlContext.createDataFrame(x)
df.registerTempTable("df")
现在让我们定义函数!下划线仅仅表示它是一个部分应用的函数。
def makeDT(date: String, time: String, tz: String) = s"$date $time $tz"
sqlContext.udf.register("makeDt", makeDT(_:String,_:String,_:String))
// 现在我们可以在SparkSQL中直接使用函数。
sqlContext.sql("SELECT amount, makeDt(date, time, tz) from df").take(2)
// 但是在外面不行
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2) // fails
上面可以看到,我们可以在SQL中使用它,但不在SQL之外。为此,我们需要使用spark.sql.function.udf创建一个不同的UDF,1.6版本以后好像在spark.sql.functions.udf
import org.apache.spark.sql.functions.udf
val makeDt = udf(makeDT(_:String,_:String,_:String))
// 这个时候生效了
df.select($"customer_id", makeDt($"date", $"time", $"tz"), $"amount").take(2)
这是我在看一个spark文档的时候加上自己的理解整理的笔记,文档好像是官方文档来的,英文版,有兴趣的朋友可以去看原文