kubernetes的Controller-manager的工作过程

Tags: kubernetes 

目录

启动

controller-manager的启动过程比较简单。完成了Leader选举之后,leader运行run函数,启动多个controller。

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go:

run := func(stop <-chan struct{}) {
	rootClientBuilder := controller.SimpleControllerClientBuilder{
		ClientConfig: kubeconfig,
	}
	var clientBuilder controller.ControllerClientBuilder
	if len(s.ServiceAccountKeyFile) > 0 && s.UseServiceAccountCredentials {
		clientBuilder = controller.SAControllerClientBuilder{
			ClientConfig:         restclient.AnonymousClientConfig(kubeconfig),
			CoreClient:           kubeClient.Core(),
			AuthenticationClient: kubeClient.Authentication(),
			Namespace:            "kube-system",
		}
	} else {
		clientBuilder = rootClientBuilder
	}
	
	err := StartControllers(newControllerInitializers(), s, rootClientBuilder, clientBuilder, stop)
	glog.Fatalf("error running controllers: %v", err)
	panic("unreachable")
}

Controllers

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go:

func newControllerInitializers() map[string]InitFunc {
	controllers := map[string]InitFunc{}
	controllers["endpoint"] = startEndpointController
	controllers["replicationcontroller"] = startReplicationController
	controllers["podgc"] = startPodGCController
	controllers["resourcequota"] = startResourceQuotaController
	controllers["namespace"] = startNamespaceController
	controllers["serviceaccount"] = startServiceAccountController
	controllers["garbagecollector"] = startGarbageCollectorController
	controllers["daemonset"] = startDaemonSetController
	controllers["job"] = startJobController
	controllers["deployment"] = startDeploymentController
	controllers["replicaset"] = startReplicaSetController
	controllers["horizontalpodautoscaling"] = startHPAController
	controllers["disruption"] = startDisruptionController
	controllers["statefuleset"] = startStatefulSetController
	controllers["cronjob"] = startCronJobController
	controllers["certificatesigningrequests"] = startCSRController
	controllers["ttl"] = startTTLController
	controllers["bootstrapsigner"] = startBootstrapSignerController
	controllers["tokencleaner"] = startTokenCleanerController
	
	return controllers
}

每个controller承担不同的工作,譬如EndpointController负责维护Service的endpoint(service对应的pod的IP),具体内容还是看代码为好,这里不展开了。

StartControllers

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go:

func StartControllers(controllers map[string]InitFunc, s *options.CMServer, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) error {
	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
	sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
	...
	ctx := ControllerContext{
		ClientBuilder:      clientBuilder,
		InformerFactory:    sharedInformers,
		Options:            *s,
		AvailableResources: availableResources,
		Stop:               stop,
	}

versionedClient被用来创建sharedInformers,sharedInformers又被用来创建ControllerContext。

启动具体的controller的时候,又会传入ControllerContext,这里以endpointController为例子:

k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go:

func startEndpointController(ctx ControllerContext) (bool, error) {
	go endpointcontroller.NewEndpointController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Services(),
		ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
	).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
	return true, nil
}

在了解Run()中做了哪些事情之前,先看一下传入参数,NewEndpointController的传入参数。

ctx.InformerFactory,即sharedInformers

ctx.InformerFactory就是变量sharedInformers,sharedInformers实现了接口SharedInformerFactory。

k8s.io/kubernetes/cmd/kube-controller-manager/app/controllermanager.go:

sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

startEndpointController中调用了ctx.InformerFactory.Core().V1().Pods(),依次看着这些方法里做了什么事情。

sharedInformers.Core(), k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/factory.go:

func (f *sharedInformerFactory) Core() core.Interface {
	return core.New(f)
}

core.New(f), k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/interface.go

func New(f internalinterfaces.SharedInformerFactory) Interface {
	return &group{f}
}

group, k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/interface.go:

type group struct {
	internalinterfaces.SharedInformerFactory
}

ctx.InformerFactory.Core()执行后就是创建了一个group类型的变量,唯一成员就调用者sharedInformers,而在随后的V1()中,又使用了v1.New()

InformerFactory.Core().V1(),k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/interface.go:

func (g *group) V1() v1.Interface {
	return v1.New(g.SharedInformerFactory)
}

v1.New(),k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1/interface.go:

func New(f internalinterfaces.SharedInformerFactory) Interface {
	return &version{f}
}

继续传递sharedInformers,直到创建了类型为version的变量,version变量实现了Interface接口

k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1/interface.go:

