1. 程式人生 > >【kubernetes/k8s原始碼分析】kube-dns 原始碼解析之kubedns

【kubernetes/k8s原始碼分析】kube-dns 原始碼解析之kubedns

https://github.com/kubernetes/dns

結構和kubernetes的程式碼結構類似:首先看cmd/kube-dns/dns.go

啟動引數例子: /kube-dns --domain=cluster.local. --dns-port=10053 --config-dir=/kube-dns-config --v=2

kubedns程序的功能:

  • 接入SkyDNS,為dnsmasq提供查詢服務
  • 替換etcd容器,使用樹形結構在記憶體中儲存DNS記錄
  • 通過K8S API監視Service資源endpoint變化並更新DNS記錄
  • 服務10053埠

一 kube-dns啟動流程

1 main函式

主要呼叫NewKubeDNSServerDefault構造server(第3節講解),然後執行server的Run方法(第4節講解)

// main is the kube-dns entry point. It registers the command-line flags on
// top of the default KubeDNSConfig, initializes logging, builds the server
// from the resulting config and runs it. server.Run ends in glog.Fatal on the
// HTTP listener, so it does not return normally.
func main() {
	config := options.NewKubeDNSConfig()
	config.AddFlags(pflag.CommandLine)

	flag.InitFlags()
	// Convinces goflags that we have called Parse() to avoid noisy logs.
	// OSS Issue: kubernetes/kubernetes#17162.
	// The returned error is ignored: the argument list is empty.
	goflag.CommandLine.Parse([]string{})
	logs.InitLogs()
	defer logs.FlushLogs()

	version.PrintAndExitIfRequested()

	glog.V(0).Infof("version: %+v", version.VERSION)

	server := app.NewKubeDNSServerDefault(config)
	server.Run()
}

2 NewKubeDNSConfig函式

生成一個帶預設配置的KubeDNSConfig結構體,包括健康檢查埠8081、DNS埠53等預設值,ConfigMap所在名稱空間預設為kube-system

// NewKubeDNSConfig returns a KubeDNSConfig populated with the built-in
// defaults:
//   - cluster domain "cluster.local.";
//   - health-check (healthz) port 8081;
//   - DNS server bound to 0.0.0.0:53;
//   - 60s initial sync timeout;
//   - ConfigMap namespace api.NamespaceSystem;
//   - 10s ConfigPeriod.
//
// ConfigMap and ConfigDir are empty by default, meaning configuration comes
// from command-line flags.
func NewKubeDNSConfig() *KubeDNSConfig {
	return &KubeDNSConfig{
		ClusterDomain:      "cluster.local.",
		HealthzPort:        8081,
		DNSBindAddress:     "0.0.0.0",
		DNSPort:            53,
		InitialSyncTimeout: 60 * time.Second,
		Federations:        make(map[string]string),
		ConfigMapNs:        api.NamespaceSystem,
		ConfigMap:          "", // default to using command line flags
		ConfigPeriod:       10 * time.Second,
		ConfigDir:          "",
		NameServers:        "",
	}
}

3 NewKubeDNSServerDefault函式

  • 建立kubernetes客戶端
  • configMap與configDir兩個配置項不能同時設定
  • 當啟動引數指定了configDir時,呼叫NewFileSync建立基於檔案的配置同步方式(3.2)
  • NewKubeDNS設定KubeDNS結構(3.1)
