大数据 爬虫Python AI Sqlhadoop 大数据底层理解大数据

spark读取Hive

2019-03-11  本文已影响0人  Tim在路上

导入依赖

导入关键的依赖包

    compile("org.scala-lang:scala-library:$scalaVersion")
    compile("org.scala-lang:scala-reflect:$scalaVersion")
    compile("org.scala-lang:scala-compiler:$scalaVersion")

    compile("org.apache.spark:spark-sql_2.11:$sparkVersion")
    compile("org.apache.spark:spark-streaming_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive_2.11:$sparkVersion")
    compile("org.apache.spark:spark-hive-thriftserver_2.11:$sparkVersion")

启动hive支持

val warehouseLocation = new File("spark-warehouse").getAbsolutePath
   //配置spark
    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .master("local[2]")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .config("hive.metastore.uris", "thrift://hdp2.nsrc.com:9083")
      .config("mapreduce.input.fileinputformat.input.dir.recursive", "true")
      .config("hive.input.dir.recursive", "true")
      .config("hive.mapred.supports.subdirectories", "true")
      .config("hive.supports.subdirectories", "true")
      .config("spark.driver.maxResultSize", "5g")
      //启动hive支持
      .enableHiveSupport()
      .getOrCreate()
    var startDay = "2019-03-08 00:00:00"
    var endDay = "2019-03-10 23:59:59"
    var srcIp = "10.28.137.84"
    //直接使用sparksql进行查询,返回为df
    var resultDf = spark.sql("select * from http_origin where  date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss')>= '"+startDay+"'" +
      "and date_format(http_origin.day, 'yyyy-MM-dd HH:mm:ss')<= '"+endDay+"' and http_origin.srcip = '"+srcIp+"'")

hive与hbase关联,可以作为一种查询hbase的方式

创建hive对应的映射语句

CREATE EXTERNAL TABLE IF NOT EXISTS httpsystem_dev( id String, srcIp String, srcPort Int, distIp String, distPort Int, requestURL String, requestMethod String, requestUserAgent String, requestCookie String, responseServer String, responseCode Int, requestHeader String, requestContType String, responseCharset String, httpVersion String, requestHost String, requestBodyString String, requestParameterString String, responseContentType String, responseHeader String, responseBodyReference String, ML_rule_juge String, ML_rule_juge_id String, ML_type String, ML_juge_mal String, ML_juge_type String, DLCNN_rule_juge String, DLCNN_type String, DLCNN_juge_mal String, DLCNN_juge_type String) STORED BY'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES( 'serialization.format'='\t','hbase.columns.mapping'=':key,0:srcIp, 0:srcPort, 0:distIp, 0:distPort, 0:requestURL, 0:requestMethod, 0:requestUserAgent, 0:requestCookie, 0:responseServer, 0:responseCode, 0:requestHeader, 0:requestContType, 0:responseCharset, 0:httpVersion, 0:requestHost, 0:requestBodyString, 0:requestParameterString, 0:responseContentType, 0:responseHeader, 0:responseBodyReference, 0:ML_rule_juge, 0:ML_rule_juge_id, 0:ML_type, 0:ML_juge_mal, 0:ML_juge_type, 0:DLCNN_rule_juge, 0:DLCNN_type, 0:DLCNN_juge_mal, 0:DLCNN_juge_type','field.delim'='\t') TBLPROPERTIES ('hbase.table.name'='httpsystem_dev')

将结果保存csv到HDFS

var url: String = "hdfs://hdp1.nsrc.com:8020/user/http_system/offline_file/" + "123"
        resultDf.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).option("header", "false")
         .save(url);
//        //创建样例列表

创建视图返回局部结果

       resultDf.registerTempTable("offlineResult")
//      var samples = spark.sql("select srcip,distip,requesthost,requesturl,requsetheader," +
//        "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
        var samples = spark.sql("select srcip,distip,requesthost,requesturl,requestheader," +
  "requestbodystring,requestmethod,responsecode,responsebody from offlineResult limit 10")
      samples.show()
           var rows = samples.collect()
           for(row <- rows){
              println(row(1),row(0),row(7))
           }
上一篇下一篇

猜你喜欢

热点阅读