type Interface interface {
	ComponentStatuses() ComponentStatusInformer
	ConfigMaps() ConfigMapInformer
	Endpoints() EndpointsInformer
	Events() EventInformer
	LimitRanges() LimitRangeInformer
	Pods() PodInformer
	...

通过version的成员方法,可以得到类型更为具体的Informer,例如ctx.InformerFactory.Core().V1().Pods():

k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1/interface.go:

func (v *version) Pods() PodInformer {
	return &podInformer{factory: v.SharedInformerFactory}
}

一言而概之,这个过程就是以sharedInformers为传入参数,生成更为具体的Informer,后续从更为具体的Informer中获取信息。

podInformer

以podInformer为例子大概了解一下Informer。

k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1/pod.go:

type podInformer struct {
	factory internalinterfaces.SharedInformerFactory
}

--podInformer : struct
    [fields]
   -factory : internalinterfaces.SharedInformerFactory
    [methods]
   +Informer() : cache.SharedIndexInformer
   +Lister() : v1.PodLister

k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1/pod.go:

func (f *podInformer) Lister() v1.PodLister {
	return v1.NewPodLister(f.Informer().GetIndexer())
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&api_v1.Pod{}, newPodInformer)
}

注意f.factory就是前面一开始创的变量sharedInformers:

k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/factory.go:

func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()

	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	informer = newFunc(f.client, f.defaultResync)
	f.informers[informerType] = informer

	return informer
}

newFunc就是newPodInformer,而f.client就是一开始就创建的versionedClient

newPodInformer,k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1/pod.go:

func newPodInformer(client clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	sharedIndexInformer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
				return client.CoreV1().Pods(meta_v1.NamespaceAll).List(options)
			},
			WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
				return client.CoreV1().Pods(meta_v1.NamespaceAll).Watch(options)
			},
		},
		&api_v1.Pod{},
		resyncPeriod,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
	
	return sharedIndexInformer
}

PodInformer就是一个类型为sharedIndexInformer的变量,它的ListFunc和WatchFunc只通过client获取pod相关的信息。

EndpointController

以EndpointController为例,说明controller是如何使用Informer。

创建EndpointController的时候,传入具体类型的Informer,例如EndpoingController的创建:

k8s.io/kubernetes/cmd/kube-controller-manager/app/core.go

func startEndpointController(ctx ControllerContext) (bool, error) {
	go endpointcontroller.NewEndpointController(
		ctx.InformerFactory.Core().V1().Pods(),
		ctx.InformerFactory.Core().V1().Services(),
		ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
	).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
	return true, nil
}

NewEndpointController(),k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go:

func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController {
	if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter())
	}
	...
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    e.addPod,
		UpdateFunc: e.updatePod,
		DeleteFunc: e.deletePod,
	})
	e.podLister = podInformer.Lister()
	e.podsSynced = podInformer.Informer().HasSynced
	...

可以看到在podInformer中注入了事件响应方法,e.addPode.updatePode.deletePod,这三个方法负责维护EndpointController中的数据。

例入addPod,当有新的pod时候,addPod会找到这个pod对应的services,把service作为key写入queue。

k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go

func (e *EndpointController) addPod(obj interface{}) {
	pod := obj.(*v1.Pod)
	services, err := e.getPodServiceMemberships(pod)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err))
		return
	}
	for key := range services {
		e.queue.Add(key)
	}
}

EndpointController的主工作流程就会从queue中取出受到影响的service,完成对应的处理:

EndpointController主流程

Run(),k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go:

func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
	...
	for i := 0; i < workers; i++ {
		go wait.Until(e.worker, time.Second, stopCh)
	}
	...
}

worker(),k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go:

func (e *EndpointController) worker() {
	for e.processNextWorkItem() {
	}
}

processNextWorkItem(),k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go:

func (e *EndpointController) processNextWorkItem() bool {
	eKey, quit := e.queue.Get()
	if quit {
		return false
	}
	defer e.queue.Done(eKey)

	err := e.syncService(eKey.(string))
	...
}

syncService(),k8s.io/kubernetes/pkg/controller/endpoint/endpoints_controller.go:

