Spark: median calculation, forward fill, and backward fill
达微 · 2020-05-05
// Median calculation: approxQuantile at p = 0.5 over every numeric column
// (needs org.apache.spark.sql.DataFrame and scala.collection.JavaConverters)
def getMedian(data: DataFrame): java.util.Map[String, Double] = {
  import scala.collection.JavaConverters._
  // Numeric types eligible for a median ("IntegerType" -> "integer", etc.)
  val numericTypes = Set("byte", "short", "integer", "long", "double", "float")
  val fields: Array[String] = data.dtypes
    .map { case (name, tpe) => (name, tpe.toLowerCase.replace("type", "")) }
    .filter { case (_, tpe) => numericTypes.contains(tpe) }
    .map(_._1)
  // approxQuantile returns one Array[Double] per column; the array is empty
  // when a column has no non-null values, in which case we report NaN
  val median = data.stat.approxQuantile(fields, Array(0.5), 0.001)
    .map(arr => if (arr.isEmpty) Double.NaN else arr.head)
  fields.zip(median).toMap.asJava
}
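A quick usage sketch (the DataFrame and its column names are invented for illustration). Note that the third argument to approxQuantile is the relative error: 0.001 here; passing 0.0 computes exact quantiles at higher cost.

// Hypothetical: df has numeric columns "age" and "salary"
val medians: java.util.Map[String, Double] = getMedian(df)
val ageMedian = medians.get("age") // NaN if "age" had no non-null values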
// Forward fill: carry the most recent non-null value of the target column
// forward within each id partition, ordered by time. The original snippet
// filled from timeField over a one-row frame; the target column and an
// unbounded frame are what a forward fill needs.
val fwdWindow = Window.partitionBy(idField).orderBy(timeField)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val fwdFilled = last(rule.getField, ignoreNulls = true).over(fwdWindow)
outputDF = outputDF.selectExpr(field1: _*).withColumn(rule.getField, fwdFilled)

// Backward fill: take the nearest following non-null value of the target
// column within each id partition
val bwdWindow = Window.partitionBy(idField).orderBy(timeField)
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)
val bwdFilled = first(rule.getField, ignoreNulls = true).over(bwdWindow)
outputDF = outputDF.withColumn(rule.getField, bwdFilled)
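A minimal end-to-end sketch of both fills on a toy DataFrame; the session setup and the column names (id, ts, v) are invented for the example:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{first, last}

val spark = SparkSession.builder().master("local[*]").appName("fill").getOrCreate()
import spark.implicits._

// (id, ts, v) with a run of nulls in v
val df = Seq(
  ("a", 1, Some(1.0)), ("a", 2, None), ("a", 3, None), ("a", 4, Some(4.0))
).toDF("id", "ts", "v")

val fwd = Window.partitionBy("id").orderBy("ts")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
val bwd = Window.partitionBy("id").orderBy("ts")
  .rowsBetween(Window.currentRow, Window.unboundedFollowing)

df.withColumn("v_ffill", last("v", ignoreNulls = true).over(fwd))
  .withColumn("v_bfill", first("v", ignoreNulls = true).over(bwd))
  .show()
// rows ts=2,3 get v_ffill = 1.0 (carried forward) and v_bfill = 4.0 (carried back)

The unbounded frames are what let a run of consecutive nulls fill correctly; a one-row frame such as rowsBetween(-1, 0) only bridges single-row gaps.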
// Column statistics via MLlib's Statistics.colStats
// (needs org.apache.spark.mllib.linalg.{Vectors, Vector} and
//  org.apache.spark.mllib.stat.Statistics; `columns` are the selected
//  numeric Columns and `fields` their names)
val data: DataFrame = inputDF.select(columns: _*)
// Read the values in a fixed field order; going through
// getValuesMap(...).values does not guarantee column order for wider rows
val value: RDD[linalg.Vector] = data.rdd.map(row =>
  Vectors.dense(fields.map(f => row.getAs[Double](f)).toArray))
val stats = Statistics.colStats(value)
val count = stats.count          // number of rows
val nonZeros = stats.numNonzeros // per-column count of non-zero entries
val mean = stats.mean            // per-column mean
val variance = stats.variance    // per-column variance
val min = stats.min              // per-column minimum
val max = stats.max              // per-column maximum
val normL1 = stats.normL1        // per-column L1 norm
val normL2 = stats.normL2        // per-column L2 norm
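For reference, a self-contained run of colStats on a toy RDD (the values are made up). Note this path uses the older mllib vectors rather than the newer ml ones, and that df.describe()/df.summary() covers much of the same ground on the DataFrame side:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}

val rows = spark.sparkContext.parallelize(Seq(
  Vectors.dense(1.0, 10.0),
  Vectors.dense(2.0, 20.0),
  Vectors.dense(3.0, 0.0)
))
val summary: MultivariateStatisticalSummary = Statistics.colStats(rows)
println(summary.mean)        // [2.0,10.0]
println(summary.numNonzeros) // [3.0,2.0]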
// Null-value statistics: count missing (null) values per column
def getNullStatistics(data: DataFrame): java.util.HashMap[String, Long] = {
  val colMissing = new java.util.HashMap[String, Long] // null count per column
  val allColArr = data.columns
  val dtypes = data.dtypes.map(r => (r._1, r._2.toLowerCase().replace("type", "")))
  val colSize = allColArr.length
  val separator = "_0_" // token separator; assumes no cell value contains it
  val len = separator.length
  // Encode each row as "col:value" tokens (a null becomes just "col:"),
  // then word-count the tokens across the whole DataFrame
  val rddHandle = data.rdd.map { row =>
    val str = new StringBuilder
    for (i <- 0 until colSize) {
      if (row.get(i) == null) {
        str.append(dtypes(i)._1 + ":" + separator)
      } else {
        str.append(dtypes(i)._1 + ":" + row.get(i) + separator)
      }
    }
    str.toString.substring(0, str.length - len) // drop trailing separator
  }.flatMap(_.split(separator)).map((_, 1L)).reduceByKey(_ + _)
  rddHandle.persist()
  for (col <- allColArr) {
    // Tokens for this column whose value part is empty are the nulls
    val colResult = rddHandle.filter(_._1.split(":", 2)(0).equals(col))
    val missingResult = colResult.filter(t => StringUtils.isEmpty(t._1.split(":", 2)(1)))
    if (missingResult.isEmpty()) {
      colMissing.put(col, 0L)
    } else {
      colMissing.put(col, missingResult.first()._2)
    }
  }
  rddHandle.unpersist()
  colMissing
}
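The same null counts can be computed in a single aggregation with the DataFrame API, which avoids the string encoding and the one-filter-job-per-column loop above; a sketch, with df standing in for any input DataFrame:

import org.apache.spark.sql.functions.{col, count, lit, when}

// One result row: for each column, the number of rows where it is null
val nullCounts = df.select(df.columns.map(c =>
  count(when(col(c).isNull, lit(1))).alias(c)): _*)
nullCounts.show()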
// Zero-value statistics: count zero values per numeric column
def getZeroStatistics(data: DataFrame): java.util.HashMap[String, Long] = {
  // Numeric types to inspect; note dtypes yields "integer", not "int"
  val typeArray = Array("byte", "short", "integer", "float", "double", "long")
  val colZeros = new java.util.HashMap[String, Long] // zero count per column
  val allColArr = data.columns
  val dtypes = data.dtypes.map(r => (r._1, r._2.toLowerCase().replace("type", "")))
  val colSize = allColArr.length
  val separator = "_0_" // token separator; assumes no cell value contains it
  val len = separator.length
  // Encode each row: a numeric zero becomes just "col:", anything else
  // (including nulls) becomes "col:1"; then word-count as in getNullStatistics
  val rddHandle = data.rdd.map { row =>
    val str = new StringBuilder
    for (i <- 0 until colSize) {
      if (row.get(i) == null) {
        str.append(dtypes(i)._1 + ":1" + separator)
      } else if (typeArray.contains(dtypes(i)._2) && row.get(i).toString.toDouble == 0.0) {
        str.append(dtypes(i)._1 + ":" + separator) // zero value
      } else {
        str.append(dtypes(i)._1 + ":1" + separator)
      }
    }
    str.toString.substring(0, str.length - len) // drop trailing separator
  }.flatMap(_.split(separator)).map((_, 1L)).reduceByKey(_ + _)
  rddHandle.persist()
  for (col <- allColArr) {
    // Tokens for this column whose value part is empty are the zeros
    val colResult = rddHandle.filter(_._1.split(":", 2)(0).equals(col))
    val zeroResult = colResult.filter(t => StringUtils.isEmpty(t._1.split(":", 2)(1)))
    if (zeroResult.isEmpty()) {
      colZeros.put(col, 0L)
    } else {
      colZeros.put(col, zeroResult.first()._2)
    }
  }
  rddHandle.unpersist()
  colZeros
}
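Likewise for zeros, a DataFrame-API sketch restricted to numeric columns (df again a stand-in):

import org.apache.spark.sql.functions.{col, count, lit, when}
import org.apache.spark.sql.types.NumericType

val numericCols = df.schema.fields.collect {
  case f if f.dataType.isInstanceOf[NumericType] => f.name
}
// For each numeric column, count rows where the value is exactly 0
val zeroCounts = df.select(numericCols.map(c =>
  count(when(col(c) === 0, lit(1))).alias(c)): _*)
zeroCounts.show()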