pyspark学习

案例1

2018-01-12  作者：7125messi
#引入各种库
# --- Spark environment setup ----------------------------------------------
import glob
import os
import sys

# Location of the local Spark installation; Spark tooling reads SPARK_HOME.
SPARK_HOME = "/opt/spark-2.1.1-bin-hadoop2.7/"
os.environ['SPARK_HOME'] = SPARK_HOME

# BUG FIX: the original appended <SPARK_HOME>/bin/pyspark (a launcher shell
# script) to sys.path, which does NOT make the `pyspark` package importable.
# The Python package lives under <SPARK_HOME>/python, and py4j ships as a
# zip archive under python/lib — both must be on sys.path.
sys.path.append(os.path.join(SPARK_HOME, "python"))
# Version-agnostic: pick up whatever py4j zip this Spark release bundles.
for _py4j_zip in glob.glob(os.path.join(SPARK_HOME, "python", "lib", "py4j-*-src.zip")):
    sys.path.append(_py4j_zip)



# --- Initialize the SparkSession / SparkContext ---------------------------
from pyspark.sql import SparkSession, Row

# getOrCreate() reuses an already-running session if one exists, otherwise
# it starts a new one named "create".
builder = SparkSession.builder.appName("create")
spark = builder.getOrCreate()
sc = spark.sparkContext


# --- Analysis and Spark ML imports ----------------------------------------
import numpy as np
import pandas as pd

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# FIX: the original imported `sum`, `col`, `udf` twice (two separate
# pyspark.sql.functions lines) and `Row` from both pyspark.sql and
# pyspark.sql.types; the duplicates are removed.  `Row` is already in
# scope from the pyspark.sql import above.
from pyspark.sql.functions import array, col, count, mean, sum, udf, when
from pyspark.sql.types import DoubleType, IntegerType, StringType


# --- Load the season-totals CSV and take a first look ---------------------
# header/inferSchema: first row is column names; let Spark guess dtypes.
df = spark.read.option('header','true')\
          .option('inferSchema','true')\
          .csv('/home/ydzhao/Book/spark-nba-analytics-master/data/season_totals.csv')

df.show(6)          # peek at the first 6 rows
df.printSchema()    # verify the inferred column types

# BUG FIX: `df.describe()` and `.toPandas()` return values; as bare
# expression statements in a script they printed nothing (that idiom only
# displays output in a notebook).  Show / print the results explicitly.
df.describe().show()

# Top scorers by total points.
df.orderBy('pts', ascending=False).show(5)
df.orderBy('pts', ascending=False).limit(10).show()

# Convert the top-10 once (the original converted it twice) and print both
# the full frame and the selected columns.
top10 = df.orderBy('pts', ascending=False).limit(10).toPandas()
print(top10)
print(top10[['yr', 'player', 'age', 'pts']])

df.orderBy('pts', ascending=False).limit(10)[['yr', 'player', 'age', 'pts']].show()


# Per-36-minute shot-attempt rates by season, via the DataFrame API:
# total FGA and 3PA per year, each normalized by total minutes played.
per_year_totals = df.groupBy('yr')\
                    .agg({'mp': 'sum', 'fg3a': 'sum', 'fga': 'sum'})

# 36 * attempts / minutes = attempts per 36 minutes (one "full game").
fga_rate = (36 * col('sum(fga)') / col('sum(mp)')).alias('fga_pm')
fg3a_rate = (36 * col('sum(fg3a)') / col('sum(mp)')).alias('fg3a_pm')

fga_py = per_year_totals.select(col('yr'), fga_rate, fg3a_rate)\
                        .orderBy('yr', ascending=False)
fga_py.show(6)


# Same per-36 three-point-attempt rate, expressed in Spark SQL instead of
# the DataFrame API (register the frame as a temp view first).
df.createOrReplaceTempView('df')

query = "SELECT yr,sum(fg3a)/sum(mp)*36 as fg3a_pm FROM df GROUP BY yr ORDER BY yr desc"
fga_py = spark.sql(query)
fga_py.show(6)


############################################################## train the model
# Spark ML estimators expect a vector 'features' column; assemble it from
# the single predictor 'yr', and expose fg3a_pm as the regression 'label'.
t = VectorAssembler(inputCols=['yr'], outputCol='features')

# FIX: the original also did .withColumn('yr', fga_py.yr), a no-op —
# 'yr' is already carried through by the VectorAssembler transform.
training = t.transform(fga_py)\
            .withColumn('label', fga_py.fg3a_pm)
training.show(10)
# FIX: a bare `training.toPandas()` expression printed nothing in a script.
print(training.toPandas())

# Fit a simple one-feature linear regression (3PA rate as a function of year).
lr = LinearRegression(maxIter=10)
model = lr.fit(training)
# FIX: a bare `model` expression is a no-op outside a notebook; print it.
print(model)

############## 1. score the model over every training season plus 2017-2021
# (The original comment claimed "1979-80 and 2020-21 seasons"; the code
# actually appends the five seasons 2017-2021 to all training years.)
training_yrs = training.select('yr').rdd.map(lambda r: r[0]).collect()
# NOTE: the original also collected 'fg3a_pm' into an unused `training_y`
# variable; that dead code has been removed.

testing_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + testing_yrs

############## 2. build the testing DataFrame
# One 'yr' column per Row, then run it through the same VectorAssembler so
# the 'features' column matches what the model was trained on.
row = Row('yr')
test_rdd = sc.parallelize(all_yrs)
testing = t.transform(test_rdd.map(row).toDF())
testing.show()

############## 3. apply the fitted linear-regression model
# transform() adds a 'prediction' column with the per-36 3PA estimate.
df_results = model.transform(testing)
df_results.show()



# Release cluster resources: stop the SparkContext, then the SparkSession.
sc.stop()
spark.stop()
上一篇下一篇

猜你喜欢

热点阅读