// NewKubeDNSServerDefault builds a KubeDNSServer from config. It does three
// things:
//   - creates the Kubernetes client;
//   - selects the configuration Sync source;
//   - wires both into a new KubeDNS.
//
// It terminates the process via glog.Fatal* in two cases: the client cannot
// be created, or both ConfigMap and ConfigDir are set.
func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
	kubeClient, err := newKubeClient(config)
	if err != nil {
		// Without a client nothing can be listed or watched, so fail fast
		// instead of continuing with an unusable client.
		glog.Fatalf("Failed to create a kubernetes client: %v", err)
	}

	var configSync dnsconfig.Sync
	switch {
	case config.ConfigMap != "" && config.ConfigDir != "":
		glog.Fatal("Cannot use both ConfigMap and ConfigDir")
	case config.ConfigDir != "":
		glog.V(0).Infof("Using configuration read from directory: %v with period %v", config.ConfigDir, config.ConfigPeriod)
		configSync = dnsconfig.NewFileSync(config.ConfigDir, config.ConfigPeriod)
	}
	// NOTE(review): in every other case configSync stays nil; confirm that
	// dns.NewKubeDNS and KubeDNS.Start tolerate a nil Sync.

	return &KubeDNSServer{
		domain:         config.ClusterDomain,
		healthzPort:    config.HealthzPort,
		dnsBindAddress: config.DNSBindAddress,
		dnsPort:        config.DNSPort,
		nameServers:    config.NameServers,
		kd:             dns.NewKubeDNS(kubeClient, config.ClusterDomain, config.InitialSyncTimeout, configSync),
	}
}

3.1 NewKubeDNS函式

  • 建立KubeDNS結構體,包含kubernetes客戶端,初始同步超時時間
  • setEndpointsStore(3.1.1)
  • setServicesStore(3.1.2)
// NewKubeDNS creates a KubeDNS for clusterDomain, backed by client and an
// in-memory tree cache of DNS records.
//
// domainPath is built from the labels of clusterDomain (trailing dots
// trimmed), passed through util.ReverseArray. It presumably becomes
// ["local", "cluster"] for "cluster.local.".
//
// The Endpoints and Services informers are configured here. Their controllers
// are only run later, by Start.
func NewKubeDNS(client clientset.Interface, clusterDomain string, timeout time.Duration, configSync config.Sync) *KubeDNS {
	labels := strings.Split(strings.TrimRight(clusterDomain, "."), ".")

	kd := &KubeDNS{
		kubeClient:         client,
		domain:             clusterDomain,
		domainPath:         util.ReverseArray(labels),
		initialSyncTimeout: timeout,
		configSync:         configSync,

		cache:      treecache.NewTreeCache(),
		nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),

		reverseRecordMap:    make(map[string]*skymsg.Service),
		clusterIPServiceMap: make(map[string]*v1.Service),
		// cacheLock and configLock keep their zero value, which is an
		// unlocked, ready-to-use sync.RWMutex.
	}

	kd.setEndpointsStore()
	kd.setServicesStore()
	return kd
}

3.1.1 setEndpointsStore函式

  • 啟動endpoint的list-watch機制
// setEndpointsStore creates the Endpoints informer. It does the following:
//   - lists and watches Endpoints in all namespaces through the core REST
//     client;
//   - keeps them in kd.endpointsStore, with resync period resyncPeriod;
//   - routes add/update/delete events to the handleEndpoint* handlers.
//
// The returned controller is stored in kd.endpointsController and run by
// Start.
func (kd *KubeDNS) setEndpointsStore() {
	endpointsLW := kcache.NewListWatchFromClient(
		kd.kubeClient.Core().RESTClient(),
		"endpoints",
		v1.NamespaceAll,
		fields.Everything(),
	)

	handlers := kcache.ResourceEventHandlerFuncs{
		AddFunc:    kd.handleEndpointAdd,
		UpdateFunc: kd.handleEndpointUpdate,
		// When the Endpoints of a headless Service go away, its reverse DNS
		// entries must be removed as well.
		DeleteFunc: kd.handleEndpointDelete,
	}

	kd.endpointsStore, kd.endpointsController = kcache.NewInformer(
		endpointsLW, &v1.Endpoints{}, resyncPeriod, handlers)
}

3.1.2 setServicesStore函式

  • 啟動service的list-watch機制
