flink之dataset参数传递
2022-05-11 本文已影响0人
万州客
发现 Scala 比 Java 方便的一点:多个 class(包括 public 的)可以直接写在同一个源文件中,而 Java 每个文件只能有一个 public 顶层类~
一,代码
package org.bbk.flink
import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/** Demonstrates passing a parameter to a DataSet operator via a `Configuration`.
  *
  * Builds a small in-memory DataSet of strings, filters it with [[MyFilter]]
  * (which reads the key "parameterKey" from the configuration attached with
  * `withParameters`), and prints the surviving elements.
  */
object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Scala API implicits used by the DataSet operations below.
    import org.apache.flink.api.scala._

    val sourceDataSet: DataSet[String] = env
      .fromElements("hello world", "spark flink", "hive sqoop")

    // Alternative: pass the parameter through the function's constructor instead.
    // val filterSet: DataSet[String] = sourceDataSet
    //   .filter(new MyFilterFunction("hello"))

    val configuration = new Configuration()
    configuration.setString("parameterKey", "flink")

    val filterSet: DataSet[String] = sourceDataSet
      .filter(new MyFilter)
      .withParameters(configuration)

    // In the batch DataSet API, print() is an eager sink: it runs the job itself.
    // A subsequent env.execute() would fail because no new sinks have been
    // defined since that execution, so it is intentionally not called here.
    filterSet.print()
  }
}
/** Filter that keeps only the strings containing a fixed substring.
  *
  * @param parameter substring that an element must contain to be kept
  */
class MyFilterFunction(parameter: String) extends FilterFunction[String] {

  /** Returns true when `t` contains `parameter`, false otherwise. */
  override def filter(t: String): Boolean = t.contains(parameter)
}
/** Rich filter whose search substring is read from the `Configuration` handed to `open`. */
class MyFilter extends RichFilterFunction[String] {

  // Substring to look for; starts empty and is overwritten in open().
  var value: String = ""

  /** Loads the substring stored under "parameterKey", using "defaultValue" when the key is absent. */
  override def open(parameters: Configuration): Unit = {
    value = parameters.getString("parameterKey", "defaultValue")
  }

  /** Returns true when `t` contains the current `value`, false otherwise. */
  override def filter(t: String): Boolean = t.contains(value)
}