04.HDFS拓展
2019-11-10 本文已影响0人
哈哈大圣
HDFS拓展
一、副本摆放策略
不同的版本副本拜访策略不同,主要是针对rack(机架)而选择的策略,机架之间通信的带宽消耗比较高,但是如果一个机架蹦了,这个机架上的所有节点都蹦了,虽然出现的概率很小(新版本继续会优化这个问题)
1. 常见的策略
-
策略1
1-本rack(机架)的一个节点上
2-另外一个rack的节点上
3-与2相同的rack的另外一个节点上 -
策略2
1-本rack的一个节点上
2-本rack的另外一个节点上
3-不同rack的一个节点上
二、客户端写数据到HDFS的大致流程
客户端写数据到HDFS的大致流程.png
三、客户端从HDFS读数据的大致流程
HDFS读数据流程.png
二、HDFS的元数据管理
- 元数据:HDFS的目录结构以及每个文件的BLOCK信息(id,副本系数、block存放在哪个DN上)
- 存在什么地方:对应配置 ${hadoop.tmp.dir}/name/......
- 元数据存放在文件中:
Checkpoint机制.png
三、安全模式
HDFS在启动时,会进入安全模式,此时不能向NN上写东西,因为会有30秒的时间NN与DN进行数据块的确认,计算相关系数是否达到指标,30秒过后安全模式自动关闭,此时才能进行写入操作!HDFS在整个其他框架的时候,注意要等30秒(也可以通过70070端口查看)
四、单词统计案例
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 使用HDFS API完成wordcount统计
*
* 需求:统计HDFS上的文件的wc,然后将统计结果输出到HDFS
*
* 功能拆解:
* 1)读取HDFS上的文件 ==> HDFS API
* 2)业务处理(词频统计):对文件中的每一行数据都要进行业务处理(按照分隔符分割) ==> Mapper
* 3)将处理结果缓存起来 ==> Context
* 4)将结果输出到HDFS ==> HDFS API
* @author Liucheng
* @since 2019-11-08
*/
@Deprecated
public class HDFSWCAppOld {
public static void main(String[] args) throws Exception {
// 1. 获取文件系统句柄
Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000"), configuration, "hadoop");
// 2. 获取需要统计的文件
Path root = new Path("/");
Path remotePath = new Path(root, new Path("apache-hadoop.txt"));
// 3. 获取文件迭代器
RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(remotePath, false);
// 4. 遍历文件迭代器获取流,并存入缓存
Map<String, Integer> catche = new ConcurrentHashMap<>();
while (iterator.hasNext()) {
LocatedFileStatus file = iterator.next();
FSDataInputStream in = fileSystem.open(file.getPath());
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null) {
// todo 统计计算
String[] wordArr = line.split("[^a-zA-Z0-9]+");
for (String word : wordArr) {
catche.merge(word, 1, (a, b) -> a + b);
}
}
// 关闭输入流
reader.close();
in.close();
}
// 5. 获取输入流
Path outPath = new Path("/count.txt");
FSDataOutputStream out = fileSystem.create(outPath);
// 6. 遍历缓存,将统计写入输出流
Set<Map.Entry<String, Integer>> entries = catche.entrySet();
entries.forEach(entry -> {
try {
System.out.println(entry.getKey() + " \t " + entry.getValue());
out.writeUTF(entry.getKey() + " \t " + entry.getValue() + "\n");
} catch (IOException e) {
e.printStackTrace();
}
});
// 7.释放资源
catche.clear();
out.close();
fileSystem.close();
}
}