
Learning Spark [4] - Spark SQL

2021-01-25  屹然1ran

Spark SQL brings several features to Spark for working with structured data; the examples below walk through the basics, starting with a simple query.

Example: a basic query

# Basic Query Example
import os
from pyspark.sql import SparkSession

os.chdir('D:/Users/fyrli/Desktop/R work/learning spark/chapter 4')
spark = (SparkSession
         .builder
         .appName('SparkSQLExampleApp')
         .getOrCreate())
# Path to the dataset
departure_delays = 'departuredelays.csv'
# Read the CSV and register a temp view
df = spark.read.csv(departure_delays, header=True, inferSchema=True)
df.createOrReplaceTempView('us_delay_flights_tbl')
df.show(5)
+-------+-----+--------+------+-----------+
|   date|delay|distance|origin|destination|
+-------+-----+--------+------+-----------+
|1011245|    6|     602|   ABE|        ATL|
|1020600|   -8|     369|   ABE|        DTW|
|1021245|   -2|     602|   ABE|        ATL|
|1020605|   -4|     602|   ABE|        ATL|
|1031245|   -4|     602|   ABE|        ATL|
+-------+-----+--------+------+-----------+

This dataset has five columns:

  • date: the flight date as a String; it can be converted to a timestamp (see the sketch after this list), e.g. 02190925 corresponds to 02-19 09:25am
  • delay: the delay in minutes; negative values mean an early departure
  • distance: the flight distance in miles
  • origin and destination: the departure and arrival airports
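
A minimal sketch of that date conversion, assuming the MMddHHmm layout above and that inferSchema read date as an integer (dropping any leading zero):

from pyspark.sql.functions import col, lpad, to_timestamp
# Sketch: restore the leading zero lost to inferSchema, then parse MMddHHmm.
# Spark fills the missing year with a default (1970), which is fine for
# month-day-time comparisons within this dataset.
df_ts = df.withColumn(
    'date_ts',
    to_timestamp(lpad(col('date').cast('string'), 8, '0'), 'MMddHHmm'))
df_ts.select('date', 'date_ts').show(5)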

Query flights with a distance of at least 1000 miles:
spark.sql("""
    SELECT date, distance, origin, destination        
    FROM us_delay_flights_tbl
    WHERE distance >= 1000
    ORDER BY distance DESC""").show(10)
+-------+--------+------+-----------+
|   date|distance|origin|destination|
+-------+--------+------+-----------+
|3131530|    4330|   HNL|        JFK|
|3071625|    4330|   HNL|        JFK|
|3121530|    4330|   HNL|        JFK|
|3021625|    4330|   HNL|        JFK|
|3061625|    4330|   HNL|        JFK|
|3081530|    4330|   HNL|        JFK|
|3091530|    4330|   HNL|        JFK|
|3011625|    4330|   HNL|        JFK|
|3151530|    4330|   HNL|        JFK|
|3051625|    4330|   HNL|        JFK|
+-------+--------+------+-----------+
only showing top 10 rows

Query all flights from SFO to ORD that were delayed by more than two hours:

spark.sql("""
    SELECT date, delay, origin, destination 
    FROM us_delay_flights_tbl 
    WHERE delay > 120 AND ORIGIN = 'SFO' AND DESTINATION = 'ORD' 
    ORDER by delay DESC""").show(10)
+-------+-----+------+-----------+
|   date|delay|origin|destination|
+-------+-----+------+-----------+
|2190925| 1638|   SFO|        ORD|
|1031755|  396|   SFO|        ORD|
|1022330|  326|   SFO|        ORD|
|1051205|  320|   SFO|        ORD|
|1190925|  297|   SFO|        ORD|
|2171115|  296|   SFO|        ORD|
|1071040|  279|   SFO|        ORD|
|1051550|  274|   SFO|        ORD|
|3120730|  266|   SFO|        ORD|
|1261104|  258|   SFO|        ORD|
+-------+-----+------+-----------+
only showing top 10 rows
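
The same query can be written with the DataFrame API; a minimal sketch (col and desc come from pyspark.sql.functions):

from pyspark.sql.functions import col, desc
# DataFrame API version of the SFO -> ORD delay query above.
(df.select('date', 'delay', 'origin', 'destination')
   .where((col('delay') > 120) &
          (col('origin') == 'SFO') &
          (col('destination') == 'ORD'))
   .orderBy(desc('delay'))
   .show(10))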

Add a column with a CASE WHEN expression to label each delay:

spark.sql("""
    SELECT delay, origin, destination,              
        CASE WHEN delay > 360 THEN 'Very Long Delays'                  
             WHEN delay > 120 AND delay < 360 THEN 'Long Delays'                  
             WHEN delay > 60 AND delay < 120 THEN 'Short Delays'                  
             WHEN delay > 0 and delay < 60 THEN 'Tolerable Delays'                  
             WHEN delay = 0 THEN 'No Delays'                  
             ELSE 'Early'               
        END AS Flight_Delays               
    FROM us_delay_flights_tbl               
    ORDER BY origin, delay DESC""").show(10)
+-----+------+-----------+-------------+
|delay|origin|destination|Flight_Delays|
+-----+------+-----------+-------------+
|  333|   ABE|        ATL|  Long Delays|
|  305|   ABE|        ATL|  Long Delays|
|  275|   ABE|        ATL|  Long Delays|
|  257|   ABE|        ATL|  Long Delays|
|  247|   ABE|        ATL|  Long Delays|
|  247|   ABE|        DTW|  Long Delays|
|  219|   ABE|        ORD|  Long Delays|
|  211|   ABE|        ATL|  Long Delays|
|  197|   ABE|        DTW|  Long Delays|
|  192|   ABE|        ORD|  Long Delays|
+-----+------+-----------+-------------+
only showing top 10 rows
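
The same bucketing can be expressed in the DataFrame API with when()/otherwise(); a minimal sketch:

from pyspark.sql.functions import when, col, desc
# when() chains evaluate in order, like SQL CASE WHEN.
(df.select('delay', 'origin', 'destination',
           when(col('delay') >= 360, 'Very Long Delays')
           .when(col('delay') >= 120, 'Long Delays')
           .when(col('delay') >= 60, 'Short Delays')
           .when(col('delay') > 0, 'Tolerable Delays')
           .when(col('delay') == 0, 'No Delays')
           .otherwise('Early')
           .alias('Flight_Delays'))
   .orderBy('origin', desc('delay'))
   .show(10))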

The following Python code is similar to the first SQL query above (it omits the date column):

from pyspark.sql.functions import col, desc

(df.select('distance', 'origin', 'destination')
     .where(col('distance') > 1000)
     .orderBy(desc('distance'))).show(10)
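
A small variation: the same query can start from the registered temp view via spark.table instead of df:

# Same query, reading from the temp view registered earlier.
(spark.table('us_delay_flights_tbl')
      .select('distance', 'origin', 'destination')
      .where(col('distance') > 1000)
      .orderBy(desc('distance'))
      .show(10))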

Creating SQL databases and tables

By default, Spark creates tables in the default database. The examples below use the US flight delay data to create one managed table and one unmanaged table. For a managed table, Spark manages both the metadata and the data itself; for an unmanaged table, Spark manages only the metadata, while the data stays at an external source (here, a CSV file).

spark.sql('CREATE DATABASE learn_spark_db')
spark.sql('USE learn_spark_db')
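
A quick sanity check via the catalog API (a sketch) confirms which database is now active:

# Verify which database new tables will be created in.
print(spark.catalog.currentDatabase())   # expected: 'learn_spark_db'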

Managed table:

spark.sql(""" 
    CREATE TABLE managed_us_delay_flights_tbl 
    (date STRING, dalay INT, distanct INT, origin STRING, destination STRING)""")

Unmanaged table:

spark.sql("""
    CREATE TABLE us_delay_flights_tbl
    (date STRING, delay INT,   distance INT, origin STRING, destination STRING)   
    USING csv OPTIONS (PATH '/databricks-datasets/learning-spark-v2/flights/departuredelays.csv')
""") 

Reference
Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee
