Spark Sql日志分析项目实战
项目简介
- 统计主站最受欢迎的课程Top N 访问次数
- 按地市统计主站最受欢迎的Top N 课程
- 按流量统计主站最受欢迎的Top N 课程
环境安装
Spark环境搭建
- Spark源码编译(以spark2.1.0为例)
spark编译官方文档
1、官网下载相应版本源码包
参考编译过程
2、spark源码编译中的坑
pom.xml添加<repository> <id>cloudera</id> <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url> </repository>
设置内存
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
选择scala版本
./dev/change-scala-version.sh 2.10
- 环境搭建
设置环境变量
local模式启动:spark-shell --master local[2]
standalone模式:
- 修改spark-env.sh配置文件
SPARK_MASTER_HOST=master
SPARK_WORKER_CORES=2
SPARK_WORKER_CORES=2g
SPARK_WORKER_INSTANCES=1- 启动
sbin/start-all.sh:启动slaves配置的所有节点的worker
spark-shell --master spark://master:7077:启动spark
spark-shell --help 可以查看启动参数
--total-executor-cores 1 指定core总数量
Spark on Yarn
spark-env.sh 添加 Hadoop conf 的目录
- [Client]
Driver运行在Client端
Client会和请求到的Container进行通信来完成作业的调度和执行,Client不能退出
日志信息会在控制台输出,方便调试- [Cluster]
Driver运行在ApplicationMaster中
Client只要提交完作业之后就可以关闭
日志在终端看不到,可以通过yarn logs -applicationId <app ID>
查看日志
./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ [--deploy-mode cluster \] //默认client模式 --executor-memory 1G \ --num-executors 1 \ /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ 4
Spark SQL 框架介绍
Spark SQL is Apache Spark's module for working with structured(结构化) data.
- Integrated(集成)
Seamlessly mix(无缝混合) SQL queries with Spark programs.
Spark SQL lets you query structured data inside Spark programs, using either SQL or a familiar DataFrame API. Usable in Java, Scala, Python and R. - Uniform Data Access(统一的数据访问)
Connect to any data source the same way.
DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources. - Hive Integration(Hive集成)
Run SQL or HiveQL queries on existing warehouses
Spark SQL supports the HiveQL syntax as well as Hive SerDes and UDFs, allowing you to access existing Hive warehouses. - Standard Connectivity(标准连接)
Connect through JDBC or ODBC.
A server mode provides industry standard JDBC and ODBC connectivity for business intelligence tools.
从Hive平滑过渡到Spark SQL
- Spark1.x中Spark SQL的入口点:SQLContext
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
本地可直接运行
package com.test
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
/**
* SQLContext的使用
*/
object SQLContextApp {
def main(args: Array[String]): Unit = {
//创建相应的Context
val sparkConf = new SparkConf()
//在测试和或者生产中,参数一般通过脚本进行指定
sparkConf.setAppName("SQLContext").setMaster("spark://192.168.247.100:7077")//测试通常采用本地模式“local[2]”
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
//相应的处理:json
val path = args(0)
val people = sqlContext.read.format("json").load(path)
people.printSchema()
people.show()
//关闭资源
sc.stop()
}
}
打包上服务器运行
spark-submit \
--name SQLContext \
--class com.test.SQLContextApp \
--master spark://192.168.247.100:7077 \
/home/kang/lib/SparkTest-1.0.jar \
/home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json
- Spark1.x中Spark SQL的入口点:HiveContext
要获取hive中的元数据信息,需把hive-site.xml配置文件复制到spark的/conf目录下
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
package com.test
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/**
* HiveContext的使用
*/
object HiveContextApp {
def main(args: Array[String]): Unit = {
//创建相应的Context
val sparkConf = new SparkConf()
//在测试和或者生产中,参数一般通过脚本进行指定
sparkConf.setAppName("HiveContext").setMaster("spark://192.168.247.100:7077")//测试通常采用本地模式“local[2]”
val sc = new SparkContext(sparkConf)
val hiveContext = new HiveContext(sc)
//相应的处理:json
hiveContext.table("test").show
//关闭资源
sc.stop()
}
}
打包上传提交时要添加MySQL连接包
spark-submit \
--name HiveContext \
--class com.test.HiveContextApp \
--master spark://master:7077\
--jars /home/kang/lib/mysql-connector-java-5.1.34.jar \
/home/kang/lib/HiveContext.jar \
- Spark2.x中Spark SQL的入口点:SparkSession
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
spark-shell/spark-sql的使用
- 添加hive-site.xml配置文件
- -- jars传递mysql驱动包
- spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
spark.sql("show tables").show - spark-sql --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar
直接输入sql语句
explain extended + sql(查看详细执行计划)
thriftserver/beeline的使用
- 启动thriftserver
默认端口是10000,可以修改
./sbin/start-thriftserver.sh --master local[2] \
--jars ~/lib/mysql-connector-java-5.1.34.jar \
--hiveconf hive.server2.thrift.port=10040
- 启动beeline
./bin/beeline -u jdbc:hive2://localhost:10040 -n kang
- thriftserver和普通的spark-shell/spark-sql有什么区别?
spark-shell、spark-sql都是一个spark application
thriftserver,不管启动多少个客户端(beeline/code),永远只有一个spark application,多个客户端可以共享缓存数据。 - code连接thriftserver
添加相关的依赖包
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1.spark2</version>
</dependency>
package com.test
import java.sql.DriverManager
object SparkSQLThriftServerApp {
def main(args: Array[String]): Unit = {
Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://192.168.247.100:10040","kang","")
val pstmt = conn.prepareStatement("select * from test")
val rs = pstmt.executeQuery()
while (rs.next()){
println("context:" + rs.getString("context"))
}
rs.close()
pstmt.close()
conn.close()
}
}
用户行为日志概述
-
用户行为日志:用户每次访问网站时所有的行为数据(访问、浏览、搜索、点击)
-
日志数据内容
访问的系统属性:操作系统、浏览器等等
访问特征:点击的url,从哪个url跳转过来、页面工停留的时间等
访问信息:session_id、访问ip(访问城市)等 -
分析的意义
离线数据处理架构(流程)
- 数据采集
nginx记录日志信息
Flume:web日志写入HDFS - 数据清洗
spark、Hive、Mapreduce等
- 使用Spark SQL解析访问日志
- 解析出课程编号、类型
- 根据IP解析出城市信息
- 使用Spark SQL将访问时间按天进行分区输出
输入:访问时间、访问url、耗费的流量、访问的IP信息
输出:URL、cmsType(video/article)、cmsId(编号)、流量、ip、城市信息、访问时间、天
ip地址解析
下载:https://github.com/kangapp/ipdatabase
编译:mvn clean package -DskipTests
jar包入库:mvn install:install-file -Dfile=F:\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
pom文件引入,resources文件两个表格文件引入
- 数据处理
spark、Hive、Mapreduce进行业务统计和分析
任务调度:Oozie、Azkaban
调优点:
1)控制文件输出大小:coalesce
2)分区字段的数据类型调整
.config("spark.sql.sources.partitionColumnTypeInference.enabled","false")
- 处理结果入库
RDBMS、NoSQL
需求一
create table day_video_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
times bigint(10) not null,
primary key (day,cms_id)
)
需求二
create table day_video_city_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
city varchar(20) not null,
times bigint(10) not null,
times_rank int not null,
primary key (day,city,cms_id)
)
需求三
create table day_video_traffics_access_topn_stat (
day varchar(8) not null,
cms_id bigint(10) not null,
traffics bigint(20) not null,
primary key (day,cms_id)
)
- 数据可视化
Echarts、HUE、Zeppelin
项目需求
- 统计imooc主站最受欢迎的课程/手记Top N访问次数
- 按地市统计imooc主站最受欢迎Top N课程
根据IP提取城市信息
窗口函数在Spark SQL中的使用 - 按流量统计imocc主站最受欢迎的Top N课程
项目打包
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
mvn assembly:assembly
spark-submit \ --class com.test.SparkStatCleanJobYARN \ --name SparkStatCleanJobYARN \ --master yarn \ --executor-memory 1G \ --num-executors 1 \ --files /home/kang/project/SprakSQL/resource/ipDatabase.csv,/home/kang/project/SprakSQL/resource/ipRegion.xlsx \ /home/kang/project/SprakSQL/lib/sparksql.jar \ hdfs://192.168.247.100:9000/data/spark/output/* hdfs://192.168.247.100:9000/data/spark/partitionByDay
项目性能调优
https://segmentfault.com/a/1190000014876069
- 存储格式选择
-
压缩格式选择
parquet默认压缩格式
代码优化
- 选用高性能的算子
- 复用已有的数据
参数优化
并行度:spark.sql.shuffle.partitions
分区字段类型推测:spark.sql.sources.partitionColumnTypeInference.enabled