Python 操作Spark —— 基本使用

2019-04-11  本文已影响0人  枫隐_5f5f

pySpark API文档

输入文件内容

zhangsan,77,88,99
lisi,56,78,89
wanger,78,77,67

1.map

一对一处理函数, 应用于RDD的每个元素,并返回一个RDD

import sys
from pyspark import SparkConf,SparkContext

# Configure a local Spark application named "api_test" and start its context.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("api_test")
sc = SparkContext(conf=conf)

# Read the input file as an RDD of text lines.
lines = sc.textFile("/test_file")

def sp(line):
    """Split a comma-separated record into its first four fields as strings.

    The line is stripped of surrounding whitespace before splitting.
    Returns the tuple (name, eng, math, lang); fields beyond the fourth
    are ignored, and fewer than four fields raises IndexError.
    """
    fields = line.strip().split(",")
    name, eng, math, lang = (str(fields[i]) for i in range(4))
    return name, eng, math, lang

# Apply sp to every line and bring the resulting tuples back as a list.
rdd = (
    lines
    .map(sp)
    .collect()
)
#print (rdd)
[('zhangsan', '77', '88', '99'), ('lisi', '56', '78', '89'), ('wanger', '78', '77', '67')]

# Print each collected record on its own line.
for record in rdd:
    print(record)

('zhangsan', '77', '88', '99')
('lisi', '56', '78', '89')
('wanger', '78', '77', '67')

2. flatMap

一对多函数, 将数据按照定义的规则拆分成多条记录 并返回一个新的RDD

import sys
from pyspark import SparkConf,SparkContext

# Configure a local Spark application named "api_test" and start its context.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("api_test")
sc = SparkContext(conf=conf)

def flatMap_func(x):
    """Return the comma-separated pieces of x after trimming outer whitespace."""
    trimmed = x.strip()
    return trimmed.split(",")

lines = sc.textFile("/test_file")
# Expand every line into its comma-separated tokens and collect them all
# into one flat list.
rdd = (
    lines
    .flatMap(flatMap_func)
    .collect()
)
print(rdd)
#把一个文件里的内容按行读取  并按照“,”分割成单词
[u'zhangsan', u'77', u'88', u'99', u'lisi', u'56', u'78', u'89', u'wanger', u'78', u'77', u'67']

3.filter

过滤掉不符合条件的元素,返回一个新的RDD。filter 接收一个判断函数,该函数以RDD中的单个元素为参数,返回值为真的元素会被保留

import sys
from pyspark import SparkConf,SparkContext

# Configure a local Spark application named "api_test" and start its context.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("api_test")
sc = SparkContext(conf=conf)


def sp(line):
    """Parse a "name,eng,math,lang" record into (str, int, int, int).

    The line is stripped before splitting on commas. Fields beyond the
    fourth are ignored. A missing field raises IndexError and a
    non-integer score raises ValueError.
    """
    fields = line.strip().split(",")
    name = str(fields[0])
    eng, math, lang = (int(fields[i]) for i in (1, 2, 3))
    return name, eng, math, lang


def flatMap_func(x):
    """Trim x and split it on commas, returning the list of tokens."""
    tokens = x.strip().split(",")
    return tokens

def filter_func(x):
    """Return x when its fourth field exceeds 80, otherwise None.

    Used as a filter predicate: a returned record counts as true and
    None counts as false.
    """
    return x if x[3] > 80 else None

lines = sc.textFile("/test_file")
# Parse each line with sp, keep only the records filter_func accepts,
# and collect the survivors into a list.
rdd = (
    lines
    .map(sp)
    .filter(filter_func)
    .collect()
)

print(rdd)

#过滤掉了lang不大于80的记录(只保留lang > 80的记录)
[('zhangsan', 77, 88, 99), ('lisi', 56, 78, 89)]

4.reduce

汇总RDD的所有元素

import sys
from pyspark import SparkConf,SparkContext

# Configure a local Spark application named "api_test" and start its context.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("api_test")
sc = SparkContext(conf=conf)

# Numeric example: fold the elements together by integer addition.
li = [1,2,3,4,5]
vec = sc.parallelize(li)

rdd = vec.reduce(
    lambda x, y: int(x) + int(y)
)
print(rdd)
# Output: 15

# String example: the same fold with "+" concatenates the elements.
li = ["asd","asd","word"]
vec = sc.parallelize(li)

rdd = vec.reduce(
    lambda x, y: x + y
)
print(rdd)
#asdasdword

5. countByValue

统计RDD中每个元素出现的次数

import sys
from pyspark import SparkConf,SparkContext

# Configure a local Spark application named "api_test" and start its context.
conf = SparkConf().setMaster("local").setAppName("api_test")
sc = SparkContext(conf=conf)

