第一届POLARDB数据库性能大赛初赛总结

2018-11-20  本文已影响287人  张张张大炮

之前六月份的时候有参加过阿里举办的第四届中间件性能大赛,学到了不少东西,所以之后经常会关注一下天池那边阿里举办的程序设计大赛,九月底的时候注意到了这一届的POLARDB数据库性能大赛,很早就报了名。预热赛10月25日开始了~,初赛11月5日正式开始,11月19日结束,我这篇文章发布的时候初赛就已经结束了。因为11月之前一直在找实习,所以一直没有做什么准备,11月10号左右开始编写第一版的代码,到11月18号晚上放弃继续尝试。最后初赛成绩是42名,时间是240.69秒,和大佬们比起来还是有很大差距的。写这篇博客,主要还是想分享一下这段时间参赛的思路,和一点一点慢慢提升的经历。
GitHub: https://github.com/AlexZFX/engine 当前只️更新了初赛代码,复赛结束后会继续更新。

题目介绍

PolarDB作为软硬件结合的代表, 充分使用新硬件, 榨干硬件的红利来为用户获取极致的数据性能, 其中在PolarDB 的设计中, 我们使用 Optane SSD作为所有热数据的写入缓冲区, 通过kernel bypass的方式, 实现了极致的性能。所以本次比赛就以Optane SSD盘为背景,参赛者在其基础之上探索实现一种高效的kv存储引擎

以上是阿里云官方给的比赛背景,具体的题目内容如下
初赛赛题(完整请点击查看): 实现一个简化、高效的kv存储引擎,支持Write、Read接口。

程序评测逻辑 评测程序分为2个阶段:
1)Recover正确性评测 此阶段评测程序会并发写入特定数据(key 8B、value
4KB)同时进行任意次kill
-9来模拟进程意外退出(参赛引擎需要保证进程意外退出时数据持久化不丢失),接着重新打开DB,调用Read接口来进行正确性校验

2)性能评测

  • 随机写入:64个线程并发随机写入,每个线程使用Write各写100万次随机数据(key 8B、value 4KB)
  • 随机读取:64个线程并发随机读取,每个线程各使用Read读取100万次随机数据 注:2.2阶段会对所有读取的kv校验是否匹配,如不通过则终止,评测失败

总体说来我们能得到的要求和信息为以下几点:

  1. 实现一个KV型数据库的核心逻辑,主要为open、read、write三个接口。
  2. 支持多线程并发读写。
  3. 保证在方法成功返回的情况下数据不丢失(kill保证不丢失的前提是已经正确返回了,如果没有的话是不算做丢失的)。
  4. key和value的长度是确定的 key为8B,value为4KB。
  5. 可以使用Java语言或者C++,Java内存限制3G,C++限制2G。
  6. 磁盘占用不超过 320G

编写过程

完整的参赛过程大概是一周时间,这一周进行了非常多的尝试,成绩也从第一次跑通时的900多s到最后稳定在240s,接下来会细细的说一说每一版的思路和进阶过程。(下面的标题写的key value分别表示采用的文件数)

大体思路

先做一些简单的计算,
key + offset = ( 8 + 8 ) * 64000000 / 1024 / 1024 = 977M
value = 4096 * 64000000 / 1024 / 1024 / 1024 = 245G
可见磁盘和内存的限制相对来说不会造成很大的影响,对合理的设计来说还是充足的。

因为key是一个8B的byte数组,故转化成一个long型的数字很简单并且非常有利于接下来计算的事情。所以下文讨论的key都是建立在long型的基础上的。
初始主体的思路是这样的

第一版 FileChannel读写 1 key + 128 value 381.79s

