spark大数据Spark

spark ui

2023-01-11  本文已影响0人  乙腾

一.Spark UI 选项卡的组成

1.Jobs

1.1 首页

image.png

补充:

1.2 Event TImeline

1.3 Job Detail

示例:

image.png image.png

2. stages

image.png

运行中的界面

image.png

2.1stage detail

image.png image.png

总览

image.png

DAG

image.png

Summary Metrics(重点)

image.png

Aggregated Metrics by Executor

image.png

Tasks

image.png image.png

speculative:推测执行

image.png

因为有推测执行,所以如果发现task killed是正常现象

3.利用 Web UI 定位问题

查看stages

image.png

在 stage 页面可以看到 stage 44、58 两个执行时间很长,并且 shuffle 数据量也很大。

查看stage id 58的详情

Summary Metrics for n Completed Tasks(通过分位线判断是否发生数据倾斜)

image.png

可以看出 shuffle spill 耗时最多,其他指标都比较均匀,没有出现数据倾斜。

image.png

Tasks

image.png

按照时间倒叙,Detail stage 页面可以找到 task 执行时间很长,shuffle spill 数据很大,这里产生了大量的 IO。

stage 58 总结

查看 stage 44 的详情

Summary Metrics for n Completed Tasks(通过分位线判断是否发生数据倾斜)

image.png

Stage44 看出 shuffle 数据并不均匀,发生来数据倾斜,观察依据:

接下来要通过sql ui观察那部分执行逻辑发生倾斜

sql ui中观察

通过sql 标签 定位
image.png
stage no 44 的执行部分:
image.png

我们可以根据 SQL 执行图上找到 stage44,并查到发生 shuffle 操作的 join key(item_id),顺着其上游可找到其中一个表 device_item。

找sql代码
image.png

接着可以从任务 SQL 中找到问题出现的部分。定位到了问题所在,就可以通过一些针对数据倾斜的优化参数或者处理方法进行一一实验,并通过对比得出最优解。

4.Environment

介绍:

image.png

5.Executor

image.png image.png

二.如何通过 Spark UI 定位问题

1.如何找 Web UI地址

image.png image.png

2.怎么看 Spark Driver/Executor 日志

image.png

因为executor里面会跑很多task,当task有问题的时候,需要通过看executor log定位task日志

image.png

3.如何定位分配资源不足

image.png

如果 UI 页面上申请到的 executors 数量远小于配置的数量(静态资源:spark.executor.instances,动态资源:spark.dynamicAllocation.maxExecutors),则表示队列资源不充足。

4.如何定位Executor太少

image.png

5.如何定位Input读慢

image.png

tasks标签观察

5.1 task很少的场景

image.png

5.2 task 很多的场景

image.png

5.6 SparkSQL Input Split 划分原理

Partition / Split 大小取决于四个条件:

Split 的最大值取决于如下公式:

Spark 将输入文件按每 maxSplitBytes 切分成一个个 partition

补充:

1.spark sql如何为每个job设置executor的cpu、内存?

spark 中是没法为所有job定制化设置这些资源,一般需要在每个app粒度启动的时候设置:

executor:

driver:

需要每个application中去调这些参数,最小单元是app这个维度,无法为每个job定制资源。

一般在yarn里面,一个个spark作业就是application,一个application可以启很多个job

三.深入理解SQL页面

1.介绍

2.一个例子:

sql

val questionsDF = spark.read.json("/test/sparksql/simple/data/questions-json")
val usersDF = spark.read.parquet("/test/sparksql/simple/data/users")
val res = questionsDF.filter(col("score") > 0).groupBy("user_id").agg(count("*").alias("cnt")).join(usersDF, "user_id")

spark-shell

[root@hadoop102 spark-3.2.1]# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-29 22:38:24,941 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1653835106685).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.2.1
      /_/

Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.

scala>
image.png

建表questionsDF

scala> val questionsDF = spark.read.json("/test/sparksql/simple/data/questions-json")
questionsDF: org.apache.spark.sql.DataFrame = [accepted_answer_id: bigint, answers: bigint ... 9 more fields]

通过ui观看,4个任务

image.png

查看stages

image.png image.png

建表usersDF

val usersDF = spark.read.parquet("/test/sparksql/simple/data/users")

查看列信息

image.png image.png

执行sql

val res = questionsDF.filter(col("score") > 0).groupBy("user_id").agg(count("*").alias("cnt")).join(usersDF, "user_id")
image.png

查看plan

image.png
scala> res.queryExecution.logical
res4: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
'Join UsingJoin(Inner,List(user_id))  ##inner join ,join id user_id,根节点,向下看
:- Aggregate [user_id#16L], [user_id#16L, count(1) AS cnt#58L] ##join前做聚合,聚合后有count
:  +- Filter (score#13L > cast(0 as bigint)) ##聚合前 做过滤
:     +- Relation [accepted_answer_id#7L,answers#8L,body#9,comments#10L,creation_date#11,question_id#12L,score#13L,tags#14,title#15,user_id#16L,views#17L] json   ## 读questionsDF表
+- Relation [user_id#29L,display_name#30,about#31,location#32,downvotes#33L,upvotes#34L,reputation#35L,views#36L] parquet ##读usersDF 表

这只是计划,还没有执行,需要触发action,如show

触发action,查看sql

show默认展示20行,结果如下

image.png image.png

此时真正执行sql了

image.png

通过spark ui查看sql

image.png

detail

Physical Plan

每个节点的操作,根据计划树的(n)查找对应的算子操作

image.png

SQL 查询计划图像化

图像化的具体细节可以通过上面的detail查看
image.png
详解
image.png
两个scan
image.png image.png
查找stage
image.png
exchange
image.png image.png

3.知识补充

WholeStageCodegen

image.png

Scan Parquet

image.png

Filter

image.png

Project

(记住,下推的下是指往叶子结点推)。

image.png

上面这个物理计划图形的数据是从上往下流入,即最上面是叶子结点,最下面是根节点。

detail中与之展现相反,根在上面

image.png

Exchange

image.png

ShuffleQueryStage/AQEShuffleRead

detail中有,但是上面的查询图中不显示

image.png image.png image.png

HashAggregate

image.png

BroadcastHashJoin & BroadcastExchange

image.png image.png

三个作业,broadcastexchange必然是一个作业,看作业不是很重要

ColumnarToRow

image.png

4.问题排查

关键字:number of output rows。

image.png

如果上图中BroadcastHashJoin 最终是200多明显大于HashAggregate,那就是发生膨胀,一般聚合是将减少数据,但是join有可能增加数据,所以也是比较容易发生数据膨胀的算子。

关键字:limit。

关键字:AQEShuffleReader/CustomShuffleReader。

image.png

关键字:aggregate。

关键字:BroadcastNestedLoopJoin 或 Cartesian。

关键字:skew。

关键字:sort。

关键字:number of files read。

image.png

关键字:BroadcastHashJoin。

上一篇 下一篇

猜你喜欢

热点阅读