Flink从数据库读取配置信息

2020-01-15  本文已影响0人  Jorvi

一、使用State

1.1 主入口

  /**
   * Job entry point for the keyed-state variant: reads raw log lines from
   * `SourceFromFile`, parses them, keys the stream by `timestamp` and runs
   * `DemoProcessFunction` on it, printing the result.
   *
   * @param args optional; `args(0)` overrides the default parallelism. Any further
   *             arguments are accepted but not used by this method.
   */
  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Parallelism: first CLI argument when present, otherwise the configured default.
    val defaultParallelism: Int = args.headOption
      .map(_.toInt)
      .getOrElse(PropertiesUtil.getOrElse("flink.default.parallelism", "40").toInt)
    environment.setParallelism(defaultParallelism)

    environment
      .addSource(new SourceFromFile)
      .map(message => {
        // Parse failures are logged and mapped to null; the filter below drops them.
        val parsed: LogVO = parseLog(message) match {
          case Success(Some(rawLog)) => rawLog
          case Success(None) =>
            logger.error("==========> parsedLog error, please check")
            null
          case Failure(e) =>
            logger.error("==========> parsedLog error, error message: " + e.getMessage)
            null
        }
        parsed
      })
      // Drop unparsable records: a null element would otherwise reach the key
      // selector below and be dereferenced there.
      .filter(log => log != null)
      .keyBy(_.timestamp)
      .process(new DemoProcessFunction())
      .print()

    environment.execute()
  }

1.2 Process

/**
 * Keyed process function that forwards every log record unchanged while keeping a
 * copy of the `demo` table (uuid -> id) in Flink state.
 *
 * The table is loaded from MySQL the first time an element finds the state empty.
 * A processing-time timer is registered at load time; when it fires the cached copy
 * is cleared so that the next element triggers a fresh query.
 *
 * NOTE(review): both states are obtained through `getRuntimeContext.getState`, i.e.
 * they are keyed state. The cache is therefore presumably rebuilt for every distinct
 * key rather than once per subtask — confirm this is the intended behaviour.
 */
class DemoProcessFunction extends KeyedProcessFunction[String, LogVO, LogVO] {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  private var connection: Connection = null
  private var preparedStatement: PreparedStatement = null
  // Cached uuid -> id mapping read from the database.
  private var valueState: ValueState[Map[String, String]] = _
  // Timestamp of the processing-time timer registered for the cached mapping.
  private var timerState: ValueState[Long] = _

  /** Registers both state handles and prepares the JDBC query. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    val valueStateDesc: ValueStateDescriptor[Map[String, String]] = new ValueStateDescriptor[Map[String, String]]("valueStateDesc",
      TypeInformation.of(classOf[Map[String, String]]))
    valueState = getRuntimeContext.getState(valueStateDesc)

    val timerStateDesc: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("timerStateDesc", TypeInformation.of(classOf[Long]))
    timerState = getRuntimeContext.getState(timerStateDesc)

    connection = MysqlUtil.getConnectionRead
    preparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
  }

  /**
   * Releases the JDBC statement and connection acquired in `open`.
   * Without this the function would keep its database connection after shutdown.
   */
  override def close(): Unit = {
    try {
      if (preparedStatement != null) MysqlUtil.releaseResource(preparedStatement)
      if (connection != null) MysqlUtil.releaseResource(connection)
    } finally {
      preparedStatement = null
      connection = null
      super.close()
    }
  }

