Spark权威指南读书笔记(二):结构化API

2020-11-28  本文已影响0人  kaiker

第四章 结构化API概述

结构化API主要指三种核心分布式集合类型API:Dataset、DataFrame、SQL表和视图

DataFrame和Dataset类型

Schema

定义了DataFrame的列名和类型

两者比较

结构化API执行概述

Catalyst优化器
逻辑计划
逻辑计划
物理计划
物理计划

第五章 基本的结构化操作

主要是针对DataFrame的操作

模式

schema 文件的结构可以通过schema指定,也可查看schema

val mySchema = StructType(Array(
StructFiled(名称,类型,是否为空),
StructFiled(名称,类型,是否为空),
))
 val df = spark.read.format("json").schema(mySchema).load("xxx")

列和表达式

df.col("count")
col("xxx")
column("xxx")

表达式

记录和行

DataFrame转换操作

df.createOrReplaceTempView

df.select(col(xxx)) df.selectExpr(xxx)

df.withColumn("列名", 表达式)

df.filter(col("count") < 2)

df.select("xxx").distinct().count()

df.union(otherdf)

df.sort("xxx")
df.orderBy(expr("count desc"))

df.repartition(4)

第六章 处理不同类型的数据

布尔

val priceFilter = col("unitPrice") > 600
df.withColum("isExpensive",priceFilter) // 创建一个列判断价格是否大于600

字符串

import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// the | signifies `OR` in regular expression syntax
df.select(
  regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
  col("Description")).show(2)

空值

df.na.drop()
df.na.fill("填的值", Seq("列名1", "列名2"))

用户自定义函数

val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
  1. 当使用Scala Java的UDF时,JVM中运行,可能导致性能下降(GC)
  2. 使用Python写的UDF,Spark在worker上启动一个Python进程,将所有数据序列化为Python可解释的格式,在Python进程中对该数据进行执行函数,最终将结果返回JVM和Spark。这会有两个问题:计算安规,进入Python进程后,Spark无法管理workjer内存
UDF

第七章 聚合操作

每个分组操作都会返回RelationGroupedDataset,基于它来进行聚合操作

聚合操作

df.select(count("xxx"))

df.select(countDistinct("xxx"))

df.select(approx_count_distinct("xxx",0.1))

df.select(first("xxx"))

分组

df.groupBy("xxx")

df.groupBy("xxxx").agg(count("xxx"))

df.groupBy("xxx").agg("colname"->"avg","colname2"->"聚合操作")

window函数

窗口函数原理
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
// 每个用户id是一个分区,内部的quantity降序排序
val windowSpec = Window
  .partitionBy("CustomerId", "date")
  .orderBy(col("Quantity").desc)
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

自定义聚合函数UDAF

类似于hive的udaf

import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// 返回是不是全是ture
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}

第八章 连接操作

基本的链接操作

内、外、左外、右外、左半(left semi)、左反(left anti)、笛卡尔

val joinExpression = person.col("xxx") = otherdf.col("xxx")

joinType = "outer|left_outer|right_outer|left_sermi|left_anti|cross"

person.join(otherdf, joinExpression, joinType).show()

常见问题

复杂类型连接

可以自定返回Boolean的joinExpression来完成复杂类型连接

import org.apache.spark.sql.functions.expr
person.withColumnRenamed("id", "personId")
  .join(sparkStatus, expr("array_contains(spark_status, id)")).show()
重复列名

Spark如何进行连接

spark以两种不同的方式处理集群通信问题,要么执行all-to-all通信的shuffle join,要么执行广播join

shuffle join

每个节点都与其他所有节点进行通信,对网络传输有一定要求


shuffle join
broadcast join

当表的大小足够下能够放入当个节点内存还有空余的时候,可以用。
在开始之前会有一次大规模通信,分发这个表,通信结束之后节点间不再有通信。
需要注意这个会先给driver,driver的空间也有够,还要注意第一次大规模通信的timeout


broadcast join

第九章 数据源

数据源API结构

Read API
spark.read.format("格式")
.option("配置key", "配置value")
.schema(xxx)
.load()
Write API
spark.writer.format("格式").
.option("配置key", "配置value")
//.partuitionBy()
//.sortBy()
//.bucketBy()
.save()

Parquet & ORC

都是压缩格式
但书里提到Parquet针对Spark进行优化 ORC针对hive进行优化

写入SQL数据库

val driver =  "org.sqlite.JDBC"
val path = "/data/flight-data/jdbc/my-sqlite.db"
val url = s"jdbc:sqlite:/${path}"
val tablename = "flight_info"
val dbDataFrame = spark.read.format("jdbc").option("url", url)
  .option("dbtable", tablename).option("driver",  driver).load()

高级IO概念

val numberBuckets = 10
val columnToBucketBy = "count"

csvFile.write.format("parquet").mode("overwrite")
  .bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")

第十章 SparkSQL

Spark和Hive的关系

Spark SQL可以与Hive metastores链接。 Hive metastore维护了Hive跨回话数据表的信息

如何运行Spark SQL查询

Spark可编程SQL接口

spark.sql("select 1+1").show()
这会返回一个DataFrame

Catalog

Spark SQL中最高级别的抽象是Catalog,用于存储用户数据中的元数据以及数据库、数据表、函数、视图等有用的东西

托管表和非托管表

第十一章 Dataset

何时使用Dataset

创建Dataset

case class Flight(DEST_COUNTRY_NAME: String,
                  ORIGIN_COUNTRY_NAME: String, count: BigInt)

val flightsDF = spark.read.parquet("F:/spark-3.0.1-bin-hadoop2.7/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
上一篇 下一篇

猜你喜欢

热点阅读