kube-proxy分析

2018-09-27  本文已影响0人  陈先生_9e91

kube-proxy分析

service是一个反向代理,为后端Pod提供统一入口。可以通过cluster IP 、 dns、Node IP进行访问。kube-proxy帮助service完成具体操作,service-controller基本没干什么事情。

iptables

默认采用iptables来实现负载均衡,Headless service不需要iptables实现负载均衡,直接就可以获得Pod IP,所以没有相应的iptables规则。

iptables -t nat -S

ClusterIP

// 在PREROUTING & OUTPUT之后跳转KUBE-SERVICES链
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES

// 发往clusterIP & nodePort的流量跳转KUBE-SVC-VWFCBIJMABKDN6RN链
-A KUBE-SERVICES -d 169.169.197.95/32 -p tcp -m comment --comment "default/nt-svc: cluster IP" -m tcp --dport 23423 -j KUBE-SVC-VWFCBIJMABKDN6RN

// 1/replicas概率发给其中一个pod-10.1.67.6
-A KUBE-SVC-VWFCBIJMABKDN6RN -m statistic --mode random --probability 0.50000000000 -j KUBE-SEP-ISPC6NJY64FLTZW5
-A KUBE-SEP-ISPC6NJY64FLTZW5 -p tcp -m tcp -j DNAT --to-destination 10.1.67.6:23423
-A KUBE-SEP-ISPC6NJY64FLTZW5 -s 10.1.67.6/32 -j KUBE-MARK-MASQ

// 1/replicas概率发给其中一个pod-10.1.69.2
-A KUBE-SVC-VWFCBIJMABKDN6RN -j KUBE-SEP-7KPB6CDAAPY2W5YP
-A KUBE-SEP-7KPB6CDAAPY2W5YP -p tcp -m tcp -j DNAT --to-destination 10.1.69.2:23423
-A KUBE-SEP-7KPB6CDAAPY2W5YP -s 10.1.69.2/32 -j KUBE-MARK-MASQ

NodePort

与clusterIP不同,nodePort通过每个node上的kube-proxy作为代理,对外提供服务。所以nodePort需要增加一条iptables规则,将发给nodePort的流量跳转给svc。

-A KUBE-NODEPORTS -p tcp -m comment --comment "mind-automl/mongo:" -m tcp --dport 27017 -j KUBE-SVC-3SO264LNGXICPHPS

问题

华为云在 K8S 大规模场景下的 Service 性能优化实践

code

kube-proxy watch Service & Endpoint,然后管理iptables路由规则。

k8s.io\kubernetes\cmd\kube-proxy\app\server.go

// Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
func (s *ProxyServer) Run() error { 
    informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod)

    // Create configs (i.e. Watches for Services and Endpoints)
    // Note: RegisterHandler() calls need to happen before creation of Sources because sources
    // only notify on changes, and the initial update (on process start) may be lost if no handlers
    // are registered yet.
    serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
    serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
    go serviceConfig.Run(wait.NeverStop)

    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
    go endpointsConfig.Run(wait.NeverStop)

    // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
    // functions must configure their shared informer event handlers first.
    go informerFactory.Start(wait.NeverStop)

重点关注ServiceEventHandlerEndpointsEventHandler

k8s.io\kubernetes\pkg\proxy\config\config.go

// ServiceHandler is an abstract interface of objects which receive
// notifications about service object changes.
type ServiceHandler interface {
    // OnServiceAdd is called whenever creation of new service object
    // is observed.
    OnServiceAdd(service *v1.Service)
    // OnServiceUpdate is called whenever modification of an existing
    // service object is observed.
    OnServiceUpdate(oldService, service *v1.Service)
    // OnServiceDelete is called whenever deletion of an existing service
    // object is observed.
    OnServiceDelete(service *v1.Service)
    // OnServiceSynced is called once all the initial even handlers were
    // called and the state is fully propagated to local cache.
    OnServiceSynced()
}

