CNI Terway VPC模式源码详解

2024-02-21  本文已影响0人  Teddy_b

Terway

VPC模式

从参考资料中Terway的设计文档可以看到它的网络模型

源码解析

// doCmdAdd implements the CNI ADD flow: it requests IP allocation from the
// terway daemon over gRPC, then configures the datapath for every returned
// network config.
// NOTE(review): this article excerpt is truncated ("..." below) and strips
// all error handling, so it does not compile as-is.
func doCmdAdd(ctx context.Context, logger *logrus.Entry, client rpc.TerwayBackendClient, cmdArgs *cniCmdArgs) (containerIPNet *terwayTypes.IPNetSet, gatewayIPSet *terwayTypes.IPSet, err error) {
    var conf, cniNetns, k8sConfig, args = cmdArgs.conf, cmdArgs.netNS, cmdArgs.k8sArgs, cmdArgs.inputArgs

    // Ask the daemon (over its unix-socket gRPC endpoint) which network
    // configs and IP type this pod should get.
    allocResult, err := client.AllocIP(ctx, &rpc.AllocIPRequest{
        Netns:                  args.Netns,
        K8SPodName:             string(k8sConfig.K8S_POD_NAME),
        K8SPodNamespace:        string(k8sConfig.K8S_POD_NAMESPACE),
        K8SPodInfraContainerId: string(k8sConfig.K8S_POD_INFRA_CONTAINER_ID),
        IfName:                 args.IfName,
    })


    multiNetwork := len(allocResult.NetConfs) > 1

    // File lock: serializes concurrent CNI invocations across multiple pods.
    l, err := utils.GrabFileLock(terwayCNILock)
    defer l.Close()

    for _, netConf := range allocResult.NetConfs {
        var setupCfg *types.SetupConfig
        setupCfg, err = parseSetupConf(args, netConf, conf, allocResult.IPType)

        // Host-side veth name is derived deterministically from the pod's
        // name and namespace.
        setupCfg.HostVETHName, _ = link.VethNameForPod(string(k8sConfig.K8S_POD_NAME), string(k8sConfig.K8S_POD_NAMESPACE), netConf.IfName, defaultVethPrefix)
        setupCfg.HostIPSet = hostIPSet
        setupCfg.MultiNetwork = multiNetwork

        switch setupCfg.DP {
        case types.VPCRoute:
            utils.Hook.AddExtraInfo("dp", "vpcRoute")

            var r cniTypes.Result
            // Delegate IP allocation to the host-local IPAM plugin.
            r, err = ipam.ExecAdd(delegateIpam, []byte(fmt.Sprintf(delegateConf, setupCfg.ContainerIPNet.IPv4)))
            var ipamResult *current.Result
            ipamResult, err = current.NewResultFromResult(r)


            err = func() (err error) {
                // The first allocated IP is the pod address; the gateway
                // also comes from the IPAM result.
                podIPAddr := ipamResult.IPs[0].Address
                gateway := ipamResult.IPs[0].Gateway

                containerIPNet = &terwayTypes.IPNetSet{
                    IPv4: &podIPAddr,
                }
                gatewayIPSet = &terwayTypes.IPSet{
                    IPv4: gateway,
                }

                setupCfg.ContainerIPNet = containerIPNet
                setupCfg.GatewayIP = gatewayIPSet

                // Wire up the veth pair and routes for the VPC-route datapath.
                return datapath.NewVPCRoute().Setup(setupCfg, cniNetns)
            }()
...
    return
}

分别展开看下:

首先调用daemon获取网段信息

allocResult, err := client.AllocIP(ctx, &rpc.AllocIPRequest{
        Netns:                  args.Netns,
        K8SPodName:             string(k8sConfig.K8S_POD_NAME),
        K8SPodNamespace:        string(k8sConfig.K8S_POD_NAMESPACE),
        K8SPodInfraContainerId: string(k8sConfig.K8S_POD_INFRA_CONTAINER_ID),
        IfName:                 args.IfName,
    })

这里daemon通过unix socket提供了grpc服务,所以cni插件可以通过这个unix socket文件进行grpc调用

先来看下daemon的grpc实现

// AllocIP is the daemon-side gRPC handler invoked by the terway CNI plugin
// over the unix socket. It resolves the pod via the Kubernetes API, decides
// the pod network type, allocates the matching network resource
// (veth / exclusive ENI / ENI secondary IP) and returns the resulting
// network configs enriched with cluster CIDR and per-pod QoS info.
// NOTE(review): this excerpt drops error handling and the code that loads
// `oldRes` (previously stored resources), so it does not compile as-is.
func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*rpc.AllocIPReply, error) {
    podID := utils.PodInfoKey(r.K8SPodNamespace, r.K8SPodName)

    reply := &rpc.AllocIPReply{
        Success: true,
        IPv4:    n.enableIPv4,
        IPv6:    n.enableIPv6,
    }

    // 0. Get pod Info
    pod, err := n.k8s.GetPod(ctx, r.K8SPodNamespace, r.K8SPodName, true)

    // 1. Init Context

    cni := &daemon.CNI{
        PodName:      r.K8SPodName,
        PodNamespace: r.K8SPodNamespace,
        PodID:        podID,
        PodUID:       pod.PodUID,
        NetNSPath:    r.Netns,
    }

    var resourceRequests []eni.ResourceRequest

    var netConf []*rpc.NetConf
    // 3. Allocate network resource for pod
    switch pod.PodNetworkType {
    case daemon.PodNetworkTypeENIMultiIP:
        reply.IPType = rpc.IPType_TypeENIMultiIP

        if pod.PodENI {
            // CRD-managed pod ENI: the IP is allocated out-of-band.
            resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
        } else {
            req := &eni.LocalIPRequest{}

            // Reuse the previously assigned ENI IP when exactly one exists.
            if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENIIP)) == 1 {
                old := oldRes.GetResourceItemByType(daemon.ResourceTypeENIIP)[0]

                setRequest(req, old)
            }

            resourceRequests = append(resourceRequests, req)
        }
    case daemon.PodNetworkTypeVPCENI:
        reply.IPType = rpc.IPType_TypeVPCENI

        if pod.PodENI || n.ipamType == types.IPAMTypeCRD {
            resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
        } else {
            req := &eni.LocalIPRequest{}

            // Reuse the previously assigned exclusive ENI when one exists.
            if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 {
                old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0]

                setRequest(req, old)
            }
            resourceRequests = append(resourceRequests, req)
        }
    case daemon.PodNetworkTypeVPCIP:
        // VPC-IP pods only need a veth name; no cloud resource is involved.
        reply.IPType = rpc.IPType_TypeVPCIP
        resourceRequests = append(resourceRequests, &eni.VethRequest{})
    }

    var networkResource []daemon.ResourceItem

    resp, err := n.eniMgr.Allocate(ctx, cni, &eni.AllocRequest{
        ResourceRequests: resourceRequests,
    })

    for _, res := range resp {
        netConf = append(netConf, res.ToRPC()...)
        networkResource = append(networkResource, res.ToStore()...)
    }

    // Enrich every net conf with the service CIDR, the node's pod CIDR
    // (VPC-IP mode only) and the pod's traffic-control settings.
    for _, c := range netConf {
        if c.BasicInfo == nil {
            c.BasicInfo = &rpc.BasicInfo{}
        }
        c.BasicInfo.ServiceCIDR = n.k8s.GetServiceCIDR().ToRPC()
        if pod.PodNetworkType == daemon.PodNetworkTypeVPCIP {
            c.BasicInfo.PodCIDR = n.k8s.GetNodeCidr().ToRPC()
        }
        c.Pod = &rpc.Pod{
            Ingress:         pod.TcIngress,
            Egress:          pod.TcEgress,
            NetworkPriority: pod.NetworkPriority,
        }
    }

    reply.NetConfs = netConf
    reply.Success = true

    return reply, nil
}

