spark chapter 3 RDD

2019-08-10  本文已影响0人  深海suke

# 1 什么是RDD

*Resilient Distributed Dataset弹性分布式数据集

*Represents an immutable(不可变)partitioned(分区的)collection of elements that can be operated on in parallel(并行操作). 

* 操作包括: `map`, `filter`, and `persist`(持久化)

*有KV,Double,Sequence文件类型

什么是不可变?——>对于变量不可修改,每次操作之后都会生成一个新的变量。

spark编程和scala类似,可以无缝对接

```

abstract class RDD[T: ClassTag](

    @transient private var _sc: SparkContext,

    @transient private var deps: Seq[Dependency[_]]

  ) extends Serializable with Logging

```

1)抽象类,只有在子类里面实现了(子类继承父类)

2)带泛型的,可以支持多种类型(string,person,user)

单机存储/计算==>分布式存储/计算

1数据存储:切割  HDFS BLock

2数据计算:切割(并行计算)Mapreduce、spark

3存储和计算:HDFS/S3 + Mapreduce/Spark

# 2 RDD特性

【面试考点】Internally, each RDD is characterized by five main properties:

- A list of partitions一些列分区分片

- A function for computing each split可以去计算每个分区分片

rdd.map(_+1)->对每个分区做了一个相同的操作

- A list of dependencies on other RDDs rdd是具有依赖关系的【重要特点】

rdd1==>rdd2==>rdd3==>=rdd4

rdd1有五个分区,则rdd2也有五个分区。如果rdd2第四个分区丢失,则数据会从rdd1第四个分区再次计算,而不会对所有数据进行计算。

- Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)。可选属性,KV分区可以按照不同策略去设置分区hash or range.etc

   - Optionally, a list of preferred locations to compute each split on (e.g. block locations for

     an HDFS file)可选属性,数据在哪儿可以把作业调度在数据所在的节点进行计算:数据移动不如移动作业

用户可自己创建RDD,从文件中读取数据

* refer课程:hadoop入门实战课程

# 3 RDD特性在源码中的体现

RDD有五个方法:

## 1 computer-特性2

> DeveloperApi ——开发API

> Implemented(实现)by subclasses(子类)to compute a given partition.

```

def compute(split: Partition, context: TaskContext): Iterator[T]

```

* parition源码

```

package org.apache.spark

// An identifier for a partition in an RDD.

trait Partition extends Serializable {

  //Get the partition's index within its parent RDD

  def index: Int

  // A better default implementation of HashCode

  override def hashCode(): Int = index

  override def equals(other: Any): Boolean = super.equals(other)

}

```

面试问题:

1为什么重写equals方法的时候需要重写hashcode

2如何实现hashset/hashmap

## 2 getPartitions-特性1

```

 protected def getPartitions: Array[Partition]

```

This method will only be called once, so it is safe to implement a time-consuming computation in it.

## 3 getDependencies-特性3

```

 protected def getDependencies: Seq[Dependency[_]] = deps

```

返回一堆依赖关系

## 4 getPreferredLocations-特性5

```

protected def getPreferredLocations(split: Partition): Seq[String] = Nil

```

## 5 partitioner-特性4

```

val partitioner: Option[Partitioner] = None

```

## + hadoop RDD的实现

 :: DeveloperApi ::

An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`).

protected def getInputFormat(conf: JobConf): InputFormat[K, V]  ——new出来输入对象,然后匹配是否符合输入,然后返回。

override def getPartitions: Array[Partition] ——调用getinputformat,然后分块,检查是否为空,返回非空。

对于上面讲的在开发API中这些方法的实现没有理解

## + JDBC RDD的实现

也存在 上述的集中RDD支持的操作

---

两个实现的方式是说每个实现里会有这几种基本方式的实现,但是具怎么实现,实现原理是依赖于其本身的设计还是spark RDD的设计还是不理解。

logging属于日志接口

def this是附属构造器

#  4图解RDD

# 5 Spark ContextSpark Conf

1创建SparkContext

链接到Spark集群,可以跑在local、standalone、yarn、mesos

可以通过SC创建RDD,或者广播变量到集群

 在创建sparkcontext之前,要创建spark conf

不要硬编码,最好是通过submit传入进来

启动pyshark(在python的bin目录里,可以配置环境变量后,支持全局可用)

```

./pyshark

```

在创建spark的时候初始化了sc,sc的master为local[*]

可以spark000:8080查看运行状况

也可以添加其他属性

```

./bin/pyshark --master local[4] --py-files code.py

```

用ipython启动

```

$ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark

```

sc.sparkconf

# 6 pyshark脚本解析

 可以用 查询相关参数属性

./pyshark --help

传输参数

./pyshark --master local[2]

./pyshark --name  "Pyskark" #定义名字

# 7 RDD 创建方式

Once created, distFile can be acted on by dataset operations.

## 方法 1 driver

Parallelized Collections,测试场景下使用较多

```

data =  [1,2,3,4,5]

disData = sc.parallelize(data) #把data转换成RDD,默认转换成两个部分

disData.collect() #查看数据里面具体是什么

disData.reduce( lambda a , b : a+b ) #加和所有元素(?那对部分相加怎么办?)

```

ps:collect,reduce才会触发jobs

One important parameter for parallel collections is the number of*partitions*to cut the dataset into.——也就是说,我们的数据是可以切分的。例子如下:

```

disData = sc.parallelize(data,5) #这里的参数5是表示把RDD切成5个部分。

```

多分数据和业务逻辑和处理性能有关。

每个CPU可以设置2-4个partition

## 方法 2 外部数据导入

外部存储导入

支持读取的数据源格式:HDFS, HBase,S3,Hive,etc

支持文件类型:text files, SequenceFiles or any other Hadoop InputFormat or any data source offering a Hadoop InputFormat

```

# 先在目录下创建hello.txt文件,内容可自定义,记录下其村吃地址

disFile = sc.textFile("file://本地文件路径")

# 本地文件

disFile = sc.textFile("file:///home/hadoop/data/hello.txt")

disFile.collection()

#hadoop文件

disFile = sc.textFile("hdfs://hadoop000:8020/hello.txt")

disFile.collection()

#使用示例,求和文件中字段的长度并加和

disFile.map(lambda s:len(s)).reduce(lambda a,b:a+b)

```

*If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes.

注意点:如果你的文件在本地,要确保你的运行程序在运行的节点上可以访问本地。一般建议用网路的共享文件系统(use a network-mounted shared file system,类似于S3)?这就是说 如果在hdfs 上 也算非本地把?

1)上课环境是单节点,hello.txt本地有就可以读取

2)如果在standalone:Spark集群上:3个节点,local path都是表示节点本地读取文件。不建议

3)生产上直接用yarm,不会用standalone

* 支持整个文件夹,或者压缩文件。

```

sc.textFile("/my/directory")

sc.textFile("/my/directory/*.gz")

```

* 可以设置分区(partitions)数量,可以设置更多,单无法设置更少。

## python特有的支持类型

*SparkContext.wholeTextFiles

支持全路径的文件读取,key是文件村存储地址,value是文件内容

```

sc.wholeTextFiles("file:///home/hadoop/data/hello.txt").collect()

# 读取内容

[(文件路径,文件内容)]

```

* RDD.saveAsPickleFile——可以保存为这种文件类SparkContext.pickleFile

* SequenceFile and Hadoop Input/Output Formats

示例 1 序列

```

rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))

rdd.saveAsSequenceFile("path/to/file")

sorted(sc.sequenceFile("path/to/file").collect())

[(1, u'a'), (2, u'aa'), (3, u'aaa')]

```

示例2

```

$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar

>>> conf = {"es.resource" : "index/type"}  # assume Elasticsearch is running on localhost defaults

>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",

                             "org.apache.hadoop.io.NullWritable",

                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",

                             conf=conf)

>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict

(u'Elasticsearch ID',

 {u'field1': True,

  u'field2': u'Some Text',

  u'field3': 12345})

```

不知道什么是es

示例3 存储数据

```

data =  [1,2,3,4,5]

disData = sc.parallelize(data)

disData.saveAsTextFile('file:///home/hadoop/data/output/)

```

# 8 Spark 应用程序开发并运行

IDE:IDEA pycharm

1 建立一个新的项目

2 建立一个py文件

3对py文件的configureations中添加环境变量

添加spark中pyhton的环境变量

添加spark本身的环境变量

4在preference里面的project structure的add content root添加python-lib下的py4j和pyshark两个zip包

5使用local进行本地测试

6提交pyshark程序

```

 ./spark - submit --master local[2]  -name spark -py python文件名

```

# 9 程序运行coding小笔记

显示当前全部java进程pid的命令jps 

创建hello.txt并对其进行编辑vi hello.txt 

添加环境变量:vi ~/.bashrc

查看一个文本文件cat hello.txt

hdfs文件传输

```

hadoop fs -put /hello.txt

hadoop fs -text /hello.txt

```

?不懂什么时候需要/ 什么时候~/ 什么时候./

refer课程:10个小时入门hadoop

上一篇下一篇

猜你喜欢

热点阅读