  /**
   * Emits `value` unchanged. If no mapping is cached yet, loads it from the database,
   * stores it in state and schedules its expiry.
   *
   * @param value incoming log record
   * @param ctx   gives access to the timer service
   * @param out   collector receiving the unchanged record
   */
  override def processElement(value: LogVO, ctx: KeyedProcessFunction[String, LogVO, LogVO]#Context, out: Collector[LogVO]): Unit = {
    val cached: Map[String, String] = valueState.value()
    val currentState: Map[String, String] =
      if (cached != null) {
        cached
      } else {
        logger.info("==========> query database for sync...")
        var loaded: Map[String, String] = Map()
        MysqlUtil.execQuery(connection, preparedStatement, (rs: ResultSet) => {
          loaded += (rs.getString("uuid") -> rs.getString("id"))
        })
        valueState.update(loaded)

        // Refresh interval in minutes (default 15). Converted with Long arithmetic so
        // that large configured values cannot overflow Int.
        val syncIntervalMs: Long =
          PropertiesUtil.getOrElse("materials.data.sync.interval", "15").toInt * 60L * 1000L
        val ttlTime: Long = System.currentTimeMillis() + syncIntervalMs
        ctx.timerService().registerProcessingTimeTimer(ttlTime)
        timerState.update(ttlTime)

        loaded
      }

    logger.info("==========> currentState: " + currentState)

    out.collect(value)
  }

  /**
   * Invalidates the cached mapping so that the next element reloads it.
   *
   * @param timestamp firing time of the timer
   * @param ctx       gives access to the timer service
   * @param out       unused; nothing is emitted from the timer
   */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, LogVO, LogVO]#OnTimerContext, out: Collector[LogVO]): Unit = {
    super.onTimer(timestamp, ctx, out)
    logger.info("==========> sync demo data every 15min...")
    valueState.clear()

    // Remove the timer recorded in state, then forget its timestamp so that no
    // stale value is kept once the cache has been invalidated.
    val ttlTime = timerState.value()
    ctx.timerService().deleteProcessingTimeTimer(ttlTime)
    timerState.clear()
  }

}

这样可以实现:
缓存的配置信息每隔15分钟(默认值,可通过 materials.data.sync.interval 配置)失效一次,失效后处理下一条数据时会重新查询数据库,更新配置信息。

1.3 测试

==========> query database for sync...
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907111","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907222","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907333","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
1> {"timestamp":"1578907444","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> query database for sync...
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983555","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983666","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983777","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}
==========> sync demo data every 15min...
==========> currentState: Map(N-3998 -> 10929133)
3> {"timestamp":"1578983888","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0"}

从测试结果可以发现:

  1. 在同一个Slot中处理的数据,不用每次都查询数据库
  2. 多个Slot会查询多次数据库

出现问题:如果Slot数很多,那么查询数据库的次数会显著增加。

二、使用Broadcast

2.1 主入口

  /**
   * Job entry point for the broadcast variant: broadcasts the configuration read by
   * `SourceFromDemo`, connects it to the parsed log stream keyed by `impressionId`
   * and processes both with `DemoBroadcastFunction`, printing the result.
   *
   * @param args optional; `args(0)` overrides the default parallelism. Any further
   *             arguments are accepted but not used by this method.
   */
  def main(args: Array[String]): Unit = {

    val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Parallelism: first CLI argument when present, otherwise the configured default.
    val defaultParallelism: Int = args.headOption
      .map(_.toInt)
      .getOrElse(PropertiesUtil.getOrElse("flink.default.parallelism", "40").toInt)
    environment.setParallelism(defaultParallelism)

    // Descriptor shared between the broadcast() call and the process function.
    val demoDescriptor: MapStateDescriptor[String, String] = new MapStateDescriptor(
      "demoDescriptor", TypeInformation.of(classOf[String]), TypeInformation.of(classOf[String]))
    // The configuration source runs with parallelism 1.
    val demoBroadcast = environment
      .addSource(new SourceFromDemo)
      .setParallelism(1)
      .broadcast(demoDescriptor)

    environment
      .addSource(new SourceFromFile)
      .map(message => {
        // Parse failures are logged and mapped to null; the filter below drops them.
        val parsed: LogVO = parseLog(message) match {
          case Success(Some(rawLog)) => rawLog
          case Success(None) =>
            logger.error("==========> parsedLog error, please check")
            null
          case Failure(e) =>
            logger.error("==========> parsedLog error, error message: " + e.getMessage)
            null
        }
        parsed
      })
      // Drop unparsable records: a null element would otherwise reach the key
      // selector below and be dereferenced there.
      .filter(log => log != null)
      .keyBy(_.impressionId)
      .connect(demoBroadcast)
      .process(new DemoBroadcastFunction(demoDescriptor))
      .print()

    environment.execute()

  }
}
  1. 自定义 Source 从数据库读取配置信息
  2. 利用 broadcast 方法创建 demoBroadcast,广播上面自定义的配置信息Source
  3. 利用 connect 方法将日志 Source 和 配置信息 Broadcast 连接
  4. 调用 process 方法处理日志数据和 Broadcast 数据

