数客联盟

Using GCS as the state backend for Flink

2020-01-06  biggeng

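Our project needed Flink to run on GCP, so GCS was the natural choice of file system. I searched online extensively, but for well-known reasons Google Cloud is rarely used in China and material on it is even scarcer, and the official Flink documentation covers this topic only briefly. Here is a summary of how to use GCS as the state backend: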

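  1. Access GCS through Flink's Hadoop (HDFS) file system support. Put the GCS connector settings in a Hadoop core-site.xml; here it lives in /data/flink-1.9.1/conf/: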
```xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <!-- Authenticate to GCS with a service account -->
    <property>
        <name>google.cloud.auth.service.account.enable</name>
        <value>true</value>
    </property>
    <!-- Map the gs:// scheme to the GCS connector implementation -->
    <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <!-- Service account key file (see step 4) -->
    <property>
        <name>google.cloud.auth.service.account.json.keyfile</name>
        <value>/data/flink-1.9.1/conf/gcs-service-account.json</value>
    </property>
    <property>
        <name>fs.gs.project.id</name>
        <value>XXX</value>
        <description>
            Required. Google Cloud Project ID with access to configured GCS buckets.
        </description>
    </property>
</configuration>
```
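In flink-conf.yaml, point fs.hdfs.hadoopconf at the directory that contains core-site.xml:

```yaml
fs.hdfs.hadoopconf: /data/flink-1.9.1/conf/
```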
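Before starting Flink it can help to sanity-check core-site.xml. The sketch below is a hypothetical stand-alone helper (CoreSiteCheck is not part of Flink, Hadoop or the GCS connector) that uses only the JDK's XML parser to confirm that the five properties from step 1 are present and that the two `*.impl` class names are spelled correctly. It also fails loudly if the file is not well-formed XML, for example when `</configuration>` is missing.

```java
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical helper: checks the GCS-related keys in a Hadoop core-site.xml.
class CoreSiteCheck {

    // Key -> required value (null means "any non-empty value").
    static final String[][] REQUIRED = {
        {"fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"},
        {"fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"},
        {"google.cloud.auth.service.account.enable", "true"},
        {"google.cloud.auth.service.account.json.keyfile", null},
        {"fs.gs.project.id", null},
    };

    // Collects <property><name>..</name><value>..</value></property> entries into a map.
    static Map<String, String> readProperties(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            Map<String, String> props = new HashMap<>();
            NodeList nodes = doc.getElementsByTagName("property");
            for (int i = 0; i < nodes.getLength(); i++) {
                Element p = (Element) nodes.item(i);
                NodeList names = p.getElementsByTagName("name");
                NodeList values = p.getElementsByTagName("value");
                if (names.getLength() > 0 && values.getLength() > 0) {
                    props.put(names.item(0).getTextContent().trim(),
                              values.item(0).getTextContent().trim());
                }
            }
            return props;
        } catch (Exception e) {
            // Malformed XML (e.g. an unclosed <configuration>) ends up here.
            throw new IllegalArgumentException("core-site.xml could not be parsed: " + e.getMessage(), e);
        }
    }

    // Returns one message per problem; an empty list means all required keys look right.
    static List<String> check(String xml) {
        Map<String, String> props = readProperties(xml);
        List<String> problems = new ArrayList<>();
        for (String[] req : REQUIRED) {
            String actual = props.get(req[0]);
            if (actual == null || actual.isEmpty()) {
                problems.add("missing " + req[0]);
            } else if (req[1] != null && !req[1].equals(actual)) {
                problems.add(req[0] + " is '" + actual + "', expected '" + req[1] + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) throws Exception {
        // Default path taken from the setup above; pass another path as the first argument.
        String path = args.length > 0 ? args[0] : "/data/flink-1.9.1/conf/core-site.xml";
        String xml = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        List<String> problems = check(xml);
        System.out.println(problems.isEmpty() ? "OK" : String.join("\n", problems));
    }
}
```

With Java 11 or later it can be run directly from source, e.g. `java CoreSiteCheck.java /data/flink-1.9.1/conf/core-site.xml`; it prints `OK` or one problem per line.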
  2. Use GCS as the state backend in the job code:
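```java
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

CheckpointConfig checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setFailOnCheckpointingErrors(false);  // on a checkpoint error, decline the checkpoint instead of failing the task
checkpointConfig.setCheckpointInterval(10000);         // trigger a checkpoint every 10 s
checkpointConfig.setMinPauseBetweenCheckpoints(5000);  // at least 5 s between two checkpoints
checkpointConfig.setMaxConcurrentCheckpoints(1);
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

// RocksDB backend writing checkpoints to the GCS bucket; true enables incremental checkpoints.
// The constructor throws IOException, so the enclosing method must declare or handle it.
RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("gs://flinkcheckpoint", true);
// The cast selects setStateBackend(StateBackend) instead of the deprecated AbstractStateBackend overload.
env.setStateBackend((StateBackend) rocksDBStateBackend);
```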
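  3. Put the required jars (typically the GCS connector for Hadoop and a Hadoop jar such as flink-shaded-hadoop) on a classpath that Flink loads, such as Flink's lib/ directory. Otherwise the job fails with:

```
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'gs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
```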
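To see whether those jars are actually visible before submitting a job, a hypothetical helper like the one below tries to load the connector classes named in core-site.xml, plus Hadoop's `FileSystem`, by name. It only inspects the classpath of the JVM it runs in, so it has to be started with the same jars Flink uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: reports which classes cannot be loaded from this JVM's classpath.
class ClasspathCheck {

    // GCS connector classes referenced in core-site.xml, plus Hadoop's FileSystem base class.
    static final List<String> NEEDED = Arrays.asList(
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
            "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
            "org.apache.hadoop.fs.FileSystem");

    static List<String> missing(List<String> classNames) {
        List<String> result = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize = false: only locate the class, do not run its static initializers
                Class.forName(name, false, ClasspathCheck.class.getClassLoader());
            } catch (ClassNotFoundException | LinkageError e) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> notFound = missing(NEEDED);
        System.out.println(notFound.isEmpty() ? "all classes found" : "missing: " + notFound);
    }
}
```

For example, `java -cp "/data/flink-1.9.1/lib/*" ClasspathCheck.java` (Java 11+, path assumed from the install directory used above) checks exactly the jars in Flink's lib/ directory.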
  4. Create and download a JSON key for a service account in the relevant GCP project, save it at the path set in google.cloud.auth.service.account.json.keyfile in step 1, and set fs.gs.project.id to that project's ID.
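A rough, dependency-free check that the downloaded key matches step 4 is sketched below. KeyfileCheck is a hypothetical helper: it reads the `type` and `project_id` fields that Google service account key files contain, using a regular expression instead of a JSON library, so it only handles flat string fields like those.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: checks that a service account key file fits fs.gs.project.id.
// Regex-based on purpose (no JSON library); only works for flat "name": "value" string fields.
class KeyfileCheck {

    // Returns the string value of a top-level field, or null if it is absent.
    static String field(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    static List<String> check(String keyJson, String configuredProjectId) {
        List<String> problems = new ArrayList<>();
        if (!"service_account".equals(field(keyJson, "type"))) {
            problems.add("not a service account key (\"type\" is not \"service_account\")");
        }
        String projectId = field(keyJson, "project_id");
        if (projectId == null) {
            problems.add("key file has no project_id");
        } else if (!projectId.equals(configuredProjectId)) {
            problems.add("fs.gs.project.id is '" + configuredProjectId
                    + "' but the key belongs to project '" + projectId + "'");
        }
        return problems;
    }

    public static void main(String[] args) throws Exception {
        // args[0]: key file path (google.cloud.auth.service.account.json.keyfile)
        // args[1]: the value configured for fs.gs.project.id
        String json = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        List<String> problems = check(json, args[1]);
        System.out.println(problems.isEmpty() ? "OK" : String.join("\n", problems));
    }
}
```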
  5. With everything in place, run the Flink job and check the GCS directory for newly written checkpoint files.
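One way to do the check in step 5 from the command line is to list the bucket (for example with `gsutil ls -r gs://flinkcheckpoint/<job-id>/`) and look for the newest checkpoint that has a `_metadata` file, because that file is written only when a checkpoint completes. The hypothetical helper below does this on a listing read from standard input. It assumes Flink's default layout `<checkpoint dir>/<job id>/chk-<n>/_metadata` and a listing that covers a single job.

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper: finds the newest completed checkpoint in a bucket listing.
class LatestCheckpoint {

    // Assumed layout: <checkpoint dir>/<job id>/chk-<n>/_metadata
    private static final Pattern METADATA = Pattern.compile("^(.*/chk-(\\d+))/_metadata$");

    // Returns the chk-<n> directory with the largest n that contains _metadata, if any.
    static Optional<String> latestCompleted(List<String> objectPaths) {
        String best = null;
        long bestId = -1;
        for (String path : objectPaths) {
            Matcher m = METADATA.matcher(path.trim());
            if (m.matches()) {
                long id = Long.parseLong(m.group(2)); // numeric compare: chk-10 is newer than chk-9
                if (id > bestId) {
                    bestId = id;
                    best = m.group(1);
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // One object path per line on stdin; non-matching lines (headers, shared/ files) are ignored.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        List<String> lines = in.lines().collect(Collectors.toList());
        System.out.println(latestCompleted(lines).orElse("no completed checkpoint found"));
    }
}
```

For example: `gsutil ls -r gs://flinkcheckpoint/<job-id>/ | java LatestCheckpoint.java`. With incremental RocksDB checkpoints most data files sit under `shared/`, so the `chk-<n>` directories themselves can be small; the `_metadata` file is what marks a checkpoint as usable.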