Spark 学习笔记(四)-sparkSQL入门

2018-04-26  本文已影响0人  vision_zhang

入门
起点:SparkSession
Spark中所有功能的入口点就是这个SparkSession类。要创建一个基本的SparkSession,只需使用SparkSession.builder():

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
                .builder()
                .appName("java spark Sql basic example")
                .config("spark.some.config.option","some-value")
                .master("local")
                .getOrCreate();
        Dataset<Row> df = spark.read().json("D:\\resources\\people.json");
        df.show();

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

SparkSession在Spark 2.0中为Hive特性提供内置支持,包括使用HiveQL编写查询,访问Hive UDF以及从Hive表读取数据的能力。要使用这些功能,您不需要有现有的Hive安装程序。

创建数据框

使用一个 SparkSession,应用程序可以从现有的RDD,Hive表或Spark数据源创建DataFrame 。

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
无类型数据集操作(又名DataFrame操作)

DataFrames为Scala,Java,Python和R中的结构化数据操作提供了一个特定领域的语言。
在Spark 2.0中,DataFrames只是RowScala和Java API中的数据集。这些操作也被称为“无类型转换”,与强类型的Scala / Java数据集中的“类型转换”不同。
一些使用数据集的结构化数据处理的基本示例:

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

除了简单的列引用和表达式之外,数据集还具有丰富的函数库,包括字符串操作,日期算术,通用数学运算等等。DataFrame函数参考中提供了完整的列表。

以编程方式运行SQL查询

SparkSession上的sql函数允许应用程序能以编程方式运行SQL查询,并将结果返回为Dataset<Row>

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
全局临时视图

Spark SQL中的临时视图是会话范围的,如果创建它的会话终止,将会消失。如果您希望在所有会话之间共享一个临时视图并保持活动状态,直到Spark应用程序终止,则可以创建一个全局临时视图。全局临时视图与系统保存的数据库绑定global_temp,我们必须使用限定的名称来引用它,例如SELECT * FROM global_temp.view

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session跨会话
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+
创建数据集

数据集类似于RDD,但是,不使用Java序列化或Kryo,而是使用专门的编码器对对象进行序列化以便通过网络进行处理或传输。虽然编码器和标准序列化都负责将对象转换为字节,但编码器是动态生成的代码,并且使用Spark运行执行的操作(如过滤,排序和散列)格式,而无需将字节反序列化回对象。

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
//编码器编码

Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
    (MapFunction<Integer, Integer>) value -> value + 1,
    integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

与RDD进行互操作

Spark SQL支持将现有RDD转换为Datasets的两种不同方法。第一种方法使用反射来推断包含特定类型对象的RDD的模式。
创建数据集的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有的RDD。虽然这个方法比较冗长,但是它允许你在构造数据集的时候直到运行时才知道列和它们的类型。

Spark SQL支持自动将JavaBean的RDD 转换为DataFrame
Spark SQL不支持包含Map字段的JavaBean 。不过嵌套的JavaBean和Listor Array 字段是受支持的。您可以通过创建一个实现Serializable的类来创建JavaBean,并为其所有字段设置getter和setter。

// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read()
  .textFile("examples/src/main/resources/people.txt")
  .javaRDD()
  .map(line -> {
    String[] parts = line.split(",");
    Person person = new Person();
    person.setName(parts[0]);
    person.setAge(Integer.parseInt(parts[1].trim()));
    return person;
  });

// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");

// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map(
    (MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),
    stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+
以编程方式指定模式

当不能提前定义JavaBean类时(例如,记录的结构是用字符串编码的,或者文本数据集将被解析,字段对于不同的用户来说投影会不同),Dataset<Row>可以用三个步骤以编程方式创建 。

Row从原RDD 创建一个RDD;
在步骤1创建的RDD中StructType与Rows 结构匹配 的模式。
Row通过SparkSession中的createDataFrame提供的方法将模式应用于 的RDD

// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext()
  .textFile("examples/src/main/resources/people.txt", 1)
  .toJavaRDD();

// The schema is encoded in a string
String schemaString = "name age";

// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {
  StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);
  fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);

// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {
  String[] attributes = record.split(",");
  return RowFactory.create(attributes[0], attributes[1].trim());
});

// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");

// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map(
    (MapFunction<Row, String>) row -> "Name: " + row.getString(0),
    Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+
聚合

该内置功能DataFrames提供聚合,例如count(),countDistinct(),avg(),max(),min(),等。这些功能是专为DataFrames设计

非类型化的用户定义的聚合函数
public static class MyAverage extends UserDefinedAggregateFunction {

  private StructType inputSchema;
  private StructType bufferSchema;