首先想的是要跑出成绩,把所有的key都写在了一个文件里,一开始忽略了一个小点,把key和offset分开写入了文件,导致出现了一些key和value不匹配的问题。很显然的问题是写key和写offset会出现线程问题,可能导致本来应该是KeyValueKeyValue形式的数据,被写成KeyKeyValueValue的形式,所以出错之后直接加了个synchronized关键字,得出第一次的成绩968s,很快修改了这个简单的小问题,得到一个明显有大幅提升的成绩381.79s,这时的代码主要是这样的。

    @Override
    public void open(String path) throws EngineException {
        File file = new File(path);
        // 创建目录
        if (!file.exists()) {
            if (!file.mkdir()) {
                throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);
            } else {
                logger.info("创建文件目录成功:" + path);
            }
        }

        //创建 FILE_COUNT个FileChannel 顺序写入
        RandomAccessFile randomAccessFile;
        if (file.isDirectory()) {
            for (int i = 0; i < FILE_COUNT; i++) {
                try {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");

                    FileChannel channel = randomAccessFile.getChannel();
                    fileChannels[i] = channel;
                    // 从 length处直接写入
                    offsets[i] = new AtomicLong(randomAccessFile.length());
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");
        }
        File keyFile = new File(path + File.separator + "key");
        if (!keyFile.exists()) {
            try {
                keyFile.createNewFile();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        // 从 index 文件建立 hashmap
        try {
            randomAccessFile = new RandomAccessFile(keyFile, "rw");
            keyFileChannel = randomAccessFile.getChannel();

            ByteBuffer keyBuffer = ByteBuffer.allocate(KEY_LEN);
            ByteBuffer offBuffer = ByteBuffer.allocate(KEY_LEN);
            keyFileOffset = new AtomicLong(randomAccessFile.length());
            long temp = 0, maxOff = keyFileOffset.get();
            while (temp < maxOff) {
                keyBuffer.position(0);
                keyFileChannel.read(keyBuffer, temp);
                temp += KEY_LEN;
                offBuffer.position(0);
                keyFileChannel.read(offBuffer, temp);
                temp += KEY_LEN;
                keyBuffer.position(0);
                offBuffer.position(0);
                keyMap.put(keyBuffer.getLong(), offBuffer.getLong());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(byte[] key, byte[] value) throws EngineException {
        long numkey = Util.bytes2long(key);
        int hash = hash(numkey);
        long off = offsets[hash].getAndAdd(VALUE_LEN);
        keyMap.put(numkey, off + 1);
        try {
            //key和offset写入文件
            localKey.get().putLong(0, numkey).putLong(8, off + 1);
            localKey.get().position(0);
            keyFileChannel.write(localKey.get(), keyFileOffset.getAndAdd(KEY_AND_OFF_LEN));
            //将value写入buffer
            localBufferValue.get().position(0);
            localBufferValue.get().put(value, 0, VALUE_LEN);
            //buffer写入文件
            localBufferValue.get().position(0);
            fileChannels[hash].write(localBufferValue.get(), off);
        } catch (IOException e) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "写入数据出错");
        }
    }


    @Override
    public byte[] read(byte[] key) throws EngineException {
        long numkey = Util.bytes2long(key);
        int hash = hash(numkey);
        // key 不存在会返回0,避免跟位置0混淆,off写加一,读减一
        long off = keyMap.get(numkey);
        if (off == 0) {
            throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + "不存在");
        }
        try {
            localBufferValue.get().position(0);
            fileChannels[hash].read(localBufferValue.get(), off - 1);
        } catch (IOException e) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "读取数据出错");
        }
        localBufferValue.get().position(0);
        localBufferValue.get().get(localByteValue.get(), 0, VALUE_LEN);
        return localByteValue.get();
    }

这个时候open的时间将近90s,很显然是一个超出可承受范围的结果。所以接下来很快对这一部分进行了优化。

第二版 FileChannel 64线程open 64 key 128 value 260.96s

open时间过长,所以这成了我们关注的一个重点,这段时间我们做了很多改动,改动的过程主要是这样的。

  1. 单key文件,单个map,将完整的offset分为64份读取 无成绩
    这一做法其实还没跑出成绩就被我们过渡掉了,因为本地进行的测试一直过不去,我们第一时间想到的原因是因为,单个key文件并发初始化的时候,后面出现的相等的key不一定会把前面的key覆盖掉,所以会出现值不对的状况。所以解决方案只能是所有相同的key必须要严格有序的读取。
  2. 64个key文件,单个map 301.49s
    因为上面所述的原因,所以选择对key也进行一次hash,按照hash的结果将key划分在了64个不同的key文件中,这样的结果是相同的key一定会在相同文件中按照先后顺序被写入,故读取的时候一定是严格有序的。
    这个版本本地的小量测试也通过了,以为没有问题,但线上失败,这时我们才开始关注hppc的map本身的线程安全性,给map的put加锁后提交,果然通过了,得分301.49s。
    简单看了一下源码,显然是线程不安全的,所以促使我们接下来的一次分map改动。
  3. 64个key文件,64个map 260.96s
    于是我们进行了一次map的拆分,根据key的文件个数直接拆为了64个hashmap,差别是这样拆分让我们的map容量无法确定,简单线上用log测试了一下之后完成了64map的版本。并发问题解决好之后,这一版本的分数又有了不少提升,260.96s。这时候的open已经被压到了10s以内,但其实还是有提升的空间。

这版的主要改动在open地方,下面贴出了这版的open方法。

    @Override
    public void open(String path) throws EngineException {
        File file = new File(path);
        // 创建目录
        if (!file.exists()) {
            if (!file.mkdir()) {
                throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);
            } else {
                logger.info("创建文件目录成功:" + path);
            }
        }
        RandomAccessFile randomAccessFile;
        // file是一个目录时进行接下来的操作
        if (file.isDirectory()) {
            try {
                //先构建keyFileChannel 和 初始化 map
                for (int i = 0; i < THREAD_NUM; i++) {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    keyFileChannels[i] = channel;
                    keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());
                }
                ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
                CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
                for (int i = 0; i < THREAD_NUM; i++) {
                    if (!(keyOffsets[i].get() == 0)) {
                        final long off = keyOffsets[i].get();
                        final int finalI = i;
                        executor.execute(() -> {
                            int start = 0;
                            long key;
                            int keyHash;
                            while (start < off) {
                                try {
                                    localKey.get().position(0);
                                    keyFileChannels[finalI].read(localKey.get(), start);
                                    start += KEY_AND_OFF_LEN;
                                    localKey.get().position(0);
                                    key = localKey.get().getLong();
                                    keyHash = keyFileHash(key);
                                    keyMap[keyHash].put(key, localKey.get().getInt());
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                            countDownLatch.countDown();
                        });
                    } else {
                        countDownLatch.countDown();
                    }
                }
                countDownLatch.await();
                executor.shutdownNow();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
            //创建 FILE_COUNT个FileChannel 供write顺序写入
            for (int i = 0; i < FILE_COUNT; i++) {
                try {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    fileChannels[i] = channel;
                    // 从 length处直接写入
                    valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");
        }
    }

这一部分里我们还做了一些小事

第三版 用mmap读open,64key 64value 245.18s

之前一直考虑着用mmap,在java里面对应的就是MappedByteBuffer,因为我不确定mmap能不能在kill -9 被杀进程的情况保证数据的完整性,同时,如果都用mmap写入的话,会让我无法确定文件的大小(mmap映射时要预先指定文件大小),无法在kill之后能从指定的位置追加写入。所以打算一步一步,最后再考虑使用这个。

但是open的时候使用mmap读一定是没有风险的,所以又进行了一次对open的改动,这时还是64个key文件和128个value文件,得到的跑分是248.58,open过程被压缩在了1s以内,大约600ms左右,这个open速度我们就基本已经满足了。

后来改成了64个value文件,每次只进行一次hash就可以确定key和value文件的位置,并且读写速度似乎都有略微进步,达到了245.18s。

这时的open代码如下

@Override
    public void open(String path) throws EngineException {
        File file = new File(path);
        // 创建目录
        if (!file.exists()) {
            if (!file.mkdir()) {
                throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);
            } else {
                logger.info("创建文件目录成功:" + path);
            }
        }
        RandomAccessFile randomAccessFile;
        // file是一个目录时进行接下来的操作
        if (file.isDirectory()) {
            try {
                //先构建keyFileChannel 和 初始化 map
                for (int i = 0; i < THREAD_NUM; i++) {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    keyFileChannels[i] = channel;
                    keyOffsets[i] = new AtomicInteger((int) randomAccessFile.length());
                }
                ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUM);
                CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
                for (int i = 0; i < THREAD_NUM; i++) {
                    if (!(keyOffsets[i].get() == 0)) {
                        final long off = keyOffsets[i].get();
                        final int finalI = i;
                        executor.execute(() -> {
                            int start = 0;
                            try {
                                MappedByteBuffer mappedByteBuffer = keyFileChannels[finalI].map(FileChannel.MapMode.READ_ONLY, 0, off);
                                while (start < off) {
                                    start += KEY_AND_OFF_LEN;
                                    keyMap[finalI].put(mappedByteBuffer.getLong(), mappedByteBuffer.getInt());
                                }
                                unmap(mappedByteBuffer);
                                countDownLatch.countDown();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        });
                    } else {
                        countDownLatch.countDown();
                    }
                }
                countDownLatch.await();
                executor.shutdownNow();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
            //创建 FILE_COUNT个FileChannel 供write顺序写入
            for (int i = 0; i < FILE_COUNT; i++) {
                try {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    fileChannels[i] = channel;
                    // 从 length处直接写入
                    valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } else {
            throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");
        }
    }

这一版当中我们也发现了一些问题,阅读了许多文章,总结主要如下:

第四版 mmap读写key,FileChannel读写value,64 + 64 240.69s

有第三版最后发现的内容,我们打算再对key的写入做一些改动,也就是将fileChannel写入key的方式改动为mmap写入。而mmap映射的文件大小选择一个稍大的值,open之后的写入offset通过value文件的大小来确定(valuelen / 4096 * 12),这一优化带来的大约2~3s的提升。

除此之外,还进行了简单的jvm调优工作,将新生代和老年代的比例进行了调整,将原来1:1的比例调整为了6:1,这部分优化带来了大约2s的性能提升。

最后完整的代码这一块我就直接贴在下面,对整个过程有兴趣的也可以去我的github上clone下来查看。

package com.alibabacloud.polar_race.engine.common;

import com.alibabacloud.polar_race.engine.common.exceptions.EngineException;
import com.alibabacloud.polar_race.engine.common.exceptions.RetCodeEnum;
import com.carrotsearch.hppc.LongIntHashMap;
import io.netty.util.concurrent.FastThreadLocal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class EngineRace extends AbstractEngine {

    private static Logger logger = LoggerFactory.getLogger(EngineRace.class);
    // key+offset 长度 16B
    private static final int KEY_AND_OFF_LEN = 12;
    // 线程数量
    private static final int THREAD_NUM = 64;
    // value 长度 4K
    private static final int VALUE_LEN = 4096;
    //每个map存储的key数量
    private static final int PER_MAP_COUNT = 1024000;

    private static final int SHIFT_NUM = 12;
    //    存放 value 的文件数量 128
    private static final int FILE_COUNT = 64;

    private static final int HASH_VALUE = 0x3F;

    private static final LongIntHashMap[] keyMap = new LongIntHashMap[THREAD_NUM];

    static {
        for (int i = 0; i < THREAD_NUM; i++) {
            keyMap[i] = new LongIntHashMap(PER_MAP_COUNT, 0.98);
        }
    }

    //key 文件的fileChannel
    private static FileChannel[] keyFileChannels = new FileChannel[THREAD_NUM];

    private static AtomicInteger[] keyOffsets = new AtomicInteger[THREAD_NUM];

    private static MappedByteBuffer[] keyMappedByteBuffers = new MappedByteBuffer[THREAD_NUM];

    //value 文件的fileChannel
    private static FileChannel[] fileChannels = new FileChannel[FILE_COUNT];

    private static AtomicInteger[] valueOffsets = new AtomicInteger[FILE_COUNT];

    private static FastThreadLocal<ByteBuffer> localBufferValue = new FastThreadLocal<ByteBuffer>() {
        @Override
        protected ByteBuffer initialValue() throws Exception {
            return ByteBuffer.allocate(VALUE_LEN);
        }
    };

    @Override
    public void open(String path) throws EngineException {
        File file = new File(path);
        // 创建目录
        if (!file.exists()) {
            if (!file.mkdir()) {
                throw new EngineException(RetCodeEnum.IO_ERROR, "创建文件目录失败:" + path);
            } else {
                logger.info("创建文件目录成功:" + path);
            }
        }
        RandomAccessFile randomAccessFile;
        // file是一个目录时进行接下来的操作
        if (file.isDirectory()) {
            try {
                //先 创建 FILE_COUNT个FileChannel 供write顺序写入,并由此文件获取value文件的大小
                for (int i = 0; i < FILE_COUNT; i++) {
                    try {
                        randomAccessFile = new RandomAccessFile(path + File.separator + i + ".data", "rw");
                        FileChannel channel = randomAccessFile.getChannel();
                        fileChannels[i] = channel;
                        // 从 length处直接写入
                        valueOffsets[i] = new AtomicInteger((int) (randomAccessFile.length() >>> SHIFT_NUM));
                        keyOffsets[i] = new AtomicInteger(valueOffsets[i].get() * KEY_AND_OFF_LEN);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                //先构建keyFileChannel 和 初始化 map
                for (int i = 0; i < THREAD_NUM; i++) {
                    randomAccessFile = new RandomAccessFile(path + File.separator + i + ".key", "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    keyFileChannels[i] = channel;
                    keyMappedByteBuffers[i] = channel.map(FileChannel.MapMode.READ_WRITE, 0, PER_MAP_COUNT * 20);
                }
                CountDownLatch countDownLatch = new CountDownLatch(THREAD_NUM);
                for (int i = 0; i < THREAD_NUM; i++) {
                    if (!(keyOffsets[i].get() == 0)) {
                        final long off = keyOffsets[i].get();
                        final int finalI = i;
                        final MappedByteBuffer buffer = keyMappedByteBuffers[i];
                        new Thread(() -> {
                            int start = 0;
                            while (start < off) {
                                start += KEY_AND_OFF_LEN;
                                keyMap[finalI].put(buffer.getLong(), buffer.getInt());
                            }
                            countDownLatch.countDown();
                        }).start();
                    } else {
                        countDownLatch.countDown();
                    }
                }
                countDownLatch.await();
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            throw new EngineException(RetCodeEnum.IO_ERROR, "path不是一个目录");
        }
    }

    @Override
    public void write(byte[] key, byte[] value) throws EngineException {
        long numkey = Util.bytes2long(key);
        int hash = valueFileHash(numkey);
        int off = valueOffsets[hash].getAndIncrement();
        try {
            ByteBuffer keyBuffer = keyMappedByteBuffers[hash].slice();
            keyBuffer.position(keyOffsets[hash].getAndAdd(KEY_AND_OFF_LEN));
            keyBuffer.putLong(numkey).putInt(off);
            //将value写入buffer
            ByteBuffer valueBuffer = localBufferValue.get();
            valueBuffer.clear();
            valueBuffer.put(value);
            valueBuffer.flip();
            fileChannels[hash].write(valueBuffer, ((long) off) << SHIFT_NUM);
        } catch (IOException e) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "写入数据出错");
        }
    }


    @Override
    public byte[] read(byte[] key) throws EngineException {
        long numkey = Util.bytes2long(key);
        int hash = valueFileHash(numkey);
        long off = keyMap[hash].getOrDefault(numkey, -1);
        ByteBuffer buffer = localBufferValue.get();
        if (off == -1) {
            throw new EngineException(RetCodeEnum.NOT_FOUND, numkey + "不存在");
        }
        try {
            buffer.clear();
            fileChannels[hash].read(buffer, off << SHIFT_NUM);
        } catch (IOException e) {
            throw new EngineException(RetCodeEnum.IO_ERROR, "读取数据出错");
        }
        return buffer.array();
    }

    @Override
    public void range(byte[] lower, byte[] upper, AbstractVisitor visitor) throws EngineException {
    }

    @Override
    public void close() {
        for (int i = 0; i < FILE_COUNT; i++) {
            try {
                keyFileChannels[i].close();
                fileChannels[i].close();
            } catch (IOException e) {
                logger.error("close error");
            }
        }
    }

    private static int valueFileHash(long key) {
        return (int) (key & HASH_VALUE);
    }
}

这一版写的代码和之前有点不同如下:

总结

初赛刚写的时候有一点中间件性能大赛复赛类似的地方,不过相比来说还是多学会了许多知识。我其实也尝试了利用unsafe来实现内存拷贝的一部分,但是似乎并没有起到一个好的效果,感觉主要还是我的使用姿势有些不正确,我把这一部分的有关代码放在了github的unsafe分支中,有兴趣也可以简单查看一下。

正在进行的是复赛,相比初赛来说增加了一个全量顺序遍历的需求,难度更大,也更有意思了,感觉复赛更考的是一部分设计方面的东西了,接下来还是会使用Java继续参加,如果有所收获的话,还会再写一篇博客进行相应的总结。

上一篇 下一篇

猜你喜欢

热点阅读