Spark SQL Log Analysis Project in Practice

2019-03-15  kangapp
Project overview
Environment setup

Download location for the CDH software

Setting up the Spark environment

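1. Download the source package for the required version from the official website
Refer to the build process
2. Pitfalls when building Spark from source
Add the Cloudera repository to pom.xml so that the CDH Hadoop artifacts can be resolved: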

<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

Give Maven more memory:

export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"

Select the Scala version:

./dev/change-scala-version.sh 2.10

Start in local mode: spark-shell --master local[2]

Standalone mode:

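  • Edit the spark-env.sh configuration file
    SPARK_MASTER_HOST=master
    SPARK_WORKER_CORES=2
    SPARK_WORKER_MEMORY=2g
    SPARK_WORKER_INSTANCES=1
  • Start
    sbin/start-all.sh: starts the Master on this machine and a Worker on every node listed in conf/slaves
    spark-shell --master spark://master:7077: launches spark-shell against the standalone Master
    spark-shell --help: lists the available launch options
    --total-executor-cores 1: sets the total number of cores the application may use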
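The standalone settings above can also be expressed in code. A minimal sketch, assuming the master host name `master` configured through SPARK_MASTER_HOST; the object name and application name are hypothetical. `spark.cores.max` is the configuration property behind `--total-executor-cores`, and building a `SparkConf` does not contact the cluster, so the values can be checked offline.

```scala
import org.apache.spark.SparkConf

object StandaloneConf {
  // Equivalent of: spark-shell --master spark://master:7077 --total-executor-cores 1
  def build(): SparkConf =
    new SparkConf()
      .setAppName("StandaloneDemo")     // hypothetical application name
      .setMaster("spark://master:7077") // standalone Master; 7077 is the default port
      .set("spark.cores.max", "1")      // property behind --total-executor-cores
}
```

Properties set directly on a SparkConf take precedence over spark-submit flags, so hard-coding setMaster in application code overrides whatever --master is passed at submit time.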

Spark on YARN
In spark-env.sh, point HADOOP_CONF_DIR (or YARN_CONF_DIR) at the Hadoop configuration directory

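  • [Client]
    The Driver runs on the client
    The client talks to the containers it was granted in order to schedule and run the job, so the client must not exit
    Log output is printed to the console, which makes debugging easy
  • [Cluster]
    The Driver runs inside the ApplicationMaster
    The client can be closed as soon as the job has been submitted
    Logs are not shown in the terminal; view them with yarn logs -applicationId <app ID>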
    Example (omit --deploy-mode cluster to run in the default client mode):
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master yarn \
      --deploy-mode cluster \
      --executor-memory 1G \
      --num-executors 1 \
      /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \
      4
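Each flag in the spark-submit command above is shorthand for a Spark configuration property. The sketch below records that correspondence for the flags used in this article and rewrites them into `--conf key=value` arguments; `SubmitFlags` and `toConfArgs` are hypothetical names for illustration only.

```scala
object SubmitFlags {
  // spark-submit flags used in this article and the configuration property each one sets
  val flagToProperty: Map[String, String] = Map(
    "--master"               -> "spark.master",
    "--deploy-mode"          -> "spark.submit.deployMode",
    "--name"                 -> "spark.app.name",
    "--executor-memory"      -> "spark.executor.memory",
    "--num-executors"        -> "spark.executor.instances", // YARN
    "--total-executor-cores" -> "spark.cores.max"           // standalone
  )

  // Rewrite (flag, value) pairs into equivalent "--conf key=value" arguments.
  // Unknown flags are rejected rather than silently dropped.
  def toConfArgs(flags: Seq[(String, String)]): Seq[String] =
    flags.flatMap { case (flag, value) =>
      val prop = flagToProperty.getOrElse(
        flag, throw new IllegalArgumentException(s"no mapping for $flag"))
      Seq("--conf", s"$prop=$value")
    }
}
```

Keeping these settings in the submit command rather than in code is what lets the same jar run in client or cluster mode unchanged.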
Introduction to the Spark SQL framework

Spark SQL is Apache Spark's module for working with structured data.

Smooth migration from Hive to Spark SQL

SQLContext (superseded by SparkSession in Spark 2.x):
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

The following program can be run directly from the local machine:

package com.test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Using SQLContext
  */
object SQLContextApp {
  def main(args: Array[String]): Unit = {

    // Create the corresponding contexts
    val sparkConf = new SparkConf()

    // In testing or production, these parameters are usually specified by the submit script
    sparkConf.setAppName("SQLContext").setMaster("spark://192.168.247.100:7077") // local mode "local[2]" is usually used for testing
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Processing: read a JSON file
    val path = args(0)
    val people = sqlContext.read.format("json").load(path)
    people.printSchema()
    people.show()

    // Release resources
    sc.stop()
  }
}

Package the program and run it on the server:

spark-submit \
  --name SQLContext \
  --class com.test.SQLContextApp \
  --master spark://192.168.247.100:7077 \
  /home/kang/lib/SparkTest-1.0.jar \
  /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json

HiveContext (reads Hive tables; superseded by SparkSession in Spark 2.x):
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

package com.test

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Using HiveContext
  */
object HiveContextApp {
  def main(args: Array[String]): Unit = {

    // Create the corresponding contexts
    val sparkConf = new SparkConf()

    // In testing or production, these parameters are usually specified by the submit script
    sparkConf.setAppName("HiveContext").setMaster("spark://192.168.247.100:7077") // local mode "local[2]" is usually used for testing
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    // Processing: read the Hive table "test"
    hiveContext.table("test").show

    // Release resources
    sc.stop()
  }
}

When submitting the packaged jar, add the MySQL connector jar (the Hive metastore is accessed through MySQL):

spark-submit \
  --name HiveContext \
  --class com.test.HiveContextApp \
  --master spark://master:7077 \
  --jars /home/kang/lib/mysql-connector-java-5.1.34.jar \
  /home/kang/lib/HiveContext.jar

SparkSession (the entry point since Spark 2.0, combining SQLContext and HiveContext):
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()
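For comparison with SQLContextApp, here is a sketch of the same JSON job written against SparkSession; the object name is hypothetical and `local[2]` is only for local testing. For the HiveContext case, calling `.enableHiveSupport()` on the builder plays the role of HiveContext, provided the Hive dependencies and hive-site.xml are available.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSessionApp {
  // Read a JSON file (one JSON object per line) into a DataFrame
  def loadJson(spark: SparkSession, path: String): DataFrame =
    spark.read.json(path)

  def main(args: Array[String]): Unit = {
    // SparkSession wraps a SparkContext and supersedes SQLContext/HiveContext
    val spark = SparkSession.builder()
      .appName("SparkSessionApp")
      .master("local[2]") // local testing only; leave the master to spark-submit in production
      .getOrCreate()
    try {
      val people = loadJson(spark, args(0)) // e.g. the people.json path used above
      people.printSchema()
      people.show()
    } finally {
      spark.stop() // release resources
    }
  }
}
```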
Using spark-shell/spark-sql
Using thriftserver/beeline: start the Thrift server on port 10040, then connect to it with beeline
./sbin/start-thriftserver.sh --master local[2] \
--jars ~/lib/mysql-connector-java-5.1.34.jar \
--hiveconf hive.server2.thrift.port=10040
./bin/beeline -u jdbc:hive2://localhost:10040 -n kang
To query the Thrift server from code over JDBC, add the Hive JDBC dependency:
<dependency>
      <groupId>org.spark-project.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.1.spark2</version>
</dependency>
package com.test

import java.sql.DriverManager

object SparkSQLThriftServerApp {

  def main(args: Array[String]): Unit = {
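    // Load the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Connect to the Thrift server started with start-thriftserver.sh (port 10040)
    val conn = DriverManager.getConnection("jdbc:hive2://192.168.247.100:10040","kang","")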
    val pstmt = conn.prepareStatement("select * from test")
    val rs = pstmt.executeQuery()
    while (rs.next()){
      println("context:" + rs.getString("context"))
    }
    rs.close()
    pstmt.close()
    conn.close()
  }
}
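Overview of user behavior logs
Offline data processing architecture (workflow)
  • Parse the access logs with Spark SQL
  • Extract the course ID and type
  • Resolve city information from the IP
  • Use Spark SQL to write the output partitioned by access day

Input: access time, accessed URL, traffic consumed, visitor IP
Output: URL, cmsType (video/article), cmsId (ID), traffic, IP, city, access time, day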
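The input-to-output mapping above can be sketched in plain Scala. The raw log layout is not given here, so this sketch assumes one line per access with four tab-separated fields (time, url, traffic, ip), times formatted as `yyyy-MM-dd HH:mm:ss`, and URLs of the form `http://host/<cmsType>/<cmsId>`; the city field is left to the IP lookup step. All names are hypothetical.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

object AccessLogParser {
  // One output record; city is filled in later by the IP lookup step
  final case class Access(url: String, cmsType: String, cmsId: Long,
                          traffic: Long, ip: String, time: String, day: String)

  private val timeFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  private val dayFmt  = DateTimeFormatter.ofPattern("yyyyMMdd") // matches day varchar(8)
  // Assumed URL shape: scheme://host/<cmsType>/<cmsId>, anything may follow the ID
  private val UrlPattern = """https?://[^/]+/(video|article)/(\d+).*""".r

  // Assumed input: "time<TAB>url<TAB>traffic<TAB>ip"; None for lines that do not fit
  def parse(line: String): Option[Access] = line.split("\t") match {
    case Array(time, url, traffic, ip) =>
      for {
        ts           <- Try(LocalDateTime.parse(time, timeFmt)).toOption
        trafficValue <- Try(traffic.trim.toLong).toOption
        (cmsType, cmsId) <- url match {
          case UrlPattern(t, id) => Some((t, id.toLong))
          case _                 => None
        }
      } yield Access(url, cmsType, cmsId, trafficValue, ip, time, ts.format(dayFmt))
    case _ => None
  }
}
```

Returning an Option lets a Spark job apply the function with flatMap over the text input, so malformed lines are dropped instead of failing the job.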

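IP address resolution
Download: https://github.com/kangapp/ipdatabase
Build: mvn clean package -DskipTests
Install the jar into the local Maven repository: mvn install:install-file -Dfile=F:\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
Add the dependency to the pom file, and copy the two spreadsheet files (ipDatabase.csv and ipRegion.xlsx) into resources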
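The ipdatabase project above performs the lookup itself, and its API is not shown here. To illustrate a common technique for this kind of lookup, here is a swapped-in sketch (not ipdatabase's code): convert an IPv4 address to a number, then binary-search a table of non-overlapping IP ranges sorted by start address. The range table and city names in the usage are made up.

```scala
object IpRangeLookup {
  // One row of a hypothetical range table: [start, end] inclusive, as unsigned 32-bit values
  final case class IpRange(start: Long, end: Long, city: String)

  // Convert dotted IPv4 text to its numeric value, e.g. "1.0.0.0" -> 16777216
  // (assumes well-formed input: four parts, each 0..255)
  def ipToLong(ip: String): Long =
    ip.split("\\.").map(_.toLong).foldLeft(0L)((acc, part) => (acc << 8) | part)

  // Binary search over ranges sorted by start; ranges must not overlap
  def findCity(sorted: IndexedSeq[IpRange], ip: String): Option[String] = {
    val x = ipToLong(ip)
    var lo = 0
    var hi = sorted.length - 1
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      val r = sorted(mid)
      if (x < r.start) hi = mid - 1
      else if (x > r.end) lo = mid + 1
      else return Some(r.city)
    }
    None
  }
}
```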

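Tuning points:
1) Control the size of the output files: coalesce
2) Adjust the data type of the partition column (disable partition column type inference):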
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
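The two tuning points can be combined when writing the cleaned data. A sketch, assuming a DataFrame with a string `day` column in yyyyMMdd form and Parquet output (the format is an assumption); object and method names are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object PartitionedWrite {
  // With inference disabled, "day" (e.g. "20161110") stays a string when the
  // output is read back, instead of being inferred as an integer
  def session(): SparkSession =
    SparkSession.builder()
      .appName("PartitionedWrite")
      .master("local[2]") // local testing only
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .getOrCreate()

  // coalesce(1) merges partitions before writing, so each day directory gets one file
  // instead of many small ones; partitionBy("day") creates day=YYYYMMDD sub-directories
  def write(df: DataFrame, output: String): Unit =
    df.coalesce(1)
      .write
      .format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("day")
      .save(output)
}
```

coalesce(1) funnels the write through a single task, which suits small daily outputs; for larger data a higher coalesce count trades file count against write parallelism.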

Requirement 1: daily Top N videos by access count
create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day,cms_id)
)
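Requirement 1 can be computed with the DataFrame API. A sketch, assuming the cleaned data has columns `day`, `cmsType` and `cmsId` (names taken from the output fields listed earlier; the exact schema is an assumption). Writing the result into MySQL is not shown.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, desc}

object VideoAccessTopN {
  // SQL equivalent: select day, cmsId, count(1) as times from access
  //   where day = ? and cmsType = 'video' group by day, cmsId order by times desc
  // Output columns line up with day_video_access_topn_stat (day, cms_id, times)
  def stat(access: DataFrame, day: String): DataFrame =
    access
      .filter(col("day") === day && col("cmsType") === "video")
      .groupBy("day", "cmsId")
      .agg(count("cmsId").as("times"))
      .select(col("day"), col("cmsId").as("cms_id"), col("times"))
      .orderBy(desc("times"))
}
```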

Requirement 2: daily Top N videos by access count within each city
create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day,city,cms_id)
)
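Requirement 2 needs a rank within each city, which is what the `times_rank` column stores. A sketch using a window function (`row_number` over day and city, ordered by access count descending), assuming the same columns as in requirement 1 plus a `city` column from the IP lookup; `n` is a hypothetical Top N cutoff.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

object CityTopN {
  // Count accesses per (day, city, video), rank videos inside each (day, city),
  // keep the first n; columns match day_video_city_access_topn_stat
  def stat(access: DataFrame, day: String, n: Int): DataFrame = {
    val byCity = Window.partitionBy("day", "city").orderBy(col("times").desc)
    access
      .filter(col("day") === day && col("cmsType") === "video")
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))
      .withColumn("times_rank", row_number().over(byCity))
      .filter(col("times_rank") <= n)
      .select(col("day"), col("cmsId").as("cms_id"), col("city"), col("times"), col("times_rank"))
  }
}
```

row_number gives distinct ranks even when counts tie; rank() would instead give tied videos the same rank.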

Requirement 3: daily Top N videos by traffic
create table day_video_traffics_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day,cms_id)
)
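Requirement 3 sums traffic instead of counting accesses. A sketch under the same column assumptions as requirement 1, plus a `traffic` column holding the traffic of each access; the object name is hypothetical.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, desc, sum}

object TrafficTopN {
  // Total traffic per video for one day, largest first; columns match
  // day_video_traffics_access_topn_stat (day, cms_id, traffics)
  def stat(access: DataFrame, day: String): DataFrame =
    access
      .filter(col("day") === day && col("cmsType") === "video")
      .groupBy("day", "cmsId")
      .agg(sum("traffic").as("traffics"))
      .select(col("day"), col("cmsId").as("cms_id"), col("traffics"))
      .orderBy(desc("traffics"))
}
```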

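Project requirements
Packaging the project

Add the assembly plugin to pom.xml; the jar-with-dependencies descriptor bundles the project and its dependencies into a single jar: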
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>

Build the jar, then submit it to YARN:
mvn assembly:assembly

spark-submit \
  --class com.test.SparkStatCleanJobYARN \
  --name SparkStatCleanJobYARN \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  --files /home/kang/project/SprakSQL/resource/ipDatabase.csv,/home/kang/project/SprakSQL/resource/ipRegion.xlsx \
  /home/kang/project/SprakSQL/lib/sparksql.jar \
  hdfs://192.168.247.100:9000/data/spark/output/* hdfs://192.168.247.100:9000/data/spark/partitionByDay
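The command passes two positional arguments after the jar: the input path (with a glob) and the output directory. A hypothetical sketch of how the job's main method might validate them; master, memory and executor count are left to spark-submit, so they are deliberately not parsed. Files shipped with `--files` are placed in the working directory of each executor, and recent Spark versions document `SparkFiles.get(fileName)` for locating them.

```scala
object CleanJobArgs {
  final case class Paths(input: String, output: String)

  // Validate the two positional arguments: <inputPath> <outputPath>.
  // Cluster settings come from spark-submit flags, so the job only reads paths.
  def parse(args: Array[String]): Either[String, Paths] = args match {
    case Array(in, out) if in.nonEmpty && out.nonEmpty => Right(Paths(in, out))
    case _ => Left("Usage: SparkStatCleanJobYARN <inputPath> <outputPath>")
  }
}
```

In main, a Left would be printed before exiting with a non-zero status, and a Right would feed the SparkSession built without setMaster.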

Project performance tuning

https://segmentfault.com/a/1190000014876069

Code optimization
Parameter tuning

Parallelism: spark.sql.shuffle.partitions
Partition column type inference: spark.sql.sources.partitionColumnTypeInference.enabled
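Both parameters can be set when the SparkSession is built. A sketch; the object name is hypothetical, the value 100 in the usage is only an example, and the right number depends on data volume and cluster cores. spark.sql.shuffle.partitions defaults to 200 and can also be changed at runtime with spark.conf.set.

```scala
import org.apache.spark.sql.SparkSession

object TunedSession {
  // spark.sql.shuffle.partitions sets how many partitions a shuffle
  // (groupBy, join, window) produces; the default of 200 is often too many for small data
  def build(shufflePartitions: Int): SparkSession =
    SparkSession.builder()
      .appName("TunedSession")
      .master("local[2]") // local testing only
      .config("spark.sql.shuffle.partitions", shufflePartitions.toString)
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .getOrCreate()
}
```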
