flinkFlink

Flink(1.13) Catalog

2021-08-26  本文已影响0人  万事万物

Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。

数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。

前面用到Connector其实就是在使用Catalog


Catalog类型


HiveCatalog

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://hadoop162:9083</value>
    </property>

在hive-site.xml中配置元数据服务,所以需要启动该服务,否则无法读取数据。

hive service -metestore

元数据服务的作用:若没有元数据服务器,所有的请求将直接访问到hive,这样可能导致hive吃不消,有了元数据服务之后,请求将全部交由元数据服务器,元数据服务再去请求hive,起到一个缓冲的作用。

随便创建一个库

create database gmall;

随便创建一张表

use gmall;

create table student(
    id int,
    name string,
    age int,
    sex string
);

往表里插入一些数据

insert into student(id, name, age, sex)
values
(1,'张三',18,'男'),
(2,'李四',56,'女'),
(3,'王五',34,'男'),
(4,'赵六',33,'女'),
(5,'孙七',78,'男');
    @Test
    public void test1(){

        // 环境准备
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        /**
         * 创建 hivecatalog
         *
         * my_hive catalog 名称(随意
         * gmall 指定数据库
         * input/ 配置hive-site.xml 的目录。
         */
        HiveCatalog hiveCatalog = new HiveCatalog("my_hive", "gmall", "input/");

        /**
         * 注册Catalog
         * my_hive2 名称随意,通常和  new HiveCatalog 中的一致
         */
        tableEnv.registerCatalog("my_hive2",hiveCatalog);

        /**
         * 指定数据库,查询
         * my_hive tableEnv.registerCatalog 中指定的
         * gmall :库名
         * student:表名
         */
        tableEnv.sqlQuery("select * from my_hive2.gmall.student").execute().print();

    }
+----+-------------+--------------------------------+-------------+--------------------------------+
| op |          id |                           name |         age |                            sex |
+----+-------------+--------------------------------+-------------+--------------------------------+
| +I |           1 |                           张三 |          18 |                             男 |
| +I |           2 |                           李四 |          56 |                             女 |
| +I |           3 |                           王五 |          34 |                             男 |
| +I |           4 |                           赵六 |          33 |                             女 |
| +I |           5 |                           孙七 |          78 |                             男 |
+----+-------------+--------------------------------+-------------+--------------------------------+
select * from my_hive2.gmall.student

这样写比较麻烦(如上),通常我们可以将其他提出去,设置成全局

tableEnv.useCatalog("my_hive2");
tableEnv.useDatabase("gmall");

/**
 * 指定数据库,查询
 * my_hive tableEnv.registerCatalog 中指定的
 * gmall :库名
 * student:表名
*/
tableEnv.sqlQuery("select * from student").execute().print();

上一篇 下一篇

猜你喜欢

热点阅读