# Case 1: Analyzing the NBA three-point attempt trend with PySpark
# (originally published 2018-01-12 by 7125messi)
# --- Environment setup and imports ---
import os
import sys
# Point SPARK_HOME at the local Spark installation
os.environ['SPARK_HOME']="/opt/spark-2.1.1-bin-hadoop2.7/"
# Add the pyspark library to the Python module search path
# NOTE(review): this appends the bin/pyspark launcher script path — usually
# $SPARK_HOME/python (plus the py4j zip) is what belongs on sys.path; verify.
sys.path.append("/opt/spark-2.1.1-bin-hadoop2.7/bin/pyspark")
# Initialize the SparkSession and SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
# from pyspark.sql.types import *
spark = SparkSession.builder.appName("create").getOrCreate()
sc = spark.sparkContext
import numpy as np
import pandas as pd
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import array, col, count, mean, sum, udf, when
from pyspark.sql.types import DoubleType, IntegerType, StringType, Row
from pyspark.sql.functions import sum, col, udf
# --- Load the season-totals CSV and explore it ---
# 'header' makes the first row the column names; 'inferSchema' derives column types.
df = spark.read.option('header','true')\
    .option('inferSchema','true')\
    .csv('/home/ydzhao/Book/spark-nba-analytics-master/data/season_totals.csv')
df.show(6)
df.printSchema()
# describe() returns a DataFrame of summary statistics; it must be show()n to
# produce any output when run as a script (a bare describe() is a no-op here).
df.describe().show()
# Top scorers by total points, descending.
df.orderBy('pts', ascending=False).show(5)
df.orderBy('pts', ascending=False).limit(10).show()
# toPandas() materializes the small result locally; print it so the
# conversion is not silently discarded outside a REPL.
print(df.orderBy('pts', ascending=False).limit(10).toPandas())
print(df.orderBy('pts', ascending=False).limit(10).toPandas()[['yr','player','age','pts']])
# PySpark also supports pandas-style column selection with a list of names.
df.orderBy('pts', ascending=False).limit(10)[['yr','player','age','pts']].show()
# --- Per-minute attempt rates by season (DataFrame API) ---
# Aggregate league-wide totals per year, then scale to a per-36-minute rate.
totals = df.groupBy('yr').agg({'mp': 'sum', 'fg3a': 'sum', 'fga': 'sum'})
per36_fga = (36 * col('sum(fga)') / col('sum(mp)')).alias('fga_pm')
per36_fg3a = (36 * col('sum(fg3a)') / col('sum(mp)')).alias('fg3a_pm')
fga_py = totals.select(col('yr'), per36_fga, per36_fg3a).orderBy('yr', ascending=False)
fga_py.show(6)
# --- The same three-point-rate aggregation via Spark SQL ---
df.createOrReplaceTempView('df')
fga_py = spark.sql("SELECT yr,sum(fg3a)/sum(mp)*36 as fg3a_pm FROM df GROUP BY yr ORDER BY yr desc")
fga_py.show(6)
############################################################## train the model
# Assemble the single feature column 'yr' into the 'features' vector that
# Spark ML expects, and use the per-36 three-point attempt rate as the label.
t = VectorAssembler(inputCols=['yr'], outputCol='features')
training = t.transform(fga_py)\
    .withColumn('yr', fga_py.yr)\
    .withColumn('label', fga_py.fg3a_pm)
training.show(10)
# print so the local pandas conversion is actually visible in script mode.
print(training.toPandas())
lr = LinearRegression(maxIter=10)
model = lr.fit(training)
# A bare `model` expression only echoes in a REPL; report the fitted
# parameters explicitly when running as a script.
print("coefficients: %s, intercept: %s" % (model.coefficients, model.intercept))
############## 1. collect training years and extend with future seasons
training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect()
# NOTE(review): the original also collected the training 'fg3a_pm' values into
# a variable that was never used; that dead code (and its Spark job) is removed.
testing_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + testing_yrs
############## 2. build the testing DataFrame
# Wrap each year in a Row so the RDD can be converted to a DataFrame, then
# run it through the same VectorAssembler used for training.
test_rdd = sc.parallelize(all_yrs)
row = Row('yr')
testing = t.transform(test_rdd.map(row).toDF())
testing.show()
############## 3. apply the linear regression model
df_results = model.transform(testing)
df_results.show()
# spark.stop() also stops the underlying SparkContext, so a separate
# sc.stop() beforehand is redundant — one stop() call suffices.
spark.stop()