
Learning Spark [2] - Spark API Structure

2021-01-07

RDD

The RDD is Spark's most basic abstraction.

An RDD has the following characteristics:

Dependencies give an RDD its lineage: if a result has to be reproduced, Spark can use the dependencies to recreate the RDD and replay the computation. This is what makes RDDs resilient and flexible.
Partitions let Spark split the work and run it in parallel on different executors. In some cases, for example when reading from HDFS, Spark uses data locality to dispatch tasks to executors close to the data (see the sketch after this list).
Finally, an RDD has a compute function, which produces an Iterator[T] over the data stored in the RDD.
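
A quick way to see partitions in action (a minimal sketch; it assumes an existing SparkContext sc, e.g. spark.sparkContext):

# Explicitly ask for 4 partitions when creating the RDD
rdd = sc.parallelize(range(8), numSlices=4)

print(rdd.getNumPartitions())  # -> 4
print(rdd.glom().collect())    # data grouped per partition, e.g. [[0, 1], [2, 3], [4, 5], [6, 7]]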

This model, however, has a few problems:

Because the compute function (and the data it works on) is opaque to Spark, Spark cannot inspect the computation and therefore has no way to optimize the expression.

Key Merits and Benefits

Example: compute the average age for each name.

Using the low-level RDD API:

## Create an RDD of tuples (name, age)
dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 20), ("Brooke", 25)])

## Use map and reduceByKey transformation with their lambda expression to aggregate and then compute average
agesRDD = (dataRDD.
           map(lambda x:(x[0], (x[1], 1))). # [('Brooke', (20, 1)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (20, 1)), ('Brooke', (25, 1))]
           reduceByKey(lambda x, y:(x[0] + y[0], x[1] + y[1])). # [('Brooke', (45, 2)), ('Denny', (31, 1)), ('Jules', (30, 1)), ('TD', (20, 1))]
           map(lambda x: (x[0], x[1][0]/x[1][1]))) # [('Brooke', 22.5), ('Denny', 31), ('Jules', 30), ('TD', 20)]
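
For completeness: the snippet above assumes an existing SparkContext sc. In a standalone script it can be obtained from a SparkSession (a minimal sketch; the app name is arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AuthorsAgesRDD").getOrCreate()
sc = spark.sparkContext  # the sc used by the RDD example above

# Materialize the result
print(agesRDD.collect())  # [('Brooke', 22.5), ('Denny', 31.0), ('Jules', 30.0), ('TD', 20.0)]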

Using the high-level DSL (the DataFrame API):

from pyspark.sql import SparkSession 
from pyspark.sql.functions import avg 

## Create DataFrame Using SparkSession
spark = (SparkSession.
         builder.
         appName('AuthorsAges').
         getOrCreate())

## Create a DataFrame
data_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 20), ("Brooke", 25)], ['name', 'age'])
avg_df = data_df.groupby('name').agg(avg('age'))
avg_df.show()
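
avg_df.show() prints something like the following (row order may vary):

+------+--------+
|  name|avg(age)|
+------+--------+
|Brooke|    22.5|
| Denny|    31.0|
| Jules|    30.0|
|    TD|    20.0|
+------+--------+

Because groupby and avg state what to compute rather than how, Spark can inspect the query and optimize its execution, which it cannot do for the opaque lambdas in the RDD version.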

DataFrame API

Basic data types:

Structured data types:
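
Both kinds of types live in pyspark.sql.types; a minimal sketch of constructing them directly (the variable names are only for illustration):

from pyspark.sql.types import (IntegerType, FloatType, StringType, BooleanType,  # basic types
                               ArrayType, MapType, StructType, StructField)      # structured types

# Basic types describe single values; structured types describe nested or composite data
ints = ArrayType(IntegerType())                # an array of integers
lookup = MapType(StringType(), FloatType())    # a map from string keys to float values
record = StructType([StructField("id", IntegerType(), True),
                     StructField("active", BooleanType(), True)])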

Schemas and creating DataFrames

A schema defines the column names and the data types of a DataFrame's columns. Declaring a schema up front has several advantages: Spark does not have to infer the data types, it does not need to launch a separate job just to read a large part of the file in order to work the schema out, and mismatches between the data and the schema surface as errors early.

Two ways to declare a schema:

from pyspark.sql.types import *

# Programmatic definition
schema = StructType([StructField("author", StringType(), False),
                     StructField("title", StringType(), False),
                     StructField("pages", IntegerType(), False)])

# DDL string definition
schema = "author STRING, title STRING, pages INT"

DataFrame operations

from pyspark.sql.types import *
# Programmatic way to define a schema 
fire_schema = StructType([StructField('CallNumber', IntegerType(), True),
                          StructField('UnitID', StringType(), True),
                          StructField('IncidentNumber', IntegerType(), True),
                          StructField('CallType', StringType(), True),
                          StructField('CallDate', StringType(), True),
                          StructField('WatchDate', StringType(), True),
                          StructField('CallFinalDisposition', StringType(), True),
                          StructField('AvailableDtTm', StringType(), True),
                          StructField('Address', StringType(), True),
                          StructField('City', StringType(), True),
                          StructField('Zipcode', IntegerType(), True),
                          StructField('Battalion', StringType(), True),
                          StructField('StationArea', StringType(), True),
                          StructField('Box', StringType(), True),
                          StructField('OriginalPriority', StringType(), True),
                          StructField('Priority', StringType(), True),
                          StructField('FinalPriority', IntegerType(), True),
                          StructField('ALSUnit', BooleanType(), True),
                          StructField('CallTypeGroup', StringType(), True),
                          StructField('NumAlarms', IntegerType(), True),
                          StructField('UnitType', StringType(), True),
                          StructField('UnitSequenceInCallDispatch', IntegerType(), True),
                          StructField('FirePreventionDistrict', StringType(), True),
                          StructField('SupervisorDistrict', StringType(), True),
                          StructField('Neighborhood', StringType(), True),
                          StructField('Location', StringType(), True),
                          StructField('RowID', StringType(), True),
                          StructField('Delay', FloatType(), True)])
# Use the DataFrameReader interface to read a CSV file 
sf_fire_file = "/databricks-datasets/learning-spark-v2/sf-fire/sf-fire-calls.csv"
fire_df = spark.read.csv(sf_fire_file, header=True, schema=fire_schema)

# Save to a path
parquet_path = ... 
fire_df.write.format("parquet").save(parquet_path) 

# Save as a Hive table
parquet_table = ...  # name of the table
fire_df.write.format("parquet").saveAsTable(parquet_table)

# Projection and filter
from pyspark.sql.functions import col
few_fire_df = (fire_df
               .select("IncidentNumber", "AvailableDtTm", "CallType")
               .where(col("CallType") != "Medical Incident"))
few_fire_df.show(5, truncate=False)

from pyspark.sql.functions import *

# Count the distinct, non-null CallType values
(fire_df
 .select("CallType")
 .where(col("CallType").isNotNull())
 .agg(countDistinct("CallType").alias("DistinctCallTypes"))
 .show())
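
If only the number is needed, a roughly equivalent formulation returns it directly as an int:

n = (fire_df
     .select("CallType")
     .where(col("CallType").isNotNull())
     .distinct()
     .count())
print(n)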

Spark SQL and the underlying engine

Spark SQL lets developers issue ANSI SQL:2003-compatible queries against structured data that has a schema. Spark SQL was added to the core engine in Spark 1.3, so most of the high-level APIs build on it and can use it as well. Besides SQL-like queries, Spark SQL also provides the following capabilities:

It unifies Spark's components and lets structured data be abstracted as DataFrames/Datasets in Java, Scala, Python, and R.
It connects to the Apache Hive metastore and Hive tables.
It reads and writes structured data in formats such as JSON, CSV, Text, Avro, Parquet, and ORC, and can expose that data as temporary views.
It offers an interactive shell and JDBC/ODBC connectors for external tools.
It generates optimized query plans and compact code for execution (the Catalyst optimizer and Tungsten).
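
A minimal sketch of issuing a SQL query against fire_df from above (the view name is arbitrary):

fire_df.createOrReplaceTempView("fire_calls")  # register a temporary view for SQL
spark.sql("""
    SELECT CallType, COUNT(*) AS call_count
    FROM fire_calls
    WHERE CallType IS NOT NULL
    GROUP BY CallType
    ORDER BY call_count DESC
""").show(5)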

Reference
Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly)