// setServicesStore creates the Service informer. It does the following:
//   - lists and watches Services in all namespaces through the core REST
//     client;
//   - keeps them in kd.servicesStore, with resync period resyncPeriod;
//   - routes add/update/delete events to newService, updateService and
//     removeService.
//
// The returned controller is stored in kd.serviceController and run by Start.
func (kd *KubeDNS) setServicesStore() {
	handlers := kcache.ResourceEventHandlerFuncs{
		AddFunc:    kd.newService,
		UpdateFunc: kd.updateService,
		DeleteFunc: kd.removeService,
	}

	servicesLW := kcache.NewListWatchFromClient(
		kd.kubeClient.Core().RESTClient(),
		"services",
		v1.NamespaceAll,
		fields.Everything(),
	)

	kd.servicesStore, kd.serviceController = kcache.NewInformer(
		servicesLW, &v1.Service{}, resyncPeriod, handlers)
}

3.2 NewFileSync函式

  • 定期檢查檔案是否被修改
// NewFileSync returns a Sync that re-reads configuration data from dir once
// every period. The scans are timed with the real wall clock
// (clock.RealClock).
func NewFileSync(dir string, period time.Duration) Sync {
	source := newFileSyncSource(dir, period, clock.RealClock{})
	return newSync(source)
}

// newFileSyncSource returns a file-backed syncSource for dir. Its scan
// cadence is period, measured with the supplied clock. The source starts
// with an unbuffered channel of syncResult values.
func newFileSyncSource(dir string, period time.Duration, clock clock.Clock) syncSource {
	src := new(kubeFileSyncSource)
	src.dir = dir
	src.period = period
	src.clock = clock
	src.channel = make(chan syncResult)
	return src
}
// Periodic starts a background goroutine and returns sync.channel
// immediately. The goroutine receives each result from the underlying
// source's Periodic channel and runs it through processUpdate. It forwards
// the resulting *Config on sync.channel only when processing succeeded and
// reported a change.
//
// The goroutine loops forever; it has no exit path.
// NOTE(review): processUpdate errors are dropped here without logging —
// confirm processUpdate reports them itself.
func (sync *kubeSync) Periodic() <-chan *Config {
	go func() {
		updates := sync.syncSource.Periodic()
		for {
			result := <-updates
			cfg, changed, err := sync.processUpdate(result, false)
			if err == nil && changed {
				sync.channel <- cfg
			}
		}
	}()
	return sync.channel
}

4 Run函式

  • 主要是中間四個函式
  • setupSignalHandlers(4.1講解)
  • startSkyDNSServer(4.2講解)
  • Start(4.3講解)
  • setupHandlers(4.4講解)
// Run brings kube-dns up, in this order:
//   - logs every registered flag;
//   - installs the signal handlers;
//   - starts the SkyDNS server;
//   - starts the KubeDNS informers and waits for their initial sync;
//   - registers the HTTP handlers;
//   - serves HTTP on healthzPort.
//
// It does not return normally: when ListenAndServe fails, glog.Fatal ends the
// process.
func (server *KubeDNSServer) Run() {
	pflag.VisitAll(func(f *pflag.Flag) {
		glog.V(0).Infof("FLAG: --%s=%q", f.Name, f.Value)
	})

	setupSignalHandlers()
	server.startSkyDNSServer()
	server.kd.Start()
	server.setupHandlers()

	glog.V(0).Infof("Status HTTP port %v", server.healthzPort)
	if ns := server.nameServers; ns != "" {
		glog.V(0).Infof("Upstream nameservers: %s", ns)
	}

	healthzAddr := fmt.Sprintf(":%d", server.healthzPort)
	glog.Fatal(http.ListenAndServe(healthzAddr, nil))
}

4.1 setupSignalHandlers函式

  • 忽略SIGINT和SIGTERM訊號,程序只能被SIGKILL終止
