大数据学习Scala学习

Spark中使用Scala synchronized 并发加锁创

2020-10-30  本文已影响0人  xiaogp

摘要:SparkScalasynchronized

executor端共享变量

Spark的rdd调用map或者foreachmapPartitionforeachPartition等方法时,每一个元素或者分区都会并发调用其他方法进行操作,如果其中涉及比如新建一个内存对象,新建一个数据库连接等操作则需要频繁创建对象增加不必要的开销,此时需要将这些对象处理成单例,让程序仅在内存中创建一个对象,executor中所有partition共享这些对象。

synchronized关键字

Scala中的synchronized可以对代码块或者方法使用,使得每次只能有一个线程访问,当一个线程获取了锁,其他线程在队列上等待。
在SiteTypeUtils单例对象中创建whiteSiteList属性初始值为null,synchronized加锁双重校验whiteSiteList是否为null,加锁后并发调用的线程排队进入,第一个线程进入后读取mysql数据实例化whiteSiteList,第二个线程进入再次判断whiteSiteList,此时whiteSiteList不为null此后所有进入锁的线程不会在内存中再次读取mysql创建whiteSiteList。下一轮并发过来先判断whiteSiteList已经不为null,不需要再去走加锁的代码段,一开始就判断null的目的是不需要每次都去获取锁,提升性能。

object SiteTypeUtils {
  var whiteSiteList: List[String] = _

  def getSiteType(configProperties: Properties, url: String): String = {
      if (whiteSiteList == null) {
        this.synchronized {
          if (whiteSiteList == null) {
            println("---------------------初始化读取白名单mysql数据---------------------")
            whiteSiteList = getSiteUrlStatus(configProperties, "select url from dt_white_site where status = 'white'")
            println("white数量:", whiteSiteList.size)
          }
        }
      }
    }

  def main(args: Array[String]): Unit = {
  }
}

观察Spark打印的日志,发现在每个executor中只读取了一次whiteSiteList


spark日志1.png

如果不使用synchronized双重校验,会在Spark第一次并发调用这个方法时读取多次mysql和重复创建whiteSiteList,日志如下


spark日志2.png
上一篇 下一篇

猜你喜欢

热点阅读