二十 Gremlin 如何并发执行

2021-11-08  本文已影响0人  NazgulSun

Gremlin 多跳的查询效率问题

从上一篇我们知道,gremlin 的执行引擎,是一个 dfs 模式,对于
g.V().out().out() 这样的查询, 会从 图中开始节点, A,B,C
依次调用每个节点的邻居矩阵, getAdj(A|B|C), 获得level1 之后,然后再依次调用, 这样遍历的效率非特别慢;
之前在 paypal 上看到 他们的工程师说道对gremlin 的执行引擎进行了并行化的优化;我之前一直想不通,直到从上一篇梳理了执行模式之后, 目前我想到优化的tip就是在 processNextStart 的时候,做并发;

以 hugegVertexStep 为例

hugeVertexStep 是hugegraph 获取节点的邻居节点的核心类, 关键方法在flatMap, 也就是给定一个节点,会 返回 他的邻居节点; Vertexstep继承自 flatMapStep:

    @Override
    protected Traverser.Admin<E> processNextStart() {
        while (true) {
            if (this.iterator.hasNext()) {
                return this.head.split(this.iterator.next(), this);
            } else {
                closeIterator();
                this.head = this.starts.next();
                this.iterator = this.flatMap(this.head);
            }
        }
    }

他的实现方式就是 start 节点 获取 邻居节点,然后一个个的返回给下一层; 每次调用flatMap, hugegVertex 就是对 存储层的一次访问;
那么这里我们会自然而然的想到一种并发的查询;

