大数据 爬虫Python AI Sql大数据,机器学习,人工智能SpringCloud

我用代码模拟了mysql的mvcc版本控制

2022-03-30  本文已影响0人  pq217

事务隔离级别

innodb的事务隔离级别有四种

其中读未提交可串行化带来的问题都很大,一般很少使用,可重复读一般是首选且是默认,它们的实现主要使用了MVCC(Multi-Version Concurrency Control)机制,今天就以代码形式模拟mvcc,相信自己能写出来,就可以说完全理解了。

数据库

为了方便模拟,简化一下数据库的结构,假如我们的数据库只有一行数据,只有两个字段id和name,我们用一个对象current来存储这一行数据,就完成了一个极简数据库

private Data current = new Data (1l, "小明1"); // 给一个初始值,就不写insert了
class Data {
    private Long id; // id
    private String name; // 姓名
}
简化的数据库结构

事务

我们再根据常使用的innodb事务命令做一个抽象,此时的我们DB应该有的方法

接口

public interface TransactionalDB {
    String select();
    void update(String name);
    void beginTransactional();
    void commit();
    void rollback();
}

undo日志版本链

接下来实现这个DB,上面用一个对象就模拟了一条数据的存储,但实际上远没有这么简单,innodb会保留数据的变化历史版本(可以以此实现回滚),每个版本存储当时的值,记录版本的事务ID,并且指向上一版本,形成一个单向的版本链,当前数据也只是版本链的一个节点,这个版本链叫做undo日志版本链

undo日志版本链
也就是说当我们用Navicat等工具看到的只是绿色部分,实际内部存储的是整个黄色部分
那我们用代码来实现这个版本链
public class MyDB implements TransactionalDB {
    private Undo current = new Undo(1l, "小明1", 0, null); // 给一个初始值,就不写insert了
    @Data
    @AllArgsConstructor
    class Undo {
        private Long id; // id
        private String name; // 姓名
        private Integer trxId; // 事务ID
        private Undo rollPointer; // 指向上一个版本
    }
}

这段代码就实现了整个黄色部分的存储结构

beginTransactional

通过beginTransactional开启事务,简单做一个自增的事务ID,我们用ThreadLocal模拟存储(一个线程模拟一个事务)

private volatile Integer maxTid = 0; // 当前最大事务ID
private List<Integer> unTrxIds = new CopyOnWriteArrayList<>(); // 当前未提交事务ID数组
private ThreadLocal<Integer> trxIds = new ThreadLocal<>(); // 存储每个事务的ID
@Override
public synchronized void beginTransactional() {
    trxIds.set(++maxTid); // 生成事务ID
    unTrxIds.add(trxIds.get()); // 加入到未提交事务数组
}

其中unTrxIds存储了未提交的事务ID,这个一定要记录,否则读的时候无法判断版本是否已提交,会造成脏读,当后期commit或rollback时删除,这样就可以实时记录当前未提交事务有哪些

update

接下来实现update方法,update就是往undo日志的头部新增一个版本,并且用一个ReentrantLock模拟行锁

private Lock lock = new ReentrantLock(); // 模拟行锁
@Override
public void update(String value) {
    if (trxIds.get()==null) {
        throw new IllegalArgumentException();
    }
    lock.lock(); // 锁住行
    Undo undo = new Undo(1l, value, trxIds.get(), current); // 新生成一个版本
    current = undo; // 从链表头部插入
}

commit

commit要做的事刚才已经铺垫了,首先从未提交事务数组里移除当前事务,同时释放行锁

@Override
public void commit() {
    unTrxIds.remove(trxIds.get()); // 从未提交事务数组中移除
    try {
        lock.unlock(); // 释放行锁,因为select没有锁,所以加了个try
    } catch (Exception e) {}
    trxIds.remove(); // 无用删掉
}

rollback

略,有了undo日志,回滚还不简单吗

读已提交

这里比较重点,如果是读已提交,只要遍历undo版本链,并获取第一个不在unTrxIds(未提交事务ID数组)中的事务版本读出值即可实现读已提交

@Override
public String select() {
    if (trxIds.get()==null) {
        throw new IllegalArgumentException();
    }
    Undo e= current;
    do {
        if (unTrxIds.contains(e.getTrxId())) { // 在未提交数组中,不可见
            e = e.rollPointer; // 下一个
            continue;
        }
        return e.getName(); // 可见
    } while (e!=null);
    return null;
}

完整代码如下:

public class ReadCommit implements TransactionalDB {

    private Undo current = new Undo(1l, "小明1", 0, null); // 给一个初始值,就不写insert了

