2019-02 Pyspark 初探
0. 安装及环境
a. 下载JDK并设置环境路径 (官网下载pkg文件)
b. 下载Spark并设置环境路径 (官网下载tar文件)
c. 下载Pyspark (用pip安装)
最终环境设置如下所示:
环境设置.png
1. Spark 工作架构
Spark可以分为1个 Driver (笔记本电脑或者集群网关机器上,用户编写的Spark程序)和若干个executor(在RDD分布的各个节点上)。用户通过SparkContext(简称sc)连接Spark集群、创建RDD、累加器(accumlator)、广播变量(broadcast variables),简单可以认为SparkContext是Spark程序的根本。
Driver会把计算任务分成一系列小的task,然后送到executor执行。executor之间可以通信,在每个executor完成自己的task以后,所有的信息会被传回。
2. Pyspark核心
Pyspark里最核心的模块是SparkContext (简称sc),最重要的数据载体是RDD。RDD就像一个NumPy array或者一个Pandas Series,可以视作一个有序的item集合。只不过这些item并不存在driver端的内存里,而是被分割成很多个partitions,每个partition的数据存在集群的executor的内存中。
3. Pyspark的RDD基本运算
- Transformation: 转换运算将一个RDD转换为另一个RDD,但是由于RDD的lazy特性,转换运算不会立刻实际执行,它会等到执行到“动作”运算,才会实际执行
- Action: RDD执行动作运算之后,不会产生另一个RDD,它会产生数值、数组或写入文件系统;RDD执行动作运算后会立刻实际执行,并且连同之前的转换运算一起执行。
from pyspark import SparkContext
sc = SparkContext()
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
print kvRDD1.collect() ## action操作
print kvRDD1.take(2) ## action操作
- Persistence: 对于那些会重复使用的RDD, 可以将RDD持久化在内存中作为后续使用,以提高执行性能。
4. Spark SQL
一张SQL数据表可以映射为一个DataFrame对象,DataFrame是Spark SQL中的主要数据结构,它从R语言和python语言里面给引进过来的,延续了传统的单机数据开发的一个体验,并把它推广到分布式的大数据场景当中来使用,让我们觉得是在写单机应用程序,但是写出来的程序能够在分布式的场景进行使用。
# encoding=utf8
from pyspark.sql import SQLContext
from pyspark import SparkContext
sqlContext = SQLContext(SparkContext())
data = [('a', 1, 18), ('b', 2, 22), ('c', 3, 20)]
df = sqlContext.createDataFrame(data, ['name', 'number', 'age'])
print df.collect()
print df.show(10) ## 用表格形式呈现前10行
print df.count() ## 显示数据的行数
print df.drop("One").show() ## 删除某一列,并show显示
print df.take(2) ## 显示前几行
print df.filter(df.age>=19).show() ## 进行筛选
5. 特征处理
首先,需要进行mysql的驱动包下载jdbc。否则无法连接。
参考文献:
1. Pyspark的使用和操作
2. PySpark之RDD入门最全攻略!
3. Mac下如何安装JDK
4. pyspark环境配置
4. Spark SQL结构化数据处理