spark

Spark 保存数据到MongoDB

2020-03-12  本文已影响0人  灬臣独秀灬

背景交代

由于我们应用系统使用的是mongo,所以每次操作结果都要输出到MongoDB方便使用 。

思路

1、遇到这样的情景我第一时间打开Spark官网 Mongo数据源
2、根据教程添加好依赖以后,开始配置链接信息。
3、根据教程创建Mongo的配置文件发现不少坑。

采坑
坑一
image.png

官方提供的demo都是不需要登录凭证的。

坑二
image.png

1、找到配置用户密码的缺发现在0.12.* 版本中 MongodbCredentials 类已经移动包名了,直接拷贝文档会报错。
2、MongodbConfigBuilder 0.12.* 。MongodbConfigBuilder(map:Map,list:List)更本没有这个构造函数。
3、源码配置文件没有注释,对于英语不好的我有点D疼。

最终在查看和调试源码下找到关键字段
image.png

1、上面代码大致意思以 MongodbConfig.Credentials 为key 读取config中的属性并转换成[List[MongodbCredentials]] 对象,如果为空则获取默认的配置。最后map 成MongoCredential 对象。 从这里可以看出我们只需要配置MongodbConfig.Credentials 中配置用户密码就好了。

配置代码
package cn.harsons.mbd.util

import cn.harsons.mbd.util.Config._
import com.stratio.datasource.mongodb._
import com.stratio.datasource.mongodb.config.MongodbConfig._
import com.stratio.datasource.mongodb.config.{MongodbConfigBuilder, MongodbCredentials}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
  * mongoDB操作工具
  *
  * @author liyabin
  * @date 2020/3/12 0012
  */
object MongoUtils {
  /**
    * 使用 mongoDB 集成方式 写入mongo
    *
    * @param collectionName 集合名称
    * @param data           数据集
    * @param sparkSession   spark 对象
    */
  def saveToMongo(collectionName: String, data: DataFrame, sparkSession: SparkSession): Unit = {
    // 已经引入配置对象,可以直接使用配置对象的属性
    // Credentials 认证 需要用户名和密码
    val saveConfig = MongodbConfigBuilder(
      Map(Host -> List(mongo_host),
        Database -> mongo_database,
        Collection -> collectionName,
        SamplingRatio -> 1.0,
        WriteConcern -> "normal",
        SplitSize -> 8,
        SplitKey -> "_id",
        Credentials -> List(MongodbCredentials(mongo_user, mongo_authentication, mongo_password.toCharArray)))
    )
    data.saveToMongodb(saveConfig.build())
  }

  /**
    * 使用 Spark 原生的方式 写入mongo
    *
    * @param collectionName 集合名称
    * @param data           数据集
    * @param mode           数据写入模式 (覆盖、追加等)
    * @param sparkSession   sparkSession 对象
    */
  def writeToMongo(collectionName: String, data: DataFrame, mode: SaveMode, sparkSession: SparkSession): Unit = {
    // Credentials 官网上要求 如果是String 类型 使用这种方式 配置用户凭证  user,authDataBase,password
    val options = Map("host" -> mongo_host,
      Credentials -> (mongo_user + "," + mongo_authentication + "," + mongo_password),
      "database" -> mongo_database,
      "collection" -> collectionName)
    data.write.format("com.stratio.datasource.mongodb").mode(mode)
      .options(options)
      .save()
  }

}

上一篇下一篇

猜你喜欢

热点阅读