JanusGraph Source Code Analysis (Part 1): Transactions

2019-07-21  ZackJiang

This article is a reverse analysis of a single JanusGraph transaction: a walkthrough of how the data for a newly added vertex is processed and written into the backend store, covering janusgraph-core and janusgraph-cql. Summing up, a JanusGraph transaction passes through roughly the following layers; while reading the code it helps to keep track of which layer you are currently in:

- TinkerPop/gremlin layer: JanusGraphBlueprintsGraph and its GraphTransaction container
- Transaction layer: StandardJanusGraphTx and StandardJanusGraph.commit
- Serialization layer: EdgeSerializer, IDHandler, VariableLong
- Backend abstraction layer: BackendTransaction, KCVSCache, CacheTransaction
- Storage adapter layer: CQLStoreManager, which issues the actual CQL statements

I have already configured JanusGraph to use Cassandra as its backend store, but I want to know how JanusGraph actually writes data into Cassandra, so what follows is a reverse walkthrough of the JanusGraph code.
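
For reference, the graph instance used below is opened roughly like this (a sketch: the hostname is an assumption for illustration, while the property keys are the standard janusgraph-cql ones):

import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphFactory;

JanusGraph graph = JanusGraphFactory.build()
        .set("storage.backend", "cql")        // use the janusgraph-cql adapter
        .set("storage.hostname", "127.0.0.1") // assumption: a local Cassandra node
        .open();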

First, add a Vertex to my JanusGraph using a gremlin transaction:

GraphTraversalSource g = graph.traversal();
g.tx().open();
g.addV(GraphConstantUtil.VertexLabel.NEWS).next();
g.tx().commit();

Let's see what happens when the transaction commits. On commit, readWrite() is first called to make sure a transaction is open, and then the commit proceeds. Starting with the simple part: the transaction object obtained by calling tx() on a JanusGraph gremlin traversal is actually the inner class GraphTransaction of JanusGraphBlueprintsGraph, held in its field tinkerpopTxContainer. There is also a ThreadLocal field holding a JanusGraphBlueprintsTransaction; we will see shortly what that transaction object does. Since it lives in a ThreadLocal, we can already guess that it guarantees thread isolation: each thread gets its own transaction object.

public abstract class JanusGraphBlueprintsGraph implements JanusGraph {

    private static final Logger log =
            LoggerFactory.getLogger(JanusGraphBlueprintsGraph.class);

    // ########## TRANSACTION HANDLING ###########################
        //the transaction object that gremlin obtains
    final GraphTransaction tinkerpopTxContainer = new GraphTransaction();
  
    private ThreadLocal<JanusGraphBlueprintsTransaction> txs = ThreadLocal.withInitial(() -> null);
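
A quick way to observe that per-thread isolation (a sketch; it assumes graph is an open JanusGraph instance):

Runnable task = () -> {
    graph.tx().open();
    // each thread sees only its own thread-local transaction state
    System.out.println(Thread.currentThread().getName() + " open? " + graph.tx().isOpen());
    graph.tx().rollback();
};
new Thread(task, "t1").start();
new Thread(task, "t2").start();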

When commit is called, execution first reaches the doCommit method of GraphTransaction, which fetches the transaction object from the ThreadLocal field and calls its commit() method.

//this is a method of GraphTransaction; GraphTransaction is an inner class of JanusGraphBlueprintsGraph, so it can access all fields and methods of the enclosing class
        @Override
        public void doCommit() {
            getAutoStartTx().commit();
        }
//the implementation of getAutoStartTx()
        private JanusGraphBlueprintsTransaction getAutoStartTx() {
        if (txs == null) throw new IllegalStateException("Graph has been closed");
        tinkerpopTxContainer.readWrite();

        JanusGraphBlueprintsTransaction tx = txs.get();
        Preconditions.checkNotNull(tx,"Invalid read-write behavior configured: " +
                "Should either open transaction or throw exception.");
        return tx;
    }
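
Side note: because getAutoStartTx() calls readWrite(), the default READ_WRITE behavior auto-opens a transaction on first use, so the explicit g.tx().open() in the snippet at the top is not strictly required (a sketch reusing the document's identifiers):

GraphTraversalSource g = graph.traversal();
g.addV(GraphConstantUtil.VertexLabel.NEWS).next(); // readWrite() auto-opens a thread-local tx
g.tx().commit();                                   // commits via the same GraphTransaction container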

Now look at the commit implementation of JanusGraphBlueprintsTransaction. The instance behind this reference is actually a StandardJanusGraphTx; take a glance at the UML diagram.

(Figure: StandardJanusGraphTx class hierarchy; StandardJanusGraphTx.png)

Skipping the checks for whether the transaction is open and so on, the important part starts at hasModifications():

 @Override
    public synchronized void commit() {
        Preconditions.checkArgument(isOpen(), "The transaction has already been closed");
        boolean success = false;
        if (null != config.getGroupName()) {
            MetricManager.INSTANCE.getCounter(config.getGroupName(), "tx", "commit").inc();
        }
        try {
            // check whether the current transaction made any modifications
            if (hasModifications()) {
              // call StandardJanusGraph's commit, passing the lists of added and deleted InternalRelations plus the current StandardJanusGraphTx object
                graph.commit(addedRelations.getAll(), deletedRelations.values(), this);
            } else {
                txHandle.commit();
            }
            success = true;
        } catch (Exception e) {
            try {
                txHandle.rollback();
            } catch (BackendException e1) {
                throw new JanusGraphException("Could not rollback after a failed commit", e);
            }
            throw new JanusGraphException("Could not commit transaction due to exception during persistence", e);
        } finally {
            releaseTransaction();
            if (null != config.getGroupName() && !success) {
                MetricManager.INSTANCE.getCounter(config.getGroupName(), "tx", "commit.exceptions").inc();
            }
        }
    }

hasModifications() actually just checks whether two fields of StandardJanusGraphTx are non-empty; judging by the names, they record whether the graph has been modified. I did not track how these collections get populated (worth studying later), but since we added a vertex there should be something in them now. The debug result is shown below.

    @Override
    public boolean hasModifications() {
        return !addedRelations.isEmpty() || !deletedRelations.isEmpty();
    }
(Figure: debug view of addedRelations; addedRelations.png)

From addedRelations we can infer that two relations were added: a StandardVertexProperty relating v(4272) to VertexExists with value true, meaning the vertex exists, and a StandardEdge from v(4272) to News with type ~T$vertexLabel, which should be the label of our vertex.

Now that there are modifications, we enter the commit method of StandardJanusGraph, the class whose instance is the JanusGraph object itself. The code below is long, so most of the analysis lives in the inline comments; where the logic pulls in other modules, the comments say so and the relevant code is quoted further down.


    public void commit(final Collection<InternalRelation> addedRelations,
                     final Collection<InternalRelation> deletedRelations, final StandardJanusGraphTx tx) {
        if (addedRelations.isEmpty() && deletedRelations.isEmpty()) return;
        //1. Finalize transaction
        log.debug("Saving transaction. Added {}, removed {}", addedRelations.size(), deletedRelations.size());
      //set the commit time if the transaction has not recorded one yet; not relevant here
        if (!tx.getConfiguration().hasCommitTime()) tx.getConfiguration().setCommitTime(times.getTime());
        final Instant txTimestamp = tx.getConfiguration().getCommitTime();
      //get the transaction id from the AtomicLong txCounter
        final long transactionId = txCounter.incrementAndGet();

        //2. Assign JanusGraphVertex IDs
      // assign-IDs-immediately is enabled (the default), so this branch is skipped; that also explains why the added vertex already carries id 4272 in addedRelations, something worth studying further
        if (!tx.getConfiguration().hasAssignIDsImmediately())
            idAssigner.assignIDs(addedRelations);

        //3. Commit
        BackendTransaction mutator = tx.getTxHandle();
      // acquireLocks is false here
        final boolean acquireLocks = tx.getConfiguration().hasAcquireLocks();
      // cassandra only guarantees eventual consistency, so hasTxIsolation is false
        final boolean hasTxIsolation = backend.getStoreFeatures().hasTxIsolation();
      // logTransaction is false; it is always false when batch loading is enabled
        final boolean logTransaction = config.hasLogTransactions() && !tx.getConfiguration().hasEnabledBatchLoading();
      // txLog is null
        final KCVSLog txLog = logTransaction?backend.getSystemTxLog():null;
      // txLogHeader is likewise only useful when logTransaction is enabled; not our focus
        final TransactionLogHeader txLogHeader = new TransactionLogHeader(transactionId,txTimestamp, times);
        ModificationSummary commitSummary;

        try {
            //3.1 Log transaction (write-ahead log) if enabled
          // we don't care about logTransaction; this block is not the focus of this walkthrough
            if (logTransaction) {
                //[FAILURE] Inability to log transaction fails the transaction by escalation since it's likely due to unavailability of primary
                //storage backend.
                Preconditions.checkNotNull(txLog, "Transaction log is null");
                txLog.add(txLogHeader.serializeModifications(serializer, LogTxStatus.PRECOMMIT, tx, addedRelations, deletedRelations),txLogHeader.getLogKey());
            }

            //3.2 Commit schema elements and their associated relations in a separate transaction if backend does not support
            //    transactional isolation
          // does this commit contain schema elements? We only add a vertex labeled News here, so no schema is involved. The SCHEMA_FILTER code is quoted below and helps in understanding JanusGraph's data model
            boolean hasSchemaElements = !Iterables.isEmpty(Iterables.filter(deletedRelations,SCHEMA_FILTER))
                    || !Iterables.isEmpty(Iterables.filter(addedRelations,SCHEMA_FILTER));
          // schema elements may only be created when batch loading is off and locks are acquired; otherwise throw
            Preconditions.checkArgument(!hasSchemaElements || (!tx.getConfiguration().hasEnabledBatchLoading() && acquireLocks),
                    "Attempting to create schema elements in inconsistent state");
//no schema changes here, so we skip this block
            if (hasSchemaElements && !hasTxIsolation) {
                /*
                 * On storage without transactional isolation, create separate
                 * backend transaction for schema aspects to make sure that
                 * those are persisted prior to and independently of other
                 * mutations in the tx. If the storage supports transactional
                 * isolation, then don't create a separate tx.
                 */
                final BackendTransaction schemaMutator = openBackendTransaction(tx);

                try {
                    //[FAILURE] If the preparation throws an exception abort directly - nothing persisted since batch-loading cannot be enabled for schema elements
                    commitSummary = prepareCommit(addedRelations,deletedRelations, SCHEMA_FILTER, schemaMutator, tx, acquireLocks);
                    assert commitSummary.hasModifications && !commitSummary.has2iModifications;
                } catch (Throwable e) {
                    //Roll back schema tx and escalate exception
                    schemaMutator.rollback();
                    throw e;
                }

                try {
                    schemaMutator.commit();
                } catch (Throwable e) {
                    //[FAILURE] Primary persistence failed => abort and escalate exception, nothing should have been persisted
                    log.error("Could not commit transaction ["+transactionId+"] due to storage exception in system-commit",e);
                    throw e;
                }
            }

            //[FAILURE] Exceptions during preparation here cause the entire transaction to fail on transactional systems
            //or just the non-system part on others. Nothing has been persisted unless batch-loading
          // this is the key part; the prepareCommit source analysis follows below
            commitSummary = prepareCommit(addedRelations,deletedRelations, hasTxIsolation? NO_FILTER : NO_SCHEMA_FILTER, mutator, tx, acquireLocks);
            if (commitSummary.hasModifications) {
              // the commitSummary obtained above already confirmed there are modifications; skip the log-related parts for now
                String logTxIdentifier = tx.getConfiguration().getLogIdentifier();
                boolean hasSecondaryPersistence = logTxIdentifier!=null || commitSummary.has2iModifications;

                //1. Commit storage - failures lead to immediate abort

                //1a. Add success message to tx log which will be committed atomically with all transactional changes so that we can recover secondary failures
                //    This should not throw an exception since the mutations are just cached. If it does, it will be escalated since its critical
                if (logTransaction) {
                    txLog.add(txLogHeader.serializePrimary(serializer,
                                        hasSecondaryPersistence?LogTxStatus.PRIMARY_SUCCESS:LogTxStatus.COMPLETE_SUCCESS),
                            txLogHeader.getLogKey(),mutator.getTxLogPersistor());
                }

                try {
                  // here the BackendTransaction commits, handing the modifications to cassandra; source analysis below
                    mutator.commitStorage();
                } catch (Throwable e) {
                    //[FAILURE] If primary storage persistence fails abort directly (only schema could have been persisted)
                    log.error("Could not commit transaction ["+transactionId+"] due to storage exception in commit",e);
                    throw e;
                }

                if (hasSecondaryPersistence) {
                    LogTxStatus status = LogTxStatus.SECONDARY_SUCCESS;
                    Map<String,Throwable> indexFailures = ImmutableMap.of();
                    boolean userlogSuccess = true;

                    try {
                        //2. Commit indexes - [FAILURE] all exceptions are collected and logged but nothing is aborted
                        indexFailures = mutator.commitIndexes();
                        if (!indexFailures.isEmpty()) {
                            status = LogTxStatus.SECONDARY_FAILURE;
                            for (Map.Entry<String,Throwable> entry : indexFailures.entrySet()) {
                                log.error("Error while committing index mutations for transaction ["+transactionId+"] on index: " +entry.getKey(),entry.getValue());
                            }
                        }
                        //3. Log transaction if configured - [FAILURE] is recorded but does not cause exception
                        if (logTxIdentifier!=null) {
                            try {
                                userlogSuccess = false;
                                final Log userLog = backend.getUserLog(logTxIdentifier);
                                Future<Message> env = userLog.add(txLogHeader.serializeModifications(serializer, LogTxStatus.USER_LOG, tx, addedRelations, deletedRelations));
                                if (env.isDone()) {
                                    try {
                                        env.get();
                                    } catch (ExecutionException ex) {
                                        throw ex.getCause();
                                    }
                                }
                                userlogSuccess=true;
                            } catch (Throwable e) {
                                status = LogTxStatus.SECONDARY_FAILURE;
                                log.error("Could not user-log committed transaction ["+transactionId+"] to " + logTxIdentifier, e);
                            }
                        }
                    } finally {
                        if (logTransaction) {
                            //[FAILURE] An exception here will be logged and not escalated; tx considered success and
                            // needs to be cleaned up later
                            try {
                                txLog.add(txLogHeader.serializeSecondary(serializer,status,indexFailures,userlogSuccess),txLogHeader.getLogKey());
                            } catch (Throwable e) {
                                log.error("Could not tx-log secondary persistence status on transaction ["+transactionId+"]",e);
                            }
                        }
                    }
                } else {
                    //This just closes the transaction since there are no modifications
                    mutator.commitIndexes();
                }
            } else { //Just commit everything at once
                //[FAILURE] This case only happens when there are no non-system mutations in which case all changes
                //are already flushed. Hence, an exception here is unlikely and should abort
                mutator.commit();
            }
        } catch (Throwable e) {
            log.error("Could not commit transaction ["+transactionId+"] due to exception",e);
            try {
                //Clean up any left-over transaction handles
                mutator.rollback();
            } catch (Throwable e2) {
                log.error("Could not roll-back transaction ["+transactionId+"] after failure due to exception",e2);
            }
            if (e instanceof RuntimeException) throw (RuntimeException)e;
            else throw new JanusGraphException("Unexpected exception",e);
        }
    }

The SCHEMA_FILTER lambda checks two things: whether the InternalRelation's type is a BaseRelationType, which holds for both of our InternalRelations (one BaseKey, one BaseLabel, both subclasses of BaseRelationType), and whether the vertex obtained from the InternalRelation is a JanusGraphSchemaVertex. Ours is not: it is a StandardVertex, a real vertex stored in the graph rather than one describing schema, so the overall result is false. The NO_SCHEMA_FILTER lambda is simply the negation of SCHEMA_FILTER.

private static final Predicate<InternalRelation> SCHEMA_FILTER =
        internalRelation -> internalRelation.getType() instanceof BaseRelationType && internalRelation.getVertex(0) instanceof JanusGraphSchemaVertex;

private static final Predicate<InternalRelation> NO_SCHEMA_FILTER = internalRelation -> !SCHEMA_FILTER.apply(internalRelation);

private static final Predicate<InternalRelation> NO_FILTER = Predicates.alwaysTrue();
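
To make the filters concrete, here is how they evaluate for our two relations (a sketch; vertexExistsProperty and vertexLabelEdge are hypothetical names for the two InternalRelations seen in the debugger):

// both have a BaseRelationType type (BaseKey / BaseLabel), but their vertex(0)
// is the user-created StandardVertex v(4272), not a JanusGraphSchemaVertex:
SCHEMA_FILTER.apply(vertexExistsProperty); // false
SCHEMA_FILTER.apply(vertexLabelEdge);      // false
// so both pass NO_SCHEMA_FILTER and are committed through the main mutator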

StandardJanusGraph's prepareCommit method, another very long one. It receives the collections of added and deleted InternalRelations; since hasTxIsolation is false, the filter passed in is NO_SCHEMA_FILTER. There are still parts of it I haven't fully figured out.

    public ModificationSummary prepareCommit(final Collection<InternalRelation> addedRelations,
                                     final Collection<InternalRelation> deletedRelations,
                                     final Predicate<InternalRelation> filter,
                                     final BackendTransaction mutator, final StandardJanusGraphTx tx,
                                     final boolean acquireLocks) throws BackendException {

// storage for all changed InternalRelations, keyed by the vertex's long id
        ListMultimap<Long, InternalRelation> mutations = ArrayListMultimap.create();
      //storage for changed properties, keyed by InternalVertex
        ListMultimap<InternalVertex, InternalRelation> mutatedProperties = ArrayListMultimap.create();
      // all index updates
        List<IndexSerializer.IndexUpdate> indexUpdates = Lists.newArrayList();
        //1) Collect deleted edges and their index updates and acquire edge locks
      // from here you can see that the code below sorts the graph updates into different collections by kind: index updates, property updates, and the full set of mutations
      // we performed no deletions here, and deletion handling mirrors addition handling, so skip this loop and go straight to the processing of addedRelations
        for (InternalRelation del : Iterables.filter(deletedRelations,filter)) {
            Preconditions.checkArgument(del.isRemoved());
            for (int pos = 0; pos < del.getLen(); pos++) {
                InternalVertex vertex = del.getVertex(pos);
                if (pos == 0 || !del.isLoop()) {
                    if (del.isProperty()) mutatedProperties.put(vertex,del);
                    mutations.put(vertex.longId(), del);
                }
                if (acquireLock(del,pos,acquireLocks)) {
                    Entry entry = edgeSerializer.writeRelation(del, pos, tx);
                    mutator.acquireEdgeLock(idManager.getKey(vertex.longId()), entry);
                }
            }
            indexUpdates.addAll(indexSerializer.getIndexUpdates(del));
        }

        //2) Collect added edges and their index updates and acquire edge locks
      // now the added relations are processed; remember, our addedRelations still holds those two relations. (A side gripe: the add and delete loops are nearly identical and could have been factored into a shared method.)
        for (InternalRelation add : Iterables.filter(addedRelations,filter)) {
            Preconditions.checkArgument(add.isNew());
//the comments below walk through the first loop iteration; the second is similar. The first add is the vertex-exists property vp[~T$VertexExist->true], a StandardVertexProperty instance representing a property of this vertex
            for (int pos = 0; pos < add.getLen(); pos++) {
              // vertex is the vertex this InternalRelation touches, v[4272]
                InternalVertex vertex = add.getVertex(pos);
              //only when pos == 0, or the relation is not a loop, is the vertex put into the mutation and property-mutation collections
                if (pos == 0 || !add.isLoop()) {
                  //isProperty is true, so add it to the property collection
                    if (add.isProperty()) mutatedProperties.put(vertex,add);
                    mutations.put(vertex.longId(), add);
                }
              // vertex.isNew() == true since the vertex is newly added, so skip this branch for now
                if (!vertex.isNew() && acquireLock(add,pos,acquireLocks)) {
                    Entry entry = edgeSerializer.writeRelation(add, pos, tx);
                    mutator.acquireEdgeLock(idManager.getKey(vertex.longId()), entry.getColumn());
                }
            }
          // no indexes involved, so skip this for now; the IndexSerializer is rather complex anyway
            indexUpdates.addAll(indexSerializer.getIndexUpdates(add));
        }

        //3) Collect all index update for vertices
      // after the per-relation index updates, check whether the keys of the property-mutation map (the InternalVertex objects) involve any indexes; none here either
        for (InternalVertex v : mutatedProperties.keySet()) {
            indexUpdates.addAll(indexSerializer.getIndexUpdates(v,mutatedProperties.get(v)));
        }
        //4) Acquire index locks (deletions first)
      // no index updates involved, skip for now
        for (IndexSerializer.IndexUpdate update : indexUpdates) {
            if (!update.isCompositeIndex() || !update.isDeletion()) continue;
            CompositeIndexType iIndex = (CompositeIndexType) update.getIndex();
            if (acquireLock(iIndex,acquireLocks)) {
                mutator.acquireIndexLock((StaticBuffer)update.getKey(), (Entry)update.getEntry());
            }
        }
        for (IndexSerializer.IndexUpdate update : indexUpdates) {
            if (!update.isCompositeIndex() || !update.isAddition()) continue;
            CompositeIndexType iIndex = (CompositeIndexType) update.getIndex();
            if (acquireLock(iIndex,acquireLocks)) {
                mutator.acquireIndexLock((StaticBuffer)update.getKey(), ((Entry)update.getEntry()).getColumn());
            }
        }

        //5) Add relation mutations
      // the focus of this walkthrough: what happens during the prepareCommit stage
        for (Long vertexId : mutations.keySet()) {
            Preconditions.checkArgument(vertexId > 0, "Vertex has no id: %s", vertexId);
          // get the InternalRelations for this vertex; still our same two
            final List<InternalRelation> edges = mutations.get(vertexId);
          // the lists of Entry objects to add and to delete; Entry here maps to cassandra's storage model. As for the ArrayList capacities, presumably an empirical heuristic; keep reading
            final List<Entry> additions = new ArrayList<>(edges.size());
            final List<Entry> deletions = new ArrayList<>(Math.max(10, edges.size() / 10));
            for (final InternalRelation edge : edges) {
              // the first InternalRelation edge is the vertex-exists property of v[4272], a StandardVertexProperty instance; the InternalRelationType baseType extracted from it is a BaseKey
                final InternalRelationType baseType = (InternalRelationType) edge.getType();
                assert baseType.getBaseType()==null;

                for (InternalRelationType type : baseType.getRelationIndexes()) {
                  //this calls getRelationIndexes() as implemented in the abstract class EmptyRelationType, which here returns just the BaseKey itself; what getRelationIndexes returns in general is left for later study
                    if (type.getStatus()== SchemaStatus.DISABLED) continue;
                  // if edge extends AbstractEdge, arity is 2; if it extends AbstractVertexProperty, arity is 1 (so 1 here). Arity is the number of vertices this InternalRelation involves; source analysis below
                    for (int pos = 0; pos < edge.getArity(); pos++) {
                      // type.isUnidirected as implemented in BaseKey is shown below: a direction is considered unidirected iff it is OUT
                        if (!type.isUnidirected(Direction.BOTH) && !type.isUnidirected(EdgeDirection.fromPosition(pos)))
                            continue; //Directionality is not covered
                        if (edge.getVertex(pos).longId()==vertexId) {
                          // we finally reach the serialization method; its output is what gets stored in cassandra. Worth a close look, source analysis below. It takes the current InternalRelation and InternalRelationType, the position of the vertex in question, and the StandardJanusGraphTx object
                            StaticArrayEntry entry = edgeSerializer.writeRelation(edge, type, pos, tx);
                          // entry is now in hand; its buffer holds the serialized data walked through in the writeRelation analysis
                            if (edge.isRemoved()) {
                                deletions.add(entry);
                            } else {
                                Preconditions.checkArgument(edge.isNew());
                              // no TTL involved here, skip for now
                                int ttl = getTTL(edge);
                                if (ttl > 0) {
                                    entry.setMetaData(EntryMetaData.TTL, ttl);
                                }
                                additions.add(entry);
                            }
                        }
                    }
                }
            }
                        // what this vertexKey turns out to be is analyzed below
          // debug value: -88 0 0 0 0 0 0 -128
            StaticBuffer vertexKey = idManager.getKey(vertexId);
          //with the serialization work mostly done, the next key part (source analysis below) finally talks to cassandra; what it actually does is hand the serialized results to the mutator, which converts them into cassandra's data structures
            mutator.mutateEdges(vertexKey, additions, deletions);
        }

        //6) Add index updates
        boolean has2iMods = false;
      // no indexes involved, so skip this for now
        for (IndexSerializer.IndexUpdate indexUpdate : indexUpdates) {
            assert indexUpdate.isAddition() || indexUpdate.isDeletion();
            if (indexUpdate.isCompositeIndex()) {
                final IndexSerializer.IndexUpdate<StaticBuffer,Entry> update = indexUpdate;
                if (update.isAddition())
                    mutator.mutateIndex(update.getKey(), Lists.newArrayList(update.getEntry()), KCVSCache.NO_DELETIONS);
                else
                    mutator.mutateIndex(update.getKey(), KeyColumnValueStore.NO_ADDITIONS, Lists.newArrayList(update.getEntry()));
            } else {
                final IndexSerializer.IndexUpdate<String,IndexEntry> update = indexUpdate;
                has2iMods = true;
                IndexTransaction itx = mutator.getIndexTransaction(update.getIndex().getBackingIndexName());
                String indexStore = ((MixedIndexType)update.getIndex()).getStoreName();
                if (update.isAddition())
                    itx.add(indexStore, update.getKey(), update.getEntry(), update.getElement().isNew());
                else
                    itx.delete(indexStore,update.getKey(),update.getEntry().field,update.getEntry().value,update.getElement().isRemoved());
            }
        }
      //OK, that was a lot of code; now continue following the transaction, i.e. StandardJanusGraph's commit method
        return new ModificationSummary(!mutations.isEmpty(),has2iMods);
    }

Analysis of getArity: it returns the number of graph vertices the current InternalRelation involves. There are two implementations, in AbstractVertexProperty and in AbstractEdge; reading them alongside the getVertex implementations of those two classes makes it obvious. The logic is easy to follow: an InternalRelation representing an edge involves two vertices, one representing a property involves a single vertex.

// AbstractEdge
    @Override
    public InternalVertex getVertex(int pos) {
        switch (pos) {
            case 0:
                return start;
            case 1:
                return end;
            default:
                throw new IllegalArgumentException("Invalid position: " + pos);
        }
    }

    @Override
    public int getArity() {
        return 2;
    }
// AbstractVertexProperty
    @Override
    public InternalVertex getVertex(int pos) {
        if (pos==0) return vertex;
        else throw new IllegalArgumentException("Invalid position: " + pos);
    }

    @Override
    public final int getArity() {
        return 1;
    }

The isUnidirected method, understood through our two running examples. Our first InternalRelation represents the existence property of the new vertex v[4272]; its InternalRelationType instance is a BaseKey reference. Abstracting in graph terms, v[4272] has an edge pointing at its existence property, so the direction is OUT, and unidirectionality is decided by whether the direction is OUT. Our second InternalRelation is the StandardEdge from the new vertex v[4272] to the News label; its InternalRelationType implementation is BaseLabel, and the same OUT test applies.

        @Override
    public boolean isUnidirected(Direction dir) {
        return dir==Direction.OUT;
    }

EdgeSerializer.writeRelation source analysis

Here we walk through the vertex-exists relation, i.e. the StandardVertexProperty instance; the other relation types follow similar logic.

public StaticArrayEntry writeRelation(InternalRelation relation, InternalRelationType type, int position,
                                      TypeInspector tx) {
    assert type==relation.getType() || (type.getBaseType() != null
            && type.getBaseType().equals(relation.getType()));
  // dir is OUT
    Direction dir = EdgeDirection.fromPosition(position);
    Preconditions.checkArgument(type.isUnidirected(Direction.BOTH) || type.isUnidirected(dir));
  // get the id of the InternalRelationType. Note this is no longer the vertex's longId; the debug value here is 101, produced by JanusGraph's id generator. The meaning of this id's bit layout is analyzed further down
    long typeId = type.longId();
  // source analysis below; there are three DirectionIDs: property direction, edge out, and edge in
    DirectionID dirID = getDirID(dir, relation.isProperty() ? RelationCategory.PROPERTY : RelationCategory.EDGE);
// serializer is a StandardSerializer instance and DEFAULT_CAPACITY is 128; this returns an instance of StandardSerializer's inner class StandardOutput, used to hold the serialized data
    DataOutput out = serializer.getDataOutput(DEFAULT_CAPACITY);
    int valuePosition;
  // call the static IDHandler.writeRelationType with out and typeId (101); type.isInvisibleType() is true, since BaseKey extends BaseRelationType and those are always invisible. This dives into JanusGraph's low-level serialization; source analysis below
 // in short, this is the method that serializes the InternalRelationType-related attributes
    IDHandler.writeRelationType(out, typeId, dirID, type.isInvisibleType());
  // back here: typeId's information has been written into the buffer. That was a lot of low-level code already; moving on
  // for the vertex-exists StandardVertexProperty, the Multiplicity is MANY2ONE
    Multiplicity multiplicity = type.multiplicity();

  // 0; leaving what SortKey means for later
    long[] sortKey = type.getSortKey();
    assert !multiplicity.isConstrained() || sortKey.length==0: type.name();
    int keyStartPos = out.getPosition();
  // every Multiplicity other than MULTI is constrained; this branch only serializes when it is MULTI, so skip it for now
    if (!multiplicity.isConstrained()) {
        writeInlineTypes(sortKey, relation, out, tx, InlineType.KEY);
    }
    int keyEndPos = out.getPosition();

    long relationId = relation.longId();

    //How multiplicity is handled for edges and properties is slightly different
  //StandardVertexProperty is not an edge, so this branch is not taken
    if (relation.isEdge()) {
        long otherVertexId = relation.getVertex((position + 1) % 2).longId();
        if (multiplicity.isConstrained()) {
            if (multiplicity.isUnique(dir)) {
                valuePosition = out.getPosition();
                VariableLong.writePositive(out, otherVertexId);
            } else {
                VariableLong.writePositiveBackward(out, otherVertexId);
                valuePosition = out.getPosition();
            }
            VariableLong.writePositive(out, relationId);
        } else {
            VariableLong.writePositiveBackward(out, otherVertexId);
            VariableLong.writePositiveBackward(out, relationId);
            valuePosition = out.getPosition();
        }
    } else {
      // this branch handles Property relations; we are still following the vertex-exists StandardVertexProperty
        assert relation.isProperty();
        Preconditions.checkArgument(relation.isProperty());
      // value = true
        Object value = ((JanusGraphVertexProperty) relation).value();
        Preconditions.checkNotNull(value);
      // key and type are the same InternalRelationType instance, here the BaseKey "~T$VertexExists"
        PropertyKey key = (PropertyKey) type;
        assert key.dataType().isInstance(value);

        if (multiplicity.isConstrained()) {
          // uniqueness check: dir is OUT and the Multiplicity is MANY2ONE, so it is unique for this direction (Cardinality=SINGLE); write the value first
            if (multiplicity.isUnique(dir)) { //Cardinality=SINGLE
              // valuePosition records how far out has been written at this point
                valuePosition = out.getPosition();
              // how this property value gets written is analyzed below
                writePropertyValue(out,key,value);
              // the property value has now been written; the first three bytes of the buffer are 2 0 1
            } else { //Cardinality=SET
                writePropertyValue(out,key,value);
                valuePosition = out.getPosition();
            }
          // write the relation id, i.e. that of the InternalRelation (our StandardVertexProperty instance); its id is 539 (1000011011b). writePositive source analysis below
            VariableLong.writePositive(out, relationId);
          // with the relationId written, the buffer reads 2 0 1 4 -101
        } else {
            assert multiplicity.getCardinality()== Cardinality.LIST;
            VariableLong.writePositiveBackward(out, relationId);
            valuePosition = out.getPosition();
            writePropertyValue(out,key,value);
        }
    }

    //Write signature
  // write the signature; there is none here
    long[] signature = type.getSignature();
    writeInlineTypes(signature, relation, out, tx, InlineType.SIGNATURE);

    //Write remaining properties
  // any remaining unwritten properties; none here either
    LongSet writtenTypes = new LongHashSet(sortKey.length + signature.length);
    if (sortKey.length > 0 || signature.length > 0) {
        for (long id : sortKey) writtenTypes.add(id);
        for (long id : signature) writtenTypes.add(id);
    }
    LongArrayList remainingTypes = new LongArrayList(8);
    for (PropertyKey t : relation.getPropertyKeysDirect()) {
        if (!(t instanceof ImplicitKey) && !writtenTypes.contains(t.longId())) {
            remainingTypes.add(t.longId());
        }
    }
    //Sort types before writing to ensure that value is always written the same way
    long[] remaining = remainingTypes.toArray();
    Arrays.sort(remaining);
    for (long tid : remaining) {
        PropertyKey t = tx.getExistingPropertyKey(tid);
        writeInline(out, t, relation.getValueDirect(t), InlineType.NORMAL);
    }
    assert valuePosition>0;
  
// finally we can return the Entry object
  // type.getSortOrder(): everything inheriting from EmptyRelationType sorts ascending; out.getStaticBuffer analysis below
    return new StaticArrayEntry(type.getSortOrder() == Order.DESC ?
                                out.getStaticBufferFlipBytes(keyStartPos, keyEndPos) :
                                out.getStaticBuffer(), valuePosition);
}

EdgeSerializer's getDirID method is simple and needs no commentary; here is the code:

private static DirectionID getDirID(Direction dir, RelationCategory rt) {
    switch (rt) {
        case PROPERTY:
            assert dir == Direction.OUT;
            return DirectionID.PROPERTY_DIR;

        case EDGE:
            switch (dir) {
                case OUT:
                    return DirectionID.EDGE_OUT_DIR;

                case IN:
                    return DirectionID.EDGE_IN_DIR;

                default:
                    throw new IllegalArgumentException("Invalid direction: " + dir);
            }

        default:
            throw new IllegalArgumentException("Invalid relation type: " + rt);
    }
}

IDHandler's writeRelationType method. The javadoc already spells out how the serialized bytes are laid out, but let's dig into the code anyway. This is the method that serializes the InternalRelationType.

/**
 * The edge type is written as follows: [ Invisible &amp; System (2 bit) | Relation-Type-ID (1 bit) | Relation-Type-Count (variable) | Direction-ID (1 bit)]
 * Would only need 1 bit to store relation-type-id, but using two so we can upper bound.
 *
 *
 * @param out
 * @param relationTypeId
 * @param dirID
 */
public static void writeRelationType(WriteBuffer out, long relationTypeId, DirectionID dirID, boolean invisible) {
    assert relationTypeId > 0 && (relationTypeId << 1) > 0; //Check positive and no-overflow
/**
1. strippedId:
relationTypeId is 101, binary 1100101.
For our StandardVertexProperty InternalRelation, dirID is PROPERTY_DIR; getDirectionInt() returns 0 (implemented as id & 1, the low bit of the id, and PROPERTY_DIR's id is 0).
getDirectionInt() encodes the edge direction: 0 means OUT, 1 means IN.
IDManager.stripEntireRelationTypePadding is analyzed further below;
from that analysis its return value here is 1, so strippedId is 2 (binary 10b).
2. VariableLong.writePositiveWithPrefix:
First look at IDManager.isSystemRelationTypeId() (source below): id 101 is of type SystemPropertyKey, so it returns true here. PREFIX_BIT_LEN = 3.
Next, see what dirID.getPrefix returns; that code is also pasted below.
Then we step into VariableLong.writePositiveWithPrefix, analyzed below.

**/
  // of strippedId's bits, the high bits are relationTypeId with its padding stripped and the low bit encodes the direction
    long strippedId = (IDManager.stripEntireRelationTypePadding(relationTypeId) << 1) + dirID.getDirectionInt();
    VariableLong.writePositiveWithPrefix(out, strippedId, dirID.getPrefix(invisible, IDManager.isSystemRelationTypeId(relationTypeId)), PREFIX_BIT_LEN);
}

IDManager.stripEntireRelationTypePadding method

Here things get increasingly low-level. IDManager contains a number of enums; stripEntireRelationTypePadding uses VertexIDType.UserEdgeLabel. These enums correspond to the bit layouts of serialized ids. The comment block documenting JanusGraph's id bit layout is pasted below too, so whenever a section of bits comes up you can look up its meaning here.

  /**
     *bit mask- Description (+ indicates defined type, * indicates proper &amp; defined type)
     *
     *      0 - + User created Vertex
     *    000 -     * Normal vertices
     *    010 -     * Partitioned vertices
     *    100 -     * Unmodifiable (e.g. TTL'ed) vertices
     *    110 -     + Reserved for additional vertex type
     *      1 - + Invisible
     *     11 -     * Invisible (user created/triggered) Vertex [for later]
     *     01 -     + Schema related vertices
     *    101 -         + Schema Type vertices
     *   0101 -             + Relation Type vertices
     *  00101 -                 + Property Key
     * 000101 -                     * User Property Key
     * 100101 -                     * System Property Key
     *  10101 -                 + Edge Label
     * 010101 -                     * User Edge Label
     * 110101 -                     * System Edge Label
     *   1101 -             Other Type vertices
     *  01101 -                 * Vertex Label
     *    001 -         Non-Type vertices
     *   1001 -             * Generic Schema Vertex
     *   0001 -             Reserved for future
     *
     *
     */

public static long stripEntireRelationTypePadding(long id) {
    Preconditions.checkArgument(isProperRelationType(id));
    return VertexIDType.UserEdgeLabel.removePadding(id);
}

// UserEdgeLabel offset=6 
        public final long removePadding(long id) {
          // id = 101, binary 1100101; the unsigned right shift by 6 keeps only the leading bit, i.e. 1. With that, return to IDHandler's writeRelationType
            return id >>> offset();
        }
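
Putting the pieces together, the strippedId computation can be checked in plain Java (a standalone sketch of the bit arithmetic, outside JanusGraph):

long relationTypeId = 101;                        // 1100101b, our SystemPropertyKey id
long stripped = relationTypeId >>> 6;             // removePadding with offset 6 -> 1
long directionBit = 0;                            // PROPERTY_DIR.getDirectionInt() == 0 (OUT)
long strippedId = (stripped << 1) + directionBit; // 10b == 2, matching the debug value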

IDManager.isSystemRelationTypeId() method

It simply checks whether the given id is a SystemEdgeLabel or a SystemPropertyKey; look directly at the is() implementation.

Debugging shows that for the vertex-exists StandardVertexProperty, id 101 is of type SystemPropertyKey, so this returns true.

public static boolean isSystemRelationTypeId(long id) {
    return VertexIDType.SystemEdgeLabel.is(id) || VertexIDType.SystemPropertyKey.is(id);
}

        public final boolean is(long id) {
          //id here is still 101, binary 1100101
          // (1L << offset()) - 1 is 111111b: mask the low 6 bits of the id and compare them with the suffix; the masked value here is 100101b
          // SystemEdgeLabel's suffix is 53 (110101b)
          // SystemPropertyKey's suffix is 37 (100101b)
            return (id & ((1L << offset()) - 1)) == suffix();
        }
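
The same check spelled out for id 101 (a standalone sketch):

long id = 101;                    // 1100101b
long low6 = id & ((1L << 6) - 1); // mask the low 6 bits -> 100101b == 37
// 37 == SystemPropertyKey's suffix, so SystemPropertyKey.is(101) returns true
// (SystemEdgeLabel's suffix is 53 == 110101b, which does not match)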

VariableLong.writePositiveWithPrefix() method

This is where the call chain from EdgeSerializer through IDHandler.writeRelationType finally writes data into the WriteBuffer. As before, we follow the vertex-exists StandardVertexProperty all the way down: value is 2, prefix is 0, prefixBitLen is 3.

Once strippedId's information has been written here, writeRelationType is done. After this, return to EdgeSerializer to see what gets serialized next.

public static void writePositiveWithPrefix(final WriteBuffer out, long value, long prefix, final int prefixBitLen) {
    assert value >= 0;
    assert prefixBitLen > 0 && prefixBitLen < 6 && (prefix < (1L << prefixBitLen));
    //Write first byte
  // how many bits remain in the byte once the prefix is placed; 5 here
    int deltaLen = 8 - prefixBitLen;
  // the numeric prefix; it is 0 for system types, so the shift still yields 0
    byte first = (byte)(prefix << deltaLen);
  // how many bits value occupies from its highest set bit; value is 10b, so 2 bits
    int valueLen = unsignedBitLength(value);
  // modulo 7; still 2 here
    int mod = valueLen % 7;
    if (mod <= (deltaLen - 1)) {
        int offset = (valueLen - mod);
        first = (byte)(first | (value >>> offset));
        value = value & ((1L << offset) - 1);
        valueLen -= mod;
    } else {
        valueLen += (7 - mod);
    }
    assert valueLen >= 0;
    if (valueLen > 0) {
        //Add continue mask to indicate reading further
        first = (byte) ( first | (1 << (deltaLen - 1)));
    }
  // write 10b into the first byte of the WriteByteBuffer
    out.putByte(first);
    if (valueLen > 0) {
        //Keep writing
        writeUnsigned(out, valueLen, value);
    }
}
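
Tracing the method with our values, value = 2, prefix = 0, prefixBitLen = 3 (a standalone sketch of the first-byte computation):

int prefixBitLen = 3;
long prefix = 0, value = 2;                            // strippedId from writeRelationType
int deltaLen = 8 - prefixBitLen;                       // 5 bits left in the first byte
byte first = (byte) (prefix << deltaLen);              // 0
int valueLen = 64 - Long.numberOfLeadingZeros(value);  // unsignedBitLength(2) == 2
int mod = valueLen % 7;                                // 2 <= deltaLen - 1, so value fits
first = (byte) (first | (value >>> (valueLen - mod))); // first byte == 2
// the whole value fits into the first byte, so nothing more is written; this is
// the leading 2 of the buffer dump 2 0 1 4 -101 quoted above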

The getPrefix() method of IDHandler's inner enum DirectionID

getRelationType() is the DirectionID's id >>> 1; the DirectionID values are defined in the code below. It simply takes the high bit of the id: 0 means a property, 1 means an edge. The relation we are following, the vertex-exists StandardVertexProperty, has DirectionID 0, i.e. PROPERTY_DIR. In that case:

systemType == true gives prefix = 0

systemType == false and invisible == false gives prefix = 2

Note: by this point the two-bit DirectionID encoding is clear: the high bit encodes the kind, the low bit the direction.

private int getPrefix(boolean invisible, boolean systemType) {
    assert !systemType || invisible; // systemType implies invisible
    return ((systemType?0:invisible?2:1)<<1) + getRelationType();
}
// the DirectionID value definitions
        PROPERTY_DIR(0),  //00b
        EDGE_OUT_DIR(2),  //10b
        EDGE_IN_DIR(3);   //11b

EdgeSerializer.writePropertyValue()

As the name suggests, this method serializes a property value. We are still analyzing the StandardVertexProperty; its PropertyKey is a BaseKey.

    private enum InlineType {

        KEY, SIGNATURE, NORMAL;

        public boolean writeInlineKey() {
            return this==NORMAL;
        }

        public boolean writeByteOrdered() {
            return this==KEY;
        }

    }

private void writePropertyValue(DataOutput out, PropertyKey key, Object value) {
    writePropertyValue(out,key,value,InlineType.NORMAL);
}

    private void writePropertyValue(DataOutput out, PropertyKey key, Object value, InlineType inlineType) {
      // check whether the key's dataType is the generic Object type; BaseKey's is not, it is Boolean
        if (AttributeUtil.hasGenericDataType(key)) {
            assert !inlineType.writeByteOrdered();
            out.writeClassAndObject(value);
        } else {
            assert value==null || value.getClass().equals(key.dataType());
          // check whether inlineType is KEY; NORMAL is passed in here, so we go straight to the else branch
            if (inlineType.writeByteOrdered()) out.writeObjectByteOrder(value, key.dataType());
          // how the else branch writes into out is analyzed below
            else out.writeObject(value, key.dataType());
        }
    }

StandardSerializer.writeObject() method

The object passed in here is true and the type is Boolean.class.

OK: with this property value serialized as well, the first three bytes of the buffer are now 2 0 1.

@Override
public DataOutput writeObject(Object object, Class type) {
    return writeObjectInternal(object,type,false);
}

        private DataOutput writeObjectInternal(Object object, Class type, boolean byteOrder) {
            if (supportsNullSerialization(type)) {
                AttributeSerializer s = getSerializer(type);
                if (byteOrder) ensureOrderPreserving(s,type).writeByteOrder(this,object);
                else s.write(this, object);
            } else {
                //write flag for null or not
              // Boolean has no serializer that supports null
                if (object==null) {
                    putByte((byte)-1);
                } else {
                  // not null, so first write a 0 flag byte
                    putByte((byte)0);
                    writeObjectNotNullInternal(object,byteOrder);
                }
            }
            return this;
        }

        private DataOutput writeObjectNotNullInternal(Object object, boolean byteOrder) {
            Preconditions.checkNotNull(object);
            Class type = object.getClass();
          // fetch the attribute serializer registered for Boolean.class
            AttributeSerializer s = getSerializer(type);
            if (byteOrder) {
                ensureOrderPreserving(s,type).writeByteOrder(this,object);
            } else {
              // BooleanSerializer is simple: it writes 1 for true and 0 for false
                s.write(this, object);
            }
            return this;
        }
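
Reproducing the three bytes accumulated so far (a standalone sketch; the comments name the method responsible for each byte):

java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(128);
boolean value = true;
buf.put((byte) 2);                // writeRelationType: the prefix byte holding strippedId
buf.put((byte) 0);                // writeObjectInternal: the non-null flag
buf.put((byte) (value ? 1 : 0));  // BooleanSerializer: true -> 1
// buf now holds 2 0 1, matching the debug dump quoted above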

VariableLong.writePositive() method

    public static void writePositive(WriteBuffer out, final long value) {
        assert value >= 0;
        writeUnsigned(out, value);
    }

    private static void writeUnsigned(WriteBuffer out, final long value) {
        writeUnsigned(out, unsignedBlockBitLength(value), value);
    }
// how many bits the serialization needs; one block is 7 bits, two blocks are 14
    private static int unsignedBlockBitLength(final long value) {
        return unsignedNumBlocks(value)*7;
    }

    private static int unsignedNumBlocks(final long value) {
        return numVariableBlocks(unsignedBitLength(value));
    }
    // how many 7-bit variable blocks are needed; two here
    private static int numVariableBlocks(final int numBits) {
        assert numBits > 0;
        return (numBits - 1) / 7 + 1;
    }
    //quoted for the second time, so briefly: the number of bits the value occupies; 539 -> 10
    public static int unsignedBitLength(final long value) {
        return (value == 0) ? 1 : Long.SIZE - Long.numberOfLeadingZeros(value);
    }

    private static void writeUnsigned(WriteBuffer out, int offset, final long value) {
        assert offset % 7 == 0;
        while (offset > 0) {
          // write the data 7 bits at a time
            offset -= 7;
          // BIT_MASK is 1111111b
            byte b = (byte) ((value >>> offset) & BIT_MASK);
            if (offset == 0) {
              // STOP_MASK is -128, i.e. only the high bit set; OR-ing it into the final block marks the end of the number for the reader
                b = (byte) (b | STOP_MASK);
            }
            out.putByte(b);
        }
    }
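
A trace for relationId = 539 (1000011011b, 10 bits, hence two 7-bit blocks and an initial offset of 14):

long value = 539;                         // 1000011011b
byte b1 = (byte) ((value >>> 7) & 0x7F);  // first block: 4
byte b2 = (byte) ((value & 0x7F) | 0x80); // final block 27, OR STOP_MASK -> -101
// bytes written: 4, -101, i.e. the tail of the buffer dump 2 0 1 4 -101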

WriteByteBuffer.getStaticBuffer() method

This is just NIO buffer handling: it returns a StaticArrayBuffer, and the buffer contents stay unchanged.

@Override
public StaticBuffer getStaticBuffer() {
    return getStaticBufferFlipBytes(0,0);
}

@Override
public StaticBuffer getStaticBufferFlipBytes(int from, int to) {
    ByteBuffer b = buffer.duplicate();
    b.flip();
    Preconditions.checkArgument(from>=0 && from<=to);
    Preconditions.checkArgument(to<=b.limit());
    for (int i=from;i<to;i++) b.put(i,(byte)~b.get(i));
    return StaticArrayBuffer.of(b);
}

public static StaticArrayBuffer of(ByteBuffer b) {
  // return a StaticArrayBuffer
  if (b.hasArray()) {
    return new StaticArrayBuffer(b.array(),b.arrayOffset()+b.position(),b.arrayOffset()+b.limit());
  } else {
    byte[] array = new byte[b.remaining()];
    b.mark();
    b.get(array);
    b.reset();
    return StaticArrayBuffer.of(array);
  }
}

IDManager.getKey(long vertexId)

Derives from the vertexId the key that is passed into the BackendTransaction.

    public StaticBuffer getKey(long vertexId) {
        if (VertexIDType.Schema.is(vertexId)) {
            //No partition for schema vertices
            return BufferUtil.getLongBuffer(vertexId);
        } else {
            assert isUserVertexId(vertexId);
            VertexIDType type = getUserVertexIDType(vertexId);
            assert type.offset()==USERVERTEX_PADDING_BITWIDTH;
         // partition bits: 10101b
            long partition = getPartitionId(vertexId);
          //10000b: the leading bits of vertexId are the count
            long count = vertexId>>>(partitionBits+USERVERTEX_PADDING_BITWIDTH);
            assert count>0;
          // keyId: the partition goes into the top bits (offset 59), followed by the count with its padding
            long keyId = (partition<<partitionOffset) | type.addPadding(count);
            return BufferUtil.getLongBuffer(keyId);
        }
    }

// UserVertex covers exactly these three types: NormalVertex, PartitionedVertex and UnmodifiableVertex
    private static VertexIDType getUserVertexIDType(long vertexId) {
        VertexIDType type=null;
        if (VertexIDType.NormalVertex.is(vertexId)) type=VertexIDType.NormalVertex;
        else if (VertexIDType.PartitionedVertex.is(vertexId)) type=VertexIDType.PartitionedVertex;
        else if (VertexIDType.UnmodifiableVertex.is(vertexId)) type=VertexIDType.UnmodifiableVertex;
        if (null == type) {
            throw new InvalidIDException("Vertex ID " + vertexId + " has unrecognized type");
        }
        return type;
    }

    public long getPartitionId(long vertexId) {
        if (VertexIDType.Schema.is(vertexId)) return SCHEMA_PARTITION;
        assert isUserVertexId(vertexId) && getUserVertexIDType(vertexId)!=null;
      // vertexId is 1000010101000b and USERVERTEX_PADDING_BITWIDTH = 3; (partitionIDBound-1) is 11111b, so the masked value is 10101b: the middle five bits of vertexId are the partition
        long partition = (vertexId>>>USERVERTEX_PADDING_BITWIDTH) & (partitionIDBound-1);
        assert partition>=0;
        return partition;
    }

        public final long addPadding(long count) {
            assert offset()>0;
            Preconditions.checkArgument(count>0 && count<(1L <<(TOTAL_BITS-offset())),"Count out of range for type [%s]: %s",this,count);
            return (count << offset()) | suffix();
        }
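
Plugging in the debug values from the comments above (a standalone sketch; the bit widths are the defaults, 3 padding bits and 5 partition bits, so partitionOffset = 64 - 5 = 59):

long vertexId = 0b1000010101000L;              // the NormalVertex id from the debug session
long partition = (vertexId >>> 3) & 31;        // middle five bits -> 10101b == 21
long count = vertexId >>> (5 + 3);             // leading bits -> 10000b == 16
long keyId = (partition << 59) | (count << 3); // NormalVertex suffix is 000
// the 8 bytes of keyId are -88 0 0 0 0 0 0 -128, matching the prepareCommit comment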

BackendTransaction.mutateEdges

From here on, JanusGraph starts interacting with the storage backend. edgeStore is a KCVSCache reference and storeTx a CacheTransaction reference; both are fields of BackendTransaction.

/**
 * Applies the specified insertion and deletion mutations on the edge store to the provided key.
 * Both, the list of additions or deletions, may be empty or NULL if there is nothing to be added and/or deleted.
 *
 * @param key       Key
 * @param additions List of entries (column + value) to be added
 * @param deletions List of columns to be removed
 */
public void mutateEdges(StaticBuffer key, List<Entry> additions, List<Entry> deletions) throws BackendException {
    edgeStore.mutateEntries(key, additions, deletions, storeTx);
}

KCVSCache.mutateEntries

This simply delegates to the StoreTransaction's mutate method.

    public void mutateEntries(StaticBuffer key, List<Entry> additions, List<Entry> deletions, StoreTransaction txh) throws BackendException {
        assert txh instanceof CacheTransaction;
        ((CacheTransaction) txh).mutate(this, key, additions, deletions);
    }

CacheTransaction.mutate

    void mutate(KCVSCache store, StaticBuffer key, List<Entry> additions, List<Entry> deletions) throws BackendException {
        Preconditions.checkNotNull(store);
        if (additions.isEmpty() && deletions.isEmpty()) return;

        KCVEntryMutation m = new KCVEntryMutation(additions, deletions);
      // easy to follow: populate the storeMutation map, whose key is the keyId computed earlier from the vertexId and whose value is KCVEntryMutation m, the wrapper around additions and deletions
        final Map<StaticBuffer, KCVEntryMutation> storeMutation = mutations.computeIfAbsent(store, k -> new HashMap<>());
        KCVEntryMutation existingM = storeMutation.get(key);
        if (existingM != null) {
            existingM.merge(m);
        } else {
            storeMutation.put(key, m);
        }

        numMutations += m.getTotalMutations();
                // persistChunkSize is 1024; this effectively buffers changes so that in batch-loading mode they are flushed in batches
        if (batchLoading && numMutations >= persistChunkSize) {
            flushInternal();
        }
    }

BackendTransaction.commitStorage()

Directly calls CacheTransaction's commit method.

    public void commitStorage() throws BackendException {
        storeTx.commit();
    }

CacheTransaction.commit()

We still need to see what flushInternal does; the code is pasted below. tx here is a StoreTransaction reference whose implementation is ExpectedValueCheckingTransaction.

@Override
public void commit() throws BackendException {
    flushInternal();
    tx.commit();
}

CacheTransaction.flushInternal()

    private void flushInternal() throws BackendException {
        if (numMutations > 0) {
            //Consolidate all mutations prior to persistence to ensure that no addition accidentally gets swallowed by a delete
          // the consolidate step ensures no addition is accidentally swallowed by a delete
            for (Map<StaticBuffer, KCVEntryMutation> store : mutations.values()) {
                for (KCVEntryMutation mut : store.values()) mut.consolidate();
            }

            //Chunk up mutations
          //the data is regrouped into subMutations: the outer key is the table name, the value a HashMap keyed by the StaticArrayBuffer representing the keyId
            final Map<String, Map<StaticBuffer, KCVMutation>> subMutations = new HashMap<>(mutations.size());
            int numSubMutations = 0;
            for (Map.Entry<KCVSCache,Map<StaticBuffer, KCVEntryMutation>> storeMutations : mutations.entrySet()) {
                final Map<StaticBuffer, KCVMutation> sub = new HashMap<>();
              // the keys of storeMutations are the KCVSCache instances; getName() gives the cassandra table name, here the edge store (edgestore). The values are the KCVMutation wrappers of additions and deletions; chunked flushing also happens in this loop
                subMutations.put(storeMutations.getKey().getName(),sub);
                for (Map.Entry<StaticBuffer,KCVEntryMutation> mutationsForKey : storeMutations.getValue().entrySet()) {
                    if (mutationsForKey.getValue().isEmpty()) continue;
                    sub.put(mutationsForKey.getKey(), convert(mutationsForKey.getValue()));
                    numSubMutations+=mutationsForKey.getValue().getTotalMutations();
                    if (numSubMutations>= persistChunkSize) {
                        numSubMutations = persist(subMutations);
                        sub.clear();
                        subMutations.put(storeMutations.getKey().getName(),sub);
                    }
                }
            }
          // the batching itself is simple, so we skip straight to persistence; the persist source analysis is below
            if (numSubMutations>0) persist(subMutations);


            for (Map.Entry<KCVSCache,Map<StaticBuffer, KCVEntryMutation>> storeMutations : mutations.entrySet()) {
                final KCVSCache cache = storeMutations.getKey();
                for (Map.Entry<StaticBuffer,KCVEntryMutation> mutationsForKey : storeMutations.getValue().entrySet()) {
                    if (cache.hasValidateKeysOnly()) {
                        cache.invalidate(mutationsForKey.getKey(), Collections.EMPTY_LIST);
                    } else {
                        final KCVEntryMutation m = mutationsForKey.getValue();
                        final List<CachableStaticBuffer> entries = new ArrayList<>(m.getTotalMutations());
                        for (final Entry e : m.getAdditions()) {
                            assert e instanceof CachableStaticBuffer;
                            entries.add((CachableStaticBuffer)e);
                        }
                        for (final StaticBuffer e : m.getDeletions()) {
                            assert e instanceof CachableStaticBuffer;
                            entries.add((CachableStaticBuffer)e);
                        }
                        cache.invalidate(mutationsForKey.getKey(),entries);
                    }
                }
            }
            clear();
        }
    }

CacheTransaction.persist()

This is the backend persistence: our added vertex is finally about to be stored into cassandra. tx is still the StoreTransaction reference held by CacheTransaction, and manager is a KeyColumnValueStoreManager reference implemented by ExpectedValueCheckingStoreManager.

    private int persist(final Map<String, Map<StaticBuffer, KCVMutation>> subMutations) {
        BackendOperation.execute(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
              // this is the method that does the real work; we step into it below
                manager.mutateMany(subMutations, tx);
                return true;
            }

            @Override
            public String toString() {
                return "CacheMutation";
            }
        }, maxWriteTime);
        subMutations.clear();
        return 0;
    }

ExpectedValueCheckingStoreManager.mutateMany()

manager is a field of ExpectedValueCheckingStoreManager; its implementation is CQLStoreManager.

    @Override
    public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
        ExpectedValueCheckingTransaction etx = (ExpectedValueCheckingTransaction)txh;
      // judging by the name, this checks whether any locks are held; prepareForMutations() is analyzed below
        boolean hasAtLeastOneLock = etx.prepareForMutations();
        if (hasAtLeastOneLock) {
            // Force all mutations on this transaction to use strong consistency
            log.debug("Transaction {} holds one or more locks: writing using consistent transaction {} due to held locks", etx, etx.getConsistentTx());
            manager.mutateMany(mutations, etx.getConsistentTx());
        } else {
          // if you bulk-load data you will see this log line printed over and over; that is the batching at work
            log.debug("Transaction {} holds no locks: writing mutations using store transaction {}", etx, etx.getInconsistentTx());
          // this calls into CQLStoreManager's mutateMany method; source analysis below
            manager.mutateMany(mutations, etx.getInconsistentTx());
        }
    }

ExpectedValueCheckingTransaction.prepareForMutations()

 /**
     * If {@code !}{@link #isMutationStarted()}, check all locks and expected
     * values, then mark the transaction as started.
     * <p>
     * If {@link #isMutationStarted()}, this does nothing.
     *
     * @throws org.janusgraph.diskstorage.BackendException
     *
     * @return true if this transaction holds at least one lock, false if the
     *         transaction holds no locks
     */
    boolean prepareForMutations() throws BackendException {
        if (!isMutationStarted()) {
            checkAllLocks();
            checkAllExpectedValues();
            mutationStarted();
        }
      //expectedValuesByStore being empty means no locks are held
        return !expectedValuesByStore.isEmpty();
    }

   /**
     * Check all locks attempted by earlier
     * {@link KeyColumnValueStore#acquireLock(StaticBuffer, StaticBuffer, StaticBuffer, StoreTransaction)}
     * calls using this transaction.
     *
     * @throws org.janusgraph.diskstorage.BackendException
     */
    void checkAllLocks() throws BackendException {
      // getConsistentTx() returns the strongConsistentTx field of ExpectedValueCheckingTransaction, implemented by CQLTransaction
        StoreTransaction lt = getConsistentTx();
      // expectedValuesByStore is still empty at this point
        for (ExpectedValueCheckingStore store : expectedValuesByStore.keySet()) {
            Locker locker = store.getLocker();
            // Ignore locks on stores without a locker
            if (null == locker)
                continue;
            locker.checkLocks(lt);
        }
    }

CQLStoreManager.mutateMany()

Here we finally see CQL. The txh passed in is a CQLTransaction, and the code is written in a functional style throughout.

By the end of this method the data has been inserted into Cassandra.

    public void mutateMany(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
        if (this.atomicBatch) {
            this.mutateManyLogged(mutations, txh);
        } else {
            this.mutateManyUnlogged(mutations, txh);
        }

    }
//look straight at the unlogged variant; at a glance it is full of Futures and asynchronous processing
  private void mutateManyUnlogged(Map<String, Map<StaticBuffer, KCVMutation>> mutations, StoreTransaction txh) throws BackendException {
    //the transaction commit time
        MaskedTimestamp commitTime = new MaskedTimestamp(txh);
    // build unlogged BatchStatements and execute them asynchronously through CQLKeyColumnValueStore to insert the relations
        Future<Seq<ResultSet>> result = Future.sequence(this.executorService, Iterator.ofAll(mutations.entrySet()).flatMap((tableNameAndMutations) -> {
            String tableName = (String)tableNameAndMutations.getKey();
            Map<StaticBuffer, KCVMutation> tableMutations = (Map)tableNameAndMutations.getValue();
            CQLKeyColumnValueStore columnValueStore = (CQLKeyColumnValueStore)Option.of(this.openStores.get(tableName)).getOrElseThrow(() -> {
                return new IllegalStateException("Store cannot be found: " + tableName);
            });
            return Iterator.ofAll(tableMutations.entrySet()).flatMap((keyAndMutations) -> {
                StaticBuffer key = (StaticBuffer)keyAndMutations.getKey();
                KCVMutation keyMutations = (KCVMutation)keyAndMutations.getValue();
                Iterator<Statement> deletions = Iterator.of(commitTime.getDeletionTime(this.times)).flatMap((deleteTime) -> {
                    return Iterator.ofAll(keyMutations.getDeletions()).map((deletion) -> {
                        return columnValueStore.deleteColumn(key, deletion, deleteTime);
                    });
                });
                Iterator<Statement> additions = Iterator.of(commitTime.getAdditionTime(this.times)).flatMap((addTime) -> {
                    return Iterator.ofAll(keyMutations.getAdditions()).map((addition) -> {
                        return columnValueStore.insertColumn(key, addition, addTime);
                    });
                });
                return Iterator.concat(new Iterable[]{deletions, additions}).grouped(this.batchSize).map((group) -> {
                    return Future.fromJavaFuture(this.executorService, this.session.executeAsync((new BatchStatement(Type.UNLOGGED)).addAll(group).setConsistencyLevel(CQLTransaction.getTransaction(txh).getWriteConsistencyLevel())));
                });
            });
        }));
        result.await();
        if (result.isFailure()) {
            throw (BackendException)CQLKeyColumnValueStore.EXCEPTION_MAPPER.apply(result.getCause().get());
        } else {
            this.sleepAfterWrite(txh, commitTime);
        }
    }
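
On the cassandra side, each addition produced by insertColumn boils down to a statement of roughly this shape (a sketch: the keyspace name is an assumption, while the edgestore table with its key/column1/value columns is the layout the CQL adapter uses for the edge store):

// the statement shape bound per added Entry inside an unlogged BatchStatement:
String cql = "INSERT INTO janusgraph.edgestore (key, column1, value) "
        + "VALUES (?, ?, ?) USING TIMESTAMP ?";
// key     = the 8-byte vertex key from IDManager.getKey (-88 0 0 0 0 0 0 -128)
// column1 = the Entry bytes before valuePosition (the relation type prefix byte)
// value   = the Entry bytes from valuePosition onward (property value, relation id)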