Spark

Learning Spark [6] - Spark SQL高级

2021-01-28  本文已影响0人  屹然1ran

collect, collect_list, collect_set

collect常用的有两个函数:collect_list(不去重)和collect_set(去重)

# build a test dataframe
df = pd.DataFrame({'type1':['a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'type2':['b1', 'b2', 'b3', 'b4', 'b5', 'b5'],
                   'value':[1, 2, 3, 4, 5, 6]})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
df.view()
+-----+-----+-----+
|type1|type2|value|
+-----+-----+-----+
|   a1|   b1|    1|
|   a1|   b2|    2|
|   a1|   b3|    3|
|   a2|   b4|    4|
|   a2|   b5|    5|
|   a2|   b6|    6|
+-----+-----+-----+

collect_list

spark.sql('''SELECT type1, COLLECT_LIST(type2) as type2 
             FROM collect_test 
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|[b4, b5, b5]|
|   a1|[b1, b2, b3]|
+-----+------------+

collect_set

spark.sql('''SELECT type1, COLLECT_SET(type2) as type2 
             FROM collect_test 
             GROUP BY type1''').show()
+-----+------------+
|type1|       type2|
+-----+------------+
|   a2|    [b4, b5]|
|   a1|[b1, b3, b2]|
+-----+------------+

collect后返回的是一个数组,可以通过array[x]来调用数据。通过这点我们可以进行透视表的操作,类似定义array[0] as a1, array[1] as a2...

explode

explode的定义是将数组的每个数据展开,如下我们就可以将上面的dataframe还原为最初的样式。

spark.sql('''SELECT type1, EXPLODE(type2) as type2
             FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|   a2|   b4|
|   a2|   b5|
|   a2|   b5|
|   a1|   b1|
|   a1|   b2|
|   a1|   b3|
+-----+-----+

posexplode可以在拆分列的同时,增加一列序号

spark.sql('''SELECT type1, posexplode(type2) as (index, type2)
             FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
+-----+-----+-----+
|type1|index|type2|
+-----+-----+-----+
|   a2|    0|   b4|
|   a2|    1|   b5|
|   a2|    2|   b5|
|   a1|    0|   b1|
|   a1|    1|   b2|
|   a1|    2|   b3|
+-----+-----+-----+

但是如果表内有如下两个一一对应的数组,我们该如何拆分呢?

+-----+------------+---------+
|type1|       type2|    value|
+-----+------------+---------+
|   a2|[b4, b5, b5]|[4, 5, 6]|
|   a1|[b1, b2, b3]|[1, 2, 3]|
+-----+------------+---------+

按照直觉,我们尝试分别explode()

spark.sql('''SELECT type1, explode(type2) as type2, explode(value) as value
             FROM(SELECT type1, COLLECT_LIST(type2) as type2
                    , COLLECT_LIST(value) as value 
                  FROM collect_test 
                  GROUP BY type1) a''').show()
AnalysisException: Only one generator allowed per select clause but found 2: explode(type2), explode(value);

解决这个问题,我们需要使用LATERAL VIEW

lateral view

lateral view可以理解为创建了一个表,然后JOIN到了查询的表上,这样就避免了两个生成器的问题

spark.sql('''SELECT type1, exploded_type2.type2, exploded_value.value
             FROM(SELECT type1, COLLECT_LIST(type2) as type2
                    , COLLECT_LIST(value) as value 
                  FROM collect_test 
                  GROUP BY type1) a
             LATERAL VIEW POSEXPLODE(type2) exploded_type2 as type_index, type2
             LATERAL VIEW POSEXPLODE(value) exploded_value as value_index, value
             WHERE type_index = value_index -- 避免为笛卡尔积
             ''').show()

split

split则是将一个字符串根据分隔符,变化为一个数组

df = pd.DataFrame({'type1':['a', 'b', 'c'],
                   'type2':['1_2_3', '1_23', '_1']})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
spark.sql('''SELECT * FROM collect_test''').show()
+-----+-----+
|type1|type2|
+-----+-----+
|    a|1_2_3|
|    b| 1_23|
|    c|   _1|
+-----+-----+
spark.sql('''SELECT type1, split(type2, '_') as splited_type2 FROM collect_test''').show()
+-----+-------------------+
|type1|splited_type2|
+-----+-------------------+
|    a|          [1, 2, 3]|
|    b|            [1, 23]|
|    c|              [, 1]|
+-----+-------------------+

transform

transform会引用一个函数在数组的每个元素上,返回一个数列

schema = StructType([StructField('celsius', ArrayType(IntegerType()))])
t_list = [[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]] 
t_c = spark.createDataFrame(t_list, schema) 
t_c.createOrReplaceTempView("tC")
t_c.show()
+--------------------+
|             celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+
spark.sql(""" SELECT celsius, TRANSFORM(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
              FROM tC """).show()
+--------------------+--------------------+
|             celsius|          fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+

filter

filter为通过条件删选,返回一个数列

spark.sql(""" SELECT celsius, FILTER(celsius, t -> t >= 40) as high_temp 
              FROM tC """).show()
+--------------------+---------+
|             celsius|high_temp|
+--------------------+---------+
|[35, 36, 32, 30, ...| [40, 42]|
|[31, 32, 34, 55, 56]| [55, 56]|
+--------------------+---------+

exists

exists为判断是否包含该元素,返回一个布尔值

spark.sql(""" SELECT celsius, EXISTS(celsius, t -> t >= 40) as is_high_temp 
              FROM tC """).show()
+--------------------+------------+
|             celsius|is_high_temp|
+--------------------+------------+
|[35, 36, 32, 30, ...|        true|
|[31, 32, 34, 55, 56]|        true|
+--------------------+------------+

reduce

reduce为通过两个函数,将数组聚合为一个值,然后对该值进行运算

spark.sql(""" SELECT celsius, 
                     reduce(celsius
                            , (t, acc) -> ((t * 9) div 5) + 32 + acc
                            , acc -> (acc div size(celsius))) as avgFahrenheit 
              FROM tC """).show()
+--------------------+-------------+ 
|             celsius|avgFahrenheit| 
+--------------------+-------------+ 
|[35, 36, 32, 30, ...|           96| 
|[31, 32, 34, 55, 56]|          105| 
+--------------------+-------------+

其他函数

Spark SQL高级函数 part1
Spark SQL高级函数 part2
Spark SQL高级函数 part3

Reference
Learning Spark 2nd - Lightning Fast Big Data Analysis by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee

上一篇下一篇

猜你喜欢

热点阅读