1. 程式人生 > >14.深入k8s:kube-proxy ipvs及其原始碼分析

14.深入k8s:kube-proxy ipvs及其原始碼分析

![82062770_p0](https://img.luozhiyun.com/20201008173220.png) > 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 原始碼版本是[1.19](https://github.com/kubernetes/kubernetes/tree/release-1.19) 這一篇是講service,但是基礎使用以及基本概念由於官方實在是寫的比較完整了,我沒有必要複述一遍,所以還不太清楚的小夥伴們可以去看官方的文件:https://kubernetes.io/docs/concepts/services-networking/service/。 ## IPVS 概述 在 Kubernetes 叢集中,每個 Node 執行一個 `kube-proxy` 程序。`kube-proxy` 負責為 Service 實現了一種 VIP(虛擬 IP)的形式。 從官方文件介紹來看: > 從 Kubernetes v1.0 開始,您已經可以使用 userspace 代理模式。 Kubernetes v1.1 添加了 iptables 模式代理,在 Kubernetes v1.2 中,kube-proxy 的 iptables 模式成為預設設定。 Kubernetes v1.8 添加了 ipvs 代理模式。 現在我們看的原始碼是基於1.19,所以現在預設是ipvs代理模式。 LVS是國內章文嵩博士開發並貢獻給社群的,主要由ipvs和ipvsadm組成。 ipvs是工作在核心態的4層負載均衡,基於核心底層netfilter實現,netfilter主要通過各個鏈的鉤子實現包處理和轉發。ipvs由ipvsadm提供簡單的CLI介面進行ipvs配置。由於ipvs工作在核心態,只處理四層協議,因此只能基於路由或者[NAT](https://zh.wikipedia.org/zh-cn/%E7%BD%91%E7%BB%9C%E5%9C%B0%E5%9D%80%E8%BD%AC%E6%8D%A2)進行資料轉發,可以把ipvs當作一個特殊的路由器閘道器,這個閘道器可以根據一定的演算法自動選擇下一跳。 ### IPVS vs IPTABLES - iptables 使用連結串列,ipvs 使用雜湊表; - iptables 只支援隨機、輪詢兩種負載均衡演算法而 ipvs 支援的多達 8 種; - ipvs 還支援 realserver 執行狀況檢查、連線重試、埠對映、會話保持等功能。 ### IPVS用法 IPVS可以通過ipvsadm 命令進行配置,如-L列舉,-A新增,-D刪除。 如下命令建立一個service例項`172.17.0.1:32016`,`-t`指定監聽的為`TCP`埠,`-s`指定演算法為輪詢演算法rr(Round Robin),ipvs支援簡單輪詢(rr)、加權輪詢(wrr)、最少連線(lc)、源地址或者目標地址雜湊(sh、dh)等10種排程演算法。 ``` ipvsadm -A -t 172.17.0.1:32016 -s rr ``` 在新增排程演算法的時候還需要用-r指定server地址,-w指定權值,-m指定轉發模式,-m設定masquerading表示NAT模式(-g為`gatewaying`,即直連路由模式),如下所示: ``` ipvsadm -a -t 172.17.0.1:32016 -r 10.244.1.2:8080 -m -w 1 ipvsadm -a -t 172.17.0.1:32016 -r 10.244.1.3:8080 -m -w 1 ipvsadm -a -t 172.17.0.1:32016 -r 10.244.3.2:8080 -m -w 1 ``` ### Service ClusterIP原理 不清楚iptables呼叫鏈的同學可以先看看:https://www.zsythink.net/archives/1199瞭解一下。 我們這裡使用上一篇HPA的一個例子: ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: hpatest spec: replicas: 1 selector: matchLabels: app: hpatest template: metadata: labels: app: hpatest spec: containers: - name: hpatest image: nginx imagePullPolicy: IfNotPresent command: ["/bin/sh"] args: ["-c","/usr/sbin/nginx; while true;do echo `hostname -I` > /usr/share/nginx/html/index.html; sleep 120;done"] ports: - containerPort: 80 resources: requests: cpu: 1m memory: 100Mi limits: cpu: 3m memory: 400Mi --- apiVersion: v1 kind: Service metadata: name: hpatest-svc spec: selector: app: hpatest ports: - port: 80 targetPort: 80 protocol: TCP ``` 建立好之後,看看svc: ```sh # kubectl get svc NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE hpatest-svc ClusterIP 10.68.196.212 80/TCP 2m44s ``` 然後我們檢視ipvs配置情況: ```sh # ipvsadm -S -n | grep 10.68.196.212 -A -t 10.68.196.212:80 -s rr -a -t 10.68.196.212:80 -r 172.20.0.251:80 -m -w 1 ``` -S表示輸出所儲存的規則,-n表示以數字的形式輸出ip和埠。可以看到ipvs的LB IP為ClusterIP,演算法為rr,RS為Pod的IP。使用的模式為NAT模式。 當我們建立Service之後,kube-proxy 首先會在宿主機上建立一個虛擬網絡卡(叫作:kube-ipvs0),併為它分配 Service VIP 作為 IP 地址,如下所示: ```sh # ip addr show kube-ipvs0 7: kube-ipvs0: mtu 1500 qdisc noop state DOWN group default link/ether 12:bb:85:91:96:4d brd ff:ff:ff:ff:ff:ff ... inet 10.68.196.212/32 brd 10.68.196.212 scope global kube-ipvs0 valid_lft forever preferred_lft forever ``` 下面看看ClusterIP如何傳遞的。使用命令:**iptables -t nat -nvL**可以看到由很多Chain,ClusterIP訪問方式為: ``` PREROUTING --> KUBE-SERVICES --> KUBE-CLUSTER-IP --> INPUT --> KUBE-FIREWALL --> POSTROUTING ``` 當使用命令連結服務時: ``` curl 10.68.196.212:80 ``` 由於10.96.54.11就在本地,所以會以這個IP作為出口地址,即源IP和目標IP都是10.96.54.11,此時相當於: ``` 10.68.196.212:xxxx -> 10.68.196.212:80 ``` 然後經過ipvs,ipvs會從RS ip列中選擇其中一個Pod ip作為目標IP: ``` 10.68.196.212:xxxx -> 10.68.196.212:80 | | IPVS v 172.20.0.251:xxxx -> 172.20.0.251:80 ``` 檢視OUTPUT規則: ``` # iptables-save -A OUTPUT -m comment --comment "kubernetes service portals" -j KUBE-SERVICES -A KUBE-SERVICES ! -s 172.20.0.0/16 -m comment --comment "Kubernetes service cluster ip + port for masquerade purpose" -m set --match-set KUBE-CLUSTER-IP dst,dst -j KUBE-MARK-MASQ -A KUBE-MARK-MASQ -j MARK --set-xmark 0x4000/0x4000 ``` 如上規則的意思就是除了Pod以外訪問ClusterIP的包都打上`0x4000/0x4000`。 到了POSTROUTING鏈: ``` -A POSTROUTING -m comment --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING -A KUBE-POSTROUTING -m comment --comment "kubernetes service traffic requiring SNAT" -m mark --mark 0x4000/0x4000 -j MASQUERADE ``` 如上規則的意思就是隻要匹配mark`0x4000/0x4000`的包都做SNAT,由於172.20.0.251是從flannel.1出去的,因此源ip會改成flannel.1的ip `172.20.0.0`: ``` 10.68.196.212:xxxx -> 10.68.196.212:80 | | IPVS v 10.68.196.212:xxxx -> 172.20.0.251:80 | | MASQUERADE v 172.20.0.0:xxxx -> 172.20.0.251:80 ``` 最後通過VXLAN隧道發到Pod的Node上,轉發給Pod的veth,回包通過路由到達源Node節點,源Node節點通過之前的MASQUERADE再把目標IP還原為172.20.0.251。 ## kube-proxy ipvs 原始碼分析 ### 初始化ipvs 檔案位置:cmd/kube-proxy/app/server_others.go kube-proxy啟動的時候會呼叫NewProxyServer初始化ipvs 代理: ```go func newProxyServer( config *proxyconfigapi.KubeProxyConfiguration, cleanupAndExit bool, master string) (*ProxyServer, error) { ... //獲取代理模式userspace iptables ipvs proxyMode := getProxyMode(string(config.Mode), canUseIPVS, iptables.LinuxKernelCompatTester{}) ... //代理模式是iptables if proxyMode == proxyModeIPTables { ... // 代理模式是ipvs } else if proxyMode == proxyModeIPVS { klog.V(0).Info("Using ipvs Proxier.") //判斷是夠啟用了 ipv6 雙棧 if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) { ... } else { ... //初始化 ipvs 模式的 proxier proxier, err = ipvs.NewProxier( ... ) } ... } else { ... } return &ProxyServer{ ... }, nil } ``` NewProxyServer方法會根據proxyMode來選擇是IPVS還是IPTables,ipvs會呼叫ipvs.NewProxier方法來初始化一個proxier。 **NewProxier** ```go func NewProxier(... ) (*Proxier, error) { ... //對於 SNAT iptables 規則生成 masquerade 標記 masqueradeValue := 1 << uint(masqueradeBit) ... //設定預設排程演算法 rr if len(scheduler) == 0 { klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler) scheduler = DefaultScheduler } // healthcheck伺服器物件建立 serviceHealthServer := healthcheck.NewServiceHealthServer(hostname, recorder) ... //初始化 proxier proxier := &Proxier{ ... } //初始化 ipset 規則 proxier.ipsetList = make(map[string]*IPSet) for _, is := range ipsetInfo { proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment) } burstSyncs := 2 klog.V(2).Infof("ipvs(%s) sync params: minSyncPeriod=%v, syncPeriod=%v, burstSyncs=%d", ipt.Protocol(), minSyncPeriod, syncPeriod, burstSyncs) //初始化 syncRunner proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs) //啟動 gracefuldeleteManager proxier.gracefuldeleteManager.Run() return proxier, nil } ``` 這個方法主要做了如下幾件事: 1. 對於 SNAT iptables 規則生成 masquerade 標記; 2. 設定預設排程演算法 rr; 3. healthcheck伺服器物件建立; 4. 初始化 proxier; 5. 初始化 ipset 規則; 6. 初始化 syncRunner; 7. 啟動 gracefuldeleteManager; 這個方法在初始化syncRunner的時候設定了proxier.syncProxyRules方法作為一個引數構建了同步執行器syncRunner。 ### 呼叫同步執行器 檔案位置:cmd/kube-proxy/app/server.go ```go func (s *ProxyServer) Run() error { ... //呼叫ipvs的SyncLoop方法 go s.Proxier.SyncLoop() return <-errCh } ``` kube-proxy在啟動的時候會初始化完ProxyServer 物件後,會呼叫runLoop方法,然後呼叫到ProxyServer的Run方法中,最後呼叫ipvs的SyncLoop方法。 ```go func (proxier *Proxier) SyncLoop() { ... proxier.syncRunner.Loop(wait.NeverStop) //執行NewBoundedFrequencyRunner物件Loop } func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) { bfr.timer.Reset(bfr.maxInterval) for { select { case <-stop: bfr.stop() return case <-bfr.timer.C(): //定時器方式執行 bfr.tryRun() case <-bfr.run: //按需方式執行(傳送執行指令訊號) bfr.tryRun() } } } func (bfr *BoundedFrequencyRunner) tryRun() { bfr.mu.Lock() defer bfr.mu.Unlock() //限制條件允許執行func if bfr.limiter.TryAccept() { bfr.fn() // 重點執行部分,呼叫func,上下文來看此處就是 // 對syncProxyRules()的呼叫 bfr.lastRun = bfr.timer.Now() // 記錄執行時間 bfr.timer.Stop() bfr.timer.Reset(bfr.maxInterval) // 重設下次執行時間 klog.V(3).Infof("%s: ran, next possible in %v, periodic in %v", bfr.name, bfr.minInterval, bfr.maxInterval) return } //限制條件不允許執行,計算下次執行時間 elapsed := bfr.timer.Since(bfr.lastRun) // elapsed:上次執行時間到現在已過多久 nextPossible := bfr.minInterval - elapsed // nextPossible:下次執行至少差多久(最小週期) nextScheduled := bfr.maxInterval - elapsed // nextScheduled:下次執行最遲差多久(最大週期) klog.V(4).Infof("%s: %v since last run, possible in %v, scheduled in %v", bfr.name, elapsed, nextPossible, nextScheduled) if nextPossible < nextScheduled { bfr.timer.Stop() bfr.timer.Reset(nextPossible) klog.V(3).Infof("%s: throttled, scheduling run in %v", bfr.name, nextPossible) } } ``` SyncLoop方法會呼叫到proxier的syncRunner例項設定的syncProxyRules方法。 ### 同步執行器 syncProxyRules方法比較長,所以這裡就分開來一步步的講,跟好程式碼節奏來就好了。 程式碼位置:pkg/proxy/ipvs/proxier.go ```go //更新 service 與 endpoint變化資訊 serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges) endpointUpdateResult := proxier.endpointsMap.Update(proxier.endpointsChanges) staleServices := serviceUpdateResult.UDPStaleClusterIP // 合併 service 列表 for _, svcPortName := range endpointUpdateResult.StaleServiceNames { if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) { klog.V(2).Infof("Stale %s service %v ->
%s", strings.ToLower(string(svcInfo.Protocol())), svcPortName, svcInfo.ClusterIP().String()) staleServices.Insert(svcInfo.ClusterIP().String()) for _, extIP := range svcInfo.ExternalIPStrings() { staleServices.Insert(extIP) } } } ``` 這裡是同步與新更service和endpoints,然後 合併 service 列表。 ```go //nat鏈 proxier.natChains.Reset() //nat規則 proxier.natRules.Reset() //filter鏈 proxier.filterChains.Reset() //filter規則 proxier.filterRules.Reset() // Write table headers. writeLine(proxier.filterChains, "*filter") writeLine(proxier.natChains, "*nat") //建立kubernetes的表連線鏈資料 proxier.createAndLinkeKubeChain() ``` 這裡會重置連結串列規則,然後呼叫createAndLinkeKubeChain方法建立kubernetes的表連線鏈資料,下面我們看看createAndLinkeKubeChain方法: ```go func (proxier *Proxier) createAndLinkeKubeChain() { //通過iptables-save獲取現有的filter和NAT表存在的鏈資料 existingFilterChains := proxier.getExistingChains(proxier.filterChainsData, utiliptables.TableFilter) existingNATChains := proxier.getExistingChains(proxier.iptablesData, utiliptables.TableNAT) // Make sure we keep stats for the top-level chains //裡面儲存了NAT錶鏈和Filter錶鏈 // NAT錶鏈: KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL // KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ // Filter錶鏈: KUBE-FORWARD for _, ch := range iptablesChains { //不存在則建立鏈,建立頂層鏈 if _, err := proxier.iptables.EnsureChain(ch.table, ch.chain); err != nil { klog.Errorf("Failed to ensure that %s chain %s exists: %v", ch.table, ch.chain, err) return } //nat表寫鏈 if ch.table == utiliptables.TableNAT { if chain, ok := existingNATChains[ch.chain]; ok { writeBytesLine(proxier.natChains, chain) } else { writeLine(proxier.natChains, utiliptables.MakeChainLine(kubePostroutingChain)) } // filter表寫鏈 } else { if chain, ok := existingFilterChains[KubeForwardChain]; ok { writeBytesLine(proxier.filterChains, chain) } else { writeLine(proxier.filterChains, utiliptables.MakeChainLine(KubeForwardChain)) } } } // 預設鏈下建立kubernete服務專用跳轉規則 // iptables -I OUTPUT -t nat --comment "kubernetes service portals" -j KUBE-SERVICES // iptables -I PREROUTING -t nat --comment "kubernetes service portals" -j KUBE-SERVICES // iptables -I POSTROUTING -t nat --comment "kubernetes postrouting rules" -j KUBE-POSTROUTING // iptables -I FORWARD -t filter --comment "kubernetes forwarding rules" -j KUBE-FORWARD for _, jc := range iptablesJumpChain { args := []string{"-m", "comment", "--comment", jc.comment, "-j", string(jc.to)} if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, jc.table, jc.from, args...); err != nil { klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", jc.table, jc.from, jc.to, err) } } } ``` createAndLinkeKubeChain方法首先會獲取現存的filter和NAT表,然後再遍歷iptablesChains。 iptablesChains裡面儲存了NAT錶鏈和Filter錶鏈:NAT錶鏈 KUBE-SERVICES / KUBE-POSTROUTING / KUBE-FIREWALL KUBE-NODE-PORT / KUBE-LOAD-BALANCER / KUBE-MARK-MASQ;Filter錶鏈 KUBE-FORWARD; 然後再根據iptablesJumpChain建立跳轉規則。 下面回到syncProxyRules往下走。 ```go // 建立 dummy interface kube-ipvs0 _, err = proxier.netlinkHandle.EnsureDummyDevice(DefaultDummyDevice) if err != nil { klog.Errorf("Failed to create dummy interface: %s, error: %v", DefaultDummyDevice, err) return } // 建立預設的 ipset 規則,http://ipset.netfilter.org/ for _, set := range proxier.ipsetList { if err := ensureIPSet(set); err != nil { return } set.resetEntries() } ``` 設定預設Dummy介面,並確定ipsets規則已存在的集合,ipset相關可以看:http://ipset.netfilter.org/。 下面會遍歷proxier.serviceMap,對每一個服務建立 ipvs 規則,比較長,也分開說。 ```go for svcName, svc := range proxier.serviceMap { ... //基於此服務的有效endpoint列表,更新KUBE-LOOP-BACK的ipset集,以備後面生成相應iptables規則(SNAT偽裝地址) for _, e := range proxier.endpointsMap[svcName] { ep, ok := e.(*proxy.BaseEndpointInfo) if !ok { klog.Errorf("Failed to cast BaseEndpointInfo %q", e.String()) continue } if !ep.IsLocal { continue } epIP := ep.IP() epPort, err := ep.Port() // Error parsing this endpoint has been logged. Skip to next endpoint. if epIP == "" || err != nil { continue } entry := &utilipset.Entry{ IP: epIP, Port: epPort, Protocol: protocol, IP2: epIP, SetType: utilipset.HashIPPortIP, } // 校驗KUBE-LOOP-BACK集合entry記錄項 if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoopBackIPSet].Name)) continue } // 插入此entry記錄至active記錄佇列 proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String()) } ... } ``` 這一段是根據有效endpoint列表,更新KUBE-LOOP-BACK的ipset集,以備後面生成相應iptables規則(SNAT偽裝地址); ```go for svcName, svc := range proxier.serviceMap { ... //構建ipset entry entry := &utilipset.Entry{ IP: svcInfo.ClusterIP().String(), Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } // add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin. // proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String()) // 型別校驗ipset entry if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeClusterIPSet].Name)) continue } // 名為KUBE-CLUSTER-IP的ipset集插入entry,以備後面統一生成IPtables規則 proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String()) // ipvs call // 構建ipvs虛擬伺服器VS服務物件 serv := &utilipvs.VirtualServer{ Address: svcInfo.ClusterIP(), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } // Set session affinity flag and timeout for IPVS service // 設定IPVS服務的會話保持標誌和超時時間 if svcInfo.SessionAffinityType() == v1.ServiceAffinityClientIP { serv.Flags |= utilipvs.FlagPersistent serv.Timeout = uint32(svcInfo.StickyMaxAgeSeconds()) } // We need to bind ClusterIP to dummy interface, so set `bindAddr` parameter to `true` in syncService() // 將clusterIP繫結至dummy虛擬介面上,syncService()處理中需置bindAddr地址為True if err := proxier.syncService(svcNameString, serv, true, bindedAddresses); err == nil { activeIPVSServices[serv.String()] = true activeBindAddrs[serv.Address.String()] = true // ExternalTrafficPolicy only works for NodePort and external LB traffic, does not affect ClusterIP // So we still need clusterIP rules in onlyNodeLocalEndpoints mode. //同步endpoints資訊,IPVS為VS更新realServer if err := proxier.syncEndpoint(svcName, false, serv); err != nil { klog.Errorf("Failed to sync endpoint for service: %v, err: %v", serv, err) } } else { klog.Errorf("Failed to sync service: %v, err: %v", serv, err) } ... } ``` ipset集KUBE-CLUSTER-IP更新,以備後面生成相應iptables規則。 ```go for svcName, svc := range proxier.serviceMap { ... //為 load-balancer型別建立 ipvs 規則 for _, ingress := range svcInfo.LoadBalancerIPStrings() { if ingress != "" { // 構建ipset entry entry = &utilipset.Entry{ IP: ingress, Port: svcInfo.Port(), Protocol: protocol, SetType: utilipset.HashIPPort, } if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSet].Name)) continue } //KUBE-LOAD-BALANCER ipset集更新 proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String()) //服務指定externalTrafficPolicy=local時,KUBE-LOAD-BALANCER-LOCAL ipset集更新 if svcInfo.OnlyNodeLocalEndpoints() { if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerLocalSet].Name)) continue } proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String()) } // 服務的LoadBalancerSourceRanges被指定時,基於源IP保護的防火牆策略開啟,KUBE-LOAD-BALANCER-FW ipset集更新 if len(svcInfo.LoadBalancerSourceRanges()) != 0 { if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadbalancerFWSet].Name)) continue } proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String()) allowFromNode := false for _, src := range svcInfo.LoadBalancerSourceRanges() { // ipset call entry = &utilipset.Entry{ IP: ingress, Port: svcInfo.Port(), Protocol: protocol, Net: src, SetType: utilipset.HashIPPortNet, } // 列舉所有源CIDR白名單列表,KUBE-LOAD-BALANCER-SOURCE-CIDR ipset集更新 //cidr:https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid { klog.Errorf("%s", fmt.Sprintf(EntryInvalidErr, entry, proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)) continue } proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String()) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { allowFromNode = true } } ... } // ipvs call // 構建ipvs 虛擬主機物件 serv := &utilipvs.VirtualServer{ Address: net.ParseIP(ingress), Port: uint16(svcInfo.Port()), Protocol: string(svcInfo.Protocol()), Scheduler: proxier.ipvsScheduler, } ... } } ... } ``` 這裡為 load-balancer型別建立 ipvs 規則,LoadBalancerSourceRanges和externalTrafficPolicy=local被指定時將對KUBE-LOAD-BALANCER-LOCAL、KUBE-LOAD-BALANCER-FW、KUBE-LOAD-BALANCER-SOURCE-CIDR、KUBE-LOAD-BALANCER-SOURCE-IP ipset集更新,以備後面生成相應iptables規則。 ```go ... //同步 ipset 記錄,清理 conntrack for _, set := range proxier.ipsetList { set.syncIPSetEntries() } //建立 iptables 規則資料 proxier.writeIptablesRules() // 合併iptables規則 proxier.iptablesData.Reset() proxier.iptablesData.Write(proxier.natChains.Bytes()) proxier.iptablesData.Write(proxier.natRules.Bytes()) proxier.iptablesData.Write(proxier.filterChains.Bytes()) proxier.iptablesData.Write(proxier.filterRules.Bytes()) klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes()) //基於iptables格式化規則資料,使用iptables-restore重新整理iptables規則 err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters) ... ``` 最後這裡會重新整理iptables 規則,然後建立 iptables 規則資料,將合併iptables規則,iptables-restore重新整理iptables規則。 ## 總結 這一篇沒有怎麼講service是怎麼執行的,怎麼使用的,而是選擇講了kube-proxy的ipvs代理是怎麼做的,以及在開頭講了ipvs與iptables區別與關係,看不懂的同學需要自己去補充一下iptables相關的知識,文中的 ipvs 的知識我也是現學的,如果有講解不好的地方歡迎指出。 ## Reference https://kubernetes.io/docs/tasks/administer-cluster/dns-debugging-resolution/ https://kubernetes.io/docs/tasks/debug-application-cluster/debug-service/ https://kubernetes.io/docs/concepts/services-networking/service/ https://cloud.google.com/kubernetes-engine/docs/how-to/flexible-pod-cidr https://github.com/kubernetes/kubernetes/tree/master/pkg/proxy/ipvs https://zh.wikipedia.org/zh-cn/%E7%BD%91%E7%BB%9C%E5%9C%B0%E5%9D%80%E8%BD%AC%E6%8D%A2 https://segmentfault.com/a/1190000009043962 http://ipset.netfilt