Cannot grow BufferHolder by size

2020-11-09  本文已影响0人  NatsuYori

今天用spark的时候碰到的问题,直接对一个大的dataframe做agg,导致buffer超了。
可以人为的在dataframe上append一个新的字段,根据字段先做一个agg,最后再agg,就不会超了

import random

def get_rand(i):
  return random.randint(1,10000)
randUdf = udf(get_rand,IntegerType())

getP = udf(get_placement, ArrayType(IntegerType()))

tmp_df.withColumn("salt_key",randUdf(col('placement_ids'))).groupby('salt_key').agg(getP(collect_list(struct('placement_ids'))).alias("ids")).show()

这样再agg一次,就没有问题啦

上一篇 下一篇

猜你喜欢

热点阅读