spark中位数计算,前向填充,后向填充

2020-05-05  本文已影响0人  达微
//中位数计算
/**
  * Computes the approximate median of every numeric column of `data`.
  *
  * A column counts as numeric when its type name, lower-cased and with "type" removed, is one
  * of byte, short, integer, long, double or float. Medians come from `approxQuantile` with
  * probability 0.5 and relative error 0.001.
  *
  * @param data the DataFrame to analyse
  * @return a Java map from column name to its approximate median; the value is `Double.NaN`
  *         when `approxQuantile` yields no quantile for that column
  */
def getMedian(data: DataFrame): java.util.Map[String, Double] = {
  // Local import: explicit `.asJava` replaces the deprecated implicit JavaConversions.
  import scala.collection.JavaConverters._

  val numericTypes = Set("byte", "short", "integer", "long", "double", "float")

  // Locale.ROOT keeps the normalisation stable regardless of the JVM default locale.
  val fields: Array[String] = data.dtypes.collect {
    case (name, typeName)
      if numericTypes.contains(typeName.toLowerCase(java.util.Locale.ROOT).replace("type", "")) =>
      name
  }

  if (fields.isEmpty) {
    // Nothing to compute, so do not launch a Spark job.
    Map.empty[String, Double].asJava
  } else {
    val medians: Array[Double] = data.stat
      .approxQuantile(fields, Array(0.5), 0.001)
      .map(_.headOption.getOrElse(Double.NaN))

    fields.zip(medians).toMap.asJava
  }
}
//前向填充
  // Forward fill: within each idField partition ordered by timeField, a null in rule.getField
  // takes the most recent non-null value at or before the current row.
  // The frame runs from the partition start to the current row so that runs of consecutive
  // nulls are filled as well; a (-1, 0) frame only reaches one row back.
  val window = Window.partitionBy(idField).orderBy(timeField)
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
  // Read the fill value from the column being filled (not from the ordering column).
  // NOTE(review): rule.getField must be among the columns kept by selectExpr(field1) - confirm.
  val filled = last(rule.getField, ignoreNulls = true).over(window)
  outputDF = outputDF.selectExpr(field1: _*).withColumn(rule.getField, filled)
//后向填充
    // Backward fill: within each idField partition ordered by timeField, a null in rule.getField
    // takes the nearest non-null value at or after the current row.
    // `first` with ignoreNulls keeps an existing non-null value untouched; `last` over a
    // (0, 1) frame would replace it with the next row's value.
    val window = Window.partitionBy(idField).orderBy(timeField)
      .rowsBetween(Window.currentRow, Window.unboundedFollowing)
    // Fully qualified because only `last` is known to be in scope here.
    val filled = org.apache.spark.sql.functions.first(rule.getField, ignoreNulls = true).over(window)
    outputDF = outputDF.withColumn(rule.getField, filled)

字段统计

 // Per-column summary statistics over the selected columns.
 val data: DataFrame = inputDF.select(columns: _*)

 // Build every vector by walking `fields` in order, so that index i of each statistic below
 // refers to fields(i). The values of the Map returned by getValuesMap are not guaranteed to
 // follow that order once there are more than four keys.
 // NOTE(review): every field is read as Double; confirm the column types and how null cells
 // should be treated.
 val value: RDD[linalg.Vector] = data.rdd.map(row => {
   Vectors.dense(fields.map(name => row.getAs[Double](name)).toArray)
 })

 // Each statistic except `count` is a vector with one entry per field.
 val stats = Statistics.colStats(value)
 val count = stats.count
 val nonZeros = stats.numNonzeros
 val mean = stats.mean
 val variance = stats.variance
 val min = stats.min
 val max = stats.max
 val normL1 = stats.normL1
 val normL2 = stats.normL2
//空值统计
  /**
    * Counts the missing values of every column of `data`.
    *
    * A cell is missing when it is null or when its string form is empty. The counts are
    * gathered with one `aggregate` over the rows, keeping one counter per column position,
    * so cell contents and column names are never parsed back out of an encoded string.
    *
    * @param data the DataFrame to analyse
    * @return a map from column name to its number of missing cells (0 when none)
    */
  def getNullStatistics(data: DataFrame):
  java.util.HashMap[String, Long] = {
    val columnNames = data.columns
    val columnCount = columnNames.length

    val missingPerColumn: Array[Long] = data.rdd.aggregate(new Array[Long](columnCount))(
      // Within a partition: bump the counter of every column whose cell is missing.
      (acc, row) => {
        for (i <- 0 until columnCount) {
          val cell = row.get(i)
          if (cell == null || cell.toString.isEmpty) acc(i) += 1L
        }
        acc
      },
      // Across partitions: add the counters position by position.
      (left, right) => {
        for (i <- 0 until columnCount) left(i) += right(i)
        left
      }
    )

    val colMissing = new java.util.HashMap[String, Long]
    for (i <- 0 until columnCount) {
      colMissing.put(columnNames(i), missingPerColumn(i))
    }
    colMissing
  }
  //0值统计
  /**
    * Counts the cells equal to zero in every column of `data`.
    *
    * Only columns whose type name, lower-cased and with "type" removed, is one of byte, short,
    * integer, long, float or double can contribute; every other column reports 0. Null cells
    * are never counted as zero. The counts are gathered with one `aggregate` over the rows.
    *
    * @param data the DataFrame to analyse
    * @return a map from column name to its number of zero-valued cells
    */
  def getZeroStatistics(data: DataFrame):
  java.util.HashMap[String, Long] = {
    // "integer" (not "int") is what IntegerType normalises to.
    val numericTypes = Set("byte", "short", "integer", "long", "float", "double")
    val columnNames = data.columns
    val columnCount = columnNames.length

    // Locale.ROOT keeps the normalisation stable regardless of the JVM default locale.
    val isNumeric: Array[Boolean] = data.dtypes.map {
      case (_, typeName) =>
        numericTypes.contains(typeName.toLowerCase(java.util.Locale.ROOT).replace("type", ""))
    }

    val zerosPerColumn: Array[Long] = data.rdd.aggregate(new Array[Long](columnCount))(
      // Within a partition: bump the counter of every numeric column whose cell is zero.
      (acc, row) => {
        for (i <- 0 until columnCount if isNumeric(i)) {
          row.get(i) match {
            case n: java.lang.Number if n.doubleValue() == 0.0 => acc(i) += 1L
            case _ => // null or non-zero: not counted
          }
        }
        acc
      },
      // Across partitions: add the counters position by position.
      (left, right) => {
        for (i <- 0 until columnCount) left(i) += right(i)
        left
      }
    )

    val zeroCounts = new java.util.HashMap[String, Long]
    for (i <- 0 until columnCount) {
      zeroCounts.put(columnNames(i), zerosPerColumn(i))
    }
    zeroCounts
  }
上一篇 下一篇

猜你喜欢

热点阅读