
PySpark notes

2018-08-26

PySpark getting-started resources

Reply "pyspark" to the WeChat public account to get a PySpark resource bundle (Learning PySpark.pdf and PySpark_SQL_Cheat_Sheet_Python.pdf).
If you already know Python and just need Spark to speed up computation in your projects, PySpark is the best tool for the job. After stepping on a series of pitfalls, I found that most of them come from the fact that PySpark's DataFrame and the familiar pandas DataFrame differ in many APIs. Because the storage models differ (distributed vs. single machine), many operations also require updating your mental model; even something as basic as viewing the first 10 rows of a DataFrame is expressed differently. With a Python background, though, the learning curve is quite gentle.
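As a small illustration of that difference, here is a minimal sketch of "show the first 10 rows" in both worlds, assuming an existing SparkSession named spark and toy data (the column name is only illustrative):

import pandas as pd

# pandas: a single-machine DataFrame
pdf = pd.DataFrame({'a': range(100)})
pdf.head(10)                 # first 10 rows, returned as a pandas DataFrame

# PySpark: a distributed DataFrame
sdf = spark.createDataFrame(pdf)
sdf.show(10)                 # prints the first 10 rows to the console
sdf.limit(10).toPandas()     # or pull the first 10 rows back as pandas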

In practice I have found it more efficient to read the official API docs first than to look things up only after hitting a problem. Here are some PySpark learning resources I found recently.
Looking up PySpark function APIs

pyspark Search

PySpark SQL function APIs

pyspark.sql module api

Some recommended blog posts

Spark SQL: a complete guide to DataFrame operations
Converting between pyspark.sql.DataFrame and pandas.DataFrame
A detailed comparison of DataFrames in Spark and pandas -- an important reference for DataFrame operations
A Chinese-language walkthrough of the official Spark Python API documentation
Data processing with PySpark -- lots of code worth studying

Configuring Spark

This part covers what to look at when you hit performance problems and need to tune the underlying Spark parameters.

spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")
// get all settings (Scala API)
val configMap: Map[String, String] = spark.conf.getAll
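The snippet above uses the Scala API. A rough PySpark equivalent is sketched below; the values are only illustrative, and note that executor memory is a static setting that generally has to be given when the session is created rather than changed afterwards:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("conf example")
         .config("spark.executor.memory", "2g")   # static settings go on the builder
         .getOrCreate())

# runtime-configurable options can be changed on an existing session
spark.conf.set("spark.sql.shuffle.partitions", 6)

# get all settings as a list of (key, value) pairs
for key, value in spark.sparkContext.getConf().getAll():
    print(key, value)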

cheatsheet

from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType

spark = SparkSession.builder.appName("python spark sql basic example").enableHiveSupport().getOrCreate()

# get a DataFrame from a Hive table
df = spark.sql("select * from ft_dev.ybr_b limit 10")
df.show()
df.select(df['jdpin'], df['user_aging'] / 12).show()
df = df.withColumn('user_aging', df['user_aging'].cast(FloatType()))
df = df.withColumnRenamed('user_aging', 'keling')
# There are very rich APIs for data processing, such as filter, sort and so on.
# For more information, see https://spark.apache.org/docs/2.1.1/api/python/pyspark.sql.html
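As a small taste of those APIs, here is a sketch of filter / sort / groupBy on a DataFrame freshly read from the same table; the column names follow the cheat sheet above and are assumptions about the data:

from pyspark.sql import functions as F

df = spark.sql("select * from ft_dev.ybr_b limit 10")
df.filter(df['user_aging'] > 12).show()              # keep rows with user_aging above 12
df.orderBy(df['user_aging'].desc()).show()           # sort in descending order
df.groupBy('jdpin').agg(F.avg('user_aging').alias('avg_aging')).show()  # aggregate per key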

A complete example

Loading the data
# 'schema' is assumed to be a pyspark.sql.types.StructType defined earlier in the original example
births = spark.read.csv('births_transformed.csv.gz',
                        header=True,
                        schema=schema)

Creating transformers
import pyspark.sql.types as typ
import pyspark.ml.feature as ft

births = births.withColumn('BIRTH_PLACE_INT',
                           births['BIRTH_PLACE'].cast(typ.IntegerType()))

encoder = ft.OneHotEncoder(inputCol='BIRTH_PLACE_INT',
                           outputCol='BIRTH_PLACE_VEC')

# 'labels' is assumed to be the (column name, type) list used to build the schema earlier
featuresCreator = ft.VectorAssembler(
    inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features')

Creating an estimator
import pyspark.ml.classification as cl

logistic = cl.LogisticRegression(maxIter=100,
                                 regParam=0.01,
                                 labelCol='INFANT_ALIVE_AT_REPORT')

Creating a pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])


Fitting the model
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

model = pipeline.fit(births_train) 
test_model = model.transform(births_test) 

Evaluating the model
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',
                                             labelCol='INFANT_ALIVE_AT_REPORT')

print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
                         {evaluator.metricName: 'areaUnderPR'}))

Saving and loading the model
pipelinePath = './infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)

loadedPipeline = Pipeline.load(pipelinePath)
result = loadedPipeline.fit(births_train).transform(births_test)
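The snippet above persists the unfitted Pipeline and refits it after loading. The fitted model itself can be persisted in the same way; a minimal sketch, reusing the model variable from the fitting step (the path name is only illustrative):

from pyspark.ml import PipelineModel

modelPath = './infant_oneHotEncoder_Logistic_PipelineModel'   # hypothetical path
model.write().overwrite().save(modelPath)

loadedModel = PipelineModel.load(modelPath)
loadedModel.transform(births_test).show()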

How to run
Shell

Easy to explore, debug and test; see the example below.
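For instance, the interactive shell can be launched with the same resource options as spark-submit (the values here are only illustrative):

pyspark --master yarn --executor-memory 4g --num-executors 20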

Spark-submit

spark-submit --master yarn --executor-memory 4g --num-executors 20 --executor-cores 4 etl.py

Note that the options must come before the application script; on YARN the executor count and cores are set with --num-executors and --executor-cores (--total-executor-cores only applies to standalone and Mesos modes).

Troubleshooting PySpark

Checking Spark resources

yarn application -list | grep tangyaping