  public MyAverage() {
    List<StructField> inputFields = new ArrayList<>();
    inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.LongType, true));
    inputSchema = DataTypes.createStructType(inputFields);

    List<StructField> bufferFields = new ArrayList<>();
    bufferFields.add(DataTypes.createStructField("sum", DataTypes.LongType, true));
    bufferFields.add(DataTypes.createStructField("count", DataTypes.LongType, true));
    bufferSchema = DataTypes.createStructType(bufferFields);
  }
  // Data types of input arguments of this aggregate function
  public StructType inputSchema() {
    return inputSchema;
  }
  // Data types of values in the aggregation buffer
  public StructType bufferSchema() {
    return bufferSchema;
  }
  // The data type of the returned value
  public DataType dataType() {
    return DataTypes.DoubleType;
  }
  // Whether this function always returns the same output on the identical input
  public boolean deterministic() {
    return true;
  }
  // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
  // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
  // the opportunity to update its values. Note that arrays and maps inside the buffer are still
  // immutable.
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0L);
    buffer.update(1, 0L);
  }
  // Updates the given aggregation buffer `buffer` with new input data from `input`
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      long updatedSum = buffer.getLong(0) + input.getLong(0);
      long updatedCount = buffer.getLong(1) + 1;
      buffer.update(0, updatedSum);
      buffer.update(1, updatedCount);
    }
  }
  // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    long mergedSum = buffer1.getLong(0) + buffer2.getLong(0);
    long mergedCount = buffer1.getLong(1) + buffer2.getLong(1);
    buffer1.update(0, mergedSum);
    buffer1.update(1, mergedCount);
  }
  // Calculates the final result
  public Double evaluate(Row buffer) {
    return ((double) buffer.getLong(0)) / buffer.getLong(1);
  }
}

// Register the function to access it
spark.udf().register("myAverage", new MyAverage());

Dataset<Row> df = spark.read().json("examples/src/main/resources/employees.json");
df.createOrReplaceTempView("employees");
df.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

Dataset<Row> result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees");
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
类型安全的用户定义的聚合函数

用于强类型数据集的用户定义聚合关联Aggregator抽象类。例如,类型安全的用户定义的平均值可能如下所示:

public static class Employee implements Serializable {
  private String name;
  private long salary;

  // Constructors, getters, setters...

}

public static class Average implements Serializable  {
  private long sum;
  private long count;

  // Constructors, getters, setters...

}

public static class MyAverage extends Aggregator<Employee, Average, Double> {
  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  public Average zero() {
    return new Average(0L, 0L);
  }
  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  public Average reduce(Average buffer, Employee employee) {
    long newSum = buffer.getSum() + employee.getSalary();
    long newCount = buffer.getCount() + 1;
    buffer.setSum(newSum);
    buffer.setCount(newCount);
    return buffer;
  }
  // Merge two intermediate values
  public Average merge(Average b1, Average b2) {
    long mergedSum = b1.getSum() + b2.getSum();
    long mergedCount = b1.getCount() + b2.getCount();
    b1.setSum(mergedSum);
    b1.setCount(mergedCount);
    return b1;
  }
  // Transform the output of the reduction
  public Double finish(Average reduction) {
    return ((double) reduction.getSum()) / reduction.getCount();
  }
  // Specifies the Encoder for the intermediate value type
  public Encoder<Average> bufferEncoder() {
    return Encoders.bean(Average.class);
  }
  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}

Encoder<Employee> employeeEncoder = Encoders.bean(Employee.class);
String path = "examples/src/main/resources/employees.json";
Dataset<Employee> ds = spark.read().json(path).as(employeeEncoder);
ds.show();
// +-------+------+
// |   name|salary|
// +-------+------+
// |Michael|  3000|
// |   Andy|  4500|
// | Justin|  3500|
// |  Berta|  4000|
// +-------+------+

MyAverage myAverage = new MyAverage();
// Convert the function to a `TypedColumn` and give it a name
TypedColumn<Employee, Double> averageSalary = myAverage.toColumn().name("average_salary");
Dataset<Double> result = ds.select(averageSalary);
result.show();
// +--------------+
// |average_salary|
// +--------------+
// |        3750.0|
// +--------------+
数据源

Spark SQL支持通过DataFrame接口在各种数据源上进行操作。DataFrame可以使用关系变换进行操作,也可以用来创建临时视图。将DataFrame注册为临时视图允许您对其数据运行SQL查询。本节介绍使用Spark Data Sources加载和保存数据的一般方法,然后介绍可用于内置数据源的特定选项。

通用加载/保存功能
Dataset<Row> usersDF = spark.read().load("examples/src/main/resources/users.parquet");
usersDF.select("name", "favorite_color").write().save("namesAndFavColors.parquet");
手动指定选项

您也可以手动指定将要使用的数据源以及您想要传递给数据源的其他选项。数据源通过其全名指定(即org.apache.spark.sql.parquet),但内置的来源,你也可以使用自己的短名称(json,parquet,jdbc,orc,libsvm,csv,text)。从任何数据源类型加载的数据框可以使用此语法转换为其他类型。

Dataset<Row> peopleDF =
  spark.read().format("json").load("examples/src/main/resources/people.json");
peopleDF.select("name", "age").write().format("parquet").save("namesAndAges.parquet");
直接在文件上运行SQL

您可以使用SQL直接查询该文件,而不是使用读取API将文件加载到DataFrame中并进行查询。

