spark chapter 3 RDD
# 1 什么是RDD
*Resilient Distributed Dataset弹性分布式数据集
*Represents an immutable(不可变)partitioned(分区的)collection of elements that can be operated on in parallel(并行操作).
* 操作包括: `map`, `filter`, and `persist`(持久化)
*有KV,Double,Sequence文件类型
什么是不可变?——>对于变量不可修改,每次操作之后都会生成一个新的变量。
spark编程和scala类似,可以无缝对接
```
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging
```
1)抽象类,只有在子类里面实现了(子类继承父类)
2)带泛型的,可以支持多种类型(string,person,user)
单机存储/计算==>分布式存储/计算
1数据存储:切割 HDFS BLock
2数据计算:切割(并行计算)Mapreduce、spark
3存储和计算:HDFS/S3 + Mapreduce/Spark
# 2 RDD特性
【面试考点】Internally, each RDD is characterized by five main properties:
- A list of partitions一些列分区分片
- A function for computing each split可以去计算每个分区分片
rdd.map(_+1)->对每个分区做了一个相同的操作
- A list of dependencies on other RDDs rdd是具有依赖关系的【重要特点】
rdd1==>rdd2==>rdd3==>=rdd4
rdd1有五个分区,则rdd2也有五个分区。如果rdd2第四个分区丢失,则数据会从rdd1第四个分区再次计算,而不会对所有数据进行计算。
- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)。可选属性,KV分区可以按照不同策略去设置分区hash or range.etc
- Optionally, a list of preferred locations to compute each split on (e.g. block locations for
an HDFS file)可选属性,数据在哪儿可以把作业调度在数据所在的节点进行计算:数据移动不如移动作业
用户可自己创建RDD,从文件中读取数据
* refer课程:hadoop入门实战课程
# 3 RDD特性在源码中的体现
RDD有五个方法:
## 1 computer-特性2
> DeveloperApi ——开发API
> Implemented(实现)by subclasses(子类)to compute a given partition.
```
def compute(split: Partition, context: TaskContext): Iterator[T]
```
* parition源码
```
package org.apache.spark
// An identifier for a partition in an RDD.
trait Partition extends Serializable {
//Get the partition's index within its parent RDD
def index: Int
// A better default implementation of HashCode
override def hashCode(): Int = index
override def equals(other: Any): Boolean = super.equals(other)
}
```
面试问题:
1为什么重写equals方法的时候需要重写hashcode
2如何实现hashset/hashmap
## 2 getPartitions-特性1
```
protected def getPartitions: Array[Partition]
```
This method will only be called once, so it is safe to implement a time-consuming computation in it.
## 3 getDependencies-特性3
```
protected def getDependencies: Seq[Dependency[_]] = deps
```
返回一堆依赖关系
## 4 getPreferredLocations-特性5
```
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
```
## 5 partitioner-特性4
```
val partitioner: Option[Partitioner] = None
```
## + hadoop RDD的实现
:: DeveloperApi ::
An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).
protected def getInputFormat(conf: JobConf): InputFormat[K, V] ——new出来输入对象,然后匹配是否符合输入,然后返回。
override def getPartitions: Array[Partition] ——调用getinputformat,然后分块,检查是否为空,返回非空。
对于上面讲的在开发API中这些方法的实现没有理解
## + JDBC RDD的实现
也存在 上述的集中RDD支持的操作
---
两个实现的方式是说每个实现里会有这几种基本方式的实现,但是具怎么实现,实现原理是依赖于其本身的设计还是spark RDD的设计还是不理解。
logging属于日志接口
def this是附属构造器
# 4图解RDD
# 5 Spark Context、Spark Conf
1创建SparkContext
链接到Spark集群,可以跑在local、standalone、yarn、mesos
可以通过SC创建RDD,或者广播变量到集群
在创建sparkcontext之前,要创建spark conf
不要硬编码,最好是通过submit传入进来
启动pyshark(在python的bin目录里,可以配置环境变量后,支持全局可用)
```
./pyshark
```
在创建spark的时候初始化了sc,sc的master为local[*]
可以spark000:8080查看运行状况
也可以添加其他属性
```
./bin/pyshark --master local[4] --py-files code.py
```
用ipython启动
```
$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
```
sc.sparkconf
# 6 pyshark脚本解析
可以用 查询相关参数属性
./pyshark --help
传输参数
./pyshark --master local[2]
./pyshark --name "Pyskark" #定义名字
# 7 RDD 创建方式
Once created, distFile can be acted on by dataset operations.
## 方法 1 driver
Parallelized Collections,测试场景下使用较多
```
data = [1,2,3,4,5]
disData = sc.parallelize(data) #把data转换成RDD,默认转换成两个部分
disData.collect() #查看数据里面具体是什么
disData.reduce( lambda a , b : a+b ) #加和所有元素(?那对部分相加怎么办?)
```
ps:collect,reduce才会触发jobs
One important parameter for parallel collections is the number of*partitions*to cut the dataset into.——也就是说,我们的数据是可以切分的。例子如下:
```
disData = sc.parallelize(data,5) #这里的参数5是表示把RDD切成5个部分。
```
多分数据和业务逻辑和处理性能有关。
每个CPU可以设置2-4个partition
## 方法 2 外部数据导入
外部存储导入
支持读取的数据源格式:HDFS, HBase,S3,Hive,etc
支持文件类型:text files, SequenceFiles or any other Hadoop InputFormat or any data source offering a Hadoop InputFormat
```
# 先在目录下创建hello.txt文件,内容可自定义,记录下其村吃地址
disFile = sc.textFile("file://本地文件路径")
# 本地文件
disFile = sc.textFile("file:///home/hadoop/data/hello.txt")
disFile.collection()
#hadoop文件
disFile = sc.textFile("hdfs://hadoop000:8020/hello.txt")
disFile.collection()
#使用示例,求和文件中字段的长度并加和
disFile.map(lambda s:len(s)).reduce(lambda a,b:a+b)
```
*If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.
注意点:如果你的文件在本地,要确保你的运行程序在运行的节点上可以访问本地。一般建议用网路的共享文件系统(use a network-mounted shared file system,类似于S3)?这就是说 如果在hdfs 上 也算非本地把?
1)上课环境是单节点,hello.txt本地有就可以读取
2)如果在standalone:Spark集群上:3个节点,local path都是表示节点本地读取文件。不建议
3)生产上直接用yarm,不会用standalone
* 支持整个文件夹,或者压缩文件。
```
sc.textFile("/my/directory")
sc.textFile("/my/directory/*.gz")
```
* 可以设置分区(partitions)数量,可以设置更多,单无法设置更少。
## python特有的支持类型
*SparkContext.wholeTextFiles
支持全路径的文件读取,key是文件村存储地址,value是文件内容
```
sc.wholeTextFiles("file:///home/hadoop/data/hello.txt").collect()
# 读取内容
[(文件路径,文件内容)]
```
* RDD.saveAsPickleFile——可以保存为这种文件类SparkContext.pickleFile
* SequenceFile and Hadoop Input/Output Formats
示例 1 序列
```
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("path/to/file")
sorted(sc.sequenceFile("path/to/file").collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa')]
```
示例2
```
$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
"org.apache.hadoop.io.NullWritable",
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf)
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
{u'field1': True,
u'field2': u'Some Text',
u'field3': 12345})
```
不知道什么是es
示例3 存储数据
```
data = [1,2,3,4,5]
disData = sc.parallelize(data)
disData.saveAsTextFile('file:///home/hadoop/data/output/)
```
# 8 Spark 应用程序开发并运行
IDE:IDEA pycharm
1 建立一个新的项目
2 建立一个py文件
3对py文件的configureations中添加环境变量
添加spark中pyhton的环境变量
添加spark本身的环境变量
4在preference里面的project structure的add content root添加python-lib下的py4j和pyshark两个zip包
5使用local进行本地测试
6提交pyshark程序
```
./spark - submit --master local[2] -name spark -py python文件名
```
# 9 程序运行coding小笔记
显示当前全部java进程pid的命令jps
创建hello.txt并对其进行编辑vi hello.txt
添加环境变量:vi ~/.bashrc
查看一个文本文件cat hello.txt
hdfs文件传输
```
hadoop fs -put /hello.txt
hadoop fs -text /hello.txt
```
?不懂什么时候需要/ 什么时候~/ 什么时候./
refer课程:10个小时入门hadoop