Spark的ip编程题

2019-07-09  本文已影响0人  不愿透露姓名的李某某

部分数据如下

1.26.32.0|1.26.39.255|18489344|18491391|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429

1.26.40.0|1.26.43.255|18491392|18492415|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317

1.26.44.0|1.26.95.255|18492416|18505727|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429

1.26.96.0|1.26.143.255|18505728|18518015|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317

1.26.144.0|1.26.147.255|18518016|18519039|亚洲|中国|内蒙古|呼伦贝尔||联通|150700|China|CN|119.758168|49.215333

1.26.148.0|1.26.159.255|18519040|18522111|亚洲|中国|内蒙古|兴安盟||联通|152200|China|CN|122.070317|46.076268

1.26.160.0|1.26.171.255|18522112|18525183|亚洲|中国|内蒙古|通辽||联通|150500|China|CN|122.263119|43.617429

1.26.172.0|1.26.179.255|18525184|18527231|亚洲|中国|内蒙古|兴安盟||联通|152200|China|CN|122.070317|46.076268

1.26.180.0|1.26.195.255|18527232|18531327|亚洲|中国|内蒙古|赤峰||联通|150400|China|CN|118.956806|42.275317

1.26.196.0|1.26.207.255|18531328|18534399|亚洲|中国|内蒙古|呼伦贝尔||联通|150700|China|CN|119.758168|49.215333

将数据ip进行统计并写入mysql中

object IpCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf()

      .setAppName(this.getClass.getSimpleName)

      .setMaster("local[*]")

    val sc = new SparkContext(conf)

    //ip的规则数据

    val ipData: RDD[String] = sc.textFile("D:\\data\\data\\ip\\ip.txt")

    //用户数据

    val ipaccess: RDD[String] = sc.textFile("D:\\data\\data\\ip\\ipaccess.log")

    //获取用户的ip信息的十进制Long

    val rddUserIP: RDD[Long] = ipaccess.map(t => {

      val userIpStr: String = t.split("\\|")(1)

      ip2Long(userIpStr)

    })

    //不能在一个RDD中,操作另外一个RDD

  /* ipData.map(t=>{

      rddUserIP.map(m=>{

      })

    })*/

    //把规则数据提取到内存中

    val ipArr: Array[(Long, Long, String)] = ipData.map(t => {

      val str: Array[String] = t.split("\\|")

      val startIp: Long = str(2).toLong

      val endIp: Long = str(3).toLong

      val addr: String = str(6)

      (startIp, endIp, addr)

    }).coalesce(1).collect()

    val addrAndOne: RDD[(String, Int)] = rddUserIP.map(t => {

      //将用户的ip地址转换成省份和1

      val addr: String = binarySearch(t, ipArr)

      (addr, 1)

    })

    //对结果进行归并,并排序:按上网用户的数量降序

    val res: RDD[(String, Int)] = addrAndOne.reduceByKey(_+_).sortBy(t=>(-t._2,t._1))

  //将结果写入到SQL中

    //不能再driver端catch executor的异常

    res.foreachPartition(it=>{

      var connection : Connection = null

      var ps: PreparedStatement = null

      try{

        Class.forName("com.mysql.jdbc.Driver")

        connection= DriverManager.

          getConnection("jdbc:mysql://hdp03:3306/bdpro02?useUnicode=true&characterEncoding=utf-8","root","root")

        val sql = "insert into ipLog values(?,?)"

        ps= connection.prepareStatement(sql)

        it.foreach(t=>{

          ps.setString(1,t._1)

          ps.setInt(2,t._2)

          ps.executeUpdate()

        })

      }catch {

        case e:Exception => e.printStackTrace()

      }finally {

        if(connection!=null) connection.close()

        if(ps!=null) ps.close()

      }

    })

/*    res.foreach(t=>{

      //如果要在集群环境中运行,需要--jar

      //也可以,把mysqljar手动添加到lib中

      try{

        //try catch

        val statement: PreparedStatement = connection.prepareStatement("insert into ipLog values(?,?)")

        statement.setString(1,t._1)

        statement.setInt(2,t._2)

        statement.execute()

      }catch {

        case e:Exception=>{e.printStackTrace()}

      }

    })*/

    res.collect().foreach(println)

  }

  //从给定的数组中二分查找给定值

  //二分查找的数组必须是有序的

  def binarySearch(ipLong:Long,arr:Array[(Long, Long, String)]): String ={

    var start: Int = 0

    var end: Int = arr.length-1

    var addr:String= null;

    //这里必须有=,如果没有,就会缺少很多值

    while(end>=start){

      var midd: Int = (start+end)/2

      //如果给定的ip在该数组的中间值的起始ip和结束ip之间,则反回省份份

      if(ipLong>=arr(midd)._1&&ipLong<=arr(midd)._2){

        return arr(midd)._3

      }else if(ipLong<arr(midd)._1){

        end=midd-1

      }else{

        start=midd+1

      }

    }

    addr

  }

  def ip2Long(ip:String): Long ={

    val fragments = ip.split("[.]")

    var ipNum = 0L

    for (i <- 0 until fragments.length) {

      ipNum = fragments(i).toLong | ipNum << 8L

    }

    ipNum

  }

}

上一篇 下一篇

猜你喜欢

热点阅读