// setupSignalHandlers installs signal handler to ignore SIGINT and
// SIGTERM. This daemon will be killed by SIGKILL after the grace
// period to allow for some manner of graceful shutdown.
//
// Each received signal is only logged (and the log flushed); nothing else
// happens. The channel has a buffer of 1, as os/signal requires: Notify never
// blocks when delivering. With an unbuffered channel, a signal arriving while
// the goroutine is busy logging or flushing would simply be lost.
func setupSignalHandlers() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		for {
			glog.V(0).Infof("Ignoring signal %v (can only be terminated by SIGKILL)", <-sigChan)
			glog.Flush()
		}
	}()
}

4.2 startSkyDNSServer函式

  • 啟動skyDNS服務
// startSkyDNSServer sets up and launches the SkyDNS server:
//   - builds a SkyDNS config for d.domain listening on
//     dnsBindAddress:dnsPort and applies SkyDNS defaults;
//   - creates the server backed by d.kd;
//   - reports the SkyDNS metrics setup, exiting via glog.Fatalf if metrics
//     initialization fails;
//   - stores the config on d.kd and runs the server in a background
//     goroutine.
//
// The receiver is named d because the package name server is used inside.
func (d *KubeDNSServer) startSkyDNSServer() {
	glog.V(0).Infof("Starting SkyDNS server (%v:%v)", d.dnsBindAddress, d.dnsPort)

	cfg := &server.Config{
		Domain:  d.domain,
		DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort),
	}
	server.SetDefaults(cfg)
	skydns := server.New(d.kd, cfg)

	switch err := metrics.Metrics(); {
	case err != nil:
		glog.Fatalf("Skydns metrics error: %s", err)
	case metrics.Port != "":
		glog.V(0).Infof("Skydns metrics enabled (%v:%v)", metrics.Path, metrics.Port)
	default:
		glog.V(0).Infof("Skydns metrics not enabled")
	}

	d.kd.SkyDNSConfig = cfg
	// Run blocks while serving, so it gets its own goroutine; its error
	// return is discarded.
	go skydns.Run()
}

4.2.1 Run函式

  • 以阻塞方式同時啟動TCP和UDP監聽;請求由ServeDNS函式處理,它位於路徑vendor/github.com/skynetservices/skydns/server/server.go:132行
  • func (s *server) ServeDNS(w dns.ResponseWriter, req *dns.Msg) (第四章講解)
// Run is a blocking operation that starts the server listening on the DNS ports.
//
// Every query, whatever its name (pattern "."), is dispatched to s itself,
// i.e. to s.ServeDNS.
//
// In the non-systemd branch, one TCP and one UDP listener are started on
// s.config.DnsAddr. Each runs in its own goroutine tracked by s.group, and
// Run then blocks in s.group.Wait() until both goroutines return. A listener
// error is reported through fatalf.
//
// In the visible code the only return is `return nil`. The systemd branch is
// elided in this excerpt.
func (s *server) Run() error {
   mux := dns.NewServeMux()
   mux.Handle(".", s)

   if s.config.Systemd {
     ....
   } else {
      // TCP listener.
      s.group.Add(1)
      go func() {
         defer s.group.Done()
         if err := dns.ListenAndServe(s.config.DnsAddr, "tcp", mux); err != nil {
            fatalf("%s", err)
         }
      }()
      // NOTE(review): dnsReadyMsg is called right after spawning the
      // goroutine, i.e. before ListenAndServe has necessarily bound the socket.
      dnsReadyMsg(s.config.DnsAddr, "tcp")
      // UDP listener, same pattern as the TCP one above.
      s.group.Add(1)
      go func() {
         defer s.group.Done()
         if err := dns.ListenAndServe(s.config.DnsAddr, "udp", mux); err != nil {
            fatalf("%s", err)
         }
      }()
      dnsReadyMsg(s.config.DnsAddr, "udp")
   }

   s.group.Wait()
   return nil
}

4.3 Start函式

  • 非同步啟動list-watch的endpoint,service
