
Spark MLlib Basic Statistics

Correlation

Calculating the correlation between two series of data is a common operation in statistics. spark.ml provides the flexibility to calculate pairwise correlations among many series. The supported correlation methods are currently Pearson’s and Spearman’s correlation.


import org.apache.spark.ml.linalg.{Matrix, Vectors}
import org.apache.spark.ml.stat.Correlation
import org.apache.spark.sql.{Row, SparkSession}

/** Correlation */

object CorrelationExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CorrelationExample").master("local[16]").getOrCreate()
    import spark.implicits._
    // The four rows below, written out as a dense 4 x 4 matrix:
    //  1.0  0.0  0.0  -2.0
    //  4.0  5.0  0.0   3.0
    //  6.0  7.0  0.0   8.0
    //  9.0  0.0  0.0   1.0

    val data = Seq(
      // Sparse vector with v(0) = 1.0 and v(3) = -2.0
      Vectors.sparse(4, Seq((0, 1.0), (3, -2.0))),
      Vectors.dense(4.0, 5.0, 0.0, 3.0),
      Vectors.dense(6.0, 7.0, 0.0, 8.0),
      Vectors.sparse(4, Seq((0, 9.0), (3, 1.0)))
    )
    val df = data.map(Tuple1.apply).toDF("features")

    val Row(coeff1: Matrix) = Correlation.corr(df, "features").head
    println(s"Pearson correlation matrix:\n $coeff1")

    val Row(coeff2: Matrix) = Correlation.corr(df, "features", "spearman").head
    println(s"Spearman correlation matrix:\n $coeff2")

    spark.stop()
  }
}

Running the example prints:

Pearson correlation matrix:
1.0                   0.055641488407465814  NaN  0.4004714203168137  
0.055641488407465814  1.0                   NaN  0.9135958615342522  
NaN                   NaN                   1.0  NaN                 
0.4004714203168137    0.9135958615342522    NaN  1.0    


Spearman correlation matrix:
1.0                  0.10540925533894532  NaN  0.40000000000000174  
0.10540925533894532  1.0                  NaN  0.9486832980505141   
NaN                  NaN                  1.0  NaN                  
0.40000000000000174  0.9486832980505141   NaN  1.0      

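The result is a matrix M in which M[i][j] is the correlation coefficient between column i and column j of the feature vectors. Taking the Pearson correlation matrix as an example, the first row

1.0                   0.055641488407465814  NaN  0.4004714203168137

holds the correlations of column 0 with columns 0 through 3. The third entry is NaN because column 2 is 0.0 in every row, so its variance is zero and the coefficient is undefined.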
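
To make the first row concrete, here is a minimal plain-Scala sketch (no Spark required) that recomputes Pearson coefficients for single pairs of columns from the example data. The object name PearsonSketch and its members are illustrative and not part of spark.ml. It uses the textbook formula, so it should agree with the matrix above only up to floating-point rounding.

```scala
// Illustrative helper, not a spark.ml API: textbook Pearson correlation.
object PearsonSketch {
  /** Pearson correlation of two equally long series; NaN if either has zero variance. */
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.length == y.length && x.nonEmpty, "series must be non-empty and equally long")
    val meanX = x.sum / x.length
    val meanY = y.sum / y.length
    // Sums of products of deviations (the common 1/n factors cancel out).
    val cov = x.zip(y).map { case (a, b) => (a - meanX) * (b - meanY) }.sum
    val varX = x.map(a => (a - meanX) * (a - meanX)).sum
    val varY = y.map(b => (b - meanY) * (b - meanY)).sum
    cov / math.sqrt(varX * varY) // 0.0 / 0.0 evaluates to NaN for a constant series
  }

  // Columns of the example data, read down the four rows.
  val col0 = Seq(1.0, 4.0, 6.0, 9.0)
  val col1 = Seq(0.0, 5.0, 7.0, 0.0)
  val col2 = Seq(0.0, 0.0, 0.0, 0.0)
  val col3 = Seq(-2.0, 3.0, 8.0, 1.0)

  def main(args: Array[String]): Unit = {
    println(s"corr(col0, col1) = ${pearson(col0, col1)}") // roughly 0.0556
    println(s"corr(col0, col2) = ${pearson(col0, col2)}") // NaN: col2 never varies
    println(s"corr(col0, col3) = ${pearson(col0, col3)}") // roughly 0.4005
  }
}
```

For col0 and col1 the sum of deviation products is 2 and the two sums of squared deviations are 34 and 38, so the coefficient is 2 / sqrt(34 * 38), which matches the second entry of the first row.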


Hypothesis testing

Hypothesis testing is a powerful tool in statistics for determining whether a result is statistically significant, that is, whether it is unlikely to have occurred by chance. spark.ml currently supports Pearson’s Chi-squared (χ²) tests for independence.

ChiSquareTest conducts Pearson’s independence test for every feature against the label. For each feature, the (feature, label) pairs are converted into a contingency matrix for which the Chi-squared statistic is computed. All label and feature values must be categorical.
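
The contingency-matrix step can be sketched in plain Scala, without Spark. The snippet below is only an illustration of the textbook computation (observed versus expected counts), not the library's implementation; ChiSquareSketch, chiSquare and firstFeature are hypothetical names, and the data are the first feature and the label of the six example rows used in this section.

```scala
// Illustrative helper, not a spark.ml API: Pearson's chi-squared statistic
// for one categorical feature against a categorical label.
object ChiSquareSketch {
  /** Returns (statistic, degreesOfFreedom) for a sequence of (feature, label) pairs. */
  def chiSquare(pairs: Seq[(Double, Double)]): (Double, Int) = {
    require(pairs.nonEmpty, "need at least one (feature, label) pair")
    val n = pairs.length.toDouble
    // Observed count per (feature, label) cell, plus row and column totals.
    val observed = pairs.groupBy(identity).map { case (cell, hits) => cell -> hits.length.toDouble }
    val featureTotals = pairs.groupBy(_._1).map { case (f, hits) => f -> hits.length.toDouble }
    val labelTotals = pairs.groupBy(_._2).map { case (l, hits) => l -> hits.length.toDouble }

    // Sum (observed - expected)^2 / expected over every cell of the contingency matrix.
    val cells = for (f <- featureTotals.keys.toSeq; l <- labelTotals.keys.toSeq) yield {
      val expected = featureTotals(f) * labelTotals(l) / n
      val diff = observed.getOrElse((f, l), 0.0) - expected
      diff * diff / expected
    }
    (cells.sum, (featureTotals.size - 1) * (labelTotals.size - 1))
  }

  // (feature 0, label) pairs taken from the six example rows.
  val firstFeature = Seq((0.5, 0.0), (1.5, 0.0), (1.5, 1.0), (3.5, 0.0), (3.5, 0.0), (3.5, 1.0))

  def main(args: Array[String]): Unit = {
    val (statistic, dof) = chiSquare(firstFeature)
    println(s"statistic = $statistic, degreesOfFreedom = $dof") // about 0.75 with 2 degrees of freedom
  }
}
```

The feature takes three distinct values and the label two, which gives a 3 x 2 contingency matrix and (3 - 1) * (2 - 1) = 2 degrees of freedom. Turning the statistic into a p-value additionally requires the chi-squared distribution, which this sketch leaves out.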



import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.ChiSquareTest
import org.apache.spark.sql.SparkSession

/**
 * An example for Chi-square hypothesis testing
 * (Pearson's chi-squared test).
 */
object ChiSquareTestExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ChiSquareTestExample").master("local[16]").getOrCreate()
    import spark.implicits._

    val data = Seq(
      (0.0, Vectors.dense(0.5, 10.0)),
      (0.0, Vectors.dense(1.5, 20.0)),
      (1.0, Vectors.dense(1.5, 30.0)),
      (0.0, Vectors.dense(3.5, 30.0)),
      (0.0, Vectors.dense(3.5, 40.0)),
      (1.0, Vectors.dense(3.5, 40.0))
    )
    val df = data.toDF("label", "features")
    val chi = ChiSquareTest.test(df, "features", "label").head
    println(s"pValues = ${chi.getAs[Vector](0)}")
    println(s"degreesOfFreedom ${chi.getSeq[Int](1).mkString("[", ",", "]")}")
    println(s"statistics ${chi.getAs[Vector](2)}")

    spark.stop()
  }
}

Summarizer

We provide vector column summary statistics for DataFrames through Summarizer. Available metrics are the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.

import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.stat.Summarizer
import org.apache.spark.sql.SparkSession

object SummarizerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SummarizerExample").master("local[16]").getOrCreate()

    import Summarizer._
    import spark.implicits._

    val data = Seq(
      (Vectors.dense(2.0, 3.0, 5.0), 1.0),
      (Vectors.dense(4.0, 6.0, 7.0), 2.0)
    )
    val df = data.toDF("features", "weight")

    val (meanVal, varianceVal) = df.select(metrics("mean", "variance")
      .summary($"features", $"weight").as("summary"))
      .select("summary.mean", "summary.variance")
      .as[(Vector, Vector)].first()

    println(s"with weight: mean = ${meanVal}, variance = ${varianceVal}")

    val (meanVal2, varianceVal2) = df.select(mean($"features"), variance($"features"))
      .as[(Vector, Vector)].first()

    println(s"without weight: mean = ${meanVal2}, variance = ${varianceVal2}")

    spark.stop()
  }
}
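
As a hand check on the weighted and unweighted means, the following plain-Scala sketch applies the usual definition sum(w_i * x_i) / sum(w_i) to the same two rows. WeightedMeanSketch and weightedMean are illustrative names and not part of spark.ml, and the sketch assumes that Summarizer's weighted mean follows this definition. Variance is left out because its weighted form depends on the normalisation convention.

```scala
// Illustrative helper, not a spark.ml API: column-wise weighted mean.
object WeightedMeanSketch {
  /** Column-wise weighted mean: sum(w_i * x_i) / sum(w_i). */
  def weightedMean(rows: Seq[(Array[Double], Double)]): Array[Double] = {
    require(rows.nonEmpty, "need at least one row")
    val totalWeight = rows.map(_._2).sum
    val dim = rows.head._1.length
    Array.tabulate(dim) { j =>
      rows.map { case (features, weight) => features(j) * weight }.sum / totalWeight
    }
  }

  // Same (features, weight) rows as in the Summarizer example.
  val rows = Seq(
    (Array(2.0, 3.0, 5.0), 1.0),
    (Array(4.0, 6.0, 7.0), 2.0)
  )

  def main(args: Array[String]): Unit = {
    // Weighted: each column is (1 * first + 2 * second) / 3.
    println(weightedMean(rows).mkString("[", ", ", "]"))
    // Unweighted: the same formula with every weight set to 1.0.
    println(weightedMean(rows.map { case (f, _) => (f, 1.0) }).mkString("[", ", ", "]"))
  }
}
```

With weights 1.0 and 2.0 the first column gives (2.0 + 8.0) / 3.0, about 3.33, while the unweighted mean of the same column is 3.0.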
