大数据,机器学习,人工智能大数据 爬虫Python AI Sql玩转大数据

DataFrame与RDD的互操作

2018-03-05  本文已影响0人  sparkle123

DataFrame Interoperating with RDDs

参考官网
http://spark.apache.org/docs/2.2.0/sql-programming-guide.html#interoperating-with-rdds

DataFrameRDD互操作的两种方式比较:
1)反射推导式:case class 前提:事先需要知道字段、字段类型
2)编程式:Row 如果第一种情况不能满足要求(事先不知道列等schema信息)

  1. 选型:优先考虑第一种,使用简单

下面的代码演示了

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType,IntegerType}

object DataFrameRDDApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate();
    // Create an RDD of Person objects from a text file
    val testRDD = spark.sparkContext.textFile("C:\\Users\\Administrator\\IdeaProjects\\SparkSQLProject\\spark-warehouse\\test.txt")
    //inferReflection(spark,testRDD)

    program(spark,testRDD)

    spark.stop();

  }

  def inferReflection(spark: SparkSession,testRDD: RDD[String]): Unit = {

    // RDD ==> DataFrame

    // For implicit conversions from RDDs to DataFrames
    import spark.implicits._
    val infoDF = testRDD.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF();

    infoDF.show();

    infoDF.filter(infoDF.col("age") > 30).show

    // Register the DataFrame as a temporary view
    infoDF.createOrReplaceTempView("infos")

    // SQL statements can be run by using the sql methods provided by Spark
    spark.sql("select * from infos where age > 30").show();
  }

  def program(spark:SparkSession,testRDD: RDD[String]): Unit = {

    // The schema is encoded in a string
    val schemaString = "id name age"
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,nullable = true))

    val schema = StructType(fields)


    val structType = StructType(Array(StructField("id",IntegerType,true),
      StructField("name",StringType,true),
      StructField("age",IntegerType,true)))

    // Convert records of the RDD (people) to Rows
    val rowRDD = testRDD.map(_.split(","))
      .map(attributes => Row(attributes(0),attributes(1).trim,attributes(2)))

    val infoDF = spark.createDataFrame(rowRDD,schema)

    infoDF.printSchema()
    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()

  }

  case class Info(id: Int, name: String, age: Int)

}

查看源码,发现里面的注释写的挺好。


SparkSession源码
上一篇下一篇

猜你喜欢

热点阅读