Flink 之 DataSet 参数传递

2022-05-11  本文已影响0人  万州客

发现 Scala 比 Java 方便一点的地方了:多个公开的 class 可以写在同一个源文件中(Java 的一个源文件里只能有一个 public 顶层类)~

一,代码

package org.bbk.flink

import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
 * Demonstrates passing a parameter to a Flink DataSet rich function through a
 * [[Configuration]] attached with `withParameters`.
 *
 * The job builds a three-element DataSet of strings, filters it with
 * [[MyFilter]] (which reads the key "parameterKey" in its `open` method) and
 * prints the surviving elements.
 */
object Demo {
  /**
   * Builds and runs the filter job.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Brings DataSet and the implicit TypeInformation instances into scope.
    import org.apache.flink.api.scala._

    val sourceDataSet: DataSet[String] = env
      .fromElements("hello world", "spark flink", "hive sqoop")

    // Alternative: hand the keyword to the function through its constructor.
    //val filterSet: DataSet[String] = sourceDataSet
    //  .filter(new MyFilterFunction("hello"))

    val configuration = new Configuration()
    configuration.setString("parameterKey", "flink")
    val filterSet: DataSet[String] = sourceDataSet
      .filter(new MyFilter)
      .withParameters(configuration)

    // print() triggers execution of the plan on its own (like collect() and
    // count()). Calling env.execute() afterwards fails with
    // "No new data sinks have been defined since the last execution",
    // so no explicit execute() call is made here.
    filterSet.print()
  }
}

/**
 * Filter that keeps only the strings containing a fixed substring.
 *
 * @param parameter substring an element must contain to be kept; supplied
 *                  through the constructor
 */
class MyFilterFunction(parameter: String) extends FilterFunction[String] {
  /**
   * @param t element under test
   * @return `true` when `t` contains `parameter`, `false` otherwise
   */
  override def filter(t: String): Boolean = t.contains(parameter)
}

/**
 * Rich filter that keeps only the strings containing a substring read from
 * the [[Configuration]] passed to `open`.
 */
class MyFilter extends RichFilterFunction[String] {
  // Substring to search for. Starts out empty and is overwritten in open().
  var value: String = ""

  /**
   * Reads the search substring from the key "parameterKey", falling back to
   * "defaultValue" when the key is not set.
   *
   * @param parameters configuration handed to this function
   */
  override def open(parameters: Configuration): Unit = {
    value = parameters.getString("parameterKey", "defaultValue")
  }

  /**
   * @param t element under test
   * @return `true` when `t` contains the current `value`, `false` otherwise
   */
  override def filter(t: String): Boolean = t.contains(value)
}


上一篇 下一篇

猜你喜欢

热点阅读