Flink维表关联系列2-全量加载方式
2019-11-06 本文已影响0人
LZhan
最近看了公众号[Flink 实战剖析]的部分文章,觉得其中维表关联系列的文章总结得挺全面,因此做一次搬运工,并进行一些总结。
1.前言
在维表关联时,定时全量加载的方式适用于:
维表数据量较少并且业务对于维表数据变化的敏感程度较低
注意点:
<1> 全量加载有可能会比较耗时,所以必须是一个异步加载过程
<2> 内存维表数据需要被流表数据关联读取、也需要被定时重新加载,这两个过程是不同线程执行,为了尽可能保证数据一致性,可使用原子引用变量包装内存维表数据对象即AtomicReference
<3> 查内存维表数据非异步io过程
2.代码示例
class SideFlatMapFunction extends RichFlatMapFunction[AdData, AdData] {
private var sideInfo: AtomicReference[java.util.Map[Int, Int]] = _
override def open(parameters: Configuration): Unit = {
sideInfo = new AtomicReference[java.util.Map[Int, Int]]()
sideInfo.set(loadData)
val executors=Executors.newSingleThreadScheduledExecutor()
executors.scheduleAtFixedRate(new Runnable {
override def run(): Unit = reload()
},5,5, TimeUnit.MINUTES)
}
override def flatMap(value: AdData, out: Collector[AdData]): Unit = {
val tid=value.tId
val aid=sideInfo.get().get(tid)
var newV=AdData(aid,value.tId,value.clientId,value.actionType,value.time)
out.collect(newV)
}
def reload()={
try{
println("do reload~")
val newData=loadData()
sideInfo.set(newData)
println("reload ok~")
}catch {
case e:Exception=>{
e.printStackTrace()
}
}
}
//连接数据库,查询出广告位和广告主的关系
def loadData(): util.Map[Int, Int] = {
val data = new util.HashMap[Int, Int]()
Class.forName("com.mysql.jdbc.Driver")
val con = DriverManager.getConnection("jdbc:mysql://localhost:3306/paul", "root", "123456")
val sql = "select aid,tid from ads"
val statement = con.prepareStatement(sql)
val rs = statement.executeQuery()
while (rs.next()) {
val aid = rs.getInt("aid")
val tid = rs.getInt("tid")
data.put(tid, aid)
}
con.close()
data
}
}