Spark RDD及其常用算子介绍

2023-04-25  本文已影响0人  文景大大

一、RDD介绍

1.1 什么是RDD

RDD(Resilient Distributed DataSet),称作弹性分布式数据集,是Spark中最基本的数据抽象,表示一个不可变的分区的,其中元素可以被并行计算数据集合。所有的数据操作都是建立在RDD这一抽象数据结构上的,就好比我们Java中的List,Set一样,只不过List和Set是在一个JVM进程中的,不是分布式的而已。

1.2 RDD的特点

1.3 RDD的创建

要想创建RDD对象,首先需要初始化Spark RDD的程序入口,即SparkContext对象,只有基于它,才能创建出第一个RDD对象。

from pyspark import SparkConf
from pyspark import SparkContext

if __name__ == '__main__':
    conf=SparkConf().setAppName("test1").setMaster("local")
    sc=SparkContext(conf=conf)
    # SparkContext的主要作用就是创建出第一个RDD对象
    rdd1 = sc.parallelize([1,2,3,4,5,6], 3)

通常我们有两种方式来创建RDD:

二、常用算子介绍

分布式集合对象上的API操作被称为算子,算子通常被分为三大类:

2.1 转换算子Transformation

转换算子的结果仍然是一个RDD对象,这类算子是懒加载的,只有遇到Action算子,它们才会真正进行转换计算;

2.2 动作算子Action

动作算子的结果不是RDD对象,它相当于是一个指令来让整个执行计划开始真正工作;通常在Action算子之前都会有一系列的转换算子链条,最终通过一个Action算子来结束当前的计算链条,同时其也是启动整个计算链条工作的开关;

2.3 分区算子

分区上的数据在本分区内由各自的Executor分别直接执行,不用经过Driver;通常我们需要控制并发数和分区数一致时可以考虑试用分区算子。

这里只是列举了常用的一些算子,详细的内容请参考官方文档:API Reference — PySpark 3.3.2 documentation (apache.org)

三、案例演示与分析

3.1 wordcount

我们在项目根目录下新建一个待统计的文本文件,当然你可以指定其它任何位置的文件皆可:

hello spark
hello python
hello pyspark
welcome to spark world

现在我们需要编写一个基于pyspark的计数程序:

# 读取文件转换为RDD
fileRdd = sc.textFile("./word.txt")
# 对每行数据按章空格进行切割,并去除每行自带的嵌套
wordRdd = fileRdd.flatMap(lambda x : x.split(" "))
# ['hello', 'spark', 'hello', 'python', 'hello', 'pyspark', 'welcome', 'to', 'spark', 'world']
print(wordRdd.collect())
# 每个单词计数为1,形成KV形式
wordPairRdd = wordRdd.map(lambda x : (x, 1))
# [('hello', 1), ('spark', 1), ('hello', 1), ('python', 1), ('hello', 1), ('pyspark', 1), ('welcome', 1), ('to', 1), ('spark', 1), ('world', 1)]
print(wordPairRdd.collect())
# 对单词进行规约reduce计算,相同的单词计数器相加
wordCountRdd = wordPairRdd.reduceByKey(lambda a, b : a + b)
# [('hello', 3), ('spark', 2), ('python', 1), ('pyspark', 1), ('welcome', 1), ('to', 1), ('world', 1)]
print(wordCountRdd.collect())
# 将RDD对象转换为List进行输出
result = wordCountRdd.collect()
# [('hello', 3), ('spark', 2), ('python', 1), ('pyspark', 1), ('welcome', 1), ('to', 1), ('world', 1)]
print(result)

3.2 商品筛选

现在我们要基于如下项目根目录下的商品信息进行商品数据的提取:要求从中提取出北京的商品,去重展示。

{"id":19,"timestamp":"2019-05-08T01:03.00Z","category":"电脑","areaName":"杭州","money":"5551"}|{"id":20,"timestamp":"2019-05-08T01:01.00Z","category":"电脑","areaName":"北京","money":"2450"}
{"id":21,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"5520"}|{"id":22,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"北京","money":"6650"}
{"id":23,"timestamp":"2019-05-08T01:03.00Z","category":"服饰","areaName":"杭州","money":"1240"}|{"id":24,"timestamp":"2019-05-08T01:01.00Z","category":"食品","areaName":"天津","money":"5600"}
{"id":25,"timestamp":"2019-05-08T01:03.00Z","category":"食品","areaName":"北京","money":"7801"}|{"id":26,"timestamp":"2019-05-08T01:01.00Z","category":"服饰","areaName":"北京","money":"9000"}

我们编写的pyspark的程序如下:

from pyspark import SparkConf
from pyspark import SparkContext
import json

if __name__ == '__main__':
    conf=SparkConf().setAppName("test1").setMaster("local")
    sc=SparkContext(conf=conf)

    # 读取文件转换为RDD
    fileRdd = sc.textFile("./goods.txt")
    # 每行数据按照分隔符进行分割,得到单独的json数据
    jsonRdd = fileRdd.flatMap(lambda x : x.split("|"))
    # print(jsonRdd.collect())
    # 使用json库转为字典类型
    dictRdd = jsonRdd.map(lambda x : json.loads(x))
    # {'id': 1, 'timestamp': '2019-05-08T01:03.00Z', 'category': '平板电脑', 'areaName': '北京', 'money': '1450'}等等8条
    print(dictRdd.collect())
    # 筛选北京的字典KV数据
    beijingDictRdd = dictRdd.filter(lambda x : x["areaName"] == "北京")
    # {'id': 20, 'timestamp': '2019-05-08T01:01.00Z', 'category': '电脑', 'areaName': '北京', 'money': '2450'}等等5条
    print(beijingDictRdd.collect())
    # 将北京的地域名和商品名拼接起来
    resultRdd = beijingDictRdd.map(lambda x : x["areaName"] + "_" + x["category"])
    # ['北京_电脑', '北京_食品', '北京_服饰']
    print(resultRdd.distinct().collect())

四、踩坑小记

4.1 Too many parameters

在使用某些算子进行实现的时候,比如reduceByKey、distinct的时候,明明代码案例和官网示例一模一样,但就是执行报错:

TypeError: Too many parameters for typing.Iterable; actual 2, expected 1

后来在stackoverflow上查到一个类似的,说是python版本和spark版本不兼容导致的,我出现问题的版本是:

然后将python更新到3.7版本后,该报错问题就解决了:

conda install python=3.7
上一篇 下一篇

猜你喜欢

热点阅读