Spark使用GeoLiteCity字典库解析IP

2017-11-03  本文已影响0人  阿海与蜗牛

一、背景

用户画像项目中,埋点数据获取到移动端、PC端用户IP, 业务需求从IP解析出IP对应的地址。分析发现,数据有以下特点:
1. 用户类型为Global,IP 为全球范围,且包括IPv4,IPv6 协议
2. 用户量特别巨大。对解析效率得要求很高。

二、思路

一般的IP 解析方法是在网上找IP字典库,然后对于给定IP 搜索字典库,返回地址信息。
可参考知乎上 (国内哪个精确到城市的 IP 地址库最精确完善?)由于要解析全球IP,并且要同时支持IPv4,IPv6 所以选择 maxMind 的GeoLiteCity 字典库。另外,snowplowanalytics 开源包scala-maxmind-iplookups 不仅支持Scala, 同时支持随机查询,查询效率很高。

知乎关于IP解析的讨论:https://www.zhihu.com/question/19584593
字典库:http://dev.maxmind.com/geoip/legacy/geolite/

三、实现

  1. 依赖配置
resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"
resolvers += "Twitter Maven Repo" at "http://maven.twttr.com/"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-graphx" % "2.0.0" % "provided",
  "org.apache.spark" %% "spark-mllib" % "2.0.0" % "provided",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"

)
  1. 实现逻辑
    可选两种方式使用字典库。
    1) 将字典库打入到 resouces 目录下。但会导致打出得包很大
    2) 将字典库传入到HDFS上,通过Spark addfile 广播到每个executor上,再在executor 中获取该文件
    具体如下:
val spark = SparkSession.builder()
      .appName(s"${this.getClass.getSimpleName}")
      .enableHiveSupport()
      .getOrCreate()

    //val db = "d_devicemetrics"
    val db = "proj_gucp_dw"

    val sc = spark.sparkContext
    // HDFS 路径
    val path = "hdfs://namenodeha/user/p70_g108/GeoCity.dat"
    //  广播文件
    sc.addFile(path)
    val sql = s"select d_id,preserve1 from $db.dw_pc_base where preserve1 !='' limit 1000 "

    import spark.implicits._
    val ips = spark.sql(sql).map(x=>(x.getString(0),x.getString(1))).filter(_._2 != "")
    val l =  ips.map(x=>{
      val geoFile =SparkFiles.get("GeoCity.dat")
      // 获取广播的文件
      val ipLookups = IpLookups(Some(geoFile), Some(geoFile), Some(geoFile), Some(geoFile), Some(geoFile), true, 100)
      var locValue= (x._1,x._2,"","","")
      for (loc <- ipLookups.performLookups(x._2)._1) {
        val ip = x._2
        val id = x._1
        val cty = loc.countryName
        val region = loc.region.getOrElse("")
        val ct = loc.city.getOrElse("")
        locValue = (id,ip,cty,region,ct)
      }
      locValue
    }).toDF("id","ip","country","region","city")
    //ips.write.("hdfs://chileap/user/p66_u71_zhangly4/files/out2")
    l.write.mode("overwrite").saveAsTable(s"$db.ipLocation")
  1. 问题难点

四、总结

  1. Spark任务又两种加入本地文件方式,打入resource或者在HDFS上广播。需要注意实际文件访问路径的关系。
  2. RandomAccessFile 能提高字典类文件的搜索效率
上一篇 下一篇

猜你喜欢

热点阅读