Broadcast Variables

2019-07-10 焉知非鱼

When data read from MySQL is used as a broadcast variable, restarting the program after the job is killed fails even though checkpointing is enabled: Spark Streaming does not recover broadcast variables from a checkpoint.
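
This is a documented limitation: the Spark Streaming programming guide notes that broadcast variables cannot be recovered from a checkpoint and recommends wrapping them in a lazily instantiated singleton that is rebuilt after the driver restarts. A minimal sketch of that pattern, where MysqlBroadcast, loadFromMysql and the Map[String, String] element type are all placeholders for the actual MySQL lookup:

  import org.apache.spark.SparkContext
  import org.apache.spark.broadcast.Broadcast

  // Lazily instantiated singleton: after a restart from the checkpoint, the
  // broadcast is rebuilt on the new SparkContext instead of being deserialized.
  object MysqlBroadcast {
    @volatile private var instance: Broadcast[Map[String, String]] = _

    def getInstance(sc: SparkContext): Broadcast[Map[String, String]] = {
      if (instance == null) {
        synchronized {
          if (instance == null) {
            instance = sc.broadcast(loadFromMysql())  // hypothetical JDBC loader
          }
        }
      }
      instance
    }

    private def loadFromMysql(): Map[String, String] = ???  // read the MySQL table here
  }

Fetching the instance inside foreachRDD via MysqlBroadcast.getInstance(rdd.sparkContext) means the first batch after a restart triggers a fresh MySQL read instead of failing on a broadcast that no longer exists.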

  class GacXs6Offline @Inject()( sparkConf    : SparkConfiguration,
                                 mysqlConf    : MysqlConfiguration,
                                 hbaseConf    : HbaseConfiguration,
                                 sparkContext : EnterpriseSparkContext[SparkContext],
                                 source       : NationDStream[(String, NaSourceData)],
                                 naDriveTrip  : NaDriveTrip
                               ) extends Serializable {
    val naDriveTripDS = naDriveTrip.exteact(source)
    saveNaDriveTrip("drive_trip", naDriveTripDS)  // pass the DStream, not the NaDriveTrip extractor
  }
  
  // Serialization error: the closure uses the injected hbaseConf field
  def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    naDriveTrip.foreachRDD(rdd => {
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
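      // hbaseConf below is a constructor field, so the access drags the whole
      // enclosing GacXs6Offline instance into the closure for serialization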
      jobConf.set("hbase.zookeeper.quorum", hbaseConf.hbaseUrl)
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      rdd.map(x => {
        (new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(x._1, x._2))
      }).saveAsHadoopDataset(jobConf)
    })
  }
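
The two variants below avoid the field access altogether. Another common fix, shown here only as a sketch, is to copy the one value the closure needs into a local val on the driver, so the closure captures a plain String instead of a reference to the enclosing instance:

  def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    val zkQuorum = hbaseConf.hbaseUrl  // evaluated once on the driver, outside any closure
    naDriveTrip.foreachRDD(rdd => {
      val jobConf = new JobConf(HBaseConfiguration.create())
      jobConf.set("hbase.zookeeper.quorum", zkQuorum)  // captures only the local String
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      rdd.map(x => {
        (new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(x._1, x._2))
      }).saveAsHadoopDataset(jobConf)
    })
  }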

  // Serializable: the closure creates its own HbaseConfiguration instead of using the field
  def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    naDriveTrip.foreachRDD(rdd => {
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
      // a fresh HbaseConfiguration created inside the closure, not the injected field
      val naHbaseConf = new HbaseConfiguration
      jobConf.set("hbase.zookeeper.quorum", naHbaseConf.hbaseUrl)
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      jobConf.setOutputFormat(classOf[TableOutputFormat])

      rdd.map(x => {
        (new ImmutableBytesWritable, (new NaHbaseDao).putNaTripData(x._1, x._2))
      }).saveAsHadoopDataset(jobConf)
    })
  }

  // Serializable: every HBase object is created per partition, on the executor
  def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    naDriveTrip.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // everything below runs on the executor, so nothing has to be serialized
        val hbaseConf  = new HbaseConfiguration
        val hbase      = new HbaseUtil(hbaseConf)
        val connection = hbase.getHbaseConn
        val table      = connection.getTable(TableName.valueOf(tableName))
        try {
          // build all Puts for this partition, then write them in one batch
          val list = partition.map(data => {
            NaHbaseDao.putNaTripData(data._1, data._2)
          }).toList
          if (list.nonEmpty) {  // toList never yields null, so no null check needed
            NaHbaseDao.saveData(list, table)
          }
        } finally {
          table.close()  // release the per-partition table handle
        }
      })
    })
  }
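
Design note: the foreachPartition variant also changes the write path, collecting the Puts for each partition and writing them in one batch over a single per-partition connection, instead of going through a JobConf and saveAsHadoopDataset. Per-partition batching is the usual choice when the writes go through a custom DAO like NaHbaseDao.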