大数据学习大数据spark

spark 学习笔记

2017-01-15  本文已影响2191人  哎哟喂喽

Spark学习笔记

Data

Source->Kafka->Spark Streaming->Parquet->Spark SQL(SparkSQL可以结合MLGraphX)->Parquet->其它各种Data Mining

1.1 Spark集群的安装

Spark的运行是构建在hadoop集群之上(默认hadoop集群已经安装好了),在spark集群集群上必须要安装对应版本的scala

1.1.1 scala安装

Ø下载scala版本,解压scala

Ø配置环境变量/etc/profile,添加SCALA_HOME、修改PATH,添加上scala的path路径

Ø进入$SCALA_HOME/bin目录,执行./scala验证scala是否安装成功

Ø集群机器都需要安装scala

1.1.2 spark安装

在集群的所有机器上都必须要安装spark,首先安装master的spark程序

Ø先解压spark程序

Ø修改环境变量/etc/profile添加SPARK_HOME和修改spark PATH路径

Ø配置spark,进入conf目录下

nmv spark-env.sh.template spark-env.sh

其中:spark_master_ip:用于指定master

nvi slaves修改文件,把work节点都添加进去;

Ø至此,spark集群安装完毕

1.1.3启动集群校验

Ø先启动hadoop集群,jps查看进程

Ø再启动spark集群,在sbin目录下执行./start-all.shjps查看进程

ØUi访问,检查集群情况http://master:8080

Ø进入spark/bin目录下,启动spark-shell脚本

1.2 spark-shell的使用

在master机器上的$SPARK_HOME/bin目录下,运行./spark-shell程序启动shark-shell脚本;通过http://master:4040查看spark-shell运行情况

1.2.1 spark-shell操作hdfs文件实战

Ø将spark目录下的README.md文件上传到hdfs上的/test目录下,通过hdfs ui来进行查看slave:50070/explorer.html#/查看文件是否上传成功

Ø在spark-shell脚本程序下,执行sc(SparkContext实例),启动spark-sehll时,系统自动生成

scala> sc

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@65859b44

# SparkContext是把代码提交到集群或者本地的通道,编写Spark代码,无论是要运行本地还是集群都必须要有SparkContext实例

ØSpark-shell读取hdfs文件的README.md文件

val file = sc.textFile(“hdfs://mapeng:8020/test/README.md”)

#这里把读取到的文本内容赋值给了变量file,(就是一个MappedRDD,在spark的代码中,一切都是基于RDD进行操作的)

Ø读取文本中包含有“spark”的行

scala> val sparks = file.filter(line

=> line.contains("spark"))

sparks: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at

filter at :26

#此时生成了一个FilterRDD

Ø统计spark一共出现多少次

Sparks.count

1.2.2 spark-shell操作及详细说明

1.2.2.1并行化集合(parallelize)

Ø加载集合数据

val data = sc.parallelize(1 to 10)#加载集合数据

或者; val data = sc.parallelize(List(1,2,3,4…))

Ø对集合数据进行*2操作

val data1 = data.map(_*2)

Ø对数据进行过滤:过滤出是2的倍数的集合

val data2 = data.filter(_%2==0)

Ø内存缓存数据

data.cache

Ø触发action,以数据的形式返回结果集

data.collect

Ø返回结果集的第一个元素

data.first

Ø返回结果集的前3个元素

data.take(3)

Ø统计元素的个数

data.count

Ø查看RDD的转换过程

data.toDebugString

1.2.2.2 map数据集合

Ø加载List(Map)数据

val

data=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

Ø排序sortByKey()

scala> data.sortByKey().collect

res55: Array[(String, Int)] = Array((A,1), (A,4), (B,2), (B,5),

(C,3))

Ø分组groupByKey()

scala>

data.groupByKey().collect

res57: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2,  5)), (A,CompactBuffer(1, 4)), (C,CompactBuffer(3)))

Ø求和reduceByKey(_+_)

scala>

data.reduceByKey(_+_).collect

res59: Array[(String, Int)] = Array((B,7), (A,5), (C,3))

Ø去重distinct

scala> data.distinct.collect

res60: Array[(String, Int)] = Array((A,1), (A,4), (B,5), (C,3),

(B,2))

Ø联合union

scala> val

data1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))

data1:

org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at

parallelize at :24

scala> val data2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))

data2:

org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at

