SparkUI定制

2017-05-13  本文已影响0人  Alien_Swordsman

前言

感谢开源世界的大牛们贡献这么多开源软件,让我们有机会学习复杂软件的设计,同时也可根据自己的需要定制开源软件。本文先简单讲解Spark监控UI的实现,最后写一个简单的定制spark ui的demo。

SparkUI的实现

sparkUI的启动是在sparkContext初始化是实现的,具体代码为:

//启动web ui, 创建sparkUI对象
    _ui =
      if (conf.getBoolean("spark.ui.enabled", true)) {
        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
          _env.securityManager, appName, startTime = startTime))
      } else {
        // For tests, do not enable the UI
        None
      }
    // Bind the UI before starting the task scheduler to communicate
    // the bound port to the cluster manager properly
    _ui.foreach(_.bind()) //启动jetty, bind方法继承于webui, 调用jetty API启动服务

SparkUI对象是整个监控服务的核心容器,负责启动jetty,维持url于ServletContextHandler之间的映射关系。其基础数据放在基类WebUI里:

  protected val tabs = ArrayBuffer[WebUITab]()
  protected val handlers = ArrayBuffer[ServletContextHandler]()
  protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]] 

tabs是ui页面中标签栏的tab,handlers是请求request请求处理对象,pageToHandlers是page于ServletContextHandler之间的对应关系,这样就建立的url于handlers之间的映射。当SparkUI对象调用initialize()方法时这三个数据结构会进行初始化,将WebUIPage的render函数变换成servlet。具体代码如下:

  def initialize() {
    attachTab(new JobsTab(this))
    attachTab(stagesTab)
    attachTab(new StorageTab(this))
    attachTab(new EnvironmentTab(this))
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs/", basePath = basePath))
    attachHandler(ApiRootResource.getServletHandler(this))
    // This should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages/", stagesTab.handleKillRequest,
      httpMethods = Set("GET", "POST")))
  }
  initialize() //创建SparkUI时就初始化

  def attachTab(tab: WebUITab) {
    tab.pages.foreach(attachPage)
    tabs += tab
  }

  /** Attach a page to this UI. */
  def attachPage(page: WebUIPage) {
    val pagePath = "/" + page.prefix
    val renderHandler = createServletHandler(pagePath,
      (request: HttpServletRequest) => page.render(request), securityManager, conf, basePath)
    val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
      (request: HttpServletRequest) => page.renderJson(request), securityManager, conf, basePath)
    attachHandler(renderHandler)
    attachHandler(renderJsonHandler)
    pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
      .append(renderHandler)
  }

WebUITab,WebUIPage其实是对标签,url和页面html生成的封装。WebUITab靠字段prefix拼接成其包含的WebUIPage的url,WebUIPage的子类实现处理逻辑,生成html。

在SparkUI中添加新标签

理解了spark监控页面的逻辑,给其添加新的标签tab是比较简单的事情。我实现的例子非常简单,效果如下所示:


image.png

在spark源码中ui包下新建包yss,添加两个类YssPage, YssTab。


image.png
YssTab代码为:
private[ui] class YssTab(parent: SparkUI) extends SparkUITab(parent, "yss") {
  attachPage(new YssPage(this))
}

YssPage 代码为:

private[ui] class YssPage(parent: YssTab) extends WebUIPage("") {
  def render(request: HttpServletRequest): Seq[Node] = {
    val content =
      <div class="yss_tech">
        Hello Yss
      </div>
    UIUtils.headerSparkPage("yss", content, parent)
  }
}

最后在SparkUI类initialize()函数中加入:

attachTab(new YssTab(this))

写一个简单spark app,如最经典的WordCount,在ide运行即可在浏览器看到我们新添加的页面。在此基础上我们可以添加我们关心的监控信息到新页面。

valconf =newSparkConf().setMaster("local[1]").setAppName("wordcount")
valsc =newSparkContext(conf)
valline = sc.textFile("pom.xml")
line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect().foreach(println)                                                                               
Thread.sleep(5*60*1000)
sc.stop()
上一篇 下一篇

猜你喜欢

热点阅读