Hadoop运维日常

Spark运维

2019-06-14  本文已影响0人  Mr_Qifei

spark on yarn运维

场景:对于集群故障和集群配置调整可能对spark实时任务造成的影响及所需采取措施进行记录,并制定对应解决方案。
方案总结:针对下文总结的实时任务存在的隐患,可对hadoop集群配置进行以下调整优化(优化方案需先在测试集群上测试):

一、磁盘故障对spark任务的影响

1、如果是application master所在机器磁盘发生故障:

由于目前yarn参数默认配置,am进程只有一次容错重启的机会,新am进程再出问题则spark任务会down掉。

2、如果是普通的executor所在机器磁盘发生故障:

同样取决于该executor是否用到该磁盘存储数据,如果没用到则无影响。
如果用到该磁盘,则该节点executor日志持续报错,但executor进程还在;spark任务其他节点上executor进程仍正常执行。

针对磁盘故障问题的可用措施:由于目前已经对磁盘健康状态做了监控,但是由于spark任务通常需要长期运行,不可避免会遇到磁盘故障等硬件或者其他问题,可考虑将am容错重试进行调整,增加am容错次数。

二、NodeManager进程故障对spark任务的影响

1、application master所在nm进程故障:

2、executor所在NM进程故障:

三、DataNode进程重启对spark任务的影响

四、ResourceManager重启对spark任务影响

场景:如果增加RM HA配置,那么修改完配置需要重启rm和nm进程。此时对线上执行的任务有何影响?

目前如果重启ResourceManager进程,线上spark不可避免将被影响。

参考建议:修改yarn配置,存储RM、NM进程运行时状态等,重启时减小对线上任务产生的影响。



开发及运维

1、spark实时任务运维

如遇yarn集群故障,集群恢复后一般重启spark任务即可。

2、Spark任务开发注意事项及优化点

  1. 调整spark任务的资源配置并启动时(如executor数量、core数量等),需先将checkpoint删除,否则任务启动后仍会按照旧的资源配置申请资源。
  2. 复用RDD、使用高性能算子、广播大变量、RDD持久化…
  3. 使用高性能序列化框架Kryo优化序列化性能,比spark默认java序列化机制性能高10倍。
  4. 分配合理的资源参数:num-executors、executor-memory、executor-cores、spark.default.parallelism、spark.storage.memoryFraction、spark.shuffle.memoryFraction……
  5. spark任务数据倾斜优化、shuffle参数优化……

具体优化细节不展开记录,可参考网上spark优化相关文章:https://www.jianshu.com/p/67606a11415b

3、问题记录

spark streaming重启时候,如果不清空checkpoint启动就会报null point的错误,并启动失败。

原因:这个实时任务用到Spark SQL中的sparksession来操作sql, 但是sparksession是不能被序列化的,所以不能在checkpoint中保存。重启后操作sql时候会因为sparksession在checkpoint反序列为空而报错,所以需要sparksession在foreachRdd中定义,每次执行都去重新获取。

解决:

 lines.foreachRDD(rdd => {
      //获取单实例SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._
      
      ……
 }

再额外定义一个获取sparksession的类。

/** Lazily instantiated singleton instance of SparkSession */
object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
上一篇 下一篇

猜你喜欢

热点阅读