2.2 自定义配置信息Source

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.util.Date
import java.util.concurrent.TimeUnit

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.slf4j.{Logger, LoggerFactory}

/**
 * Source that emits the full content of the `demo` table as a `uuid -> id` map:
 * once right after start-up and then again each time the configured interval
 * (`materials.data.sync.interval`, in minutes, default 15) has elapsed.
 */
class SourceFromDemo extends RichParallelSourceFunction[Map[String, String]] {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  private var connection: Connection = null
  private var preparedStatement: PreparedStatement = null
  // Loop flag read by run() and reset by cancel(); volatile because Flink's
  // SourceFunction contract allows cancel() to be called from another thread.
  @volatile private var isRunning: Boolean = false
  private var lastQueryTime: Date = null

  /** Opens the JDBC connection, prepares the query and arms the run loop. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    logger.info("==========> open...")
    connection = MysqlUtil.getConnectionRead
    preparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
    isRunning = true
  }

  /**
   * Emits one snapshot immediately, then one per elapsed sync interval, until the
   * source is cancelled.
   *
   * @param ctx source context receiving each snapshot
   */
  override def run(ctx: SourceFunction.SourceContext[Map[String, String]]): Unit = {
    while (isRunning) {
      val dataSyncInterval: Int = PropertiesUtil.getOrElse("materials.data.sync.interval", "15").toInt

      if (null == lastQueryTime) {
        // Initial load of the material data.
        logger.info("==========> query db for init ...")
        ctx.collect(queryDemoTable())
        lastQueryTime = new Date()
      } else {
        // Periodic refresh once the configured number of minutes has passed.
        val nowTime: Date = new Date()
        if (CommonUtil.dataBetweenMin(lastQueryTime, nowTime) >= dataSyncInterval) {
          logger.info("==========> query db for update every 15min ...")
          ctx.collect(queryDemoTable())
          lastQueryTime = nowTime
        }
      }

      TimeUnit.MINUTES.sleep(dataSyncInterval)
    }
  }

  /** Runs the prepared query and returns its rows as a `uuid -> id` map. */
  private def queryDemoTable(): Map[String, String] = {
    var rows: Map[String, String] = Map()
    MysqlUtil.execQuery(connection, preparedStatement, (rs: ResultSet) => {
      rows += (rs.getString("uuid") -> rs.getString("id"))
    })
    rows
  }

  /** Stops the loop and releases the JDBC statement and connection. */
  override def close(): Unit = {
    super.close()
    logger.info("==========> close...")
    isRunning = false
    if (preparedStatement != null) {
      MysqlUtil.releaseResource(preparedStatement)
      preparedStatement = null
    }
    if (connection != null) {
      MysqlUtil.releaseResource(connection)
      connection = null
    }
  }

  /**
   * Asks the run loop to stop. The JDBC resources are deliberately left to close():
   * releasing them here could pull the connection out from under a query that
   * run() is still executing, and close() would then release them a second time.
   */
  override def cancel(): Unit = {
    logger.info("==========> cancel...")
    isRunning = false
  }

}

2.3 Process

import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector
import org.slf4j.{Logger, LoggerFactory}

/**
 * Processes log records together with the broadcast configuration map.
 *
 * Every broadcast element replaces the whole broadcast state and adds the marker
 * entry `broadcastFlag -> ready`. Until that marker is present, `processElement`
 * falls back to a locally queried copy of the `demo` table (`backupMap`).
 *
 * @param demoDescriptor descriptor used to look up the broadcast state
 */
class DemoBroadcastFunction(demoDescriptor: MapStateDescriptor[String, String]) extends KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO] {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  // Fallback copy of the configuration; null while not loaded or no longer needed.
  private var backupMap: Map[String, String] = null

