2020-12-17-Spark-25(序列化问题)

2020-12-26  本文已影响0人  冰菓_

1.spark运行结构

1.application 包括driver和分布在不同集群不同节点上的executor代码
2.driver指的是application中的main函数并且创建的sparkcontext,sparkcontext作用是创建运行环境
spark的运行环境是什么?
一个CoarseGrainedExecutorBackend只有一个executor
spark on yarn

image.png

2.序列化问题(闭包问题)与线程安全问题

在driver端初始化了一个object或class实例,要在executor运行,必须实现序列化接口
如果实例是object类型,则每个executor共享一个,如果是class类型,及new了一个实例,则一个task一个实例

在函数中初始化实例,如果是单例的,则一个进程(executor)只有一个实例,如果是class类型,看调用的算子,如果是map,则没来一条数据就new 一个,如果是mappartition,则一个分区一个实例

多个线程共用一个变量,会出现线程安全问题,例如:

object DateUntil {
  private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(st: String): Long = {
    val date = format.parse(st)
    date.getTime
  }
}

加锁改进:

object DateUntil {
  private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  def parse(st: String): Long = synchronized {
    val date = format.parse(st)
    date.getTime
  }
}

效果不好,改进:mappartition算子,一个分区一个simpledateformat

   rdd.mapPartitions(data => {
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      data.map(txt => {
        val date = format.parse(txt)
        date.getTime
      })
    })

3.shuffle

image.png

在join之前调用groupbykey,下游的数据明确知道要拉取的数据的分区,就没有shuffle,就无需划分stage,就是窄依赖.

image.png
上一篇 下一篇

猜你喜欢

热点阅读