Dataset<Row> sqlDF =
  spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`");
保存模式

保存操作可以选择一个SaveMode,指定如何处理现有的数据(如果存在)。认识到这些保存模式不使用任何锁定而不是原子是很重要的。另外,执行时Overwrite,数据在写出新数据之前将被删除。

SaveMode.ErrorIfExists (默认) "error" (默认)    将DataFrame保存到数据源时,如果数据已经存在,则预计会抛出异常。
SaveMode.Append "append"    将DataFrame保存到数据源时,如果data / table已经存在,则DataFrame的内容将被追加到现有数据中。
SaveMode.Overwrite  "overwrite" 覆盖模式意味着将DataFrame保存到数据源时,如果data / table已经存在,则现有数据将被DataFrame的内容覆盖。
SaveMode.Ignore "ignore"    忽略模式意味着,当将DataFrame保存到数据源时,如果数据已经存在,保存操作将不会保存DataFrame的内容,也不会更改现有数据。这与CREATE TABLE IF NOT EXISTSSQL中的类似。
保存到持久表

DataFrames也可以使用该saveAsTable 命令将其作为持久表保存到Hive Metastore中。请注意,现有的Hive部署对于使用此功能不是必需的。Spark将为您创建一个默认的本地Hive Metastore(使用Derby)。与createOrReplaceTempView命令不同的是, saveAsTable将实现DataFrame的内容并创建指向Hive Metastore中的数据的指针。即使您的Spark程序重新启动后,持久性表格仍然存在,只要您保持与同一Metastore的连接。用于持久表的DataFrame可以通过使用表的名称调用tablea方法来创建SparkSession。

对于基于文件的数据源,例如文本,parquet,json等,您可以通过path选项指定自定义表格路径 ,例如df.write.option(“path”, “/some/path”).saveAsTable(“t”)。当表被删除时,自定义表路径将不会被删除,表数据仍然存在。如果没有指定自定义表格路径,则Spark将把数据写入仓库目录下的默认表格路径。当表被删除时,默认的表路径也将被删除。
持久数据源表具有存储在Hive Metastore中的每个分区元数据。这带来了几个好处:

由于Metastore只能返回查询所需的分区,因此不再需要发现第一个查询的所有分区。
Hive DDL如ALTER TABLE PARTITION … SET LOCATION现在可用于使用Datasource API创建的表。
请注意,创建外部数据源表(具有path选项的那些表)时,默认情况下不会收集分区信息。要同步Metastore中的分区信息,可以调用MSCK REPAIR TABLE。

分段,分类和分区

对于基于文件的数据源,也可以对输出进行分类和分类。分段和排序仅适用于持久表

peopleDF.write().bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed");

而分区则可以同时使用save和saveAsTable使用数据集API。

usersDF
  .write()
  .partitionBy("favorite_color")
  .format("parquet")
  .save("namesPartByColor.parquet");

可以对单个表使用分区和分区:

peopleDF
  .write()
  .partitionBy("favorite_color")
  .bucketBy(42, "name")
  .saveAsTable("people_partitioned_bucketed");

partitionBy创建一个目录结构,如“ 分区发现”部分所述。因此,对基数高的柱子的适用性有限。相比之下 bucketBy,通过固定数量的桶分配数据,并且可以在大量唯一值无界时使用。

分区发现

表分区是像Hive这样的系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值在每个分区目录的路径中编码。现在,Parquet数据源能够自动发现和推断分区信息。例如,我们可以使用以下目录结构,两个额外的列gender和country分区列将所有先前使用的人口数据存储到分区表中:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过传递path/to/table给SparkSession.read.parquet或者SparkSession.read.load,Spark SQL将自动从路径中提取分区信息。现在,返回的DataFrame的模式变成:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断的。目前支持数字数据类型和字符串类型。有时用户可能不希望自动推断分区列的数据类型。对于这些用例,可以使用spark.sql.sources.partitionColumnTypeInference.enabled默认 的自动类型推断来配置true。当禁用类型推断时,字符串类型将用于分区列。

JSON数据集

Spark SQL可以自动推断JSON数据集的模式,并将其作为一个Dataset。这个转换可以SparkSession.read().json()在一个Dataset或者一个JSON文件上完成。

请注意,作为json文件提供的文件不是典型的JSON文件。每行必须包含一个单独的,独立的有效JSON对象。有关更多信息,请参阅 JSON行文本格式,也称为换行符分隔的JSON。

对于常规的多行JSON文件,请将该multiLine选项设置为true。

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
Dataset<Row> people = spark.read().json("examples/src/main/resources/people.json");

// The inferred schema can be visualized using the printSchema() method
people.printSchema();
// root
//  |-- age: long (nullable = true)
//  |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
people.createOrReplaceTempView("people");

// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> namesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");
namesDF.show();
// +------+
// |  name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset<String> storing one JSON object per string.
List<String> jsonData = Arrays.asList(
        "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// |        address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

JDBC到其他数据库

故障排除

性能调整
对于某些工作负载,可以通过在内存中缓存数据或打开一些实验选项来提高性能。

在内存中缓存数据

其他配置选项

分布式SQL引擎

运行Thrift JDBC / ODBC服务器

运行Spark SQL CLI

详情参见
http://spark.apache.org/docs/latest/sql-programming-guide.html

上一篇 下一篇

猜你喜欢

热点阅读