Apache Beam SQL

2019-03-28  本文已影响0人  HelloWide

Beam不仅支持java,python还支持SQL分析,非常类似于Spark SQL;
Beam SQL 现在只支持 Java,底层是 Apache Calcite 的一个动态数据管理框架,用于大数据处理和一些流增强功能,它允许你自定义数据库功能。例如 Hive 使用了 Calcite 的查询优化,当然还有 Flink 解析和流 SQL 处理。Beam 在这之上添加了额外的扩展,以便轻松利用 Beam 的统一批处理 / 流模型以及对复杂数据类型的支持。


流程图

简化如下:


简化流程

Test Code

说明:所有案例都在源码中有;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.xxx.sqlbeam;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.*;

import javax.annotation.Nullable;

/**
 * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
 *
 * <p>Run the example from the Beam source root with
 *
 * <pre>
 *   ./gradlew :beam-sdks-java-extensions-sql:runBasicExample
 * </pre>
 *
 * <p>The above command executes the example locally using direct runner. Running the pipeline in
 * other runners require additional setup and are out of scope of the SQL examples. Please consult
 * Beam documentation on how to run pipelines.
 */
class BeamSqlExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
    options.setRunner(DirectRunner.class);
    Pipeline p = Pipeline.create(options);


    //define the input row format
    Schema type =
        Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();

    Row row1 = Row.withSchema(type).addValues(1, "row", 1.0).build();
    Row row2 = Row.withSchema(type).addValues(2, "row", 2.0).build();
    Row row3 = Row.withSchema(type).addValues(3, "row", 3.0).build();

    //create a source PCollection with Create.of();
    PCollection<Row> inputTable =
        PBegin.in(p)
            .apply(
                Create.of(row1, row2, row3)
                    .withSchema(
                        type, SerializableFunctions.identity(), SerializableFunctions.identity()));

    //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
    PCollection<Row> outputStream =
        inputTable.apply(SqlTransform.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

    // print the output record of case 1;
    outputStream.apply(
        "log_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  PCOLLECTION: [3, row, 3.0]
                //  PCOLLECTION: [2, row, 2.0]
                System.out.println("PCOLLECTION: " + input.getValues());
                return null;
              }
            }));

    // Case 2. run the query with SqlTransform.query over result PCollection of case 1.
    PCollection<Row> outputStream2 =
        PCollectionTuple.of(new TupleTag<>("CASE1_RESULT"), outputStream)
            .apply(SqlTransform.query("select c2, sum(c3) from CASE1_RESULT group by c2"));

    // print the output record of case 2;
    outputStream2.apply(
        "log_result",
        MapElements.via(
            new SimpleFunction<Row, Void>() {
              @Override
              public @Nullable Void apply(Row input) {
                // expect output:
                //  CASE1_RESULT: [row, 5.0]
                System.out.println("CASE1_RESULT: " + input.getValues());
                return null;
              }
            }));

    p.run().waitUntilFinish();
  }
}


源码&Demo;https://github.com/apache/beam/tree/master
参考链接:https://beam.apache.org/documentation/dsls/sql/data-types/
Apache Beam: 一个高级且统一的编程模型
让批处理和流数据处理的作业在任何执行引擎上都可以运行.

上一篇下一篇

猜你喜欢

热点阅读