累加器应用-日志采集

2020-05-02  本文已影响0人  ryancao_b9b9

一、需求背景
在Driver端统一采集各Executor输出的日志信息(后续将汇聚至ES中)

二、软件版本(Spark 2.3)
Spark2.x之后,之前的的accumulator被废除,用AccumulatorV2代替;
累加器(accumulator):Accumulator是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它们可用于实现计数器(如MapReduce)或总和计数。
Accumulator是存在于Driver端的,Executor节点不断把值发到Driver端,在Driver端计数(Spark UI在SparkContext创建时被创建,即在Driver端被创建,因此它可以读取Accumulator的数值),存在于Driver端的一个值,从节点是读取不到的。
Spark提供的Accumulator主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,但是却给我们提供了多个task对于同一个变量并行操作的功能,但是task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取Accumulator的值。
自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。

注意:用累加器的过程中只能使用一次action的操作才能保证结果的准确性,否则,要使用cache,persist将之前的依赖截断。不然你action第二次,将会连着第一次的结果加上

三、AccumulatorV2接口梳理

isZero: 当AccumulatorV2中存在类似数据不存在这种问题时,是否结束程序。 
copy: 拷贝一个新的AccumulatorV2 
reset: 重置AccumulatorV2中的数据 
add: 操作数据累加方法实现 
merge: 合并数据 
value: AccumulatorV2对外访问的数据结果

四、自定义累加器实现部分代码
1、LogAccumulator

class LogAccumulator extends AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]] {

  // 用于限定日志最少保存量,防止当日志量达到maxLogSize时频繁的进行clear操作
  private lazy val minLogSize = PropUtils.getInt(PropKeys.SPARK_FIRE_ACC_LOG_MIN_SIZE, DefaultVals.minLogSize).abs
  // 用于限定日志最大保存量,防止日志量过大,撑爆driver
  private lazy val maxLogSize = PropUtils.getInt(PropKeys.SPARK_FIRE_ACC_LOG_MAX_SIZE, DefaultVals.maxLogSize).abs
  // 用于存放日志的队列
  private val logQueue = new ConcurrentLinkedQueue[String]
  // 判断是否打开日志累加器
  private lazy val isEnable = PropUtils.getBoolean(PropKeys.SPARK_FIRE_ACC_ENABLE, true) && PropUtils.getBoolean(PropKeys.SPARK_FIRE_ACC_LOG_ENABLE, true)

  /**
    * 判断累加器是否为空
    */
  override def isZero: Boolean = this.logQueue.size() == 0

  /**
    * 用于复制累加器
    */
  override def copy(): AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]] = new LogAccumulator

  /**
    * driver端执行有效,用于清空累加器
    */
  override def reset(): Unit = this.logQueue.clear

  /**
    * executor端执行,用于收集日志信息
    *
    * @param timeCost
    * 日志记录对象
    */
  override def add(logKey: LogKey): Unit = {
    if (this.isEnable && logKey != null) {
      this.logQueue.add(JSON.toJSONString(logKey, SerializerFeature.WriteNullStringAsEmpty))
      this.clear
    }
  }

  /**
    * executor端向driver端merge累加数据
    *
    * @param other
    * executor端累加结果
    */
  override def merge(other: AccumulatorV2[LogKey, ConcurrentLinkedQueue[String]]): Unit = {
    if (other != null && other.value.size() > 0) {
      this.logQueue.addAll(other.value)
      this.clear
    }
  }

  /**
    * driver端获取累加器的值
    *
    * @return
    * 收集到的日志信息
    */
  override def value: ConcurrentLinkedQueue[String] = this.logQueue

  /**
    * 当日志累积量超过maxLogSize所设定的值时清理过期的日志数据
    * 直到达到minLogSize所设定的最小值,防止频繁的进行清理
    */
  def clear: Unit = {
    if (this.logQueue.size() > this.maxLogSize) {
      while (this.logQueue.size() > this.minLogSize) {
        this.logQueue.poll
      }
    }
  }
}

2、LogKey

public class LogKey implements Serializable {
    private String id = UUID.randomUUID().toString();
    private String ip;
    private String load;
    private static String applicationId;
    private static String executorId;
    private Integer stageId;
    private Long taskId;
    private Integer partitionId;
    private String stackTraceInfo;
    private String level = "WARN";
    private Long start;
    private String startTime;
    private String endTime;
    ...
}
上一篇 下一篇

猜你喜欢

热点阅读