十九 Gremlin Step 模型

2021-11-06  本文已影响0人  NazgulSun

hugegraph Step 模型

Step 接口

几个观点的点:

public interface Step<S, E> extends Iterator<Traverser.Admin<E>>, Serializable, Cloneable {

    /**
     * Add a iterator of {@link Traverser.Admin} objects of type S to the step.
     *
     * @param starts The iterator of objects to add
     */
    public void addStarts(final Iterator<Traverser.Admin<S>> starts);

    /**
     * Add a single {@link Traverser.Admin} to the step.
     *
     * @param start The traverser to add
     */
    public void addStart(final Traverser.Admin<S> start);

    /**
     * Set the step that is previous to the current step.
     * Used for linking steps together to form a function chain.
     *
     * @param step the previous step of this step
     */
    public void setPreviousStep(final Step<?, S> step);

    /**
     * Get the step prior to the current step.
     *
     * @return The previous step
     */
    public Step<?, S> getPreviousStep();

AbstractStep Step的抽象实现

    protected ExpandableStepIterator<S> starts;
    protected Traverser.Admin<E> nextEnd = null;
    protected boolean traverserStepIdAndLabelsSetByChild = false;

    protected Step<?, S> previousStep = EmptyStep.instance();
    protected Step<E, ?> nextStep = EmptyStep.instance();

    public AbstractStep(final Traversal.Admin traversal) {
        this.traversal = traversal;
        this.starts = new ExpandableStepIterator<>(this);
    }
        @Override
    public void addStarts(final Iterator<Traverser.Admin<S>> starts) {
        this.starts.add(starts);
    }

    @Override
    public void addStart(final Traverser.Admin<S> start) {
        this.starts.add(start);
    }

最重要的是引入了starts的结构,为ExpandableStepIterator, 我们知道一个step 必须要初始化他的start 我们才能运行起来;
start 决定了step的开始节点结合; addStart就是把元素放入到starts结构里面;

接下来看 hasNext方法,就是这个step 是否已经执行完,没有执行完,就继续:

    @Override
    public boolean hasNext() {
        if (null != this.nextEnd)
            return true;
        else {
            try {
                while (true) {
                    if (Thread.interrupted()) throw new TraversalInterruptedException();
                    this.nextEnd = this.processNextStart();
                    if (null != this.nextEnd.get() && 0 != this.nextEnd.bulk())
                        return true;
                    else
                        this.nextEnd = null;
                }
            } catch (final NoSuchElementException e) {
                return false;
            }
        }
    }

this.nextEnd = this.processNextStart();
通过nextEnd来判断,没处理一个 start 节点,就会有一个 nextEnd结果,知道NoSuchElement为止;
对于ProcessNextStart,我们看到是由各个具体的step自己来实现的;

举个例子 g.V().outE()

系统首先把gremlin 编译为一个DefaultTraversal, Traversal 是一个iterator 迭代器, 有一个filler负责迭代收集结果:
Traveral 的step-chains
[HugeGraphStep(vertex,[]), HugeVertexStep(OUT,edge)]
@Override
public boolean hasNext() {
if (!this.locked) this.applyStrategies();
return this.lastTraverser.bulk() > 0L || this.finalEndStep.hasNext();
}
finalEndStep 就是 HugeVertexStep: outE()

第一步就是调用 hasNext获取 结果,然后调用this.nextEnd = this.processNextStart();

第一次的调用时候,这是一个空的step; 因为vertexStep 集成了flatMap,所以进入了FlatMap的方法

    @Override
    protected Traverser.Admin<E> processNextStart() {
        while (true) {
            if (this.iterator.hasNext()) {
                return this.head.split(this.iterator.next(), this);
            } else {
                closeIterator();
                this.head = this.starts.next();
                this.iterator = this.flatMap(this.head);
            }
        }
    }

this.head = this.starts.next();

我们之前说到 starts是一个extendedIterator,我们看看他的next方法:

    public Traverser.Admin<S> next() {
        if (!this.traverserSet.isEmpty())
            return this.traverserSet.remove();
        /////////////
        if (this.hostStep.getPreviousStep().hasNext())
            return this.hostStep.getPreviousStep().next();
        /////////////
        return this.traverserSet.remove();
    }

调用了是前一步的next方法,通过这里我们就感觉到了,一个step 是否和被递归调用,通过 尾部的step ,尝试去获得结果,没有就调用它的上一层step去获得结果;

对于这个case 就是会调用 graphStep:
graphStep的关键:

