Big Data Platform Technical Notes

Flink fails to connect to HDFS with java.net.UnknownHos

2023-08-14  OkGogogooo

1. Background

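While building the tool in the big data platform XSailboat that inspects the state data of Flink jobs, I used the State Processor API to parse savepoint data: the data is read from HDFS, parsed, and then sunk back to HDFS as a CSV file. Other interfaces then provide paged loading of that file.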

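To sink the data to HDFS as CSV, I used the deprecated writeAsCsv method on DataStream directly, because its behavior matches this requirement exactly. I did not use FileSink, because its bucketing behavior does not fit this use case.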

2. Problem

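// Savepoint location on HDFS; yc is the logical cluster name, not a real host name.
// The scheme must be hdfs:// for the path to reach the Hadoop file system shown in the stack trace below.
String checkPointUrl = "hdfs://yc/a/b/c.csv" ;
SavepointReader.read(env, checkPointUrl , new HashMapStateBackend()) ;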

While connecting to HDFS, the following exception was thrown:

java.lang.IllegalArgumentException: java.net.UnknownHostException: yc
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:465)
    at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:357)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:291)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:173)
    at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:168)
    at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:528)
    at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
    at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:257)
    at org.apache.flink.state.api.runtime.SavepointLoader.loadSavepointMetadata(SavepointLoader.java:50)
    at org.apache.flink.state.api.SavepointReader.read(SavepointReader.java:101)
    ...
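
The top frames of this trace suggest that the HDFS client treated yc as an ordinary host name. A minimal sketch, using only java.net.URI and no Flink or Hadoop classes, shows where that name comes from: it is the authority part of the path. The class name AuthorityDemo and the hdfs:// path are illustrative assumptions.

```java
import java.net.URI;

// Illustrative sketch: shows that "yc" is the authority (host) part of the path.
final class AuthorityDemo {

    // Returns the host component of a file system URI.
    static String authorityOf(String path) {
        return URI.create(path).getHost();
    }

    public static void main(String[] args) {
        // Assumed path, same shape as the one passed to SavepointReader.read.
        System.out.println(authorityOf("hdfs://yc/a/b/c.csv")); // prints yc
    }
}
```

If no HA settings for yc have been loaded, the client has nothing to map this logical name to, so it most likely falls back to resolving it through DNS, which fails.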

3. Solution

The cluster name yc is defined in Hadoop's configuration file hdfs-site.xml:

    <property>
        <name>dfs.nameservices</name>
        <value>yc</value>
    </property>

    <!-- yc contains two NameNodes, nn1 and nn2 -->
    <property>
        <name>dfs.ha.namenodes.yc</name>
        <value>nn1,nn2</value>
    </property>

    <!-- RPC address of nn1 -->
    <property>
        <name>dfs.namenode.rpc-address.yc.nn1</name>
        <value>XCloud150:9000</value>
    </property>

    <!-- HTTP address of nn1 -->
    <property>
        <name>dfs.namenode.http-address.yc.nn1</name>
        <value>XCloud150:50070</value>
    </property>

    <!-- RPC address of nn2 -->
    <property>
        <name>dfs.namenode.rpc-address.yc.nn2</name>
        <value>XCloud151:9000</value>
    </property>

    <!-- HTTP address of nn2 -->
    <property>
        <name>dfs.namenode.http-address.yc.nn2</name>
        <value>XCloud151:50070</value>
    </property>
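
To make the role of these keys concrete, here is a rough sketch of how a logical name such as yc maps to real NameNode addresses. It is a simplified stand-in built on a plain Map, not the Hadoop client's actual lookup code; the class NameserviceLookup and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: relates a logical nameservice to its NameNode RPC addresses.
final class NameserviceLookup {

    // Returns the RPC addresses of all NameNodes listed for the given nameservice.
    static List<String> rpcAddresses(Map<String, String> conf, String nameservice) {
        List<String> addresses = new ArrayList<>();
        String ids = conf.get("dfs.ha.namenodes." + nameservice);
        if (ids == null) {
            return addresses; // nameservice unknown: the logical name maps to nothing
        }
        for (String id : ids.split(",")) {
            String key = "dfs.namenode.rpc-address." + nameservice + "." + id.trim();
            String address = conf.get(key);
            if (address != null) {
                addresses.add(address);
            }
        }
        return addresses;
    }

    // Builds a map holding the RPC-related values from the hdfs-site.xml excerpt.
    static Map<String, String> sampleConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("dfs.nameservices", "yc");
        conf.put("dfs.ha.namenodes.yc", "nn1,nn2");
        conf.put("dfs.namenode.rpc-address.yc.nn1", "XCloud150:9000");
        conf.put("dfs.namenode.rpc-address.yc.nn2", "XCloud151:9000");
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(rpcAddresses(sampleConf(), "yc"));
        // With an empty configuration the list is empty, which mirrors the failing case.
        System.out.println(rpcAddresses(new LinkedHashMap<>(), "yc"));
    }
}
```

The second call illustrates the situation in this post: when hdfs-site.xml is never loaded, there is no entry to translate yc into XCloud150 or XCloud151.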

Tracing the code shows that Flink obtains the Hadoop Configuration (org.apache.hadoop.conf.Configuration) through the getHadoopConfiguration method of org.apache.flink.runtime.util.HadoopUtils.
The relevant code is excerpted below:

public static Configuration getHadoopConfiguration(
            org.apache.flink.configuration.Configuration flinkConfiguration) {

   // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
   // from the classpath

   Configuration result = new HdfsConfiguration();
   boolean foundHadoopConfiguration = false;

   // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
   // the hdfs configuration.
   // The properties of a newly added resource will override the ones in previous resources, so
   // a configuration
   // file with higher priority should be added later.

   // Approach 1: HADOOP_HOME environment variables
   String[] possibleHadoopConfPaths = new String[2];
   
   // #############################################################
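   // 1. Look under the directory named by the HADOOP_HOME environment variable.
   // XSailboat already keeps these Hadoop config files in its own config/MicroService/common directory, so this approach does not fit and is not used.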
   final String hadoopHome = System.getenv("HADOOP_HOME");
   if (hadoopHome != null) {
     LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
     possibleHadoopConfPaths[0] = hadoopHome + "/conf";
     possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
   }

   for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
     if (possibleHadoopConfPath != null) {
        foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
     }
   }

   // Approach 2: Flink configuration (deprecated)
   // #############################################################
   // 2. Point to the config file through the HDFS_DEFAULT_CONFIG option; it is marked deprecated, so it is not the first choice
   final String hdfsDefaultPath =
                flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
   if (hdfsDefaultPath != null) {
      result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
      LOG.debug("Using hdfs-default configuration-file path from Flink config: {}"
          ,hdfsDefaultPath);
      foundHadoopConfiguration = true;
   }
   // #############################################################
   // 3. Point to the config file through the HDFS_SITE_CONFIG option; it is marked deprecated, so it is not the first choice
   final String hdfsSitePath =
                flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
   if (hdfsSitePath != null) {
      result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
      LOG.debug(
                    "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
      foundHadoopConfiguration = true;
   }
   
   // #############################################################
   // 4. Point to the config directory through the PATH_HADOOP_CONFIG option; it is marked deprecated, so it is not the first choice
   final String hadoopConfigPath =
                flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
   if (hadoopConfigPath != null) {
            LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
            foundHadoopConfiguration =
                    addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
   }

   // Approach 3: HADOOP_CONF_DIR environment variable
   // #############################################################
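   // 5. Environment variables can only be set at launch; I want to set the location at runtime from a directory parameter, to reduce the configuration needed during deployment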
   String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
   if (hadoopConfDir != null) {
      LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
      foundHadoopConfiguration =
                    addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
   }

   // Approach 4: Flink configuration
   // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
   // #############################################################
   // 6. Configure it in the Flink configuration by prefixing the original Hadoop option name with flink.hadoop.
   for (String key : flinkConfiguration.keySet()) {
       for (String prefix : FLINK_CONFIG_PREFIXES) {
           if (key.startsWith(prefix)) {
               String newKey = key.substring(prefix.length());
               String value = flinkConfiguration.getString(key, null);
               result.set(newKey, value);
               LOG.debug(
                            "Adding Flink config entry for {} as {}={} to Hadoop config",
                            key,
                            newKey,
                            value);
               foundHadoopConfiguration = true;
            }
      }
   }

   if (!foundHadoopConfiguration) {
      LOG.warn(
                    "Could not find Hadoop configuration via any of the supported methods "
                            + "(Flink configuration, environment variables).");
   }

   return result;
 }
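
The last approach in the excerpt (entries prefixed with flink.hadoop.) can be illustrated in isolation. The following is a minimal sketch that mimics the prefix-stripping loop with plain maps instead of the Flink and Hadoop Configuration classes; the class PrefixForwarding and the sample entries are assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of forwarding "flink.hadoop."-prefixed entries to a Hadoop-style config.
final class PrefixForwarding {

    static final String PREFIX = "flink.hadoop.";

    // Copies every prefixed entry into the result, with the prefix removed from the key.
    static Map<String, String> toHadoopEntries(Map<String, String> flinkConf) {
        Map<String, String> hadoopConf = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : flinkConf.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                hadoopConf.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return hadoopConf;
    }

    public static void main(String[] args) {
        Map<String, String> flinkConf = new LinkedHashMap<>();
        flinkConf.put("flink.hadoop.dfs.nameservices", "yc");
        flinkConf.put("flink.hadoop.dfs.ha.namenodes.yc", "nn1,nn2");
        flinkConf.put("parallelism.default", "1"); // not prefixed, so it is ignored
        System.out.println(toHadoopEntries(flinkConf));
    }
}
```

With this approach every Hadoop option has to be repeated in the Flink configuration, which is why pointing at the existing config directory is more convenient here.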

I tried the deprecated option ConfigConstants.PATH_HADOOP_CONFIG, but it had no effect. Tracing the code showed that the following code was executed:
Class: org.apache.flink.core.fs.FileSystem

  // inside the getUnguardedFileSystem method
            
  // this "default" initialization makes sure that the FileSystem class works
  // even when not configured with an explicit Flink configuration, like on
  // JobManager or TaskManager setup
  if (FS_FACTORIES.isEmpty()) {
     initializeWithoutPlugins(new Configuration());
  }
  // The Configuration is created with new, so settings supplied from outside never reach it.
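
The effect of that fallback can be reproduced with a toy model. The sketch below is not Flink code: LazyRegistry, its methods, and the setting name conf.dir are hypothetical, and it only imitates the pattern of initializing with an empty configuration on first use.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of lazy default initialization; this is not Flink code.
final class LazyRegistry {

    // Hypothetical setting name, used only in this sketch.
    static final String CONF_DIR_KEY = "conf.dir";

    private final Map<String, String> factories = new HashMap<>();

    // Explicit initialization: the caller's settings are captured here.
    synchronized void initialize(Map<String, String> config) {
        factories.clear();
        factories.put("hdfs", config.getOrDefault(CONF_DIR_KEY, "<none>"));
    }

    // Lookup: if nobody initialized the registry, it initializes itself
    // with an empty configuration, so no caller setting can reach it.
    synchronized String confDirFor(String scheme) {
        if (factories.isEmpty()) {
            initialize(new HashMap<>());
        }
        return factories.get(scheme);
    }

    public static void main(String[] args) {
        LazyRegistry lazy = new LazyRegistry();
        System.out.println(lazy.confDirFor("hdfs")); // prints <none>

        LazyRegistry explicit = new LazyRegistry();
        Map<String, String> config = new HashMap<>();
        config.put(CONF_DIR_KEY, "/path/to/conf");
        explicit.initialize(config);
        System.out.println(explicit.confDirFor("hdfs")); // prints /path/to/conf
    }
}
```

In the model, the only way to get a setting into the registry is to call initialize before the first lookup, which is the same ordering constraint the real code imposes.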

The fix is to call this initialization method explicitly yourself:

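// Initialize Flink's file systems only once per application
if(!AppContext.get("Flink_FS_Init", false))
{
  org.apache.flink.configuration.Configuration flinkHdfsConf = new org.apache.flink.configuration.Configuration() ;
  // Point Flink at the directory that holds the Hadoop config files (core-site.xml, hdfs-site.xml)
  flinkHdfsConf.setString(ConfigConstants.PATH_HADOOP_CONFIG , MSApp.instance().getAppPaths().getCommonConfigDir().getAbsolutePath()) ;
  org.apache.flink.core.fs.FileSystem.initialize(flinkHdfsConf , null);
  // Record that initialization is done. The original called get(...) here, which never stores the flag;
  // the setter name set(...) is an assumption about XSailboat's AppContext.
  AppContext.set("Flink_FS_Init", true) ;
}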
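
The guard above depends on AppContext, which is specific to XSailboat. As a hedged alternative, the same run-once idea can be sketched with only the JDK; the class InitOnce is hypothetical, and the Runnable stands in for the FileSystem.initialize call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative run-once guard built on AtomicBoolean.
final class InitOnce {

    private final AtomicBoolean initialized = new AtomicBoolean(false);

    // Runs the action only on the first call; returns true if this call ran it.
    boolean runOnce(Runnable action) {
        if (initialized.compareAndSet(false, true)) {
            action.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        InitOnce guard = new InitOnce();
        AtomicInteger runs = new AtomicInteger();
        guard.runOnce(runs::incrementAndGet);
        guard.runOnce(runs::incrementAndGet);
        System.out.println(runs.get()); // prints 1
    }
}
```

Note that a second caller can return before the first one has finished the action. If callers must wait for initialization to complete, a synchronized block is the simpler choice.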