
Spark SQL Overview

2018-08-06  盗梦者_56f2

Introduction

Spark SQL is the Spark module for processing structured data. The interfaces provided by Spark SQL give Spark more information about the structure of the data being queried and of the computation being performed. A Dataset is a distributed collection of data; the Dataset API is available in Scala and Java, while Python does not support it. A DataFrame is a Dataset organized into named columns, conceptually equivalent to a table in a relational database or a data frame in R/Python.

Creating a SparkSession

The entry point to all functionality in Spark SQL is the SparkSession class.

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder()
  .appName("name")
  .config("key", "value")
  .getOrCreate()
// With Hive support
val spark = SparkSession
  .builder()
  .appName("Spark Hive Example")
  .config("spark.sql.warehouse.dir", "path")
  .enableHiveSupport()
  .getOrCreate()
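
Most of the examples that follow rely on the SparkSession's implicit conversions (for example Seq(...).toDS(), rdd.toDF() and the $"col" column syntax), so they assume the implicits of the spark instance created above have been imported:

// Enables implicit conversions such as .toDF()/.toDS() and the $"col" syntax
import spark.implicits._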

Creating DataFrames

// Read a JSON file
val df = spark.read.json("people.json")
// load() uses Parquet as the default data source format
val usersDF = spark.read.load("users.parquet")
// Explicitly specify the input format
val peopleDF = spark.read.format("json").load("people.json")
val parquetFileDF = spark.read.parquet("people.parquet")
// Read from a JDBC source
val jdbcDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .load()
// Write data back through a JDBC connection
jdbcDF.write
  .format("jdbc")
  .option("url", "jdbc:postgresql:dbserver")
  .option("dbtable", "schema.tablename")
  .option("user", "username")
  .option("password", "password")
  .save()
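
The DataFrames created above support untyped transformations and can also be queried with SQL through a temporary view. A brief sketch, assuming (an assumption about the input) that people.json contains name and age columns:

df.printSchema()
df.select($"name", $"age" + 1).show()
df.filter($"age" > 21).show()
df.groupBy("age").count().show()
// Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 21").show()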

Creating Datasets

case class Person(name: String, age: Long)
val caseClassDS = Seq(Person("Andy", 32)).toDS()
val peopleDS = spark.read.json("people.json").as[Person] // convert a DataFrame to a Dataset
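
Because a Dataset is strongly typed, transformations can work directly with Person objects instead of generic rows. A short sketch, assuming the JSON records really do carry name and age fields:

// Encoders for the lambda results come from import spark.implicits._
peopleDS.filter(p => p.age > 21).show()
peopleDS.map(p => p.name.toUpperCase).show()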

Converting RDDs to DataFrames

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import spark.implicits._
// Method 1: infer the schema via reflection, using the Person case class
val peopleDF = spark.sparkContext
  .textFile("people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Method 2: programmatically specify the schema
val rowRDD = spark.sparkContext
  .textFile("people.txt")
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))
val schema = StructType(Array(
  StructField("name", StringType, nullable = true),
  StructField("age", StringType, nullable = true)
))
val peopleDF = spark.createDataFrame(rowRDD, schema)
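
Either way, the resulting DataFrame can be registered as a temporary view, queried with SQL, and its rows mapped back to plain values. A minimal sketch:

peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name FROM people")
results.map(row => "Name: " + row.getString(0)).show()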

Aggregations

// Method 1: extend the UserDefinedAggregateFunction abstract class (untyped UDAF)
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object MyAverage extends UserDefinedAggregateFunction {
  // Data types of input arguments of this aggregate function
  def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
  // Data types of values in the aggregation buffer
  def bufferSchema: StructType = {
    StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
  }
  // The data type of the returned value
  def dataType: DataType = DoubleType
  // Whether this function always returns the same output on the identical input
  def deterministic: Boolean = true
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getLong(0)
      buffer(1) = buffer.getLong(1) + 1
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
  // Calculates the final result
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
spark.udf.register("myAverage", MyAverage)
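// Once registered, the UDAF can be used in SQL like a built-in function.
// Minimal usage sketch; "employees.json" is a hypothetical file with
// name and salary columns:
val df = spark.read.json("employees.json")
df.createOrReplaceTempView("employees")
val result = spark.sql("SELECT myAverage(salary) AS average_salary FROM employees")
result.show()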
// Method 2: extend the Aggregator abstract class (type-safe, used with strongly typed Datasets)
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.SparkSession

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Employee, Average, Double] {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: Average = Average(0L, 0L)
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: Average, employee: Employee): Average = {
    buffer.sum += employee.salary
    buffer.count += 1
    buffer
  }
  // Merge two intermediate values
  def merge(b1: Average, b2: Average): Average = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Transform the output of the reduction
  def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[Average] = Encoders.product
  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
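
The type-safe variant is used by converting the Aggregator to a Column with toColumn and selecting it on a Dataset. A minimal usage sketch, where "employees.json" is again a hypothetical file with name and salary columns:

// requires import spark.implicits._ for the Employee encoder
val ds = spark.read.json("employees.json").as[Employee]
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()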