li = ["asd","ad","asd","hello","word","word"]
# Fixed typo: the SparkContext method is `parallelize`; the original
# `parrallelize` raised AttributeError before anything was counted.
vec = sc.parallelize(li)

# Count how many times each distinct value occurs in the RDD.
re = vec.countByValue()
print (re)
#defaultdict(<type 'int'>, {'asd': 2, 'word': 2, 'hello': 1, 'ad': 1})

6.reduceByKey

按key聚合 可自定义函数

import sys
from pyspark import SparkConf,SparkContext

# Configure a local Spark application named "api_test" and start its context.
conf = SparkConf()
conf.setMaster("local")
conf.setAppName("api_test")
sc = SparkContext(conf=conf)


def sp(line):
    """Turn a "name,eng,math,lang" line into a (str, int, int, int) tuple.

    Surrounding whitespace is stripped first. Extra fields are ignored,
    a missing field raises IndexError, and a non-integer score raises
    ValueError.
    """
    parts = line.strip().split(",")
    name = str(parts[0])
    eng, math, lang = (int(parts[i]) for i in (1, 2, 3))
    return name, eng, math, lang


def flatMap_func(x):
    """Split the whitespace-trimmed string x on commas into a list of tokens."""
    stripped = x.strip()
    pieces = stripped.split(",")
    return pieces

def filter_func(x):
    """Pass x through when x[3] is greater than 80; otherwise return None."""
    # Guard clause: anything that does not exceed 80 is rejected.
    if not x[3] > 80:
        return None
    return x

lines = sc.textFile("/test_file")
# Token count: split every line into tokens, pair each token with 1,
# sum the ones per token, and collect the (token, count) pairs.
rdd = (
    lines
    .flatMap(flatMap_func)
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .collect()
)

print(rdd)

#[(u'77', 2), (u'wanger', 1), (u'56', 1), (u'99', 1), (u'lisi', 1), (u'88', 1), (u'89', 1), (u'67', 1), (u'zhangsan', 1), (u'78', 2)]

7.sortBy 排序
from pyspark import SparkConf,SparkContext
import sys

# Python 2 only: switch the interpreter's default codec to UTF-8
# (presumably so that words containing non-ASCII characters can be joined
# and printed below). `reload` and `sys.setdefaultencoding` do not exist on
# Python 3, where running them unguarded raised NameError, so skip them there.
if sys.version_info[0] == 2:
    reload(sys)
    sys.setdefaultencoding("utf8")


conf = SparkConf().setAppName("Name").setMaster("local")
sc = SparkContext(conf=conf)

infile_path = "file:///home/njliu/prc/pyspark/RDD/The_Man_of_Property.txt"
infile = sc.textFile(infile_path)
# Word count ordered by descending frequency.
# Empty tokens (from blank lines or consecutive spaces) are dropped so they
# are not counted as a word. The chain is parenthesised instead of ending in
# a dangling backslash that continued into a blank line.
re = (
    infile
    .flatMap(lambda l: [w for w in l.strip().split(" ") if w])
    .map(lambda x: (x, 1))
    .reduceByKey(lambda x, y: x + y)
    .sortBy(lambda x: x[1], ascending=False)
)

# Emit one "word<TAB>count" line per distinct word.
for line in re.collect():
    print ("\t".join([line[0],str(line[1])]))


8.groupByKey
from pyspark import SparkConf,SparkContext

def sub_process(k,v):
    """Return [k, *items] with the items of v ordered by item[1], descending.

    v may be any iterable of indexable items. Items with equal item[1]
    keep their original relative order. The key is compared as-is, so
    string values are ordered lexicographically, not numerically.
    """
    ranked = sorted(v, key=lambda pair: pair[1], reverse=True)
    return [k] + ranked

if __name__ == "__main__":
    conf = SparkConf().setAppName("A Name").setMaster("local")
    sc = SparkContext(conf=conf)

    infiles = sc.textFile("file:///home/njliu/prc/pyspark/RDD/uis.data")
    # Pipeline: split each tab-separated line, keep rows whose third field is
    # above 1.5, key them by the first field, group per key, and let
    # sub_process build one sorted list per key.
    res = (
        infiles
        .map(lambda line: line.strip().split("\t"))
        # The field is text after split(); convert it before comparing.
        # Comparing the raw string with 1.5 is always True on Python 2 and
        # a TypeError on Python 3.
        .filter(lambda x: float(x[2]) > 1.5)
        .map(lambda x: (x[0], (str(x[1]), str(x[2]))))
        .groupByKey()
        # `lambda (k, v): ...` (tuple parameter unpacking) is a SyntaxError on
        # Python 3; index the (key, values) pair instead, which works on both.
        .map(lambda kv: sub_process(kv[0], kv[1]))
    )

    for line in res.collect():
        print (line)

上一篇 下一篇

猜你喜欢

热点阅读