informer demo

2021-08-16  本文已影响0人  wwq2020

EventHandler 模式:通过事件回调自行维护"注解值 → Pod 列表"的映射

package main

import (
    "fmt"
    "sync"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

// main builds a shared Pod informer and, through its event handlers, maintains
// an in-memory map from the value of the "demoLabel" annotation to the Pods
// that carry that value. After the initial cache sync it prints one map entry.
func main() {
	restConfig := rest.Config{
		Host:        "",
		BearerToken: "",
	}
	clientSet, err := kubernetes.NewForConfig(&restConfig)
	if err != nil {
		panic(err)
	}

	// podsByValue is mutated by the event handlers and read by main at the end,
	// so every access goes through lock.
	podsByValue := make(map[string][]*corev1.Pod)
	var lock sync.Mutex
	// demoLabel is the annotation key (despite its name, it is looked up in
	// Annotations, not Labels).
	demoLabel := "demoLabel"

	// removePod drops the first entry under key that has the same namespace and
	// name as target. The whole tail after the match is kept; the key is deleted
	// once its list becomes empty. Callers must hold lock.
	removePod := func(key string, target *corev1.Pod) {
		pods := podsByValue[key]
		for i, each := range pods {
			if each.Name != target.Name || each.Namespace != target.Namespace {
				continue
			}
			pods = append(pods[:i], pods[i+1:]...)
			if len(pods) == 0 {
				delete(podsByValue, key)
			} else {
				podsByValue[key] = pods
			}
			return
		}
	}

	factory := informers.NewSharedInformerFactory(clientSet, 0)
	informer := factory.Core().V1().Pods().Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				return
			}
			value, exist := pod.Annotations[demoLabel]
			if !exist {
				return
			}
			lock.Lock()
			defer lock.Unlock()
			podsByValue[value] = append(podsByValue[value], pod)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod, ok := oldObj.(*corev1.Pod)
			if !ok {
				return
			}
			newPod, ok := newObj.(*corev1.Pod)
			if !ok {
				return
			}
			oldValue, oldExist := oldPod.Annotations[demoLabel]
			newValue, newExist := newPod.Annotations[demoLabel]
			lock.Lock()
			defer lock.Unlock()
			// Remove the stale object first, then file the new one under its
			// (possibly changed) annotation value.
			if oldExist {
				removePod(oldValue, oldPod)
			}
			if newExist {
				podsByValue[newValue] = append(podsByValue[newValue], newPod)
			}
		},
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				// A missed delete is delivered as a tombstone wrapping the last
				// known state of the object instead of the object itself.
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					return
				}
				pod, ok = tombstone.Obj.(*corev1.Pod)
				if !ok {
					return
				}
			}
			value, exist := pod.Annotations[demoLabel]
			if !exist {
				return
			}
			lock.Lock()
			defer lock.Unlock()
			removePod(value, pod)
		},
	})

	done := make(chan struct{})
	// Closing the stop channel on exit lets the informer goroutines terminate.
	defer close(done)
	factory.Start(done)
	factory.WaitForCacheSync(done)
	// NOTE(review): WaitForCacheSync reports on the informer's store; confirm
	// against the client-go version in use whether every AddFunc callback has
	// already run by the time it returns.

	lock.Lock()
	defer lock.Unlock()
	// NOTE(review): the map is keyed by annotation value, so "somepodname" is
	// looked up as an annotation value here, not as a Pod name.
	fmt.Println(podsByValue["somepodname"])
}

Indexer 模式:利用 informer 自带的自定义索引,按注解值直接查询 Pod

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/tools/cache"
)

// main registers a custom index on the shared Pod informer that groups Pods by
// the value of their "demoLabel" annotation, waits for the initial cache sync,
// and then prints the Pods indexed under "somedemolabel".
func main() {
	restConfig := rest.Config{
		Host:        "",
		BearerToken: "",
	}
	clientSet, err := kubernetes.NewForConfig(&restConfig)
	if err != nil {
		panic(err)
	}

	factory := informers.NewSharedInformerFactory(clientSet, 0)
	// demoLabel is the annotation key (despite its name, it is looked up in
	// Annotations, not Labels).
	demoLabel := "demoLabel"
	indexer := factory.Core().V1().Pods().Informer().GetIndexer()

	// demoLabelToPodsIndexer yields the annotation value as the single index
	// key for a Pod; objects that are not Pods, or Pods without the annotation,
	// contribute no index keys.
	demoLabelToPodsIndexer := func(obj interface{}) ([]string, error) {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return nil, nil
		}
		value, exist := pod.Annotations[demoLabel]
		if !exist {
			return nil, nil
		}
		return []string{value}, nil
	}
	demoLabelToPodsIndexerName := "demoLabelToPodsIndexer"

	// Registration can fail; without the index the ByIndex call below would
	// only report an error, so fail fast like the client setup above.
	if err := indexer.AddIndexers(map[string]cache.IndexFunc{
		demoLabelToPodsIndexerName: demoLabelToPodsIndexer,
	}); err != nil {
		panic(err)
	}

	done := make(chan struct{})
	// Closing the stop channel on exit lets the informer goroutines terminate.
	defer close(done)
	factory.Start(done)
	factory.WaitForCacheSync(done)

	vals, err := indexer.ByIndex(demoLabelToPodsIndexerName, "somedemolabel")
	fmt.Println(vals, err)
}

```
上一篇 下一篇

猜你喜欢

热点阅读