informer demo
2021-08-16 本文已影响0人
wwq2020
eventhandler模式
package main
import (
"fmt"
"sync"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// main demonstrates the event-handler pattern: it registers Add/Update/Delete
// handlers on a shared pod informer and uses them to maintain an in-memory
// map from the value of the "demoLabel" annotation to the pods carrying it.
func main() {
	restConfig := rest.Config{
		Host:        "",
		BearerToken: "",
	}
	clientSet, err := kubernetes.NewForConfig(&restConfig)
	if err != nil {
		panic(err)
	}

	// m groups pods by their annotation value. The handlers and the final
	// read in main all touch m, so every access is guarded by lock.
	m := make(map[string][]*corev1.Pod)
	var lock sync.Mutex
	demoLabel := "demoLabel"

	// removePod drops the first entry under key that has the same name and
	// namespace as target. The caller must hold lock.
	removePod := func(key string, target *corev1.Pod) {
		pods := m[key]
		for i, each := range pods {
			if each.Name != target.Name || each.Namespace != target.Namespace {
				continue
			}
			// Shift the tail left by one. The previous code appended only the
			// single element pods[i+1], which lost the rest of the slice and
			// indexed out of range when the match was the last element.
			copy(pods[i:], pods[i+1:])
			pods[len(pods)-1] = nil // clear the vacated slot so it holds no stale pointer
			m[key] = pods[:len(pods)-1]
			return
		}
	}

	factory := informers.NewSharedInformerFactory(clientSet, 0)
	informer := factory.Core().V1().Pods().Informer()
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*corev1.Pod)
			value, exist := pod.Annotations[demoLabel]
			if !exist {
				return
			}
			lock.Lock()
			defer lock.Unlock()
			m[value] = append(m[value], pod)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			oldPod := oldObj.(*corev1.Pod)
			newPod := newObj.(*corev1.Pod)
			oldValue, oldExist := oldPod.Annotations[demoLabel]
			newValue, newExist := newPod.Annotations[demoLabel]
			lock.Lock()
			defer lock.Unlock()
			// Remove the old version first, then file the new version under
			// its (possibly different) annotation value.
			if oldExist {
				removePod(oldValue, oldPod)
			}
			if newExist {
				m[newValue] = append(m[newValue], newPod)
			}
		},
		DeleteFunc: func(obj interface{}) {
			pod, ok := obj.(*corev1.Pod)
			if !ok {
				// A delete event may carry a cache.DeletedFinalStateUnknown
				// tombstone wrapping the last known object instead of the
				// pod itself; unwrap it rather than panicking.
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					return
				}
				pod, ok = tombstone.Obj.(*corev1.Pod)
				if !ok {
					return
				}
			}
			value, exist := pod.Annotations[demoLabel]
			if !exist {
				return
			}
			lock.Lock()
			defer lock.Unlock()
			removePod(value, pod)
		},
	})

	done := make(chan struct{})
	factory.Start(done)
	factory.WaitForCacheSync(done)
	// NOTE(review): WaitForCacheSync reports that the informer's store is
	// populated; whether every AddFunc call has already run at this point
	// should be confirmed against the client-go version in use.
	lock.Lock()
	defer lock.Unlock()
	fmt.Println(m["somepodname"])
}
indexer模式
```go
package main
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// main demonstrates the indexer pattern: instead of maintaining a map by hand
// in event handlers, it registers a custom index function on the pod
// informer's indexer and then queries pods by the value of their "demoLabel"
// annotation.
func main() {
	restConfig := rest.Config{
		Host:        "",
		BearerToken: "",
	}
	clientSet, err := kubernetes.NewForConfig(&restConfig)
	if err != nil {
		panic(err)
	}

	factory := informers.NewSharedInformerFactory(clientSet, 0)
	demoLabel := "demoLabel"
	indexer := factory.Core().V1().Pods().Informer().GetIndexer()

	// demoLabelToPodsIndexer maps a pod to the index key(s) it is filed
	// under: the value of its demoLabel annotation. Objects that are not
	// pods, and pods without the annotation, produce no keys.
	demoLabelToPodsIndexer := func(obj interface{}) ([]string, error) {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return nil, nil
		}
		value, exist := pod.Annotations[demoLabel]
		if !exist {
			return nil, nil
		}
		return []string{value}, nil
	}

	demoLabelToPodsIndexerName := "demoLabelToPodsIndexer"
	// Registration is done before factory.Start. Its error was previously
	// discarded, so a failed registration would only surface later as a
	// ByIndex error; fail fast instead.
	err = indexer.AddIndexers(map[string]cache.IndexFunc{
		demoLabelToPodsIndexerName: demoLabelToPodsIndexer,
	})
	if err != nil {
		panic(err)
	}

	done := make(chan struct{})
	factory.Start(done)
	factory.WaitForCacheSync(done)

	vals, err := indexer.ByIndex(demoLabelToPodsIndexerName, "somedemolabel")
	fmt.Println(vals, err)
}
```