func (e *EndpointController) syncService(key string) error {
...
if createEndpoints {
	_, err = e.client.Core().Endpoints(service.Namespace).Create(newEndpoints)
} else {
	_, err = e.client.Core().Endpoints(service.Namespace).Update(newEndpoints)
}

kubernetes

  1. kubernetes 使用:多可用区、Pod 部署拓扑与 Topology Aware Routing
  2. kubernetes 扩展:Cloud Controller Manager
  3. kubernetes 准入:操作合法性检查(Admission Control)
  4. kubernetes 鉴权:用户操作权限鉴定(Authorization)
  5. kubernetes 认证:用户管理与身份认证(Authenticating)
  6. kubernetes 开发:代码生成工具
  7. kubernetes 扩展:operator 开发
  8. kubernetes 扩展:CRD 的使用方法
  9. kubernetes configmap 热加载,inotifywatch 监测文件触发热更新
  10. kubernetes 扩展:扩展点和方法(api/cr/plugin...)
  11. kubernetes 调度组件 kube-scheduler 1.16.3 源代码阅读指引
  12. kubernetes 代码中的 k8s.io 是怎么回事?
  13. 阅读笔记《不一样的 双11 技术,阿里巴巴经济体云原生实践》
  14. kubernetes ingress-nginx 启用 upstream 长连接,需要注意,否则容易 502
  15. ingress-nginx 的限速功能在 nginx.conf 中的对应配置
  16. kubernetes 中的容器设置透明代理,自动在 HTTP 请求头中注入 Pod 信息
  17. kubernetes ingress-nginx 的测试代码(单元测试+e2e测试)
  18. kubernetes ingress-nginx http 请求复制功能与 nginx mirror 的行为差异
  19. kubernetes 基于 openresty 的 ingress-nginx 的状态和配置查询
  20. kubernetes ingress-nginx 0.25 源代码走读笔记
  21. kubernetes ingress-nginx 的金丝雀(canary)/灰度发布功能的使用方法
  22. kubernetes 操作命令 kubectl 在 shell 中的自动补全配置
  23. kubernetes 组件 kube-proxy 的 IPVS 功能的使用
  24. kubernetes initializer 功能的使用方法: 在 Pod 等 Resource 落地前进行修改
  25. kubernetes 版本特性: 新特性支持版本和组件兼容版本
  26. kubernetes API 与 Operator: 不为人知的开发者战争(完整篇)
  27. kubernetes 1.12 从零开始(七): kubernetes开发资源
  28. kubernetes 1.12 从零开始(六): 从代码编译到自动部署
  29. kubernetes 网络方案 Flannel 的学习笔记
  30. kubernetes 1.12 从零开始(五): 自己动手部署 kubernetes
  31. kubernetes 1.12 从零开始(四): 必须先讲一下基本概念
  32. kubernetes 1.12 从零开始(三): 用 kubeadm 部署多节点集群
  33. kubernetes 1.12 从零开始(二): 用 minikube 部署开发测试环境
  34. kubernetes 1.12 从零开始(一): 部署环境准备
  35. kubernetes 1.12 从零开始(零): 遇到的问题与解决方法
  36. kubernetes 1.12 从零开始(初): 课程介绍与官方文档汇总
  37. kubernetes 集群状态监控:通过 grafana 和 prometheus
  38. 一些比较有意思的Kubernetes周边产品
  39. Borg论文阅读笔记
  40. kubelet下载pod镜像时,docker口令文件的查找顺序
  41. kubernetes 的 Client Libraries 的使用
  42. kubernetes的网络隔离networkpolicy
  43. kube-router的源码走读
  44. kubernetes 跨网段通信: 通过 calico 的 ipip 模式
  45. kubernetes的调试方法
  46. kubernetes 与 calico 的衔接过程
  47. 怎样理解 kubernetes 以及微服务?
  48. kubernetes中部署有状态的复杂分布式系统
  49. kubernetes的apiserver的启动过程
  50. kubernetes的api定义与装载
  51. kubernetes的federation部署,跨区Service
  52. kubernetes的编译、打包、发布
  53. kubernetes的第三方包的使用
  54. kubernetes的Storage的实现
  55. kubernetes 的 Apiserver 的 storage 使用
  56. kubernetes的Controller-manager的工作过程
  57. kubernetes的Client端Cache
  58. kubernetes 的 Apiserver 的工作过程
  59. kubernetes的CNI插件初始化与Pod网络设置
  60. kubernetes的Pod变更过程
  61. kubernetes的kubelet的工作过程
  62. kuberntes 的 Cmdline 实现
  63. kubernetes的Pod内挂载的Service Account的使用方法
  64. kubernetes的社区资源与项目参与方式
  65. kubernetes的Kube-proxy的转发规则分析
  66. kubernetes的基本操作
  67. kubernetes在CentOS上的集群部署
  68. kubernetes在CentOS上的All In One部署
  69. 怎样选择集群管理系统?

推荐阅读

Copyright @2011-2019 All rights reserved. 转载请添加原文连接,合作请加微信lijiaocn或者发送邮件: [email protected],备注网站合作

友情链接:  系统软件  程序语言  运营经验  水库文集  网络课程  微信网文  发现知识星球