Docker容器

k8s网络系列学习笔记之三----kube-proxy原理分析

2019-03-06  本文已影响12人  何约什

本篇主要从代码的角度分析一下基于ipvs模式的实现原理。

ProxyServer的创建

系统代码基于Cobra实现,前面的代码逻辑很清晰,就不做分析,kube-proxy的关键的对象是ProxyServer,初始化的过程就是升恒ProxyServer对象,并执行ProxyServer.Run()。
下面我们先看看ProxyServer的定义:

type ProxyServer struct {
    Client                 clientset.Interface
    EventClient            v1core.EventsGetter
    IptInterface           utiliptables.Interface
    IpvsInterface          utilipvs.Interface
    IpsetInterface         utilipset.Interface
    execer                 exec.Interface
    Proxier                proxy.ProxyProvider
    Broadcaster            record.EventBroadcaster
    Recorder               record.EventRecorder
    ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
    Conntracker            Conntracker // if nil, ignored
    ProxyMode              string
    NodeRef                *v1.ObjectReference
    CleanupAndExit         bool
    CleanupIPVS            bool
    MetricsBindAddress     string
    EnableProfiling        bool
    OOMScoreAdj            *int32
    ResourceContainer      string
    ConfigSyncPeriod       time.Duration
    ServiceEventHandler    config.ServiceHandler
    EndpointsEventHandler  config.EndpointsHandler
    HealthzServer          *healthcheck.HealthzServer
}

底层命令接口

下面分析一下这个结构体的成员,client和EventClient就不啰嗦了,iptInterface、IpvsInterface 、IpsetInterface、execer这四个变量对应到底层命令:iptables,ipvsadm, ipset。

    iptInterface = utiliptables.New(execer, dbus, protocol) // 用于修改iptables
    ipvsInterface = utilipvs.New(execer) // 用于操作ipvs配置
    kernelHandler = ipvs.NewLinuxKernelHandler()  // 用于获取ipvs相关的内核模块信息,没有配置相关的内核模块,将会降级为iptables代理模式
    ipsetInterface = utilipset.New(execer) // 执行ipset命令,维护ip地址组

这几个成员的作用如注释中锁描述的,关键的成员是Proxyier,kube-proxy支持user space、iptables和ipvs三种代理模式,不同代理模式使用不同的Proxier实例。
如下所示,用判断kube-proxy具体采用的模式。

proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})

config.Mode是在启动kube-proxy进程时配置的模式,下面是kube-proxy的一个配置文件例子,其中mode:"ipvs"描述期望采用ipvs模式。

    apiVersion: kubeproxy.config.k8s.io/v1alpha1
    bindAddress: 0.0.0.0
    clientConnection:
      acceptContentTypes: ""
      burst: 10
      contentType: application/vnd.kubernetes.protobuf
      kubeconfig: /var/lib/kube-proxy/kubeconfig.conf
      qps: 5
    clusterCIDR: 10.244.0.0/16
    configSyncPeriod: 15m0s
    conntrack:
      max: null
      maxPerCore: 32768
      min: 131072
      tcpCloseWaitTimeout: 1h0m0s
      tcpEstablishedTimeout: 24h0m0s
    enableProfiling: false
    healthzBindAddress: 0.0.0.0:10256
    hostnameOverride: ""
    iptables:
      masqueradeAll: false
      masqueradeBit: 14
      minSyncPeriod: 0s
      syncPeriod: 30s
    ipvs:
      excludeCIDRs: null
      minSyncPeriod: 0s
      scheduler: ""
      syncPeriod: 30s
    kind: KubeProxyConfiguration
    metricsBindAddress: 127.0.0.1:10249
    mode: "ipvs"
    nodePortAddresses: null
    oomScoreAdj: -999
    portRange: ""
    resourceContainer: /kube-proxy
    udpIdleTimeout: 250ms

虽然这里配置了ipvs模式,但是是不是最终采用ipvs,还要检查一下是否满足条件,可参考学习笔记2。具体的代码就不展示了。如果最终确定采用ipvs模式,那么Proxier成员为ipvs.Proxyer类型。