    protected Traverser.Admin<E> processNextStart() {
        while (true) {
            if (this.iterator.hasNext()) {
                return this.isStart ? this.getTraversal().getTraverserGenerator().generate(this.iterator.next(), (Step) this, 1l) : this.head.split(this.iterator.next(), this);
            } else {
                if (this.isStart) {
                    if (this.done)
                        throw FastNoSuchElementException.instance();
                    else {
                        this.done = true;
                        this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
                    }
                } else {
                    this.head = this.starts.next();
                    this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
                }
            }
        }
    }

this.iterator = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();

我们看到,最后是通过IteratorSupplier。get方法获取元素, 那么这个iteratorSupplier就是关键;

回到hugegraph,我们看到hugegraph重写了 HugeGraphStep,本质就是提供这个supplier,从自己的底层获取开始遍历的节点:

这个从低向上不断拉取得过程就是整个traversal 执行大致流程;

以HugeCountGraph 为例,解析如何定制化step

hugegraph 在0.11版本加入了 hugeCount step, 优化了g.V().count()这样的step,可以去后端直接查询统计数据,而不用遍历整个图;
我们看看是怎么做的:
首先需要一个一个strategy,把原油的globalCount换成HugeCount;

    public void apply(Traversal.Admin<?, ?> traversal) {
        TraversalUtil.convAllHasSteps(traversal);

        // Extract CountGlobalStep
        List<CountGlobalStep> steps = TraversalHelper.getStepsOfClass(
                                      CountGlobalStep.class, traversal);
        if (steps.isEmpty()) {
            return;
        }

        // Find HugeGraphStep before count()
        CountGlobalStep<?> originStep = steps.get(0);
        List<Step<?, ?>> originSteps = new ArrayList<>();
        HugeGraphStep<?, ? extends Element> graphStep = null;
        Step<?, ?> step = originStep;
        do {
            if (!(step instanceof CountGlobalStep ||
                  step instanceof GraphStep ||
                  step instanceof IdentityStep ||
                  step instanceof NoOpBarrierStep ||
                  step instanceof CollectingBarrierStep) ||
                 (step instanceof TraversalParent &&
                  TraversalHelper.anyStepRecursively(s -> {
                      return s instanceof SideEffectStep ||
                             s instanceof AggregateStep;
                  }, (TraversalParent) step))) {
                return;
            }
            originSteps.add(step);
            if (step instanceof HugeGraphStep) {
                graphStep = (HugeGraphStep<?, ? extends Element>) step;
                break;
            }
            step = step.getPreviousStep();
        } while (step != null);

        if (graphStep == null) {
            return;
        }

        // Replace with HugeCountStep
        graphStep.queryInfo().aggregate(AggregateFunc.COUNT, null);
        HugeCountStep<?> countStep = new HugeCountStep<>(traversal, graphStep);
        for (Step<?, ?> origin : originSteps) {
            traversal.removeStep(origin);
        }
        traversal.addStep(0, countStep);
    }

我们看到 目前只处理g.V().count()或者g.E().count()这种在 graphStep层面上的count,其他模式的count不支持;

接下里,我们看看countStep 是如何实现的。

@Override
protected Admin<Long> processNextStart() throws NoSuchElementException {
    if (this.done) {
        throw FastNoSuchElementException.instance();
    }
    this.done = true;
    @SuppressWarnings({ "unchecked", "rawtypes" })
    Step<Long, Long> step = (Step) this;
    return this.getTraversal().getTraverserGenerator()
               .generate(this.originGraphStep.count(), step, 1L);
}

我们看到 processNextStart的方法

从整个模型来看,是一个比较典型的从底向上的地鬼调用,我们可以修改 其中的step,做相应的优化。
比如 对于 out().out() 这种两调的查询,我们可以做一个 twohupStep,然后 batch的方式拿回数据,生成traverser。减少目前的onebyone的迭代消耗;

对于之前paypal在infoQ提到的多线程模型查询,目前看,不太清楚是在哪个层面,实在生成多个 traverser,然后并发去拉取? 还是在 step internal 去并发访问存储?
后续继续探索;

上一篇 下一篇

猜你喜欢

热点阅读