大数据

Zeppelin SDK :Flink 平台建设的基石

2020-09-03  本文已影响0人  Flink中文社区

用过 Zeppelin 的人应该比较熟悉 Zeppelin 的 UI,因为 Zeppelin 的主要使用场景都是交互式,用户需要手动来操作。那除了这种手动的方式,还有其他的方式吗?如果你不想用 Zeppelin UI,但又想用 Zeppelin 提交和管理大数据作业 (比如 Flink Job)的能力该怎么办?或者是你在 Zeppelin 里写好了代码,想定时调度起来,或者集成到其他系统里,该怎么办?

如果你有这样的诉求,那么 Zeppelin Client API (SDK)就是你所需要的东西。

Zeppelin 简介

对于不熟悉 Zeppelin 的人,可以用一句话来解释 Zeppelin:大数据引擎的入口,交互式大数据分析平台底座。Zeppelin 最大的特点是连接多种引擎,具有可插拔式,下面这张图例举了一些常用的引擎,当然 Zeppelin 还支持其他很多引擎,这里就不一一例举。

虽然 Zeppelin 有 Rest API,但是 Zeppelin 的 Rest API 太多,对于很多不熟悉 Zeppelin 的人来说使用 Rest API 门槛太高,所以 Zeppelin 专门开发了一个 Client API (SDK),方便大家做集成。Zeppelin Client API (SDK)分为 2 个层面的的东西(接下来会逐个详细介绍):

Zeppelin Client API (Low Level API)

Zeppelin Client API 可以在 Note 和 Paragraph 的粒度进行操作。你可以先在 notebook 里写好代码 (比如开发阶段在 notebook 里写代码,做测试),然后用 Low Level API 用编程的方式把 Job 跑起来(比如生产阶段把作业定时调度起来)。Zeppelin Client API 最重要的 class 是 ZeppelinClient,也是 Zeppelin Client API 的入口。下面例举几个重要的接口(这些 API 都比较直观,我就不多做解释了)。

public String createNote(String notePath) throws Exception 

public void deleteNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId) throws Exception 

public NoteResult executeNote(String noteId, 
                              Map<String, String> parameters) throws Exception
                              
public NoteResult queryNoteResult(String noteId) throws Exception 

public NoteResult submitNote(String noteId) throws Exception

public NoteResult submitNote(String noteId, 
                             Map<String, String> parameters) throws Exception 
                             
public NoteResult waitUntilNoteFinished(String noteId) throws Exception

public String addParagraph(String noteId, 
                           String title, 
                           String text) throws Exception
                           
public void updateParagraph(String noteId, 
                            String paragraphId, 
                            String title, 
                            String text) throws Exception
                            
public ParagraphResult executeParagraph(String noteId,
                                        String paragraphId,
                                        String sessionId,
                                        Map<String, String> parameters) throws Exception
                                        
public ParagraphResult submitParagraph(String noteId,
                                       String paragraphId,
                                       String sessionId,
                                       Map<String, String> parameters) throws Exception
                                       
public void cancelParagraph(String noteId, String paragraphId)
    
public ParagraphResult queryParagraphResult(String noteId, String paragraphId) 
    
public ParagraphResult waitUtilParagraphFinish(String noteId, String paragraphId)

那这些 API 能用来做什么呢?

一个典型的用途是我们在 Zeppelin 里写好代码,做好测试,然后在第三方系统里集成进来。比如下面的代码就是把 Zeppelin 自带的 Spark Basic Features 用编程的方式跑起来,你不仅可以跑 Zeppelin Note,还可以拿到运行结果 (ParagraphResult)。怎么处理运行结果,就留给你发挥想象的空间吧(可以在你的系统里展示出来,或者可视化出来,或者传给其他系统做消费等等)。

此外,对于 Dynamic forms(动态控件,比如文本框,下拉框等等),你还可以动态的提供参数,如下面例子里的 maxAge 和 marital。

ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
ZeppelinClient zClient = new ZeppelinClient(clientConfig);

String zeppelinVersion = zClient.getVersion();
System.out.println("Zeppelin version: " + zeppelinVersion);

ParagraphResult paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015259_1403135953");
System.out.println("Execute the 1st spark tutorial paragraph, paragraph result: " + paragraphResult);

paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150210-015302_1492795503");
System.out.println("Execute the 2nd spark tutorial paragraph, paragraph result: " + paragraphResult);

Map<String, String> parameters = new HashMap<>();
parameters.put("maxAge", "40");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150212-145404_867439529", parameters);
System.out.println("Execute the 3rd spark tutorial paragraph, paragraph result: " + paragraphResult);

parameters = new HashMap<>();
parameters.put("marital", "married");
paragraphResult = zClient.executeParagraph("2A94M5J1Z", "20150213-230422_1600658137", parameters);
System.out.println("Execute the 4th spark tutorial paragraph, paragraph result: " + paragraphResult);

这下面这张图就是上面我们要 Zeppelin Client API 跑的 Zeppelin 自带的 Spark Basic Features。

Session API (High Level API)

Session API 是 Zeppelin 的high level api,Session API 里没有 Note,Paragraph 的概念,粒度是你提交的代码。Session API里最重要的class就是 ZSession,这也是Session API的入口,一个 ZSession 代表一个独立的Zeppelin Interpreter 进程,对于 Flink 来说就是一个独立的 Flink Session Cluster。下面例举一些典型的接口(这些 API 都比较直观,我就不多做解释了)。

