sofa-jraft线性一致读实现

2020-02-05  本文已影响0人  zhangsean

线性一致读

实现api

//NodeImpl
readIndex(final byte[] requestContext, final ReadIndexClosure done)

调用readIndex后

  1. 把请求添加到队列readIndexQueue (disruptor队列)

  2. 添加后被ReadIndexEventHandler处理 (如果生产速度过快32个一批)

  3. 构造请求ReadIndexRequest, 调用NodeImpl.handleReadIndexRequest处理,根据Node是leader还是follwer执行不同的操作

  4. leader

    • 记录lastCommittedIndex

    • 向所有的peer发送心跳,过半节点心跳反馈后,触发ReadIndexResponseClosure的run方法。

    • 如果lastCommittedInde未提交则批量请求都等待

    • 如果lastCommittedInde已提交,则批量的请求都释放,见ReadIndexClosure (核心代码ReadIndexResponseClosure.notifySuccess)

      private void notifySuccess(final ReadIndexStatus status) {
          final long nowMs = Utils.monotonicMs();
          final List<ReadIndexState> states = status.getStates();
          final int taskCount = states.size();
          for (int i = 0; i < taskCount; i++) {
              final ReadIndexState task = states.get(i);
              final ReadIndexClosure done = task.getDone(); // stack copy
              if (done != null) {
                  this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
                  done.setResult(task.getIndex(), task.getRequestContext().get());
                  done.run(Status.OK());
              }
          }
      }
      

      ReadIndexClosure.run核心代码会执行用户定义的run方法

      public void run(final Status status) {
          run(status, this.index, this.requestContext);
      }
      
  5. follower

    • 向leader发送readIndex rpc请求
    • leader执行一次readIndex, 返回leader的readIndex给follower
    • 等待收到的readIndex已经apply,执行donw
上一篇下一篇

猜你喜欢

热点阅读