Spark中使用Scala synchronized 并发加锁创
2020-10-30 本文已影响0人
xiaogp
摘要:Spark
,Scala
,synchronized
executor端共享变量
Spark的rdd调用map
或者foreach
,mapPartition
,foreachPartition
等方法时,每一个元素或者分区都会并发调用其他方法进行操作,如果其中涉及比如新建一个内存对象,新建一个数据库连接等操作则需要频繁创建对象增加不必要的开销,此时需要将这些对象处理成单例
,让程序仅在内存中创建一个对象,executor中所有partition共享
这些对象。
synchronized关键字
Scala中的synchronized
可以对代码块或者方法使用,使得每次只能有一个线程访问,当一个线程获取了锁,其他线程在队列上等待。
在SiteTypeUtils单例对象中创建whiteSiteList属性初始值为null,synchronized
加锁双重校验
whiteSiteList是否为null,加锁后并发调用的线程排队进入
,第一个线程进入后读取mysql数据实例化whiteSiteList,第二个线程进入再次判断whiteSiteList,此时whiteSiteList不为null此后所有进入锁的线程不会在内存中再次读取mysql创建whiteSiteList。下一轮并发过来先判断whiteSiteList已经不为null,不需要再去走加锁的代码段,一开始就判断null的目的是不需要每次都去获取锁,提升性能。
object SiteTypeUtils {
var whiteSiteList: List[String] = _
def getSiteType(configProperties: Properties, url: String): String = {
if (whiteSiteList == null) {
this.synchronized {
if (whiteSiteList == null) {
println("---------------------初始化读取白名单mysql数据---------------------")
whiteSiteList = getSiteUrlStatus(configProperties, "select url from dt_white_site where status = 'white'")
println("white数量:", whiteSiteList.size)
}
}
}
}
def main(args: Array[String]): Unit = {
}
}
观察Spark打印的日志,发现在每个executor中只读取了一次whiteSiteList

如果不使用synchronized双重校验,会在Spark第一次并发调用这个方法时读取多次mysql和重复创建whiteSiteList,日志如下