parallelize at :24

scala>

data1.union(data2).collect

res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4),

(B,5), (A,4), (A,4), (C,3), (A,4), (B,5))

Ø关联join

相当于笛卡尔积

1.2.2.3保存转换结果saveAsTextFile

data.saveAsTextFile(“path”);//将转换结果存储在hdfs指定的路径

1.2.3 spark cache缓存

对于spark程序,第二次执行要比前面的执行的效率要高

1.3 RDD(弹性分布式数据集)

1.3.1 RDD介绍

ØRDD是一个容错的、并行的数据结构,可以让用户显示的将数据存储在磁盘和内存中,并能控制数据的分区。

ØRDD提供了一套丰富的函数来操作数据

ØRDD作为数据机构,本质上是一个只读的分区记录集合;一个RDD可以包含多个分区,每个分区就是一个dataSet片段;RDD可以相互依赖

n窄依赖:RDD的每个分区最多只能被一个child

RDD的分区使用(例如:map操作)

n宽依赖:RDD的分区可以被多个child

RDD的分区使用(例如:join操作)

区别:

(1)窄依赖可以在集群中的一个节点上如流水般的执行,相反,宽依赖需要所有的父分区的数据都可用

(2)从出现失败恢复的角度来考虑:窄依赖只需要重新计算失败的父RDD的分区,而宽依赖失败会导致其父RDD的多个分区重新计算

1.3.2 RDD分区

1.3.3创建操作

1.3.3.1集合创建操作

Spark提供了两类函数实现从集合生成RDD;

Øparallelize

val rdd = sc.parallelize(1 to 100)

ØmakeRDD:还提供了指定分区参数

val rdd = sc.makeRDD(1 to 100,3)#指定了分区数为3

1.3.3.2存储创建操作

操作hdfs