daemon是通过DaemonSet方式运行在k8s集群里的,因此可以通过serviceaccount来调用k8s api

这里先通过k8s api查询pod信息,pod名称和命名空间均来自kubelet调用cni插件时设置的环境变量K8S_POD_NAME、K8S_POD_NAMESPACE、K8S_POD_INFRA_CONTAINER_ID

然后确定POD的网络模式

// podNetworkType maps the daemon's running mode (plus per-pod hints) to the
// pod network type used for resource allocation.
//
// In VPC mode a pod is promoted to an exclusive ENI (PodNetworkTypeVPCENI)
// when either the pod carries the "need ENI" annotation with a truthy value,
// or any container requests the ENI device-plugin resource; otherwise it
// falls back to the veth-based PodNetworkTypeVPCIP.
func podNetworkType(daemonMode string, pod *corev1.Pod) string {
    switch daemonMode {
    case daemon.ModeENIMultiIP:
        return daemon.PodNetworkTypeENIMultiIP
    case daemon.ModeVPC:
        podAnnotation := pod.GetAnnotations()
        useENI := false
        // Any annotation value other than "", "false" or "0" counts as true.
        if needEni, ok := podAnnotation[podNeedEni]; ok && (needEni != "" && needEni != ConditionFalse && needEni != "0") {
            useENI = true
        }

        // A container requesting the ENI device-plugin resource also forces
        // an exclusive ENI.
        for _, c := range pod.Spec.Containers {
            if _, ok := c.Resources.Requests[deviceplugin.ENIResName]; ok {
                useENI = true
                break
            }
        }

        if useENI {
            return daemon.PodNetworkTypeVPCENI
        }
        return daemon.PodNetworkTypeVPCIP
    case daemon.ModeENIOnly:
        return daemon.PodNetworkTypeVPCENI
    }
    // Fix: the function previously fell off the end of the switch with no
    // return statement, which does not compile in Go ("missing return").
    // An unrecognized daemon mode is a programmer/configuration error.
    panic("unknown daemon mode " + daemonMode)
}