// Start launches the Endpoints and Services informer controllers in
// background goroutines. They are given wait.NeverStop, so they are never
// asked to stop. Start then starts the config sync and blocks in
// waitForResourceSyncedOrDie until the initial list of endpoints and services
// from the API server is complete. Per its name, that call presumably
// terminates the process on timeout — verify.
func (kd *KubeDNS) Start() {
	stop := wait.NeverStop

	glog.V(2).Infof("Starting endpointsController")
	go kd.endpointsController.Run(stop)

	glog.V(2).Infof("Starting serviceController")
	go kd.serviceController.Run(stop)

	kd.startConfigMapSync()

	kd.waitForResourceSyncedOrDie()
}

4.4 setupHandlers函式

  • 建立/readiness /cache handler
// setupHandlers registers two endpoints on http.DefaultServeMux (the mux Run
// serves with its nil handler):
//   - /readiness always answers "ok";
//   - /cache writes the string returned by kd.GetCacheAsJSON. If that call
//     fails, it answers 500 with the error text.
func (server *KubeDNSServer) setupHandlers() {
	glog.V(0).Infof("Setting up Healthz Handler (/readiness)")
	http.HandleFunc("/readiness", func(w http.ResponseWriter, _ *http.Request) {
		fmt.Fprint(w, "ok\n")
	})

	glog.V(0).Infof("Setting up cache handler (/cache)")
	http.HandleFunc("/cache", func(w http.ResponseWriter, _ *http.Request) {
		serializedJSON, err := server.kd.GetCacheAsJSON()
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			fmt.Fprint(w, err)
			return
		}
		fmt.Fprint(w, serializedJSON)
	})
}

二 資源同步資訊

1 資源同步資訊

DNS是提供service域名解析,將service的域名解析到clusterIP就可以

  • endpoint同步,主要針對headless service不依賴k8s負載均衡,直接對映到後端容器endpoint,
// Endpoints informer event handlers (excerpt from setEndpointsStore).
kcache.ResourceEventHandlerFuncs{
   AddFunc:    kd.handleEndpointAdd,
UpdateFunc: kd.handleEndpointUpdate,
// On delete, the reverse DNS entries of a headless Service must be removed.
DeleteFunc: kd.handleEndpointDelete,
},
  • service同步,監聽service的增刪改:ExternalName型別生成CNAME記錄,未設定ClusterIP的headless service交給newHeadlessService處理,其餘service生成ClusterIP記錄及SRV記錄,
// Service informer event handlers (excerpt from setServicesStore).
kcache.ResourceEventHandlerFuncs{
   AddFunc:    kd.newService,
DeleteFunc: kd.removeService,
UpdateFunc: kd.updateService,
},

1.1 updateService函式

  • 更新service時,若型別在ExternalName與其他型別之間切換,先刪除舊記錄;之後統一呼叫newService重新寫入記錄(1.1.1講解)
// updateService is the informer UpdateFunc for Services. Records are normally
// refreshed in place by re-running newService on the new object. Only when
// the Service switches to or from type ExternalName is the old entry removed
// first, because the two kinds are stored differently.
//
// Nothing happens unless both newObj and oldObj pass assertIsService. They
// are checked in that order, new first.
func (kd *KubeDNS) updateService(oldObj, newObj interface{}) {
	newSvc, ok := assertIsService(newObj)
	if !ok {
		return
	}
	oldSvc, ok := assertIsService(oldObj)
	if !ok {
		return
	}

	wasExternal := oldSvc.Spec.Type == v1.ServiceTypeExternalName
	isExternal := newSvc.Spec.Type == v1.ServiceTypeExternalName
	if wasExternal != isExternal {
		kd.removeService(oldObj)
	}
	kd.newService(newObj)
}

1.1.1 newService函式

  • 外部服務返回CNAME records
  • 沒有設定clusterIP,建立headless service
  • newPortalService(1.1.1.1講解)
