用它做Python并行数据分析,隔壁程序猿都馋哭了
有时候你在做 Python 数据分析的时候,可能会出现这么个情况:用 Pandas 打开一个超大型数据集,想得到一些度量(metrics),然后就尴尬地卡住了。
大家都知道,如果你处理大数据,手里用的是 Pandas,有时要等上一小时才能得到一个 Series 的平均值,甚至都还没调用 apply 函数。这还只是几百万行啊,如果是几十亿行,那最好还是用 Spark 之类的高级工具吧。
那么就没有好办法了吗?有的,就有这么一个工具,能够加速 Python 数据分析,既不需要你使用配置更高的硬件设施,也不必切换编程语言。当然,如果你的数据集超级超级大,它的最终作用也会有限,但比普通的 Python 扩展工具好多了。特别是如果你不用做大量的重建索引,那么这个工具非常适合你。
这个工具叫 Dask,数据科学家 Luciano Strika 专门试用了这个工具,并做了测试,发现 Dask 在做并行数据分析时,比常规 Pandas 快出许多倍。
什么是Dask?
Dask 是一个开源项目,能提供 NumPy Arrays,Pandas Dataframes 和常规列表的抽象,允许你使用多核处理并行运行它们。
下面这段直接摘自教程:
Dask 提供模仿了 NumPy,列表和 Pandas 的高级 Array,Bag 以及 DataFrame 集合,但能够在无法放入主内存的数据集上并行运行。对大型数据集来说,Dask 的高级集合是 NumPy 和 Pandas 的替代方案。
听起来真不错!于是我(作者Luciano Strika)决定亲自试试 Dask Dataframes,并对它们进行了几个基准测试。
阅读文档
我首先阅读了官方文档,看看建议我们使用 Dask 做哪些工作。以下是官方文档(http://docs.dask.org/en/latest/dataframe.html)中的相关部分:
- 操纵大型数据集,即使这些数据集无法放入内存
- 使用许多核来加速长计算
- 使用标准Pandas操作(如 groupby,join 和时间序列计算)对大型数据集进行分布式计算
然后在下面,它列出了一些如果使用 Dask Dataframes 会快速完成的事情:
- 算术运算(乘以或添加到Series)
- 常见聚合(平均值,最小值,最大值,求和等)
- 调用 apply(只要它在索引中,也就是说,不是在 groupby('y')之后'y'不是索引)
- 调用 value_counts(),drop_duplicates()或corr()
- 使用 loc,isin 和行式选择进行过滤
#returns only the rows where x is >5, by reference (writing on them alters original df)
df2 = df.loc[df['x'] > 5]
#returns only the rows where x is 0,1,2,3 or 4, by reference
df3 = df.x.isin(range(4))
#returns only the rows where x is >5, by read-only reference (can't be written on)
df4 = df[df['x']>5]
如何使用 Dask Dataframes
Dask Dataframes 与 Pandas Dataframes 具有相同的 API,只是聚合和 apply 函数延迟执行,并且需要通过调用 compute 方法来计算。要想生成 Dask Dataframe,可以像在 Pandas 中一样调用 read_csv 方法,或者,如果给出 Pandas Dataframe df,只需调用
dd = ddf.from_pandas(df, npartitions=N)
其中 ddf 是你导入 Dask Dataframes 的名称,而 npartitions 是一个参数,告诉 Dataframe 如何对其进行分区。
根据 StackOverflow 上的说法,建议将 Dataframe 划分为与计算机核数数量相同的分区,或者是该数量的几倍,因为每个分区将在不同的线程上运行。如果分区过多,它们之间的通信代价会高很多。
动手吧:做点基准测试
主要有以下要点:
def get_big_mean():
return dfn.salary.mean().compute()
def get_big_mean_old():
return df3.salary.mean()
def get_big_max():
return dfn.salary.max().compute()
def get_big_max_old():
return df3.salary.max()
def get_big_sum():
return dfn.salary.sum().compute()
def get_big_sum_old():
return df3.salary.sum()
def filter_df():
df = dfn[dfn['salary']>5000]
def filter_df_old():
df = df3[df3['salary']>5000]
这里df3是一个常规的 Pandas Dataframe,拥有 2500 万行,使用这段脚本生成,其中列是名称,姓氏和薪水,从列表中随机抽样。我用了一个有 50 行的数据集并连接了 500000 次,因为我对分析本身并不太感兴趣,但运行它时才会。
dfn 就是基于 df3 的 Dask Dataframe。
第一批结果:不太乐观
我首先尝试使用 3 个分区进行测试,因为我的电脑只有 4 个内核,不想过度使用它。这次使用 Dask 的结果非常差,而且还要等很久才能得到结果,不过我怀疑这可能是分区过少的原因:
204.313940048 seconds for get_big_mean
39.7543280125 seconds for get_big_mean_old
131.600986004 seconds for get_big_max
43.7621600628 seconds for get_big_max_old
120.027213097 seconds for get_big_sum
7.49701309204 seconds for get_big_sum_old
0.581165790558 seconds for filter_df
226.700095892 seconds for filter_df_old
可以看到在我使用 Dask 时大多数操作变慢了很多。这给了我一些启示,可能必须使用更多分区才行。产生延迟计算所花的成本也可以忽略不计(在某些情况下不到半秒),所以如果重复使用它,成本不会随着时间推移而摊销。
我还用 apply 方法尝试了这个测试:
def f(x):
return (13*x+5)%7
def apply_random_old():
df3['random']= df3['salary'].apply(f)
def apply_random():
dfn['random']= dfn['salary'].apply(f).compute()
并有非常相似的结果:
369.541605949 seconds for apply_random
157.643756866 seconds for apply_random_old
因此,一般来说,大多数操作的速度都是初始操作的两倍,尽管过滤器的速度要快得多。我觉得或许应该在上面调用 compute,所以对这个结果持保留态度。
更多分区:加速惊人
得到前面这些不尽人意的结果之后,我决定我可能只是没有使用足够的分区。毕竟,整件事情的重点是并行运行,所以或许我只需进一步并行化?所以我尝试了 8 个分区的相同测试,得到了如下结果(省略了非并行数据帧的结果,因为它们基本相同):
3.08352184296 seconds for get_big_mean
1.3314101696 seconds for get_big_max
1.21639800072 seconds for get_big_sum
0.228978157043 seconds for filter_df
112.135010004 seconds for apply_random
50.2007009983 seconds for value_count_test
这就对了!大多数操作的运行速度比常规 Dataframe 快十倍,甚至 apply 的运行速度也更快了!我还运行了 value_count 测试,它只调用“薪水”Series 上的 value_count 方法。对于上下文,在我足足等 10 分钟后在常规 Dataframe 上运行此测试时,必须终止该过程。这次只用了 50 秒!
所以之前都没用对工具,它的速度可比常规 Dataframe 快多了。
最后再说一点
鉴于我是在一台相当旧的 4 核 PC 上运行了 2500 万行数据,所以是相当了不起的。所以我的建议是,下回你必须在本地或从单个 AWS 实例上理数据集时,一定要试试 Dask 这个框架。运行速度简直不要太快。