ProxyServer的启动

        // 创建eventBroadcaster
    hostname := utilnode.GetHostname(config.HostnameOverride)
    eventBroadcaster := record.NewBroadcaster()
    recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

        // 启动时间记录eventBroadcaster到Api-Server
    s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})

代码逻辑比较清晰,就不列举了,我们可以通过下面的工具来检查health信息。

root@bogon2:~/k8s-yml-tests# curl http://127.0.0.1:10256/healthz
{"lastUpdated": "2019-02-22 07:10:11.258317649 +0000 UTC m=+109458.567293187","currentTime": "2019-02-22 07:10:37.209843398 +0000 UTC m=+109484.518818898"}
root@bogon2:~/k8s-yml-tests# curl http://127.0.0.1:10249/metrics
# HELP apiserver_audit_event_total Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
......
rest_client_request_latency_seconds_sum{url="https://172.25.39.6:6443/%7Bprefix%7D",verb="POST"} 0.007607341
rest_client_request_latency_seconds_count{url="https://172.25.39.6:6443/%7Bprefix%7D",verb="POST"} 1
# HELP rest_client_requests_total Number of HTTP requests, partitioned by status code, method, and host.
# TYPE rest_client_requests_total counter
rest_client_requests_total{code="200",host="172.25.39.6:6443",method="GET"} 490
rest_client_requests_total{code="201",host="172.25.39.6:6443",method="POST"} 1

那么Netfilter又是如何生成连接记录项的呢?每一个数据,都有“来源”与“目的”主机,发起连接的主机称为“来源”,响应“来源”的请求的主机即 为目的,所谓生成记录项,就是对每一个这样的连接的产生、传输及终止进行跟踪记录。由所有记录项产生的表,即称为连接跟踪表。

kube-proxy通过结构体KubeProxyContrackConfiguration来设置conntrack参数信息,对应的结构如下所示:


// KubeProxyConntrackConfiguration contains conntrack settings for
// the Kubernetes proxy server.
type KubeProxyConntrackConfiguration struct {
    // max is the maximum number of NAT connections to track (0 to
    // leave as-is).  This takes precedence over maxPerCore and min.
    Max *int32
    // maxPerCore is the maximum number of NAT connections to track
    // per CPU core (0 to leave the limit as-is and ignore min).
    MaxPerCore *int32
    // min is the minimum value of connect-tracking records to allocate,
    // regardless of maxPerCore (set maxPerCore=0 to leave the limit as-is).
    Min *int32
    // tcpEstablishedTimeout is how long an idle TCP connection will be kept open
    // (e.g. '2s').  Must be greater than 0 to set.
    TCPEstablishedTimeout *metav1.Duration
    // tcpCloseWaitTimeout is how long an idle conntrack entry
    // in CLOSE_WAIT state will remain in the conntrack
    // table. (e.g. '60s'). Must be greater than 0 to set.
    TCPCloseWaitTimeout *metav1.Duration
}

conntracker的设置是由ProxyServer中的Conntracker成员负责设置,在linux中,具体的结构为:type realConntracker struct{},下面设设置Max参数的代码:

func (rct realConntracker) SetMax(max int) error {
    if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
        return err
    }
    glog.Infof("Setting nf_conntrack_max to %d", max)

    // Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize
    // when the writer process is not in the initial network namespace
    // (https://github.com/torvalds/linux/blob/v4.10/net/netfilter/nf_conntrack_core.c#L1795-L1796).
    // Usually that's fine. But in some configurations such as with github.com/kinvolk/kubeadm-nspawn,
    // kube-proxy is in another netns.
    // Therefore, check if writing in hashsize is necessary and skip the writing if not.
    hashsize, err := readIntStringFile("/sys/module/nf_conntrack/parameters/hashsize")
    if err != nil {
        return err
    }
    if hashsize >= (max / 4) {
        return nil
    }

    // sysfs is expected to be mounted as 'rw'. However, it may be
    // unexpectedly mounted as 'ro' by docker because of a known docker
    // issue (https://github.com/docker/docker/issues/24000). Setting
    // conntrack will fail when sysfs is readonly. When that happens, we
    // don't set conntrack hashsize and return a special error
    // readOnlySysFSError here. The caller should deal with
    // readOnlySysFSError differently.
    writable, err := isSysFSWritable()
    if err != nil {
        return err
    }
    if !writable {
        return readOnlySysFSError
    }
    // TODO: generify this and sysctl to a new sysfs.WriteInt()
    glog.Infof("Setting conntrack hashsize to %d", max/4)
    return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4)
}

