Flink Source Code Reading Series

Flink Source Code Reading: How the SQL Client Executes

2020-12-03  〇白衣卿相〇

Preface

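Most readers have probably used sql-cli: running sql-client.sh embedded drops you into an interactive shell where each SQL statement can be executed on its own. It is very handy when debugging a feature, and the big squirrel shown on startup is rather cute.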

The script

Let's start with the script itself:

#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

################################################################################
# Adopted from "flink" bash script
################################################################################

target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

################################################################################
# SQL client specific logic
################################################################################

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# get path of jar in /opt if it exist
FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")

# add flink-python jar to the classpath
if [[ ! "$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then
    FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar")
    if [ -n "$FLINK_PYTHON_JAR" ]; then
        CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR"
    fi
fi

# check if SQL client is already in classpath and must not be shipped manually
if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then

    # start client without jar
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"

# check if SQL client jar is in /opt
elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then

    # start client with jar
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"

# write error message to stderr
else
    (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.")

    # exit to force process failure
    exit 1
fi

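The script is fairly simple. Skip to the launch command at the end to see which class it invokes: org.apache.flink.table.client.SqlClient. From there we can move on to the Java code.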
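The last part of the script is a three-way decision, and it can help to see it in isolation. The sketch below restates that decision in Java purely for illustration: the class LaunchDecision, its Mode enum, and the decide method are hypothetical names and do not exist in Flink. Only the regular expression is taken from the script.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only: the real logic lives in sql-client.sh, not in Java.
final class LaunchDecision {
    enum Mode { CLASSPATH_ONLY, WITH_OPT_JAR, ERROR }

    // The same pattern the script uses in its [[ ... =~ ... ]] checks.
    private static final Pattern CLIENT_JAR = Pattern.compile(".*flink-sql-client.*.jar");

    /**
     * @param classpath the assembled classpath (CC_CLASSPATH in the script)
     * @param optJar    the jar found under the opt directory, or "" if none was found
     */
    static Mode decide(String classpath, String optJar) {
        // bash's =~ is an unanchored search, so find() is the closest equivalent
        if (CLIENT_JAR.matcher(classpath).find()) {
            return Mode.CLASSPATH_ONLY;   // start SqlClient without --jar
        }
        if (!optJar.isEmpty()) {
            return Mode.WITH_OPT_JAR;     // append the jar to the classpath and pass it via --jar
        }
        return Mode.ERROR;                // print the error to stderr and exit 1
    }
}
```

In every non-error branch the main class is the same, which is why SqlClient is the place to start reading.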

The code

The main method:

public static void main(String[] args) {
        if (args.length < 1) {
            CliOptionsParser.printHelpClient();
            return;
        }

        switch (args[0]) {

            case MODE_EMBEDDED:
                // remove mode
                final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
                final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
                if (options.isPrintHelp()) {
                    CliOptionsParser.printHelpEmbeddedModeClient();
                } else {
                    try {
                        final SqlClient client = new SqlClient(true, options);
                        client.start();
                    } catch (SqlClientException e) {
                        // make space in terminal
                        System.out.println();
                        System.out.println();
                        LOG.error("SQL Client must stop.", e);
                        throw e;
                    } catch (Throwable t) {
                        // make space in terminal
                        System.out.println();
                        System.out.println();
                        LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
                        throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
                    }
                }
                break;

            case MODE_GATEWAY:
                throw new SqlClientException("Gateway mode is not supported yet.");

            default:
                CliOptionsParser.printHelpClient();
        }
    }
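Stripped of logging and error handling, the dispatch in main comes down to "look at the first argument, then hand the rest to the mode-specific parser". Here is a minimal, self-contained sketch of that shape. ModeDispatchSketch and dispatch are hypothetical names, and the literal values "embedded" and "gateway" are assumptions ("embedded" at least matches the sql-client.sh embedded invocation mentioned in the preface).

```java
import java.util.Arrays;

// Hypothetical sketch of the mode dispatch; not the actual SqlClient code.
final class ModeDispatchSketch {
    // Assumed values of the mode constants.
    static final String MODE_EMBEDDED = "embedded";
    static final String MODE_GATEWAY = "gateway";

    /** Returns a short description of what would happen for the given arguments. */
    static String dispatch(String[] args) {
        if (args.length < 1) {
            return "help";                       // no mode given: print the general help
        }
        switch (args[0]) {
            case MODE_EMBEDDED:
                // drop the mode name; only the remaining arguments are parsed as options
                String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
                return "embedded" + Arrays.toString(modeArgs);
            case MODE_GATEWAY:
                throw new UnsupportedOperationException("Gateway mode is not supported yet.");
            default:
                return "help";                   // unknown mode: print the general help
        }
    }
}
```

The point to notice is that the mode name itself never reaches the option parser.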

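Only embedded mode is supported at the moment; the mode name may be followed by further options. main parses those options and then starts SqlClient. The start method does the following:

  1. Load dependencies (jars and library directories) according to the options
  2. Start the executor, which serves as the gateway for talking to Flink and other external systems
  3. Build the session environment
  4. Register a shutdown hook that performs cleanup when the program exits
  5. Start the actual interactive CLI

private void start() {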
        if (isEmbedded) {
            // create local executor with default environment
            final List<URL> jars;
            if (options.getJars() != null) {
                jars = options.getJars();
            } else {
                jars = Collections.emptyList();
            }
            final List<URL> libDirs;
            if (options.getLibraryDirs() != null) {
                libDirs = options.getLibraryDirs();
            } else {
                libDirs = Collections.emptyList();
            }
            final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
            executor.start();

            // create CLI client with session environment
            final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
            appendPythonConfig(sessionEnv, options.getPythonConfiguration());
            final SessionContext context;
            if (options.getSessionId() == null) {
                context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
            } else {
                context = new SessionContext(options.getSessionId(), sessionEnv);
            }

            // Open an new session
            String sessionId = executor.openSession(context);
            try {
                // add shutdown hook
                Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

                // do the actual work
                openCli(sessionId, executor);
            } finally {
                executor.closeSession(sessionId);
            }
        } else {
            throw new SqlClientException("Gateway mode is not supported yet.");
        }
    }
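The second half of start follows a common resource pattern: open a session, register a shutdown hook as a safety net, and close the session in a finally block. The sketch below isolates that pattern. MiniExecutor is a hypothetical stand-in for the real Executor, and the assumption that the shutdown thread simply closes the session is mine; the listing above does not show the body of EmbeddedShutdownThread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the open / hook / finally pattern; not Flink code.
final class SessionLifecycleSketch {
    /** Stand-in for the executor, reduced to the two calls that matter here. */
    interface MiniExecutor {
        String openSession(String requestedId);
        void closeSession(String sessionId);
    }

    /** Runs the work inside a session and returns the sequence of lifecycle events. */
    static List<String> run(MiniExecutor executor, String requestedId, Runnable work) {
        List<String> events = new ArrayList<>();
        String sessionId = executor.openSession(requestedId);
        events.add("opened " + sessionId);
        try {
            // safety net for Ctrl+C or kill (assumed behaviour: just close the session)
            Thread hook = new Thread(() -> executor.closeSession(sessionId));
            Runtime.getRuntime().addShutdownHook(hook);
            events.add("hook registered");

            work.run();                              // corresponds to openCli(...)
            events.add("work done");
        } finally {
            executor.closeSession(sessionId);        // normal exit path
            events.add("closed " + sessionId);
        }
        return events;
    }
}
```

With this shape the session is closed on a normal return through finally, while the hook covers the case where the JVM is terminated while the CLI is still open.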

SQL statements are executed through CliClient:

private void openCli(String sessionId, Executor executor) {
        CliClient cli = null;
        try {
            Path historyFilePath;
            if (options.getHistoryFilePath() != null) {
                historyFilePath = Paths.get(options.getHistoryFilePath());
            } else {
                historyFilePath = Paths.get(System.getProperty("user.home"),
                        SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
            }
            cli = new CliClient(sessionId, executor, historyFilePath);
            // interactive CLI mode
            if (options.getUpdateStatement() == null) {
                cli.open();
            }
            // execute single update statement
            else {
                final boolean success = cli.submitUpdate(options.getUpdateStatement());
                if (!success) {
                    throw new SqlClientException("Could not submit given SQL update statement to cluster.");
                }
            }
        } finally {
            if (cli != null) {
                cli.close();
            }
        }
    }
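One small detail in openCli is how the history file is chosen: an explicitly configured path wins, otherwise a file in the user's home directory is used, with a leading dot everywhere except on Windows. A self-contained sketch of that rule follows. HistoryPathSketch and resolve are hypothetical names, and the home directory and OS flag are passed in as parameters so the rule can be exercised without touching system properties.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical sketch of the history file selection; not Flink code.
final class HistoryPathSketch {
    /**
     * @param configured path given on the command line, or null if absent
     * @param userHome   value of the user.home system property
     * @param isWindows  whether the client runs on Windows
     */
    static Path resolve(String configured, String userHome, boolean isWindows) {
        if (configured != null) {
            return Paths.get(configured);            // explicit option takes precedence
        }
        // hidden file on Unix-like systems, plain file name on Windows
        return Paths.get(userHome, isWindows ? "flink-sql-history" : ".flink-sql-history");
    }
}
```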

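The open method reads the SQL typed into the sql-cli interface, treating a semicolon as the end of a statement. Each statement is parsed by SqlCommandParser, and the resulting command determines how it is handled.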

public void open() {
        isRunning = true;

        // print welcome
        terminal.writer().append(CliStrings.MESSAGE_WELCOME);

        // begin reading loop
        while (isRunning) {
            // make some space to previous command
            terminal.writer().append("\n");
            terminal.flush();

            final String line;
            try {
                line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
            } catch (UserInterruptException e) {
                // user cancelled line with Ctrl+C
                continue;
            } catch (EndOfFileException | IOError e) {
                // user cancelled application with Ctrl+D or kill
                break;
            } catch (Throwable t) {
                throw new SqlClientException("Could not read from command line.", t);
            }
            if (line == null) {
                continue;
            }
            final Optional<SqlCommandCall> cmdCall = parseCommand(line);
            cmdCall.ifPresent(this::callCommand);
        }
    }
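The "semicolon ends a statement" rule is easy to try out without a terminal. The sketch below is a deliberately simplified stand-in: in the real client the line reader handles multi-line input, whereas here a plain string is split into lines and accumulated until a line ends with a semicolon. StatementReaderSketch and readStatements are hypothetical names, and dropping the trailing semicolon before handing the statement on is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the reading loop; not Flink code.
final class StatementReaderSketch {
    /** Splits raw input into statements, each terminated by a semicolon at the end of a line. */
    static List<String> readStatements(String input) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : input.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;                                  // ignore blank lines
            }
            if (current.length() > 0) {
                current.append(' ');                       // join continuation lines
            }
            current.append(trimmed);
            if (trimmed.endsWith(";")) {
                current.setLength(current.length() - 1);   // drop the terminating semicolon
                statements.add(current.toString().trim());
                current.setLength(0);                      // start the next statement
            }
        }
        // anything left in 'current' is an unterminated statement and is not returned
        return statements;
    }
}
```

For instance, feeding it a one-line statement followed by a statement spread over two lines yields two entries, while trailing input without a semicolon is ignored.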

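callCommand is a fairly long method; it simply dispatches on the kind of SQL command.
For example, a create table statement is handled by callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED);
which eventually calls TableEnvironment#executeSql.
insert and insert overwrite are handled by callInsert(cmdCall), which eventually calls TableEnvironment#sqlUpdate, a method that is already deprecated.
The other statements follow the same pattern; interested readers can step through the code. Beyond this point the flow is the same as for any SQL program: SQL validation → conversion → optimization → translation into transformations → submission for execution. See the article "Flink Source Code Reading: The Flink SQL Execution Flow" (Flink源码阅读之Flinksql执行流程) for details.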
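To make the "classify the statement, then call the matching handler" idea concrete, here is a small self-contained sketch. It is not the real SqlCommandParser or callCommand: the Command enum, its regular expressions, and the parse and handlerFor methods are hypothetical, and only the handler names callDdl and callInsert are taken from the text above.

```java
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical sketch of command classification and dispatch; not Flink code.
final class CommandDispatchSketch {
    enum Command {
        CREATE_TABLE("(?is)^CREATE\\s+TABLE\\s+.*"),
        INSERT_OVERWRITE("(?is)^INSERT\\s+OVERWRITE\\s+.*"),
        INSERT_INTO("(?is)^INSERT\\s+INTO\\s+.*");

        final Pattern pattern;

        Command(String regex) {
            this.pattern = Pattern.compile(regex);
        }
    }

    /** Returns the first command whose pattern matches the whole statement, if any. */
    static Optional<Command> parse(String statement) {
        String stmt = statement.trim();
        for (Command command : Command.values()) {
            if (command.pattern.matcher(stmt).matches()) {
                return Optional.of(command);
            }
        }
        return Optional.empty();
    }

    /** Names the handler that the text associates with each kind of command. */
    static String handlerFor(Command command) {
        switch (command) {
            case CREATE_TABLE:
                return "callDdl";
            case INSERT_INTO:
            case INSERT_OVERWRITE:
                return "callInsert";
            default:
                return "other";
        }
    }
}
```

An unrecognized statement yields an empty Optional, which mirrors how cmdCall.ifPresent(this::callCommand) in open skips input that does not parse into a command.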
