mimir查询主流程源码

2022-04-06  本文已影响0人  wwq2020

frontend http入口在pkg/api/api.go中

// RegisterQueryAPI wires the query handler into the API's routes.
// This is an abridged excerpt: "..." marks code omitted by the article.
func (a *API) RegisterQueryAPI(handler http.Handler, buildInfoHandler http.Handler) {
  ...
  // Instant-query endpoint: the path is the configured Prometheus HTTP prefix
  // joined with "/api/v1/query", registered for both GET and POST.
  // NOTE(review): the meaning of the two boolean arguments is not visible in
  // this excerpt — confirm against RegisterRoute's signature.
  a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/query"), handler, true, true, "GET", "POST")
  ...
}

frontend会访问到pkg/frontend/transport/handler.go中

// ServeHTTP is the frontend's HTTP entry point for a query request.
// This is an abridged excerpt: "..." marks code omitted by the article.
func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  ...
  // Hand the incoming request to the configured round tripper; the handling
  // of resp and err happens in the omitted code.
  resp, err := f.roundTripper.RoundTrip(r)
  ...
}

frontend调用到pkg/frontend/transport/roundtripper.go中

// RoundTrip adapts an HTTP round trip onto the wrapped gRPC round tripper.
// This is an abridged excerpt: "..." marks code omitted by the article; req is
// produced in the omitted part above the call.
func (a *grpcRoundTripperAdapter) RoundTrip(r *http.Request) (*http.Response, error) {
  ...

  // Forward the request over gRPC, reusing the HTTP request's context.
  resp, err := a.roundTripper.RoundTripGRPC(r.Context(), req)
  if err != nil {
    // No response is returned when the gRPC round trip fails.
    return nil, err
  }
  ...
}

frontend调用到pkg/frontend/v2/frontend.go中

// RoundTripGRPC submits a request through f.requestsCh and returns the HTTP
// response delivered on the request's response channel.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the select statements that own the case arms below and the
// construction of freq.
func (f *Frontend) RoundTripGRPC(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) {
  ...
// Enqueue step: publish the frontend request on the shared requests channel.
case f.requestsCh <- freq:
  ...
// Completion step: a response has arrived for this request.
case resp := <-freq.response:
        if stats.ShouldTrackHTTPGRPCResponse(resp.HttpResponse) {
            // The local variable shadows the stats package for the rest of
            // this block.
            stats := stats.FromContext(ctx)
            stats.Merge(resp.Stats) // Safe if stats is nil.
        }

        return resp.HttpResponse, nil
    }
  ...
}

frontend通过chan发送到pkg/frontend/v2/frontend_scheduler_worker.go中

// schedulerLoop relays requests taken from w.requestCh to the scheduler over
// the given gRPC stream.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the enclosing loop/select and the handling of err.
func (w *frontendSchedulerWorker) schedulerLoop(loop schedulerpb.SchedulerForFrontend_FrontendLoopClient) error {
  ...

        case req := <-w.requestCh:
            // Send an ENQUEUE message carrying the query and this frontend's
            // address (w.frontendAddr).
            err := loop.Send(&schedulerpb.FrontendToScheduler{
                Type:            schedulerpb.ENQUEUE,
                QueryID:         req.queryID,
                UserID:          req.userID,
                HttpRequest:     req.request,
                FrontendAddress: w.frontendAddr,
                StatsEnabled:    req.statsEnabled,
            })
  ...
}

scheduler通过grpc连接接收到查询请求,在pkg/scheduler/scheduler.go中

// FrontendLoop is the scheduler-side handler of the frontend stream.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the receive loop and the switch that owns the case below.
func (s *Scheduler) FrontendLoop(frontend schedulerpb.SchedulerForFrontend_FrontendLoopServer) error {
  ...
    // ENQUEUE messages are passed to enqueueRequest along with the frontend's
    // context and address.
    case schedulerpb.ENQUEUE:
            err = s.enqueueRequest(frontendCtx, frontendAddress, msg)
  ...
}

scheduler推送给querier,pkg/scheduler/scheduler.go中

