Spark使用GeoLiteCity字典库解析IP
2017-11-03 本文已影响0人
阿海与蜗牛
一、背景
用户画像项目中,埋点数据获取到移动端、PC端用户IP, 业务需求从IP解析出IP对应的地址。分析发现,数据有以下特点:
1. 用户类型为Global,IP 为全球范围,且包括IPv4,IPv6 协议
2. 用户量特别巨大。对解析效率得要求很高。
二、思路
一般的IP 解析方法是在网上找IP字典库,然后对于给定IP 搜索字典库,返回地址信息。
可参考知乎上 (国内哪个精确到城市的 IP 地址库最精确完善?)由于要解析全球IP,并且要同时支持IPv4,IPv6 所以选择 maxMind 的GeoLiteCity 字典库。另外,snowplowanalytics 开源包scala-maxmind-iplookups 不仅支持Scala, 同时支持随机查询,查询效率很高。
知乎关于IP解析的讨论:https://www.zhihu.com/question/19584593
字典库:http://dev.maxmind.com/geoip/legacy/geolite/
三、实现
- 依赖配置
resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"
resolvers += "Twitter Maven Repo" at "http://maven.twttr.com/"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-sql" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-hive" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-graphx" % "2.0.0" % "provided",
"org.apache.spark" %% "spark-mllib" % "2.0.0" % "provided",
"com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)
- 实现逻辑
可选两种方式使用字典库。
1) 将字典库打入到 resouces 目录下。但会导致打出得包很大
2) 将字典库传入到HDFS上,通过Spark addfile 广播到每个executor上,再在executor 中获取该文件
具体如下:
val spark = SparkSession.builder()
.appName(s"${this.getClass.getSimpleName}")
.enableHiveSupport()
.getOrCreate()
//val db = "d_devicemetrics"
val db = "proj_gucp_dw"
val sc = spark.sparkContext
// HDFS 路径
val path = "hdfs://namenodeha/user/p70_g108/GeoCity.dat"
// 广播文件
sc.addFile(path)
val sql = s"select d_id,preserve1 from $db.dw_pc_base where preserve1 !='' limit 1000 "
import spark.implicits._
val ips = spark.sql(sql).map(x=>(x.getString(0),x.getString(1))).filter(_._2 != "")
val l = ips.map(x=>{
val geoFile =SparkFiles.get("GeoCity.dat")
// 获取广播的文件
val ipLookups = IpLookups(Some(geoFile), Some(geoFile), Some(geoFile), Some(geoFile), Some(geoFile), true, 100)
var locValue= (x._1,x._2,"","","")
for (loc <- ipLookups.performLookups(x._2)._1) {
val ip = x._2
val id = x._1
val cty = loc.countryName
val region = loc.region.getOrElse("")
val ct = loc.city.getOrElse("")
locValue = (id,ip,cty,region,ct)
}
locValue
}).toDF("id","ip","country","region","city")
//ips.write.("hdfs://chileap/user/p66_u71_zhangly4/files/out2")
l.write.mode("overwrite").saveAsTable(s"$db.ipLocation")
- 问题难点
- 必须将字典文件先广播再获取的方式才能使用
- IpLookups 的实例只能在 map端声明,不能声明在Driver端,因为从源码可看到,该方法会调用Java的 new File 方法读取文件。所以在Dirver 端声明会导致空指针异常
四、总结
- Spark任务又两种加入本地文件方式,打入resource或者在HDFS上广播。需要注意实际文件访问路径的关系。
- RandomAccessFile 能提高字典类文件的搜索效率