Flink Source Code Reading Series

Flink Source Code Reading: What Happens Behind flink run

2020-12-01  〇白衣卿相〇

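We often use the flink run command to submit a jar job, for example to a YARN cluster. So what does Flink do behind this command to get our job onto the cluster? I was curious enough to find out.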

  1. The flink script
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
target="$0"
# For the case, the executable has been directly symlinked, figure out
# the correct bin path by following its symlink up to an upper bound.
# Note: we can't use the readlink utility here if we want to be POSIX
# compatible.
iteration=0
while [ -L "$target" ]; do
    if [ "$iteration" -gt 100 ]; then
        echo "Cannot resolve path: You have a cyclic symlink in $target."
        break
    fi
    ls=`ls -ld -- "$target"`
    target=`expr "$ls" : '.* -> \(.*\)$'`
    iteration=$((iteration + 1))
done

# Convert relative path to absolute path
bin=`dirname "$target"`

# get flink config
. "$bin"/config.sh

if [ "$FLINK_IDENT_STRING" = "" ]; then
        FLINK_IDENT_STRING="$USER"
fi

CC_CLASSPATH=`constructFlinkClassPath`

log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-client-$HOSTNAME.log
log_setting=(-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlog4j.configurationFile=file:"$FLINK_CONF_DIR"/log4j-cli.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml)

# Add HADOOP_CLASSPATH to allow the usage of Hadoop file systems
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"

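The above is the content of the flink script, copied here in full. I am not very familiar with shell scripting, so I will not go through it in detail. The last line is the one that matters: it runs the main method of the org.apache.flink.client.cli.CliFrontend class and passes the command-line arguments through.
The main method is as follows: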

/**
     * Submits the job based on the arguments.
     */
    public static void main(final String[] args) {
        EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);

        // 1. find the configuration directory
        final String configurationDirectory = getConfigurationDirectoryFromEnv();

        // 2. load the global configuration
        final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);

        // 3. load the custom command lines
        final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
            configuration,
            configurationDirectory);

        try {
            final CliFrontend cli = new CliFrontend(
                configuration,
                customCommandLines);

            SecurityUtils.install(new SecurityConfiguration(cli.configuration));
            int retCode = SecurityUtils.getInstalledContext()
                    .runSecured(() -> cli.parseParameters(args));
            System.exit(retCode);
        }
        catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("Fatal error while running command line interface.", strippedThrowable);
            strippedThrowable.printStackTrace();
            System.exit(31);
        }
    }
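
To make the hand-off from main to the run method easier to follow, here is a minimal sketch of what a parseParameters-style dispatch could look like, assuming the first argument selects the action. The class name, the messages, and the set of actions are illustrative assumptions, not the actual Flink source.

```java
import java.util.Arrays;

// Hypothetical stand-in for the action dispatch; names and messages are illustrative only.
final class ActionDispatchSketch {

    /** Returns an exit code, mirroring how main hands retCode to System.exit. */
    static int parseParameters(String[] args) {
        if (args.length < 1) {
            System.out.println("Please specify an action.");
            return 1;
        }
        String action = args[0];
        // Everything after the action name belongs to the action itself.
        String[] params = Arrays.copyOfRange(args, 1, args.length);
        switch (action) {
            case "run":
                return run(params);
            case "list":
                return list(params);
            default:
                System.out.println("\"" + action + "\" is not a valid action.");
                return 1;
        }
    }

    static int run(String[] params) {
        System.out.println("run action, arguments: " + Arrays.toString(params));
        return 0;
    }

    static int list(String[] params) {
        System.out.println("list action, arguments: " + Arrays.toString(params));
        return 0;
    }

    public static void main(String[] args) {
        int retCode = parseParameters(new String[] {"run", "-m", "yarn-cluster", "job.jar"});
        System.out.println("exit code: " + retCode);
    }
}
```

In this sketch an unknown or missing action yields a non-zero exit code, and a recognized action receives only the arguments that follow it.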

It mainly does the following:

Load the dependency jars the job needs

/**
     * Executions the run action.
     *
     * @param args Command line arguments for the run action.
     */
    protected void run(String[] args) throws Exception {
    ...
    final List<URL> jobJars = program.getJobJarAndDependencies();
        final Configuration effectiveConfiguration =
                getEffectiveConfiguration(commandLine, programOptions, jobJars);

        LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

        try {
            executeProgram(effectiveConfiguration, program);
        } finally {
            program.deleteExtractedLibraries();
        }
}

Execute the program: using the user-code class loader, invoke the main method in the user's jar

public static void executeProgram(
            PipelineExecutorServiceLoader executorServiceLoader,
            Configuration configuration,
            PackagedProgram program) throws ProgramInvocationException {
        checkNotNull(executorServiceLoader);
        final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
        final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(userCodeClassLoader);

            LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

            ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
                    executorServiceLoader,
                    configuration,
                    userCodeClassLoader);
            ContextEnvironment.setAsContext(factory);

            try {
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }

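invokeInteractiveModeForExecution eventually reaches callMainMethod, which invokes the entry class's main method through reflection (the lookup of mainMethod is elided here):

private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
        ...
        mainMethod.invoke(null, (Object) args);
        ...
}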
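
The two ideas in the listings above, swapping the thread's context class loader and calling main reflectively, can be tried in isolation. The following is a minimal sketch under stated assumptions: UserJob is a hypothetical stand-in for the entry class of a user jar, and an empty URLClassLoader plays the role of the user-code class loader. It is not how PackagedProgram is implemented.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.net.URLClassLoader;

final class ReflectiveMainSketch {

    /** Hypothetical stand-in for the entry class inside a user jar. */
    public static final class UserJob {
        static ClassLoader seenLoader;
        static String[] seenArgs;

        public static void main(String[] args) {
            // Record what the "user code" observes while it runs.
            seenLoader = Thread.currentThread().getContextClassLoader();
            seenArgs = args;
        }
    }

    /** Invokes entryClass.main(args) with userCodeClassLoader as the context class loader. */
    static void invokeMain(Class<?> entryClass, ClassLoader userCodeClassLoader, String[] args) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            Method mainMethod = entryClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(mainMethod.getModifiers())) {
                throw new IllegalStateException("main must be static");
            }
            current.setContextClassLoader(userCodeClassLoader);
            // The cast keeps the String[] from being spread as varargs.
            mainMethod.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main", e);
        } finally {
            // Always restore the original loader, even if the user code throws.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader userLoader = new URLClassLoader(new URL[0], original);
        invokeMain(UserJob.class, userLoader, new String[] {"--input", "demo"});
        System.out.println("user code saw user loader: " + (UserJob.seenLoader == userLoader));
        System.out.println("loader restored: "
                + (Thread.currentThread().getContextClassLoader() == original));
    }
}
```

The try/finally shape is the same as in executeProgram: whatever the user code does, the calling thread gets its original class loader back.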

At this point we have reached the main method inside the user's jar. Next comes the generation of the StreamGraph:

public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        return execute(getStreamGraph(jobName));
    }

Generation of the JobGraph

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
...
CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration)
            .execute(streamGraph, configuration);
...
}

public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
        final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

        try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
            final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);

            final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);

            final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                    .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
            LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

            return CompletableFuture.completedFuture(
                    new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
        }
    }
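
How executorFactory.getExecutor(configuration) ends up at a YARN-specific executor is not shown above. As a rough illustration, the sketch below picks a factory by asking each candidate whether it is compatible with the configuration. The interface, the helper names, and the use of a plain Map keyed by "execution.target" are simplifying assumptions of mine, not Flink's actual classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class ExecutorLookupSketch {

    /** Hypothetical, simplified counterpart of an executor factory. */
    interface ExecutorFactory {
        String name();

        boolean isCompatibleWith(Map<String, String> configuration);
    }

    /** Builds a factory that accepts exactly one deployment target. */
    static ExecutorFactory forTarget(String target) {
        return new ExecutorFactory() {
            @Override
            public String name() {
                return target;
            }

            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return target.equalsIgnoreCase(configuration.get("execution.target"));
            }
        };
    }

    /** Returns the single compatible factory, or fails if there is none or more than one. */
    static ExecutorFactory select(List<ExecutorFactory> factories, Map<String, String> configuration) {
        ExecutorFactory match = null;
        for (ExecutorFactory factory : factories) {
            if (factory.isCompatibleWith(configuration)) {
                if (match != null) {
                    throw new IllegalStateException("Multiple compatible executor factories found");
                }
                match = factory;
            }
        }
        if (match == null) {
            throw new IllegalStateException(
                    "No executor factory found for target " + configuration.get("execution.target"));
        }
        return match;
    }

    public static void main(String[] args) {
        List<ExecutorFactory> factories =
                Arrays.asList(forTarget("yarn-per-job"), forTarget("remote"), forTarget("local"));
        Map<String, String> configuration =
                Collections.singletonMap("execution.target", "yarn-per-job");
        System.out.println("selected: " + select(factories, configuration).name());
    }
}
```

Failing on both "no match" and "more than one match" keeps the choice of deployment target unambiguous.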

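ClusterDescriptor has implementations for YARN, Kubernetes (k8s), and standalone.
Taking YARN as an example, several checks run before the job is submitted to YARN: a Kerberos credential check, a queue check, and a resource check.
The application is then created through yarnClient and the AppMaster is started; in detached mode the application ID is logged so the user can cancel the job later.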

public ClusterClientProvider<ApplicationId> deployJobCluster(
        ClusterSpecification clusterSpecification,
        JobGraph jobGraph,
        boolean detached) throws ClusterDeploymentException {
        try {
            return deployInternal(
                clusterSpecification,
                "Flink per-job cluster",
                getYarnJobClusterEntrypoint(),
                jobGraph,
                detached);
        } catch (Exception e) {
            throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
        }
    }

private ClusterClientProvider<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {

        if (UserGroupInformation.isSecurityEnabled()) {
            // note: UGI::hasKerberosCredentials inaccurately reports false
            // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
            // so we check only in ticket cache scenario.
            boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            boolean isCredentialsConfigured = HadoopUtils.isCredentialsConfigured(
                UserGroupInformation.getCurrentUser(), useTicketCache);
            if (!isCredentialsConfigured) {
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                    "does not have Kerberos credentials or delegation tokens!");
            }
        }

        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------

        checkYarnQueues(yarnClient);

        // ------------------ Check if the YARN ClusterClient has the requested resources --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB = yarnConfiguration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = validateClusterResources(
                    clusterSpecification,
                    yarnMinAllocationMB,
                    maxRes,
                    freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode = detached ?
                ClusterEntrypoint.ExecutionMode.DETACHED
                : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        ApplicationReport report = startAppMaster(
                flinkConfiguration,
                applicationName,
                yarnClusterEntrypoint,
                jobGraph,
                yarnClient,
                yarnApplication,
                validClusterSpecification);

        // print the application id for user to cancel themselves.
        if (detached) {
            final ApplicationId yarnApplicationId = report.getApplicationId();
            logDetachedClusterInformation(yarnApplicationId, LOG);
        }

        setClusterEntrypointInfoToConfig(report);

        return () -> {
            try {
                return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
            } catch (Exception e) {
                throw new RuntimeException("Error while creating RestClusterClient.", e);
            }
        };
    }
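
The body of validateClusterResources is not shown above, so here is a minimal sketch of the kind of check such a step could perform: round a requested container size up to a multiple of the scheduler's minimum allocation, then reject it if it exceeds the largest container YARN will grant. The method names and the exact rules are assumptions for illustration, not the Flink implementation.

```java
final class ResourceCheckSketch {

    /** Rounds requestedMb up to the next multiple of minAllocationMb (no-op if the minimum is unset). */
    static int normalize(int requestedMb, int minAllocationMb) {
        if (minAllocationMb <= 0) {
            return requestedMb;
        }
        int remainder = requestedMb % minAllocationMb;
        return remainder == 0 ? requestedMb : requestedMb + (minAllocationMb - remainder);
    }

    /** Returns the container size that would actually be requested, or throws if it cannot fit. */
    static int checkedContainerSize(int requestedMb, int minAllocationMb, int maxContainerMb) {
        if (requestedMb <= 0) {
            throw new IllegalArgumentException("Requested memory must be positive");
        }
        int normalizedMb = normalize(requestedMb, minAllocationMb);
        if (normalizedMb > maxContainerMb) {
            throw new IllegalArgumentException(
                    "Requested " + normalizedMb + " MB, but the maximum container size is "
                            + maxContainerMb + " MB");
        }
        return normalizedMb;
    }

    public static void main(String[] args) {
        // A 1600 MB request with a 1024 MB minimum allocation becomes a 2048 MB container.
        System.out.println(checkedContainerSize(1600, 1024, 8192));
    }
}
```

Doing this on the client, before startAppMaster, means an impossible request fails fast instead of leaving an application waiting in the YARN queue.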

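After the job has been submitted to the YARN cluster, there is still a lot of resource request and allocation work to do, including TaskManager (TM) resources and slots. I will write about that when I have time.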
