数据仓库建设hive

数据仓库之Hive快速入门 - 离线&实时数仓架构

2020-11-09  本文已影响0人  端碗吹水

数据仓库VS数据库

数据仓库的定义:

数据仓库VS数据库:

OLTP VS OLAP:

常规的数仓架构:


image.png

为什么建设数据仓库:

数据仓库建设目标:

如何实现:


数据仓库分层建设

数仓建设背景:

为什么进行数仓分层:

常见的分层含义:


image.png

STG层

ODS层

DWD层

DWS层

DIM层

DM层

分层之间的数据流转:


image.png

Hive是什么

Hive简介:

Hive的简单架构图:


image.png

Hive VS Hadoop:

Hive与关系数据库Mysql的区别

产品定位

Hive是数据仓库,为海量数据的离线分析设计的,不支持OLTP(联机事务处理所需的关键功能ACID,而更接近于OLAP(联机分析技术)),适给离线处理大数据集。而MySQL是关系型数据库,是为实时业务设计的。

可扩展性

Hive中的数据存储在HDFS(Hadoop的分布式文件系统),metastore元数据一 般存储在独立的关系型数据库中,而MySQL则是服务器本地的文件系统。因此Hive具有良好的可扩展性,数据库由于ACID语义的严格限制,扩展性十分有限。

读写模式

Hive为读时模式,数据的验证则是在查询时进行的,这有利于大数据集的导入,读时模式使数据的加载非常迅速,数据的加载仅是文件复制或移动。MySQL为写时模式,数据在写入数据库时对照模式检查。写时模式有利于提升查询性能,因为数据库可以对列进行索引。

数据更新

Hive是针对数据仓库应用设计的,而数仓的内容是读多写少的,Hive中不支持对数据进行改写,所有数据都是在加载的时候确定好的。而数据库中的数据通常是需要经常进行修改的。

索引

Hive支持索引,但是Hive的索引与关系型数据库中的索引并不相同,比如,Hive不支持主键或者外键。Hive提供了有限的索引功能,可以为-些字段建立索引,一张表的索引数据存储在另外一张表中。由于数据的访问延迟较高,Hive不适合在线数据查询。数据库在少星的特定条件的数据访问中,索引可以提供较低的延迟。

计算模型

Hive默认使用的模型是MapReduce(也可以on spark、on tez),而MySQL使用的是自己设计的Executor计算模型

image.png

Hive安装部署

参考:


Hive基本使用(上)Hive数据类型/分区/基础语法

Hive数据类型:

Hive分区:

Hive常用基础语法:

写个Python脚本生成一些测试数据:

import json
import random
import uuid

name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')