我们以最简单的VPC-IP模式为例

然后确定网络资源请求类型

switch pod.PodNetworkType {
    ...
    case daemon.PodNetworkTypeVPCIP:
        reply.IPType = rpc.IPType_TypeVPCIP
        resourceRequests = append(resourceRequests, &eni.VethRequest{})
    }

对于VPC-IP类型,对应的就是VethRequest,不同的类型生成的网络配置不同

// Allocate satisfies veth-type resource requests. It returns a channel on
// which a single AllocResp (carrying the deterministic host-side veth name)
// is delivered asynchronously; non-veth requests are ignored with (nil, nil).
func (r *Veth) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceRequest) (chan *AllocResp, []Trace) {
    // This allocator only understands veth requests.
    if request.ResourceType() != ResourceTypeVeth {
        return nil, nil
    }
    result := make(chan *AllocResp)

    go func() {
        // Derive the host-side veth name ("cali" prefix) from the pod's
        // name and namespace.
        vethName, _ := link.VethNameForPod(cni.PodName, cni.PodNamespace, "", "cali")
        configs := []NetworkResource{&VethResource{Name: vethName}}

        select {
        case result <- &AllocResp{NetworkConfigs: configs}:
            // Response delivered to the caller.
        case <-ctx.Done():
            // Caller gave up; drop the response instead of blocking forever.
        }
    }()

    return result, nil
}

对于VethRequest类型,不需要申请ENI,所以生成的网络配置比较简单,只需要生成Veth网卡的名称即可,网卡名称格式为calixxxxxxxxxxxx

然后网络配置中补充Service CIDR和POD CIDR

