Hadoop Namenode双缓冲写原理(demo)

2021-04-17  本文已影响0人  _Kantin

介绍

背景

代码

import java.util.LinkedList;

public class NNEditLog {
    //事务ID:每个刷新的日志都有唯一的id
    private long txid = 0L;
    private DoubleBuffer editLogBuffer = new DoubleBuffer();
    //是否正在刷写磁盘
    private volatile Boolean isSyncRunning = false;
    //是否正在等待同步
    private volatile Boolean isWaitSync = false;
    //正在等待同步中最大的线程ID
    private volatile Long syncMaxTxid = 0L;

    //每个线程都对应自己的一个副本
    private ThreadLocal<Long> localTxid = new ThreadLocal<Long>();

    public void logEdit(String content) {
        /**
         * 1.加锁的目的就是为了事务ID的唯一,而且是递增
         * 2.重要逻辑:若当前有线程 1,2,3,4,5进入到此方法
         *   此时1先获取到锁,调起了logSync()方法
         *   但是由于线程1 write(log)后就释放锁了(此过程很快)
         *   那么其实在logSync()执行之前,2,3,4,5添加进了editLogBuffer
         *   所以buffer 1 其实此时有 1.2.3.4.5 五个日志了
         *   由于线程1调用了同步,因此 buffer2中也有1.2.3.4.5,其中syncMaxTxid=5
         */
        synchronized (this) {
            txid++;
            localTxid.set(txid);
            EditLog log = new EditLog(txid, content);
            editLogBuffer.write(log);
        }
        logSync();
    }

    private void logSync() {
        synchronized (this) {
            //是否有人正在把数据同步到磁盘上面
            if (isSyncRunning) { 
                long txid = localTxid.get();
                /**
                 *  可能此时是线程2进来了,但此时syncMaxTxid是5
                 *  表示自己已经在同步list中了
                 */
                if (txid <= syncMaxTxid) {
                    return;
                }
                if (isWaitSync) {
                    return;
                }
                isWaitSync = true;

                while (isSyncRunning) {
                    try {
                        wait(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                isWaitSync = false;
            }
            editLogBuffer.setReadyToSync();
            //此处editLogBuffer.syncBuffer.size()不是1 
            if (editLogBuffer.syncBuffer.size() > 0) {
                syncMaxTxid = editLogBuffer.getSyncMaxTxid();
            }
            isSyncRunning = true;
        } 
        editLogBuffer.flush();
        synchronized (this) {
            isSyncRunning = false;
            //释放锁
            notify();
        }
    }

    class EditLog {
        long txid;
        String content;

        //构造函数
        public EditLog(long txid, String content) {
            this.txid = txid;
            this.content = content;
        }

        //为了测试方便
        @Override
        public String toString() {
            return "EditLog{" +
                    "txid=" + txid +
                    ", content='" + content + '\'' +
                    '}';
        }
    }


    /**
     * 双缓存方案
     */
    class DoubleBuffer {
        //内存1
        LinkedList<EditLog> currentBuffer = new LinkedList<EditLog>();
        //内存2
        LinkedList<EditLog> syncBuffer = new LinkedList<EditLog>();
        
        /**
         * 把数据写到当前内存1
         * @param log
         */
        public void write(EditLog log) {
            currentBuffer.add(log);
        }

        /**
         * 交换内存
         */
        public void setReadyToSync() {
            LinkedList<EditLog> tmp = currentBuffer;
            currentBuffer = syncBuffer;
            syncBuffer = tmp;
        }

        /**
         * 获取内存2里面的日志的最大的事务编号
         * linklist是有序的,获取最后一个的id即为最大
         */
        public Long getSyncMaxTxid() {
            return syncBuffer.getLast().txid;
        }
        
        //刷写磁盘
        public void flush() {
            for (EditLog log : syncBuffer) {
                //把数据写到磁盘上
                System.out.println("存入磁盘日志信息:" + log);
            }
            //把内存2里面的数据要清空
            syncBuffer.clear();
        }
    }

    public static void main(String[] args) {
        NNEditLog fs=new NNEditLog();
        for (int i = 0; i < 1000; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int j = 0; j < 100; j++) {
                        fs.logEdit("namenode edit log ");
                    }
                }
            }).start();
        }

    }
}
上一篇 下一篇

猜你喜欢

热点阅读