data = []
for item in name:
    scores = {key: random.randint(60, 100) for key in subject}
    data.append("|".join([uuid.uuid4().hex, item, ','.join(
        random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))

with open('test.csv', 'w') as f:
    f.write('\n'.join(data))

执行该脚本,生成测试数据文件:

[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root  745 11月  9 11:09 test.csv
[root@hadoop01 ~/py-script]# 

我们可以看一下生成的数据:

[root@hadoop01 ~/py-script]# cat test.csv 
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77

...

创建测试用的数据库:

0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000> 

创建测试表:

CREATE TABLE test(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将本地数据加载到Hive中:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000> 

查询数据:


image.png

Hive将HQL转换为MapReduce的流程

了解了Hive中的SQL基本操作之后,我们来看看Hive是如何将SQL转换为MapReduce任务的,整个转换过程分为六个阶段:

  1. Antr定义SQL的语法规则,完成SQL词法,语法解析,将SQL 转化为抽象语法树AST Tree
  2. 遍历AST Tree,抽象出查询的基本组成单元QueryBlock
  3. 遍历QueryBlock,翻译为执行操作树OperatorTree
  4. 逻辑层优化器进行OperatorTree变换,合并不必要的ReduceSinkOperator,减少shufle数据量
  5. 遍历OperatorTree,翻译为MapReduce任务
  6. 物理层优化器进行MapReduce任务的变换,生成最终的执行计划
image.png

与普通SQL一样,我们可以通过在HQL前面加上explain关键字查看HQL的执行计划:

explain select * from test where id > 10 limit 1000

Hive会将这条语句解析成一个个的Operator,Operator就是Hive解析之后的最小单元,每个Operator其实都是对应一个MapReduce任务。例如,上面这条语句被Hive解析后,就是由如下Operator组成:


image.png

同时,Hive实现了优化器对这些Operator的顺序进行优化,帮助我们提升查询效率。Hive中的优化器主要分为三类:


Hive基本使用(中)内部表/外部表/分区表/分桶表

内部表:

和传统数据库的Table概念类似,对应HDFS上存储目录,删除表时,删除元数据和表数据。内部表的数据,会存放在HDFS中的特定的位置中,可以通过配置文件指定。当删除表时,数据文件也会一并删除。适用于临时创建的中间表。

外部表:

指向已经存在的HDFS数据,删除时只删除元数据信息。适用于想要在Hive之外使用表的数据的情况,当你删除External Table时,只是删除了表的元数据,它的数据并没有被删除。适用于数据多部门共享。建表时使用create external table。指定external关键字即可。

分区表:

Partition对应普通数据库对Partition列的密集索引,将数据按照Partition列存储到不同目录,便于并行分析,减少数据量。分区表创建表的时候需要指定分区字段。

分区字段与普通字段的区别:分区字段会在HDFS表目录下生成一个分区字段名称的目录,而普通字段则不会,查询的时候可以当成普通字段来使用,一般不直接和业务直接相关。

分桶表:

对数据进行hash,放到不同文件存储,方便抽样和join查询。可以将内部表,外部表和分区表进一步组织成桶表,可以将表的列通过Hash算法进一步分解成不同的文件存储。

对于内部表和外部表的概念和应用场景我们很容易理解,我们需要重点关注一下分区表和分桶表。 我们为什么要建立分区表和分桶表呢?HQL通过where子句来限制条件提取数据,那么与其遍历一张大表,不如将这张大表拆分成多个小表,并通过合适的索引来扫描表中的一小部分,分区和分桶都是采用了这种理念。

分区会创建物理目录,并且可以具有子目录(通常会按照时间、地区分区),目录名以 分区名=值 形式命名,例如:create_time=202011。分区名会作为表中的伪列,这样通过where字句中加入分区的限制可以在仅扫描对应子目录下的数据。通过 partitioned by (feld1 type, ...) 创建分区列。

分桶可以继续在分区的基础上再划分小表,分桶根据哈希值来确定数据的分布(即MapReducer中的分区),比如分区下的一部分数据可以根据分桶再分为多个桶,这样在查询时先计算对应列的哈希值并计算桶号,只需要扫描对应桶中的数据即可。分桶通过clustered by(field) into n buckets创建。

接下来简单演示下这几种表的操作,首先将上一小节生成的测试数据文件上传到hdfs中:

[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]# 

内部表

建表SQL:

CREATE TABLE test_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将hdfs数据加载到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000> 

查看创建的表存储在hdfs的哪个目录下:

0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_table`(                         |
|   `user_id` string,                                |
|   `user_name` string,                              |
|   `hobby` array<string>,                           |
|   `scores` map<string,int>)                        |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'collection.delim'=',',                          |
|   'field.delim'='|',                               |
|   'line.delim'='\n',                               |
|   'mapkey.delim'=':',                              |
|   'serialization.format'='|')                      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transient_lastDdlTime'='1604893190')            |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000> 

在hdfs中可以查看到数据文件:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]# 

删除表:

0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000> 

查看hdfs会发现该表所对应的存储目录也一并被删除了:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x   - root supergroup          0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]# 

外部表

建表SQL,与内部表的区别就在于external关键字:

CREATE external TABLE external_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将数据文件加载到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000> 

此时会发现hdfs中的数据文件会被移动到hive的目录下:

[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]# 

删除表:

0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000> 

查看hdfs会发现该表所对应的存储目录仍然存在:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]# 

分区表

建表语句:

CREATE TABLE partition_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

将数据文件加载到Hive中,并指定分区:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000> 

执行如下sql,可以从不同的分区统计结果:

0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000> 

分区表在hdfs中的存储结构:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x   - root supergroup          0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]# 

分桶表

建表语句:

CREATE TABLE bucket_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

test表中的数据插入到bucket_table中:

0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000> 

抽样查询:


image.png

分桶表在hdfs的存储目录如下:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r--   1 root supergroup        465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r--   1 root supergroup        281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]# 

Hive基本使用(下)内置函数/自定义函数/实现UDF

Hive常见内置函数:

查询引擎都自带了一部分函数来帮助我们解决查询过程当中一些复杂的数据计算或者数据转换操作,但是有时候自带的函数功能不能满足业务的需要。这时候就需要我们自己开发自定义的函数来辅助完成了,这就是所谓的用户自定义函数UDF(User-Defined Functions)。Hive支持三类自定义函数:

UDF函数其实就是一段遵循一定接口规范的程序。在执行过程中Hive将SQL转换为MapReduce程序,在执行过程当中在执行我们的UDF函数。

本小节简单演示下自定义UDF函数,首先创建一个空的Maven项目,然后添加hive-exec依赖,版本与你安装的Hive版本需对应上。完整的pom文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-udf-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

首先创建一个继承UDF的类,我们实现的这个自定义函数功能就是简单的获取字段的长度:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class StrLen extends UDF {

    public int evaluate(final Text col) {
        return col.getLength();
    }
}

以上这种自定义函数只能支持处理普通类型的数据,如果要对复杂类型的数据做处理则需要继承GenericUDF,并实现其抽象方法。例如,我们实现一个对测试数据中的scores字段求平均值的函数:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.text.DecimalFormat;

public class AvgScore extends GenericUDF {

    /**
     * 函数的名称
     */
    private static final String FUNC_NAME = "AVG_SCORE";

    /**
     * 函数所作用的字段类型,这里是map类型
     */
    private transient MapObjectInspector mapOi;

    /**
     * 控制精度只返回两位小数
     */
    DecimalFormat df = new DecimalFormat("#.##");

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 在此方法中可以做一些前置的校验,例如检测函数参数个数、检测函数参数类型
        mapOi = (MapObjectInspector) objectInspectors[0];
        // 指定函数的输出类型
        return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // 函数的核心逻辑,取出map中的value进行求平均值,并返回一个Double类型的结果值
        Object o = deferredObjects[0].get();
        double v = mapOi.getMap(o).values().stream()
                .mapToDouble(a -> Double.parseDouble(a.toString()))
                .average()
                .orElse(0.0);

        return Double.parseDouble(df.format(v));
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "func(map)";
    }
}

对项目进行打包,并上传到服务器中:

[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]# 

将jar包上传到hdfs中:

[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r--   1 root supergroup       4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]# 

在Hive中添加该jar包:

0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000> 

然后注册临时函数,临时函数只会在当前的session中生效:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000> 

使用自定义函数处理:

0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name  | length  | avg_score  |
+------------+---------+------------+
| Tom        | 3       | 80.25      |
| Jerry      | 5       | 77.5       |
| Jim        | 3       | 83.75      |
| Angela     | 6       | 84.5       |
| Ann        | 3       | 90.0       |
| Bella      | 5       | 69.25      |
| Bonnie     | 6       | 76.5       |
| Caroline   | 8       | 84.5       |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000> 

删除已注册的临时函数:

0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000> 

临时函数只会在当前的session中生效,如果需要注册成永久函数则只需要把TEMPORARY关键字给去掉即可。如下所示:

0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> 

删除永久函数也是把TEMPORARY关键字给去掉即可。如下所示:

0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> 

Hive存储结构 - OrcFile

Hive支持的存储格式:


image.png

我们都知道关系型数据库基本是使用行式存储作为存储格式,而大数据领域更多的是采用列式存储,因为大数据分析场景中通常需要读取大量行,但是只需要少数的几个列。这也是为什么通常使用OrcFile作为Hive的存储格式的原因。由此可见,大数据的绝大部分应用场景都是OLAP场景。

OLAP场景的特点

读多于写

不同于事务处理(OLTP)的场景,比如电商场景中加购物车、下单、支付等需要在原地进行大量insert、update、delete操作,数据分析(OLAP)场景通常是将数据批量导入后,进行任意维度的灵活探索、BI工具洞察、报表制作等。

数据一次性写入后,分析师需要尝试从各个角度对数据做挖掘、分析,直到发现其中的商业价值、业务变化趋势等信息。这是一个需要反复试错、不断调整、持续优化的过程,其中数据的读取次数远多于写入次数。这就要求底层数据库为这个特点做专门设计,而不是盲目采用传统数据库的技术架构。

大宽表,读大量行但是少量列,结果集较小

在OLAP场景中,通常存在一张或是几张多列的大宽表,列数高达数百甚至数千列。对数据分析处理时,选择其中的少数几列作为维度列、其他少数几列作为指标列,然后对全表或某一个较大范围内的数据做聚合计算。这个过程会扫描大量的行数据,但是只用到了其中的少数列。而聚合计算的结果集相比于动辄数十亿的原始数据,也明显小得多。

数据批量写入,且数据不更新或少更新

OLTP类业务对于延时(Latency)要求更高,要避免让客户等待造成业务损失;而OLAP类业务,由于数据量非常大,通常更加关注写入吞吐(Throughput),要求海量数据能够尽快导入完成。一旦导入完成,历史数据往往作为存档,不会再做更新、删除操作。

无需事务,数据一致性要求低

OLAP类业务对于事务需求较少,通常是导入历史日志数据,或搭配一款事务型数据库并实时从事务型数据库中进行数据同步。多数OLAP系统都支持最终一致性。

灵活多变,不适合预先建模

分析场景下,随着业务变化要及时调整分析维度、挖掘方法,以尽快发现数据价值、更新业务指标。而数据仓库中通常存储着海量的历史数据,调整代价十分高昂。预先建模技术虽然可以在特定场景中加速计算,但是无法满足业务灵活多变的发展需求,维护成本过高。

行式存储和列式存储

行式存储和列式存储的对比图:


image.png

与行式存储将每一行的数据连续存储不同,列式存储将每一列的数据连续存储。相比于行式存储,列式存储在分析场景下有着许多优良的特性:

  1. 如前所述,分析场景中往往需要读大量行但是少数几个列。在行存模式下,数据按行连续存储,所有列的数据都存储在一个block中,不参与计算的列在IO时也要全部读出,读取操作被严重放大。而列存模式下,只需要读取参与计算的列即可,极大的减低了IO cost,加速了查询。
  2. 同一列中的数据属于同一类型,压缩效果显著。列存往往有着高达十倍甚至更高的压缩比,节省了大量的存储空间,降低了存储成本。
  3. 更高的压缩比意味着更小的data size,从磁盘中读取相应数据耗时更短。
  4. 自由的压缩算法选择。不同列的数据具有不同的数据类型,适用的压缩算法也就不尽相同。可以针对不同列类型,选择最合适的压缩算法。
  5. 高压缩比,意味着同等大小的内存能够存放更多数据,系统cache效果更好。

OrcFile

OrcFile存储格式:


image.png

Orc列式存储优点:

除了Orc外,Parquet也是常用的列式存储格式。Orc VS Parquet:


离线数仓VS实时数仓

image.png

离线数仓:

离线数仓架构:

实时数仓:

实时数仓架构:


图解Lambda架构数据流程

Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(Nathan Marz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验。

Lambda 架构使开发人员能够构建大规模分布式数据处理系统。它具有很好的灵活性和可扩展性,也对硬件故障和人为失误有很好的容错性。

Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。


image.png

在 Lambda 架构中,每层都有自己所肩负的任务。批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图。批处理层使用可处理大量数据的分布式处理系统预先计算结果。它通过处理所有的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来重新计算的,能够修复任何错误,然后更新现有的数据视图。输出通常存储在只读数据库中,更新则完全取代现有的预先计算好的视图。

速度处理层会实时处理新来的数据。速度层通过提供最新数据的实时视图来最小化延迟。速度层所生成的数据视图可能不如批处理层最终生成的视图那样准确或完整,但它们几乎在收到数据后立即可用。而当同样的数据在批处理层处理完成后,在速度层的数据就可以被替代掉了。

本质上,速度层弥补了批处理层所导致的数据视图滞后。比如说,批处理层的每个任务都需要 1 个小时才能完成,而在这 1 个小时里,我们是无法获取批处理层中最新任务给出的数据视图的。而速度层因为能够实时处理数据给出结果,就弥补了这 1 个小时的滞后。

所有在批处理层和速度层处理完的结果都输出存储在服务层中,服务层通过返回预先计算的数据视图或从速度层处理构建好数据视图来响应查询。

所有的新用户行为数据都可以同时流入批处理层和速度层。批处理层会永久保存数据并且对数据进行预处理,得到我们想要的用户行为模型并写入服务层。而速度层也同时对新用户行为数据进行处理,得到实时的用户行为模型。

而当“应该对用户投放什么样的广告”作为一个查询(Query)来到时,我们从服务层既查询服务层中保存好的批处理输出模型,也对速度层中处理的实时行为进行查询,这样我们就可以得到一个完整的用户行为历史了。

一个查询就如下图所示,既通过批处理层兼顾了数据的完整性,也可以通过速度层弥补批处理层的高延时性,让整个查询具有实时性。


image.png

Kappa 架构 VS Lambda

Lambda 架构的不足

虽然 Lambda 架构使用起来十分灵活,并且可以适用于很多的应用场景,但在实际应用的时候,Lambda 架构也存在着一些不足,主要表现在它的维护很复杂。

使用 Lambda 架构时,架构师需要维护两个复杂的分布式系统,并且保证他们逻辑上产生相同的结果输出到服务层中。举个例子吧,我们在部署 Lambda 架构的时候,可以部署 Apache Hadoop 到批处理层上,同时部署 Apache Flink 到速度层上。

我们都知道,在分布式框架中进行编程其实是十分复杂的,尤其是我们还会针对不同的框架进行专门的优化。所以几乎每一个架构师都认同,Lambda 架构在实战中维护起来具有一定的复杂性。

那要怎么解决这个问题呢?我们先来思考一下,造成这个架构维护起来如此复杂的根本原因是什么呢?

维护 Lambda 架构的复杂性在于我们要同时维护两套系统架构:批处理层和速度层。我们已经说过了,在架构中加入批处理层是因为从批处理层得到的结果具有高准确性,而加入速度层是因为它在处理大规模数据时具有低延时性。

那我们能不能改进其中某一层的架构,让它具有另外一层架构的特性呢?例如,改进批处理层的系统让它具有更低的延时性,又或者是改进速度层的系统,让它产生的数据视图更具准确性和更加接近历史数据呢?

另外一种在大规模数据处理中常用的架构——Kappa 架构(Kappa Architecture),便是在这样的思考下诞生的。

Kappa 架构

Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的作者之一,也是现在 Confluent 大数据公司的 CEO。

克雷普斯提出了一个改进 Lambda 架构的观点:

我们能不能改进 Lambda 架构中速度层的系统性能,使得它也可以处理好数据的完整性和准确性问题呢?我们能不能改进 Lambda 架构中的速度层,使它既能够进行实时数据处理,同时也有能力在业务逻辑更新的情况下重新处理以前处理过的历史数据呢?

他根据自身多年的架构经验发现,我们是可以做到这样的改进的。我们知道像 Apache Kafka 这样的流处理平台是具有永久保存数据日志的功能的。通过Kafka的这一特性,我们可以重新处理部署于速度层架构中的历史数据。

下面我就以 Kafka 为例来介绍整个全新架构的过程。

第一步,部署 Kafka,并设置数据日志的保留期(Retention Period)。

这里的保留期指的是你希望能够重新处理的历史数据的时间区间。例如,如果你希望重新处理最多一年的历史数据,那就可以把 Apache Kafka 中的保留期设置为 365 天。如果你希望能够处理所有的历史数据,那就可以把 Apache Kafka 中的保留期设置为“永久(Forever)”。

第二步,如果我们需要改进现有的逻辑算法,那就表示我们需要对历史数据进行重新处理。我们需要做的就是重新启动一个 Kafka 作业实例(Instance)。这个作业实例将重头开始,重新计算保留好的历史数据,并将结果输出到一个新的数据视图中。

我们知道 Kafka 的底层是使用 Log Offset 来判断现在已经处理到哪个数据块了,所以只需要将 Log Offset 设置为 0,新的作业实例就会重头开始处理历史数据。

第三步,当这个新的数据视图处理过的数据进度赶上了旧的数据视图时,我们的应用便可以切换到从新的数据视图中读取。

第四步,停止旧版本的作业实例,并删除旧的数据视图。

这个架构就如同下图所示。


image.png

与 Lambda 架构不同的是,Kappa 架构去掉了批处理层这一体系结构,而只保留了速度层。你只需要在业务逻辑改变又或者是代码更改的时候进行数据的重新处理。Kappa 架构统一了数据的处理方式,不再维护离线和实时两套代码逻辑。

Kappa 架构的不足

Kappa 架构也是有着它自身的不足的。因为 Kappa 架构只保留了速度层而缺少批处理层,在速度层上处理大规模数据可能会有数据更新出错的情况发生,这就需要我们花费更多的时间在处理这些错误异常上面。如果需求发生变化或历史数据需要重新处理都得通过上游重放来完成。并且重新处理历史的吞吐能力会低于批处理。

还有一点,Kappa 架构的批处理和流处理都放在了速度层上,这导致了这种架构是使用同一套代码来处理算法逻辑的。所以 Kappa 架构并不适用于批处理和流处理代码逻辑不一致的场景。

Lambda VS Kappa

image.png

主流大公司的实时数仓架构

阿里菜鸟实时数仓

image.png
image.png

美团实时数仓

image.png

实时数仓建设特征

上一篇下一篇

猜你喜欢

热点阅读