Lucene Term and Query Deletes: A Source Code Walkthrough (Part 1)

2018-12-22  persisting_

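(The full article exceeds the platform's maximum length. Part 1 covers sections 1-4, the preparation that happens before a delete is applied. Sections 5-6, which cover the processing that follows, are in "Lucene Term and Query Deletes: A Source Code Walkthrough (Part 2)".)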

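1 Overview

This article is based on the Lucene 7.5 source code and on my reading of the article "Lucene解析 - IndexWriter".

Deletes in Lucene can be classified along two axes, both of which become apparent from reading the source. By mechanism, a delete is either a Term delete or a Query delete; this split also corresponds to when the delete actually takes effect. By where it originates, a delete is either global or local. The rest of this section explains both classifications.

The classification rests on the following.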

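IndexWriter declares two deleteDocuments overloads: public long deleteDocuments(Query... queries) and public long deleteDocuments(Term... terms). These two overloads are the basis for the split into Term deletes and Query deletes.

deleteDocuments(Term... terms) is not the only source of Term deletes. An update in Lucene is implemented as delete-then-add, and the delete of the old document is also expressed as a Term. Readers who know how Lucene's "thread pool" and DocumentsWriterPerThread (DWPT below) work will recognize that deleteDocuments is a global delete, while the IndexWriter.updateDocument overloads, which must obtain a DWPT, are local deletes.

The source also shows that addDocument is implemented on top of updateDocument.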

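As noted above, Term and Query deletes also differ in when they take effect. Following this requires some familiarity with Lucene's "thread pool" and DWPTs (a topic for a later article). On flush or commit (commit triggers a flush, so only flush is mentioned from here on), IndexWriter calls DocumentsWriter.flushAllThreads(), which flushes every DWPT. Each DWPT is flushed into its own segment, that is, its in-memory data is written to its own .si, .cfs and .cfe files (assuming UseCompoundFile = true). A Term delete must delete every doc ID that the Term matches, and each DWPT can resolve its buffered delete Terms against its own docs while it flushes. A Term delete therefore takes effect inside each DWPT's flush during a global flush.

A Query delete, by contrast, can only run its query once all documents have been flushed, that is, after every DWPT has finished flushing. A Query delete therefore takes effect after all DWPTs have been flushed.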

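The global/local split depends on understanding DocumentsWriterDeleteQueue. DocumentsWriter maintains one global DocumentsWriterDeleteQueue (globalQueue below) together with a global DeleteSlice called globalSlice. Each DWPT also holds its own DeleteSlice (deleteSlice below), which tracks a slice of globalQueue. IndexWriter.deleteDocuments puts the delete directly into globalQueue and then updates globalSlice. IndexWriter.updateDocument runs inside a DWPT, so after its delete is put into globalQueue the DWPT updates its own deleteSlice.

Although globalQueue (or globalSlice) and deleteSlice are distinguished here, DocumentsWriter maintains exactly one globalQueue. DocumentsWriterDeleteQueue is a singly linked list. globalSlice tracks global deletes by using two pointers to cut a slice out of globalQueue that records the deletes it has not yet processed. Likewise, each deleteSlice uses two "pointers" into globalQueue to record the run of deletes that its DWPT cares about; when the queued deletes are applied, the DWPT records those deletes in its own BufferedUpdates. Note that the reference article mentioned at the start describes deletes as a global operation: global and local deletes are synchronized in both directions, and the implementation is described in detail below.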

2 Core Classes Involved in Deletes

2.1 BufferedUpdates

The Javadoc of BufferedUpdates describes its purpose:

Holds buffered deletes and updates, by docID, term or query for a single segment. This is used to hold buffered pending deletes and updates against the to-be-flushed segment. Once the deletes and updates are pushed (on flush in DocumentsWriter), they are converted to a {@link FrozenBufferedUpdates} instance and pushed to the {@link BufferedUpdatesStream}.

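BufferedUpdates is the helper class a DWPT uses to record delete and update operations. Besides the instance each DWPT holds (pendingUpdates), DocumentsWriterDeleteQueue also holds one global instance (globalBufferedUpdates). The two-way synchronization between global and local deletes mentioned above hinges on the DWPT's pendingUpdates and the global globalBufferedUpdates. The synchronization itself is explained after the remaining classes have been introduced.

BufferedUpdates maintains three kinds of deletes, as its fields show:

//Buffered Term deletes
final Map<Term,Integer> deleteTerms = new HashMap<>();
//Buffered Query deletes
final Map<Query,Integer> deleteQueries = new HashMap<>();
//docIDs to delete. These are docs that had already been buffered when an add
//or update threw an exception; they are dropped at flush time so that the
//failed docs do not survive in the segment
final List<Integer> deleteDocIDs = new ArrayList<>();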
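The Integer value in deleteTerms is easy to overlook. The following is a minimal sketch of what it is for, assuming the value is the docIDUpto bound and that a buffered Term delete only affects docs whose ID is below that bound. The class and method names are hypothetical, and plain strings stand in for Term; this is not the BufferedUpdates implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the deleteTerms map of BufferedUpdates.
class BufferedTermDeletesSketch {
    // term -> docIDUpto: the delete applies to docIDs strictly below this bound
    private final Map<String, Integer> deleteTerms = new HashMap<>();

    // Record a delete; an existing larger bound is kept (assumed behavior).
    void addTerm(String term, int docIDUpto) {
        Integer current = deleteTerms.get(term);
        if (current != null && docIDUpto < current) {
            return;
        }
        deleteTerms.put(term, docIDUpto);
    }

    // True if a doc carrying this term at this docID is covered by a buffered delete.
    boolean deletes(String term, int docID) {
        Integer upto = deleteTerms.get(term);
        return upto != null && docID < upto;
    }

    public static void main(String[] args) {
        BufferedTermDeletesSketch buffered = new BufferedTermDeletesSketch();
        buffered.addTerm("id:7", 3);   // delete issued when 3 docs were buffered
        System.out.println(buffered.deletes("id:7", 2)); // true: doc 2 came before the delete
        System.out.println(buffered.deletes("id:7", 3)); // false: doc 3 came after it
    }
}
```

Recording a bound instead of a plain set of terms is what lets a document that is re-added after a delete survive the flush.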

2.2 DocumentsWriterDeleteQueue

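DocumentsWriterDeleteQueue stores the global delete operations, and DocumentsWriter holds exactly one instance. Internally it is a singly linked list, and DocumentsWriterDeleteQueue holds only the list's tail node. Why only the tail? Because the deletes before the tail have already been added to a BufferedUpdates, or are still reachable through a slice that has not applied them yet, so the queue itself has no further use for them.

"Processed" here does not mean the delete has actually been executed; it means the delete the node represents has been placed into the global globalBufferedUpdates or into a DWPT's pendingUpdates.

The node types of DocumentsWriterDeleteQueue all derive from Node, a static nested class of DocumentsWriterDeleteQueue. The main subclasses are shown in the figure below:

[Figure: main subclasses of Node]

For simplicity, we look only at TermNode: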

//DocumentsWriterDeleteQueue.TermNode

private static final class TermNode extends Node<Term> {
    TermNode(Term term) {
      super(term);
    }
    //Pay particular attention to apply: as the body shows, its main job is to
    //put the delete operation into a BufferedUpdates
    @Override
    void apply(BufferedUpdates bufferedDeletes, int docIDUpto) {
      bufferedDeletes.addTerm(item, docIDUpto);
    }

    @Override
    public String toString() {
      return "del=" + item;
    }

}

Lucene triggers Node.apply at the appropriate moments: a global delete calls DocumentsWriterDeleteQueue.tryApplyGlobalSlice for the global side, and an update calls DeleteSlice.apply for the local side.

2.3 DeleteSlice

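DeleteSlice is also a static nested class of DocumentsWriterDeleteQueue. It has two fields (member variables), sliceHead and sliceTail, which cut out of globalQueue the deletes that its owner needs to record since the last flush.

DeleteSlice is implemented as follows: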

static class DeleteSlice {
    // No need to be volatile, slices are thread captive (only accessed by one thread)!
    Node<?> sliceHead; // we don't apply this one
    Node<?> sliceTail;

    DeleteSlice(Node<?> currentTail) {
      assert currentTail != null;
      /*
       * Initially this is a 0 length slice pointing to the 'current' tail of
       * the queue. Once we update the slice we only need to assign the tail and
       * have a new slice
       */
      sliceHead = sliceTail = currentTail;
    }
    //One of the places that call Node.apply, as mentioned in 2.2
    void apply(BufferedUpdates del, int docIDUpto) {
      if (sliceHead == sliceTail) {
        // 0 length slice
        return;
      }
      /*
       * When we apply a slice we take the head and get its next as our first
       * item to apply and continue until we applied the tail. If the head and
       * tail in this slice are not equal then there will be at least one more
       * non-null node in the slice!
       */
      Node<?> current = sliceHead;
      do {
        current = current.next;
        assert current != null : "slice property violated between the head on the tail must not be a null node";
        current.apply(del, docIDUpto);
      } while (current != sliceTail);
      reset();
    }

    void reset() {
      // Reset to a 0 length slice
      sliceHead = sliceTail;
    }

    /**
     * Returns <code>true</code> iff the given node is identical to the the slices tail,
     * otherwise <code>false</code>.
     */
    boolean isTail(Node<?> node) {
      return sliceTail == node;
    }

    /**
     * Returns <code>true</code> iff the given item is identical to the item
     * hold by the slices tail, otherwise <code>false</code>.
     */
    boolean isTailItem(Object object) {
      return sliceTail.item == object;
    }

    boolean isEmpty() {
      return sliceHead == sliceTail;
    }
}
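The slice semantics are worth isolating: sliceHead is exclusive, sliceTail is inclusive, and applying a slice collapses it to zero length. The sketch below reproduces just that behavior on a hand-built linked list. The Node and Slice classes and the drain method are simplified stand-ins written for illustration, with strings in place of Term, and are not the Lucene classes.

```java
import java.util.ArrayList;
import java.util.List;

class DeleteSliceSketch {
    static final class Node {
        final String item; // null for the sentinel
        Node next;
        Node(String item) { this.item = item; }
    }

    static final class Slice {
        Node head; // exclusive: this node is never applied
        Node tail; // inclusive: the last node to apply

        Slice(Node currentTail) { head = tail = currentTail; }

        // Collects the items in (head, tail] and resets to a zero-length slice.
        List<String> drain() {
            List<String> applied = new ArrayList<>();
            Node current = head;
            while (current != tail) {
                current = current.next;
                applied.add(current.item);
            }
            head = tail;
            return applied;
        }
    }

    public static void main(String[] args) {
        Node sentinel = new Node(null);
        Slice slice = new Slice(sentinel);   // zero-length slice at the current tail
        Node first = new Node("term1");
        sentinel.next = first;
        Node second = new Node("term2");
        first.next = second;
        slice.tail = second;                 // the slice now spans two nodes
        System.out.println(slice.drain());   // [term1, term2]
        System.out.println(slice.drain());   // []
    }
}
```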

2.4 DocumentsWriterFlushQueue

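DocumentsWriterFlushQueue (ticketQueue below) records each DWPT's flush result during a flush. What it records is mainly the FlushedSegment produced by the flush plus the global deletes. FlushedSegment is the final in-memory view of the segment after the flush: its updates have already been resolved and so have its Term deletes, but its Query deletes are still pending. A FlushedSegment therefore still carries Query deletes but no Term deletes.

After all DWPTs have finished flushing, the entries held in ticketQueue are purged into the global eventQueue, and processEvents is then called to process them so that the in-memory SegmentInfo stays up to date. This is also key to how NRT (near real-time) search is implemented.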

2.5 FrozenBufferedUpdates

FrozenBufferedUpdates is a snapshot that each DWPT takes of its own pendingUpdates when it flushes.

2.6 IndexWriter.eventQueue

IndexWriter.eventQueue is defined in IndexWriter as follows:

private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();

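It is simply an event queue backed by the JDK's ConcurrentLinkedQueue. Certain necessary operations that arise during a flush are wrapped as Event objects and put into eventQueue; these events are processed after the flush completes.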
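The enqueue-now, process-later pattern can be sketched with the same JDK queue. Everything below is illustrative: the Event interface is a hypothetical one-method stand-in (the real Event has a different signature), and the event labels are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class EventQueueSketch {
    // Hypothetical event type; it just appends a line to a log.
    interface Event {
        void process(List<String> log);
    }

    private final Queue<Event> eventQueue = new ConcurrentLinkedQueue<>();

    // Called while work is in progress: remember what must be done afterwards.
    void enqueue(Event event) {
        eventQueue.add(event);
    }

    // Called once the work is done: drain the queue in FIFO order.
    int processEvents(List<String> log) {
        int processed = 0;
        Event event;
        while ((event = eventQueue.poll()) != null) {
            event.process(log);
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        EventQueueSketch writer = new EventQueueSketch();
        List<String> log = new ArrayList<>();
        writer.enqueue(l -> l.add("publish flushed segment")); // invented label
        writer.enqueue(l -> l.add("apply deletes"));           // invented label
        System.out.println(writer.processEvents(log)); // 2
        System.out.println(log); // [publish flushed segment, apply deletes]
    }
}
```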

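3 Delete Entry Points

This section covers the entry points for deletes. Some of the calls involved belong to the two-way synchronization between local and global deletes; they are mentioned briefly where they appear, and the synchronization logic itself is covered in the next section.

3.1 Global deletes: IndexWriter.deleteDocuments

The IndexWriter.deleteDocuments overloads perform global deletes.

3.1.1 deleteDocuments(Term... terms)

Here is the implementation: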

//IndexWriter
 public long deleteDocuments(Term... terms) throws IOException {
    ensureOpen();
    try {
    //Delegate to DocumentsWriter.deleteTerms
      long seqNo = docWriter.deleteTerms(terms);
      if (seqNo < 0) {
        seqNo = -seqNo;
        processEvents(true);
      }
      return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteDocuments(Term..)");
      throw tragedy;
    }
}

//DocumentsWriter
long deleteTerms(final Term... terms) throws IOException {
    return applyDeleteOrUpdate(q -> q.addDelete(terms));
}

//DocumentsWriter
private synchronized long applyDeleteOrUpdate(ToLongFunction<DocumentsWriterDeleteQueue> function) throws IOException {
    // TODO why is this synchronized?
    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
    //Put this Term delete into globalQueue
    long seqNo = function.applyAsLong(deleteQueue);
    flushControl.doOnDelete();
    lastSeqNo = Math.max(lastSeqNo, seqNo);
    if (applyAllDeletes(deleteQueue)) {
      seqNo = -seqNo;
    }
    return seqNo;
}

//DocumentsWriterDeleteQueue
long addDelete(Term... terms) {
    long seqNo = add(new TermArrayNode(terms));
    //Move the deletes cut out by globalSlice into globalBufferedUpdates
    tryApplyGlobalSlice();
    return seqNo;
}
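One detail in the listings above is the sign of seqNo: applyDeleteOrUpdate negates it when applyAllDeletes returns true, and deleteDocuments flips it back and calls processEvents. The sign acts as a one-bit flag carried alongside the sequence number. A minimal sketch of that convention follows; the encode and decode helpers are hypothetical names, and the sketch assumes sequence numbers are always positive.

```java
class SignedSeqNoSketch {
    // Inner layer: flip the sign when the caller should process events.
    // Assumes seqNo > 0, otherwise the flag could not be recovered.
    static long encode(long seqNo, boolean eventsPending) {
        return eventsPending ? -seqNo : seqNo;
    }

    // Outer layer: restore the sign and run the event hook if the flag was set.
    static long decode(long returned, Runnable processEvents) {
        if (returned < 0) {
            processEvents.run();
            return -returned;
        }
        return returned;
    }

    public static void main(String[] args) {
        int[] runs = new int[1];
        System.out.println(decode(encode(42, false), () -> runs[0]++)); // 42
        System.out.println(decode(encode(42, true), () -> runs[0]++));  // 42
        System.out.println(runs[0]); // 1: the hook ran only for the flagged value
    }
}
```

The caller always sees the same positive sequence number; only the side effect differs.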

3.1.2 deleteDocuments(Query... queries)

deleteDocuments(Query... queries) follows the same code path as deleteDocuments(Term... terms) and likewise ends by putting the delete into globalQueue.

The difference is the Node subclass that goes into globalQueue: QueryArrayNode for deleteDocuments(Query... queries), TermArrayNode for deleteDocuments(Term... terms).

One more difference: a delete by MatchAllDocsQuery deletes all documents:

//IndexWriter
public long deleteDocuments(Query... queries) throws IOException {
    ensureOpen();

    // LUCENE-6379: Specialize MatchAllDocsQuery
    for(Query query : queries) {
      if (query.getClass() == MatchAllDocsQuery.class) {
        //Delete all documents
        return deleteAll();
      }
    }

    try {
      long seqNo = docWriter.deleteQueries(queries);
      if (seqNo < 0) {
        seqNo = -seqNo;
        processEvents(true);
      }

      return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "deleteDocuments(Query..)");
      throw tragedy;
    }
}

deleteAll is not expanded here; see the source for details.

3.2 Local deletes: IndexWriter.updateDocument

IndexWriter.addDocument also adds a document by calling IndexWriter.updateDocument:

//IndexWriter
public long addDocument(Iterable<? extends IndexableField> doc) throws IOException {
    //A null first argument means a plain add
    return updateDocument((DocumentsWriterDeleteQueue.Node<?>) null, doc);
}

Again, straight to the source:

//IndexWriter
 public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
    //A null term means there is no old document to delete
    return updateDocument(term == null ? null : DocumentsWriterDeleteQueue.newNode(term), doc);
}

//DocumentsWriterDeleteQueue
static Node<Term> newNode(Term term) {
    return new TermNode(term);
}

//IndexWriter
private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
                              Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen();
    boolean success = false;
    try {
        //Delegate to DocumentsWriter.updateDocument
      long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
      if (seqNo < 0) {
        seqNo = -seqNo;
        processEvents(true);
      }
      success = true;
      return seqNo;
    } catch (VirtualMachineError tragedy) {
      tragicEvent(tragedy, "updateDocument");
      throw tragedy;
    } finally {
      if (success == false) {
        if (infoStream.isEnabled("IW")) {
          infoStream.message("IW", "hit exception updating document");
        }
      }
      maybeCloseOnTragicEvent();
    }
}

Next, what DocumentsWriter.updateDocument actually does:

//DocumentsWriter
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
                      final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {

    boolean hasEvents = preUpdate();
    //Obtain a ThreadState, and with it a DWPT
    final ThreadState perThread = flushControl.obtainAndLock();

    final DocumentsWriterPerThread flushingDWPT;
    long seqNo;
    try {
      // This must happen after we've pulled the ThreadState because IW.close
      // waits for all ThreadStates to be released:
      ensureOpen();
      //Initialize the DWPT
      ensureInitialized(perThread);
      assert perThread.isInitialized();
      final DocumentsWriterPerThread dwpt = perThread.dwpt;
      final int dwptNumDocs = dwpt.getNumDocsInRAM();
      try {
        //The actual document update logic lives in DWPT.updateDocument
        seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
      } finally {
        if (dwpt.isAborted()) {
          flushControl.doOnAbort(perThread);
        }
        // We don't know whether the document actually
        // counted as being indexed, so we must subtract here to
        // accumulate our separate counter:
        numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
      }
      final boolean isUpdate = delNode != null && delNode.isDelete();
      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);

      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
      perThread.lastSeqNo = seqNo;

    } finally {
      perThreadPool.release(perThread);
    }

    if (postUpdate(flushingDWPT, hasEvents)) {
      seqNo = -seqNo;
    }
    
    return seqNo;
}

ensureInitialized initializes the DWPT assigned to this call: if the DWPT is null, it creates a new instance.

//DocumentsWriter
private void ensureInitialized(ThreadState state) throws IOException {
    if (state.dwpt == null) {
      final FieldInfos.Builder infos = new FieldInfos.Builder(globalFieldNumberMap);
      //The DWPT constructor receives the globalQueue object
      state.dwpt = new DocumentsWriterPerThread(indexCreatedVersionMajor, segmentNameSupplier.get(), directoryOrig,
                                                directory, config, infoStream, deleteQueue, infos,
                                                pendingNumDocs, enableTestPoints);
    }
  }

The DWPT constructor builds the pendingUpdates and deleteSlice that this DWPT maintains:

 public DocumentsWriterPerThread(int indexVersionCreated, String segmentName, Directory directoryOrig, Directory directory, LiveIndexWriterConfig indexWriterConfig, InfoStream infoStream, DocumentsWriterDeleteQueue deleteQueue,
                                  FieldInfos.Builder fieldInfos, AtomicLong pendingNumDocs, boolean enableTestPoints) throws IOException {
    ...
    //Initialize the pendingUpdates maintained by this DWPT
    pendingUpdates = new BufferedUpdates(segmentName);
    intBlockAllocator = new IntBlockAllocator(bytesUsed);
    this.deleteQueue = deleteQueue;
    assert numDocsInRAM == 0 : "num docs " + numDocsInRAM;
    //Create the deleteSlice
    deleteSlice = deleteQueue.newSlice();
    //Segment info for the segment this DWPT maintains
    segmentInfo = new SegmentInfo(directoryOrig, Version.LATEST, Version.LATEST, segmentName, -1, false, codec, Collections.emptyMap(), StringHelper.randomId(), new HashMap<>(), indexWriterConfig.getIndexSort());
    ...
    // this should be the last call in the ctor 
    // it really sucks that we need to pull this within the ctor and pass this ref to the chain!
    //consumer performs the actual indexing work
    consumer = indexWriterConfig.getIndexingChain().getChain(this);
    ...
}

//DocumentsWriterDeleteQueue
DeleteSlice newSlice() {
    //Return a new DeleteSlice instance
    return new DeleteSlice(tail);
}

The DeleteSlice class was listed above; here is its constructor again:

DeleteSlice(Node<?> currentTail) {
      assert currentTail != null;
      /*
       * Initially this is a 0 length slice pointing to the 'current' tail of
       * the queue. Once we update the slice we only need to assign the tail and
       * have a new slice
       */
      //Both the head and tail pointers of this DeleteSlice point to the current tail node of globalQueue
      sliceHead = sliceTail = currentTail;
}

The DWPT is now initialized. Next comes the actual implementation of updateDocument, which, as the code above shows, ends up in DWPT.updateDocument:

//DocumentsWriterPerThread
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    try {
      ... //pre-update processing
      docState.doc = doc;
      docState.analyzer = analyzer;
      docState.docID = numDocsInRAM; //Assign the current doc ID from a counter
      ...
      boolean success = false;
      try {
        try {
        //consumer is actually a DefaultIndexingChain, which does the real work; not expanded here
          consumer.processDocument();
        } finally {
          docState.clear();
        }
        success = true;
      } finally {
        if (!success) {
          //If the update failed, mark this document as deleted
          // mark document as deleted
          deleteDocID(docState.docID);
          numDocsInRAM++;
        }
      }
      //The deleteSlice handling happens here
      return finishDocument(deleteNode);
    } finally {
      maybeAbort("updateDocument", flushNotifications);
    }
  }

void deleteDocID(int docIDUpto) {
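    //Called when an update fails, to delete the doc that was being updated. The
    //docID goes straight into this DWPT's own pendingUpdates, and deletes placed
    //there directly are not synchronized globally. This is reasonable: a failure
    //inside this thread only needs to be recorded here so that the doc is dropped
    //at flush time; other threads do not need to know about it.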
    pendingUpdates.addDocID(docIDUpto);
    // NOTE: we do not trigger flush here.  This is
    // potentially a RAM leak, if you have an app that tries
    // to add docs but every single doc always hits a
    // non-aborting exception.  Allowing a flush here gets
    // very messy because we are only invoked when handling
    // exceptions so to do this properly, while handling an
    // exception we'd have to go off and flush new deletes
    // which is risky (likely would hit some other
    // confounding exception).
  }

DWPT.finishDocument handles the DWPT's internal deleteSlice:

//DocumentsWriterPerThread
private long finishDocument(DocumentsWriterDeleteQueue.Node<?> deleteNode) {
    /*
     * here we actually finish the document in two steps 1. push the delete into
     * the queue and update our slice. 2. increment the DWPT private document
     * id.
     * 
     * the updated slice we get from 1. holds all the deletes that have occurred
     * since we updated the slice the last time.
     */
    boolean applySlice = numDocsInRAM != 0;
    long seqNo;
    //deleteNode != null means this is an update
    if (deleteNode != null) {
      seqNo = deleteQueue.add(deleteNode, deleteSlice);
      assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
    } else  {
      //deleteNode == null: the call came from addDocument, so just update deleteSlice
      seqNo = deleteQueue.updateSlice(deleteSlice);
      
      if (seqNo < 0) {
        seqNo = -seqNo;
      } else {
        applySlice = false;
      }
    }
    
    if (applySlice) {
      deleteSlice.apply(pendingUpdates, numDocsInRAM);
    } else { // if we don't need to apply we must reset!
      deleteSlice.reset();
    }
    ++numDocsInRAM;

    return seqNo;
}
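The branch structure of finishDocument reduces to a single boolean condition, which the sketch below spells out. It rests on one assumption not shown in the listing: that deleteQueue.updateSlice returns a negative seqNo exactly when the slice's tail had to be advanced, which is how the seqNo < 0 test is read here. The method and parameter names are hypothetical.

```java
class ApplySliceDecisionSketch {
    /**
     * Mirrors the applySlice logic of finishDocument.
     *
     * numDocsInRAM   docs this DWPT buffered before the current one
     * hasDeleteNode  true for updateDocument(term, doc), false for addDocument
     * sliceAdvanced  true if new deletes arrived in the queue since the last call
     *                (assumed to be what a negative updateSlice result signals)
     */
    static boolean applySlice(int numDocsInRAM, boolean hasDeleteNode, boolean sliceAdvanced) {
        boolean apply = numDocsInRAM != 0;  // no earlier docs, nothing a delete could hit
        if (!hasDeleteNode && !sliceAdvanced) {
            apply = false;                  // plain add and no new deletes to pick up
        }
        return apply;
    }

    public static void main(String[] args) {
        System.out.println(applySlice(0, true, true));   // false: first doc in this DWPT
        System.out.println(applySlice(3, true, false));  // true: own delete must be buffered
        System.out.println(applySlice(3, false, true));  // true: other deletes arrived
        System.out.println(applySlice(3, false, false)); // false: nothing new
    }
}
```

In every case where the result is false, finishDocument resets the slice instead, so the skipped deletes are not applied later either.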

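4 Synchronization Between Global and Local Deletes

Section 3 covered the delete entry points and showed the calls involving deleteSlice, globalSlice, globalQueue and pendingUpdates without explaining them. This section explains them in detail.

Before reading on, make sure you understand the descriptions of deleteSlice, pendingUpdates, globalQueue, globalSlice and globalBufferedUpdates in sections 1 and 2.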

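One premise first. A Term delete is a concrete delete: when a DWPT flushes its segment, it only has to look up each buffered delete Term in the segment's own postings and mark the matching docs as deleted. The docs are not skipped at write time, since their data has already been buffered by the indexing chain; instead they are cleared in the segment's live-docs bitset (in 7.5 this happens in FreqProxTermsWriter.applyDeletes), and the bitset is written out as a .liv file only if at least one doc was cleared. A flush that resolves no deletes therefore produces no .liv file, because every doc in the segment is live.

The same bitset covers the exceptional case: if a DWPT hit an exception while adding or updating a document, the failed doc's ID is recorded in pendingUpdates.deleteDocIDs and is cleared from the live docs at flush, so a .liv file is generated.

Query deletes are the other source of .liv files. As noted above, a Query delete has to wait until all DWPTs have been flushed; a reader is then opened to run the query, the live docs are updated, and a .liv file is generated to mark which docs were deleted. Query deletes only ever occur as global deletes, through deleteDocuments.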

First, the initialization of globalSlice, in the DocumentsWriterDeleteQueue constructor:

//DocumentsWriterDeleteQueue
DocumentsWriterDeleteQueue(InfoStream infoStream, BufferedUpdates globalBufferedUpdates, long generation, long startSeqNo) {
    ...
    //Initialize the tail with a sentinel node
    tail = new Node<>(null); // sentinel
    //globalSlice initially points at the tail of globalQueue
    globalSlice = new DeleteSlice(tail);
}

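Initially, globalQueue, globalSlice and globalBufferedUpdates look like this:

[Figure 1: initial state of the two-way synchronization]

Suppose a global delete now happens, for example a call to deleteDocuments(Term... terms). The source was listed above and is repeated briefly here. Deletes also have an optimization: if too many deletes are buffered, some of them are resolved and written to disk first to release memory. It is not covered further here.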

//IndexWriter
public long deleteDocuments(Term... terms) throws IOException {
    ensureOpen();
    try {
      long seqNo = docWriter.deleteTerms(terms);
      ...
}

//DocumentsWriter
 long deleteTerms(final Term... terms) throws IOException {
    return applyDeleteOrUpdate(q -> q.addDelete(terms));
}

private synchronized long applyDeleteOrUpdate(ToLongFunction<DocumentsWriterDeleteQueue> function) throws IOException {
    // TODO why is this synchronized?
    final DocumentsWriterDeleteQueue deleteQueue = this.deleteQueue;
    //This calls q.addDelete(terms)
    long seqNo = function.applyAsLong(deleteQueue);
    flushControl.doOnDelete();
    lastSeqNo = Math.max(lastSeqNo, seqNo);
    //If too many deletes are buffered, some are resolved and written to disk
    //first to release memory; not covered further here
    if (applyAllDeletes(deleteQueue)) {
      seqNo = -seqNo;
    }
    return seqNo;
}

//DocumentsWriterDeleteQueue
long addDelete(Term... terms) {
    long seqNo = add(new TermArrayNode(terms));
    //Update globalSlice and move the deletes it cuts out into
    //globalBufferedUpdates
    tryApplyGlobalSlice();
    return seqNo;
  }
synchronized long add(Node<?> newNode) {
    tail.next = newNode;
    this.tail = newNode;
    return getNextSequenceNumber();
  }

//Move the Nodes cut out by globalSlice into globalBufferedUpdates
void tryApplyGlobalSlice() {
    if (globalBufferLock.tryLock()) {
      /*
       * The global buffer must be locked but we don't need to update them if
       * there is an update going on right now. It is sufficient to apply the
       * deletes that have been added after the current in-flight global slices
       * tail the next time we can get the lock!
       */
      try {
        //Advance globalSlice's sliceTail to the new tail node of globalQueue
        if (updateSliceNoSeqNo(globalSlice)) {
          //Move the deletes cut out by globalSlice into globalBufferedUpdates
          globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT);
        }
      } finally {
        globalBufferLock.unlock();
      }
    }
  }

   /** Just like updateSlice, but does not assign a sequence number */
  boolean updateSliceNoSeqNo(DeleteSlice slice) {
    if (slice.sliceTail != tail) {
      // new deletes arrived since we last checked
      //Point sliceTail at the new tail node
      slice.sliceTail = tail;
      return true;
    }
    return false;
  }
//DocumentsWriterDeleteQueue.DeleteSlice
void apply(BufferedUpdates del, int docIDUpto) {
    if (sliceHead == sliceTail) {
    // 0 length slice
    return;
    }
    /*
    * When we apply a slice we take the head and get its next as our first
    * item to apply and continue until we applied the tail. If the head and
    * tail in this slice are not equal then there will be at least one more
    * non-null node in the slice!
    */
    //Put every Node after sliceHead, up to and including sliceTail, into the given BufferedUpdates
    Node<?> current = sliceHead;
    do {
        current = current.next;
        assert current != null : "slice property violated between the head on the tail must not be a null node";
        //See the source for each Node's apply; it simply puts the Node's Term or Query into the BufferedUpdates
        current.apply(del, docIDUpto);
    } while (current != sliceTail);
    //Reset the slice so that head and tail point at the same node
    reset();
}

void reset() {
    // Reset to a 0 length slice
    sliceHead = sliceTail;
}

State after DocumentsWriterDeleteQueue.add returns; globalSlice has not been updated yet:

[Figure 2: state after DocumentsWriterDeleteQueue.add]

After DocumentsWriterDeleteQueue.add has appended a node to globalQueue, addDelete calls tryApplyGlobalSlice(). That method calls updateSliceNoSeqNo(globalSlice), which points globalSlice's tail pointer at the newest tail node of globalQueue:

[Figure 3: state after updateSliceNoSeqNo]

After updateSliceNoSeqNo(globalSlice), tryApplyGlobalSlice() calls globalSlice.apply(globalBufferedUpdates, BufferedUpdates.MAX_INT), which adds every node between globalSlice's head and tail (head exclusive, tail inclusive) to globalBufferedUpdates. Finally, reset points globalSlice's head at its tail, so both ends of globalSlice now point at the newest tail node of globalQueue:

[Figure 4: state after tryApplyGlobalSlice]
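The "try" in tryApplyGlobalSlice matters: as the comment in its body says, a caller that cannot get globalBufferLock does not wait, because the deletes it added will be picked up by whichever call next obtains the lock. The sketch below shows that non-blocking pattern in isolation. It is a simplification under stated assumptions: a ConcurrentLinkedQueue of strings replaces the linked list and slice, and tryApplyFromOtherThread is a hypothetical helper that exists only to make the contended case reproducible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;

class TryApplyGlobalSliceSketch {
    final ReentrantLock globalBufferLock = new ReentrantLock();
    private final Queue<String> unapplied = new ConcurrentLinkedQueue<>();
    final List<String> globalBuffer = new ArrayList<>(); // guarded by globalBufferLock

    void add(String delete) {
        unapplied.add(delete);
    }

    // Returns false without blocking if another thread holds the lock.
    boolean tryApply() {
        if (!globalBufferLock.tryLock()) {
            return false; // the queued deletes are left for the next successful call
        }
        try {
            String delete;
            while ((delete = unapplied.poll()) != null) {
                globalBuffer.add(delete);
            }
            return true;
        } finally {
            globalBufferLock.unlock();
        }
    }

    // Calls tryApply from a helper thread, so a lock held by the caller counts as contended.
    boolean tryApplyFromOtherThread() {
        boolean[] result = new boolean[1];
        Thread helper = new Thread(() -> result[0] = tryApply());
        helper.start();
        try {
            helper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        TryApplyGlobalSliceSketch sketch = new TryApplyGlobalSliceSketch();
        sketch.add("term1");
        sketch.globalBufferLock.lock();
        try {
            System.out.println(sketch.tryApplyFromOtherThread()); // false: lock is held
        } finally {
            sketch.globalBufferLock.unlock();
        }
        sketch.add("term2");
        System.out.println(sketch.tryApply());   // true
        System.out.println(sketch.globalBuffer); // [term1, term2]
    }
}
```

The skipped delete term1 is not lost; it is applied together with term2 by the next call that gets the lock.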

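If IndexWriter.deleteDocuments(Query... queries) is now called and goes through the same steps, the state becomes:

[Figure 5: state after a second deleteDocuments call]

Suppose an update now arrives, for example IndexWriter.updateDocument(Term term, Iterable<? extends IndexableField> doc). A DWPT is started for it, called DWPT1 here, and DWPT1 maintains its own deleteSlice and pendingUpdates: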

//IndexWriter
public long updateDocument(Term term, Iterable<? extends IndexableField> doc) throws IOException {
    return updateDocument(term == null ? null : DocumentsWriterDeleteQueue.newNode(term), doc);
}

private long updateDocument(final DocumentsWriterDeleteQueue.Node<?> delNode,
                            Iterable<? extends IndexableField> doc) throws IOException {
    ensureOpen();
    boolean success = false;
    try {
        //Delegate to DocumentsWriter.updateDocument
        long seqNo = docWriter.updateDocument(doc, analyzer, delNode);
        ...
}

//DocumentsWriter
long updateDocument(final Iterable<? extends IndexableField> doc, final Analyzer analyzer,
                      final DocumentsWriterDeleteQueue.Node<?> delNode) throws IOException {

    boolean hasEvents = preUpdate();

    final ThreadState perThread = flushControl.obtainAndLock();

    final DocumentsWriterPerThread flushingDWPT;
    long seqNo;
    try {
      // This must happen after we've pulled the ThreadState because IW.close
      // waits for all ThreadStates to be released:
      ensureOpen();
      //Initialize the DWPT
      ensureInitialized(perThread);
      assert perThread.isInitialized();
      final DocumentsWriterPerThread dwpt = perThread.dwpt;
      final int dwptNumDocs = dwpt.getNumDocsInRAM();
      try {
        //Delegate to DWPT.updateDocument
        seqNo = dwpt.updateDocument(doc, analyzer, delNode, flushNotifications);
      } finally {
        if (dwpt.isAborted()) {
          flushControl.doOnAbort(perThread);
        }
        // We don't know whether the document actually
        // counted as being indexed, so we must subtract here to
        // accumulate our separate counter:
        numDocsInRAM.addAndGet(dwpt.getNumDocsInRAM() - dwptNumDocs);
      }
      final boolean isUpdate = delNode != null && delNode.isDelete();
      flushingDWPT = flushControl.doAfterDocument(perThread, isUpdate);

      assert seqNo > perThread.lastSeqNo: "seqNo=" + seqNo + " lastSeqNo=" + perThread.lastSeqNo;
      perThread.lastSeqNo = seqNo;

    } finally {
      perThreadPool.release(perThread);
    }

    if (postUpdate(flushingDWPT, hasEvents)) {
      seqNo = -seqNo;
    }
    
    return seqNo;
}

//DocumentsWriterPerThread
public long updateDocument(Iterable<? extends IndexableField> doc, Analyzer analyzer, DocumentsWriterDeleteQueue.Node<?> deleteNode, DocumentsWriter.FlushNotifications flushNotifications) throws IOException {
    try {
      ...
      reserveOneDoc();
      //Record the state of this document
      docState.doc = doc;
      docState.analyzer = analyzer;
      docState.docID = numDocsInRAM;
      if (INFO_VERBOSE && infoStream.isEnabled("DWPT")) {
        infoStream.message("DWPT", Thread.currentThread().getName() + " update delTerm=" + deleteNode + " docID=" + docState.docID + " seg=" + segmentInfo.name);
      }
      ...
      boolean success = false;
      try {
        try {
          consumer.processDocument();
        } finally {
          docState.clear();
        }
        success = true;
      } finally {
        if (!success) {
          // mark document as deleted
          //On failure, put this doc into deleteDocIDs of this DWPT's pendingUpdates
          deleteDocID(docState.docID);
          numDocsInRAM++;
        }
      }
      //Finish the document and run this DWPT's deleteSlice logic
      return finishDocument(deleteNode);
    } finally {
      maybeAbort("updateDocument", flushNotifications);
    }
}

private long finishDocument(DocumentsWriterDeleteQueue.Node<?> deleteNode) {
    /*
     * here we actually finish the document in two steps 1. push the delete into
     * the queue and update our slice. 2. increment the DWPT private document
     * id.
     * 
     * the updated slice we get from 1. holds all the deletes that have occurred
     * since we updated the slice the last time.
     */
    //applySlice starts out true only if this DWPT already buffers at least one doc;
    //numDocsInRAM is not incremented until the end of this method
    boolean applySlice = numDocsInRAM != 0;
    long seqNo;
    //If this call did not come from addDocument, deleteNode is non-null;
    //in that case add deleteNode to the queue and to deleteSlice
    if (deleteNode != null) {
      seqNo = deleteQueue.add(deleteNode, deleteSlice);
      assert deleteSlice.isTail(deleteNode) : "expected the delete node as the tail";
    } else  {
      seqNo = deleteQueue.updateSlice(deleteSlice);
      
      if (seqNo < 0) {
        seqNo = -seqNo;
      } else {
        applySlice = false;
      }
    }
    //If there are deletes to apply, move the Nodes held by deleteSlice into pendingUpdates;
    //otherwise reset deleteSlice's head and tail pointers.
    if (applySlice) {
      deleteSlice.apply(pendingUpdates, numDocsInRAM);
    } else { // if we don't need to apply we must reset!
      deleteSlice.reset();
    }
    ++numDocsInRAM;

    return seqNo;
  }

long add(Node<?> deleteNode, DeleteSlice slice) {
    long seqNo = add(deleteNode);
    ...
    slice.sliceTail = deleteNode;
    assert slice.sliceHead != slice.sliceTail : "slice head and tail must differ after add";
    //The global side picks up this local delete
    tryApplyGlobalSlice(); // TODO doing this each time is not necessary maybe
    // we can do it just every n times or so?

    return seqNo;
  }

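A newly initialized DWPT has an empty pendingUpdates, and both ends of its deleteSlice point at the current tail node of globalQueue:

[Figure 6: state after starting a DWPT]

In finishDocument, DWPT1 calls deleteQueue.add(deleteNode, deleteSlice), which appends the delete node to globalQueue and moves the tail of DWPT1's deleteSlice to the newest tail node of globalQueue:

[Figure 7: DWPT1 adds its delete to globalQueue]

After DocumentsWriterDeleteQueue.add(Node<?> deleteNode, DeleteSlice slice) has updated DWPT1's deleteSlice, it calls tryApplyGlobalSlice. As described above, this first calls updateSliceNoSeqNo(globalSlice) to point globalSlice's tail pointer at the newest tail node of globalQueue:

[Figure 8: global sync updates the tail pointer of globalSlice]

It then calls globalSlice.apply to put DWPT1's delete into globalBufferedUpdates:

[Figure 9: global sync, globalSlice.apply]

The state diagrams above already contain one synchronization from local to global: the local delete term3 has reached globalBufferedUpdates.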

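After that, DWPT1's finishDocument calls deleteSlice.apply(pendingUpdates, numDocsInRAM) to add the Nodes between the ends of its deleteSlice to its own pendingUpdates. (This branch is taken only when DWPT1 already buffers at least one earlier document. For the very first document numDocsInRAM is still 0, so applySlice is false and the slice is simply reset.)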

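[Figure 10: state after DWPT1 calls deleteSlice.apply]

If DWPT2 is now started and performs an update, the new state is:

[Figure 11: DWPT2 starts and updates a document]

This diagram shows another local-to-global synchronization: DWPT2's local delete term4 has been synchronized into globalBufferedUpdates.

Now suppose a global delete happens. The state becomes:


[Figure 12: a global delete happens]

On top of that, suppose DWPT1 performs another local delete. First a node is appended to globalQueue:


[Figure 13: DWPT1's second local delete, after add]

As before, tryApplyGlobalSlice updates globalSlice and globalBufferedUpdates:

[Figure 14: DWPT1's second local delete, after globalSlice is updated]

Once globalSlice and globalBufferedUpdates are updated, DWPT1 applies its own deleteSlice. This adds TermNode4 and TermArrayNode5, which lie between the head and tail pointers of deleteSlice, and TermNode6, which the tail pointer refers to, to DWPT1's pendingUpdates:

[Figure 15: DWPT1 applies its own deleteSlice]

As the figure shows, DWPT1's pendingUpdates has picked up the global delete term5 as well as DWPT2's local delete term4.

That completes the walkthrough of how global and local deletes are synchronized.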
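The whole walkthrough can be replayed with a small single-threaded model. This is a sketch under stated assumptions, not Lucene's code: strings stand in for Term and Query nodes, sequence numbers and locking are omitted, and the Dwpt.update method always applies its slice (it ignores the numDocsInRAM != 0 shortcut of finishDocument) so that the result matches the figures. All class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DeleteQueueSyncSketch {
    static final int MAX_INT = Integer.MAX_VALUE; // stands in for BufferedUpdates.MAX_INT

    static final class Node {
        final String item; // label of a Term or Query delete; null for the sentinel
        Node next;
        Node(String item) { this.item = item; }
    }

    static final class Slice {
        Node head; // already applied (exclusive)
        Node tail; // last node to apply (inclusive)
        Slice(Node currentTail) { head = tail = currentTail; }

        void apply(Map<String, Integer> buffer, int docIDUpto) {
            Node current = head;
            while (current != tail) {
                current = current.next;
                buffer.merge(current.item, docIDUpto, Integer::max);
            }
            head = tail; // reset to a zero-length slice
        }
    }

    static final class DeleteQueue {
        Node tail = new Node(null); // sentinel
        final Slice globalSlice = new Slice(tail);
        final Map<String, Integer> globalBufferedUpdates = new LinkedHashMap<>();

        void add(Node node) { tail.next = node; tail = node; }

        void applyGlobalSlice() {
            if (globalSlice.tail != tail) {
                globalSlice.tail = tail;
                globalSlice.apply(globalBufferedUpdates, MAX_INT);
            }
        }

        // Global delete, as issued through deleteDocuments.
        void delete(String item) { add(new Node(item)); applyGlobalSlice(); }
    }

    static final class Dwpt {
        final DeleteQueue queue;
        final Slice deleteSlice;
        final Map<String, Integer> pendingUpdates = new LinkedHashMap<>();
        int numDocsInRAM = 0;

        Dwpt(DeleteQueue queue) { this.queue = queue; this.deleteSlice = new Slice(queue.tail); }

        // Local delete, as issued through updateDocument(term, doc).
        void update(String deleteTerm) {
            Node node = new Node(deleteTerm);
            queue.add(node);
            deleteSlice.tail = node;
            queue.applyGlobalSlice();                        // local -> global
            deleteSlice.apply(pendingUpdates, numDocsInRAM); // global -> local
            numDocsInRAM++;
        }
    }

    public static void main(String[] args) {
        DeleteQueue queue = new DeleteQueue();
        queue.delete("term1");
        queue.delete("query2");
        Dwpt dwpt1 = new Dwpt(queue);
        dwpt1.update("term3");
        Dwpt dwpt2 = new Dwpt(queue);
        dwpt2.update("term4");
        queue.delete("term5");
        dwpt1.update("term6");
        System.out.println(queue.globalBufferedUpdates.keySet()); // [term1, query2, term3, term4, term5, term6]
        System.out.println(dwpt1.pendingUpdates.keySet());        // [term3, term4, term5, term6]
        System.out.println(dwpt2.pendingUpdates.keySet());        // [term4]
    }
}
```

DWPT2 has not picked up term5 or term6 yet; in this model it would do so on its next update, when its slice tail is moved forward.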

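A reader may wonder: DWPT1's slice pointed straight at the tail of globalQueue when it was initialized, so it never saw the earlier global delete term1. How does DWPT1 account for term1 when it writes its segment?

The term1 delete happened before DWPT1 was initialized, so none of the documents DWPT1 buffers existed at that time. If DWPT1 never handles a document matching term1, it can ignore the term1 delete when it writes to disk, because its buffer holds no matching document. If DWPT1 later adds or updates a doc that does match term1, it simply writes that doc on flush and again ignores the term1 delete: in time order, the document was added after the delete.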
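The time-ordering argument can be made concrete with a sketch of how buffered Term deletes might be resolved against one DWPT's docs at flush. It assumes each buffered delete carries a docIDUpto bound and only hits docs with a smaller docID; for brevity each doc is represented by a single identifying term. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class FlushTimeTermDeleteSketch {
    /**
     * docTerms.get(docID) is the identifying term of that buffered doc.
     * deleteTerms maps a delete term to its docIDUpto bound.
     * Returns a bitset whose set bits are the docs that survive the flush.
     */
    static BitSet liveDocs(List<String> docTerms, Map<String, Integer> deleteTerms) {
        BitSet live = new BitSet(docTerms.size());
        live.set(0, docTerms.size());
        for (Map.Entry<String, Integer> delete : deleteTerms.entrySet()) {
            int limit = Math.min(delete.getValue(), docTerms.size());
            for (int docID = 0; docID < limit; docID++) {
                if (docTerms.get(docID).equals(delete.getKey())) {
                    live.clear(docID); // buffered before the delete, so it is deleted
                }
            }
        }
        return live;
    }

    public static void main(String[] args) {
        // doc 0 and doc 2 both match id:1; the delete was recorded when 2 docs were buffered
        List<String> docTerms = Arrays.asList("id:1", "id:2", "id:1");
        Map<String, Integer> deleteTerms = new HashMap<>();
        deleteTerms.put("id:1", 2);
        System.out.println(liveDocs(docTerms, deleteTerms)); // {1, 2}
    }
}
```

Doc 2 matches the deleted term but survives because it was buffered after the delete; a delete issued before the DWPT existed corresponds to a bound of 0 and hits nothing.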

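As for garbage collection of DocumentsWriterDeleteQueue: throughout the process above, globalQueue itself holds only tail, which points to the newest tail node; the only other references into the list come from globalSlice and the deleteSlices. In the last figure above, everything from the first null node through TermNode3 is unreachable and can therefore be reclaimed by GC.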

