KSQL Source Code Analysis 01: How Does a SQL Query Run?

2018-08-12  larluo_罗浩

Have you ever wanted to understand what actually happens inside KSQL when you submit a SQL query from the command line?

The walkthrough has two parts: the client-side call path and the server-side call path.

1. The client side

When we launch the KSQL CLI, the ksql command is what runs:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat bin/ksql
#!/bin/bash

exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.Ksql "$@"

The script is simple: it just runs Ksql.main(args). So let's look at the Ksql class:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-cli/src/main/java/io/confluent/ksql/Ksql.java
  public static void main(String[] args) throws IOException {
    final Options options = args.length == 0 ? Options.parse("http://localhost:8088")
                                             : Options.parse(args);
    if (options == null) {
      System.exit(-1);
    }

    try {

      final Properties properties = loadProperties(options.getConfigFile());
      final KsqlRestClient restClient = new KsqlRestClient(options.getServer(), properties);

      options.getUserNameAndPassword().ifPresent(
          creds -> restClient.setupAuthenticationCredentials(creds.left, creds.right)
      );

      final KsqlVersionCheckerAgent versionChecker = new KsqlVersionCheckerAgent();
      versionChecker.start(KsqlModuleType.CLI, properties);

      try (Cli cli = new Cli(options.getStreamedQueryRowLimit(),
                                   options.getStreamedQueryTimeoutMs(),
                                   restClient,
                                   new JLineTerminal(options.getOutputFormat(), restClient))
      ) {
        cli.runInteractively();
      }
    } catch (final Exception e) {
      final String msg = ErrorMessageUtil.buildErrorMessage(e);
      LOGGER.error(msg);
      System.err.println(msg);
      System.exit(-1);
    }
  }

main parses the command-line options, optionally loads a properties file, builds a KsqlRestClient pointing at the server, and then hands control to Cli.runInteractively.

Next, the Cli class:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-cli/src/main/java/io/confluent/ksql/cli/Cli.java
  public void runInteractively() {
    displayWelcomeMessage();
    validateClient(terminal.writer(), restClient);
    boolean eof = false;
    while (!eof) {
      try {
        handleLine(readLine());
      } catch (EndOfFileException exception) {
        // EOF is fine, just terminate the REPL
        terminal.writer().println("Exiting KSQL.");
        eof = true;
      } catch (Exception exception) {
        LOGGER.error("", exception);
        terminal.writer().println(ErrorMessageUtil.buildErrorMessage(exception));
      }
      terminal.flush();
    }
  }

  public void handleLine(String line) throws Exception {
    String trimmedLine = Optional.ofNullable(line).orElse("").trim();

    if (trimmedLine.isEmpty()) {
      return;
    }

    String[] commandArgs = trimmedLine.split("\\s+", 2);
    CliSpecificCommand cliSpecificCommand =
        terminal.getCliSpecificCommands().get(commandArgs[0].toLowerCase());
    if (cliSpecificCommand != null) {
      cliSpecificCommand.execute(commandArgs.length > 1 ? commandArgs[1] : "");
    } else {
      handleStatements(line);
    }
  }

  private void handleStatements(String line)
      throws InterruptedException, IOException, ExecutionException {
    StringBuilder consecutiveStatements = new StringBuilder();
    for (SqlBaseParser.SingleStatementContext statementContext :
        new KsqlParser().getStatements(line)) {
      String statementText = KsqlEngine.getStatementString(statementContext);
      if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
          || statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
        consecutiveStatements = printOrDisplayQueryResults(
            consecutiveStatements,
            statementContext,
            statementText
        );

      } else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
        listProperties(statementText);

      } else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
        setProperty(statementContext);

      } else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
        consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
      } else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
        runScript(statementContext, statementText);
      } else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
        registerTopic(consecutiveStatements, statementContext, statementText);
      } else {
        consecutiveStatements.append(statementText);
      }
    }
    if (consecutiveStatements.length() != 0) {
      printKsqlResponse(
          restClient.makeKsqlRequest(consecutiveStatements.toString())
      );
    }
  }
  private StringBuilder printOrDisplayQueryResults(
      StringBuilder consecutiveStatements,
      SqlBaseParser.SingleStatementContext statementContext,
      String statementText
  ) throws InterruptedException, IOException, ExecutionException {
    if (consecutiveStatements.length() != 0) {
      printKsqlResponse(
          restClient.makeKsqlRequest(consecutiveStatements.toString())
      );
      consecutiveStatements = new StringBuilder();
    }
    if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
      handleStreamedQuery(statementText);
    } else {
      handlePrintedTopic(statementText);
    }
    return consecutiveStatements;
  }
  private void printKsqlResponse(RestResponse<KsqlEntityList> response) throws IOException {
    if (response.isSuccessful()) {
      KsqlEntityList ksqlEntities = response.getResponse();
      boolean noErrorFromServer = true;
      for (KsqlEntity entity : ksqlEntities) {
        if (entity instanceof CommandStatusEntity
            && (
            ((CommandStatusEntity) entity).getCommandStatus().getStatus()
                == CommandStatus.Status.ERROR)
        ) {
          String fullMessage = ((CommandStatusEntity) entity).getCommandStatus().getMessage();
          terminal.printError(fullMessage.split("\n")[0], fullMessage);
          noErrorFromServer = false;
        }
      }
      if (noErrorFromServer) {
        terminal.printKsqlEntityList(response.getResponse());
      }
    } else {
      terminal.printErrorMessage(response.getErrorMessage());
    }
  }


So what does Cli.runInteractively do?
It reads a line of input via Cli.readLine and passes it to Cli.handleLine.
Cli.handleLine first checks whether the first word names one of the commands in Console.getCliSpecificCommands;
if not, it falls through to Cli.handleStatements.
So which commands are CLI-specific?

  public LinkedHashMap<String, CliSpecificCommand> getCliSpecificCommands() {
    return cliSpecificCommands;
  }

  public void registerCliSpecificCommand(final CliSpecificCommand cliSpecificCommand) {
    cliSpecificCommands.put(cliSpecificCommand.getName(), cliSpecificCommand);
  }

  private void registerDefaultCommands() {
    registerCliSpecificCommand(new Help());

    registerCliSpecificCommand(new Clear());

    registerCliSpecificCommand(new Output());

    registerCliSpecificCommand(new History());

    registerCliSpecificCommand(new Version());

    registerCliSpecificCommand(new Exit());
  }
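
Each entry maps a lowercased command name to a handler, and anything not found in the map falls through to handleStatements. As a minimal sketch of the contract (a hypothetical command, assuming CliSpecificCommand exposes only the getName()/execute(String) methods that handleLine actually calls):

// Hypothetical custom command, for illustration only.
public class Uptime implements CliSpecificCommand {
  private final long startMs = System.currentTimeMillis();

  @Override
  public String getName() {
    return "uptime"; // matched against commandArgs[0].toLowerCase()
  }

  @Override
  public void execute(String args) {
    System.out.println((System.currentTimeMillis() - startMs) + " ms");
  }
}

// registered alongside the defaults:
//   registerCliSpecificCommand(new Uptime());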

Now we reach the handleStatements logic:
handleStatements parses the line with KsqlParser.getStatements(line) and dispatches on the type of each parsed statement.

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-parser/src/main/java/io/confluent/ksql/parser/KsqlParser.java
import org.antlr.v4.runtime.ANTLRInputStream;
  public List<SqlBaseParser.SingleStatementContext> getStatements(String sql) {
    try {
      ParserRuleContext tree = getParseTree(sql);
      SqlBaseParser.StatementsContext statementsContext = (SqlBaseParser.StatementsContext) tree;
      return statementsContext.singleStatement();
    } catch (Exception e) {
      throw new ParseFailedException(e.getMessage(), e);
    }
  }
  private ParserRuleContext getParseTree(String sql) {

    SqlBaseLexer
        sqlBaseLexer =
        new SqlBaseLexer(new CaseInsensitiveStream(new ANTLRInputStream(sql)));
    CommonTokenStream tokenStream = new CommonTokenStream(sqlBaseLexer);
    SqlBaseParser sqlBaseParser = new SqlBaseParser(tokenStream);

    sqlBaseLexer.removeErrorListeners();
    sqlBaseLexer.addErrorListener(ERROR_LISTENER);

    sqlBaseParser.removeErrorListeners();
    sqlBaseParser.addErrorListener(ERROR_LISTENER);

    Function<SqlBaseParser, ParserRuleContext> parseFunction = SqlBaseParser::statements;
    ParserRuleContext tree;
    try {
      // first, try parsing with potentially faster SLL mode
      sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
      tree = parseFunction.apply(sqlBaseParser);
    } catch (ParseCancellationException ex) {
      // if we fail, parse with LL mode
      tokenStream.reset(); // rewind input stream
      sqlBaseParser.reset();

      sqlBaseParser.getInterpreter().setPredictionMode(PredictionMode.LL);
      tree = parseFunction.apply(sqlBaseParser);
    }

    return tree;
  }

KsqlParser.getStatements then hands the token stream to the ANTLR-generated SqlBaseParser::statements rule, defined in SqlBase.g4:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 
grammar SqlBase;

tokens {
    DELIMITER
}

statements
    : (singleStatement)* EOF
    ;

singleStatement
    : statement ';'
    ;
singleExpression
    : expression EOF
    ;

statement
    : query                                                                 #querystatement
    | (LIST | SHOW) PROPERTIES                                              #listProperties
    | (LIST | SHOW) TOPICS                                                  #listTopics
    | (LIST | SHOW) REGISTERED TOPICS                                       #listRegisteredTopics
    | (LIST | SHOW) STREAMS EXTENDED?                                   #listStreams
    | (LIST | SHOW) TABLES EXTENDED?                                    #listTables
    | (LIST | SHOW) FUNCTIONS                                            #listFunctions
    | DESCRIBE EXTENDED? (qualifiedName | TOPIC qualifiedName)              #showColumns
    | DESCRIBE FUNCTION qualifiedName                                       #describeFunction
    | PRINT (qualifiedName | STRING) (FROM BEGINNING)? ((INTERVAL | SAMPLE) number)?   #printTopic
    | (LIST | SHOW) QUERIES EXTENDED?                                   #listQueries
    | TERMINATE QUERY? qualifiedName                                        #terminateQuery
    | SET STRING EQ STRING                                                  #setProperty
    | UNSET STRING                                                          #unsetProperty
    | LOAD expression                                                       #loadProperties
    | REGISTER TOPIC (IF NOT EXISTS)? qualifiedName
            (WITH tableProperties)?                                         #registerTopic
    | CREATE STREAM (IF NOT EXISTS)? qualifiedName
                ('(' tableElement (',' tableElement)* ')')?
                (WITH tableProperties)?                                     #createStream
    | CREATE STREAM (IF NOT EXISTS)? qualifiedName
            (WITH tableProperties)? AS query
                                       (PARTITION BY identifier)?           #createStreamAs
    | CREATE TABLE (IF NOT EXISTS)? qualifiedName
                    ('(' tableElement (',' tableElement)* ')')?
                    (WITH tableProperties)?                                 #createTable
    | CREATE TABLE (IF NOT EXISTS)? qualifiedName
            (WITH tableProperties)? AS query                                #createTableAs
    | INSERT INTO qualifiedName query (PARTITION BY identifier)?            #insertInto
    | DROP TOPIC (IF EXISTS)? qualifiedName                                 #dropTopic
    | DROP STREAM (IF EXISTS)? qualifiedName (DELETE TOPIC)?                  #dropStream
    | DROP TABLE (IF EXISTS)? qualifiedName  (DELETE TOPIC)?                  #dropTable
    | EXPLAIN ANALYZE?
            ('(' explainOption (',' explainOption)* ')')? (statement | qualifiedName)         #explain
    | EXPORT CATALOG TO STRING                                              #exportCatalog
    | RUN SCRIPT STRING                                                     #runScript
    ;
query
    :  queryNoWith
    ;
queryNoWith:
      queryTerm
      (LIMIT limit=(INTEGER_VALUE | ALL))?
    ;
queryTerm
    : queryPrimary                                                             #queryTermDefault
    ;

queryPrimary
    : querySpecification                   #queryPrimaryDefault
    | TABLE qualifiedName                  #table
    | VALUES expression (',' expression)*  #inlineTable
    | '(' queryNoWith  ')'                 #subquery
    ;
querySpecification
    : SELECT STREAM? selectItem (',' selectItem)*
      (INTO into=relationPrimary)?
      (FROM from=relation (',' relation)*)?
      (WINDOW  windowExpression)?
      (WHERE where=booleanExpression)?
      (GROUP BY groupBy)?
      (HAVING having=booleanExpression)?
    ;

Matching against the SqlBase.g4 grammar, our SELECT comes back as the #querystatement alternative; the sketch below shows this directly.
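
To see this matching in action you can drive the parser directly; a small sketch, assuming the ksql-parser module is on the classpath:

import io.confluent.ksql.parser.KsqlParser;
import io.confluent.ksql.parser.SqlBaseParser;

public class ParseDemo {
  public static void main(String[] args) {
    // A bare SELECT matches the #querystatement alternative, so the
    // printed context class is QuerystatementContext.
    for (SqlBaseParser.SingleStatementContext ctx :
        new KsqlParser().getStatements("SELECT * FROM TEST_STREAM;")) {
      System.out.println(ctx.statement().getClass().getSimpleName());
    }
  }
}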
Now back to Cli.handleStatements:

  private void handleStatements(String line)
      throws InterruptedException, IOException, ExecutionException {
    StringBuilder consecutiveStatements = new StringBuilder();
    for (SqlBaseParser.SingleStatementContext statementContext :
        new KsqlParser().getStatements(line)) {
      String statementText = KsqlEngine.getStatementString(statementContext);
      if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext
          || statementContext.statement() instanceof SqlBaseParser.PrintTopicContext) {
        consecutiveStatements = printOrDisplayQueryResults(
            consecutiveStatements,
            statementContext,
            statementText
        );

      } else if (statementContext.statement() instanceof SqlBaseParser.ListPropertiesContext) {
        listProperties(statementText);

      } else if (statementContext.statement() instanceof SqlBaseParser.SetPropertyContext) {
        setProperty(statementContext);

      } else if (statementContext.statement() instanceof SqlBaseParser.UnsetPropertyContext) {
        consecutiveStatements = unsetProperty(consecutiveStatements, statementContext);
      } else if (statementContext.statement() instanceof SqlBaseParser.RunScriptContext) {
        runScript(statementContext, statementText);
      } else if (statementContext.statement() instanceof SqlBaseParser.RegisterTopicContext) {
        registerTopic(consecutiveStatements, statementContext, statementText);
      } else {
        consecutiveStatements.append(statementText);
      }
    }
    if (consecutiveStatements.length() != 0) {
      printKsqlResponse(
          restClient.makeKsqlRequest(consecutiveStatements.toString())
      );
    }
  }

Since the parsed type here is SqlBaseParser.QuerystatementContext, Cli.printOrDisplayQueryResults is invoked:

  private StringBuilder printOrDisplayQueryResults(
      StringBuilder consecutiveStatements,
      SqlBaseParser.SingleStatementContext statementContext,
      String statementText
  ) throws InterruptedException, IOException, ExecutionException {
    if (consecutiveStatements.length() != 0) {
      printKsqlResponse(
          restClient.makeKsqlRequest(consecutiveStatements.toString())
      );
      consecutiveStatements = new StringBuilder();
    }
    if (statementContext.statement() instanceof SqlBaseParser.QuerystatementContext) {
      handleStreamedQuery(statementText);
    } else {
      handlePrintedTopic(statementText);
    }
    return consecutiveStatements;
  }
  private void handleStreamedQuery(String query)
      throws InterruptedException, ExecutionException, IOException {
    RestResponse<KsqlRestClient.QueryStream> queryResponse =
        restClient.makeQueryRequest(query);

    LOGGER.debug("Handling streamed query");

    if (queryResponse.isSuccessful()) {
      try (KsqlRestClient.QueryStream queryStream = queryResponse.getResponse()) {
        Future<?> queryStreamFuture = queryStreamExecutorService.submit(new Runnable() {
          @Override
          public void run() {
            for (long rowsRead = 0; keepReading(rowsRead) && queryStream.hasNext(); rowsRead++) {
              try {
                StreamedRow row = queryStream.next();
                terminal.printStreamedRow(row);
                if (row.getFinalMessage() != null || row.getErrorMessage() != null) {
                  break;
                }
              } catch (IOException exception) {
                throw new RuntimeException(exception);
              }
            }
          }
        });

        terminal.handle(Terminal.Signal.INT, signal -> {
          terminal.handle(Terminal.Signal.INT, Terminal.SignalHandler.SIG_IGN);
          queryStreamFuture.cancel(true);
        });

        try {
          if (streamedQueryTimeoutMs == null) {
            queryStreamFuture.get();
            Thread.sleep(1000); // TODO: Make things work without this
          } else {
            try {
              queryStreamFuture.get(streamedQueryTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException exception) {
              queryStreamFuture.cancel(true);
            }
          }
        } catch (CancellationException exception) {
          // It's fine
        }
      } finally {
        terminal.writer().println("Query terminated");
        terminal.flush();
      }
    } else {
      terminal.printErrorMessage(queryResponse.getErrorMessage());
    }
  }

Cli.handleStreamedQuery sends the request to the server via KsqlRestClient.makeQueryRequest:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/client/KsqlRestClient.java
  public RestResponse<QueryStream> makeQueryRequest(String ksql) {
    KsqlRequest jsonRequest = new KsqlRequest(ksql, localProperties);
    Response response = makePostRequest("query", jsonRequest);
    if (response.getStatus() == Response.Status.OK.getStatusCode()) {
      return RestResponse.successful(new QueryStream(response));
    } else {
      return RestResponse.erroneous(response.readEntity(KsqlErrorMessage.class));
    }
  }
  private Response makePostRequest(String path, Object jsonEntity) {
    try {
      return client.target(serverAddress)
          .path(path)
          .request(MediaType.APPLICATION_JSON_TYPE)
          .post(Entity.json(jsonEntity));
    } catch (Exception exception) {
      throw new KsqlRestClientException("Error issuing POST to KSQL server", exception);
    }
  }

Ultimately the request goes out through the standard JAX-RS client (javax.ws.rs.client.Client) to the server's REST endpoint. An equivalent request looks like this:

curl -X "POST" "http://localhost:8088/query" \
     -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
     -d $'{
  "ksql": "SELECT * FROM TEST_STREAM;",
  "streamsProperties": {}
}'
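
The response to /query is a long-lived chunked HTTP stream carrying one JSON-encoded row per line. A hypothetical raw consumer using only the JDK (no KSQL client classes) might look like:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class RawQueryClient {
  public static void main(String[] args) throws Exception {
    HttpURLConnection conn = (HttpURLConnection)
        new URL("http://localhost:8088/query").openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    conn.getOutputStream().write(
        "{\"ksql\":\"SELECT * FROM TEST_STREAM;\",\"streamsProperties\":{}}"
            .getBytes(StandardCharsets.UTF_8));

    // The server keeps the connection open and emits rows as they arrive.
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}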



2. The server side

When the KSQL server is started, the ksql-server-start script runs:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./bin/ksql-server-start
exec "$base_dir"/bin/ksql-run-class io.confluent.ksql.rest.server.KsqlServerMain $EXTRA_ARGS "$@"

This is just as direct: it runs KsqlServerMain.main(args):

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlServerMain.java
  public static void main(final String[] args) {
    try {
      final ServerOptions serverOptions = ServerOptions.parse(args);
      if (serverOptions == null) {
        return;
      }

      final Properties properties = serverOptions.loadProperties(System::getProperties);
      final String installDir = properties.getProperty("ksql.server.install.dir");
      final Optional<String> queriesFile = serverOptions.getQueriesFile(properties);
      final Executable executable = createExecutable(properties, queriesFile, installDir);
      new KsqlServerMain(executable).tryStartApp();
    } catch (final Exception e) {
      log.error("Failed to start KSQL", e);
      System.exit(-1);
    }
  }
  @SuppressWarnings("OptionalUsedAsFieldOrParameterType")
  private static Executable createExecutable(
      final Properties properties,
      final Optional<String> queriesFile,
      final String installDir
  ) throws Exception {
    if (queriesFile.isPresent()) {
      return StandaloneExecutor.create(properties, queriesFile.get(), installDir);
    }

    if (!properties.containsKey(StreamsConfig.APPLICATION_ID_CONFIG)) {
      properties.put(StreamsConfig.APPLICATION_ID_CONFIG, KSQL_REST_SERVER_DEFAULT_APP_ID);
    }
    final KsqlRestConfig restConfig = new KsqlRestConfig(properties);
    return KsqlRestApplication.buildApplication(
        restConfig,
        new KsqlVersionCheckerAgent()
    );
  }
  void tryStartApp() throws Exception {
    try {
      log.info("Starting server");
      executable.start();
      log.info("Server up and running");
      executable.join();
    } finally {
      log.info("Server shutting down");
      executable.stop();
    }
  }

KsqlServerMain calls KsqlRestApplication.buildApplication to assemble the REST server, then tryStartApp to start it.
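
Note that tryStartApp depends only on the Executable lifecycle: start, join, stop. A minimal sketch of that contract (the package and throws clauses of Executable are assumptions; the real implementations are KsqlRestApplication and StandaloneExecutor):

import java.util.concurrent.CountDownLatch;
// Executable is the interface KsqlServerMain drives above; its exact
// package and throws clauses are assumed here for illustration.

class NoOpExecutable implements Executable {
  private final CountDownLatch shutdown = new CountDownLatch(1);

  @Override
  public void start() {
    // bring the service up
  }

  @Override
  public void join() throws InterruptedException {
    shutdown.await(); // block until stop() releases us
  }

  @Override
  public void stop() {
    shutdown.countDown();
  }
}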

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
  public static KsqlRestApplication buildApplication(
      KsqlRestConfig restConfig,
      VersionCheckerAgent versionCheckerAgent
  )
      throws Exception {

    final String ksqlInstallDir = restConfig.getString(KsqlRestConfig.INSTALL_DIR_CONFIG);

    final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());

    KsqlEngine ksqlEngine = new KsqlEngine(ksqlConfig);
    KafkaTopicClient topicClient = ksqlEngine.getTopicClient();
    UdfLoader.newInstance(ksqlConfig, ksqlEngine.getMetaStore(), ksqlInstallDir).load();

    String ksqlServiceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
    String commandTopic =
        restConfig.getCommandTopic(ksqlServiceId);
    ensureCommandTopic(restConfig, topicClient, commandTopic);

    Map<String, Expression> commandTopicProperties = new HashMap<>();
    commandTopicProperties.put(
        DdlConfig.VALUE_FORMAT_PROPERTY,
        new StringLiteral("json")
    );
    commandTopicProperties.put(
        DdlConfig.KAFKA_TOPIC_NAME_PROPERTY,
        new StringLiteral(commandTopic)
    );

    ksqlEngine.getDdlCommandExec().execute(new RegisterTopicCommand(new RegisterTopic(
        QualifiedName.of(COMMANDS_KSQL_TOPIC_NAME),
        false,
        commandTopicProperties
    )), false);

    ksqlEngine.getDdlCommandExec().execute(new CreateStreamCommand(
        "statementText",
        new CreateStream(
            QualifiedName.of(COMMANDS_STREAM_NAME),
            Collections.singletonList(new TableElement(
                "STATEMENT",
                new PrimitiveType(Type.KsqlType.STRING)
            )),
            false,
            Collections.singletonMap(
                DdlConfig.TOPIC_NAME_PROPERTY,
                new StringLiteral(COMMANDS_KSQL_TOPIC_NAME)
            )
        ),
        ksqlEngine.getTopicClient(),
        true
    ), false);

    Map<String, Object> commandConsumerProperties = restConfig.getCommandConsumerProperties();
    KafkaConsumer<CommandId, Command> commandConsumer = new KafkaConsumer<>(
        commandConsumerProperties,
        getJsonDeserializer(CommandId.class, true),
        getJsonDeserializer(Command.class, false)
    );

    KafkaProducer<CommandId, Command> commandProducer = new KafkaProducer<>(
        restConfig.getCommandProducerProperties(),
        getJsonSerializer(true),
        getJsonSerializer(false)
    );

    CommandStore commandStore = new CommandStore(
        commandTopic,
        commandConsumer,
        commandProducer,
        new CommandIdAssigner(ksqlEngine.getMetaStore())
    );

    StatementParser statementParser = new StatementParser(ksqlEngine);

    StatementExecutor statementExecutor = new StatementExecutor(
        ksqlConfig,
        ksqlEngine,
        statementParser
    );

    CommandRunner commandRunner = new CommandRunner(
        statementExecutor,
        commandStore
    );

    RootDocument rootDocument = new RootDocument();

    StatusResource statusResource = new StatusResource(statementExecutor);
    StreamedQueryResource streamedQueryResource = new StreamedQueryResource(
        ksqlConfig,
        ksqlEngine,
        statementParser,
        restConfig.getLong(KsqlRestConfig.STREAMED_QUERY_DISCONNECT_CHECK_MS_CONFIG)
    );
    KsqlResource ksqlResource = new KsqlResource(
        ksqlConfig,
        ksqlEngine,
        commandStore,
        statementExecutor,
        restConfig.getLong(KsqlRestConfig.DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)
    );

    commandRunner.processPriorCommands();

    return new KsqlRestApplication(
        ksqlEngine,
        ksqlConfig,
        restConfig,
        commandRunner,
        rootDocument,
        statusResource,
        streamedQueryResource,
        ksqlResource,
        versionCheckerAgent
    );
  }
  @Override
  public void start() throws Exception {
    super.start();
    commandRunnerThread.start();
    Properties metricsProperties = new Properties();
    metricsProperties.putAll(getConfiguration().getOriginals());
    if (versionCheckerAgent != null) {
      versionCheckerAgent.start(KsqlModuleType.SERVER, metricsProperties);
    }

    displayWelcomeMessage();
  }

  @Override
  public void setupResources(Configurable<?> config, KsqlRestConfig appConfig) {
    config.register(rootDocument);
    config.register(new ServerInfoResource(serverInfo));
    config.register(statusResource);
    config.register(ksqlResource);
    config.register(streamedQueryResource);
    config.register(new KsqlExceptionMapper());
  }


While KsqlRestApplication.buildApplication runs, it reads the command topic to replay previously executed statements; we won't go into that here and will cover it in detail later.
buildApplication then registers the related REST API resources.

Next, KsqlRestApplication.start brings the REST service up.
It first calls the parent class's Application.start, which lives in a separate project:
https://github.com/confluentinc/rest-utils.git

[larluo@larluo-nixos:~/work/git/my-repo/rest-utils]$ cat ./core/src/main/java/io/confluent/rest/Application.java
  /**
   * Register resources or additional Providers, ExceptionMappers, and other JAX-RS components with
   * the Jersey application. This, combined with your Configuration class, is where you can
   * customize the behavior of the application.
   */
  public abstract void setupResources(Configurable<?> config, T appConfig);

  public void start() throws Exception {
    if (server == null) {
      createServer();
    }
    server.start();
  }
  public Server createServer() throws RestConfigException, ServletException {
    // The configuration for the JAX-RS REST service
    ResourceConfig resourceConfig = new ResourceConfig();

    Map<String, String> configuredTags = getConfiguration().getMap(RestConfig.METRICS_TAGS_CONFIG);

    Map<String, String> combinedMetricsTags = new HashMap<>(getMetricsTags());
    combinedMetricsTags.putAll(configuredTags);

    configureBaseApplication(resourceConfig, combinedMetricsTags);
    setupResources(resourceConfig, getConfiguration());

    // Configure the servlet container
    ServletContainer servletContainer = new ServletContainer(resourceConfig);
    final FilterHolder servletHolder = new FilterHolder(servletContainer);

    server = new Server() {
      @Override
      protected void doStop() throws Exception {
        super.doStop();
        Application.this.metrics.close();
        Application.this.onShutdown();
        Application.this.shutdownLatch.countDown();
      }
    };

    MBeanContainer mbContainer = new MBeanContainer(ManagementFactory.getPlatformMBeanServer());
    server.addEventListener(mbContainer);
    server.addBean(mbContainer);

    MetricsListener metricsListener = new MetricsListener(metrics, "jetty", combinedMetricsTags);

    List<URI> listeners = parseListeners(config.getList(RestConfig.LISTENERS_CONFIG),
            config.getInt(RestConfig.PORT_CONFIG), Arrays.asList("http", "https"), "http");
    for (URI listener : listeners) {
      log.info("Adding listener: " + listener.toString());
      NetworkTrafficServerConnector connector;
      if (listener.getScheme().equals("http")) {
        connector = new NetworkTrafficServerConnector(server);
      } else {
        SslContextFactory sslContextFactory = new SslContextFactory();
        if (!config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG).isEmpty()) {
          sslContextFactory.setKeyStorePath(
              config.getString(RestConfig.SSL_KEYSTORE_LOCATION_CONFIG)
          );
          sslContextFactory.setKeyStorePassword(
              config.getPassword(RestConfig.SSL_KEYSTORE_PASSWORD_CONFIG).value()
          );
          sslContextFactory.setKeyManagerPassword(
              config.getPassword(RestConfig.SSL_KEY_PASSWORD_CONFIG).value()
          );
          sslContextFactory.setKeyStoreType(
              config.getString(RestConfig.SSL_KEYSTORE_TYPE_CONFIG)
          );

          if (!config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG).isEmpty()) {
            sslContextFactory.setKeyManagerFactoryAlgorithm(
                    config.getString(RestConfig.SSL_KEYMANAGER_ALGORITHM_CONFIG));
          }
        }

        sslContextFactory.setNeedClientAuth(config.getBoolean(RestConfig.SSL_CLIENT_AUTH_CONFIG));

        List<String> enabledProtocols = config.getList(RestConfig.SSL_ENABLED_PROTOCOLS_CONFIG);
        if (!enabledProtocols.isEmpty()) {
          sslContextFactory.setIncludeProtocols(enabledProtocols.toArray(new String[0]));
        }

        List<String> cipherSuites = config.getList(RestConfig.SSL_CIPHER_SUITES_CONFIG);
        if (!cipherSuites.isEmpty()) {
          sslContextFactory.setIncludeCipherSuites(cipherSuites.toArray(new String[0]));
        }

        if (!config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG).isEmpty()) {
          sslContextFactory.setEndpointIdentificationAlgorithm(
                  config.getString(RestConfig.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG));
        }

        if (!config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG).isEmpty()) {
          sslContextFactory.setTrustStorePath(
              config.getString(RestConfig.SSL_TRUSTSTORE_LOCATION_CONFIG)
          );
          sslContextFactory.setTrustStorePassword(
              config.getPassword(RestConfig.SSL_TRUSTSTORE_PASSWORD_CONFIG).value()
          );
          sslContextFactory.setTrustStoreType(
              config.getString(RestConfig.SSL_TRUSTSTORE_TYPE_CONFIG)
          );

          if (!config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG).isEmpty()) {
            sslContextFactory.setTrustManagerFactoryAlgorithm(
                    config.getString(RestConfig.SSL_TRUSTMANAGER_ALGORITHM_CONFIG)
            );
          }
        }

        sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROTOCOL_CONFIG));
        if (!config.getString(RestConfig.SSL_PROVIDER_CONFIG).isEmpty()) {
          sslContextFactory.setProtocol(config.getString(RestConfig.SSL_PROVIDER_CONFIG));
        }

        connector = new NetworkTrafficServerConnector(server, sslContextFactory);
      }

      connector.addNetworkTrafficListener(metricsListener);
      connector.setPort(listener.getPort());
      connector.setHost(listener.getHost());
      server.addConnector(connector);
    }

    ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
    context.setContextPath("/");


    ServletHolder defaultHolder = new ServletHolder("default", DefaultServlet.class);
    defaultHolder.setInitParameter("dirAllowed", "false");

    ResourceCollection staticResources = getStaticResources();
    if (staticResources != null) {
      context.setBaseResource(staticResources);
    }

    configureSecurityHandler(context);

    List<String> unsecurePaths = config.getList(RestConfig.AUTHENTICATION_SKIP_PATHS);
    setUnsecurePathConstraints(context, unsecurePaths);

    String allowedOrigins = getConfiguration().getString(
        RestConfig.ACCESS_CONTROL_ALLOW_ORIGIN_CONFIG
    );
    if (allowedOrigins != null && !allowedOrigins.trim().isEmpty()) {
      FilterHolder filterHolder = new FilterHolder(CrossOriginFilter.class);
      filterHolder.setName("cross-origin");
      filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins);
      String allowedMethods = getConfiguration().getString(
          RestConfig.ACCESS_CONTROL_ALLOW_METHODS
      );
      if (allowedMethods != null && !allowedOrigins.trim().isEmpty()) {
        filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods);
      }
      context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST));
    }
    configurePreResourceHandling(context);
    context.addFilter(servletHolder, "/*", null);
    configurePostResourceHandling(context);
    context.addServlet(defaultHolder, "/*");

    RequestLogHandler requestLogHandler = new RequestLogHandler();
    requestLogHandler.setRequestLog(requestLog);

    HandlerCollection handlers = new HandlerCollection();
    handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler});

    /* Needed for graceful shutdown as per `setStopTimeout` documentation */
    StatisticsHandler statsHandler = new StatisticsHandler();
    statsHandler.setHandler(handlers);

    final ServletContextHandler webSocketServletContext =
        new ServletContextHandler(ServletContextHandler.SESSIONS);
    webSocketServletContext.setContextPath(
        config.getString(RestConfig.WEBSOCKET_PATH_PREFIX_CONFIG)
    );
    final ContextHandlerCollection contexts = new ContextHandlerCollection();
    contexts.setHandlers(new Handler[] {
        statsHandler,
        webSocketServletContext
    });

    server.setHandler(wrapWithGzipHandler(contexts));

    ServerContainer container =
        WebSocketServerContainerInitializer.configureContext(webSocketServletContext);
    registerWebSocketEndpoints(container);
    int gracefulShutdownMs = getConfiguration().getInt(RestConfig.SHUTDOWN_GRACEFUL_MS_CONFIG);
    if (gracefulShutdownMs > 0) {
      server.setStopTimeout(gracefulShutdownMs);
    }
    server.setStopAtShutdown(true);

    return server;
  }
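
Seen from the outside, the embedding contract is small: subclass Application, register your JAX-RS resources in setupResources, and call start(). A minimal hypothetical subclass (the constructor shape is an assumption):

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Configurable;
import io.confluent.rest.Application;
import io.confluent.rest.RestConfig;

// Hypothetical rest-utils application, for illustration only.
public class HelloApplication extends Application<RestConfig> {

  public HelloApplication(RestConfig config) {
    super(config); // assumed constructor shape
  }

  @Override
  public void setupResources(Configurable<?> config, RestConfig appConfig) {
    config.register(new HelloResource()); // any plain JAX-RS resource works
  }

  @Path("/hello")
  public static class HelloResource {
    @GET
    public String hello() {
      return "hello";
    }
  }
}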

During Application.start(), the subclass hook KsqlRestApplication.setupResources is invoked to register the resource paths.
For query requests, the resource that answers is StreamedQueryResource:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/StreamedQueryResource.java
  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  public Response streamQuery(KsqlRequest request) throws Exception {
    String ksql = request.getKsql();
    Statement statement;
    if (ksql == null) {
      return Errors.badRequest("\"ksql\" field must be given");
    }
    Map<String, Object> clientLocalProperties =
        Optional.ofNullable(request.getStreamsProperties()).orElse(Collections.emptyMap());
    try {
      statement = statementParser.parseSingleStatement(ksql);
    } catch (IllegalArgumentException | KsqlException e) {
      return Errors.badRequest(e);
    }

    if (statement instanceof Query) {
      QueryStreamWriter queryStreamWriter;
      try {
        queryStreamWriter = new QueryStreamWriter(
            ksqlConfig,
            ksqlEngine,
            disconnectCheckInterval,
            ksql,
            clientLocalProperties,
            objectMapper);
      } catch (KsqlException e) {
        return Errors.badRequest(e);
      }
      log.info("Streaming query '{}'", ksql);
      return Response.ok().entity(queryStreamWriter).build();

    } else if (statement instanceof PrintTopic) {
      TopicStreamWriter topicStreamWriter = getTopicStreamWriter(
          clientLocalProperties,
          (PrintTopic) statement
      );
      return Response.ok().entity(topicStreamWriter).build();
    }
    return Errors.badRequest(String.format(
        "Statement type `%s' not supported for this resource",
        statement.getClass().getName()));
  }

StreamedQueryResource.streamQuery determines that the statement is a Query and builds a QueryStreamWriter to handle it:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/QueryStreamWriter.java
  QueryStreamWriter(
      final KsqlConfig ksqlConfig,
      final KsqlEngine ksqlEngine,
      final long disconnectCheckInterval,
      final String queryString,
      final Map<String, Object> overriddenProperties,
      final ObjectMapper objectMapper
  ) throws Exception {
    QueryMetadata queryMetadata =
        ksqlEngine.buildMultipleQueries(
            queryString, ksqlConfig, overriddenProperties).get(0);
    this.objectMapper = objectMapper;
    if (!(queryMetadata instanceof QueuedQueryMetadata)) {
      throw new Exception(String.format(
          "Unexpected metadata type: expected QueuedQueryMetadata, found %s instead",
          queryMetadata.getClass()
      ));
    }

    this.disconnectCheckInterval = disconnectCheckInterval;
    this.queryMetadata = ((QueuedQueryMetadata) queryMetadata);
    this.queryMetadata.setLimitHandler(new LimitHandler());
    this.queryMetadata.getKafkaStreams().setUncaughtExceptionHandler(new StreamsExceptionHandler());
    this.ksqlEngine = ksqlEngine;

    queryMetadata.getKafkaStreams().start();
  }

The QueryStreamWriter constructor calls KsqlEngine.buildMultipleQueries to compile the statement into a Kafka Streams topology, then starts the resulting KafkaStreams instance to serve the query:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/KsqlEngine.java
  public List<QueryMetadata> buildMultipleQueries(
      final String queriesString,
      final KsqlConfig ksqlConfig,
      final Map<String, Object> overriddenProperties) {
    for (String property : overriddenProperties.keySet()) {
      if (IMMUTABLE_PROPERTIES.contains(property)) {
        throw new IllegalArgumentException(
            String.format("Cannot override property '%s'", property)
        );
      }
    }

    // Multiple queries submitted as the same time should success or fail as a whole,
    // Thus we use tempMetaStore to store newly created tables, streams or topics.
    // MetaStore tempMetaStore = new MetaStoreImpl(metaStore);
    MetaStore tempMetaStore = metaStore.clone();

    // Build query AST from the query string
    List<Pair<String, Statement>> queries = parseQueries(
        queriesString,
        tempMetaStore
    );

    return planQueries(queries, ksqlConfig, overriddenProperties, tempMetaStore);
  }
  List<Pair<String, Statement>> parseQueries(
      final String queriesString,
      final MetaStore tempMetaStore
  ) {
    try {
      MetaStore tempMetaStoreForParser = tempMetaStore.clone();
      // Parse and AST creation
      KsqlParser ksqlParser = new KsqlParser();

      List<SqlBaseParser.SingleStatementContext> parsedStatements
          = ksqlParser.getStatements(queriesString);
      List<Pair<String, Statement>> queryList = new ArrayList<>();

      for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
        Pair<Statement, DataSourceExtractor> statementInfo = ksqlParser.prepareStatement(
            singleStatementContext,
            tempMetaStoreForParser
        );
        Statement statement = statementInfo.getLeft();
        if (StatementRewriteForStruct.requiresRewrite(statement)) {
          statement = new StatementRewriteForStruct(
              statement,
              statementInfo.getRight())
              .rewriteForStruct();
        }
        Pair<String, Statement> queryPair =
            buildSingleQueryAst(
                statement,
                getStatementString(singleStatementContext),
                tempMetaStore,
                tempMetaStoreForParser
            );
        if (queryPair != null) {
          queryList.add(queryPair);
        }
      }
      return queryList;
    } catch (Exception e) {
      throw new ParseFailedException("Exception while processing statements :" + e.getMessage(), e);
    }
  }
private Pair<String, Statement> buildSingleQueryAst(
      final Statement statement,
      final String statementString,
      final MetaStore tempMetaStore,
      final MetaStore tempMetaStoreForParser
  ) {

    log.info("Building AST for {}.", statementString);

    if (statement instanceof Query) {
      return new Pair<>(statementString, statement);
    } else if (statement instanceof CreateAsSelect) {
      CreateAsSelect createAsSelect = (CreateAsSelect) statement;
      QuerySpecification querySpecification =
          (QuerySpecification) createAsSelect.getQuery().getQueryBody();
      Query query = addInto(
          createAsSelect.getQuery(),
          querySpecification,
          createAsSelect.getName().getSuffix(),
          createAsSelect.getProperties(),
          createAsSelect.getPartitionByColumn(),
          true
      );
      tempMetaStoreForParser.putSource(
          queryEngine.getResultDatasource(
              querySpecification.getSelect(),
              createAsSelect.getName().getSuffix()
          ).cloneWithTimeKeyColumns());
      return new Pair<>(statementString, query);
    } else if (statement instanceof InsertInto) {
      InsertInto insertInto = (InsertInto) statement;
      if (tempMetaStoreForParser.getSource(insertInto.getTarget().getSuffix()) == null) {
        throw new KsqlException(String.format("Sink, %s, does not exist for the INSERT INTO "
                                              + "statement.", insertInto.getTarget().getSuffix()));
      }

      if (tempMetaStoreForParser.getSource(insertInto.getTarget().getSuffix()).getDataSourceType()
          != DataSource.DataSourceType.KSTREAM) {
        throw new KsqlException(String.format("INSERT INTO can only be used to insert into a "
                                              + "stream. %s is a table.",
                                              insertInto.getTarget().getSuffix()));
      }

      QuerySpecification querySpecification =
          (QuerySpecification) insertInto.getQuery().getQueryBody();

      Query query = addInto(
          insertInto.getQuery(),
          querySpecification,
          insertInto.getTarget().getSuffix(),
          new HashMap<>(),
          insertInto.getPartitionByColumn(),
          false
      );

      return new Pair<>(statementString, query);
    } else  if (statement instanceof DdlStatement) {
      return buildSingleDdlStatement(statement,
                                     statementString,
                                     tempMetaStore,
                                     tempMetaStoreForParser);
    }

    return null;
  }
  private List<QueryMetadata> planQueries(
      final List<Pair<String, Statement>> statementList,
      final KsqlConfig ksqlConfig,
      final Map<String, Object> overriddenProperties,
      final MetaStore tempMetaStore
  ) {
    // Logical plan creation from the ASTs
    List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
        tempMetaStore,
        statementList,
        ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
    );

    // Physical plan creation from logical plans.
    List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
        logicalPlans,
        statementList,
        ksqlConfig,
        overriddenProperties,
        clientSupplier,
        true
    );

    for (QueryMetadata queryMetadata : runningQueries) {
      if (queryMetadata instanceof PersistentQueryMetadata) {
        livePersistentQueries.add(queryMetadata);
        PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
        persistentQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
        metaStore.updateForPersistentQuery(persistentQueryMetadata.getQueryId().getId(),
                                           persistentQueryMetadata.getSourceNames(),
                                           persistentQueryMetadata.getSinkNames());
      }
      allLiveQueries.add(queryMetadata);
    }

    return runningQueries;
  }

  public QueryMetadata getQueryExecutionPlan(final Query query, final KsqlConfig ksqlConfig) {

    // Logical plan creation from the ASTs
    List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
        metaStore,
        Collections.singletonList(new Pair<>("", query)),
        ksqlConfig);

    // Physical plan creation from logical plans.
    List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
        logicalPlans,
        Collections.singletonList(new Pair<>("", query)),
        ksqlConfig,
        Collections.emptyMap(),
        clientSupplier,
        false
    );
    return runningQueries.get(0);
  }


KsqlEngine.buildMultipleQueries first calls KsqlEngine.parseQueries to produce the AST, then feeds the AST to KsqlEngine.planQueries to produce a KafkaStreams application.
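
To see how the two steps compose, here is a sketch of driving the engine directly, mirroring what QueryStreamWriter does; it assumes a broker at localhost:9092 and that TEST_STREAM is already registered in the metastore:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import io.confluent.ksql.KsqlEngine;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;

public class EmbeddedQueryDemo {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

    KsqlConfig ksqlConfig = new KsqlConfig(props);
    KsqlEngine engine = new KsqlEngine(ksqlConfig);

    // parseQueries -> AST, planQueries -> KafkaStreams, exactly as above.
    QueryMetadata query = engine.buildMultipleQueries(
        "SELECT * FROM TEST_STREAM;", ksqlConfig, Collections.emptyMap()).get(0);
    query.getKafkaStreams().start(); // same call QueryStreamWriter makes
  }
}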

First, KsqlEngine.parseQueries:

  List<Pair<String, Statement>> parseQueries(
      final String queriesString,
      final MetaStore tempMetaStore
  ) {
    try {
      MetaStore tempMetaStoreForParser = tempMetaStore.clone();
      // Parse and AST creation
      KsqlParser ksqlParser = new KsqlParser();

      List<SqlBaseParser.SingleStatementContext> parsedStatements
          = ksqlParser.getStatements(queriesString);
      List<Pair<String, Statement>> queryList = new ArrayList<>();

      for (SqlBaseParser.SingleStatementContext singleStatementContext : parsedStatements) {
        Pair<Statement, DataSourceExtractor> statementInfo = ksqlParser.prepareStatement(
            singleStatementContext,
            tempMetaStoreForParser
        );
        Statement statement = statementInfo.getLeft();
        if (StatementRewriteForStruct.requiresRewrite(statement)) {
          statement = new StatementRewriteForStruct(
              statement,
              statementInfo.getRight())
              .rewriteForStruct();
        }
        Pair<String, Statement> queryPair =
            buildSingleQueryAst(
                statement,
                getStatementString(singleStatementContext),
                tempMetaStore,
                tempMetaStoreForParser
            );
        if (queryPair != null) {
          queryList.add(queryPair);
        }
      }
      return queryList;
    } catch (Exception e) {
      throw new ParseFailedException("Exception while processing statements :" + e.getMessage(), e);
    }
  }

Here parseQueries calls KsqlParser.getStatements, the same entry point we walked through on the client side: the SQL text is tokenized, parsed (SLL mode first, falling back to LL), and matched against the SqlBase.g4 grammar shown earlier.

Once parsing succeeds, each statement is turned into a Pair<String, Statement> by KsqlEngine.buildSingleQueryAst.

Next, planQueries:

  private List<QueryMetadata> planQueries(
      final List<Pair<String, Statement>> statementList,
      final KsqlConfig ksqlConfig,
      final Map<String, Object> overriddenProperties,
      final MetaStore tempMetaStore
  ) {
    // Logical plan creation from the ASTs
    List<Pair<String, PlanNode>> logicalPlans = queryEngine.buildLogicalPlans(
        tempMetaStore,
        statementList,
        ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
    );

    // Physical plan creation from logical plans.
    List<QueryMetadata> runningQueries = queryEngine.buildPhysicalPlans(
        logicalPlans,
        statementList,
        ksqlConfig,
        overriddenProperties,
        clientSupplier,
        true
    );

    for (QueryMetadata queryMetadata : runningQueries) {
      if (queryMetadata instanceof PersistentQueryMetadata) {
        livePersistentQueries.add(queryMetadata);
        PersistentQueryMetadata persistentQueryMetadata = (PersistentQueryMetadata) queryMetadata;
        persistentQueries.put(persistentQueryMetadata.getQueryId(), persistentQueryMetadata);
        metaStore.updateForPersistentQuery(persistentQueryMetadata.getQueryId().getId(),
                                           persistentQueryMetadata.getSourceNames(),
                                           persistentQueryMetadata.getSinkNames());
      }
      allLiveQueries.add(queryMetadata);
    }

    return runningQueries;
  }

KsqlEngine.planQueries builds the logical plans with QueryEngine.buildLogicalPlans and the physical plans with QueryEngine.buildPhysicalPlans; the physical-planning step is what emits the Kafka Streams DSL topology.

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/QueryEngine.java
  List<Pair<String, PlanNode>> buildLogicalPlans(
      final MetaStore metaStore,
      final List<Pair<String, Statement>> statementList,
      final KsqlConfig config) {

    List<Pair<String, PlanNode>> logicalPlansList = new ArrayList<>();
    // TODO: the purpose of tempMetaStore here
    MetaStore tempMetaStore = metaStore.clone();

    for (Pair<String, Statement> statementQueryPair : statementList) {
      if (statementQueryPair.getRight() instanceof Query) {
        PlanNode logicalPlan = buildQueryLogicalPlan(
            statementQueryPair.getLeft(),
            (Query) statementQueryPair.getRight(),
            tempMetaStore, config
        );
        logicalPlansList.add(new Pair<>(statementQueryPair.getLeft(), logicalPlan));
      } else {
        logicalPlansList.add(new Pair<>(statementQueryPair.getLeft(), null));
      }

      log.info("Build logical plan for {}.", statementQueryPair.getLeft());
    }
    return logicalPlansList;
  }
  private PlanNode buildQueryLogicalPlan(
      final String sqlExpression,
      final Query query,
      final MetaStore tempMetaStore,
      final KsqlConfig config) {
    final QueryAnalyzer queryAnalyzer = new QueryAnalyzer(
        tempMetaStore,
        ksqlEngine.getFunctionRegistry(),
        config
    );
    final Analysis analysis = queryAnalyzer.analyze(sqlExpression, query);
    final AggregateAnalysis aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);
    final PlanNode logicalPlan
        = new LogicalPlanner(analysis, aggAnalysis, ksqlEngine.getFunctionRegistry()).buildPlan();
    if (logicalPlan instanceof KsqlStructuredDataOutputNode) {
      KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
          (KsqlStructuredDataOutputNode) logicalPlan;

      StructuredDataSource
          structuredDataSource =
          new KsqlStream(
              sqlExpression,
              ksqlStructuredDataOutputNode.getId().toString(),
              ksqlStructuredDataOutputNode.getSchema(),
              ksqlStructuredDataOutputNode.getKeyField(),
              ksqlStructuredDataOutputNode.getTimestampExtractionPolicy(),
              ksqlStructuredDataOutputNode.getKsqlTopic()
          );
      if (analysis.isDoCreateInto()) {
        tempMetaStore.putTopic(ksqlStructuredDataOutputNode.getKsqlTopic());
        tempMetaStore.putSource(structuredDataSource.cloneWithTimeKeyColumns());
      }
    }
    return logicalPlan;
  }
  List<QueryMetadata> buildPhysicalPlans(
      final List<Pair<String, PlanNode>> logicalPlans,
      final List<Pair<String, Statement>> statementList,
      final KsqlConfig ksqlConfig,
      final Map<String, Object> overriddenProperties,
      final KafkaClientSupplier clientSupplier,
      final boolean updateMetastore
  ) {

    List<QueryMetadata> physicalPlans = new ArrayList<>();

    for (int i = 0; i < logicalPlans.size(); i++) {

      Pair<String, PlanNode> statementPlanPair = logicalPlans.get(i);
      if (statementPlanPair.getRight() == null) {
        Statement statement = statementList.get(i).getRight();
        if (!(statement instanceof DdlStatement)) {
          throw new KsqlException("expecting a statement implementing DDLStatement but got: "
                                  + statement.getClass());
        }
        handleDdlStatement(
            statementPlanPair.getLeft(),
            (DdlStatement) statement
        );
      } else {
        buildQueryPhysicalPlan(
            physicalPlans, statementPlanPair, ksqlConfig,
            overriddenProperties, clientSupplier, updateMetastore
        );
      }

    }
    return physicalPlans;
  }
  private void buildQueryPhysicalPlan(
      final List<QueryMetadata> physicalPlans,
      final Pair<String, PlanNode> statementPlanPair,
      final KsqlConfig ksqlConfig,
      final Map<String, Object> overriddenProperties,
      final KafkaClientSupplier clientSupplier,
      final boolean updateMetastore
  ) {

    final StreamsBuilder builder = new StreamsBuilder();

    // Build a physical plan, in this case a Kafka Streams DSL
    final PhysicalPlanBuilder physicalPlanBuilder = new PhysicalPlanBuilder(
        builder,
        ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties),
        ksqlEngine.getTopicClient(),
        ksqlEngine.getFunctionRegistry(),
        overriddenProperties,
        updateMetastore,
        ksqlEngine.getMetaStore(),
        ksqlEngine.getSchemaRegistryClient(),
        ksqlEngine.getQueryIdGenerator(),
        new KafkaStreamsBuilderImpl(clientSupplier)
    );
    physicalPlans.add(physicalPlanBuilder.buildPhysicalPlan(statementPlanPair));
  }

QueryEngine.buildLogicalPlans is driven by QueryAnalyzer, which walks the AST with an AstVisitor to produce an Analysis; we won't expand on that here and will cover it in detail later.
QueryEngine.buildPhysicalPlans is implemented on top of PhysicalPlanBuilder.buildPhysicalPlan.
Note that QueryEngine.buildQueryPhysicalPlan constructs the KafkaStreamsBuilderImpl object used further down.

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/physical/PhysicalPlanBuilder.java
  public QueryMetadata buildPhysicalPlan(final Pair<String, PlanNode> statementPlanPair) {
    final SchemaKStream resultStream = statementPlanPair
        .getRight()
        .buildStream(
            builder,
            ksqlConfig,
            kafkaTopicClient,
            functionRegistry,
            overriddenStreamsProperties,
            schemaRegistryClient
        );
    final OutputNode outputNode = resultStream.outputNode();
    boolean isBareQuery = outputNode instanceof KsqlBareOutputNode;

    // Check to make sure the logical and physical plans match up;
    // important to do this BEFORE actually starting up
    // the corresponding Kafka Streams job
    if (isBareQuery && !(resultStream instanceof QueuedSchemaKStream)) {
      throw new KsqlException(String.format(
          "Mismatch between logical and physical output; "
          + "expected a QueuedSchemaKStream based on logical "
          + "KsqlBareOutputNode, found a %s instead",
          resultStream.getClass().getCanonicalName()
      ));
    }
    String serviceId = getServiceId();
    String persistanceQueryPrefix =
        ksqlConfig.getString(KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);
    String transientQueryPrefix =
        ksqlConfig.getString(KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);

    if (isBareQuery) {
      return buildPlanForBareQuery(
          (QueuedSchemaKStream) resultStream,
          (KsqlBareOutputNode) outputNode,
          serviceId,
          transientQueryPrefix,
          statementPlanPair.getLeft()
      );

    } else if (outputNode instanceof KsqlStructuredDataOutputNode) {

      KsqlStructuredDataOutputNode ksqlStructuredDataOutputNode =
          (KsqlStructuredDataOutputNode) outputNode;
      ksqlStructuredDataOutputNode = ksqlStructuredDataOutputNode.cloneWithDoCreateInto(
          ((KsqlStructuredDataOutputNode) statementPlanPair.getRight()).isDoCreateInto()
      );
      return buildPlanForStructuredOutputNode(
          statementPlanPair.getLeft(),
          resultStream,
          ksqlStructuredDataOutputNode,
          serviceId,
          persistanceQueryPrefix,
          statementPlanPair.getLeft());


    } else {
      throw new KsqlException(
          "Sink data source of type: "
          + outputNode.getClass()
          + " is not supported.");
    }
  }
  private QueryMetadata buildPlanForBareQuery(
      final QueuedSchemaKStream schemaKStream,
      final KsqlBareOutputNode bareOutputNode,
      final String serviceId,
      final String transientQueryPrefix,
      final String statement
  ) {

    final String applicationId = addTimeSuffix(getBareQueryApplicationId(
        serviceId,
        transientQueryPrefix
    ));

    KafkaStreams streams = buildStreams(
        bareOutputNode,
        builder,
        applicationId,
        ksqlConfig,
        overriddenStreamsProperties
    );

    SchemaKStream sourceSchemaKstream = schemaKStream.getSourceSchemaKStreams().get(0);

    return new QueuedQueryMetadata(
        statement,
        streams,
        bareOutputNode,
        schemaKStream.getExecutionPlan(""),
        schemaKStream.getQueue(),
        (sourceSchemaKstream instanceof SchemaKTable)
            ? DataSource.DataSourceType.KTABLE : DataSource.DataSourceType.KSTREAM,
        applicationId,
        kafkaTopicClient,
        builder.build(),
        overriddenStreamsProperties
    );
  }

  private KafkaStreams buildStreams(
      final OutputNode outputNode,
      final StreamsBuilder builder,
      final String applicationId,
      final KsqlConfig ksqlConfig,
      final Map<String, Object> overriddenProperties
  ) {
    final Map<String, Object> newStreamsProperties
        = new HashMap<>(ksqlConfig.getKsqlStreamConfigProps());
    newStreamsProperties.putAll(overriddenProperties);
    newStreamsProperties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);

    updateListProperty(
        newStreamsProperties,
        StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),
        ConsumerCollector.class.getCanonicalName()
    );
    updateListProperty(
        newStreamsProperties,
        StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),
        ProducerCollector.class.getCanonicalName()
    );
    return kafkaStreamsBuilder.buildKafkaStreams(builder, new StreamsConfig(newStreamsProperties));
  }

For this bare query, PhysicalPlanBuilder.buildPhysicalPlan takes the standard-output path, PhysicalPlanBuilder.buildPlanForBareQuery, which in turn calls KafkaStreamsBuilderImpl.buildKafkaStreams to create the KafkaStreams instance:

[larluo@larluo-nixos:~/work/git/my-repo/ksql]$ cat ./ksql-engine/src/main/java/io/confluent/ksql/physical/KafkaStreamsBuilderImpl.java
  public KafkaStreams buildKafkaStreams(StreamsBuilder builder, StreamsConfig conf) {
    return new KafkaStreams(builder.build(), conf, clientSupplier);
  }

Finally, the QueryStreamWriter constructor starts the freshly built Kafka Streams topology, and the interactive query streams its results in real time.
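
Conceptually, the physical plan for our bare query amounts to a tiny Kafka Streams topology that copies rows from the source topic into a queue which the REST layer drains. An illustrative equivalent (not KSQL source; topic name and serdes are assumptions):

import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ConceptualBareQuery {
  public static void main(String[] args) {
    // On the real server this queue is drained by QueryStreamWriter into
    // the chunked HTTP response.
    BlockingQueue<String> rowQueue = new LinkedBlockingQueue<>();

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("test_stream_topic") // hypothetical topic
        .foreach((key, value) -> rowQueue.offer(value));

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "conceptual-bare-query");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    new KafkaStreams(builder.build(), props).start();
  }
}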
