十七 RocksDB 作为存储后端

2021-10-25  本文已影响0人  NazgulSun

rocksDB 在huagegraph 中应该有很重要的定位:

在现有0.11的版本中,我们看到了hugegraph做了优化,之前0.10版本中, 一个节点存储,可能需要存储多个 key-value,[包含属性,以Id作为前缀],使得查询和存储的负担都变大, 目前已经更改为 一个节点, id -> properties 的序列化; 与 nebula 存储的方式基本一致;
因为,使用的是 纯粹KV,没有 table的概念, 在实现图的model的时候,使用的columFamily 来作为隔离的; hugegraph 依旧是 使用 g_v, g_oe, g_ie 这样的表来分割数据;

    public static class IndexLabel extends SchemaTable {
        public static final String TABLE = HugeType.INDEX_LABEL.string();
        public IndexLabel(String database) {
            super(database, TABLE);
        }
    }
    public static class Vertex extends RocksDBTable {
        public static final String TABLE = HugeType.VERTEX.string();
        public Vertex(String database) {
            super(database, TABLE);
        }
        @Override
        protected BackendColumnIterator queryById(Session session, Id id) {
            return this.getById(session, id);
        }
    }

不过使用 rocksdb 做后端,运维是个大问题,貌似没有什么成熟的管理工具;只能靠图的工具。

RocksDb 的目录,m/s/g 开头,在cassandra 等中,这个作为 table的命名的前缀;rocksdb作为 目录的前缀;

    public static final ConfigOption<String> STORE_SYSTEM =
            new ConfigOption<>(
                    "store.system",
                    "The system table name, which store system data.",
                    disallowEmpty(),
                    "s"
            );

    public static final ConfigOption<String> STORE_SCHEMA =
            new ConfigOption<>(
                    "store.schema",
                    "The schema table name, which store meta data.",
                    disallowEmpty(),
                    "m"
            );

    public static final ConfigOption<String> STORE_GRAPH =
            new ConfigOption<>(
                    "store.graph",
                    "The graph table name, which store vertex, edge and property.",
                    disallowEmpty(),
                    "g"
            );

因为 rocks本身没有keyspace的概念,所以每个 graph ,都会加到 columnFamily的前缀上;

ETL 的高性能尝试

之前nebula 对比 导入性能的时候,对比的是 rocksdb vs cassandra, 并且 还是使用 ingest sst file 这样的方式, 那么显而易见的是,nebula 离线导入性能会吊打 其他的数据库的;

我们知道rocksdb 插入数据是 memtable 到sstfile, 同时提供API 支持用户 自己生成 sstfile,然后ingest到rocksdb中,这样的数据就是飞快的, 我们给出以一个例子, 通过sstfile 插入数据,但是需要保证 sstFile 中的key 是严格递增的; rocksdb engine,会自动把 现有的文件, 和 ingestFile 进行merge;

   public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        final Random random = new Random();
        final EnvOptions envOptions = new EnvOptions();
        final StringAppendOperator stringAppendOperator = new StringAppendOperator();
        Options options1 = new Options();
        SstFileWriter fw = null;
        ComparatorOptions comparatorOptions = new ComparatorOptions();

        try {

            options1 = options1
                    .setCreateIfMissing(true)
                    .setEnv(Env.getDefault())
                    .setComparator(new BytewiseComparator(comparatorOptions));

            fw = new SstFileWriter(envOptions, options1);

            fw.open("./sst_upload_01");
            Map<String, String> data = new HashMap<>();
            for (int index = 0; index < 1000; index++) {
                data.put("Key-" + random.nextLong(), "Value-" + random.nextDouble());
            }
            List<String> keys = new ArrayList<String>(data.keySet());
            Collections.sort(keys);
            for (String key : keys) {
                Slice keySlice = new Slice(key);
                Slice valueSlice = new Slice(data.get(key));
                fw.put(keySlice, valueSlice);
            }

            fw.finish();
        } catch (RocksDBException ex) {
            ex.printStackTrace();
        } finally {
            stringAppendOperator.close();
            envOptions.close();
            options1.close();
            if (fw != null) {
                fw.close();
            }
        }

        try {
            final Options options = new Options().setCreateIfMissing(true);
            final String rocksDirectory = "./rocksdb";
            final Path path = Paths.get(rocksDirectory);
            RocksDB rocksDB = RocksDB.open(options, path.toString());

            List<String> sst = new ArrayList<>();
            sst.add("./sst_upload_01");

            IngestExternalFileOptions igOptions = new IngestExternalFileOptions();
            igOptions.setMoveFiles(true);
            rocksDB.ingestExternalFile(sst, igOptions);

            final RocksIterator iterator = rocksDB.newIterator();
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                System.out.println(new String(iterator.key()) +"\t" + new String(iterator.value()));
            }
            } catch (final Exception exception) {
                System.exit(1);
            }

    }

