Flink自定义source

2019-08-23  本文已影响0人  yayooo
package com.atguigu.apiTest


import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

object SourceTestUDSource {
  def main(args: Array[String]): Unit = {

    //自定义source
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    val stream4: DataStream[SensorReading] = env.addSource(new SensorSource())

    stream4.print("stream4").setParallelism(1)

    env.execute()
  }
}


/**
  * //传感器读数样例类
  * case class SensorReading(id:String, timestamp:Long, temperature: Double)
  */
class SensorSource() extends SourceFunction[SensorReading] {

  //定义一个flag,表示数据源是否正常运行
  var running: Boolean = true
  //关闭数据源的生成
  override def cancel(): Unit = {
    running=false
  }

  //正常生成数据
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    //初始化一个随机数生成器
    val rand = new Random()

    //持续生成
    //初始化定义一组数据
    var curTemp = 1.to(10).map(
      i => ("sensor_" + i, 60 + rand.nextGaussian()* 20 )
    )

    while (running) {
      //更新前一次数据的值
      curTemp = curTemp.map(
        t => (t._1, t._2 + rand.nextGaussian())
      )
      //获取当前时间戳
      val curTime: Long = System.currentTimeMillis()
      curTemp.foreach(
        //使用sourceContext上下文输出
        t=> sourceContext.collect(SensorReading(t._1,curTime,t._2))
      )
      Thread.sleep(500)
    }

    running=true
  }
}
上一篇下一篇

猜你喜欢

热点阅读