// EndpointsHandler is an abstract interface of objects which receive
// notifications about endpoints object changes.
type EndpointsHandler interface {
    // OnEndpointsAdd is called whenever creation of new endpoints object
    // is observed.
    OnEndpointsAdd(endpoints *v1.Endpoints)
    // OnEndpointsUpdate is called whenever modification of an existing
    // endpoints object is observed.
    OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints)
    // OnEndpointsDelete is called whever deletion of an existing endpoints
    // object is observed.
    OnEndpointsDelete(endpoints *v1.Endpoints)
    // OnEndpointsSynced is called once all the initial event handlers were
    // called and the state is fully propagated to local cache.
    OnEndpointsSynced()
}

这两个接口封装了svc 和 ep的处理函数,由于我们这边采用iptables,所以他们的具体实现是k8s.io\kubernetes\pkg\proxy\iptables\proxier.go

// Proxier is an iptables based proxy for connections between a localhost:lport
// and services that provide the actual backends.
type Proxier struct {
    // endpointsChanges and serviceChanges contains all changes to endpoints and
    // services that happened since iptables was synced. For a single object,
    // changes are accumulated, i.e. previous is state from before all of them,
    // current is state after applying all of those.
    endpointsChanges *proxy.EndpointChangeTracker
    serviceChanges   *proxy.ServiceChangeTracker

    mu           sync.Mutex // protects the following fields
    serviceMap   proxy.ServiceMap
    endpointsMap proxy.EndpointsMap
    portsMap     map[utilproxy.LocalPort]utilproxy.Closeable
    // endpointsSynced and servicesSynced are set to true when corresponding
    // objects are synced after startup. This is used to avoid updating iptables
    // with some partial data after kube-proxy restart.
    endpointsSynced bool
    servicesSynced  bool
    initialized     int32
    syncRunner      *async.BoundedFrequencyRunner // governs calls to syncProxyRules

    // These are effectively const and do not need the mutex to be held.
    iptables       utiliptables.Interface
    masqueradeAll  bool
    masqueradeMark string
    exec           utilexec.Interface
    clusterCIDR    string
    hostname       string
    nodeIP         net.IP
    portMapper     utilproxy.PortOpener
    recorder       record.EventRecorder
    healthChecker  healthcheck.Server
    healthzServer  healthcheck.HealthzUpdater

    // Since converting probabilities (floats) to strings is expensive
    // and we are using only probabilities in the format of 1/n, we are
    // precomputing some number of those and cache for future reuse.
    precomputedProbabilities []string

    // The following buffers are used to reuse memory and avoid allocations
    // that are significantly impacting performance.
    iptablesData *bytes.Buffer
    filterChains *bytes.Buffer
    filterRules  *bytes.Buffer
    natChains    *bytes.Buffer
    natRules     *bytes.Buffer

    // endpointChainsNumber is the total amount of endpointChains across all
    // services that we will generate (it is computed at the beginning of
    // syncProxyRules method). If that is large enough, comments in some
    // iptable rules are dropped to improve performance.
    endpointChainsNumber int

    // Values are as a parameter to select the interfaces where nodeport works.
    nodePortAddresses []string
    // networkInterfacer defines an interface for several net library functions.
    // Inject for test purpose.
    networkInterfacer utilproxy.NetworkInterfacer
}
func (proxier *Proxier) OnServiceAdd(service *v1.Service) {
    proxier.OnServiceUpdate(nil, service)
}

func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
    if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

func (proxier *Proxier) OnServiceDelete(service *v1.Service) {
    proxier.OnServiceUpdate(service, nil)
}

主要看Update方法,其中serviceChanges保存了service的变化,然后通过syncRunner进行同步。

proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)

syncRunner是一个定时任务,除了手动触发以外还支持定时触发。具体的任务就是syncProxyRules方法。

// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
    ...
}

syncProxyRules方法太长(700+)就不扣细节了。

上一篇下一篇

猜你喜欢

热点阅读