从代码中可以看出,通过sysctl修改/proc/sys/net/netfilter/nf_conntrack_max文件中的参数值,然后通过代码直接修改/sys/module/nf_conntrack/parameters/hashsize的值为max的1/4。conntrack-tcp-timeout-established与–conntrack-tcp-timeout-close-wait参数的设置同理,最后对应的文件路径分别为/proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established和/proc/sys/net/netfilter/nf_conntrack_tcp_timeout_close_wait。

这些参数会影响后面的网络连接,所以设置的时候要慎重,最好与系统规划时的配置一致,设置过小,可能会导致nf_conntrack:table full,dropping packet的错误。

这里初始化了serviceConfig和endpointsConfig实例,并启动它们,如下所示:

      serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
    serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
    go serviceConfig.Run(wait.NeverStop)

    endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
    endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
    go endpointsConfig.Run(wait.NeverStop)

    // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
    // functions must configure their shared informer event handlers first.
    go informerFactory.Start(wait.NeverStop)
    // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
    // functions must configure their shared informer event handlers first.
    go informerFactory.Start(wait.NeverStop)

serviceConfig和endpointsConfig注册的事件对象实际都是proxier,也就是前面初始化的ipvs.Proxier实例对象。serviceConfig和endpointsConfig的Run逻辑是等待相应的Infomer同步完成后,回调Proxier的OnServiceSynced和OnEndpointsSynced方法。

相应对象的事件回调方法,在ServiceConfig和EndpointsConfig对象的构建中完成,如下所示:

func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
    result := &ServiceConfig{
        lister:       serviceInformer.Lister(),
        listerSynced: serviceInformer.Informer().HasSynced,
    }
        // 这里注册回调事件
    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    result.handleAddService,
            UpdateFunc: result.handleUpdateService,
            DeleteFunc: result.handleDeleteService,
        },
        resyncPeriod,
    )

    return result
}

核心的业务逻辑主要是就是对于各种事件的监听和处理,后面专门进行分析。

循环处理的驱动有事件驱动和超时驱动两种,代码如下所示:

// SyncLoop runs periodic work.  This is expected to run as a goroutine or as the main loop of the app.  It does not return.
func (proxier *Proxier) SyncLoop() {
    // Update healthz timestamp at beginning in case Sync() never succeeds.
    if proxier.healthzServer != nil {
        proxier.healthzServer.UpdateTimestamp()
    }
    proxier.syncRunner.Loop(wait.NeverStop)
}


// Loop handles the periodic timer and run requests.  This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
    glog.V(3).Infof("%s Loop running", bfr.name)
    bfr.timer.Reset(bfr.maxInterval)
    for {
        select {
        case <-stop:
            bfr.stop()
            glog.V(3).Infof("%s Loop stopping", bfr.name)
            return
        case <-bfr.timer.C():
            bfr.tryRun()
        case <-bfr.run:
            bfr.tryRun()
        }
    }
}

有Service、Endpoints事件或者超时时,都会调用BoundedFrequencyRunner.tryRun方法,BoundedFrequencyRunner的结构体如下所示:

// BoundedFrequencyRunner manages runs of a user-provided function.
// See NewBoundedFrequencyRunner for examples.
type BoundedFrequencyRunner struct {
    name        string        // the name of this instance
    minInterval time.Duration // the min time between runs, modulo bursts
    maxInterval time.Duration // the max time between runs

    run chan struct{} // try an async run

    mu      sync.Mutex  // guards runs of fn and all mutations
    fn      func()      // function to run
    lastRun time.Time   // time of last run
    timer   timer       // timer for deferred runs
    limiter rateLimiter // rate limiter for on-demand runs
}

