2018-06-04 初识spark

2018-12-27  本文已影响0人  江江江123

什么是spark:
用于大数据计算的引擎
特点:非常快 原因:内存迭代运算
易用
通用
不能做什么?
不能做数据存储,依赖于hbase,hdfs

入门第一步 wordcount

/**
 * Minimal Spark word-count job.
 *
 * Reads a text file from HDFS, splits each line on single spaces, counts how
 * often every non-empty token occurs and saves the `(word, count)` pairs back
 * to HDFS.
 */
object WordCount {

  /**
   * Job entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ScalaWordCount")
    // Master URL: the "spark://" prefix is mandatory, followed by the master's
    // host (hadoop1) and port (7077). Per the original note, the master's web UI
    // can be checked in a browser at <ip>:8080 once Spark is up and running.
    conf.set("spark.master", "spark://hadoop1:7077")
    // The SparkContext is the central handle through which RDDs are created.
    val sc = new SparkContext(conf)
    try {
      // Input lives on HDFS.
      val counts = sc.textFile("hdfs://hadoop1:9000/user/test.txt")
        .flatMap(_.split(" "))
        // Blank lines and consecutive spaces yield "" tokens; they are not words.
        .filter(_.nonEmpty)
        .map(word => (word, 1))
        .reduceByKey(_ + _)
      counts.saveAsTextFile("hdfs://hadoop1:9000/user/testResult.txt")
    } finally {
      // Always stop the context, even when the job fails part-way.
      sc.stop()
    }
  }
}

入门第二步:读写hive

/**
 * Demo of writing to and reading from a Hive table through a `HiveContext`.
 *
 * The Spark and Hive contexts are created when this object is initialised and
 * are shared by every method below.
 */
object SparkHive {
  // Local-mode configuration; the application jar is listed explicitly via setJars.
  var conf = new SparkConf()
    .setAppName("HiveApp")
    .setMaster("local")
    .setJars(List("F:\\ideaWorkSpace\\spark\\target\\scalaDemo.jar"))
  // NOTE(review): presumably points at a local Hadoop install for running on Windows — confirm.
  System.setProperty("hadoop.home.dir", "E:/hadoops")
  val sc = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  /**
   * Loads a local file into a Hive table with `LOAD DATA LOCAL INPATH`.
   *
   * Both arguments are spliced into the statement text as-is, so they must come
   * from trusted code, never from end-user input.
   *
   * @param filePath  path of the local file to load
   * @param tableName name of the target table
   */
  def loadDate(filePath: String, tableName: String): Unit = {
    sqlContext.sql(s"load data local inpath '$filePath' into table $tableName")
  }

  /**
   * Looks up the first row of table `people` whose `name` column equals `name`.
   *
   * The comparison is built with the column API rather than by concatenating
   * `name` into a SQL string, so quotes inside `name` cannot alter the query.
   *
   * @param name the name to search for
   * @return the matching row as a [[Person]] (column 0 = name, column 1 = account)
   * @throws NoSuchElementException if no row has that name
   */
  def getPeopleByName(name: String): Person = {
    val people = sqlContext.table("people")
    people.filter(people("name") === name).take(1).headOption match {
      case Some(row) => Person(row.getAs[String](0), row.getAs[Int](1))
      case None      => throw new NoSuchElementException(s"no row in table people with name '$name'")
    }
  }

  /** Drops the configuration reference and stops the SparkContext. */
  def destory(): Unit = {
    conf = null
    sc.stop()
  }

  /**
   * Smoke test: recreates table `people`, loads a comma-delimited file into it,
   * prints its rows and looks up one person by name.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val filePath = "D:/a.txt"
    val tableName = "people"
    try {
      sqlContext.sql("drop table if exists people")
      sqlContext.sql("create table people (name string,account int)  ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE")
      sqlContext.sql("show tables").show()
      loadDate(filePath, tableName)
      sqlContext.sql("select * from people").collect().foreach(println)
      getPeopleByName("1")
    } finally {
      // Release the context even when one of the statements above fails.
      destory()
    }
  }
}
/**
 * A person record: a name plus an account value.
 *
 * @param name    the person's name
 * @param account the account value; declared `var`, so it can be reassigned
 *                after construction
 */
case class Person(name: String, var account: Int)

入门第三步:读写HBase

/**
 * Demo of reading from and writing to HBase with Spark.
 *
 * The SparkContext and the base HBase configuration are created when this
 * object is initialised and are shared by the methods below.
 */
object SparkHbse {
  System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  private val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
  private val sc = new SparkContext(sparkConf)
  val conf = HBaseConfiguration.create()
  // ZooKeeper quorum. Per the original note, putting hbase-site.xml on the
  // classpath also works, but setting it in code is the recommended way here.
  conf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3")
  // ZooKeeper client port (2181 is the default).
  conf.set("hbase.zookeeper.property.clientPort", "2181")

