Reading the Loki source: the ingester

2020-06-18  wwq2020

Introduction

The ingester is responsible for writing log data to the long-term storage backend and for serving that data back at query time.
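
On the write path, the distributor calls the ingester's Push gRPC endpoint with a batch of streams. A minimal sketch of that payload, assuming the logproto types quoted later in this post (the exact field layout can differ between Loki versions):

// Illustrative push payload: one stream with one log entry.
req := &logproto.PushRequest{
    Streams: []logproto.Stream{
        {
            Labels: `{job="my-app", env="prod"}`,
            Entries: []logproto.Entry{
                {Timestamp: time.Now(), Line: "hello from my-app"},
            },
        },
    },
}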

Relevant source code

In cmd/loki/main.go:

func main() {
    ...
    t, err := loki.New(config)
    util.CheckFatal("initialising loki", err)

    level.Info(util.Logger).Log("msg", "Starting Loki", "version", version.Info())

    err = t.Run()
    ...
}

In pkg/loki/loki.go:

func New(cfg Config) (*Loki, error) {
    loki := &Loki{
        cfg: cfg,
    }

    loki.setupAuthMiddleware()
    if err := loki.setupModuleManager(); err != nil {
        return nil, err
    }
    storage.RegisterCustomIndexClients(cfg.StorageConfig, prometheus.DefaultRegisterer)

    return loki, nil
}

func (t *Loki) setupModuleManager() error {
    mm := modules.NewManager()

    mm.RegisterModule(Server, t.initServer)
    mm.RegisterModule(RuntimeConfig, t.initRuntimeConfig)
    mm.RegisterModule(MemberlistKV, t.initMemberlistKV)
    mm.RegisterModule(Ring, t.initRing)
    mm.RegisterModule(Overrides, t.initOverrides)
    mm.RegisterModule(Distributor, t.initDistributor)
    mm.RegisterModule(Store, t.initStore)
    mm.RegisterModule(Ingester, t.initIngester)
    mm.RegisterModule(Querier, t.initQuerier)
    mm.RegisterModule(QueryFrontend, t.initQueryFrontend)
    mm.RegisterModule(TableManager, t.initTableManager)
    mm.RegisterModule(All, nil)

    // Add dependencies
    deps := map[string][]string{
        Ring:          {RuntimeConfig, Server, MemberlistKV},
        Overrides:     {RuntimeConfig},
        Distributor:   {Ring, Server, Overrides},
        Store:         {Overrides},
        Ingester:      {Store, Server, MemberlistKV},
        Querier:       {Store, Ring, Server},
        QueryFrontend: {Server, Overrides},
        TableManager:  {Server},
        All:           {Querier, Ingester, Distributor, TableManager},
    }

    for mod, targets := range deps {
        if err := mm.AddDependency(mod, targets...); err != nil {
            return err
        }
    }

    t.moduleManager = mm

    return nil
}
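
These dependencies drive the whole startup: InitModuleServices (below) initializes every dependency before the module that needs it, depth first. A rough illustration of the ordering idea (not the actual orderedDeps implementation, which also sorts and dedups differently):

// Illustrative only: visit dependencies before the module itself.
func orderDeps(deps map[string][]string, target string) []string {
    var order []string
    seen := map[string]bool{}
    var visit func(name string)
    visit = func(name string) {
        if seen[name] {
            return
        }
        seen[name] = true
        for _, d := range deps[name] {
            visit(d)
        }
        order = append(order, name)
    }
    for _, d := range deps[target] {
        visit(d)
    }
    // for target=Ingester this yields: RuntimeConfig, Overrides, Store, Server, MemberlistKV
    return order
}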

func (t *Loki) Run() error {
    serviceMap, err := t.moduleManager.InitModuleServices(t.cfg.Target)
    if err != nil {
        return err
    }

    t.serviceMap = serviceMap
    ...
    var servs []services.Service
    for _, s := range serviceMap {
        servs = append(servs, s)
    }
    ...
    sm, err := services.NewManager(servs...)
    ...
    err = sm.StartAsync(context.Background())

    ...
}

In github.com/cortexproject/cortex/pkg/util/modules/modules.go:

func (m *Manager) InitModuleServices(target string) (map[string]services.Service, error) {
    if _, ok := m.modules[target]; !ok {
        return nil, fmt.Errorf("unrecognised module name: %s", target)
    }
    servicesMap := map[string]services.Service{}

    // initialize all of our dependencies first
    deps := m.orderedDeps(target)
    deps = append(deps, target) // lastly, initialize the requested module

    for ix, n := range deps {
        mod := m.modules[n]

        var serv services.Service

        if mod.initFn != nil {
            s, err := mod.initFn()
            if err != nil {
                return nil, errors.Wrap(err, fmt.Sprintf("error initialising module: %s", n))
            }

            if s != nil {
                // We pass servicesMap, which isn't yet complete. By the time service starts,
                // it will be fully built, so there is no need for extra synchronization.
                serv = newModuleServiceWrapper(servicesMap, n, s, mod.deps, m.findInverseDependencies(n, deps[ix+1:]))
            }
        }

        if serv != nil {
            servicesMap[n] = serv
        }
    }

    return servicesMap, nil
}
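
Each non-nil service returned by an initFn is wrapped by newModuleServiceWrapper, so that at start time it can wait for its dependencies (and, presumably, wait on shutdown for the modules that depend on it to stop first). Conceptually the dependency wait looks something like this (an assumption about the wrapper's behaviour, not the actual Cortex code):

// Sketch: before the wrapped module runs, every dependency that produced
// a service must already be Running.
func awaitDeps(ctx context.Context, servicesMap map[string]services.Service, deps []string) error {
    for _, name := range deps {
        dep, ok := servicesMap[name]
        if !ok {
            continue // dependency produced no service (its initFn returned nil)
        }
        if err := dep.AwaitRunning(ctx); err != nil {
            return errors.Wrapf(err, "dependency %s is not running", name)
        }
    }
    return nil
}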

In github.com/cortexproject/cortex/pkg/util/services/manager.go:

func NewManager(services ...Service) (*Manager, error) {
    if len(services) == 0 {
        return nil, errors.New("no services")
    }

    m := &Manager{
        services:  services,
        byState:   map[State][]Service{},
        healthyCh: make(chan struct{}),
        stoppedCh: make(chan struct{}),
    }
    ...
    return m, nil
}

func (m *Manager) StartAsync(ctx context.Context) error {
    for _, s := range m.services {
        err := s.StartAsync(ctx)
        if err != nil {
            return err
        }
    }
    return nil
}

In pkg/loki/modules.go:

func (t *Loki) initStore() (_ services.Service, err error) {
    if activePeriodConfig(t.cfg.SchemaConfig).IndexType == local.BoltDBShipperType {
        t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
        switch t.cfg.Target {
        case Ingester:
            // We do not want ingester to unnecessarily keep downloading files
            t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeWriteOnly
        case Querier:
            // We do not want query to do any updates to index
            t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadOnly
        default:
            t.cfg.StorageConfig.BoltDBShipperConfig.Mode = local.ShipperModeReadWrite
        }
    }

    t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides, prometheus.DefaultRegisterer)
    if err != nil {
        return
    }

    return services.NewIdleService(nil, func(_ error) error {
        t.store.Stop()
        return nil
    }), nil
}

func (t *Loki) initIngester() (_ services.Service, err error) {
    t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
    t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
    t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort

    // We want ingester to also query the store when using boltdb-shipper
    pc := activePeriodConfig(t.cfg.SchemaConfig)
    if pc.IndexType == local.BoltDBShipperType {
        t.cfg.Ingester.QueryStore = true
        mlb, err := calculateMaxLookBack(pc, t.cfg.Ingester.QueryStoreMaxLookBackPeriod, t.cfg.Ingester.MaxChunkAge)
        if err != nil {
            return nil, err
        }
        t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
    }

    t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer)
    if err != nil {
        return
    }

    logproto.RegisterPusherServer(t.server.GRPC, t.ingester)
    logproto.RegisterQuerierServer(t.server.GRPC, t.ingester)
    logproto.RegisterIngesterServer(t.server.GRPC, t.ingester)
    grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
    t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
    return t.ingester, nil
}

In pkg/ingester/ingester.go:

func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
    if cfg.ingesterClientFactory == nil {
        cfg.ingesterClientFactory = client.New
    }
    enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
    if err != nil {
        return nil, err
    }

    i := &Ingester{
        cfg:          cfg,
        clientConfig: clientConfig,
        instances:    map[string]*instance{},
        store:        store,
        loopQuit:     make(chan struct{}),
        flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
        tailersQuit:  make(chan struct{}),
        factory: func() chunkenc.Chunk {
            return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
        },
    }

    i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
    if err != nil {
        return nil, err
    }

    i.lifecyclerWatcher = services.NewFailureWatcher()
    i.lifecyclerWatcher.WatchService(i.lifecycler)

    // Now that the lifecycler has been created, we can create the limiter
    // which depends on it.
    i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)

    i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
    return i, nil
}

func (i *Ingester) starting(ctx context.Context) error {
    i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
    for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
        i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
        go i.flushLoop(j)
    }

    // pass new context to lifecycler, so that it doesn't stop automatically when Ingester's service context is done
    err := i.lifecycler.StartAsync(context.Background())
    if err != nil {
        return err
    }

    err = i.lifecycler.AwaitRunning(ctx)
    if err != nil {
        return err
    }

    // start our loop
    i.loopDone.Add(1)
    go i.loop()
    return nil
}

func (i *Ingester) loop() {
    defer i.loopDone.Done()

    flushTicker := time.NewTicker(i.cfg.FlushCheckPeriod)
    defer flushTicker.Stop()

    for {
        select {
        case <-flushTicker.C:
            i.sweepUsers(false)

        case <-i.loopQuit:
            return
        }
    }
}

func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
    instanceID, err := user.ExtractOrgID(ctx)
    if err != nil {
        return nil, err
    } else if i.readonly {
        return nil, ErrReadOnly
    }

    instance := i.getOrCreateInstance(instanceID)
    err = instance.Push(ctx, req)
    return &logproto.PushResponse{}, err
}
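
Push resolves the tenant from the request context, so the caller must have injected an org ID (in production the gRPC auth middleware takes care of that). In a test it can be set directly; a hedged sketch where ing and req are placeholders:

// Hypothetical test-style call: put the org ID on the context, then push.
ctx := user.InjectOrgID(context.Background(), "tenant-1")
if _, err := ing.Push(ctx, req); err != nil {
    level.Error(util.Logger).Log("msg", "push failed", "err", err)
}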

func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
    inst, ok := i.getInstanceByID(instanceID)
    if ok {
        return inst
    }

    i.instancesMtx.Lock()
    defer i.instancesMtx.Unlock()
    inst, ok = i.instances[instanceID]
    if !ok {
        inst = newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
        i.instances[instanceID] = inst
    }
    return inst
}
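
getOrCreateInstance is a classic double-checked lookup: getInstanceByID (not quoted in this post) only takes a read lock, and the re-check under the write lock handles two concurrent first pushes for the same tenant. Its assumed shape, for completeness (not copied from the source):

// Assumed implementation: a read-locked map lookup keyed by tenant ID.
func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
    i.instancesMtx.RLock()
    defer i.instancesMtx.RUnlock()

    inst, ok := i.instances[id]
    return inst, ok
}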

In pkg/ingester/flush.go:

func (i *Ingester) flushLoop(j int) {
    defer func() {
        level.Debug(util.Logger).Log("msg", "Ingester.flushLoop() exited")
        i.flushQueuesDone.Done()
    }()

    for {
        o := i.flushQueues[j].Dequeue()
        if o == nil {
            return
        }
        op := o.(*flushOp)

        level.Debug(util.Logger).Log("msg", "flushing stream", "userid", op.userID, "fp", op.fp, "immediate", op.immediate)

        err := i.flushUserSeries(op.userID, op.fp, op.immediate)
        if err != nil {
            level.Error(util.WithUserID(op.userID, util.Logger)).Log("msg", "failed to flush user", "err", err)
        }

        // If we're exiting & we failed to flush, put the failed operation
        // back in the queue at a later point.
        if op.immediate && err != nil {
            op.from = op.from.Add(flushBackoff)
            i.flushQueues[j].Enqueue(op)
        }
    }
}

func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
    instance, ok := i.getInstanceByID(userID)
    if !ok {
        return nil
    }

    chunks, labels := i.collectChunksToFlush(instance, fp, immediate)
    if len(chunks) < 1 {
        return nil
    }

    ctx := user.InjectOrgID(context.Background(), userID)
    ctx, cancel := context.WithTimeout(ctx, i.cfg.FlushOpTimeout)
    defer cancel()
    err := i.flushChunks(ctx, fp, labels, chunks)
    if err != nil {
        return err
    }

    instance.streamsMtx.Lock()
    for _, chunk := range chunks {
        chunk.flushed = time.Now()
    }
    instance.streamsMtx.Unlock()
    return nil
}


func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc) error {
    ...
    if err := i.store.Put(ctx, wireChunks); err != nil {
        return err
    }
    ...
    return nil
}

In pkg/ingester/instance.go:

func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
    i := &instance{
        cfg:        cfg,
        streams:    map[model.Fingerprint]*stream{},
        index:      index.New(),
        instanceID: instanceID,

        streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
        streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

        factory: factory,
        tailers: map[uint32]*tailer{},
        limiter: limiter,

        syncPeriod:  syncPeriod,
        syncMinUtil: syncMinUtil,
    }
    i.mapper = newFPMapper(i.getLabelsFromFingerprint)
    return i
}

func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
    i.streamsMtx.Lock()
    defer i.streamsMtx.Unlock()

    var appendErr error
    for _, s := range req.Streams {

        stream, err := i.getOrCreateStream(s)
        if err != nil {
            appendErr = err
            continue
        }

        prevNumChunks := len(stream.chunks)
        if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil {
            appendErr = err
            continue
        }

        memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
    }

    return appendErr
}

func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
    ...
    stream = newStream(i.cfg, fp, sortedLabels, i.factory)
    ...
}

In pkg/ingester/stream.go:

func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
    ...
    chunk := &s.chunks[len(s.chunks)-1]
    ...
    if err := chunk.chunk.Append(&entries[i]); err != nil {
        ...
    }
    ...
}
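
The elided middle of stream.Push is where chunks get cut: when the head chunk is closed or has no room for the next entry, a fresh chunk from the factory is appended to s.chunks and the entry goes there instead. Roughly (a conceptual sketch with assumed field names, not the exact Loki code):

// Conceptual sketch: cut a new head chunk when the current one is full.
chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) {
    chunk.closed = true
    s.chunks = append(s.chunks, chunkDesc{chunk: s.factory()})
    chunk = &s.chunks[len(s.chunks)-1]
}
if err := chunk.chunk.Append(&entries[i]); err != nil {
    return err
}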

In github.com/cortexproject/cortex/pkg/util/services/basic_service.go:

func NewBasicService(start StartingFn, run RunningFn, stop StoppingFn) *BasicService {
    return &BasicService{
        startFn:             start,
        runningFn:           run,
        stoppingFn:          stop,
        state:               New,
        runningWaitersCh:    make(chan struct{}),
        terminatedWaitersCh: make(chan struct{}),
    }
}

func (b *BasicService) StartAsync(parentContext context.Context) error {
    switched, oldState := b.switchState(New, Starting, func() {
        b.serviceContext, b.serviceCancel = context.WithCancel(parentContext)
        b.notifyListeners(func(l Listener) { l.Starting() }, false)
        go b.main()
    })

    if !switched {
        return invalidServiceStateError(oldState, New)
    }
    return nil
}
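
BasicService drives the New → Starting → Running → Stopping → Terminated/Failed state machine that every module above ultimately runs on. A minimal usage sketch with the same callback signatures seen in ingester.New and initStore (a hypothetical service, not taken from the Loki code):

// starting/running receive the service context; stopping receives the
// failure cause (nil on a clean shutdown).
svc := services.NewBasicService(
    func(ctx context.Context) error { return nil },                // starting
    func(ctx context.Context) error { <-ctx.Done(); return nil },  // running
    func(_ error) error { return nil },                            // stopping
)

if err := svc.StartAsync(context.Background()); err != nil {
    return err
}
if err := svc.AwaitRunning(context.Background()); err != nil {
    return err
}
// ... later, shut it down and wait for Terminated ...
svc.StopAsync()
_ = svc.AwaitTerminated(context.Background())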