通过 SST file 导入数据到 hugegraph

    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();

        final Random random = new Random();

        final EnvOptions envOptions = new EnvOptions();
        final StringAppendOperator stringAppendOperator = new StringAppendOperator();
        Options options1 = new Options();
        SstFileWriter fw = null;
        ComparatorOptions comparatorOptions = new ComparatorOptions();

        options1 = options1
                .setCreateIfMissing(true)
                .setEnv(Env.getDefault())
                .setComparator(new BytewiseComparator(comparatorOptions));

        fw = new SstFileWriter(envOptions, options1);
        fw.open(targetDir);
        List<String> lines = FileUtils.readLines(new File(companyFile));


        /// hugegraph 序列化的时候,id 第一个字符串为 id 的长度;所以在排序的时候,短ID需要在前面;
        BinarySerializer ser = new BinarySerializer();
        for(String line: lines){
            BackendEntry entry = ser.writeVertex(parse(line));
            BackendEntry.BackendColumn column = entry.columns().iterator().next();
            fw.put(column.name, column.value);
        }

        fw.finish();



    }

    static HugeGraph graph = Mockito.mock(HugeGraph.class);

    static HugeVertex parse(String line){
        String id = line.split("@")[0];
        String name = line.split("@")[1];
        String eid = line.split("@")[2];

        PropertyKey pname = newPropertyKey(IdGenerator.of(1), "name");
        PropertyKey peid = newPropertyKey(IdGenerator.of(2), "eid");
        VertexLabel vl = new VertexLabel(graph,IdGenerator.of(1),"Company");
        vl.idStrategy(IdStrategy.CUSTOMIZE_STRING);
        vl.properties(pname.id(),peid.id());

        HugeVertex vertex = new HugeVertex(graph, IdGenerator.of(id),vl);
        vertex.addProperty(pname,name);
        vertex.addProperty(peid,eid);

        return vertex;
    }


    public static PropertyKey newPropertyKey(Id id, String name) {
        return newPropertyKey(id, name, DataType.TEXT, Cardinality.SINGLE);
    }

    public static PropertyKey newPropertyKey(Id id, String name,
                                             DataType dataType) {
        return newPropertyKey(id, name, dataType, Cardinality.SINGLE);
    }

    public static PropertyKey newPropertyKey(Id id, String name,
                                             DataType dataType,
                                             Cardinality cardinality) {
        PropertyKey schema = new PropertyKey(graph, id, name);
        schema.dataType(dataType);
        schema.cardinality(cardinality);
        return schema;
    }

与cassandra 导入性能测试

同一份数据集,3000万节点,5kw边,导入 cassandra/rocksdb 后端,单机;
rocksdb: 1453s,平均 5万/s。
rocksdb: 3个节点集群模式,5546s;
cassandra:4808,3被多的差距,1.6万/s的情况
3跳查询:
g.V().has('ticker','中国').out().out().out().profile()
rocksdb: 1800 ms
cassandra: 24000ms; 估计有10的性能差距;
如果使用中信证券,3跳, rocksdb 24s,cassandra120s;性能差距还是很大;

上一篇 下一篇

猜你喜欢

热点阅读