for _, c := range netConf {
        if c.BasicInfo == nil {
            c.BasicInfo = &rpc.BasicInfo{}
        }
        c.BasicInfo.ServiceCIDR = n.k8s.GetServiceCIDR().ToRPC()
        if pod.PodNetworkType == daemon.PodNetworkTypeVPCIP {
            c.BasicInfo.PodCIDR = n.k8s.GetNodeCidr().ToRPC()
        }
        c.Pod = &rpc.Pod{
            Ingress:         pod.TcIngress,
            Egress:          pod.TcEgress,
            NetworkPriority: pod.NetworkPriority,
        }
    }

其中POD的CIDR是通过查询daemon所在节点的spec获取的

spec:
  podCIDR: 10.250.7.0/24
  podCIDRs:
  - 10.250.7.0/24

而Service的CIDR可以通过daemon的配置文件指定,或者能够通过其它的ConfigMap获取到(如kubeadm方式安装的集群存在一个ConfigMap:kube-system/kubeadm-config)

daemon的配置文件中也可以指定

kind: ConfigMap
apiVersion: v1
metadata:
  name: eni-config
  namespace: kube-system
data:
  eni_conf: |
    {
      "instance-id": "i-8vb4chhex0xxxxahaxyv",
      "instance/instance-type": "ecs.c6.2xlarge",
      "region-id": "cn-zhaxxxou",
      "zone-id": "cn-zhaxxxxkou-c",
      "vswitch-id": "sw-8vbdxxxp1evxd6r",
      "vpc-id": "vpc-8vbu67xxxfd6jc",
      "mac": "00:16xxxxxxx:1e",
      "version": "1",
      "access_key": "ak",
      "access_secret": "sk",
      "service_cidr": "10.96.0.0/12",
      "security_group": "sg-xxxxxxxxxxx",
      "max_pool_size": 5,
      "min_pool_size": 0
    }

最终grpc的响应会包括这些字段

reply := &rpc.AllocIPReply{
        Success: true,
        IPv4:    n.enableIPv4,
        IPv6:    n.enableIPv6,
                IPType: rpc.IPType_TypeVPCIP
                NetConfs: {
                            BasicInfo:    {
                                      ServiceCIDR: 10.96.0.0/12,
                                      PodCIDR:       10.250.7.0/24
                            },
                    ENIInfo:      nil,
                    Pod:          {
                          Ingress:         pod.TcIngress,
                          Egress:          pod.TcEgress,
                          NetworkPriority: pod.NetworkPriority,
                    },
                    IfName:       "",
                    ExtraRoutes:  nil,
                    DefaultRoute: true,
                }
    }

cni插件拿到daemon的grpc响应结果后,将会通过响应结果中的IPType字段确定使用哪种容器网络配置方式

// getDatePath selects the datapath implementation for the IP type returned
// by the daemon: VPC-IP pods use host routing (VPCRoute); VPC-ENI pods use
// an exclusive ENI (or Vlan when on a trunk ENI); ENI-multi-IP pods use
// IPVlan (or Vlan when trunked with vlan strip mode).
// NOTE(review): the name keeps the upstream spelling "getDatePath" (likely a
// typo for getDataPath) so existing callers are unaffected.
func getDatePath(ipType rpc.IPType, vlanStripType types.VlanStripType, trunk bool) types.DataPath {
    switch ipType {
    case rpc.IPType_TypeVPCIP:
        return types.VPCRoute
    case rpc.IPType_TypeVPCENI:
        if trunk {
            return types.Vlan
        }
        return types.ExclusiveENI
    case rpc.IPType_TypeENIMultiIP:
        if trunk && vlanStripType == types.VlanStripTypeVlan {
            return types.Vlan
        }
        return types.IPVlan
    default:
        // Fix: the function previously fell off the end of the switch with
        // no return, which does not compile in Go ("missing return"). An
        // unrecognized IP type is a programming error.
        panic("unsupported ipType")
    }
}

对于VPC-IP类型的POD网络,直接使用的是VPCRoute

它的配置容器网络的方式

