Spark应用学习笔记

2018-08-25  本文已影响0人  卡卡xx

Sqoop

sqoop 是 apache 旗下一款“Hadoop 和关系数据库服务器之间传送数据”的工具。
核心的功能有两个:

导入数据:MySQL,Oracle 导入数据到 Hadoop 的 HDFS、HIVE、HBASE 等数据存储系统
导出数据:从 Hadoop 的文件系统中导出数据到关系数据库 mysql 等 Sqoop 的本质还是一个命令行工具,和 HDFS,Hive 相比,并没有什么高深的理论。
本质就是迁移数据, 迁移的方式:就是把sqoop的迁移命令转换成MR程序

安装Sqoop

使用CDH安装组件直接添加服务即可,相关配置文件会自动修改。sqoop就是一个工具, 只需要在一个节点上进行安装即可,安装版本为Sqoop2。



安装完成并重新部署集群后,去sqoop目录下检查安装是否成功和相应的端口是否开启 检测是否可以连接数据库 如果报错就把连接数据库的jar放到它的lib下面。它的作用就是把关系型数据库导入hive、hdfs之类的hadoop系统里,在下面会用到。

Spark sql

前面我们学习了Hive,它是运行在Hadoop上的SQL-on-Hadoop工具,但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的SQL-on-Hadoop工具开始产生,其中表现较为突出的是:Shark(spark sql的前身),Hive和它的运行架构如下图:


SparkSQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了SparkSQL代码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便
使用Spark SQL库可以对存储在RDD、批处理文件、JSON数据集或Hive表中的数据执行SQL查询。而且性能提高了10-100倍
sparkSQL层级
当我们想用sparkSQL来解决我们的需求时,其实说简单也简单,就经历了三步:读入数据 -> 对数据进行处理 -> 写入最后结果,那么这三个步骤用的主要类其实就三个:读入数据和写入最后结果用到两个类HiveContext和SQLContext,对数据进行处理用到的是DataFrame类,此类是你把数据从外部读入到内存后,数据在内存中进行存储的基本数据结构,在对数据进行处理时还会用到一些中间类,用到时在进行讲解。如下图所示:
HiveContext和SQLContext

HiveContext继承自SQLContext,因为HQL和SQL有一定的差别,所以有两个引擎。使用不同的读数据的类,底层会进行标记,自动识别是使用哪个类进行数据操作,然后采用不同的执行计划执行操作。
当从hive库中读数据的时候,必须使用HiveContext来进行读取数据,不然在进行查询的时候会出一些奇怪的错。其他的数据源两者都可以选择,但是最好使用SQLContext来完成。因为其支持的sql语法更多。

读数据

Mlib

MLlib是Spark里的机器学习库。它的目标是使实用的机器学习算法可扩展并容易使用。它提供如下工具:
1.机器学习算法:常规机器学习算法包括分类、回归、聚类和协同过滤。
2.特征工程:特征提取、特征转换、特征选择以及降维。
3.管道:构造、评估和调整的管道的工具。
4.存储:保存和加载算法、模型及管道
5.实用工具:线性代数,统计,数据处理等。

具体的代码直接参照官网的教程即可,非常详细,只需要跟着写就可以了。

由于日志太多,只能看到部分结果。


官网教程
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.sql.SparkSession

object classfier {

  def main(args: Array[String]) {
    // Load the data stored in LIBSVM format as a DataFrame.
    val spark=SparkSession.builder
      .master("local[4]")
      .appName("classifier")
      .getOrCreate()
    val data = spark.read.format("libsvm").load("data/sample_multiclass_classification_data.txt")

    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(data)
    // Automatically identify categorical features, and index them.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
      .fit(data)

    // Split the data into training and test sets (30% held out for testing).
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))

    // Train a DecisionTree model.
    val dt = new DecisionTreeClassifier()
      .setLabelCol("indexedLabel")
      .setFeaturesCol("indexedFeatures")

    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    // Chain indexers and tree in a Pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))

    // Train model. This also runs the indexers.
    val model = pipeline.fit(trainingData)

    // Make predictions.
    val predictions = model.transform(testData)

    // Select example rows to display.
    predictions.select("predictedLabel", "label", "features").show(5)

    // Select (prediction, true label) and compute test error.
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
    val accuracy = evaluator.evaluate(predictions)
    println(s"Test Error = ${(1.0 - accuracy)}")

    val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
    println(s"Learned classification tree model:\n ${treeModel.toDebugString}")
  }

}

import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

object clustering {

  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]","WordCount")
    // Load and parse the data
    val data = sc.textFile("data/Kmeans_data.txt")
    val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

    // Cluster the data into two classes using KMeans
    val numClusters = 6
    val numIterations = 20
    val clusters = KMeans.train(parsedData, numClusters, numIterations)

    // Evaluate clustering by computing Within Set Sum of Squared Errors
    val WSSSE = clusters.computeCost(parsedData)
    println(s"Within Set Sum of Squared Errors = $WSSSE")

    // Save and load model
    //clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
    //val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")

  }
}
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.regression.LinearRegression

object linear {

  def main(args: Array[String]) {
    // Load training data
    val spark=SparkSession.builder
      .master("local[4]")
      .appName("linear")
      .getOrCreate()

    val training = spark.read.format("libsvm")
      .load("data/sample_linear_regression_data.txt")

    val lr = new LinearRegression()
      .setMaxIter(10)
      .setRegParam(0.3)
      .setElasticNetParam(0.8)

    // Fit the model
    val lrModel = lr.fit(training)

    // Print the coefficients and intercept for linear regression
    println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")

    // Summarize the model over the training set and print out some metrics
    val trainingSummary = lrModel.summary
    println(s"numIterations: ${trainingSummary.totalIterations}")
    println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
    trainingSummary.residuals.show()
    println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
    println(s"r2: ${trainingSummary.r2}")
  }

}

上一篇 下一篇

猜你喜欢

热点阅读