python · 机器学习 · 爬虫 · 机器学习与数据挖掘 · spark

销售预测案例源码分析

2017-11-09  本文已影响242人  AliThink

本文重在借案例学习spark相关数据结构与语法

流程

1. 特征转换

// --- Feature transformation stages ---
// Small factory: wires an input column to a one-hot encoded output column.
def oneHot(inCol: String, outCol: String): OneHotEncoder =
  new OneHotEncoder().setInputCol(inCol).setOutputCol(outCol)

// The two holiday columns are strings, so they are first mapped to
// numeric indices before one-hot encoding.
val stateHolidayIndexer = new StringIndexer()
  .setInputCol("StateHoliday")
  .setOutputCol("StateHolidayIndex")
val schoolHolidayIndexer = new StringIndexer()
  .setInputCol("SchoolHoliday")
  .setOutputCol("SchoolHolidayIndex")

// One-hot encode every categorical feature column.
val stateHolidayEncoder  = oneHot("StateHolidayIndex",  "StateHolidayVec")
val schoolHolidayEncoder = oneHot("SchoolHolidayIndex", "SchoolHolidayVec")
val dayOfMonthEncoder    = oneHot("DayOfMonth", "DayOfMonthVec")
val dayOfWeekEncoder     = oneHot("DayOfWeek",  "DayOfWeekVec")
val storeEncoder         = oneHot("Store",      "StoreVec")

// Collect all feature columns into the single "features" vector that
// Spark ML estimators expect.
val assembler = new VectorAssembler()
  .setInputCols(Array("StoreVec", "DayOfWeekVec", "Open",
    "DayOfMonthVec", "StateHolidayVec", "SchoolHolidayVec"))
  .setOutputCol("features")

2. 环境初始化(面向像我这样的小白选手)

main 中的初始化代码,大部分参考自官方文档

// Build one SparkSession from the conf and derive the SparkContext from
// it, instead of constructing a separate SparkContext first: the session
// owns its context, so this avoids two potentially diverging setups.
val conf = new SparkConf().setAppName("alithink").setMaster("local")
val sparkSession = SparkSession.builder().config(conf).getOrCreate()
val sc = sparkSession.sparkContext

3. 训练数据整理

// Called from main.
val data = loadTrainingData(sparkSession, "/Users/alithink/Space/common_data/train.csv")

/**
 * Loads the training CSV and shapes it into the DataFrame the ML
 * pipeline expects: numeric label/feature columns, null rows dropped.
 *
 * @param sqlContext the active SparkSession (legacy parameter name kept
 *                   for source compatibility with callers)
 * @param filePath   path to train.csv
 * @return cleaned DataFrame registered from the raw CSV
 */
def loadTrainingData(sqlContext:SparkSession, filePath:String):DataFrame = {
  // Spark 2.x ships CSV support natively ("csv"); the external
  // "com.databricks.spark.csv" package is obsolete and no longer needed.
  val trainRaw = sqlContext
    .read.format("csv")
    .option("header", "true")
    .load(filePath)
    .repartition(30)
  trainRaw.createOrReplaceTempView("raw_training_data")

  // Cast text columns to numeric types and extract the day-of-month
  // from the Date string (assumed yyyy-MM-dd — TODO confirm in data).
  sqlContext.sql("""SELECT
    double(Sales) label, double(Store) Store, int(Open) Open, double(DayOfWeek)   DayOfWeek,
    StateHoliday, SchoolHoliday, (double(regexp_extract(Date, '\\d+-\\d+-(\\d+)', 1))) DayOfMonth
    FROM raw_training_data
    """).na.drop()
}

4. 线性回归(随机森林类似,换了方法以及ParamMaps)

/**
 * Builds a linear-regression pipeline wrapped in a TrainValidationSplit.
 * (The random-forest variant is analogous: swap the estimator and grid.)
 *
 * @return an unfitted TrainValidationSplit ready for `fit`
 */
def preppedLRPipeline():TrainValidationSplit = {
    val regressor = new LinearRegression()

    // Hyper-parameter grid: regularisation strength, intercept on/off
    // (no explicit values means both true and false), and the elastic-net
    // L1/L2 mixing ratio.
    val grid = new ParamGridBuilder()
      .addGrid(regressor.regParam, Array(0.1, 0.01))
      .addGrid(regressor.fitIntercept)
      .addGrid(regressor.elasticNetParam, Array(0.0, 0.25, 0.5, 0.75, 1.0))
      .build()

    // Stage order matters: indexers before their encoders, and the
    // assembler before the model.
    val pipeline = new Pipeline().setStages(Array(
      stateHolidayIndexer, schoolHolidayIndexer,
      stateHolidayEncoder, schoolHolidayEncoder, storeEncoder,
      dayOfWeekEncoder, dayOfMonthEncoder,
      assembler, regressor))

    // 75% of the data fed to `fit` trains each candidate; 25% validates.
    new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(grid)
      .setTrainRatio(0.75)
}

5. 模型训练与验证

/**
 * Fits the TrainValidationSplit on an 80/20 split of `data` and logs
 * hold-out regression metrics.
 *
 * Fix: the original body contained a duplicated, syntactically invalid
 * paste of the RegressionMetrics construction (a second `rm = new
 * RegressionMetrics(...)` fused onto a log line); the metrics object is
 * now built exactly once.
 *
 * @param tvs  the prepared TrainValidationSplit (see preppedLRPipeline)
 * @param data cleaned training DataFrame with "label" column
 * @return the fitted model
 */
def fitModel(tvs:TrainValidationSplit, data:DataFrame) = {
    // Fixed seed keeps the train/test split reproducible across runs.
    val Array(training, test) = data.randomSplit(Array(0.8, 0.2), seed = 12345)
    logger.info("Fitting data")
    val model = tvs.fit(training)
    logger.info("Now performing test on hold out set")
    val holdout = model.transform(test).select("prediction","label")

    // RegressionMetrics wants an RDD[(Double, Double)], so convert rows.
    val rm = new RegressionMetrics(holdout.rdd.map(x =>
      (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double])))

    logger.info("Test Metrics")
    logger.info("Test Explained Variance:")
    logger.info(rm.explainedVariance)
    logger.info("Test R^2 Coef:")
    logger.info(rm.r2)
    logger.info("Test MSE:")
    logger.info(rm.meanSquaredError)
    logger.info("Test RMSE:")
    logger.info(rm.rootMeanSquaredError)

    model
}

参考

上一篇 下一篇

猜你喜欢

热点阅读