spark convert RDD[Map] to DataFr

2017-10-12  本文已影响642人  breeze_lsw

将RDD[Map[String,String]] 转化为展平 DataFrame,类似于pyspark 中 dict 结构toDF的效果。

input

val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
   Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),
   Map("name" -> "lisi", "age" -> "20", "addr" -> "hz"),
))

output

name     age addr
zhangsan 18  bj
lisi     20  hz

1. Map中元素固定

每个 Map 只有三个元素的情况下

val columns=mapRDD.take(1).flatMap(_.keys)

val resultantDF=mapRDD.filter(_.nonEmpty).map{m=>
      val seq=m.values.toSeq
      (seq(0),seq(1),seq(2))
      }.toDF(columns:_*)

resultantDF.show()

2. Map中元素不固定
RDD[Map[String,String]] -> RDD[Row] -> DataFrame

  def map2DF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
    val cols = rdd.take(1).flatMap(_.keys)
    val resRDD = rdd.filter(_.nonEmpty).map { m =>
      val seq = m.values.toSeq
      Row.fromSeq(seq)
    }

    val fields = cols.map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    spark.createDataFrame(resRDD, schema)
  }
上一篇下一篇

猜你喜欢

热点阅读