(三)flink docker开发环境搭建

2022-11-13  本文已影响0人  Nick_4438

简介

本文详细讲解怎么快速搭建一个flink的开发环境。笔者做以下假设

如果1、2不满足,还请读者自行补脑

本文在docker内安装了一个flink jobmanager和一个flink taskmanager,再安装了一个mysql,然后使用flink sql创建任务,从mysql数据库内的表1同步数据到表2

详细步骤

环境准备

# Docker Compose stack for a local Flink playground: one JobManager, one
# TaskManager and one MySQL instance, all attached to the `proxy` network
# with fixed IPv4 addresses.
version: "2.2"
services:
  # Flink JobManager; container port 8081 is published on the host.
  jobmanager:
    container_name: jobmanager
    # image: apache/flink:1.14.6
    # NOTE(review): presumably a Flink 1.14.6 image with Python added — confirm.
    image: qiujiahong/flink:1.14.6_py
    networks:
      proxy:
        ipv4_address: 172.16.0.11
    ports:
      - "8081:8081"
    command: jobmanager
    # FLINK_PROPERTIES is a single multi-line variable. Everything inside the
    # literal block scalar is part of its value, so the lines must not carry
    # trailing whitespace.
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager

  # Flink TaskManager; points at the JobManager by service name.
  taskmanager:
    container_name: taskmanager
    # image: apache/flink:1.14.6
    image: qiujiahong/flink:1.14.6_py
    networks:
      proxy:
        ipv4_address: 172.16.0.12
    depends_on:
      - jobmanager
    command: taskmanager
    # Keep at 1: container_name and ipv4_address above are fixed values.
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2

  # Optional stand-alone SQL client service (disabled).
  # sql-client:
  #   image: apache/flink:1.14.6
  #   command: bin/sql-client.sh
  #   depends_on:
  #     - jobmanager
  #   environment:
  #     - |
  #       FLINK_PROPERTIES=
  #       jobmanager.rpc.address: jobmanager
  #       rest.address: jobmanager

  # MySQL server; port 3306 is published on the host.
  mysql:
    # NOTE(review): this namespace is spelled `qiujiaihong`, while the Flink
    # images above use `qiujiahong` — confirm which one is correct.
    image: qiujiaihong/example-mysql:1.1
    ports:
      - "3306:3306"
    networks:
      proxy:
        ipv4_address: 172.16.0.13
    environment:
      - MYSQL_ROOT_PASSWORD=123456
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw

# Network with an explicit subnet so the services can be given the static
# addresses assigned above.
networks:
  proxy:
    ipam:
      config:
        - subnet: 172.16.0.0/24
# Start the stack defined in the compose file
docker-compose up
# Shut down
# docker-compose down

# Open a bash shell inside the jobmanager container
docker exec -it jobmanager  bash
# Connect to MySQL at 172.16.0.13 as root
# (presumably run from the shell opened above — confirm mycli is available there)
mycli -h172.16.0.13 -uroot -p123456

CREATE DATABASE demo;
USE demo;

# Source table: auto-increment ids start at 101, then nine sample rows are inserted
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;
INSERT INTO products VALUES
  (DEFAULT,"scooter","Small 2-wheel scooter"),
  (DEFAULT,"car battery","12V car battery"),
  (DEFAULT,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
  (DEFAULT,"hammer","12oz carpenter's hammer"),
  (DEFAULT,"hammer","14oz carpenter's hammer"),
  (DEFAULT,"hammer","16oz carpenter's hammer"),
  (DEFAULT,"rocks","box of assorted rocks"),
  (DEFAULT,"jacket","water resistent black wind breaker"),
  (DEFAULT,"spare tire","24 inch spare tire");

# Target table: same columns as products, initially empty
CREATE TABLE dproducts (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);


使用命令行测试

# Launch the Flink SQL client script inside the jobmanager container
docker exec -it jobmanager  bash ./bin/sql-client.sh

-- Checkpoint every 3 seconds.
SET 'execution.checkpointing.interval' = '3s';

-- Source table: reads demo.products from MySQL through the mysql-cdc connector.
CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '172.16.0.13',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'demo',
    'table-name' = 'products'
  );

-- Sink table: writes to demo.dproducts in the same MySQL server through JDBC.
CREATE TABLE dproducts (
  id BIGINT,
  name STRING,
  description STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://172.16.0.13:3306/demo?useSSL=false',
   'table-name' = 'dproducts',
   'username' = 'root',
   'password' = '123456'
);

-- Submit the sync job. After running this statement, log in to MySQL and
-- check the contents of the dproducts table.
INSERT INTO dproducts SELECT  p.id, p.name, p.description FROM products AS p;

