Reading the Flink Source Code: How the Sql-Client Works
2020-12-03
〇白衣卿相〇
Preface
Most readers have probably used the SQL client: start it with sql-client.sh embedded and you get an interactive shell where each SQL statement can be executed on its own. It is very convenient for debugging, and the big squirrel on the welcome screen is rather cute.
The startup script
Let's look at the script first:
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
################################################################################
# Adopted from "flink" bash script
################################################################################
target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
    FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

################################################################################
# SQL client specific logic
################################################################################

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-sql-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# get path of jar in /opt if it exist
FLINK_SQL_CLIENT_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-sql-client.*.jar")

# add flink-python jar to the classpath
if [[ ! "$CC_CLASSPATH" =~ .*flink-python.*.jar ]]; then
    FLINK_PYTHON_JAR=$(find "$FLINK_OPT_DIR" -regex ".*flink-python.*.jar")
    if [ -n "$FLINK_PYTHON_JAR" ]; then
        CC_CLASSPATH="$CC_CLASSPATH:$FLINK_PYTHON_JAR"
    fi
fi

# check if SQL client is already in classpath and must not be shipped manually
if [[ "$CC_CLASSPATH" =~ .*flink-sql-client.*.jar ]]; then
    # start client without jar
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.table.client.SqlClient "$@"
# check if SQL client jar is in /opt
elif [ -n "$FLINK_SQL_CLIENT_JAR" ]; then
    # start client with jar
    exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS:$FLINK_SQL_CLIENT_JAR"`" org.apache.flink.table.client.SqlClient "$@" --jar "`manglePath $FLINK_SQL_CLIENT_JAR`"
# write error message to stderr
else
    (>&2 echo "[ERROR] Flink SQL Client JAR file 'flink-sql-client*.jar' neither found in classpath nor /opt directory should be located in $FLINK_OPT_DIR.")

    # exit to force process failure
    exit 1
fi
The script is straightforward. Looking at the final launch command, we can see the class it invokes: org.apache.flink.table.client.SqlClient. Now we can move into the code.
The code
The main method:
public static void main(String[] args) {
    if (args.length < 1) {
        CliOptionsParser.printHelpClient();
        return;
    }

    switch (args[0]) {
        case MODE_EMBEDDED:
            // remove mode
            final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
            final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
            if (options.isPrintHelp()) {
                CliOptionsParser.printHelpEmbeddedModeClient();
            } else {
                try {
                    final SqlClient client = new SqlClient(true, options);
                    client.start();
                } catch (SqlClientException e) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop.", e);
                    throw e;
                } catch (Throwable t) {
                    // make space in terminal
                    System.out.println();
                    System.out.println();
                    LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
                    throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
                }
            }
            break;

        case MODE_GATEWAY:
            throw new SqlClientException("Gateway mode is not supported yet.");

        default:
            CliOptionsParser.printHelpClient();
    }
}
Currently only embedded mode is supported; further arguments may follow. The arguments are parsed and then the SqlClient is started. The start method mainly does the following:
- Load dependencies according to the configuration
- Start the gateway (executor) used to interact with other systems
- Build the environment
- Register a shutdown hook to do cleanup when the program exits
- Open the interactive CLI
private void start() {
    if (isEmbedded) {
        // create local executor with default environment
        final List<URL> jars;
        if (options.getJars() != null) {
            jars = options.getJars();
        } else {
            jars = Collections.emptyList();
        }
        final List<URL> libDirs;
        if (options.getLibraryDirs() != null) {
            libDirs = options.getLibraryDirs();
        } else {
            libDirs = Collections.emptyList();
        }
        final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
        executor.start();

        // create CLI client with session environment
        final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
        appendPythonConfig(sessionEnv, options.getPythonConfiguration());
        final SessionContext context;
        if (options.getSessionId() == null) {
            context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
        } else {
            context = new SessionContext(options.getSessionId(), sessionEnv);
        }

        // Open an new session
        String sessionId = executor.openSession(context);
        try {
            // add shutdown hook
            Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(sessionId, executor));

            // do the actual work
            openCli(sessionId, executor);
        } finally {
            executor.closeSession(sessionId);
        }
    } else {
        throw new SqlClientException("Gateway mode is not supported yet.");
    }
}
Executing SQL statements is delegated to CliClient:
private void openCli(String sessionId, Executor executor) {
    CliClient cli = null;
    try {
        Path historyFilePath;
        if (options.getHistoryFilePath() != null) {
            historyFilePath = Paths.get(options.getHistoryFilePath());
        } else {
            historyFilePath = Paths.get(System.getProperty("user.home"),
                    SystemUtils.IS_OS_WINDOWS ? "flink-sql-history" : ".flink-sql-history");
        }
        cli = new CliClient(sessionId, executor, historyFilePath);
        // interactive CLI mode
        if (options.getUpdateStatement() == null) {
            cli.open();
        }
        // execute single update statement
        else {
            final boolean success = cli.submitUpdate(options.getUpdateStatement());
            if (!success) {
                throw new SqlClientException("Could not submit given SQL update statement to cluster.");
            }
        }
    } finally {
        if (cli != null) {
            cli.close();
        }
    }
}
The open method reads the SQL statements typed at the sql-cli prompt, with a semicolon marking the end of each statement. SqlCommandParser parses each command, and each kind of command is handled differently.
public void open() {
    isRunning = true;

    // print welcome
    terminal.writer().append(CliStrings.MESSAGE_WELCOME);

    // begin reading loop
    while (isRunning) {
        // make some space to previous command
        terminal.writer().append("\n");
        terminal.flush();

        final String line;
        try {
            line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
        } catch (UserInterruptException e) {
            // user cancelled line with Ctrl+C
            continue;
        } catch (EndOfFileException | IOError e) {
            // user cancelled application with Ctrl+D or kill
            break;
        } catch (Throwable t) {
            throw new SqlClientException("Could not read from command line.", t);
        }
        if (line == null) {
            continue;
        }
        final Optional<SqlCommandCall> cmdCall = parseCommand(line);
        cmdCall.ifPresent(this::callCommand);
    }
}
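The parseCommand call above is where SqlCommandParser comes in: conceptually, it matches the input (with its trailing semicolon stripped) against a table of regular expressions to decide what kind of statement it is. Below is a much simplified, self-contained sketch of that idea; the Command enum and pattern table here are illustrative stand-ins, not Flink's actual SqlCommand definitions:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

public class SimpleSqlCommandParser {
    enum Command { CREATE_TABLE, INSERT_INTO, SELECT, UNKNOWN }

    // Ordered pattern table, loosely mimicking SqlCommandParser's regex matching.
    private static final Map<Command, Pattern> PATTERNS = new LinkedHashMap<>();
    static {
        PATTERNS.put(Command.CREATE_TABLE, Pattern.compile("(?i)^\\s*CREATE\\s+TABLE\\s+.*"));
        PATTERNS.put(Command.INSERT_INTO, Pattern.compile("(?i)^\\s*INSERT\\s+(INTO|OVERWRITE)\\s+.*"));
        PATTERNS.put(Command.SELECT, Pattern.compile("(?i)^\\s*SELECT\\s+.*"));
    }

    static Command parse(String statement) {
        // Strip the trailing semicolon that terminates a statement in the CLI.
        String stmt = statement.trim().replaceAll(";\\s*$", "");
        for (Map.Entry<Command, Pattern> e : PATTERNS.entrySet()) {
            if (e.getValue().matcher(stmt).matches()) {
                return e.getKey();
            }
        }
        return Command.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(parse("CREATE TABLE t (a INT);")); // prints CREATE_TABLE
        System.out.println(parse("insert into t select 1;")); // prints INSERT_INTO
    }
}
```

The real parser additionally extracts operands (such as the statement body) into a SqlCommandCall, which callCommand then dispatches on.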
callCommand itself is a fairly long method; it simply performs a different operation depending on the kind of SQL statement.
For example, a CREATE TABLE statement goes through callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED), which eventually calls TableEnvironment#executeSql.
INSERT and INSERT OVERWRITE go through callInsert(cmdCall), which eventually calls TableEnvironment#sqlUpdate, a method that is already deprecated.
The other statement kinds are handled in a similar fashion; if you are interested, step through the code yourself. From there on the flow is the same as any SQL program: validation → conversion → optimization → translation into transformations → submission for execution. See the article 《Flink源码阅读之Flinksql执行流程》 for details.
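The overall shape of that dispatch can be pictured with a small standalone sketch. Note this is a simplified, hypothetical skeleton: callDdl and callInsert do exist in CliClient, but the types and the lastAction bookkeeping below are stand-ins for illustration, not Flink's real signatures:

```java
public class CommandDispatchSketch {
    enum Command { CREATE_TABLE, INSERT_INTO, QUIT }

    static String lastAction; // records the effect of a dispatch, for demonstration only

    // Roughly mirrors CliClient#callCommand: one branch per recognized command.
    static void callCommand(Command cmd, String operand) {
        switch (cmd) {
            case CREATE_TABLE:
                callDdl(operand);    // in Flink this ends up in TableEnvironment#executeSql
                break;
            case INSERT_INTO:
                callInsert(operand); // in Flink this ends up in TableEnvironment#sqlUpdate
                break;
            case QUIT:
                lastAction = "quit";
                break;
        }
    }

    static void callDdl(String ddl) { lastAction = "ddl:" + ddl; }

    static void callInsert(String dml) { lastAction = "insert:" + dml; }

    public static void main(String[] args) {
        callCommand(Command.CREATE_TABLE, "CREATE TABLE t (a INT)");
        System.out.println(lastAction); // prints ddl:CREATE TABLE t (a INT)
    }
}
```

The real method switches on the SqlCommand enum carried by the SqlCommandCall and delegates the actual work to the Executor.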