  /**
   * Forwards `value` unchanged, making sure the fallback map is loaded when the
   * broadcast state has not been populated yet.
   *
   * @param value incoming log record
   * @param ctx   read-only access to the broadcast state
   * @param out   collector receiving the unchanged record
   */
  override def processElement(value: LogVO, ctx: KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO]#ReadOnlyContext, out: Collector[LogVO]): Unit = {
    val broadcastState: ReadOnlyBroadcastState[String, String] = ctx.getBroadcastState(demoDescriptor)
    // equalsIgnoreCase(null) is false, so a missing flag also counts as "not ready".
    val broadcastReady: Boolean = "ready".equalsIgnoreCase(broadcastState.get("broadcastFlag"))
    if (broadcastReady) {
      logger.info("==========> broadcast is ready")
      logger.info("==========> use broadcast")
    } else {
      logger.info("==========> broadcast not ready")
      updateBackupMap()
      logger.info("==========> use backupMap")
    }
    out.collect(value)
  }

  /**
   * Replaces the broadcast state with the entries of `value` and marks it ready.
   *
   * @param value new configuration snapshot (uuid -> id)
   * @param ctx   writable access to the broadcast state
   * @param out   unused; broadcast elements produce no output
   */
  override def processBroadcastElement(value: Map[String, String], ctx: KeyedBroadcastProcessFunction[String, LogVO, Map[String, String], LogVO]#Context, out: Collector[LogVO]): Unit = {
    val broadcastState: BroadcastState[String, String] = ctx.getBroadcastState(demoDescriptor)
    broadcastState.clear()

    value.foreach { case (uuid, id) => broadcastState.put(uuid, id) }

    // Marker entry telling processElement that the state has been populated.
    broadcastState.put("broadcastFlag", "ready")

    // The fallback copy is no longer needed.
    backupMap = null
  }

  /**
   * Loads the fallback map from the database if it is not loaded yet.
   *
   * The map is only assigned after the query succeeded, so a failed query is retried
   * on the next call instead of leaving an empty map behind. The statement and
   * connection are released even when the query throws.
   */
  def updateBackupMap(): Unit = {
    if (null == backupMap) {
      val connection: Connection = MysqlUtil.getConnectionRead
      try {
        val ps: PreparedStatement = connection.prepareStatement("SELECT uuid, id FROM demo")
        try {
          var loaded: Map[String, String] = Map[String, String]()
          MysqlUtil.execQuery(connection, ps, (rs: ResultSet) => {
            loaded += (rs.getString("uuid") -> rs.getString("id"))
          })
          backupMap = loaded
        } finally {
          MysqlUtil.releaseResource(ps)
        }
      } finally {
        MysqlUtil.releaseResource(connection)
      }
    }
  }
}

2.4 测试

==========> broadcast not ready
==========> query db for init ...
==========> use backupMap
2> {"timestamp":"1578907111","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast not ready
==========> use backupMap
2> {"timestamp":"1578907222","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
2> {"timestamp":"1578907333","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
2> {"timestamp":"1578907444","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983555","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983666","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983777","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}
==========> query db for update every 15min ...
==========> broadcast is ready
==========> use broadcast
1> {"timestamp":"1578983888","logType":"30","dbName":"demo","version":"1.0.0","distribution":"0","distributionType":"0"}

从测试结果可以看出:

  1. 每隔一段时间查询一次数据库,将查询结果作为配置信息
  2. 每次的查询结果都会广播到各个Slot上,而不是每个Slot上都查询一次数据库

注意:
在 DemoBroadcastFunction 中,加入了 backupMap,用于在未获取到广播变量中的配置信息时,自行查询数据库获取配置信息。

这样做的原因:
程序刚启动时,processElement 可能先于 processBroadcastElement 被调用,此时广播状态尚未加载,processElement 中读取到的广播状态为空。为了让最先到达的数据也能拿到配置信息,这种情况下必须自行查询数据库。

上一篇下一篇

猜你喜欢

热点阅读