周蓬勃SPARK 错误归纳SPARK

Spark 之 Redis on spark java.io.N

2018-04-09  本文已影响21人  步闲

在使用spark streaming 处理数据后,将数据存入redis中,但是出现了为序列化问题,如下图所示:
org.apache.spark.SparkException: Task not serializable

原代码如下:

val sc = new SparkContext(conf)
//.....
//.....
//.....
//建立一个Redis连接
val jedis: Jedis = JodisClient.getPool.getResource
//将record的数据插入数据库
rowRdd.map(
   jedis.set("","")
})

Spark架构原理图如下:

Spark 架构图

原因剖析:

  1. 上面连接Jedis的代码是在Driver中运行的,也就是建立SparkContext的地方
  2. 对RDD的操作是分布式运行在Executor的,不在Driver中运行
  3. 因此在map中的引用jedis,spark会做两件事情:序列化,和分发到各台机器上。

所以问题就在这里,redis的TCP连接已经绑定在Driver Program机上了,是无法分发到各个节点执行的,因此出现问题的根源就在这里。

正确的做法是在map函数中进行数据库的连接,但是我们要用mappartition来代替map,因为map会对每一个record进行连接数据库,而mappartition仅仅是对每一个partition建立一个连接,然后用iterator进行迭代复用。代码如下:

val sc = new SparkContext(conf)
//.....
//.....
//.....
//将record的数据插入数据库
rowRdd.mapPartitions(iter=>{
      while(iter.hasNext){
      //建立一个Redis连接
        val jedis: Jedis = JodisClient.getPool.getResource
        jedis.set("","")
      }
    })

这样的话就不会出现未序列化的问题了!

上一篇 下一篇

猜你喜欢

热点阅读