pyspark+多线程提高并发度

2019-12-21  本文已影响0人  高稚商de菌

最近写了一个spark项目,先用spark.sql执行多个小任务,最后将所有小任务的结果集中处理,非常耗时。
通过增加executor的数量和配置,并不能提高效率。从时间上来看,耗时基本相当于所有任务串行执行。从spark的框架UI上,能看到资源利用率其实很低,stage很多,应该是sql用了比较多的join。作为一个spark小白玩家,我并没有找到优化的线索。
于是想到能不能在spark中引入多线程,并发处理多个小任务,提高并发度。于是最终用了如下一段代码:

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
df_0 = spark.sql(sql_0)
df_0.cache()
df_0.createOrReplaceTempView("table_0") 
df_0.show()

def executor_run(sql):
      logging.info(sql)
      df = spark.sql(sql)
      df.cache()
      df.show()
      return df

fs = dict()
with ThreadPoolExecutor(20) as executor:
       for task_sql in task_sqls:
            task = executor.submit(executor_run, task_sql)
            fs[task] = task_sql

dfs = list()
for future in as_completed(fs, 7200):
     tmp = future.result()
     dfs.append(tmp)
     logging.info('thread return %s %s' % (fs[future], str(tmp)))

df = reduce(lambda x,y:x.unionAll(y), dfs)
# 这里略去后续的执行业务逻辑代码......
for df in dfs:
      df.unpersist()
df_0.unpersist()

最终执行速度提高了5倍!stage的数量也大大减少了。
这里有几个注意点:

  1. spark是惰性执行的。临时表table_0最好在线程池开始前用action(比如show)触发执行,以避免在线程池里执行。(如果在线程池里被触发执行会不会有问题,我没试过)
  2. 在executor_run也要用action触发执行。如果不触发,那么被正式执行时将会在多线程之外。这样就只有大概1倍的提升,猜测是在多线程中生成的DAG图,相比之前已经有了优化。
  3. 用多线程要注意提高driver的内存。我调试的过程中出现过java OOM。我最终用了16G的driver内存。
上一篇下一篇

猜你喜欢

热点阅读