在spark 使用dplyr 对数据进行操作
概论
dplyr是一个用于处理R内外结构化数据的R包.dplyr使R用户的数据操作变得简单,一致和高效。使用dplyr作为操作Spark DataFrames的接口, 可以:
- 选择,过滤和汇总数据
- 使用窗口函数(例如用于采样)
- 在DataFrame上执行连接
- 将Spark中的数据收集到R中
dplyr中的语句可以使用magrittr R包定义的管道链接在一起
读取数据
可以使用以下函数将数据读入Spark DataFrames:
spark_read_csv 读取CSV文件并提供与dplyr兼容的数据源
spark_read_json 读取JSON文件并提供与dplyr兼容的数据源
spark_read_parquet读取文件并提供与dplyr兼容的数据源
无论数据的格式如何,Spark都支持从各种不同的数据源中读取数据。这些包括存储在HDFS(hdfs://协议),Amazon S3(s3n://协议)或Spark工作节点可用的本地文件(file://协议)上的数据
这些函数中的每一个都返回对Spark DataFrame的引用,该数据可以用作dplyr table(tbl)。
航班数据
本指南将通过使用nycflights13R包中的数据演示dplyr的一些基本数据操作动词。该数据包含2013年从纽约市出发的所有336,776个航班的数据。它还包括有关航空公司,机场,天气和飞机的有用元数据。这些数据来自美国运输统计局
连接到群集并使用该copy_to 功能复制航班数据。警告:飞行数据nycflights13便于进行dplyr演示,因为它很小,但实际上很少有大型数据直接从R对象复制。
library(sparklyr)
library(dplyr)
library(nycflights13)
library(ggplot2)
sc <- spark_connect(master="local")
iris_tbl <- copy_to(sc, iris)
flights_tbl <- copy_to(sc, nycflights13::flights, "flights")
batting_tbl <- copy_to(sc, Lahman::Batting, "batting")
src_tbls(sc)
dplyr操作
动词是用于操纵数据的dplyr命令。连接到Spark DataFrame时,dplyr会将命令转换为Spark SQL语句。远程数据源使用与本地数据源完全相同的五个动词。以下是五个动词及其相应的SQL命令:
- select 〜 SELECT
- filter 〜 WHERE
- arrange 〜 ORDER
- summarise 〜 aggregators: sum, min, sd, etc.
- mutate 〜 operators: +, *, log, etc.
选择列
year:day 的含义是 从year列到day列
> select(flights_tbl, year:day, arr_delay, dep_delay)
# Source: spark<?> [?? x 5]
year month day arr_delay dep_delay
* <int> <int> <int> <dbl> <dbl>
1 2013 1 1 11.0 2.00
2 2013 1 1 20.0 4.00
3 2013 1 1 33.0 2.00
4 2013 1 1 -18.0 -1.00
5 2013 1 1 -25.0 -6.00
6 2013 1 1 12.0 -4.00
7 2013 1 1 19.0 -5.00
8 2013 1 1 -14.0 -3.00
9 2013 1 1 - 8.00 -3.00
10 2013 1 1 8.00 -2.00
# ... with more rows
选择行
> filter(flights_tbl, dep_delay > 1000)
# Source: spark<?> [?? x 19]
year month day dep_time sched_dep_time dep_delay arr_time
* <int> <int> <int> <int> <int> <dbl> <int>
1 2013 1 9 641 900 1301 1242
2 2013 1 10 1121 1635 1126 1239
3 2013 6 15 1432 1935 1137 1607
4 2013 7 22 845 1600 1005 1044
5 2013 9 20 1139 1845 1014 1457
# ... with 12 more variables: sched_arr_time <int>, arr_delay <dbl>,
# carrier <chr>, flight <int>, tailnum <chr>, origin <chr>, dest <chr>,
# air_time <dbl>, distance <dbl>, hour <dbl>, minute <dbl>,
# time_hour <dttm>
对数进行排序
> arrange(flights_tbl, desc(dep_delay))
# Source: spark<?> [?? x 19]
# Ordered by: desc(dep_delay)
year month day dep_time sched_dep_time dep_delay arr_time
* <int> <int> <int> <int> <int> <dbl> <int>
1 2013 1 9 641 900 1301 1242
2 2013 6 15 1432 1935 1137 1607
3 2013 1 10 1121 1635 1126 1239
4 2013 9 20 1139 1845 1014 1457
5 2013 7 22 845 1600 1005 1044
6 2013 4 10 1100 1900 960 1342
7 2013 3 17 2321 810 911 135
8 2013 6 27 959 1900 899 1236
9 2013 7 22 2257 759 898 121
10 2013 12 5 756 1700 896 1058
# ... with more rows, and 12 more variables: sched_arr_time <int>,
# arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
# origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
# minute <dbl>, time_hour <dttm>
对数据进行汇总处理
> summarise(flights_tbl, mean_dep_delay = mean(dep_delay))
# Source: spark<?> [?? x 1]
mean_dep_delay
* <dbl>
1 12.6
添加新的字段
> mutate(flights_tbl, speed = distance / air_time * 60)
# Source: spark<?> [?? x 20]
year month day dep_time sched_dep_time dep_delay arr_time
* <int> <int> <int> <int> <int> <dbl> <int>
1 2013 1 1 517 515 2.00 830
2 2013 1 1 533 529 4.00 850
3 2013 1 1 542 540 2.00 923
4 2013 1 1 544 545 -1.00 1004
5 2013 1 1 554 600 -6.00 812
6 2013 1 1 554 558 -4.00 740
7 2013 1 1 555 600 -5.00 913
8 2013 1 1 557 600 -3.00 709
9 2013 1 1 557 600 -3.00 838
10 2013 1 1 558 600 -2.00 753
# ... with more rows, and 13 more variables: sched_arr_time <int>,
# arr_delay <dbl>, carrier <chr>, flight <int>, tailnum <chr>,
# origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
# minute <dbl>, time_hour <dttm>, speed <dbl>
使用数据库时,dplyr会尽可能地变得懒惰:
它永远不会将数据拉入R,除非你明确要求它。
它会延迟完成任何工作,直到最后一刻:它会收集你想要做的所有事情,然后一步将其发送到数据库。
例如,使用以下代码:
c1 <- filter(flights_tbl, day == 17, month == 5, carrier %in% c('UA', 'WN', 'AA', 'DL'))
c2 <- select(c1, year, month, day, carrier, dep_delay, air_time, distance)
c3 <- arrange(c2, year, month, day, carrier)
c4 <- mutate(c3, air_time_hours = air_time / 60)
这一系列操作从未真正触及数据库。直到你要求数据(例如通过打印c4)dplyr请求数据库的结果。
> c4
# Source: spark<?> [?? x 5]
# Ordered by: carrier
carrier dep_delay air_time distance air_time_hours
* <chr> <dbl> <dbl> <dbl> <dbl>
1 AA -2.00 294 2248 4.90
2 AA -1.00 146 1096 2.43
3 AA -2.00 185 1372 3.08
4 AA -9.00 186 1389 3.10
5 AA 2.00 147 1096 2.45
6 AA -4.00 114 733 1.90
7 AA -7.00 117 733 1.95
8 AA -7.00 142 1089 2.37
9 AA -6.00 148 1089 2.47
10 AA -7.00 137 944 2.28
# ... with more rows
管道
可以使用 magrittr 管道编写更清晰的语法。使用上面的相同示例,可以编写一个更清晰的版本,如下所示:
c4 <- flights_tbl %>%
filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
select(carrier, dep_delay, air_time, distance) %>%
arrange(carrier) %>%
mutate(air_time_hours = air_time / 60)
分组
该group_by函数对应GROUP BY于SQL中的语句。
c4 %>%
group_by(carrier) %>%
summarize(count = n(), mean_dep_delay = mean(dep_delay))
下载到R
可以使用将数据从Spark复制到R的内存中collect()。
···
carrierhours <- collect(c4)
···
collect() 执行Spark查询并将结果返回给R以进行进一步分析和可视化。
# Test the significance of pairwise differences and plot the results
with(carrierhours, pairwise.t.test(air_time, carrier))
画图
library(ggplot2)
ggplot(carrierhours, aes(carrier, air_time_hours)) + geom_boxplot()
SQL翻译
在对过滤,变异和汇总时通常使用的表单进行简单的数学运算时,将R代码转换为SQL(或实际上转换为任何编程语言)相对简单。dplyr知道如何将以下R函数转换为Spark SQL:
+, -, *, /, %%, ^
Math functions
abs, acos, asin, asinh, atan, atan2, ceiling, cos, cosh, exp, floor, log, log10, round, sign, sin, sinh, sqrt, tan, tanh
Logical comparisons
<, <=, !=, >=, >, ==, %in%
Boolean operations
&, &&, |, ||, !
Character functions
paste, tolower, toupper, nchar
Casting
as.double, as.integer, as.logical, as.character, as.date
Basic aggregations
mean, sum, min, max, sd, var, cor, cov, n
窗口功能
dplyr支持Spark SQL窗口函数。窗口函数与mutate和filter结合使用以解决各种问题。可以将dplyr语法与使用它生成的查询进行比较 dbplyr::sql_render()。
bestworst <- flights %>%
group_by(year, month, day) %>%
select(dep_delay) %>%
filter(dep_delay == min(dep_delay) || dep_delay == max(dep_delay))
dbplyr::sql_render(bestworst)
bestworst
ranked <- flights %>%
group_by(year, month, day) %>%
select(dep_delay) %>%
mutate(rank = rank(desc(dep_delay)))
dbplyr::sql_render(ranked)
ranked
执行联接
数据分析很少涉及单个数据表。实际上,通常会有许多表格有助于分析,需要灵活的工具来组合它们。在dplyr中,有三个动词系列一次使用两个表:
变异连接,它将新变量从另一个表中的匹配行添加到一个表中。
过滤联接,根据是否与另一个表中的观察匹配来过滤来自一个表的观察结果。
设置操作,将数据集中的观察结果组合起来,就像它们是设置元素一样。
所有双表动词的工作方式都相似。前两个参数是x和 y,并提供要组合的表。输出始终是与新类型相同的新表x。
以下陈述是等效的:
flights %>% left_join(airlines)
flights %>% left_join(airlines, by = "carrier")
lights %>% left_join(airlines, by = c("carrier", "carrier"))
采样
你可以使用sample_n()和sample_frac()随机采样行:sample_n()用于固定数字和sample_frac()固定分数。
sample_n(flights, 10)
sample_frac(flights, 0.01)
写数据
将分析结果或您在Spark集群上生成的表保存到持久存储中通常很有用。在许多情况下,最好的选择是使用spark_write_parquet 函数将表写入 Parquet文件 。例如:
spark_write_parquet(tbl, "hdfs://hdfs.company.org:9000/hdfs-path/data")
这会将tbl R变量引用的Spark DataFrame写入给定的HDFS路径。您可以使用 spark_read_parquet 函数将同一个表读回到后续的Spark会话中:
tbl <- spark_read_parquet(sc, "data", "hdfs://hdfs.company.org:9000/hdfs-path/data")
还可以使用spark_write_csv和 spark_write_json 函数将数据写为CSV或JSON。