val rdd = sc.textFile(“hdfs://master:9000/test/xxx.txt”)

1.3.4 RDD的基本转换操作

1.3.4.1 RDD的重新分区

repartition和coalesce是对RDD的分区进行重新划分

Ørepartition(numPartitions:Int):RDD[T]

Øcoalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

repartition只是coalesce接口中shuffle为true的简易实现。

重新划分分区主要有三种情况:(原RDD有N个分区,需要重新划分为M个分区)

ØN

ØN>M(相差不大):面临着要把原分区进行合并的操作,最终合成M个分区,这时将shuffle设置为false

注:在shuffle为false时,设置M>N,coalesce是不起作用的

ØN>>M(差距悬殊):如果将shuffle设置为false,由于父子RDD是窄依赖,会使得它们同处于一个stage中,可能会造成spark程序运行的并行度不够,从而影响效率。

因而,最好设置为true,使得coalesce之前的操作有更好的并行度

1.3.4.2 RDD转换为数组(randomSplit、glom)

ØrandomSplit(weight:Array[Double],seed:Long=System.nanoTime):Array[RDD[T]]

randomSplit函数是将一个RDD切分为多个RDD,返回结果是一个RDD数组;函数的第一个方法传入的参数权重是一个Double类型的数组;权重大的,分到的数据的概率大

val rdd = sc.makeRDD(1 to 10)

val splitRDD = rdd.randomSplit(Array(1.0,3.0,6.0))

#返回的一个RDD数组,查看数组元素

splitRDD(0).collect

splitRDD(1).collect

splitRDD(2).collect

Øglom():RDD[Array[T]]

glom函数是将RDD中每一个分区中类型为T的元素转换为Array[T]

val rdd = sc.makeRDD(1 to 10,3)

val glomRDD= rdd.glom

#返回的结果是一个数组,

glomRDD.collect

scala> glomRDD.collect

res44: Array[Array[Int]] = Array(Array(1, 2, 3), Array(4, 5, 6), Array(7,

8, 9, 10))

1.3.4.3 RDD的集合操作

Øunion(other:RDD[T]) :RDD[T]

将两个RDD的数据进行合并,返回两个RDD的并集,不去重

Øintersection(other:RDD[T]) :RDD[T]

返回两个RDD的交集(会去重)

Øsubtract(other:RDD[T]) :RDD[T]

取差集

Øzip(other:RDD[T]):RDD[T]

zip函数用于将两个RDD组合成key/value形式的RDD,两个RDD的partition个数以及元素的数量都必须要相同,不然会抛出异常

val rdd1 = sc.makeRDD(List(1,2,3,3))

val rdd2 = sc.makeRDD(List(2,3,4))

union操作

rdd1.union(rdd2).collect

#结果

Array[Int] = Array(1, 2, 3, 3, 2, 3, 4)

intersection操作

rdd1.intersection(rdd2).collect

#结果(去重)

Array[Int] = Array(3, 2)

subtract操作

rdd1.subtract(rdd2).collect

#结果(不去重)

Array[Int] = Array(1, 1)

zip操作

scala> val rdd1 = sc.makeRDD(1

to 3)

scala> val rdd2 =

sc.makeRDD(List(1.0,2.0,3.0))

scala> rdd1.zip(rdd2).collect

res8: Array[(Int, Double)] = Array((1,1.0), (2,2.0), (3,3.0))

1.3.4.4键值RDD转换操作

map和flatmap的区别:

(1)map是对每个元素都进行指定的操作,返回每个元素处理后的对象

(2)flatmap对所有的元素都做指定的操作,将所有的对象合并为一个对象返回

val rdd = sc.makeRDD(1 to 3)

rdd.map(x=>Seq(x,x)).collect

#结果

Array[Seq[Int]] = Array(List(1, 1),

List(2, 2), List(3, 3), List(4, 4))

rdd.flatMap(x=>Seq(x,x)).collect

#结果,合并为一个对象返回

Array[Int] = Array(1, 1, 2, 2, 3, 3, 4,  4)

未完待续.....

1.3.5 RDD的行动操作

每调用一次行动操作,都会触发一次spark的调度并返回响应的结果

1.3.5.1集合标量行动操作

Øcount

返回RDD中的元素的个数

Øfirst

返回RDD中的第一个元素

Øreduce(f:(T,T) = >T)

对RDD中的元素进行二元计算,返回计算结果

val rdd = sc.makeRDD(1 to 4)

rdd.reduce(_+_)#10

rdd.reduce(_-_)#-8

Øcollect()

以集合的形式返回RDD的元素

Øtake(number:Int)

返回集合中[0,num-1]下标的元素

Øtop(num:Int)

先降序排序,返回前num个元素

ØtakeOrdered(num:Int)

以与top相反的排序规则(升序),返回前num个元素

Ølookup(key:k):Seq[v]

lookup是针对(k,v)类型RDD的行动操作,针对给定的键值,返回与此键值相对应的所有值

1.3.5.2存储行动操作

RDD不仅可以存储在hdfs中还能存储到Hbase、MangoDB等数据库中

1.4 Spark SQL

1.4.1 spark sql与shark区别

sparksql是一个支持结构化数据处理的spark模块,提供DaraFrame作为可编程的数据抽象,可以对DataFrame执行sql的操作。

spark sql的诞生就是为了解决spark平台上的交互式查询问题,并且提供sql接口兼容原有数据库用户的使用情况

Øshark简单的说,就是spark上的hive,其底层依赖Hive引擎的,但在spark平台上,解析速度是hive的好多倍;就是一个升级版的大数据仓库

Ø在spark1.0版本开始,shark被官方抛弃使用

Øspark sql的优势:

nspark sql完全脱离了hive的限制

nspark sql支持查询原生的RDD,能够高效的处理大数据的各种场景的基础

n能够在scala中写sql语句,支持简单的sql语法检查,将结果取回作为RDD使用

nCatalyst能够帮助用户优化查询,catalyst能够进行一定程度的性能提升

# catalyst是spark sql的调度核心,解析sql形成其对应的执行计划(遵循DAG图)

1.4.2 DataFrame和DataSet

1.4.2.1 RDD与DataFrame的区别

如上图:

Ø左侧的RDD[Person]虽然以Person为类型参数,但是spark框架本省不了解Person类的内部结构;而右侧的DataFrame却提供了详细的结构信息(schema),使得spark sql可以清楚的知道该数据集中包含哪些列,每列的名称和类型

ØRDD是分布式的java对象的集合;而DataFrame是分布式的row对象的集合

ØDataFrame除了提供比RDD更丰富的算子外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化

1.4.2.2 DataFrame与DataSet的区别

DataSet可以认为是DataFrame的一个特例,主要区别是DataSet的每一个record存储的是一个强类型值而不是一个Row。具有三个特点:

ØDataSet在编译时检查类型

Ø面向对象的编程接口

Ø后面的版本DataFrame是继承DataSet的,DataFrame是面向Spark sql的接口

相互转换:

DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame转化为DataSet,ds.toDF()这样可以把DataSet转化为DataFrame

1.4.2.3 DataFrame

(1)DataFrame是一个分布式的数据集,类似于关系数据库的一个表。

ØDataFrame以列的形式存储,但是不知道列的类型,因此,在编译时不进行校验,只有在运行时才会处理;DataSet不仅知道字段,还知道类型,所以编译时会进行类型校验

Ø可以由结构化的数据转换过来,也可以从hive,外部数据库或者RDD转换

ØDataFrame在spark sql中,可以使用sql的方式进行操作,与RDD类似,也可以采用lazy的方式,只有动作发生时才会真正的计算

ØDataFrame的数据源:支持JSON文件、hive表格,支持本地文件系统以及hdfs等;配合JDBC还支持外部关系型数据库

1.4.2.4与RDD的相互操作

spark sql支持两种不同的方式用于将存在的RDD转换为DataSets、DataFrame

Ø反射推断模式:

该模式使得代码更加的简练,不过在写spark程序的时候已经知道模式信息,(比如RDD中自己定义的case class类型)

练习:从hdfs文件中读取数据,创建一个Person的RDD

1.定义Person type

scala> case class

Person(id:Int,name:String,addr:String)

defined class Person

2.从hdfs读取文件,封装成DataFrame数据集

scala> val personDf = spark.sparkContext

.textFile("/test/preson.txt")

.map(_.split(","))

.map(p=>Person(p(0).toInt,p(1),p(2)))

.toDF

personDf:

org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

3.将personDf注册为一个视图view

scala>

personDf.createOrReplaceTempView("person")

4.通过sql查询视图;sql支持复杂的,包括多表关联

scala> spark.sql("select

addr,count(1) from person group by addr").show

Ø编程指定模式:

构造一个模式,将其应用到一个已经存在的RDD上将其转化为DataFrame,该方法适用于运行之前不知道列以及列的类型的情况

import org.apache.spark.sql.types._

1.加载数据

val presonRDD= spark.sparkContext.textFile(“/test/person.txt”)

2.定义schema

val stringSchema = “id,name,addr”

val schema = StructType(stringSchma.split(",").map(field=>StructField(field,StringType,nullable=true)))

3.转换rdd的记录到rows集合

1.4.3 spark sql的操作

1.4.3.1创建SparkSession实例

SparkSeesion类时Spark SQL的所有功能的入口;spark-shell启动时,默认生成了一个SparkSession的实例:spark

importorg.apache.spark.sql.SparkSession

val spark = SparkSession.builder

.master("local")

.config("spark.sql.warehouse.dir", "/user/hive/warehouse")

.appName("spark text")

.getOrCreate

//包含隐式转换(比如讲RDDs转成DataFrames)API

importspark.implicits._

1.4.3.2创建DataFrame

spark sql读取hdfs中json数据

val df = spark.read.json("/test/course.json")

#显示df的数据

Ø显示df的数据

df.show

#结果:

Ø查询df的结构信息

Ø显示指定的字段值:使用select(col1,,col2)

Ø过滤,查询长度》12的数据

Ø分组操作groupBy

1.4.3.3 spark sql实战

1.4.3.3.1入口:SQLContext,HiveContext(Starting Point: SQLContext)

spark sql中所有的操作入口点都是SQLContext类或者它的子类.创建一个基本的SQLContext,只需要SparkContext即可(sc)

注:spark2.0之后,sparkSession是实现了同样的功能,不需要显示的创建SparkConf、sparkContext、SQLContext,因为这些对象都封装在了SparkSession中。即是

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

valhiveContext =neworg.apache.spark.sql.hive.HiveContext(sc)

除了SparkContext外,还有HiveContext。两者的区别:

ØSQLContext只支持标准的sql语法解析器

ØHiveContext现在支持sql语法解析器和HiveSql语法解析器;默认为hivesql语法解析器,用户可以通过配置来切换sql语法解析器,来运行hivesql不支持的语法

Ø使用HiveContext可以使用Hive的UDF,读写Hive表数据等hive操作。sqlContext不可以对hive进行操作

Ø趋势:SqlContext不断丰富中,最终两者会形成一个统一的Context

1.4.3.3.2创建DataFrame

使用SqlContext,spark程序可以通过RDD、hive表、JSON格式数据等数据源创建DataFrame

val df = sqlContext.read.json(“/test/readme.json”)

1.4.3.3.3 DataFrame操作

df.show

df.printSchema

df.

1.4.3.3.4 Parquet文件

Parquet文件是一种列式存储格式的文件,能被很多数据处理系统支持。Spark SQL支持读取和写入Parquet文件,并可自动保留原始数据的格式(schema)

优势:

可以跳过不符合条件的数据,只读取需要的数据,降低IO数据量

压缩编码可以降低磁盘存储空间。由于同一列的数据类型是一样的,可以使用更高效的压缩编码进一步节省存储空间

只读取需要的列,支持向量运算,能够获取更好的扫描性能

还有,parquet数据源支持自动发现和推断分区信息

1.4.3.3.5 DataFrame的java操作

将DataFrame的结果转换为java的list

List listRow =result.javaRDD().collect();

for(Row row : listRow){

System.out.println(row);

}

1.4.3.4 SparkSession操作

在2.0版本之前,与spark交互之前必须创建SparkConf和SparkContext;然而到了2.0版本,不需要显示的创建这些对象SparkConf、SparkContext和SqlContext了,这些对象都已经封装在了SparkSession中了,即,2.0版本之后,入口就是就变成了SparkSession,在spark-shell启动时,会实例化一个SparkSession实例spark

ØSparkSession封装的对象

Ø获取conf默认配置,可以调整配置spark的运行参数

1.4.3.4.0 SparkSession的创建

sparkSession类是所有Spark SQL功能的入口,只需要调用SparkSession.builder()即可创建

importorg.apache.spark.sql.SparkSession

val spark = SparkSession.builder

.master("local")

.config("spark.sql.warehouse.dir", "/user/hive/warehouse")

.appName("spark Streaming +kafka")

.enableHiveSupport

.getOrCreate

1.4.3.4.1获取catalog元数据

1.4.3.4.2创建Dataset和Dataframe

最简单的办法就是通过range方法,创建DataSet

注,range也可以有3个参数,第三个参数是间隔,默认的创建的字段:id

Øtop(n)操作

Ø对某一列进行统计操作

Ø通过createDataFrame创建

重新命名列名withColumnRenamed

1.4.3.4.3读取json文件

1.4.3.4.4在SparkSession中使用Spark SQL

1.4.3.4.5数据源

spark支持多种数据源的数据,

Ø最简单的加载方式是load,默认的格式为parquet文件,(可以通过spark.sql.sources.default来默认指定格式)

val df = spark.read.load(“...”)

Ø将DataFrame数据存储为parquet

df.select("name","type").write.save("course.parquet")

存储的路径为(hdfs):path hdfs://192.168.21.144:9000/user/root/course.parquet

也可以手动指定格式,以及指定要保存的文件的格式

scala> val df =

spark.read.format("json").load("/test/course.json")

df: org.apache.spark.sql.DataFrame = [length: bigint, name: string

... 1 more field]

#指定要保存的文件的格式

scala>

df.write.format("parquet").save("course1.parquet")

1.4.3.4.6保存数据到永久表saveAsTable

DataFrame可以通过调用saveAsTable方法将数据落地到hive表中,不过对已经部署的hive不会受影响,spark会创建本地的metastore(使用derby),saveAsTable会持久化数据并指向hive metastore

saveAsTable默认会创建一个“受管理表”,意味着数据的位置都是受metastore管理的。当“受管理表”被删除,其对应的数据也都会被删除。

注:文件内容保存在${SPARK_HOME}/bin/spark-warehouse/tableName

scala> df.write.saveAsTable("course")

#调用,spark.sql(sql_str)比较灵活

1.4.3.4.7 spark整合hive

如果spark没有整合hive,那么spark的元数据都是在bin目录下,自动创建metastore_db(以derby做支撑)

整合hive后,支持spark从hive取数,永久保存数据到hive中;并支持hive的mysql作为元数据存储数据库

Ø将hive配置文件中hive-site.xml文件复制到${spark_home}/conf

Ø将hadoop配置文件中hdfs-site.xml和core-site.xml文件复制到${spark_home}/conf

Ø将hive下元数据库mysql的驱动,复制到${spark_home}/jars下

可以将DataFrame数据永久保存到hive表中

1.5 Spark Streaming

spark2.0将流数据计算统一到了DataSet中,提出了Structured

Streaming的概念,将数据源映射为一张无限长度的表,同时将流计算的结果映射为另一张表,完全以结构化的方式去操作流数据,复用了其对象的Catalyst引擎

1.5.1 spark Streaming实战

创建Steaming DataFrame,用来监听host:9999获取socket数据,并对获取的数据进行RDD转换操作,最后统计各个词出现的次数

Ø创建socket通道

nc –lk 9999

Ø获取socket通道数据(需要填写socket的host、port)

scala>val line

=

spark.readStream.format("socket").option("host","mapeng").option("port",9999).load

Ø转换操作

scala> val wordCount = line.as[String].flatMap(_.split("

")).groupBy("value").count

wordCount: org.apache.spark.sql.DataFrame

= [value: string, count: bigint]

Ø使用start()来启动流式数据计算流程

scala> val query =

wordCount.writeStream.outputMode("complete").format("console").start

程序自动启动了job计算,并在控制台展现计算结果;(接收到socket流数据,spark自动计算,控制台展现结果)

说明:

(1)outputMode现在有三种方式:complete ,append,update(目前只实现了前两种)

lcomplete:每次计算完成后,都能得到全量的计算结果(每次计算都得到转换后的最新结果集)

lappend:每次计算完成后,能拿到增量的计算结果

两种方式的使用说明:

使用了聚合类函数才能使用complete的模式,只有简单的使用了map,filter等转换模式才能使用append模式,不做复杂的聚合统计运算

1.6 Spark Streaming + kafka整合

1.6.1 pom.xml文件,添加spark依赖

org.apache.hadoop

hadoop-client

2.6.0

org.apache.hadoop

hadoop-common

2.6.0

org.apache.hadoop

hadoop-hdfs

2.6.0

org.apache.spark

spark-core_2.11

2.0.0

org.apache.spark

spark-sql_2.11

2.0.0

org.apache.spark

spark-streaming_2.11

2.0.0

org.apache.spark

spark-hive_2.11

2.0.0

org.apache.spark

spark-streaming-kafka-0-10_2.11

2.0.0

org.apache.hive

hive-jdbc

1.2.1

io.netty

netty-all

4.0.29.Final

1.6.2实战代码

packagecom.mp.fight

importorg.apache.spark.sql.SparkSession

importorg.apache.spark.streaming.StreamingContext

importorg.apache.spark.streaming.Seconds

importorg.apache.spark.streaming._

importorg.apache.spark.streaming.kafka010.KafkaUtils

importorg.apache.spark.streaming.kafka010.LocationStrategies

importorg.apache.kafka.common.serialization.StringDeserializer

importorg.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

importorg.apache.spark.sql.SQLContext

importorg.apache.spark.sql.SaveMode

objectTest4{

caseclassPerson(id:Int,name:String,addr:String)

defmain(args: Array[String]): Unit = {

//声明sparkSession

valspark=SparkSession.builder

.master("local")

.appName("spark  Streaming kafkasql")

.config("spark.sql.warehouse.dir","/user/hive/warehouse")

.getOrCreate

//kafka设置

valkafkaParams=Map[String, Object](

"bootstrap.servers"->"mapeng:9092",

"key.deserializer"->classOf[StringDeserializer],

"value.deserializer"->classOf[StringDeserializer],

"group.id"->"example",

"auto.offset.reset"->"latest",

"enable.auto.commit"->(false: java.lang.Boolean)

)

//topic

valtopics=List("testmp")

//初始化StreamingContext

valssc=newStreamingContext(spark.sparkContext,Seconds(30));

//从kafka中读取数据

valkafkaStream=KafkaUtils.createDirectStream[String,String](

ssc,

LocationStrategies.PreferConsistent,

Subscribe[String,String](topics,kafkaParams)

).map(_.value())

//kafkaStream.print

importspark.sqlContext.implicits._

//启用sparkSql来操作DStream转换为DataFrame

kafkaStream.foreachRDD{rdd=>{

if(rdd.isEmpty) {

println("rdd is

empty")

}else{

valperson=rdd.map(_.split(",")).map(p=>Person(p(0).toInt,p(1),p(2))).toDF

//新接收的数据,追加存储在parquet文件中(重写文件)

person.write.mode(SaveMode.Append).save("hdfs://mapeng:9000/test/person.parquet")

//实时统计区域人数

valdf=spark.read.load("hdfs://mapeng:9000/test/person.parquet")

df.createOrReplaceTempView("person")

valaddrCount=spark.sql("select

addr,count(1) as num from person group by addr")

//将统计结果实时回写到parquet文件中

addrCount.write.mode(SaveMode.Overwrite).save("hdfs://mapeng:9000/test/addrCount.parquet")

//继续做多维度统计,可以使用sparksql操作处理parquet文件

}

}

}

//启动job

ssc.start

ssc.awaitTermination

}

}

1.6.3 DStream中foreachRDD、foreachePartition、foreach的区别

ØforeachRDD:得到的是处理一个批次的数据

ØforeachPartition:对一个批次的每个分区数据做处理

Øforeach:每条数据处理,单个元素处理

1.6.4 spark Streaming + socket

val ssc = new StreamingContext(sparkConf,

Seconds(1))

//获得一个DStream负责连接监听端口:地址

val lines = ssc.socketTextStream(“192.168.21.144”, 9999)

//对每一行数据执行Split操作

val words = lines.flatMap(_.split(" "))

//统计word的数量

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

//输出结果

wordCounts.print

ssc.start//开始

ssc.awaitTermination//计算完毕退出

1.8 parquet文件

1.8.1 parquet是面向分析型业务的列式存储格式,有如下优势

Parquet文件尾部存储了文件的元数据信息和统计信息,自描述的,方便解析

1.只读取需要的列,支持向量运算,能够获得更好的扫描性能

2.可以跳过不符合条件的数据,只读取需要的数据,降低io

3.同一列的数据类型是一样的,可以使用更高效的压缩编码,节约存储磁盘

1.8.2 parquet适配多种计算框架

Parquet是语言无关的,而且不与任何一种数据处理框架绑定在一起,适配多种语言和组件,能够与Parquet配合的组件有:

查询引擎: Hive, Impala, Pig, Presto, Drill, Tajo, HAWQ, IBM

Big SQL

计算框架: MapReduce, Spark, Cascading, Crunch, Scalding, Kite

数据模型: Avro, Thrift, Protocol Buffers, POJOs

1.8.3 parquet数据模型

eg:

message AddressBook {

required string owner;

repeated stringownerPhoneNumbers;

repeated group contacts {

required string name;

optional string phoneNumber;

}

}

说明:

schema的格式是这样的:

(1)根叫做message

(2)message包括有多个fields,每个fields包括有三个属性:repetition,type,name

其中,repetition有3中类型:required(出现一次)、optional(出现0次或者1次)、repeated(出现0次或者多次)

(3)type可以是一个group或者一个简单的类型

以上schema描述说明:

(1)每条记录标识一个AddressBook

(2)有且只有一个owner

(3)有0个或多个ownerPhoneNumbers

(4)owner可以有0个或者多个contacts。每个contact有且只有一个name,这个contact的phoneNumber可有可无(0个或者1个)

注:parquet格式的数据类型没有复杂的Map,List,Set等,使用group和repeated fields来表示;

null值不会被存储

实例:

1.8.4 parquet格式文件的存储

在parquet格式的存储中,一个schema的树结构有几个叶子节点,实际存储汇总就有几个column,例如上图中的schema实际存储就4个列

1.8.5 DataFrame与Parquet

(1)保存DF为Parquet格式

dfPerson.write.parquet("person.parquet")

(2)hive中建立parquet格式的表

createtableperson_parquetlikepersonstoredasparquet;

insertoverwritetableperson_parquetselect*fromperson;

(3)加载Parquet文件不再需要case class。

valpersonDF =spark.read.parquet("person.parquet")

personDF.registerAsTempTable("pp")

valmales = spark.sql("select * from pp where gender='M'")

males.show

1.8.5 parquet文件的持久化

1.8.5.1 spark中将DataFrame数据写到hdfs中的parquet文件中,支持追加

personDf.write.mode(SaveMode.Append).save(“hdfs://mapeng:9000/test/person.parquet”)

saveMode有如下几种方式:

1.8.5.2 parquet文件合并

合并的规则:相同的列,在新的数据集中,是通用的列,

各自不同的列,也作为新的数据集的列。

实战:

1.9 spark项目实战

上一篇下一篇

猜你喜欢

热点阅读