    private Lock lock = new ReentrantLock(); // 模拟行锁

    // 当前最大事务ID
    private volatile Integer maxTid = 0;

    private List<Integer> unTrxIds = new CopyOnWriteArrayList<>(); // 当前未提交事务ID数组

    // 存储每个事务的ID
    private ThreadLocal<Integer> trxIds = new ThreadLocal<>();

    @Data
    @AllArgsConstructor
    class Undo {
        private Long id; // id
        private String name; // 姓名
        private Integer trxId; // 事务ID
        private Undo rollPointer; // 指向上一个版本
    }

    @Override
    public String select() {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        Undo e= current;
        do {
            if (unTrxIds.contains(e.getTrxId())) { // 在未提交数组中,不可见
                e = e.rollPointer; // 下一个
                continue;
            }
            return e.getName(); // 可见
        } while (e!=null);
        return null;
    }

    @Override
    public void update(String value) {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        lock.lock(); // 锁住行
        Undo undo = new Undo(1l, value, trxIds.get(), current); // 新生成一个版本
        current = undo; // 从链表头部插入
    }

    @Override
    public synchronized void beginTransactional() {
        trxIds.set(++maxTid); // 生成事务ID
        unTrxIds.add(trxIds.get()); // 加入到未提交事务数组
    }

    @Override
    public void commit() {
        unTrxIds.remove(trxIds.get()); // 从未提交事务数组中移除
        try {
            lock.unlock(); // 释放行锁,因为select没有锁,所以加了个try
        } catch (Exception e) {}
        trxIds.remove(); // 无用删掉
    }

    @Override
    public void rollback() {
        // todo
    }
}

测试一下

