如何向Spark Dataframe 添加一列带有唯一id的列

2017-07-26  本文已影响0人  zy_now

这个有两种方法
1 使用zipWithUniqueId获取id 并重建 DataFrame.

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df =Seq(("a", -1.0), ("b", -2.0), ("c", -3.0)).toDF("foo", "bar")
// 获取df 的表头
val s = df.schema

// 将原表转换成带有rdd,
//再转换成带有id的rdd,
//再展开成Seq方便转化成 Dataframe
val rows = df.rdd.zipWithUniqueId.map{case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

// 再由 row 根据原表头进行转换
val dfWithPK = spark.createDataFrame( rows, StructType(StructField("id", LongType, false) +: s.fields))

2 直接使用spark 自己的api,monotonicallyIncreasingId
这个id虽然是唯一的,但是不能从零开始,也不是顺序排列,可以简单理解为是随机产生的标识码

import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// |foo| bar|         id|
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
上一篇下一篇

猜你喜欢

热点阅读