public void start() throws Exception

public void start(MessageHandler messageHandler) throws Exception

public void stop() throws Exception

public ExecuteResult execute(String code) throws Exception

public ExecuteResult execute(String subInterpreter,
                             Map<< span="">String, String> localProperties,
                             String code,
                             StatementMessageHandler messageHandler) throws Exception

public ExecuteResult submit(String code) throws Exception

public ExecuteResult submit(String subInterpreter,
                            Map<< span="">String, String> localProperties,
                            String code,
                            StatementMessageHandler messageHandler) throws Exception
                           
public void cancel(String statementId) throws Exception
 
public ExecuteResult queryStatement(String statementId) throws Exception

public ExecuteResult waitUntilFinished(String statementId) throws Exception

那这个 API 能用来做什么呢? 一个典型的用途是就是我们动态创建 Session (Zeppelin Interpreter 进程),动态的提交运行代码,并拿到运行结果。比如你不想用 Zeppelin 的 UI,要自己做一个 Flink 的开发管理平台,那么你就可以自己做 UI,让用户在 UI 上配置 Flink Job,输入 SQL,然后把所有的这些信息发送到后端,后端调用 ZSession 来运行 Flink Job。

下面的 Java 代码就是用编程的方式调用了 2 条 Flink SQL 语句,并且在 MyStatementMessageHandler1 和 MyStatementMessageHandler2 中读取源源不断发送过来更新的 SQL 运行结果 (怎么来使用这个结果就靠你的想象力了)。

需要说明的是像 Flink Interpreter 这种流式结果数据更新是通过 WebSocket 实现的,所以下面的代码里有会有 CompositeMessageHandler,MyStatementMessageHandler1 以及 MyStatementMessageHandler2,这些 MessageHandler 就是用来处理通过 WebSocket 发送过来的流式数据结果。下面是 2 条我们在 Zeppelin 里运行的 Flink SQL。

接下来我们会用 Zeppelin Session API 来跑着这 2 条 Flink SQL,然后我们会在MyStatementMessageHandler1,MyStatementMessageHandler2 里拿到结果展示出来。

ZSession session = null;
try {
    ClientConfig clientConfig = new ClientConfig("http://localhost:8080");
    Map<< span="">String, String> intpProperties = new HashMap<>();

    session = ZSession.builder()
        .setClientConfig(clientConfig)
        .setInterpreter("flink")
        .setIntpProperties(intpProperties)
        .build();

    // CompositeMessageHandler allow you to add StatementMessageHandler for each statement.
    // otherwise you have to use a global MessageHandler.
    session.start(new CompositeMessageHandler());
    System.out.println("Flink Web UI: " + session.getWeburl());

    System.out.println("-----------------------------------------------------------------------------");
    String initCode = IOUtils.toString(FlinkAdvancedExample.class.getResource("/init_stream.scala"));
    ExecuteResult result = session.execute(initCode);
    System.out.println("Job status: " + result.getStatus() + ", data: " + result.getResults().get(0).getData());

    // run flink ssql
    Map<< span="">String, String> localProperties = new HashMap<>();
    localProperties.put("type", "update");
    result = session.submit("ssql", localProperties, "select url, count(1) as pv from log group by url",
                            new MyStatementMessageHandler1());
    session.waitUntilFinished(result.getStatementId());

    result = session.submit("ssql", localProperties, "select upper(url), count(1) as pv from log group by url",
                            new MyStatementMessageHandler2());
    session.waitUntilFinished(result.getStatementId());

} catch (Exception e) {
    e.printStackTrace();
} finally {
    if (session != null) {
        try {
            session.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public static class MyStatementMessageHandler1 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler1, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler1, update output: " + output);
    }
}

public static class MyStatementMessageHandler2 implements StatementMessageHandler {

    @Override
    public void onStatementAppendOutput(String statementId, int index, String output) {
        System.out.println("MyStatementMessageHandler2, append output: " + output);
    }

    @Override
    public void onStatementUpdateOutput(String statementId, int index, String type, String output) {
        System.out.println("MyStatementMessageHandler2, update output: " + output);
    }
}

除了编程方式跑 Flink Job,这个 Session API 还能给我们带来什么呢?

在 Zeppelin 里如果你可以通过 %flink.conf 来对你的 Flink Cluster 进行非常丰富的配置,但是 %flink.conf 是纯文本的配置,不熟悉 Flink 的人很容易配错(如下图)。如果你是自己做 Flink 开发平台的话就可以做一个更完整的 UI,用一些下拉框等等把一些配置选项固定下来,用户只要选择就行了,不需要自己输入文本来配置。

还有下面这类 paragraph 的 local properties 配置,比如 type,template, resumeFromLatestCheckpoint 也是比较容易写错的,同理你可以在自己 UI 里用一些控件把这些选项提前固定下来,而不是让用户输入文本的方式。

我相信 Zeppelin Client API 还有很多可以发挥和想象的空间,大家脑洞起来吧。

更多 Zeppelin 技术干货及使用交流可加入 Flink on Zeppelin 钉钉群。

(钉钉扫码加群)

上一篇下一篇

猜你喜欢

热点阅读