如何通过Spark快速加载MongoDB数据

2019-08-15  本文已影响0人  郭彦超

我们的场景内容和表单数据都存储在mongo里面,现需要将这些数据同步到数据仓库用于后期分析,数据体量很大 如何快速同步是个问题?

1、登陆mongo查看索引及索引字段类型

{
        "v" : 1,
        "key" : {
            "sceneId" : 1
        },
        "name" : "sceneId",
        "ns" : "eqs_scene.#",
        "sparse" : false,
        "background" : false
}
"sceneId" : NumberLong(8973702)

这两个操作非常总要,如果没有索引需提前创建,我们接下来会借助spark条件下推的方式拉取数据,这种方式比在spark 全量load后进行条件过滤快上千倍

2、同步脚本

./bin/spark-shell --master yarn --packages "com.stratio.datasource:spark-mongodb_2.11:0.12.0" --num-executors 10

import org.apache.spark.sql.types._ 

#sceneId的定义要保持和MongoDB中数据类型一致,这里使用long类型
val schemaMongo = new StructType().add("elementsJson",StringType) .add("sceneId", LongType)
#spark自动识别scheme会先load全量数据,会执行很长的时间,这里使用提前定义好的scheme  
val df = spark.read.schema(schemaMongo).format("com.stratio.datasource.mongodb").options(Map("host" -> "#:27010", "database" -> "#", "collection" -> "#", "credentials"->"#,#,#")).load

df.createOrReplaceTempView("t1")

val df2 = sql("select  cast(t.id as long) from   eqxdb.eqs_scene t  where publish_date='2018-12-21'")

val ids = df2.map(_.getLong(0)).collectAsList()
#获取需要下推的Id列表 并转化为scala序列
import scala.collection.JavaConverters
#注意数组中的对象类型应为long类型 和scheme定义保持一致 :Seq[Long]
val sid=JavaConverters.asScalaIteratorConverter(ids.iterator()).asScala.toSeq

#查询下推至mongo 数据秒出
df.where(df("sceneId").isin(sid:_*)).show

3、总结

1、如果下推数据量很少,但执行任务长期卡主不动的话,需要查看dataframe中的索引字段名称和类型是否与mongo库中的一致
2、credentials为认证,需依次提供3个参数:用户名、数据库名、密码
3、构建df时需提前查看mongo库的数据结构来定义schema,spark反射出的数据结构会有问题且整个过程很慢

上一篇下一篇

猜你喜欢

热点阅读