在tryRun方法中,会调用fn成员函数,该成员函数为proxyier.syncProxyRules方法。在后面的处理中会单独分析这个方法。

核心业务逻辑----更新代理规则

在上一章节,最后两个步骤的处理是核心业务逻辑的,第5步负责注册事件回调,在事件回调方法中,驱动第6步中的逻辑的(syncProxyRules方法)处理。

事件回调函数

在看事件回调函数之前,我们先来看一下Proxier的两个成员:

type Proxier struct {
    // endpointsChanges and serviceChanges contains all changes to endpoints and
    // services that happened since last syncProxyRules call. For a single object,
    // changes are accumulated, i.e. previous is state from before all of them,
    // current is state after applying all of those.
    endpointsChanges endpointsChangeMap //  endpoints变更记录
    serviceChanges   serviceChangeMap       // service变更记录

    serviceMap   proxyServiceMap                 // service记录
    endpointsMap proxyEndpointsMap           // endpoints记录
    portsMap     map[utilproxy.LocalPort]utilproxy.Closeable
    ......
}

endpointsChanges和serviceChanges分别用于记录service和endpoints的变更记录信息,而serviceMap、endpointsMap和portsMap则记录了实际的服务、EP和端口等信息内容,也就是所有的变更操作,都会更新到这些成员中。

事件回调方法OnServiceAdd的代码如下:

// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
    namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
        proxier.syncRunner.Run()
    }
}

在事件回调方法中,主要完成两个工作:

更新代理规则的处理

这里讲只分析最常用的ClusterIP模式的Service的规则处理部分。

前面列举过Proxier结构的核心变量,前面几次对Service和Endpoints资源的侦听函数,会实时监测到Service和Endpoints(Pod)的变化,把变化的内容更新到Proxyier的endpointsChanges和serviceChanges成员后,触发更新代理规则的动作。
更新代理规则这块逻辑在Proxier.syncProxyRules中完成,下面将分步骤来进行说明:

K8S事件机制能够保证通知的幂等性,收到service和endpoints的变更内容,但是有可能变更的内容是一些周边资料信息,那这些不会影响实际的代理规则,同时也难保事件的重复触发,所以首先需要排除这些情况,把changes与实际的内存中存储的数据进行对比,得出真正的有意义的变化内容,确保我们后续的操作的内容是真正需要进行规则变更的。

整个过程是通过updateServiceMap和updateEndpointsMap来进行检查的。