使用python 本地调试

from pyflink.table import EnvironmentSettings, TableEnvironment


# demo.py: stream rows from the MySQL `products` table (mysql-cdc source)
# into a `print` sink, emitting step markers along the way.

# DDL for the CDC source table backed by demo.products.
SOURCE_DDL = """
    CREATE TABLE products (
        id INT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '172.16.0.13',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'demo',
        'table-name' = 'products'
    )
"""

# DDL for the sink table; the `print` connector is used instead of a database.
SINK_DDL = """
    CREATE TABLE dproducts (
        id BIGINT,
        name STRING,
        description STRING
    ) WITH (
        'connector' = 'print'
    )
"""

# Statement that copies every source row into the sink.
INSERT_SQL = "INSERT INTO dproducts SELECT  p.id, p.name, p.description FROM products AS p"

print('step 01')
# 1. Build a streaming-mode TableEnvironment.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
print('step 02')

# Connector jars are not registered explicitly in this variant:
# t_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar;file:/opt/flink/lib/mysql-connector-java-8.0.30.jar")
print('step 03')

# 2./3. Register the source table first, then the sink table.
for ddl in (SOURCE_DDL, SINK_DDL):
    t_env.execute_sql(ddl)

print('step 04')
# Submit the INSERT job and block until it finishes.
insert_job = t_env.execute_sql(INSERT_SQL)
insert_job.wait()
print('step 06')

# Copy the program into the container
docker cp ./demo.py jobmanager:/opt/flink  
# Run it; the source table's rows are printed directly to the command line
docker exec -it jobmanager python demo.py

理论上,python程序也可以在本地开发电脑上直接执行(需提前装好相关依赖)

from pyflink.table import EnvironmentSettings, TableEnvironment


# demo1.py: stream rows from the MySQL `products` table (mysql-cdc source)
# into the MySQL `dproducts` table (jdbc sink).

# Connector jars registered through the `pipeline.jars` option.
PIPELINE_JARS = "file:/opt/flink/lib/flink-sql-connector-mysql-cdc-2.2.1.jar;file:/opt/flink/lib/mysql-connector-java-8.0.30.jar"

# DDL for the CDC source table backed by demo.products.
SOURCE_DDL = """
    CREATE TABLE products (
        id INT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = '172.16.0.13',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'demo',
        'table-name' = 'products'
    )
"""

# DDL for the JDBC sink table backed by demo.dproducts.
# Driver options that were tried and left disabled:
# 'driver' = 'com.mysql.cj.jdbc.Driver',
# 'driver' = 'com.mysql.jdbc.Driver',
SINK_DDL = """
    CREATE TABLE dproducts (
        id BIGINT,
        name STRING,
        description STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://172.16.0.13:3306/demo?useSSL=false',
        'table-name' = 'dproducts',
        'username' = 'root',
        'password' = '123456'
    )
"""

# Statement that copies every source row into the sink.
INSERT_SQL = "INSERT INTO dproducts SELECT  p.id, p.name, p.description FROM products AS p"

print('step 01')
# 1. Build a streaming-mode TableEnvironment.
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
print('step 02')

flink_conf = t_env.get_config().get_configuration()
flink_conf.set_string("pipeline.jars", PIPELINE_JARS)
print('step 03')

# 2./3. Register the source table first, then the sink table.
for ddl in (SOURCE_DDL, SINK_DDL):
    t_env.execute_sql(ddl)

print('step 04')
# Submit the INSERT job and block until it finishes.
insert_job = t_env.execute_sql(INSERT_SQL)
insert_job.wait()
print('step 06')

# Empty the target table first: connect to MySQL, then run the two SQL
# statements below inside the mycli session
mycli -h172.16.0.13 -uroot -p123456
use demo;
truncate dproducts;
# Copy the program into the container
docker cp ./demo1.py jobmanager:/opt/flink
# Run the program. It does not exit once started; while it is running,
# log in to MySQL and check the contents of the dproducts table
docker exec -it jobmanager python demo1.py

理论上,python程序也可以在本地开发电脑上直接执行(需提前装好相关依赖)

其他方式提交

# Submit locally on the Flink server
 ./bin/flink run --python demo.py

# Submit a PyFlink job from a directory, using --pyModule to name the entry module:
./bin/flink run \
      --pyModule table.word_count \
      --pyFiles examples/python/table

# Submit a PyFlink job to a specific JobManager:
./bin/flink run \
      --jobmanager <jobmanagerHost>:8081 \
      --python examples/python/table/word_count.py

更多提交方法,请查看官网

上一篇下一篇

猜你喜欢

热点阅读