// newService is the informer AddFunc for Services; updateService also reuses
// it. Depending on the Service, it:
//   - ExternalName: delegates to newExternalNameService (CNAME records);
//   - no ClusterIP set: treats the Service as headless and delegates to
//     newHeadlessService, logging any error it returns;
//   - otherwise: creates the ClusterIP ("portal") records via
//     newPortalService.
//
// If assertIsService rejects obj, nothing is done.
func (kd *KubeDNS) newService(obj interface{}) {
	service, ok := assertIsService(obj)
	if !ok {
		return
	}

	switch {
	case service.Spec.Type == v1.ServiceTypeExternalName:
		kd.newExternalNameService(service)
	case !v1.IsServiceIPSet(service):
		if err := kd.newHeadlessService(service); err != nil {
			glog.Errorf("Could not create new headless service %v: %v", service.Name, err)
		}
	default:
		kd.newPortalService(service)
	}
}

1.1.1.1 newPortalService函式

  • treecache替代了原來etcd的功能,是一個按域名標籤逆序分層儲存service記錄的樹形資料結構
  • SetEntry以及fqdn函式(第三章講解)
// newPortalService records a Service that has a ClusterIP ("portal" service).
//
// It first builds a fresh sub-cache holding:
//   - an entry for the ClusterIP, keyed by the label util.GetSkyMsg returns;
//   - one SRV entry per port that has both a Name and a Protocol, stored
//     under the _<protocol>/_<port-name> sub-path.
//
// Then, under cacheLock, it:
//   - attaches the sub-cache as service.Name under
//     <domainPath>/<serviceSubdomain>/<namespace>;
//   - maps the ClusterIP to a reverse-lookup record for the service FQDN;
//   - maps the ClusterIP to the Service itself.
func (kd *KubeDNS) newPortalService(service *v1.Service) {
	subCache := treecache.NewTreeCache()
	recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0)
	subCache.SetEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))

	// Generate SRV Records
	for i := range service.Spec.Ports {
		port := &service.Spec.Ports[i]
		if port.Name != "" && port.Protocol != "" {
			srvValue := kd.generateSRVRecordValue(service, int(port.Port))

			l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
			glog.V(3).Infof("Added SRV record %+v", srvValue)

			subCache.SetEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
		}
	}

	// The full slice expression caps the view at its length, so append always
	// copies into a new array. It can therefore never write into spare
	// capacity of the shared kd.domainPath, which other event handlers read
	// concurrently (this append runs outside cacheLock).
	domainPath := kd.domainPath[:len(kd.domainPath):len(kd.domainPath)]
	subCachePath := append(domainPath, serviceSubdomain, service.Namespace)
	host := getServiceFQDN(kd.domain, service)
	// Only the record value is needed for the reverse entry; its label is unused.
	reverseRecord, _ := util.GetSkyMsg(host, 0)

	kd.cacheLock.Lock()
	defer kd.cacheLock.Unlock()
	kd.cache.SetSubCache(service.Name, subCache, subCachePath...)
	kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
	kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
}

1.2 removeService函式(以下為刪除快取路徑所用的treecache DeletePath實現)

  • 在parent節點下刪除路徑最後一段對應的子節點(連同其下所有節點)或葉子條目(ExternalName service以名字為葉子key),本質上是對map的delete操作
// DeletePath removes the element named by the last segment of path from the
// node addressed by the preceding segments. A child node is checked first
// and, if present, removed together with its whole subtree. Otherwise a leaf
// entry of that name is removed; ExternalName services are stored that way.
//
// It reports whether anything was deleted. It returns false for an empty
// path, for a missing parent node, or when no child or entry has that name.
func (cache *treeCache) DeletePath(path ...string) bool {
	n := len(path)
	if n == 0 {
		return false
	}

	parent := cache.getSubCache(path[:n-1]...)
	if parent == nil {
		return false
	}

	leaf := path[n-1]
	if _, found := parent.ChildNodes[leaf]; found {
		delete(parent.ChildNodes, leaf)
		return true
	}
	if _, found := parent.Entries[leaf]; found {
		delete(parent.Entries, leaf)
		return true
	}
	return false
}