// QuerierLoop is the scheduler-side handler of a querier stream: it takes the
// next request from the request queue and forwards it to that querier.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the enclosing loop and the step that derives r from req.
func (s *Scheduler) QuerierLoop(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer) error {
  ...
        // Fetch the next request for this querier; any error ends the loop.
        req, idx, err := s.requestQueue.GetNextRequestForQuerier(querier.Context(), lastUserIndex, querierID)
        if err != nil {
            return err
        }
  ...
        // Deliver the request over the querier's stream; any error ends the loop.
        if err := s.forwardRequestToQuerier(querier, r); err != nil {
            return err
        }
  ...
}


// forwardRequestToQuerier sends one scheduled request to a querier over its
// stream and reports the outcome on errCh.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the creation of errCh and the code that reads from it.
func (s *Scheduler) forwardRequestToQuerier(querier schedulerpb.SchedulerForQuerier_QuerierLoopServer, req *schedulerRequest) error {
  ...
    // The send and the subsequent receive run in a separate goroutine; exactly
    // one value is written to errCh on either path.
    go func() {
        err := querier.Send(&schedulerpb.SchedulerToQuerier{
            UserID:          req.userID,
            QueryID:         req.queryID,
            FrontendAddress: req.frontendAddress,
            HttpRequest:     req.request,
            StatsEnabled:    req.statsEnabled,
        })
        if err != nil {
            errCh <- err
            return
        }

        // The received message is discarded; only the error (possibly nil) is
        // reported.
        _, err = querier.Recv()
        errCh <- err
    }()
  ...
}

querier通过grpc接收到查询请求,在pkg/querier/worker/scheduler_processor.go中

// processQueriesOnSingleStream opens a QuerierLoop stream to a scheduler and
// runs querierLoop on it.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the enclosing loop (closed by the brace near the end) and the
// setup of schedulerClient and backoff.
func (sp *schedulerProcessor) processQueriesOnSingleStream(ctx context.Context, conn *grpc.ClientConn, address string) {
  ...
        c, err := schedulerClient.QuerierLoop(ctx)
  ...

        // When querierLoop fails: log the error, wait on the backoff, and move
        // on to the next iteration of the enclosing loop.
        if err := sp.querierLoop(c, address); err != nil {
            level.Error(sp.log).Log("msg", "error processing requests from scheduler", "err", err, "addr", address)
            backoff.Wait()
            continue
        }
  ...
    }
}

// querierLoop handles requests arriving on the scheduler stream c.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including how request, ctx and logger are obtained.
func (sp *schedulerProcessor) querierLoop(c schedulerpb.SchedulerForQuerier_QuerierLoopClient, address string) error {
  ...
  // Execute the request, passing through the query ID, frontend address,
  // stats flag and HTTP request carried by the scheduler's message.
  sp.runRequest(ctx, logger, request.QueryID, request.FrontendAddress, request.StatsEnabled, request.HttpRequest)

  ...
}


querier通过grpc发送查询响应给frontend,在pkg/querier/worker/scheduler_processor.go中

// runRequest delivers the result of a query to the frontend that issued it.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the code that produces response and stats and any handling of err
// after the call.
func (sp *schedulerProcessor) runRequest(ctx context.Context, logger log.Logger, queryID uint64, frontendAddress string, statsEnabled bool, request *httpgrpc.HTTPRequest) {
  ...
// Look up a client for the frontend's address in the frontend pool.
c, err := sp.frontendPool.GetClientFor(frontendAddress)
    if err == nil {
        // Response is empty and uninteresting.
        // The result is reported via the frontend's QueryResult RPC, keyed by
        // queryID.
        _, err = c.(frontendv2pb.FrontendForQuerierClient).QueryResult(ctx, &frontendv2pb.QueryResultRequest{
            QueryID:      queryID,
            HttpResponse: response,
            Stats:        stats,
        })
    }
  ...
}

frontend通过grpc获取到查询响应,在pkg/frontend/v2/frontend.go中

// QueryResult receives a query result and hands it to the pending request
// with the same query ID.
// This is an abridged excerpt: "..." marks code omitted by the article,
// including the select that owns the case below and the returned values.
func (f *Frontend) QueryResult(ctx context.Context, qrReq *frontendv2pb.QueryResultRequest) (*frontendv2pb.QueryResultResponse, error) {
  ...
    // Look up the in-flight request by the query ID carried in the result.
    req := f.requests.get(qrReq.QueryID)
  ...
        // Deliver the result on the request's response channel.
        case req.response <- qrReq:
  ...
}
上一篇下一篇

猜你喜欢

热点阅读