switch setupCfg.DP {
        case types.VPCRoute:
            utils.Hook.AddExtraInfo("dp", "vpcRoute")

            var r cniTypes.Result
            // 调用IPAM分配IP
            r, err = ipam.ExecAdd(delegateIpam, []byte(fmt.Sprintf(delegateConf, setupCfg.ContainerIPNet.IPv4)))
            
            var ipamResult *current.Result
            ipamResult, err = current.NewResultFromResult(r)
            

            err = func() (err error) {
                podIPAddr := ipamResult.IPs[0].Address
                gateway := ipamResult.IPs[0].Gateway

                containerIPNet = &terwayTypes.IPNetSet{
                    IPv4: &podIPAddr,
                }
                gatewayIPSet = &terwayTypes.IPSet{
                    IPv4: gateway,
                }

                setupCfg.ContainerIPNet = containerIPNet
                setupCfg.GatewayIP = gatewayIPSet

                return datapath.NewVPCRoute().Setup(setupCfg, cniNetns)
            }()

首先调用IPAM进行IP分配,这里使用的是host-local,IPAM插件入参是

{
    "name": "networks",
    "cniVersion": "0.4.0",
    "ipam": {
        "type": "host-local",
        "subnet": "10.250.7.0/24",
        "dataDir": "/var/lib/cni/",
        "routes": [
            { "dst": "0.0.0.0/0" }
        ]
    }
}

其中的subnet关键信息来自daemon的grpc响应中的POD CIDR

host-local插件会从这个网段中挑选一个IP作为容器的IP地址,分配过程可以参考ipam解析

通过ipam插件就可以得到容器的IP地址和网关信息,然后就可以开始配置网卡信息了

首先是创建veth网卡

// Setup creates the veth pair for a pod: the peer (host side, e.g.
// "calixxxx") stays in the host namespace, while the container-side link is
// created under a random temporary name directly inside the pod's network
// namespace and then renamed to the requested IfName (eth0).
// NOTE(review): this excerpt strips error handling; `peer` is looked up but
// never used here, so the snippet does not compile as-is.
func Setup(cfg *Veth, netNS ns.NetNS) error {
    peer, err := netlink.LinkByName(cfg.PeerName)
    
    // Random temporary name avoids collisions before the rename below.
    contLinkName, err := ip.RandomVethName()
    
    v := &netlink.Veth{
        LinkAttrs: netlink.LinkAttrs{
            MTU:       cfg.MTU,
            Name:      contLinkName,
            // Create the container-side link directly in the target netns.
            Namespace: netlink.NsFd(int(netNS.Fd())),
        },
        PeerName: cfg.PeerName,
    }
    err = utils.LinkAdd(v)
    
    return netNS.Do(func(netNS ns.NetNS) error {
        contLink, innerErr := netlink.LinkByName(contLinkName)
        
        // Rename the container-side link to the requested name (eth0).
        _, innerErr = utils.EnsureLinkName(contLink, cfg.IfName)
        return innerErr
    })
}

这里创建的是veth设备,因此会在宿主机命名空间下创建一个设备calixxxxxxxxx,由于是veth设备,因此同时会在容器命名空间内创建一个随机名称vethxxxxxxxxx的设备

然后进入容器命名空间中将设备名称改为eth0

然后配置veth设备网卡信息

// Setup applies the full interface configuration in conf to link: optional
// rename, MTU, IP addresses, admin-up, static neighbor (ARP) entries, and
// routes — in that order.
// NOTE(review): the excerpt strips error handling — `changed` is declared
// but unused and the function ends without a return statement, so it does
// not compile as-is.
func Setup(link netlink.Link, conf *Conf) error {
    var err error
    if conf.IfName != "" {
        // Rename (e.g. to eth0) before the rest of the configuration.
        changed, err := utils.EnsureLinkName(link, conf.IfName)
        
    }

    if conf.MTU > 0 {
        _, err = utils.EnsureLinkMTU(link, conf.MTU)
        
    }

    // Assign every requested IP address to the link.
    for _, addr := range conf.Addrs {
        _, err = utils.EnsureAddr(link, addr)

    }

    _, err = utils.EnsureLinkUp(link)


    // Static neighbor entries (e.g. gateway 169.254.1.1 -> host veth MAC).
    for _, neigh := range conf.Neighs {
        _, err = utils.EnsureNeigh(neigh)
    }

    for _, route := range conf.Routes {
        _, err = utils.EnsureRoute(route)
    }
}

对于容器里的网卡来说,会设置网卡名称为eth0,设置MTU大小,设置IP地址,将设备UP起来,设置路由信息,设置ARP信息

// generateContCfgForVPCRoute builds the container-side interface config for
// the VPC-route datapath: a default route via the link-local gateway
// 169.254.1.1 (marked onlink, since that address is outside the pod subnet)
// plus a permanent neighbor entry mapping that gateway to the host veth's
// MAC address, so no real ARP resolution is ever needed.
// NOTE(review): this excerpt is simplified pseudo-code — netlink.Route.Dst,
// Route.Gw and Neigh.IP take parsed net.IPNet / net.IP values rather than
// string literals, and `hostVETH` comes from the enclosing scope in the
// real source.
func generateContCfgForVPCRoute(cfg *types.SetupConfig, link netlink.Link, mac net.HardwareAddr) *nic.Conf {
    var routes []*netlink.Route
    var neighs []*netlink.Neigh

    if cfg.ContainerIPNet.IPv4 != nil {
        // add default route
        routes = append(routes, &netlink.Route{
            LinkIndex: link.Attrs().Index,
            Scope:     netlink.SCOPE_UNIVERSE,
            Dst:       "0.0.0.0/0",
            Gw:        "169.254.1.1",
            Flags:     int(netlink.FLAG_ONLINK),
        })

        // Pin the gateway's MAC to the host-side veth so traffic to
        // 169.254.1.1 always reaches the host peer.
        neighs = append(neighs, &netlink.Neigh{
            LinkIndex:    link.Attrs().Index,
            IP:           "169.254.1.1",
            HardwareAddr: hostVETH.Attrs().HardwareAddr,
            State:        netlink.NUD_PERMANENT,
        })
    }

    contCfg := &nic.Conf{
        IfName: cfg.ContainerIfName,
        MTU:    cfg.MTU,
        // Addresses are applied with a max-length mask (/32) so all egress
        // traffic is forced through the default route above.
        Addrs:  utils.NewIPNetToMaxMask(cfg.ContainerIPNet),
        Routes: routes,
        Neighs: neighs,
    }

    return contCfg
}

容器内的路由和ARP信息,其中ARP对应的MAC地址是宿主机上veth设备的mac地址,即calixxxxxxxxx的mac地址

default via 169.254.1.1 dev eth0 onlink

? (169.254.1.1) at da:44:55:66:77:88 [ether] on eth0

然后对于宿主机上的veth设备来说,只需要设置路由信息

// generateHostPeerCfgForVPCRoute builds the host-side veth configuration for
// the VPC-route datapath: no addresses, just a host route with a maximum-
// length mask (/32) steering traffic for the pod IP into the veth device.
func generateHostPeerCfgForVPCRoute(cfg *types.SetupConfig, link netlink.Link) *nic.Conf {
    hostCfg := &nic.Conf{
        MTU: cfg.MTU,
    }

    if podV4 := cfg.ContainerIPNet.IPv4; podV4 != nil {
        // Direct, link-scoped route: "10.x.y.z dev calixxxx scope link".
        hostCfg.Routes = append(hostCfg.Routes, &netlink.Route{
            LinkIndex: link.Attrs().Index,
            Scope:     netlink.SCOPE_LINK,
            Dst:       utils.NewIPNetWithMaxMask(podV4),
        })
    }

    return hostCfg
}

对应的路由信息是这个,意味着发往这个容器IP的数据包就发往veth设备calixxxxxxxxx

10.250.7.2 dev calixxxxxxxxxx scope link

参考

上一篇下一篇

猜你喜欢

热点阅读