三、table API-Msql到终端打印

2022-09-07  本文已影响0人  Nick_4438

数据库准备

create table demo.source
(
    ID   int auto_increment
        primary key,
    data varchar(50) not null
);

create table demo.dest_table
(
    ID   int auto_increment
        primary key,
    data varchar(50) not null
);

数据处理

下面演示如何从mysql读取一个表,然后打印数据到终端

#!# --coding:utf-8 --


from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.expressions import col

# 1. 创建TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


# 2. 创建源表Table
table_env.execute_sql("""
    CREATE TABLE source (
        id INT,
        data STRING
    ) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://10.201.103.144:3308/demo?useSSL=false&useUnicode=true&characterEncoding=utf8',
    'connector.table' = 'source',
    'connector.username' = 'crcpasepd',
    'connector.password' = 'Crcportal_123',
    'connector.write.flush.interval' = '1s',
    'connector.driver' = 'com.mysql.cj.jdbc.Driver'
    )
""")

# 3. 创建sink表
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# 4. 查询源表,然后执行一些计算
# 利用表API创建一个查询源表:
source_table = table_env.from_path("source")
# 或者用sql创建一个查询源表:
# source_table = table_env.sql_query("SELECT * FROM datagen")
# source_table加工成result table
result_table = source_table.select(col("id") + 1, col("data"))
# 输出到print表中
result_table.execute_insert("print").wait()


上一篇 下一篇

猜你喜欢

热点阅读