func (proxier *Proxier) syncProxyRules() {
    proxier.mu.Lock()
    defer proxier.mu.Unlock()
        ......
    serviceUpdateResult := updateServiceMap(
        proxier.serviceMap, &proxier.serviceChanges)
    endpointUpdateResult := updateEndpointsMap(
        proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
        ......
}

首先介绍一下iptables中的INPUT、FORWARD等规则链和规则,
在处理各种数据包时,根据防火墙规则的不同介入时机,iptables供涉及5种默认规则链,从应用时间点的角度理解这些链:

INPUT链:当接收到防火墙本机地址的数据包(入站)时,应用此链中的规则。
OUTPUT链:当防火墙本机向外发送数据包(出站)时,应用此链中的规则。
FORWARD链:当接收到需要通过防火墙发送给其他地址的数据包(转发)时,应用此链中的规则。
PREROUTING链:在对数据包作路由选择之前,应用此链中的规则,如DNAT。
POSTROUTING链:在对数据包作路由选择之后,应用此链中的规则,如SNAT。

-->PREROUTING-->[ROUTE]-->FORWARD-->POSTROUTING-->
     mangle        |       mangle        ^ mangle
      nat          |       filter        |  nat
                   |                     |
                   |                     |
                   v                     |
                 INPUT                 OUTPUT
                   | mangle              ^ mangle
                   | filter              |  nat
                   v ------>local------->| filter

我们可以通过iptables-save命令来备份iptables规则,以iptables-save的格式进行简单举例:

root@ubuntu:/home/yuxianbing# iptables-save -t nat
# Generated by iptables-save v1.6.0 on Tue Mar  5 09:26:02 2019 
注释内容
*nat
:PREROUTING ACCEPT [14:12354]
-- :PREROUTING ACCEPT,表示nat表中的PREROUTING 链默认报文策略是接受(匹配不到规则继续) ,
-- [14:12354] 即[packet, bytes],表示当前有14个包(12354字节)经过nat表的PREROUTING 链
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [4:222]
:POSTROUTING ACCEPT [3:149]
:CNI-DN-603508538baa710bd2110 - [0:0]
:CNI-HOSTPORT-DNAT - [0:0]
:CNI-HOSTPORT-SNAT - [0:0]
:CNI-SN-603508538baa710bd2110 - [0:0]
:KUBE-FIRE-WALL - [0:0]
:KUBE-MARK-DROP - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SERVICES - [0:0]
-- 解释同上(这些是自定义链)

---------- 下面开始按条输出所有规则----------
-A PREROUTING -m addrtype --dst-type LOCAL -j CNI-HOSTPORT-DNAT
---- 这是用iptables命令配置此规则的命令(详解选项可参考iptables帮助)。
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A OUTPUT -m addrtype --dst-type LOCAL -j CNI-HOSTPORT-DNAT
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A POSTROUTING -s 127.0.0.1/32 ! -d 127.0.0.1/32 -j CNI-HOSTPORT-SNAT
-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A POSTROUTING -s 10.0.0.0/8 ! -d 10.0.0.0/8 -j MASQUERADE
-A CNI-DN-603508538baa710bd2110 -p tcp -m tcp --dport 8080 -j DNAT --to-destination 10.221.2.42:80
-A CNI-HOSTPORT-DNAT -m comment --comment "dnat name: \"cni0\" id: \"42ccbebb916d082a6d872aaa48efea33c4cc33267d14779046f687f4dcddda8d\"" -j CNI-DN-603508538baa710bd2110
-A CNI-HOSTPORT-SNAT -m comment --comment "snat name: \"cni0\" id: \"42ccbebb916d082a6d872aaa48efea33c4cc33267d14779046f687f4dcddda8d\"" -j CNI-SN-603508538baa710bd2110
-A CNI-SN-603508538baa710bd2110 -s 127.0.0.1/32 -d 10.221.2.42/32 -p tcp -m tcp --dport 80 -j MASQUERADE
-A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
-A KUBE-POSTROUTING -m set --match-set KUBE-LOOP-BACK dst,dst,src -j MASQUERADE
-A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
-A KUBE-SERVICES -p tcp -m tcp -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-MARK-MASQ
COMMIT
-- 应用上述配置
# Completed on Tue Mar  5 09:26:02 2019

从iptables的NAT规则表包含两个部分:链和规则,其中链还保存了经过该链的包和字节数。

针对iptables和IPVS规则的处理逻辑为:

    err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData) 
        // 1.执行iptables-save命令
    if err != nil { // if we failed to get any rules
        glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
    } else { // otherwise parse the output
        existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes()) 
                // 2. 读取所有的链路信息
    }
        // 3. 重置iptables规则缓冲区
    proxier.natChains.Reset() 
    proxier.natRules.Reset()
    // Write table headers.
    writeLine(proxier.natChains, "*nat") // 3. 写nat表头信息
        // 4. 写POSTROUTING链路信息
    if chain, ok := existingNATChains[kubePostroutingChain]; ok {
        writeLine(proxier.natChains, chain)
    } else {
        writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
    }

        // 写POSTROUTING 规则
        // 这条POSTROUTING规则的作用是为消息打标记的包,执行MASQUERADE操作,也就是SNAT处理
        // 对于出主机的包,我们要做SNAT处理
    writeLine(proxier.natRules, []string{
        "-A", string(kubePostroutingChain),
        "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
        "-m", "mark", "--mark", proxier.masqueradeMark,
        "-j", "MASQUERADE",
    }...)

        // 5. KUBE-MARK-MASQ链路信息
    if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
        writeLine(proxier.natChains, chain)
    } else {
        writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
    }
       // KUBE-MARK-MASQ规则,执行MARK,打上masquerade标记
    writeLine(proxier.natRules, []string{
        "-A", string(KubeMarkMasqChain),
        "-j", "MARK", "--set-xmark", proxier.masqueradeMark,
    }...)

       // 6. 安装Dummy网卡kube-ipvs0
       // 这个网卡的作用是绑定所有Service的ClusterIP到该网卡上
    _, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
    if err != nil {
        glog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
        return
    }
        // 7. ipsets初始化,建立所有的ipsets表,并清空里面的内容
    // make sure ip sets exists in the system.
    ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
        proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
    if err := ensureIPSets(ipSets...); err != nil {
        return
    }
    for i := range ipSets {
        ipSets[i].resetEntries()
    }
        ......
        
// linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT

// Chain PREROUTING (policy ACCEPT)
// target            prot opt source               destination
// KUBE-SERVICES     all  --  0.0.0.0/0            0.0.0.0/0

// Chain OUTPUT (policy ACCEPT)
// target            prot opt source               destination
// KUBE-SERVICES     all  --  0.0.0.0/0            0.0.0.0/0

// Chain KUBE-SERVICES (2 references)
        // 8. 创建PrePosting与Output链,匹配所有的协议、源IP和目标IP,自动进入目标KUBE-SERVICES就行匹配和处理
        //  同时创建KUBE-SERVICES链,注意:规则后续根据Cluster Service与NodePort Service来按需创建。
        // 类似命令为: iptables -t nat -N KUBE-SERVICES
    if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil {
        glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
        return
    }
    // Kube service ipset
    if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
        glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
        return
    }
        
        // 9. 创建KUBE-FIRE-WALL链,不过目前我们系统没有相关的规则,暂时不知道用处
    if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
        glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
        return
    }
        
        // 10. 进入最关键的环节,为每个服务构建IPVS规则
        for svcName, svcInfo := range proxier.serviceMap {
        protocol := strings.ToLower(string(svcInfo.protocol))
        // Precompute svcNameString; with many services the many calls
        // to ServicePortName.String() show up in CPU profiles.
        svcNameString := svcName.String()

        // Handle traffic that loops back to the originator with SNAT.
            // 10.1 hairpin模式的处理,匹配ip,port,ip这种模式,这块代码不分析了,一般很少有这种现象。
                ......
                // 10.2 ClusterIP处理
                // 准备 IP Set的项,存储Cluster IP和对应的Port
        entry := &utilipset.Entry{
            IP:       svcInfo.clusterIP.String(),
            Port:     svcInfo.port,
            Protocol: protocol,
            SetType:  utilipset.HashIPPort,
        }
                // 如果kube-proxy启动设置了masqueradeAll或者clusterCIDR,则安装伪装规则,做SNAT操作
        if proxier.masqueradeAll || len(proxier.clusterCIDR) > 0 {
                        ......
            proxier.clusterIPSet.activeEntries.Insert(entry.String())
        }
        // 准备ipvs虚拟服务器
        serv := &utilipvs.VirtualServer{
            Address:   svcInfo.clusterIP,
            Port:      uint16(svcInfo.port),
            Protocol:  string(svcInfo.protocol),
            Scheduler: proxier.ipvsScheduler,
        }
        // Set session affinity flag and timeout for IPVS service
        if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
            serv.Flags |= utilipvs.FlagPersistent
            serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
        }
        // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
                // 10.3 创建或者更新IPVS虚拟服务器 
                //  把ClusterIP绑定到kube-ipvs0设备,最后一个参数bindAddr为true
        if err := proxier.syncService(svcNameString, serv, true); err == nil {
            activeIPVSServices[serv.String()] = true
            // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
            // So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
                        // 这里要看一下,传入的第二个参数中,onlyNodeLocalEndpoints传入为false,所以我们会把所有的Endpoints注册到IPVS虚拟服务器中
                        // onlyNodeLocalEndpoints只有NodePort和LB中设置才会生效,用于只绑定到本地的Endpoints
                      
            if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
                glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
            }
        } else {
            glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
        }

        // 10.4 Capture externalIPs. 不关注,很少用
        // 10.5 Capture load-balancer ingress.  不关注,很少用,需要云厂商支持
                ......
                // 10.6 针对NodePort类型的Service进行处理        
        if svcInfo.nodePort != 0 {
            lp := utilproxy.LocalPort{
                Description: "nodePort for " + svcNameString,
                IP:          "",
                Port:        svcInfo.nodePort,
                Protocol:    protocol,
            }
            if proxier.portsMap[lp] != nil {
                glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
                replacementPortsMap[lp] = proxier.portsMap[lp]
            } else {
                                // 侦听Host上的PORT端口(TCP/UDP),把端口占用起来,以便安全的安装IPVS规则
                socket, err := proxier.portMapper.OpenLocalPort(&lp)
                if err != nil {
                    glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
                    continue
                }
                if lp.Protocol == "udp" {
                    isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP)
                    utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
                }
                replacementPortsMap[lp] = socket
            } // We're holding the port, so it's OK to install ipvs rules.
                        // 如果没有指定onlyNodeLocalEndpoints,则需要做SNAT处理,需要把对应的端口插入到IPSET集中,这样,可以匹配到KUBE-SERVICES规则,从而做原地址伪装,实现SNAT功能。
            // Nodeports need SNAT, unless they're local.
            // ipset call
            if !svcInfo.onlyNodeLocalEndpoints {
                entry = &utilipset.Entry{
                    // No need to provide ip info
                    Port:     svcInfo.nodePort,
                    Protocol: protocol,
                    SetType:  utilipset.BitmapPort,
                }
                var nodePortSet *IPSet
                switch protocol {
                case "tcp":
                    nodePortSet = proxier.nodePortSetTCP
                case "udp":
                    nodePortSet = proxier.nodePortSetUDP
                default:
                    // It should never hit
                    glog.Errorf("Unsupported protocol type: %s", protocol)
                }
                if nodePortSet != nil {
                    if valid := nodePortSet.validateEntry(entry); !valid {
                        glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
                        continue
                    }
                    nodePortSet.activeEntries.Insert(entry.String())
                }
            }
                        // 为节点上的每个物理IP创建对应的IPVS虚拟服务器和对应的路由Endpoint规则
            // Build ipvs kernel routes for each node ip address
            nodeIPs, err := proxier.ipGetter.NodeIPs()
            if err != nil {
                glog.Errorf("Failed to get node IP, err: %v", err)
            } else {
                for _, nodeIP := range nodeIPs {
                    // ipvs call
                    serv := &utilipvs.VirtualServer{
                        Address:   nodeIP,
                        Port:      uint16(svcInfo.nodePort),
                        Protocol:  string(svcInfo.protocol),
                        Scheduler: proxier.ipvsScheduler,
                    }
                    if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
                        serv.Flags |= utilipvs.FlagPersistent
                        serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
                    }
                    // There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
                    if err := proxier.syncService(svcNameString, serv, false); err == nil {
                        activeIPVSServices[serv.String()] = true
                        if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
                            glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
                        }
                    } else {
                        glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
                    }
                }
            }
        }
    }

总结

kube-proxy的IPVS Proxier是通过iptables和ipvs来实现代理功能,iptables基本上是几条固定的链路和规则,而大量的Service ClusterIP和Endpoint IP等等信息,都封装进了IPSET中,通过iptables通过match-set来匹配,大大减少了iptables规则数量,提高了iptables维护性能和匹配性能。
iptables规则主要用于做SNAT等操作,配合ipvs完成代理服务能力。

上一篇下一篇

猜你喜欢

热点阅读