HDFS架构师 2.2 - NameNode元数据双缓冲写入原理
2021-01-10 本文已影响0人
fat32jin
(1)HDFS元数据管理、双缓冲 17 } 0:55
双缓冲写元数据的方法
——》 FSEditLog#logEdit()
▼▼
package com.nx.hadoop.lesson02;
import java.util.LinkedList;
/**
*
* HDFS:有个别地方源码写得不错的。 Hadoop2.3.0
* kafka Zookeeper
*
*/
public class FSEditLog {
private long txid=0L;
private DoubleBuffer editLogBuffer=new DoubleBuffer();
//当前是否正在往磁盘里面刷写数据
private volatile Boolean isSyncRunning = false;
private volatile Boolean isWaitSync = false;
private volatile Long syncMaxTxid = 0L;
/**
* 一个线程 就会有自己一个ThreadLocal的副本
*/
private ThreadLocal<Long> localTxid=new ThreadLocal<Long>();
public static void main(String[] args) {
//logEdit("mkdir /data")
}
/**
* 写元数据日志方法
*
* @param content
*/
public void logEdit(String content){//mkdir /data
/**
* 因为我们要保证日志的ID要递增,唯一
*
* 线程1,线程2, 线程3
*/
synchronized (this){
//线程1
//日志的ID号,元数据信息的ID号。
txid++; //1
/**
* 每个线程都会有自己的一个副本。
* 线程1,1
* 线程2,2
* 线程3,3
*/
localTxid.set(txid);
EditLog log = new EditLog(txid, content);
//往内存里面写数据
editLogBuffer.write(log);
} //释放锁
/**
* 内存1:
* 线程1,1 元数据1
* 线程2,2 元数据2
* 线程3,3 元数据3
*/
logSync();
}
private void logSync(){
/**
*
* 线程1,ID号:1
*
* 线程2 进来了
*
* 线程3 进来了 return
*
* 线程4 进来了
* 线程5 进来了
*/
synchronized (this){
//当前是否正在往磁盘写数据,默认是false
if(isSyncRunning){
//当前的元数据信息的编号就是:2
//4
//5
long txid = localTxid.get();
// 2 <= 3
//4 <= 3
//5 <= 3
if(txid <= syncMaxTxid){
return;
}
//
if(isWaitSync){
//直接返回
return;
}
//重新赋值
isWaitSync = true;
while(isSyncRunning){
try {
//线程4就会在这儿等待
//释放锁
/**
* 1) 时间到了
* 2)被唤醒了
*/
wait(2000);
}catch (Exception e){
e.printStackTrace();
}
}
//重新赋值
isWaitSync = false;
}
//交换内存,我是直接交换的内存,肯定是简单粗暴
//真正的源码里面是有判断。
//如果来不来就直接交换内存,频繁的交换内存,也是很影响性能的。
editLogBuffer.setReadyToSync();
if(editLogBuffer.currentBuffer.size() > 0) {
//获取当前 内存2(正在往磁盘上面写数据的那个内存)
//里面元数据日志日志编号最大的是多少
syncMaxTxid = editLogBuffer.getSyncMaxTxid();
}
//说明接下来就要往磁盘上面写元数据日志信息了。
isSyncRunning = true;
} //释放锁
//往磁盘上面写数据(这个操作是很耗费时间的)
/**
* 线程一 执行如下代码
* 在最耗费时间的这段代码上面是没有加锁
* 几毫秒,几十毫秒
*/
editLogBuffer.flush(); //这个地方写完了。
synchronized (this) {
//状态恢复
isSyncRunning = false;
//唤醒当前wait的线程。
notify();
}
}
/**
* 使用了面向对象的思想,把一条日志看成一个对象。
* 日志信息,或者就是我们说的元数据信息。
*/
class EditLog{ //
//日志的编号,递增,唯一。
long txid;
//日志内容
String content;//mkdir /data
//构造函数
public EditLog(long txid,String content){
this.txid = txid;
this.content = content;
}
//方便我们打印日志
@Override
public String toString() {
return "EditLog{" +
"txid=" + txid +
", content='" + content + '\'' +
'}';
}
}
/**
* 双缓冲算法
*/
class DoubleBuffer{
//内存1
LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
//内存2
LinkedList<EditLog> syncBuffer= new LinkedList<EditLog>();
/**
* 把数据写到当前内存
* @param log
*/
public void write(EditLog log){
currentBuffer.add(log);
}
/**
* 两个内存交换数据
*/
public void setReadyToSync(){
LinkedList<EditLog> tmp= currentBuffer;
currentBuffer = syncBuffer;
syncBuffer = tmp;
}
/**
* 获取当前正在刷磁盘的内存里的ID最大的值。
* @return
*/
public Long getSyncMaxTxid(){
return syncBuffer.getLast().txid;
}
/**
* 就是把数据涮写到磁盘上面
* 为了演示效果,所以我们把打印出来
*/
public void flush(){
for(EditLog log:syncBuffer){
System.out.println("存入磁盘日志信息:"+log);
}
//清空内存
syncBuffer.clear();
}
}
}