Flink维表关联系列2-全量加载方式

2019-11-06  本文已影响0人  LZhan

最近看了公众号[Flink 实战剖析]的部分文章,觉得其中维表关联系列的文章总结得挺全面,因此做一次搬运工,并进行一些总结。

1.前言

在维表关联时,定时全量加载的方式适用于:
维表数据量较少并且业务对于维表数据变化的敏感程度较低
注意点:
<1> 全量加载有可能会比较耗时,所以必须是一个异步加载过程
<2> 内存维表数据需要被流表数据关联读取、也需要被定时重新加载,这两个过程是不同线程执行,为了尽可能保证数据一致性,可使用原子引用变量包装内存维表数据对象即AtomicReference
<3> 查内存维表数据非异步io过程

2.代码示例
class SideFlatMapFunction extends RichFlatMapFunction[AdData, AdData] {

  private var sideInfo: AtomicReference[java.util.Map[Int, Int]] = _


  override def open(parameters: Configuration): Unit = {

    sideInfo = new AtomicReference[java.util.Map[Int, Int]]()

    sideInfo.set(loadData)

    val executors=Executors.newSingleThreadScheduledExecutor()

    executors.scheduleAtFixedRate(new Runnable {

      override def run(): Unit = reload()

    },5,5, TimeUnit.MINUTES)

  }


  override def flatMap(value: AdData, out: Collector[AdData]): Unit = {

    val tid=value.tId

    val aid=sideInfo.get().get(tid)

    var newV=AdData(aid,value.tId,value.clientId,value.actionType,value.time)

    out.collect(newV)

  }

  def reload()={

    try{

      println("do reload~")

      val newData=loadData()

      sideInfo.set(newData)

      println("reload ok~")

    }catch {

      case e:Exception=>{

        e.printStackTrace()

      }

    }

  }

  //连接数据库,查询出广告位和广告主的关系
  def loadData(): util.Map[Int, Int] = {

    val data = new util.HashMap[Int, Int]()

    Class.forName("com.mysql.jdbc.Driver")

    val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/paul", "root", "123456")

    val sql = "select aid,tid from ads"

    val statement = con.prepareStatement(sql)

    val rs = statement.executeQuery()

    while (rs.next()) {

      val aid = rs.getInt("aid")

      val tid = rs.getInt("tid")

      data.put(tid, aid)

    }
    con.close()
    data
  }

}
上一篇下一篇

猜你喜欢

热点阅读