Flink Throws java.net.UnknownHostException When Connecting to HDFS
2023-08-14
OkGogogooo
1. Background
While building the tool for inspecting Flink job state in the XSailboat big-data platform, I used the State Processor API to parse savepoint data: the savepoint is read from HDFS, parsed, and sunk back to HDFS in CSV format, after which another interface provides paginated loading of that file.
For sinking CSV to HDFS I went straight to the deprecated writeAsCsv method on DataStream, because its behavior matches the requirement here exactly. I did not use FileSink, because its bucketing behavior does not fit this use case.
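For context, here is a minimal sketch of the whole pipeline. It is an illustration, not the production code: the operator uid "my-operator-uid", the ValueState name "count", and both paths are hypothetical placeholders.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointToCsv {

    // Reader for a hypothetical ValueState<Long> named "count" held by the original job.
    static class CountReader extends KeyedStateReaderFunction<String, Tuple2<String, Long>> {
        ValueState<Long> countState;

        @Override
        public void open(Configuration parameters) {
            countState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<Tuple2<String, Long>> out)
                throws Exception {
            out.collect(Tuple2.of(key, countState.value()));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SavepointReader savepoint = SavepointReader.read(
                env, "hdfs://yc/flink/savepoints/savepoint-xxxx", new HashMapStateBackend());

        // "my-operator-uid" must match the uid of the stateful operator in the original job.
        DataStream<Tuple2<String, Long>> state =
                savepoint.readKeyedState("my-operator-uid", new CountReader());

        // writeAsCsv is deprecated but does no bucketing, which is exactly what we want here;
        // it only accepts Tuple types, hence the Tuple2 above.
        state.writeAsCsv("hdfs://yc/a/b/c.csv");

        env.execute("savepoint-to-csv");
    }
}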
2. Problem
String checkPointUrl = "hdfs://yc/a/b/c.csv";
SavepointReader.read(env, checkPointUrl, new HashMapStateBackend());
While connecting to HDFS, the following exception was thrown:
java.lang.IllegalArgumentException: java.net.UnknownHostException: yc
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:465)
at org.apache.hadoop.hdfs.NameNodeProxiesClient.createProxyWithClientProtocol(NameNodeProxiesClient.java:139)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:357)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:291)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:173)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:168)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:528)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274)
at org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:257)
at org.apache.flink.state.api.runtime.SavepointLoader.loadSavepointMetadata(SavepointLoader.java:50)
at org.apache.flink.state.api.SavepointReader.read(SavepointReader.java:101)
...
3. Solution
Hadoop fails here because it treats yc as a literal hostname: the HA nameservice definition has not been loaded on the client side, so DFSClient tries to resolve yc via DNS and gives up. The cluster name yc is defined in Hadoop's configuration file hdfs-site.xml:
<property>
  <name>dfs.nameservices</name>
  <value>yc</value>
</property>
<!-- yc contains two NameNodes: nn1 and nn2 -->
<property>
  <name>dfs.ha.namenodes.yc</name>
  <value>nn1,nn2</value>
</property>
<!-- RPC address of nn1 -->
<property>
  <name>dfs.namenode.rpc-address.yc.nn1</name>
  <value>XCloud150:9000</value>
</property>
<!-- HTTP address of nn1 -->
<property>
  <name>dfs.namenode.http-address.yc.nn1</name>
  <value>XCloud150:50070</value>
</property>
<!-- RPC address of nn2 -->
<property>
  <name>dfs.namenode.rpc-address.yc.nn2</name>
  <value>XCloud151:9000</value>
</property>
<!-- HTTP address of nn2 -->
<property>
  <name>dfs.namenode.http-address.yc.nn2</name>
  <value>XCloud151:50070</value>
</property>
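A quick way to confirm the diagnosis is to check whether the client-side Hadoop Configuration actually contains this nameservice. A minimal sketch, assuming hdfs-site.xml is not on the classpath and must be added manually (the file path is a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class NameserviceCheck {
    public static void main(String[] args) {
        // HdfsConfiguration loads hdfs-default.xml / hdfs-site.xml from the classpath only.
        Configuration conf = new HdfsConfiguration();
        conf.addResource(new org.apache.hadoop.fs.Path("/path/to/config/hdfs-site.xml"));

        // Prints "yc" if the nameservice definition was loaded; null means DFSClient
        // will treat "yc" as a hostname and fail with UnknownHostException.
        System.out.println(conf.get("dfs.nameservices"));
    }
}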
Tracing through the code shows that Flink obtains the Hadoop Configuration (org.apache.hadoop.conf.Configuration) via the getHadoopConfiguration method of org.apache.flink.runtime.util.HadoopUtils. The relevant code is excerpted below:
public static Configuration getHadoopConfiguration(
        org.apache.flink.configuration.Configuration flinkConfiguration) {

    // Instantiate an HdfsConfiguration to load the hdfs-site.xml and hdfs-default.xml
    // from the classpath
    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false;

    // We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
    // the hdfs configuration.
    // The properties of a newly added resource will override the ones in previous resources, so
    // a configuration file with higher priority should be added later.

    // Approach 1: HADOOP_HOME environment variables
    String[] possibleHadoopConfPaths = new String[2];
    // #############################################################
    // 1. Search under the directory named by the HADOOP_HOME environment variable.
    //    Our Hadoop configuration files already live in XSailboat's
    //    config/MicroService/common directory, so this approach does not fit and is not used.
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_HOME: {}", hadoopHome);
        possibleHadoopConfPaths[0] = hadoopHome + "/conf";
        possibleHadoopConfPaths[1] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            foundHadoopConfiguration = addHadoopConfIfFound(result, possibleHadoopConfPath);
        }
    }

    // Approach 2: Flink configuration (deprecated)
    // #############################################################
    // 2. Point at the hdfs-default file via the HDFS_DEFAULT_CONFIG option.
    //    This option is marked deprecated, so it is not the first choice.
    final String hdfsDefaultPath =
            flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug(
                "Using hdfs-default configuration-file path from Flink config: {}",
                hdfsDefaultPath);
        foundHadoopConfiguration = true;
    }

    // #############################################################
    // 3. Point at the hdfs-site file via the HDFS_SITE_CONFIG option.
    //    Also marked deprecated, so not the first choice.
    final String hdfsSitePath =
            flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug(
                "Using hdfs-site configuration-file path from Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    }

    // #############################################################
    // 4. Point at a configuration directory via the PATH_HADOOP_CONFIG option.
    //    Also marked deprecated, so not the first choice.
    final String hadoopConfigPath =
            flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
    if (hadoopConfigPath != null) {
        LOG.debug("Searching Hadoop configuration files in Flink config: {}", hadoopConfigPath);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfigPath) || foundHadoopConfiguration;
    }

    // Approach 3: HADOOP_CONF_DIR environment variable
    // #############################################################
    // 5. An environment variable can only be set at launch time, but I want to set the
    //    configuration directory at runtime, to reduce the parameters that have to be
    //    configured during deployment.
    String hadoopConfDir = System.getenv("HADOOP_CONF_DIR");
    if (hadoopConfDir != null) {
        LOG.debug("Searching Hadoop configuration files in HADOOP_CONF_DIR: {}", hadoopConfDir);
        foundHadoopConfiguration =
                addHadoopConfIfFound(result, hadoopConfDir) || foundHadoopConfiguration;
    }

    // Approach 4: Flink configuration
    // add all configuration key with prefix 'flink.hadoop.' in flink conf to hadoop conf
    // #############################################################
    // 6. Configure directly in the Flink configuration by prefixing the original
    //    Hadoop option names with "flink.hadoop.".
    for (String key : flinkConfiguration.keySet()) {
        for (String prefix : FLINK_CONFIG_PREFIXES) {
            if (key.startsWith(prefix)) {
                String newKey = key.substring(prefix.length());
                String value = flinkConfiguration.getString(key, null);
                result.set(newKey, value);
                LOG.debug(
                        "Adding Flink config entry for {} as {}={} to Hadoop config",
                        key,
                        newKey,
                        value);
                foundHadoopConfiguration = true;
            }
        }
    }

    if (!foundHadoopConfiguration) {
        LOG.warn(
                "Could not find Hadoop configuration via any of the supported methods "
                        + "(Flink configuration, environment variables).");
    }

    return result;
}
I tried the deprecated ConfigConstants.PATH_HADOOP_CONFIG option, but it had no effect. Tracing further showed that the following code gets executed:
Class: org.apache.flink.core.fs.FileSystem
// inside the getUnguardedFileSystem method

// this "default" initialization makes sure that the FileSystem class works
// even when not configured with an explicit Flink configuration, like on
// JobManager or TaskManager setup
if (FS_FACTORIES.isEmpty()) {
    initializeWithoutPlugins(new Configuration());
}

// The Flink Configuration here is created with `new`, so any settings made outside
// never reach it - which is why PATH_HADOOP_CONFIG had no effect.
The fix: call this initialization method proactively ourselves, before the first file-system access, pointing it at our own configuration directory:
// One-time initialization guard (AppContext and MSApp are XSailboat classes).
if (!AppContext.get("Flink_FS_Init", false)) {
    org.apache.flink.configuration.Configuration flinkHdfsConf =
            new org.apache.flink.configuration.Configuration();
    // PATH_HADOOP_CONFIG now takes effect, because we pass the configuration in ourselves.
    flinkHdfsConf.setString(
            ConfigConstants.PATH_HADOOP_CONFIG,
            MSApp.instance().getAppPaths().getCommonConfigDir().getAbsolutePath());
    org.apache.flink.core.fs.FileSystem.initialize(flinkHdfsConf, null);
    // Mark initialization as done so this block runs only once.
    AppContext.set("Flink_FS_Init", true);
}
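With this guard in place, FS_FACTORIES is populated from our configuration directory before the first FileSystem.get call, so SavepointReader.read can resolve the yc nameservice. An alternative that avoids the deprecated option entirely is approach 4 from getHadoopConfiguration above: prefix the ordinary Hadoop keys with flink.hadoop. and pass them through the same explicit initialize call. A minimal sketch, with the values copied from the hdfs-site.xml shown earlier (the failover proxy provider is the stock Hadoop class for HA nameservices):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class FlinkHadoopConfInit {
    public static void initFileSystem() {
        Configuration conf = new Configuration();
        // Each "flink.hadoop." key is stripped of its prefix and copied into the
        // Hadoop Configuration by HadoopUtils.getHadoopConfiguration (approach 4 above).
        conf.setString("flink.hadoop.dfs.nameservices", "yc");
        conf.setString("flink.hadoop.dfs.ha.namenodes.yc", "nn1,nn2");
        conf.setString("flink.hadoop.dfs.namenode.rpc-address.yc.nn1", "XCloud150:9000");
        conf.setString("flink.hadoop.dfs.namenode.rpc-address.yc.nn2", "XCloud151:9000");
        // Required for an HA nameservice: tells the client how to find the active NameNode.
        conf.setString(
                "flink.hadoop.dfs.client.failover.proxy.provider.yc",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        FileSystem.initialize(conf, null);
    }
}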