public static void main(String[] args) throws Exception {
    ReadCommit db = new ReadCommit();
    new Thread(() -> { // 事务1
        try {
            db.beginTransactional();
            db.update("小明2");
            Thread.sleep(400);
            db.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> { // 事务2
        try {
            db.beginTransactional();
            Thread.sleep(200);
            System.out.println(db.select()); // 小明1
            Thread.sleep(400);
            System.out.println(db.select()); // 小明2
            db.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

执行时间轴如下


测试时间轴

最终输出:

小明1
小明2

实现了读已提交

readview和可重复读

上面实现了读已提交,但是有一个问题,上例事务2读了两次,得到的结果却不一样,这就是典型的不可重复读
如何在读已提交的基础上实现可重复读,mvcc的解法是这样的:
上面的select方法我们遍历undo版本链时判断了当前版本是否已提交,判断方法是判断版本事务ID是否在未提交事务数组里if (unTrxIds.contains(e.getTrxId()))之所以读到的结果不一样,是因为unTrxIds数组是实时变化的,那么为了让读到的结果一样,可以在事务开始时对unTrxIds做一份快照(copy),select改为判断快照中是否包含版本事务ID
通过快照,每次同一个事务每次select判断条件都一样,得到的结果也一定一样
但是有个问题,快照保存的unTrxIds是事务开始未提交的事务,那不包含在其中的一定已提交了吗?不一定,因为后续可能会出现新事务并且还未提交,因此快照除了保存unTrxIds的备份,还要存一份事务开始时最大事务ID,所有大于该事务ID的事务都是未来事务,所存储的版本都不可见
总结一下:可重复读模式下,所有事务开始时未提交的事务事务开始时未生成的事务都是不可见的
这个快照就是readview,包含了

代码模拟下结构

@Data
@AllArgsConstructor
class ReadView {
    List<Integer> unTrxIds; // 未提交的事务Id数组
    private Integer upLimitId; // 最大事务ID
}

因为每个事务都有一份自己的readview,所以还是用ThreadLocal模拟存储

private ThreadLocal<ReadView> readViews = new ThreadLocal<>();

刚才一直说事务开始时生成readview,实际上通过测试发现生成快照应该是第一次select时生成的,所以代码就是select时判断有没有,没有就生成,有就不生成。
同时为了兼容不可重复读也就是读已提交模式,加一个参数来设置是否可重复读(只要每次select都重新生成快照就是不可重复读)

private void createReadView() {
    if (readRepeat && readViews.get()!=null) { // 如果可重复读,且已创建,不需要创建
        return;
    }
    List<Integer> currentUnTrxIds = new ArrayList<>();
    currentUnTrxIds.addAll(unTrxIds); // 生成unTrxIds快照
    readViews.set(new ReadView(currentUnTrxIds, maxTid)); // unTrxIds和maxTid快照
}

select代码

@Override
public String select() {
    if (trxIds.get()==null) {
        throw new IllegalArgumentException();
    }
    // 创建readview
    createReadView();
    Undo e= current;
    ReadView readView = readViews.get(); // 获取事务快照
    do {
        if (e.getTrxId() > readView.getUpLimitId()) { // 大于readview最大事务,不可见
            e = e.rollPointer;
            continue;
        }
        if (readView.getUnTrxIds().contains(e.getTrxId())) { // 在readview未提交数组中,不可见
            e = e.rollPointer;
            continue;
        }
        return e.getName(); // 可见
    } while (e!=null);
    return null;
}

完整代码如下

public class MyDB implements TransactionalDB {

    private Undo current = new Undo(1l, "小明1", 0, null); // 给一个初始值,就不写insert了

    private Lock lock = new ReentrantLock(); // 模拟行锁

    // 当前最大事务ID
    private volatile Integer maxTid = 0;

    private List<Integer> unTrxIds = new CopyOnWriteArrayList<>(); // 当前未提交事务ID数组

    // 存储每个事务的ID
    private ThreadLocal<Integer> trxIds = new ThreadLocal<>();

    // 存储每个事务的readview
    private ThreadLocal<ReadView> readViews = new ThreadLocal<>();

    // 是否可重复读
    private Boolean readRepeat;

    public MyDB(Boolean readRepeat) {
        this.readRepeat = readRepeat;
    }

    @Data
    @AllArgsConstructor
    class Undo {
        private Long id; // id
        private String name; // 姓名
        private Integer trxId; // 事务ID
        private Undo rollPointer; // 指向上一个版本
    }

    @Data
    @AllArgsConstructor
    class ReadView {
        List<Integer> unTrxIds; // 未提交的事务Id数组
        private Integer upLimitId; // 最大事务ID
    }

    @Override
    public String select() {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        // 创建readview
        createReadView();
        Undo e= current;
        ReadView readView = readViews.get(); // 获取事务快照
        do {
            if (e.getTrxId() > readView.getUpLimitId()) { // 大于readview最大事务,不可见
                e = e.rollPointer;
                continue;
            }
            if (readView.getUnTrxIds().contains(e.getTrxId())) { // 在readview未提交数组中,不可见
                e = e.rollPointer;
                continue;
            }
            return e.getName(); // 可见
        } while (e!=null);
        return null;
    }

    private void createReadView() {
        if (readRepeat && readViews.get()!=null) { // 如果可重复读,且已创建,不需要创建
            return;
        }
        List<Integer> currentUnTrxIds = new ArrayList<>();
        currentUnTrxIds.addAll(unTrxIds); // 生成unTrxIds快照
        readViews.set(new ReadView(currentUnTrxIds, maxTid)); // unTrxIds和maxTid快照
    }

    @Override
    public void update(String value) {
        if (trxIds.get()==null) {
            throw new IllegalArgumentException();
        }
        lock.lock(); // 锁住行
        Undo undo = new Undo(1l, value, trxIds.get(), current); // 新生成一个版本
        current = undo; // 从链表头部插入
    }

    @Override
    public synchronized void beginTransactional() {
        trxIds.set(++maxTid); // 生成事务ID
        unTrxIds.add(trxIds.get()); // 加入到未提交事务数组
    }

    @Override
    public void commit() {
        unTrxIds.remove(trxIds.get()); // 从未提交事务数组中移除
        try {
            lock.unlock(); // 释放行锁,因为select没有锁,所以加了个try
        } catch (Exception e) {}
        trxIds.remove(); // 无用删掉
        readViews.remove(); // 无用删掉
    }

    @Override
    public void rollback() {
    }
}

还是测试一下

public static void main(String[] args) throws Exception {
    MyDB db = new MyDB(true);
    new Thread(() -> { // 事务1
        try {
            Thread.sleep(1);
            db.beginTransactional();
            db.update("小明2");
            db.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> { // 事务2
        try {
            db.beginTransactional();
            Thread.sleep(100);
            db.update("小明3");
            Thread.sleep(200);
            db.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> { // 事务4
        try {
            Thread.sleep(400);
            db.beginTransactional();
            db.update("小明4");
            db.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    new Thread(() -> { // 事务3
        try {
            db.beginTransactional();
            Thread.sleep(200);
            System.out.println(db.select()); // 小明2
            Thread.sleep(500);
            System.out.println(db.select()); // 小明2
            db.commit();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
}

时序图如下


时序图

到此就模拟出了MVCC的事务版本控制,over~

总结

这个真是很复杂,很难一句话做总结,大概捋一下核心思路

上一篇下一篇

猜你喜欢

热点阅读