  /**
   * Smoke test: scans `table1`, keeps the rows whose row key equals the sample
   * person's name and prints the key and the `cf:cid` value of each.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val people1 = new Person("1", 1000)
    /* To insert test data first:
    val people2 = new Person("2",800);
    val persons =  List(people1,people2);
    SparkHbse.putRdd(persons,"table1")*/
    try {
      val matching = SparkHbse.getTableRdd("table1")
        .filter { case (_, result) => people1.name.equals(Bytes.toString(result.getRow)) }
      matching.foreach { case (_, result) =>
        // Row key of the current result.
        val key = Bytes.toString(result.getRow)
        println(key + ":" + people1.name + "==" + key.equals(people1.name))
        // Fetch the cell by column family and qualifier. getValue yields null
        // when the row has no such cell, so guard before decoding it.
        val cid = Option(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("cid")))
          .map(bytes => Bytes.toInt(bytes).toString)
          .getOrElse("<missing>")
        println("Row key:" + key + " cid:" + cid)
      }
    } finally {
      // Stop the context even when the scan fails.
      sc.stop()
    }
  }

  /**
   * Builds an RDD over all rows of an HBase table.
   *
   * The table name is set on a per-call copy of the shared configuration, so
   * RDDs created for different tables do not overwrite each other's input
   * table. No `HBaseAdmin` is opened here: nothing in this method needs one.
   *
   * @param tableName name of the HBase table to read
   * @return an RDD of (row key, result) pairs
   */
  def getTableRdd(tableName: String):RDD[(ImmutableBytesWritable,org.apache.hadoop.hbase.client.Result)]={
    val scanConf = HBaseConfiguration.create(conf)
    scanConf.set(TableInputFormat.INPUT_TABLE, tableName)
    // Read the table and expose it as an RDD.
    sc.newAPIHadoopRDD(scanConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
  }

  /**
   * Writes the given persons to an HBase table: the row key is the person's
   * name and the account is stored in column `cf:cid`.
   *
   * @param persons   the records to write
   * @param tableName name of the target HBase table
   */
  def putRdd(persons: List[Person],tableName: String):Unit={
    // JobConf copies the shared configuration, so the output settings stay local.
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    val puts = sc.makeRDD(persons).map { person =>
      val put = new Put(Bytes.toBytes(person.name))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes(person.account))
      (new ImmutableBytesWritable, put)
    }
    puts.saveAsHadoopDataset(jobConf)
  }

  /** Stops the shared SparkContext. */
  def destory():Unit = {
    sc.stop()
  }
}

入门第四步:Spark Streaming 读取 Kafka 数据

/**
 * Spark Streaming demo that reads string messages from Kafka with the direct
 * (receiver-less) stream API and prints every record of every batch.
 */
object LogStream {

  /**
   * Starts the streaming job and blocks until it is terminated.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    // Run in local mode.
    val sparkConf = new SparkConf().setAppName("logStream").setMaster("local")
    println("start")
    // Consumer group id; per the original note, any value works if none has
    // been configured on the Kafka side.
    val groupId = "groupid"
    // One batch every 6 seconds.
    val ssc = new StreamingContext(sparkConf, Seconds(6))
    val topics = Set("log-flume")
    val brokers = "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    val kafkaParam = Map[String, String](
      "metadata.broker.list" -> brokers,
      "serializer.class" -> "kafka.serializer.StringEncoder",
      "group.id" -> groupId,
      // Start from the earliest available offset on every run. Used here for
      // testing; leave it out to consume only newly arriving data.
      "auto.offset.reset" -> OffsetRequest.SmallestTimeString)
    // Keep only the message value of each (key, value) record.
    val message = KafkaUtils
      .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
      .map(_._2)
    // Print the records themselves; printing the RDD would only show its
    // description, not the data.
    message.foreachRDD { rdd =>
      rdd.foreach(line => println(line))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
上一篇 下一篇

猜你喜欢

热点阅读