Spark的ip编程题
IP规则数据（ip.txt）的部分内容如下，字段以“|”分隔，其中第3、4个字段为IP段起止地址的十进制值，第7个字段为省份：
1.26.32.0|1.26.39.255|18489344|18491391|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429
1.26.40.0|1.26.43.255|18491392|18492415|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317
1.26.44.0|1.26.95.255|18492416|18505727|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429
1.26.96.0|1.26.143.255|18505728|18518015|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317
1.26.144.0|1.26.147.255|18518016|18519039|亚洲|中国|内蒙古|呼伦贝尔||联通|150700|China|CN|119.758168|49.215333
1.26.148.0|1.26.159.255|18519040|18522111|亚洲|中国|内蒙古|兴安盟||联通|152200|China|CN|122.070317|46.076268
1.26.160.0|1.26.171.255|18522112|18525183|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429
1.26.172.0|1.26.179.255|18525184|18527231|亚洲|中国|内蒙古|兴安盟||联通|152200|China|CN|122.070317|46.076268
1.26.180.0|1.26.195.255|18527232|18531327|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317
1.26.196.0|1.26.207.255|18531328|18534399|亚洲|中国|内蒙古|呼伦贝尔||联通|150700|China|CN|119.758168|49.215333
要求：根据IP规则数据，统计访问日志中各省份的访问次数，并将结果写入MySQL中
object IpCount {
/**
 * Entry point: counts user accesses per province and stores the totals in MySQL.
 *
 * Pipeline:
 *  1. read the IP rule file and the user access log;
 *  2. convert the IP of every access record (second `|`-separated field) to its numeric form;
 *  3. collect the rules on the driver, sort them by range start and broadcast them;
 *  4. resolve every IP to a province with `binarySearch`, dropping IPs no rule covers;
 *  5. sum the hits per province and sort by hit count descending, then by province name;
 *  6. insert the rows into the `ipLog` table and print them on the driver.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val conf: SparkConf = new SparkConf()
    .setAppName(this.getClass.getSimpleName)
    .setMaster("local[*]")
  val sc = new SparkContext(conf)
  try {
    // IP rule data: one address range per line.
    val ipData: RDD[String] = sc.textFile("D:\\data\\data\\ip\\ip.txt")
    // User access log.
    val ipaccess: RDD[String] = sc.textFile("D:\\data\\data\\ip\\ipaccess.log")

    // Numeric (Long) form of the IP found in each access record.
    val rddUserIP: RDD[Long] = ipaccess.map { line =>
      val userIpStr: String = line.split("\\|")(1)
      ip2Long(userIpStr)
    }

    // An RDD cannot be used inside a transformation of another RDD, so the rules are
    // pulled into driver memory. They are sorted by range start because the binary
    // search requires ordered input, independent of the order of lines in the file.
    val ipArr: Array[(Long, Long, String)] = ipData.map { line =>
      val fields: Array[String] = line.split("\\|")
      (fields(2).toLong, fields(3).toLong, fields(6))
    }.collect().sortBy(_._1)

    // Broadcast the rules so they are shipped once per executor, not once per task.
    val ipRules = sc.broadcast(ipArr)

    // Turn every user IP into (province, 1). binarySearch yields null for an IP that no
    // rule covers; such records are dropped, because a null key can make the
    // (count, province) sort below throw a NullPointerException.
    val addrAndOne: RDD[(String, Int)] = rddUserIP
      .map(ip => binarySearch(ip, ipRules.value))
      .filter(_ != null)
      .map(addr => (addr, 1))

    // Aggregate per province and sort by number of users, descending. Cached because
    // two actions (the MySQL write and the collect) consume it.
    val res: RDD[(String, Int)] =
      addrAndOne.reduceByKey(_ + _).sortBy(t => (-t._2, t._1)).cache()

    // Write the result to MySQL. Exceptions thrown on executors cannot be caught on the
    // driver, so they are handled inside the partition function. One connection is
    // opened per partition. When running on a cluster the MySQL driver jar has to be
    // supplied (e.g. with --jars) or placed in the lib directory.
    res.foreachPartition(it => {
      var connection: Connection = null
      var ps: PreparedStatement = null
      try {
        Class.forName("com.mysql.jdbc.Driver")
        connection = DriverManager.
          getConnection("jdbc:mysql://hdp03:3306/bdpro02?useUnicode=true&characterEncoding=utf-8", "root", "root")
        val sql = "insert into ipLog values(?,?)"
        ps = connection.prepareStatement(sql)
        it.foreach(t => {
          ps.setString(1, t._1)
          ps.setInt(2, t._2)
          ps.executeUpdate()
        })
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        // Close the statement before its connection, and guard each close so that a
        // failure in one does not leak the other.
        if (ps != null) {
          try ps.close() catch { case e: Exception => e.printStackTrace() }
        }
        if (connection != null) {
          try connection.close() catch { case e: Exception => e.printStackTrace() }
        }
      }
    })

    res.collect().foreach(println)
  } finally {
    // Always release the Spark resources, even when the job fails.
    sc.stop()
  }
}
/**
 * Binary-searches the rule array for the range that contains the given IP.
 *
 * The array must be sorted by range start and its ranges must not overlap; otherwise
 * the result is unspecified.
 *
 * @param ipLong numeric form of the IP address to look up
 * @param arr    rules as (range start, range end, province), both bounds inclusive
 * @return the province of the matching range, or `null` when no range contains `ipLong`
 *         (kept for compatibility with existing callers; wrap with `Option(...)` if needed)
 */
def binarySearch(ipLong: Long, arr: Array[(Long, Long, String)]): String = {
  @scala.annotation.tailrec
  def search(lo: Int, hi: Int): String =
    // The bounds are inclusive, so the window is empty only once lo has passed hi.
    if (lo > hi) null
    else {
      // Overflow-safe midpoint: (lo + hi) / 2 can wrap around for very large arrays.
      val mid = lo + (hi - lo) / 2
      val (rangeStart, rangeEnd, province) = arr(mid)
      if (ipLong < rangeStart) search(lo, mid - 1)
      else if (ipLong > rangeEnd) search(mid + 1, hi)
      else province
    }

  search(0, arr.length - 1)
}
/**
 * Converts a dotted IP string into its numeric form.
 *
 * The string is split on '.', and the parts are folded left to right: the running
 * value is shifted left by 8 bits and OR-ed with the next part.
 *
 * @param ip dotted address such as "1.26.32.0"
 * @return the numeric value of the address
 * @throws NumberFormatException if a part is not a valid integer
 */
def ip2Long(ip: String): Long =
  ip.split("[.]").foldLeft(0L) { (acc, part) =>
    part.toLong | (acc << 8)
  }
}