k8s网络系列学习笔记之三----kube-proxy原理分析
本篇主要从代码的角度分析一下基于ipvs模式的实现原理。
ProxyServer的创建
系统代码基于Cobra实现,前面的代码逻辑很清晰,就不做分析,kube-proxy的关键的对象是ProxyServer,初始化的过程就是升恒ProxyServer对象,并执行ProxyServer.Run()。
下面我们先看看ProxyServer的定义:
type ProxyServer struct {
Client clientset.Interface
EventClient v1core.EventsGetter
IptInterface utiliptables.Interface
IpvsInterface utilipvs.Interface
IpsetInterface utilipset.Interface
execer exec.Interface
Proxier proxy.ProxyProvider
Broadcaster record.EventBroadcaster
Recorder record.EventRecorder
ConntrackConfiguration kubeproxyconfig.KubeProxyConntrackConfiguration
Conntracker Conntracker // if nil, ignored
ProxyMode string
NodeRef *v1.ObjectReference
CleanupAndExit bool
CleanupIPVS bool
MetricsBindAddress string
EnableProfiling bool
OOMScoreAdj *int32
ResourceContainer string
ConfigSyncPeriod time.Duration
ServiceEventHandler config.ServiceHandler
EndpointsEventHandler config.EndpointsHandler
HealthzServer *healthcheck.HealthzServer
}
底层命令接口
下面分析一下这个结构体的成员,client和EventClient就不啰嗦了,iptInterface、IpvsInterface 、IpsetInterface、execer这四个变量对应到底层命令:iptables,ipvsadm, ipset。
iptInterface = utiliptables.New(execer, dbus, protocol) // 用于修改iptables
ipvsInterface = utilipvs.New(execer) // 用于操作ipvs配置
kernelHandler = ipvs.NewLinuxKernelHandler() // 用于获取ipvs相关的内核模块信息,没有配置相关的内核模块,将会降级为iptables代理模式
ipsetInterface = utilipset.New(execer) // 执行ipset命令,维护ip地址组
这几个成员的作用如注释中锁描述的,关键的成员是Proxyier,kube-proxy支持user space、iptables和ipvs三种代理模式,不同代理模式使用不同的Proxier实例。
如下所示,用判断kube-proxy具体采用的模式。
proxyMode := getProxyMode(string(config.Mode), iptInterface, kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
config.Mode是在启动kube-proxy进程时配置的模式,下面是kube-proxy的一个配置文件例子,其中mode:"ipvs"描述期望采用ipvs模式。
apiVersion: kubeproxy.config.k8s.io/v1alpha1
bindAddress: 0.0.0.0
clientConnection:
acceptContentTypes: ""
burst: 10
contentType: application/vnd.kubernetes.protobuf
kubeconfig: /var/lib/kube-proxy/kubeconfig.conf
qps: 5
clusterCIDR: 10.244.0.0/16
configSyncPeriod: 15m0s
conntrack:
max: null
maxPerCore: 32768
min: 131072
tcpCloseWaitTimeout: 1h0m0s
tcpEstablishedTimeout: 24h0m0s
enableProfiling: false
healthzBindAddress: 0.0.0.0:10256
hostnameOverride: ""
iptables:
masqueradeAll: false
masqueradeBit: 14
minSyncPeriod: 0s
syncPeriod: 30s
ipvs:
excludeCIDRs: null
minSyncPeriod: 0s
scheduler: ""
syncPeriod: 30s
kind: KubeProxyConfiguration
metricsBindAddress: 127.0.0.1:10249
mode: "ipvs"
nodePortAddresses: null
oomScoreAdj: -999
portRange: ""
resourceContainer: /kube-proxy
udpIdleTimeout: 250ms
虽然这里配置了ipvs模式,但是是不是最终采用ipvs,还要检查一下是否满足条件,可参考学习笔记2。具体的代码就不展示了。如果最终确定采用ipvs模式,那么Proxier成员为ipvs.Proxyer类型。
ProxyServer的启动
- 启动EventBroadcaster,事件同步到API-Server
// 创建eventBroadcaster
hostname := utilnode.GetHostname(config.HostnameOverride)
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})
// 启动时间记录eventBroadcaster到Api-Server
s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
- 启动health服务接口
代码逻辑比较清晰,就不列举了,我们可以通过下面的工具来检查health信息。
root@bogon2:~/k8s-yml-tests# curl http://127.0.0.1:10256/healthz
{"lastUpdated": "2019-02-22 07:10:11.258317649 +0000 UTC m=+109458.567293187","currentTime": "2019-02-22 07:10:37.209843398 +0000 UTC m=+109484.518818898"}
- 启动metrics服务接口
启动好以后,可以查看metrics数据,这是通过promethues工具统计的数据,可以由promethues收集后,进行汇总展示。
- 启动metrics服务接口
root@bogon2:~/k8s-yml-tests# curl http://127.0.0.1:10249/metrics
# HELP apiserver_audit_event_total Counter of audit events generated and sent to the audit backend.
# TYPE apiserver_audit_event_total counter
apiserver_audit_event_total 0
# HELP apiserver_audit_requests_rejected_total Counter of apiserver requests rejected due to an error in audit logging backend.
# TYPE apiserver_audit_requests_rejected_total counter
apiserver_audit_requests_rejected_total 0
......
rest_client_request_latency_seconds_sum{url="https://172.25.39.6:6443/%7Bprefix%7D",verb="POST"} 0.007607341
rest_client_request_latency_seconds_count{url="https://172.25.39.6:6443/%7Bprefix%7D",verb="POST"} 1
# HELP rest_client_requests_total Number of HTTP requests, partitioned by status code, method, and host.
# TYPE rest_client_requests_total counter
rest_client_requests_total{code="200",host="172.25.39.6:6443",method="GET"} 490
rest_client_requests_total{code="201",host="172.25.39.6:6443",method="POST"} 1
- conntracker参数维护
conntracker跟踪并且记录连接状态。Linux为每一个经过网络堆栈的数据包,生成一个新的连接记录项 (Connection entry)。此后,所有属于此连接的数据包都被唯一地分配给这个连接,并标识连接的状态。连接跟踪是防火墙模块的状态检测的基础,同时也是地址转换中实 现SNAT和DNAT的前提。
- conntracker参数维护
那么Netfilter又是如何生成连接记录项的呢?每一个数据,都有“来源”与“目的”主机,发起连接的主机称为“来源”,响应“来源”的请求的主机即 为目的,所谓生成记录项,就是对每一个这样的连接的产生、传输及终止进行跟踪记录。由所有记录项产生的表,即称为连接跟踪表。
kube-proxy通过结构体KubeProxyContrackConfiguration来设置conntrack参数信息,对应的结构如下所示:
// KubeProxyConntrackConfiguration contains conntrack settings for
// the Kubernetes proxy server.
type KubeProxyConntrackConfiguration struct {
// max is the maximum number of NAT connections to track (0 to
// leave as-is). This takes precedence over maxPerCore and min.
Max *int32
// maxPerCore is the maximum number of NAT connections to track
// per CPU core (0 to leave the limit as-is and ignore min).
MaxPerCore *int32
// min is the minimum value of connect-tracking records to allocate,
// regardless of maxPerCore (set maxPerCore=0 to leave the limit as-is).
Min *int32
// tcpEstablishedTimeout is how long an idle TCP connection will be kept open
// (e.g. '2s'). Must be greater than 0 to set.
TCPEstablishedTimeout *metav1.Duration
// tcpCloseWaitTimeout is how long an idle conntrack entry
// in CLOSE_WAIT state will remain in the conntrack
// table. (e.g. '60s'). Must be greater than 0 to set.
TCPCloseWaitTimeout *metav1.Duration
}
conntracker的设置是由ProxyServer中的Conntracker成员负责设置,在linux中,具体的结构为:type realConntracker struct{},下面设设置Max参数的代码:
func (rct realConntracker) SetMax(max int) error {
if err := rct.setIntSysCtl("nf_conntrack_max", max); err != nil {
return err
}
glog.Infof("Setting nf_conntrack_max to %d", max)
// Linux does not support writing to /sys/module/nf_conntrack/parameters/hashsize
// when the writer process is not in the initial network namespace
// (https://github.com/torvalds/linux/blob/v4.10/net/netfilter/nf_conntrack_core.c#L1795-L1796).
// Usually that's fine. But in some configurations such as with github.com/kinvolk/kubeadm-nspawn,
// kube-proxy is in another netns.
// Therefore, check if writing in hashsize is necessary and skip the writing if not.
hashsize, err := readIntStringFile("/sys/module/nf_conntrack/parameters/hashsize")
if err != nil {
return err
}
if hashsize >= (max / 4) {
return nil
}
// sysfs is expected to be mounted as 'rw'. However, it may be
// unexpectedly mounted as 'ro' by docker because of a known docker
// issue (https://github.com/docker/docker/issues/24000). Setting
// conntrack will fail when sysfs is readonly. When that happens, we
// don't set conntrack hashsize and return a special error
// readOnlySysFSError here. The caller should deal with
// readOnlySysFSError differently.
writable, err := isSysFSWritable()
if err != nil {
return err
}
if !writable {
return readOnlySysFSError
}
// TODO: generify this and sysctl to a new sysfs.WriteInt()
glog.Infof("Setting conntrack hashsize to %d", max/4)
return writeIntStringFile("/sys/module/nf_conntrack/parameters/hashsize", max/4)
}
从代码中可以看出,通过sysctl修改/proc/sys/net/netfilter/nf_conntrack_max文件中的参数值,然后通过代码直接修改/sys/module/nf_conntrack/parameters/hashsize的值为max的1/4。conntrack-tcp-timeout-established与–conntrack-tcp-timeout-close-wait参数的设置同理,最后对应的文件路径分别为/proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established和/proc/sys/net/netfilter/nf_conntrack_tcp_timeout_close_wait。
这些参数会影响后面的网络连接,所以设置的时候要慎重,最好与系统规划时的配置一致,设置过小,可能会导致nf_conntrack:table full,dropping packet的错误。
- 侦听Service和Endpoints的变化
这里初始化了serviceConfig和endpointsConfig实例,并启动它们,如下所示:
serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod)
serviceConfig.RegisterEventHandler(s.ServiceEventHandler)
go serviceConfig.Run(wait.NeverStop)
endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod)
endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.
go informerFactory.Start(wait.NeverStop)
// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
// functions must configure their shared informer event handlers first.
go informerFactory.Start(wait.NeverStop)
serviceConfig和endpointsConfig注册的事件对象实际都是proxier,也就是前面初始化的ipvs.Proxier实例对象。serviceConfig和endpointsConfig的Run逻辑是等待相应的Infomer同步完成后,回调Proxier的OnServiceSynced和OnEndpointsSynced方法。
相应对象的事件回调方法,在ServiceConfig和EndpointsConfig对象的构建中完成,如下所示:
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
result := &ServiceConfig{
lister: serviceInformer.Lister(),
listerSynced: serviceInformer.Informer().HasSynced,
}
// 这里注册回调事件
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: result.handleAddService,
UpdateFunc: result.handleUpdateService,
DeleteFunc: result.handleDeleteService,
},
resyncPeriod,
)
return result
}
核心的业务逻辑主要是就是对于各种事件的监听和处理,后面专门进行分析。
- 启动Proxier的循环处理请求
循环处理的驱动有事件驱动和超时驱动两种,代码如下所示:
// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
func (proxier *Proxier) SyncLoop() {
// Update healthz timestamp at beginning in case Sync() never succeeds.
if proxier.healthzServer != nil {
proxier.healthzServer.UpdateTimestamp()
}
proxier.syncRunner.Loop(wait.NeverStop)
}
// Loop handles the periodic timer and run requests. This is expected to be
// called as a goroutine.
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
glog.V(3).Infof("%s Loop running", bfr.name)
bfr.timer.Reset(bfr.maxInterval)
for {
select {
case <-stop:
bfr.stop()
glog.V(3).Infof("%s Loop stopping", bfr.name)
return
case <-bfr.timer.C():
bfr.tryRun()
case <-bfr.run:
bfr.tryRun()
}
}
}
有Service、Endpoints事件或者超时时,都会调用BoundedFrequencyRunner.tryRun方法,BoundedFrequencyRunner的结构体如下所示:
// BoundedFrequencyRunner manages runs of a user-provided function.
// See NewBoundedFrequencyRunner for examples.
type BoundedFrequencyRunner struct {
name string // the name of this instance
minInterval time.Duration // the min time between runs, modulo bursts
maxInterval time.Duration // the max time between runs
run chan struct{} // try an async run
mu sync.Mutex // guards runs of fn and all mutations
fn func() // function to run
lastRun time.Time // time of last run
timer timer // timer for deferred runs
limiter rateLimiter // rate limiter for on-demand runs
}
在tryRun方法中,会调用fn成员函数,该成员函数为proxyier.syncProxyRules方法。在后面的处理中会单独分析这个方法。
核心业务逻辑----更新代理规则
在上一章节,最后两个步骤的处理是核心业务逻辑的,第5步负责注册事件回调,在事件回调方法中,驱动第6步中的逻辑的(syncProxyRules方法)处理。
事件回调函数
在看事件回调函数之前,我们先来看一下Proxier的两个成员:
type Proxier struct {
// endpointsChanges and serviceChanges contains all changes to endpoints and
// services that happened since last syncProxyRules call. For a single object,
// changes are accumulated, i.e. previous is state from before all of them,
// current is state after applying all of those.
endpointsChanges endpointsChangeMap // endpoints变更记录
serviceChanges serviceChangeMap // service变更记录
serviceMap proxyServiceMap // service记录
endpointsMap proxyEndpointsMap // endpoints记录
portsMap map[utilproxy.LocalPort]utilproxy.Closeable
......
}
endpointsChanges和serviceChanges分别用于记录service和endpoints的变更记录信息,而serviceMap、endpointsMap和portsMap则记录了实际的服务、EP和端口等信息内容,也就是所有的变更操作,都会更新到这些成员中。
事件回调方法OnServiceAdd的代码如下:
// OnServiceAdd is called whenever creation of new service object is observed.
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
proxier.syncRunner.Run()
}
}
在事件回调方法中,主要完成两个工作:
- 更新变更记录
- 驱动runner运行,做实际的规则变更处理
这是一种异步的驱动逻辑,可以把同一时间大量的变更处理进行合并后,批量进行处理。
更新代理规则的处理
这里讲只分析最常用的ClusterIP模式的Service的规则处理部分。
前面列举过Proxier结构的核心变量,前面几次对Service和Endpoints资源的侦听函数,会实时监测到Service和Endpoints(Pod)的变化,把变化的内容更新到Proxyier的endpointsChanges和serviceChanges成员后,触发更新代理规则的动作。
更新代理规则这块逻辑在Proxier.syncProxyRules中完成,下面将分步骤来进行说明:
- 检查确定真正需要进行的变更内容
K8S事件机制能够保证通知的幂等性,收到service和endpoints的变更内容,但是有可能变更的内容是一些周边资料信息,那这些不会影响实际的代理规则,同时也难保事件的重复触发,所以首先需要排除这些情况,把changes与实际的内存中存储的数据进行对比,得出真正的有意义的变化内容,确保我们后续的操作的内容是真正需要进行规则变更的。
整个过程是通过updateServiceMap和updateEndpointsMap来进行检查的。
func (proxier *Proxier) syncProxyRules() {
proxier.mu.Lock()
defer proxier.mu.Unlock()
......
serviceUpdateResult := updateServiceMap(
proxier.serviceMap, &proxier.serviceChanges)
endpointUpdateResult := updateEndpointsMap(
proxier.endpointsMap, &proxier.endpointsChanges, proxier.hostname)
......
}
- 准备安装IPtables规则
首先介绍一下iptables中的INPUT、FORWARD等规则链和规则,
在处理各种数据包时,根据防火墙规则的不同介入时机,iptables供涉及5种默认规则链,从应用时间点的角度理解这些链:
INPUT链:当接收到防火墙本机地址的数据包(入站)时,应用此链中的规则。
OUTPUT链:当防火墙本机向外发送数据包(出站)时,应用此链中的规则。
FORWARD链:当接收到需要通过防火墙发送给其他地址的数据包(转发)时,应用此链中的规则。
PREROUTING链:在对数据包作路由选择之前,应用此链中的规则,如DNAT。
POSTROUTING链:在对数据包作路由选择之后,应用此链中的规则,如SNAT。
-->PREROUTING-->[ROUTE]-->FORWARD-->POSTROUTING-->
mangle | mangle ^ mangle
nat | filter | nat
| |
| |
v |
INPUT OUTPUT
| mangle ^ mangle
| filter | nat
v ------>local------->| filter
我们可以通过iptables-save命令来备份iptables规则,以iptables-save的格式进行简单举例:
root@ubuntu:/home/yuxianbing# iptables-save -t nat
# Generated by iptables-save v1.6.0 on Tue Mar 5 09:26:02 2019
注释内容
*nat
:PREROUTING ACCEPT [14:12354]
-- :PREROUTING ACCEPT,表示nat表中的PREROUTING 链默认报文策略是接受(匹配不到规则继续) ,
-- [14:12354] 即[packet, bytes],表示当前有14个包(12354字节)经过nat表的PREROUTING 链
:INPUT ACCEPT [0:0]
:OUTPUT ACCEPT [4:222]
:POSTROUTING ACCEPT [3:149]
:CNI-DN-603508538baa710bd2110 - [0:0]
:CNI-HOSTPORT-DNAT - [0:0]
:CNI-HOSTPORT-SNAT - [0:0]
:CNI-SN-603508538baa710bd2110 - [0:0]
:KUBE-FIRE-WALL - [0:0]
:KUBE-MARK-DROP - [0:0]
:KUBE-MARK-MASQ - [0:0]
:KUBE-POSTROUTING - [0:0]
:KUBE-SERVICES - [0:0]
-- 解释同上(这些是自定义链)
---------- 下面开始按条输出所有规则----------
-A PREROUTING -m addrtype --dst-type LOCAL -j CNI-HOSTPORT-DNAT
---- 这是用iptables命令配置此规则的命令(详解选项可参考iptables帮助)。
-A PREROUTING -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A OUTPUT -m addrtype --dst-type LOCAL -j CNI-HOSTPORT-DNAT
-A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES
-A POSTROUTING -s 127.0.0.1/32 ! -d 127.0.0.1/32 -j CNI-HOSTPORT-SNAT
-A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING
-A POSTROUTING -s 10.0.0.0/8 ! -d 10.0.0.0/8 -j MASQUERADE
-A CNI-DN-603508538baa710bd2110 -p tcp -m tcp --dport 8080 -j DNAT --to-destination 10.221.2.42:80
-A CNI-HOSTPORT-DNAT -m comment --comment "dnat name: \"cni0\" id: \"42ccbebb916d082a6d872aaa48efea33c4cc33267d14779046f687f4dcddda8d\"" -j CNI-DN-603508538baa710bd2110
-A CNI-HOSTPORT-SNAT -m comment --comment "snat name: \"cni0\" id: \"42ccbebb916d082a6d872aaa48efea33c4cc33267d14779046f687f4dcddda8d\"" -j CNI-SN-603508538baa710bd2110
-A CNI-SN-603508538baa710bd2110 -s 127.0.0.1/32 -d 10.221.2.42/32 -p tcp -m tcp --dport 80 -j MASQUERADE
-A KUBE-MARK-DROP -j MARK --set-xmark 0x8000/0x8000
-A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000
-A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE
-A KUBE-POSTROUTING -m set --match-set KUBE-LOOP-BACK dst,dst,src -j MASQUERADE
-A KUBE-SERVICES -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ
-A KUBE-SERVICES -p tcp -m tcp -m set --match-set KUBE-NODE-PORT-TCP dst -j KUBE-MARK-MASQ
COMMIT
-- 应用上述配置
# Completed on Tue Mar 5 09:26:02 2019
从iptables的NAT规则表包含两个部分:链和规则,其中链还保存了经过该链的包和字节数。
针对iptables和IPVS规则的处理逻辑为:
err := proxier.iptables.SaveInto(utiliptables.TableNAT, proxier.iptablesData)
// 1.执行iptables-save命令
if err != nil { // if we failed to get any rules
glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err)
} else { // otherwise parse the output
existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, proxier.iptablesData.Bytes())
// 2. 读取所有的链路信息
}
// 3. 重置iptables规则缓冲区
proxier.natChains.Reset()
proxier.natRules.Reset()
// Write table headers.
writeLine(proxier.natChains, "*nat") // 3. 写nat表头信息
// 4. 写POSTROUTING链路信息
if chain, ok := existingNATChains[kubePostroutingChain]; ok {
writeLine(proxier.natChains, chain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain))
}
// 写POSTROUTING 规则
// 这条POSTROUTING规则的作用是为消息打标记的包,执行MASQUERADE操作,也就是SNAT处理
// 对于出主机的包,我们要做SNAT处理
writeLine(proxier.natRules, []string{
"-A", string(kubePostroutingChain),
"-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`,
"-m", "mark", "--mark", proxier.masqueradeMark,
"-j", "MASQUERADE",
}...)
// 5. KUBE-MARK-MASQ链路信息
if chain, ok := existingNATChains[KubeMarkMasqChain]; ok {
writeLine(proxier.natChains, chain)
} else {
writeLine(proxier.natChains, utiliptables.MakeChainLine(KubeMarkMasqChain))
}
// KUBE-MARK-MASQ规则,执行MARK,打上masquerade标记
writeLine(proxier.natRules, []string{
"-A", string(KubeMarkMasqChain),
"-j", "MARK", "--set-xmark", proxier.masqueradeMark,
}...)
// 6. 安装Dummy网卡kube-ipvs0
// 这个网卡的作用是绑定所有Service的ClusterIP到该网卡上
_, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice)
if err != nil {
glog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err)
return
}
// 7. ipsets初始化,建立所有的ipsets表,并清空里面的内容
// make sure ip sets exists in the system.
ipSets := []*IPSet{proxier.loopbackSet, proxier.clusterIPSet, proxier.externalIPSet, proxier.nodePortSetUDP, proxier.nodePortSetTCP,
proxier.lbIngressSet, proxier.lbMasqSet, proxier.lbWhiteListCIDRSet, proxier.lbWhiteListIPSet}
if err := ensureIPSets(ipSets...); err != nil {
return
}
for i := range ipSets {
ipSets[i].resetEntries()
}
......
// linkKubeServiceChain will Create chain KUBE-SERVICES and link the chin in PREROUTING and OUTPUT
// Chain PREROUTING (policy ACCEPT)
// target prot opt source destination
// KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0
// Chain OUTPUT (policy ACCEPT)
// target prot opt source destination
// KUBE-SERVICES all -- 0.0.0.0/0 0.0.0.0/0
// Chain KUBE-SERVICES (2 references)
// 8. 创建PrePosting与Output链,匹配所有的协议、源IP和目标IP,自动进入目标KUBE-SERVICES就行匹配和处理
// 同时创建KUBE-SERVICES链,注意:规则后续根据Cluster Service与NodePort Service来按需创建。
// 类似命令为: iptables -t nat -N KUBE-SERVICES
if err := proxier.linkKubeServiceChain(existingNATChains, proxier.natChains); err != nil {
glog.Errorf("Failed to link KUBE-SERVICES chain: %v", err)
return
}
// Kube service ipset
if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
return
}
// 9. 创建KUBE-FIRE-WALL链,不过目前我们系统没有相关的规则,暂时不知道用处
if err := proxier.createKubeFireWallChain(existingNATChains, proxier.natChains); err != nil {
glog.Errorf("Failed to create KUBE-FIRE-WALL chain: %v", err)
return
}
// 10. 进入最关键的环节,为每个服务构建IPVS规则
for svcName, svcInfo := range proxier.serviceMap {
protocol := strings.ToLower(string(svcInfo.protocol))
// Precompute svcNameString; with many services the many calls
// to ServicePortName.String() show up in CPU profiles.
svcNameString := svcName.String()
// Handle traffic that loops back to the originator with SNAT.
// 10.1 hairpin模式的处理,匹配ip,port,ip这种模式,这块代码不分析了,一般很少有这种现象。
......
// 10.2 ClusterIP处理
// 准备 IP Set的项,存储Cluster IP和对应的Port
entry := &utilipset.Entry{
IP: svcInfo.clusterIP.String(),
Port: svcInfo.port,
Protocol: protocol,
SetType: utilipset.HashIPPort,
}
// 如果kube-proxy启动设置了masqueradeAll或者clusterCIDR,则安装伪装规则,做SNAT操作
if proxier.masqueradeAll || len(proxier.clusterCIDR) > 0 {
......
proxier.clusterIPSet.activeEntries.Insert(entry.String())
}
// 准备ipvs虚拟服务器
serv := &utilipvs.VirtualServer{
Address: svcInfo.clusterIP,
Port: uint16(svcInfo.port),
Protocol: string(svcInfo.protocol),
Scheduler: proxier.ipvsScheduler,
}
// Set session affinity flag and timeout for IPVS service
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
}
// We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService()
// 10.3 创建或者更新IPVS虚拟服务器
// 把ClusterIP绑定到kube-ipvs0设备,最后一个参数bindAddr为true
if err := proxier.syncService(svcNameString, serv, true); err == nil {
activeIPVSServices[serv.String()] = true
// ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP
// So we still need clusterIP rules in onlyNodeLocalEndpoints mode.
// 这里要看一下,传入的第二个参数中,onlyNodeLocalEndpoints传入为false,所以我们会把所有的Endpoints注册到IPVS虚拟服务器中
// onlyNodeLocalEndpoints只有NodePort和LB中设置才会生效,用于只绑定到本地的Endpoints
if err := proxier.syncEndpoint(svcName, false, serv); err != nil {
glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
}
} else {
glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}
// 10.4 Capture externalIPs. 不关注,很少用
// 10.5 Capture load-balancer ingress. 不关注,很少用,需要云厂商支持
......
// 10.6 针对NodePort类型的Service进行处理
if svcInfo.nodePort != 0 {
lp := utilproxy.LocalPort{
Description: "nodePort for " + svcNameString,
IP: "",
Port: svcInfo.nodePort,
Protocol: protocol,
}
if proxier.portsMap[lp] != nil {
glog.V(4).Infof("Port %s was open before and is still needed", lp.String())
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
// 侦听Host上的PORT端口(TCP/UDP),把端口占用起来,以便安全的安装IPVS规则
socket, err := proxier.portMapper.OpenLocalPort(&lp)
if err != nil {
glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err)
continue
}
if lp.Protocol == "udp" {
isIPv6 := utilproxy.IsIPv6(svcInfo.clusterIP)
utilproxy.ClearUDPConntrackForPort(proxier.exec, lp.Port, isIPv6)
}
replacementPortsMap[lp] = socket
} // We're holding the port, so it's OK to install ipvs rules.
// 如果没有指定onlyNodeLocalEndpoints,则需要做SNAT处理,需要把对应的端口插入到IPSET集中,这样,可以匹配到KUBE-SERVICES规则,从而做原地址伪装,实现SNAT功能。
// Nodeports need SNAT, unless they're local.
// ipset call
if !svcInfo.onlyNodeLocalEndpoints {
entry = &utilipset.Entry{
// No need to provide ip info
Port: svcInfo.nodePort,
Protocol: protocol,
SetType: utilipset.BitmapPort,
}
var nodePortSet *IPSet
switch protocol {
case "tcp":
nodePortSet = proxier.nodePortSetTCP
case "udp":
nodePortSet = proxier.nodePortSetUDP
default:
// It should never hit
glog.Errorf("Unsupported protocol type: %s", protocol)
}
if nodePortSet != nil {
if valid := nodePortSet.validateEntry(entry); !valid {
glog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, nodePortSet.Name))
continue
}
nodePortSet.activeEntries.Insert(entry.String())
}
}
// 为节点上的每个物理IP创建对应的IPVS虚拟服务器和对应的路由Endpoint规则
// Build ipvs kernel routes for each node ip address
nodeIPs, err := proxier.ipGetter.NodeIPs()
if err != nil {
glog.Errorf("Failed to get node IP, err: %v", err)
} else {
for _, nodeIP := range nodeIPs {
// ipvs call
serv := &utilipvs.VirtualServer{
Address: nodeIP,
Port: uint16(svcInfo.nodePort),
Protocol: string(svcInfo.protocol),
Scheduler: proxier.ipvsScheduler,
}
if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP {
serv.Flags |= utilipvs.FlagPersistent
serv.Timeout = uint32(svcInfo.stickyMaxAgeSeconds)
}
// There is no need to bind Node IP to dummy interface, so set parameter `bindAddr` to `false`.
if err := proxier.syncService(svcNameString, serv, false); err == nil {
activeIPVSServices[serv.String()] = true
if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
glog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err)
}
} else {
glog.Errorf("Failed to sync service: %v, err: %v", serv, err)
}
}
}
}
}
总结
kube-proxy的IPVS Proxier是通过iptables和ipvs来实现代理功能,iptables基本上是几条固定的链路和规则,而大量的Service ClusterIP和Endpoint IP等等信息,都封装进了IPSET中,通过iptables通过match-set来匹配,大大减少了iptables规则数量,提高了iptables维护性能和匹配性能。
iptables规则主要用于做SNAT等操作,配合ipvs完成代理服务能力。