Alibaba Cloud CCM Source Code Analysis

2024-02-27  Teddy_b

Alibaba Cloud Cloud-Provider

It is an open-source project; see the cloud-provider-alibaba-cloud repository on GitHub.

It mainly provides LoadBalancer functionality for Services.

Source Code Walkthrough

The source code is fairly simple: it is essentially a set of custom controllers. The following controllers are included by default:

Node Controller

When a cloud vendor's cloud-provider is used, the kubelet must be started with --cloud-provider=external.

With this flag set, the kubelet adds a taint to the node (key node.cloudprovider.kubernetes.io/uninitialized, i.e. cloudproviderapi.TaintExternalCloudProvider); see the kubelet code:

if kl.externalCloudProvider {
    taint := v1.Taint{
        Key:    cloudproviderapi.TaintExternalCloudProvider,
        Value:  "true",
        Effect: v1.TaintEffectNoSchedule,
    }

    nodeTaints = append(nodeTaints, taint)
}

The node controller's job is to fetch the node's IP addresses, status, and other information from the cloud provider, and then remove this taint so the node becomes usable:

func (m *ReconcileNode) syncCloudNode(node *corev1.Node) error {
    // look for the cloud-provider taint added by the kubelet
    var cloudTaint *corev1.Taint
    taints := node.Spec.Taints
    for i := range taints {
        if taints[i].Key == api.TaintExternalCloudProvider {
            cloudTaint = &taints[i]
            break
        }
    }
    if cloudTaint == nil {
        klog.V(5).Infof("node %s is registered without cloud taint. return ok", node.Name)
        return nil
    }

    // the taint is still present: initialize the node from cloud data
    return m.doAddCloudNode(node)
}

As you can see, if the taint is already gone the node controller does nothing more with the node; otherwise the node still needs to be initialized. Initialization starts by locating the ECS instance backing the node:

func findCloudECS(ins prvd.IInstance, node *v1.Node) (*prvd.NodeAttribute, error) {
    if node.Spec.ProviderID != "" {
        return findCloudECSById(ins, node)
    } else {
        return findCloudECSByIp(ins, node)
    }
}

The providerID has the format cn-hangzhou.i-v98dklsmnxkkgiiil7, i.e. REGION.NODEID.

If a providerID is set, the controller looks up the ECS instance with that node ID in the given region.
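For reference, splitting such a providerID back into its region and instance id is straightforward; the sketch below only illustrates the idea (the helper name is hypothetical — in the project itself this is handled by util.NodeFromProviderID, which shows up again in the route controller):

import (
    "fmt"
    "strings"
)

// parseProviderID splits a providerID like "cn-hangzhou.i-v98dklsmnxkkgiiil7"
// into its region ("cn-hangzhou") and ECS instance id ("i-v98dklsmnxkkgiiil7").
// Illustrative sketch only; the project itself uses util.NodeFromProviderID.
func parseProviderID(providerID string) (region, instanceID string, err error) {
    parts := strings.SplitN(providerID, ".", 2)
    if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
        return "", "", fmt.Errorf("invalid provider id %q, expected REGION.NODEID", providerID)
    }
    return parts[0], parts[1], nil
}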

If no providerID is set, the controller looks up the ECS instance in the VPC whose private IP matches the node's internal IP:

var internalIP []string
for _, addr := range node.Status.Addresses {
    if addr.Type == v1.NodeInternalIP {
        internalIP = append(internalIP, addr.Address)
    }
}
func setFields(node *v1.Node, ins *prvd.NodeAttribute, cfgRoute bool) {

    var modifiers []nodeModifier
    if ins.InstanceType != "" {
        modify := func(n *v1.Node) {
            n.Labels["beta.kubernetes.io/instance-type"] = ins.InstanceType
            n.Labels["node.kubernetes.io/instance-type"] = ins.InstanceType
        }
        modifiers = append(modifiers, modify)
    }

    if ins.Zone != "" {
        modify := func(n *v1.Node) {
            n.Labels["failure-domain.beta.kubernetes.io/zone"] = ins.Zone
            n.Labels["topology.kubernetes.io/zone"] = ins.Zone
        }
        modifiers = append(modifiers, modify)
    }

    if ins.Region != "" {
        modify := func(n *v1.Node) {
            n.Labels["failure-domain.beta.kubernetes.io/region"] = ins.Region
            n.Labels["topology.kubernetes.io/region"] = ins.Region
        }
        modifiers = append(modifiers, modify)
    }

    if node.Spec.ProviderID == "" && ins.InstanceID != "" {
        prvdId := fmt.Sprintf("%s.%s", ins.Region, ins.InstanceID)
        modify := func(n *v1.Node) {
            n.Spec.ProviderID = prvdId
        }
        modifiers = append(modifiers, modify)
    }

    modifiers = append(modifiers, removeCloudTaints)

    for _, modify := range modifiers {
        modify(node)
    }
}

The labels set here are listed below; as you can see, they essentially mirror the ECS instance's attributes (instance type, zone, region):

beta.kubernetes.io/instance-type
node.kubernetes.io/instance-type
failure-domain.beta.kubernetes.io/zone
topology.kubernetes.io/zone
failure-domain.beta.kubernetes.io/region
topology.kubernetes.io/region

Then the cloud taint is removed and node.Spec.ProviderID is set to REGION.NODEID.
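removeCloudTaints itself is not shown above; based on the taint key seen in the kubelet snippet, a minimal sketch of it (not the verbatim source) could look like this — it matches the nodeModifier signature used by setFields:

// removeCloudTaints strips the cloud-provider "uninitialized" taint so the
// node becomes schedulable. Sketch only, not the verbatim source.
func removeCloudTaints(node *v1.Node) {
    var taints []v1.Taint
    for _, t := range node.Spec.Taints {
        if t.Key == api.TaintExternalCloudProvider {
            continue
        }
        taints = append(taints, t)
    }
    node.Spec.Taints = taints
}

Separately, the node's addresses are assembled from the ECS instance: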

func findAddress(instance *ecs.Instance) []v1.NodeAddress {
    var addrs []v1.NodeAddress

    if len(instance.PublicIpAddress.IpAddress) > 0 {
        for _, ipaddr := range instance.PublicIpAddress.IpAddress {
            addrs = append(addrs, v1.NodeAddress{Type: v1.NodeExternalIP, Address: ipaddr})
        }
    }

    if instance.EipAddress.IpAddress != "" {
        addrs = append(addrs, v1.NodeAddress{Type: v1.NodeExternalIP, Address: instance.EipAddress.IpAddress})
    }

    if len(instance.InnerIpAddress.IpAddress) > 0 {
        for _, ipaddr := range instance.InnerIpAddress.IpAddress {
            addrs = append(addrs, v1.NodeAddress{Type: v1.NodeInternalIP, Address: ipaddr})
        }
    }

    if len(instance.VpcAttributes.PrivateIpAddress.IpAddress) > 0 {
        for _, ipaddr := range instance.VpcAttributes.PrivateIpAddress.IpAddress {
            addrs = append(addrs, v1.NodeAddress{Type: v1.NodeInternalIP, Address: ipaddr})
        }
    }

    return addrs
}

This collects the ECS instance's public and private IPs: public IPs (including the EIP) become ExternalIP entries in status.addresses, and private IPs become InternalIP entries.

Then the hostname that the kubelet set on the node by default is appended as the Hostname entry in status.addresses:

func setHostnameAddress(node *v1.Node, addrs []v1.NodeAddress) []v1.NodeAddress {
    // Check if a hostname address exists in the cloud provided addresses
    hostnameExists := false
    for i := range addrs {
        if addrs[i].Type == v1.NodeHostName {
            hostnameExists = true
        }
    }
    // If hostname was not present in cloud provided addresses, use the hostname
    // from the existing node (populated by kubelet)
    if !hostnameExists {
        for _, addr := range node.Status.Addresses {
            if addr.Type == v1.NodeHostName {
                addrs = append(addrs, addr)
            }
        }
    }
    return addrs
}

Once assembled, the addresses are patched onto the node via the patch API:

diff := func(copy runtime.Object) (client.Object, error) {
    nins := copy.(*corev1.Node)
    nins.Status.Addresses = cloudNode.Addresses
    return nins, nil
}
err := helper.PatchM(m.client, node, diff, helper.PatchStatus)

Route Controller

The route controller exists mainly to make cross-node Pod traffic work.

It is enabled via a cloud-provider configuration flag; by default cloud routes are configured:

fs.BoolVar(&cfg.ConfigureCloudRoutes, flagConfigureCloudRoutes, true, "Should CIDRs allocated by allocate-node-cidrs be configured on the cloud provider.")

What gets configured is each node's Pod CIDR:

func getIPv4RouteForNode(node *v1.Node) (*net.IPNet, string, error) {
    var (
        ipv4CIDR    *net.IPNet
        ipv4CIDRStr string
        err         error
    )
    for _, podCidr := range append(node.Spec.PodCIDRs, node.Spec.PodCIDR) {
        if podCidr != "" {
            _, ipv4CIDR, err = net.ParseCIDR(podCidr)
            // error handling elided in this excerpt
            ipv4CIDRStr = ipv4CIDR.String()
            if len(ipv4CIDR.Mask) == net.IPv4len {
                ipv4CIDRStr = ipv4CIDR.String()
                break
            }
        }
    }
    return ipv4CIDR, ipv4CIDRStr, nil
}

Next the controller fetches the VPC route tables. The VPC ID can be obtained directly from the Alibaba Cloud metadata server; see the metadata discussion in the Terway series of articles.

func getRouteTables(ctx context.Context, providerIns prvd.Provider) ([]string, error) {
    vpcId, err := providerIns.VpcID()
    // error handling elided in this excerpt

    tables, err := providerIns.ListRouteTables(ctx, vpcId)
    // error handling elided in this excerpt

    if len(tables) > 1 {
        return nil, fmt.Errorf("multiple route tables found by vpc id[%s], length(tables)=%d", ctrlCfg.CloudCFG.Global.VpcID, len(tables))
    }
    if len(tables) == 0 {
        return nil, fmt.Errorf("no route tables found by vpc id[%s]", ctrlCfg.CloudCFG.Global.VpcID)
    }
    return tables, nil
}
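As a side note, a minimal sketch of fetching the VPC ID from the ECS metadata service might look like the following (to the best of my knowledge the endpoint is http://100.100.100.200/latest/meta-data/vpc-id, but treat the exact path as an assumption rather than something taken from this project):

import (
    "fmt"
    "io"
    "net/http"
)

// getVpcIDFromMetadata is an illustrative sketch: query the ECS metadata
// service for the VPC the instance lives in. The endpoint path is an
// assumption based on the Alibaba Cloud metadata service layout.
func getVpcIDFromMetadata() (string, error) {
    resp, err := http.Get("http://100.100.100.200/latest/meta-data/vpc-id")
    if err != nil {
        return "", fmt.Errorf("query metadata service: %w", err)
    }
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return "", fmt.Errorf("read metadata response: %w", err)
    }
    return string(body), nil
}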

Finally, the Pod CIDR is added to the route table as a route entry:

func (r *VPCProvider) CreateRoute(
    ctx context.Context, table string, provideID string, destinationCIDR string,
) (*model.Route, error) {
    createRouteEntryRequest := vpc.CreateCreateRouteEntryRequest()
    createRouteEntryRequest.RouteTableId = table
    createRouteEntryRequest.DestinationCidrBlock = destinationCIDR
    createRouteEntryRequest.NextHopType = model.RouteNextHopTypeInstance
    _, instance, err := util.NodeFromProviderID(provideID)
    if err != nil {
        return nil, fmt.Errorf("invalid provide id: %v, err: %v", provideID, err)
    }
    createRouteEntryRequest.NextHopId = instance
    _, err = r.auth.VPC.CreateRouteEntry(createRouteEntryRequest)
    if err != nil {
        return nil, fmt.Errorf("error create route entry for %s, %s, error: %v", provideID, destinationCIDR, err)
    }
    return &model.Route{
        Name:            fmt.Sprintf("%s-%s", provideID, destinationCIDR),
        DestinationCIDR: destinationCIDR,
        ProviderId:      provideID,
    }, nil
}

As you can see, each route entry's destination is a node's Pod CIDR and its next hop is that node's ECS instance id, so every packet destined for that Pod CIDR is forwarded to the right ECS instance. This is how cross-node Pod traffic is made to work, entirely through VPC routes.
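To make this concrete: assuming two nodes whose Pod CIDRs are 172.20.0.0/24 and 172.20.1.0/24 and whose instance ids are i-node-a and i-node-b (all hypothetical values), the resulting custom entries in the VPC route table would look roughly like:

Destination CIDR    Next hop type    Next hop
172.20.0.0/24       Instance         i-node-a
172.20.1.0/24       Instance         i-node-b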

Service Controller

Services are the main objects this controller manages, and they support a large number of annotations; see the Alibaba Cloud CLB documentation.

Below, the analysis follows the two main flows: creating a CLB and deleting a CLB.

Creating a CLB

This scenario corresponds to creating a Service with type: LoadBalancer.
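For reference, a minimal Service that triggers this flow can be sketched as follows (the loadbalancer-id annotation is optional and corresponds to the reuse case discussed later; all values are placeholders):

import (
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

// a minimal LoadBalancer-type Service; only Spec.Type is strictly required
var svc = &corev1.Service{
    ObjectMeta: metav1.ObjectMeta{
        Name:      "nginx",
        Namespace: "default",
        Annotations: map[string]string{
            // optional: reuse an existing CLB instead of creating a new one
            "service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id": "lb-xxxxxxxx",
        },
    },
    Spec: corev1.ServiceSpec{
        Type:     corev1.ServiceTypeLoadBalancer,
        Selector: map[string]string{"app": "nginx"},
        Ports: []corev1.ServicePort{
            {Port: 80, TargetPort: intstr.FromInt(80)},
        },
    },
}

When such a Service shows up, the controller first makes sure it carries a finalizer, then builds the desired (local) LoadBalancer model: the load balancer attributes from the annotations, the backend server groups from the Endpoints, and the listeners from the Service ports: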

if err := m.finalizerManager.AddFinalizers(req.Ctx, req.Service, helper.ServiceFinalizer); err != nil {
    // error handling elided in this excerpt
}
func (mgr *LoadBalancerManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, mdl *model.LoadBalancer) error {
    mdl.LoadBalancerAttribute.AddressType = model.AddressType(reqCtx.Anno.Get(annotation.AddressType))
    mdl.LoadBalancerAttribute.InternetChargeType = model.InternetChargeType(reqCtx.Anno.Get(annotation.ChargeType))
    mdl.LoadBalancerAttribute.InstanceChargeType = model.InstanceChargeType(reqCtx.Anno.Get(annotation.InstanceChargeType))
    mdl.LoadBalancerAttribute.LoadBalancerSpec = model.LoadBalancerSpecType(reqCtx.Anno.Get(annotation.Spec))
    bandwidth := reqCtx.Anno.Get(annotation.Bandwidth)
    if bandwidth != "" {
        i, err := strconv.Atoi(bandwidth)
        // error handling elided in this excerpt
        mdl.LoadBalancerAttribute.Bandwidth = i
    }

    if reqCtx.Anno.Get(annotation.LoadBalancerId) != "" {
        mdl.LoadBalancerAttribute.LoadBalancerId = reqCtx.Anno.Get(annotation.LoadBalancerId)
        mdl.LoadBalancerAttribute.IsUserManaged = true
    }
    mdl.LoadBalancerAttribute.LoadBalancerName = reqCtx.Anno.Get(annotation.LoadBalancerName)
    mdl.LoadBalancerAttribute.VSwitchId = reqCtx.Anno.Get(annotation.VswitchId)
    mdl.LoadBalancerAttribute.MasterZoneId = reqCtx.Anno.Get(annotation.MasterZoneID)
    mdl.LoadBalancerAttribute.SlaveZoneId = reqCtx.Anno.Get(annotation.SlaveZoneID)
    mdl.LoadBalancerAttribute.ResourceGroupId = reqCtx.Anno.Get(annotation.ResourceGroupId)
    mdl.LoadBalancerAttribute.AddressIPVersion = model.AddressIPVersionType(reqCtx.Anno.Get(annotation.IPVersion))
    mdl.LoadBalancerAttribute.DeleteProtection = model.FlagType(reqCtx.Anno.Get(annotation.DeleteProtection))
    mdl.LoadBalancerAttribute.ModificationProtectionStatus =
        model.ModificationProtectionType(reqCtx.Anno.Get(annotation.ModificationProtection))
    mdl.LoadBalancerAttribute.Tags = reqCtx.Anno.GetLoadBalancerAdditionalTags()
    mdl.LoadBalancerAttribute.Address = reqCtx.Anno.Get(annotation.IP)
    return nil
}
The backend servers of each VServerGroup come from the Service's Endpoints:

func setBackendsFromEndpoints(candidates *backend.EndpointWithENI, vgroup model.VServerGroup) []model.BackendAttribute {
    var backends []model.BackendAttribute

    if len(candidates.Endpoints.Subsets) == 0 {
        return nil
    }
    for _, ep := range candidates.Endpoints.Subsets {
        var backendPort int
        if vgroup.ServicePort.TargetPort.Type == intstr.Int {
            backendPort = vgroup.ServicePort.TargetPort.IntValue()
        } else {
            for _, p := range ep.Ports {
                if p.Name == vgroup.ServicePort.Name {
                    backendPort = int(p.Port)
                    break
                }
            }
            if backendPort == 0 {
                klog.Warningf("%s cannot find port according port name: %s", vgroup.VGroupName, vgroup.ServicePort.Name)
            }
        }

        for _, addr := range ep.Addresses {
            backends = append(backends, model.BackendAttribute{
                NodeName: addr.NodeName,
                ServerIp: addr.IP,
                // set backend port to targetPort by default
                // if backend type is ecs, update backend port to nodePort
                Port:        backendPort,
                Description: vgroup.VGroupName,
            })
        }
    }
    return backends
}

Backend server groups are built in one of three ways, depending on the Service:
1) The Service specifies externalTrafficPolicy: Local

With this policy, only the nodes that actually host the backing Pods belong in the backend pool, so only the nodes referenced by the Endpoints are added.

Here ServerId is the ECS instance id of the node hosting the Pod, ServerIp is cleared, and traffic reaches the Pods through the NodePort:

for _, backend := range initBackends {
    backend.ServerId = id
    backend.ServerIp = ""
    backend.Type = model.ECSBackendType
    // for ECS backend type, port should be set to NodePort
    backend.Port = int(vgroup.ServicePort.NodePort)
    ecsBackends = append(ecsBackends, backend)
}

Finally, the weight of each backend server is set: a node's weight is the number of backing Pods it hosts (for example, a node running two of the Service's Pods gets weight 2, a node running one gets weight 1):

func podNumberAlgorithm(mode helper.TrafficPolicy, backends []model.BackendAttribute) []model.BackendAttribute {
    // branches for the other traffic policies are elided here (shown later)

    // LocalTrafficPolicy
    ecsPods := make(map[string]int)
    for _, b := range backends {
        ecsPods[b.ServerId] += 1
    }
    for i := range backends {
        backends[i].Weight = ecsPods[backends[i].ServerId]
    }
    return backends
}

2) The Service specifies externalTrafficPolicy: Cluster

With this policy, every node in the cluster is added to the backend pool:

func (mgr *VGroupManager) buildClusterBackends(reqCtx *svcCtx.RequestContext, candidates *backend.EndpointWithENI, vgroup model.VServerGroup) ([]model.BackendAttribute, error) {
    // resolving each node's ECS instance id ("id") and the rest of the
    // backend assembly are elided in this excerpt

    // 1. add ecs backends. add all cluster nodes.
    for _, node := range candidates.Nodes {

        ecsBackends = append(
            ecsBackends,
            model.BackendAttribute{
                ServerId:    id,
                Weight:      DefaultServerWeight,
                Port:        int(vgroup.ServicePort.NodePort),
                Type:        model.ECSBackendType,
                Description: vgroup.VGroupName,
            },
        )
    }

    return setWeightBackends(helper.ClusterTrafficPolicy, backends, vgroup.VGroupWeight), nil
}

All backend servers then get the same weight:

func podNumberAlgorithm(mode helper.TrafficPolicy, backends []model.BackendAttribute) []model.BackendAttribute {
    if mode == helper.ClusterTrafficPolicy {
        for i := range backends {
            backends[i].Weight = 100
        }
        return backends
    }
}

3) The Service uses ENI backends, via the annotation service.beta.kubernetes.io/backend-type: eni

This corresponds to the Terway network plugin assigning ENI IPs directly to Pods; the Pod IPs themselves are then added to the backend pool:

func updateENIBackends(reqCtx *svcCtx.RequestContext, mgr *VGroupManager, backends []model.BackendAttribute, ipVersion model.AddressIPVersionType) (
    []model.BackendAttribute, error) {
    vpcId, err := mgr.cloud.VpcID()
    // error handling elided in this excerpt

    vpcCIDRs, err := mgr.cloud.DescribeVpcCIDRBlock(context.TODO(), vpcId, ipVersion)
    // vpcCIDRs and skipIPs are used to filter the backend IPs; that step is
    // elided in this excerpt

    var ips []string
    for _, b := range backends {
        ips = append(ips, b.ServerIp)
    }
    result, err := mgr.cloud.DescribeNetworkInterfaces(vpcId, ips, ipVersion)
    // error handling elided in this excerpt

    for i := range backends {
        // look up the ENI id that owns this pod IP
        eniid := result[backends[i].ServerIp]
        // for ENI backend type, port should be set to targetPort (default value), no need to update
        backends[i].ServerId = eniid
        backends[i].Type = model.ENIBackendType
    }

    return backends, nil
}

All backend servers again get the same weight:

func podNumberAlgorithm(mode helper.TrafficPolicy, backends []model.BackendAttribute) []model.BackendAttribute {
    if mode == helper.ENITrafficPolicy {
        for i := range backends {
            backends[i].Weight = 100
        }
        return backends
    }
}
Besides the backend server groups, listeners are built from the Service's ports, one listener per port:

func (mgr *ListenerManager) BuildLocalModel(reqCtx *svcCtx.RequestContext, mdl *model.LoadBalancer) error {
    for _, port := range reqCtx.Service.Spec.Ports {
        listener, err := mgr.buildListenerFromServicePort(reqCtx, port)
        // error handling elided in this excerpt
        mdl.Listeners = append(mdl.Listeners, listener)
    }
    return nil
}

At this point the LoadBalancer model has been fully parsed from the Service; it consists of three parts: the load balancer attributes, the backend server groups (VServerGroups), and the listeners.

With the local model parsed, the next step is to decide whether a LoadBalancer needs to be created. How is that decided?

First, the controller checks whether the Service is already associated with an existing LoadBalancer, mainly by checking whether the annotation service.beta.kubernetes.io/alibaba-cloud-loadbalancer-id is set.

If this annotation is set, the controller first looks the LoadBalancer up in SLB:

func (mgr *LoadBalancerManager) Find(reqCtx *svcCtx.RequestContext, mdl *model.LoadBalancer) error {
    // 1. set load balancer id
    if reqCtx.Anno.Get(annotation.LoadBalancerId) != "" {
        mdl.LoadBalancerAttribute.LoadBalancerId = reqCtx.Anno.Get(annotation.LoadBalancerId)
    }

    // 2. set default loadbalancer name
    // it's safe to set loadbalancer name which will be overwritten in FindLoadBalancer func
    mdl.LoadBalancerAttribute.LoadBalancerName = reqCtx.Anno.GetDefaultLoadBalancerName()

    // 3. set default loadbalancer tag
    // filter tags using logic operator OR, so only TAGKEY tag can be added
    mdl.LoadBalancerAttribute.Tags = []tag.Tag{
        {
            Key:   "kubernetes.do.not.delete",
            Value: reqCtx.Anno.GetDefaultLoadBalancerName(),
        },
    }
    return mgr.cloud.FindLoadBalancer(reqCtx.Ctx, mdl)
}

If no existing LoadBalancer is found, one is created automatically from the model parsed above:

if remote.LoadBalancerAttribute.LoadBalancerId == "" {
    if err := m.slbMgr.Create(reqCtx, local); err != nil {
        // error handling elided in this excerpt
    }
}

The name of an automatically created LoadBalancer is derived from the Service's UID:

func (n *AnnotationRequest) GetDefaultLoadBalancerName() string {
    //GCE requires that the name of a load balancer starts with a lower case letter.
    ret := "a" + string(n.Service.UID)
    ret = strings.Replace(ret, "-", "", -1)
    //AWS requires that the name of a load balancer is shorter than 32 bytes.
    if len(ret) > 32 {
        ret = ret[:32]
    }
    return ret
}

Either way, whether it was created automatically or reused, at this point there is a usable LoadBalancer.

It still needs its backend server groups and listeners configured, and those come straight from the model parsed above.

Note that when an existing LoadBalancer is reused, the controller compares its current backend server groups and listeners with the ones parsed from the Service, and updates the existing LoadBalancer so that it matches the parsed configuration.

For example, when updating the backend server groups, local (the parsed model) is the source of truth: for each local VServerGroup the controller looks for a match in remote (the LoadBalancer as it exists in SLB). If a match is found, the remote group is updated to match local; if not, the group is created first and then added to remote:

func (m *ModelApplier) applyVGroups(reqCtx *svcCtx.RequestContext, local *model.LoadBalancer, remote *model.LoadBalancer) error {
    var errs []error
    for i := range local.VServerGroups {
        found := false
        var old model.VServerGroup
        for _, rv := range remote.VServerGroups {
            // for reuse vgroup case, find by vgroup id first
            if local.VServerGroups[i].VGroupId != "" &&
                local.VServerGroups[i].VGroupId == rv.VGroupId {
                found = true
                old = rv
                break
            }
            // find by vgroup name
            if local.VServerGroups[i].VGroupId == "" &&
                local.VServerGroups[i].VGroupName == rv.VGroupName {
                found = true
                local.VServerGroups[i].VGroupId = rv.VGroupId
                old = rv
                break
            }
        }

        // update
        if found {
            if err := m.vGroupMgr.UpdateVServerGroup(reqCtx, local.VServerGroups[i], old); err != nil {
                errs = append(errs, err)
            }
        }

        // create: build the vserver group first, then register its backends
        if !found {
            err := m.vGroupMgr.CreateVServerGroup(reqCtx, &local.VServerGroups[i], remote.LoadBalancerAttribute.LoadBalancerId)
            // error handling elided in this excerpt

            if err := m.vGroupMgr.BatchAddVServerGroupBackendServers(reqCtx, local.VServerGroups[i],
                local.VServerGroups[i].Backends); err != nil {
                errs = append(errs, err)
            }
            remote.VServerGroups = append(remote.VServerGroups, local.VServerGroups[i])
        }
    }
    // aggregation of errs into the returned error is elided in this excerpt
}

Once the LoadBalancer is fully configured, the controller adds labels to the Service; the HASH value is computed from the Service's fields:

service.beta.kubernetes.io/hash
service.k8s.alibaba/loadbalancer-id
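The hashing itself is not shown here; conceptually it serializes the Service fields that matter to the load balancer and hashes them, so a later reconcile can cheaply tell whether anything changed. A rough sketch of the idea (the helper name, field selection, and hash function are illustrative, not the project's exact implementation):

import (
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"

    v1 "k8s.io/api/core/v1"
)

// computeServiceHash is an illustrative sketch: serialize the parts of the
// Service that influence the load balancer and hash them. The result is
// truncated so it fits the 63-character limit for label values.
func computeServiceHash(svc *v1.Service) string {
    data, _ := json.Marshal(struct {
        Spec        v1.ServiceSpec    `json:"spec"`
        Annotations map[string]string `json:"annotations"`
    }{svc.Spec, svc.Annotations})
    sum := sha256.Sum256(data)
    return hex.EncodeToString(sum[:])[:32]
}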

Then the LoadBalancer's address is written back into the Service's status:

if len(newStatus.Ingress) == 0 {
    newStatus.Ingress = append(newStatus.Ingress,
        v1.LoadBalancerIngress{
            IP: lb.LoadBalancerAttribute.Address,
        })
}

Deleting a CLB

This corresponds to deleting the Service, or changing its type away from LoadBalancer:

func NeedDeleteLoadBalancer(svc *v1.Service) bool {
    return svc.DeletionTimestamp != nil || svc.Spec.Type != v1.ServiceTypeLoadBalancer
}

In this case only Services that carry the finalizer are handled, and only automatically created (non user-managed) LoadBalancers are deleted:

if helper.NeedDeleteLoadBalancer(reqCtx.Service) {
    if !local.LoadBalancerAttribute.IsUserManaged {
        err := m.slbMgr.Delete(reqCtx, remote)
        // error handling elided in this excerpt

        remote.LoadBalancerAttribute.LoadBalancerId = ""
        remote.LoadBalancerAttribute.Address = ""
        return nil
    }
}

After that, the Service's labels, status, and finalizer are removed.
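That cleanup step is not shown above; a rough sketch of what it amounts to, reusing the labels and finalizer seen earlier (the receiver type and the RemoveFinalizers call are assumptions, and persisting the changes back to the API server via the patch helpers is elided):

// cleanupService is an illustrative sketch of the final cleanup: drop the
// CCM-managed labels, clear the load balancer status, and remove the
// service finalizer so the Service deletion can complete.
func (m *ReconcileService) cleanupService(reqCtx *svcCtx.RequestContext) error {
    svc := reqCtx.Service
    delete(svc.Labels, "service.beta.kubernetes.io/hash")
    delete(svc.Labels, "service.k8s.alibaba/loadbalancer-id")
    svc.Status.LoadBalancer = v1.LoadBalancerStatus{}
    // the finalizer added when the CLB was created
    return m.finalizerManager.RemoveFinalizers(reqCtx.Ctx, svc, helper.ServiceFinalizer)
}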

NLB Controller

The last controller enabled by default handles Alibaba Cloud NLB (Network Load Balancer). Its code flow is almost identical to the CLB flow; the differences in the end result come only from differences between the NLB and CLB products themselves.