第一种思路,就是先收集所有start 节点,然后并发的去查询邻居节点;

    private Traverser.Admin<Vertex> head = null;
    private Iterator<E> iterator = EmptyIterator.instance();
    private ExecutorService executorService = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors() *2);
    //可以并发执行
    @Override
    protected Traverser.Admin<E> processNextStart() {

        while (true) {
            if (this.iterator.hasNext()) {
                return this.head.split(this.iterator.next(), this);
            } else {
                closeIterator();
                // 这里可以并发执行,也可以用inList
                this.head = this.starts.next();
                this.iterator = this.flatMap(this.head);
                List<Future<Iterator<E>>> futures = Lists.newArrayList();
                while(this.starts.hasNext()){
                    Traverser.Admin<Vertex> start = this.starts.next();
                    Future<Iterator<E>> its = executorService.submit(()->{
                        Iterator<E>  end = this.flatMap(start);
                        return end;
                    });
                    futures.add(its);
                }
                try {
                    for(Future<Iterator<E>> its: futures) {
                        this.iterator = Iterators.concat(iterator, its.get());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }
        }
    }

第二种思路,就是batch模式,依赖与底层的存储层;
也是收集了所有的 开始节点, 我们能不能提供一个 getAdjByBatchStarts(List starts) , 也就是一次拿出来所有的邻居节点,
如果底层的数据结构支持,类似与 id in list 这种功能的化,也是一个很好的优化点,效果可能比多线程并发要好。

但是目前来看,hugegraph,对于 in List 的实现,都是flattern,我估计是为了适配不同的底层存储,如果我们不flattern效果应该不错;
尝试: g.V().has('ticker','000002').out().out().out().out().profile()
1, 直接 one by one 的模式, 11s 左右;
2, inList, 6-7s 左右, 随着 out 展开的数据越多,这个差距会越来越明显; 数据量很少的时候,差距不是很大;

测试需要修改的代码:

HugeVertextStep


    private Traverser.Admin<Vertex> head = null;
    private Iterator<E> iterator = EmptyIterator.instance();
    private Set<Id> sourceIds = Sets.newHashSet();
    //可以并发执行
    @Override
    protected Traverser.Admin<E> processNextStart() {

        while (true) {
            if (this.iterator.hasNext()) {
                return this.head.split(this.iterator.next(), this);
            } else {
                closeIterator();
                // 这里可以并发执行,也可以用inList
                this.head = this.starts.next();
                sourceIds.add((Id) this.head.get().id());
                while(this.starts.hasNext()){
                    Traverser.Admin<Vertex> start = this.starts.next();
                    sourceIds.add((Id)start.get().id());
                }
                //使用sourceIds
                this.iterator = this.flatMap(this.head);
            }
        }
    }

    public static ConditionQuery stateEdgesQueryWithIDS(Set<Id> sourceIds,
                                                        Directions direction,
                                                        Id... edgeLabels) {
        ConditionQuery query = new ConditionQuery(HugeType.EDGE);

        // Edge source vertex
//        sourceIds.forEach(sourceVertex->{
//            query.eq(HugeKeys.OWNER_VERTEX, sourceVertex);
//        });

        query.query(Condition.in(HugeKeys.OWNER_VERTEX, Lists.newArrayList(sourceIds)));

        // Edge direction
        if (direction == Directions.BOTH) {
            query.query(Condition.or(
                    Condition.eq(HugeKeys.DIRECTION, Directions.OUT),
                    Condition.eq(HugeKeys.DIRECTION, Directions.IN)));
        } else {
            assert direction == Directions.OUT || direction == Directions.IN;
            query.eq(HugeKeys.DIRECTION, direction);
        }
        // Edge labels
        if (edgeLabels.length == 1) {
            query.eq(HugeKeys.LABEL, edgeLabels[0]);
        } else if (edgeLabels.length > 1) {
            query.query(Condition.in(HugeKeys.LABEL,
                    Arrays.asList(edgeLabels)));
        } else {
            assert edgeLabels.length == 0;
        }

        return query;
    }

ConditionQueryFlatten
 public static List<ConditionQuery> flatten(ConditionQuery query) {
        if (query.isFlattened() && !query.mayHasDupKeys(SPECIAL_KEYS)) {
            return Arrays.asList(query);
        }

        List<ConditionQuery> queries = new ArrayList<>();

        // Flatten IN/NOT_IN if needed
        Set<Condition> conditions = InsertionOrderUtil.newSet();
//        for (Condition condition : query.conditions()) {
//            Condition cond = flattenIn(condition);
//            if (cond == null) {
//                // Process 'XX in []'
//                return ImmutableList.of();
//            }
//            conditions.add(cond);
//        }
        for (Condition condition : query.conditions()) {
//            Condition cond = flattenIn(condition);
//            if (cond == null) {
//                // Process 'XX in []'
//                return ImmutableList.of();
//            }
            conditions.add(condition);
        }

GraphTransaction ->  optimizeQuery  
comment  verifyEdgesConditionQuery() method

CassandraSerializer: rewrite query 
    //查询edge 的时候,增加InList
    @Override
    protected Query writeQueryEdgeCondition(Query query) {
        ConditionQuery result = (ConditionQuery) query;
        for (Condition.Relation r : result.relations()) {
            Object value = r.value();
            if(value instanceof List){
                serializeListValue(r,value);
            }else{
                serializeSingleValue(r,value);
            }
        }
        return null;
    }

    private void serializeSingleValue(Condition.Relation r, Object value) {
        if (value instanceof Id) {
            if (r.key() == HugeKeys.OWNER_VERTEX ||
                    r.key() == HugeKeys.OTHER_VERTEX) {
                // Serialize vertex id
                r.serialValue(this.writeId((Id) value));
            }
            else {
                // Serialize label id
                r.serialValue(((Id) value).asObject());
            }
        }
        else if (value instanceof Directions) {
            r.serialValue(((Directions) value).type().code());
        }
    }

    private void serializeListValue(Condition.Relation r, Object value) {
        assert value instanceof List;
        List<Object> lv = (List)value;
        Object singleValue = lv.get(0);

        if(singleValue instanceof  Id){
            if (r.key() == HugeKeys.OWNER_VERTEX ||
                    r.key() == HugeKeys.OTHER_VERTEX) {
                List<Object> ids = Lists.newArrayList();
                for(Object val : lv){
                    ids.add(this.writeId((Id) val));
                }
                // Serialize vertex id
                r.serialValue(ids);
            }
            else {
                // Serialize label id
                List<Object> ids = Lists.newArrayList();
                for(Object val : lv){
                    ids.add(((Id) val).asObject());
                }
                r.serialValue(ids);
            }
        }else if (singleValue instanceof Directions) {
            r.serialValue(((Directions) singleValue).type().code());
        }
    }

cassandraTable: 

            case IN:
                if(value instanceof  List){
                    return QueryBuilder.in(key, (List)value);
                }else{
                    return QueryBuilder.in(key,value);
                }

思路现在有了,可以具体去实现一下,然后测试性能到底差多少。

存在的问题

最终的方案:

    private Traverser.Admin<Vertex> head = null;
    private Iterator<E> iterator = EmptyIterator.instance();
    private static ExecutorService executorService = Executors.newFixedThreadPool(8);
    private Map<Traverser.Admin<Vertex>, Iterator<E>> adj = new HashMap<>();
    //可以并发执行
    @Override
    protected Traverser.Admin<E> processNextStart() {
        while (true) {
            if(!adj.isEmpty()){
                Set<Traverser.Admin<Vertex>> toRemove = Sets.newHashSet();
                for(Traverser.Admin<Vertex> head: adj.keySet()){
                    Iterator<E> iterator = adj.get(head);
                    if(iterator.hasNext()){
                        return head.split(iterator.next(), this);
                    }else{
                        toRemove.add(head);
                    }
                }
                toRemove.forEach(k-> adj.remove(k));
            }
            else {

                closeIterator();
                // 这里可以并发执行,也可以用inList
                this.head = this.starts.next();
                this.iterator = this.flatMap(this.head);
                adj.put(head,this.iterator);
                List<Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>>> futures = Lists.newArrayList();
                while(this.starts.hasNext()){
                    Traverser.Admin<Vertex> start = this.starts.next();
                    Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>> its = executorService.submit(()->{
                        Iterator<E>  end = this.flatMap(start);
                        return new ImmutablePair<>(start, end);
                    });
                    futures.add(its);
                }

                try {
                    for(Future<Pair<Traverser.Admin<Vertex>,Iterator<E>>> its: futures) {
                        Pair<Traverser.Admin<Vertex>,Iterator<E>> pair = its.get();
                        this.adj.put(pair.getKey(), pair.getValue());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }

            }
        }

    }

上一篇下一篇

猜你喜欢

热点阅读