mlsql流任务实现distinct
流计算场景里distinct很常用,spark sql对 stream dataset不支持 SELECT COUNT(DISTINCT Company) FROM Orders
这种用法,但是dataframe支持dropDuplicates,可以指定columns。下面要加上dropDuplicates的操作。
通过et可以实现对dataframe的操作
register语法
register的语法是这样的:
REGISTER format '.' path as functionName where? expression? booleanExpression*
比如:register ScriptUDF.
scriptTableas plusFun
注册一个udf
那参考这个语法,我想要对tablex去重,命令应该是:
register DistinctExt.
tablexas tbx where inputTable="{}" and columns="{}"
as语句是语法要求,后面的tbx其实没有用处。
register语法的原理
image.png通过debug会发现,format
是DistinctExt
,path
是tablex
,option
是where
后面的配置项们
mlsql设计的ET
是要实现一个SQLAlg
接口:
这些接口起的名字似乎都跟机器学习有关,训练,预测,模型之类的,哈哈,个人感觉这块可以再细分下,给每一种功能单独设计一个接口,可以是SQLAlg的子类,SQLAlg本身不要定义那么多接口,比较容易理解。
实现SQLDistinctExt
我参考SQLSendMessage
实现一个SQLDistinctExt
,只需要把load方法实现下就行了
override def load(spark: SparkSession, _path: String, params: Map[String, String]): Any = {
val inputTable = params.getOrElse("inputTable", _path)
val columns = params.getOrElse("columns", "")
val df = spark.table(inputTable)
if (columns.isEmpty) {
df.dropDuplicates().createOrReplaceTempView(inputTable)
} else {
df.dropDuplicates(columns.split(",")).createOrReplaceTempView(inputTable)
}
null
}
将这个类放到合适的地方能够被mlsql找到
def findAlg(name: String) = {
mapping.get(name.capitalize) match {
case Some(clzz) =>
Class.forName(clzz).newInstance().asInstanceOf[SQLAlg]
case None =>
if (!name.contains(".") && (name.endsWith("InPlace") || name.endsWith("Ext"))) {
Class.forName(s"streaming.dsl.mmlib.algs.SQL${name}").newInstance().asInstanceOf[SQLAlg]
} else {
try {
Class.forName(name).newInstance().asInstanceOf[SQLAlg]
}
catch {
case e: Exception =>
throw new RuntimeException(s"${name} is not found")
}
}
}
}
还可以支持!这种宏,在CommandCollection
中添加:
使用方法
在流或者批任务里都可以使用哈,比如你经过处理得到一个table叫做t1,有a,b,c,d,e,f字段,根据a,b,c三个字段去重,你可以这样写
!distinct t1 "a,b,c"
你再select t1
会发现重复的已经去掉了,当然还可以设置watermarking