三 treecache實現


1 結構體

// treeCache is one node of a tree keyed by path segments. Each node holds
// nested nodes plus the values stored directly at that node.
type treeCache struct {
	// ChildNodes maps a path segment to the nested node for that segment.
	ChildNodes map[string]*treeCache
	// Entries maps an entry name to the value stored at this node.
	Entries map[string]interface{}
}

// NewTreeCache returns an empty root node whose ChildNodes and Entries maps
// are already allocated, so they can be written to immediately.
func NewTreeCache() TreeCache {
	root := new(treeCache)
	root.ChildNodes = map[string]*treeCache{}
	root.Entries = map[string]interface{}{}
	return root
}

2 fqdn函式

  • 例如domainPath [local cluster]後接svc、kube-system、dashboard,整體逆序後以.拼接為dashboard.kube-system.svc.cluster.local
  • 最後呼叫dns.Fqdn補上結尾的.,得到完全限定域名(2.1講解)
// fqdn constructs the fqdn for the given service. subpaths is a list of path
// elements rooted at the given service, ending at a service record.
//
// The labels kd.domainPath + serviceSubdomain + namespace + name + subpaths
// are collected into a freshly allocated slice, then reversed, joined with
// "." and passed to dns.Fqdn.
//
// The slice must be fresh. Appending directly to kd.domainPath could share
// its backing array whenever it has spare capacity. Reversing that result
// would then scramble kd.domainPath itself, and concurrent handlers would
// race on it.
func (kd *KubeDNS) fqdn(service *v1.Service, subpaths ...string) string {
	domainLabels := make([]string, 0, len(kd.domainPath)+3+len(subpaths))
	domainLabels = append(domainLabels, kd.domainPath...)
	domainLabels = append(domainLabels, serviceSubdomain, service.Namespace, service.Name)
	domainLabels = append(domainLabels, subpaths...)
	return dns.Fqdn(strings.Join(util.ReverseArray(domainLabels), "."))
}

2.1 Fqdn函式

  • 確保域名以.結尾,即成為完全限定域名(FQDN)

3 SetEntry函式

  • 將key/value寫入treecache中path對應的節點:key通常是記錄的hash標籤(由GetSkyMsg生成),value是skymsg.Service結構體,寫入前會把其Key欄位設為fqdn對應的skydns路徑
// SetEntry stores val under key in the node that ensureChildNode(path...)
// returns for cache. ensureChildNode presumably creates missing nodes along
// path — confirm.
//
// Before storing, it overwrites val.Key with skymsg.Path(fqdn), so the
// caller's *skymsg.Service is modified in place.
func (cache *treeCache) SetEntry(key string, val *skymsg.Service, fqdn string, path ...string) {
	// TODO: Consolidate setEntry and setSubCache into a single method with a
	// type switch.
	// TODO: Instead of passing the fqdn as an argument, we can reconstruct
	// it from the path, provided callers always pass the full path to the
	// object. This is currently *not* the case, since callers first create
	// a new, empty node, populate it, then parent it under the right path.
	// So we don't know the full key till the final parenting operation.
	target := cache.ensureChildNode(path...)

	// val.Key becomes the "target" used for SRV record lookups:
	//   - normal service/endpoint records get a key like
	//     /skydns/local/cluster/svc/svcNS/svcName/record-hash;
	//   - headless services whose pods request a specific hostname
	//     (petset) get /skydns/local/cluster/svc/svcNS/svcName/pod-hostname.
	val.Key = skymsg.Path(fqdn)
	target.Entries[key] = val
}

四 ServeDNS函式

主要根據型別進行處理

// ServeDNS is the handler for DNS requests, responsible for parsing DNS request, possibly forwarding
// it to a real dns server and returning a response.
func (s *server) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
   m := new(dns.Msg)
   m.SetReply(req)
   m.Authoritative = true
m.RecursionAvailable = true
m.Compress = true