kubernetes 调度组件 kube-scheduler 1.16.3 源代码阅读指引

Tags: kubernetes 

本篇目录

说明

找不到命令行参数的初始化过程,是刚开始阅读 kube-scheduler 源代码时遇到的最大障碍。这一步走不通,后面的代码即使能看明白,也会感觉心里没底,如同空中楼阁。 第二个障碍是不知道分散在哪里的 init()。

代码阅读技巧:从 main 函数开始,每进入一个新目录,把新目录下的文件粗略扫视一下,如果目录中有 readme 文件一定要看,每个文件开头的注释要看。

怎样才算熟悉代码?:想要查看某一环节的代码实现时,能够通过目录翻找到对应的代码文件,不需要从 main 函数开始按照执行顺序查找。

kube-scheduler 的开发文档:

补充

调度策略的 调整方法

--policy-config-file 指向一个设置了调度策略的 json 文件:

{
  "kind": "Policy",
  "apiVersion": "v1",
  "predicates": [
    {
      "name": "CheckNodeUnschedulable"
    },
    {
      "name": "CheckVolumeBinding"
    },
    {
      "name": "GeneralPredicates"
    },
    {
      "name": "MatchInterPodAffinity"
    },
    {
      "name": "MaxEBSVolumeCount"
    },
    {
      "name": "MaxGCEPDVolumeCount"
    },
    {
      "name": "MaxAzureDiskVolumeCount"
    },
    {
      "name": "MaxCSIVolumeCountPred"
    },
    {
      "name": "NoDiskConflict"
    },
    {
      "name": "NoVolumeZoneConflict"
    },
    {
      "name": "PodToleratesNodeTaints"
    }
  ],
  "priorities": [
    {
      "name": "ServiceSpreadingPriority",
      "weight": 1
    },
    {
      "name": "EqualPriority",
      "weight": 1
    },
    {
      "name": "ImageLocalityPriority",
      "weight": 1
    },
    {
      "name": "MostRequestedPriority",
      "weight": 1
    },
    {
      "name": "SelectorSpreadPriority",
      "weight": 1
    },
    {
      "name": "InterPodAffinityPriority",
      "weight": 1
    },
    {
      "name": "LeastRequestedPriority",
      "weight": 1
    },
    {
      "name": "BalancedResourceAllocation",
      "weight": 1
    },
    {
      "name": "NodePreferAvoidPodsPriority",
      "weight": 10000
    },
    {
      "name": "NodeAffinityPriority",
      "weight": 1
    },
    {
      "name": "TaintTolerationPriority",
      "weight": 1
    },
    {
      "name": "GPUAllocationPriority",
      "weight": 10
    }
  ],
  "extenders" : [
    {
      "urlPrefix": "http://127.0.0.1:12346/scheduler",
      "filterVerb": "filter",
      "bindVerb": "bind",
      "prioritizeVerb": "prioritize",
      "weight": 5,
      "enableHttps": false,
      "nodeCacheCapable": false
    }
  ],
  "hardPodAffinitySymmetricWeight" : 10,
  "alwaysCheckAllPredicates" : false
}

代码入口与代码组成

cmd/kube-scheduler 是源代码阅读入口,main 函数就在这个目录中:

kube-scheduler源代码1

上面只是 kube-scheduler 所有代码的的很小一部分,大部分代码位于 pkg/scheduler:

kube-scheduler源代码2

kube-scheduler源代码8

命令行参数设置

如果不熟悉 corbra 会很疑惑,命令行参数是什么时候设置的?

kube-scheduler源代码2

注意 2 和 4,位置 2 通过 cmd 获取 fs,然后在位置 4 调用 fs.AddFlags,参数 f 来自位置 3,进入 opts.Flags() 就会看到熟悉的 flags 设置了。 opts.Flags() 中指定用 opts 的成员保留命令行参数的值:

// Flags returns flags for a specific scheduler by section name
func (o *Options) Flags() (nfs cliflag.NamedFlagSets) {
    fs := nfs.FlagSet("misc")
    fs.StringVar(&o.ConfigFile, "config", o.ConfigFile, "The path to the configuration file. Flags override values in this file.")
    fs.StringVar(&o.WriteConfigTo, "write-config-to", o.WriteConfigTo, "If set, write the configuration values to this file and exit.")
    fs.StringVar(&o.Master, "master", o.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")

    o.SecureServing.AddFlags(nfs.FlagSet("secure serving"))
    o.CombinedInsecureServing.AddFlags(nfs.FlagSet("insecure serving"))
    o.Authentication.AddFlags(nfs.FlagSet("authentication"))
    o.Authorization.AddFlags(nfs.FlagSet("authorization"))
    o.Deprecated.AddFlags(nfs.FlagSet("deprecated"), &o.ComponentConfig)

    leaderelectionconfig.BindFlags(&o.ComponentConfig.LeaderElection.LeaderElectionConfiguration, nfs.FlagSet("leader election"))
    utilfeature.DefaultMutableFeatureGate.AddFlag(nfs.FlagSet("feature gate"))

    return nfs
}

而后带有命令行参数值的 opts 被前面图中的 runCommand(cmd, args, opts) 使用,从而将参数代入下一个环节。

参数默认值/运行时配置

通常情况下,大部分命令行参数都不会直接设置,因此命令行参数的默认值非常关键。kube-scheduler 的命令默认值设置过程有点绕。 这里只说明一些最主要最绕的 ComponentConfig 的设置过程。

前面图片的位置 1 调用下面函数生成 opts:

// NewOptions returns default scheduler app options.
func NewOptions() (*Options, error) {
    cfg, err := newDefaultComponentConfig()    //运行时配置的主要组成
    if err != nil {
        return nil, err
    }
    ...
    o := &Options{
    ComponentConfig: *cfg,
    ...

runCommand(cmd, args, opts) 中怎样用 opts 生成运行配置这里不说了,这里关心 opts 的重要成员也是运行时配置的关键组成 ComponentConfig。

下面这段代码信息含量很大:

func newDefaultComponentConfig() (*kubeschedulerconfig.KubeSchedulerConfiguration, error) {
    cfgv1alpha1 := kubeschedulerconfigv1alpha1.KubeSchedulerConfiguration{}
    kubeschedulerscheme.Scheme.Default(&cfgv1alpha1)     //这里是关键
    cfg := kubeschedulerconfig.KubeSchedulerConfiguration{}
    if err := kubeschedulerscheme.Scheme.Convert(&cfgv1alpha1, &cfg, nil); err != nil {
        return nil, err
    }
    return &cfg, nil
}

这段代码把我们引到 pkg/scheduler 目录中,它引用的两个 package 是::

kubeschedulerconfigv1alpha1 "k8s.io/kube-scheduler/config/v1alpha1"
kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"

kubeschedulerscheme.Scheme 是一个全局变量用于 struct 转换,跳转到 kubeschedulerscheme.Scheme.Default(&cfgv1alpha1) 的实现,会发现这里会使用 Scheme 存放的 default 函数对传入参数,也就是前面的 cfgv1alpha1 进行设置:

// Default sets defaults on the provided Object.
func (s *Scheme) Default(src Object) {
    if fn, ok := s.defaulterFuncs[reflect.TypeOf(src)]; ok {
        fn(src)
    }
}

那么这些 default 函数是什么时候存放到 Scheme 中的?如下图所示:

kube-scheduler源代码4

这里藏着一个 init,进入 addDefaultingFuncs() -> RegisterDefaults() -> … -> SetDefaults_KubeSchedulerConfiguration():

func SetDefaults_KubeSchedulerConfiguration(obj *kubescedulerconfigv1alpha1.KubeSchedulerConfiguration) {
    if len(obj.SchedulerName) == 0 {
        obj.SchedulerName = api.DefaultSchedulerName
    }

    if obj.HardPodAffinitySymmetricWeight == 0 {
        obj.HardPodAffinitySymmetricWeight = api.DefaultHardPodAffinitySymmetricWeight
    }

    if obj.AlgorithmSource.Policy == nil &&
        (obj.AlgorithmSource.Provider == nil || len(*obj.AlgorithmSource.Provider) == 0) {
        val := kubescedulerconfigv1alpha1.SchedulerDefaultProviderName
        obj.AlgorithmSource.Provider = &val
    }
    ...
}

上面就是设置默认值的代码,可以看到 AlgorithmSource.Provider 的默认值为 SchedulerDefaultProviderName(字符串 “DefaultProvider”)。

执行主线

调度启动的主线

调度的主线位于 pkg/scheduler/scheduler.go ,包含调度算法的 config 和 informer 的设置都是在生成/设置的,sched 的 Run() 就是调度任务的开始:

// New returns a Scheduler
func New(client clientset.Interface,
    ...
    config = sc
    ...
    sched := NewFromConfig(config)
    ...
    AddAllEventHandlers(sched, options.schedulerName, nodeInformer, podInformer, pvInformer, pvcInformer, serviceInformer, storageClassInformer)
    ...
}

// Run begins watching and scheduling. It waits for cache to be synced, then starts a goroutine and returns immediately.
func (sched *Scheduler) Run() {
    if !sched.config.WaitForCacheSync() {
        return
    }

    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

调度算法的主线

调度算法执行的主线位于 pkg/scheduler/core/generic_scheduler.go:

// pkg/scheduler/core/generic_scheduler.go
// NewGenericScheduler creates a genericScheduler object.
func NewGenericScheduler(
    ...
) ScheduleAlgorithm {
    ...
}

genericScheduler 的 Schedule() 方法就是调度算法执行的主线,被 scheduleOne() 调用:

// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne() {
    ...
    scheduleResult, err := sched.schedule(pod, pluginContext)
    ...
}

func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
    result, err := sched.config.Algorithm.Schedule(pod, sched.config.NodeLister, pluginContext)
    if err != nil {
        pod = pod.DeepCopy()
        sched.recordSchedulingFailure(pod, err, v1.PodReasonUnschedulable, err.Error())
        return core.ScheduleResult{}, err
    }
    return result, err
}

// pkg/scheduler/core/generic_scheduler.go
func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
    ...
}

调度任务获取

调度就是为 Pod 选择 Node,kube-scheduler 通过各种 informer 获取调度任务。

事件处理函数都在 pkg/scheduler/eventhandlers.go 中:

kube-scheduler源代码6

调度执行过程

kube-scheduler 一次只调度一个 Pod (2019-12-09 16:39:35),调度过程在文件 pkg/scheduler/scheduler.go 中:

// pkg/scheduler/scheduler.go
func (sched *Scheduler) Run() {
    ...
    //每次只调度一个 pod 的 sched.scheduleOne,无限重复执行
    go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
}

调度算法

调度算法通过 –algorithm-provider 或者 –policy-config-file/–policy-configmap 设置,默认配置是 –algorithm-provider=”DefaultProvider”。

调度算法在 pkg/scheduler/scheduler.go 中设置:

kube-scheduler源代码7

DefaultProvider 在 pkg/scheduler/algorithmprovider/defaults/ 中定义:

kube-scheduler源代码5

在 pkg/scheduler/algorithmprovider/defaults/register_predicates.go 中,在 init() 中把算法函数注册到 pkg/scheduler/factory/factory.go:

kube-scheduler源代码9

调度插件

调度插件是调度过程中一系列钩子,在 pkg/scheduler/scheduler.go 中调用:

// pkg/scheduler/scheduler.go
func (sched *Scheduler) scheduleOne() {
    fwk := sched.config.Framework
    ...
    // Run "reserve" plugins.
    if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
        sched.recordSchedulingFailure(assumedPod, sts.AsError(), SchedulerError, sts.Message())
        metrics.PodScheduleErrors.Inc()
        return
    }
    ...
        // Run "permit" plugins.
        permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
        ...
        // Run "prebind" plugins.
        prebindStatus := fwk.RunPrebindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)

            // Run "postbind" plugins.
            fwk.RunPostbindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
    ...
}

支持的钩子列表:

type Framework interface {
    FrameworkHandle
    QueueSortFunc() LessFunc
    RunPrefilterPlugins(pc *PluginContext, pod *v1.Pod) *Status
    RunPrebindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
    RunPostbindPlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
    RunReservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
    RunUnreservePlugins(pc *PluginContext, pod *v1.Pod, nodeName string)
    RunPermitPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
}

kube-scheduler 只是预留了插件接口,方便开发者自行开发插件干预调度过程,现在 kube-scheduler 中的插件是空的:

func NewRegistry() Registry {
    return Registry{
        // FactoryMap:
        // New plugins are registered here.
        // example:
        // {
        //  stateful_plugin.Name: stateful.NewStatefulMultipointExample,
        //  fooplugin.Name: fooplugin.New,
        // }
    }
}

pkg/scheduler/framework/plugins/examples 中有几个示范插件,可以参照实现。

参考

  1. 李佶澳的博客

kubernetes

  1. Prometheus 采集 Kubernetes 中的 pod 的 metrics 的方法
  2. kubernetes configmap 热加载,inotifywatch 监测文件触发热更新
  3. kubernetes 百变定制: 支持的扩展点和扩展方法(api/crd/plugin...)
  4. kubernetes 调度组件 kube-scheduler 1.16.3 源代码阅读指引
  5. kubernetes 代码中的 k8s.io 是怎么回事?
  6. 旌旗招展,向网格而行!
  7. 《不一样的 双11 技术,阿里巴巴经济体云原生实践》阅读笔记
  8. kubernetes ingress-nginx 启用 upstream 长连接,需要注意,否则容易 502
  9. ingress-nginx 的限速功能在 nginx.conf 中的对应配置
  10. kubernetes 中的容器设置透明代理,自动在 HTTP 请求头中注入 Pod 信息
  11. kubernetes ingress-nginx 的测试代码(单元测试+e2e测试)
  12. kubernetes ingress-nginx http 请求复制功能与 nginx mirror 的行为差异
  13. kubernetes 基于 openresty 的 ingress-nginx 的状态和配置查询
  14. Kubernetes ingress-nginx 0.25 源代码走读笔记
  15. kubernetes ingress-nginx 的金丝雀(canary)/灰度发布功能的使用方法
  16. kubernetes code-generator 用法: 生成 kubernetes-style 的 api 和 client 代码
  17. kubernetes 操作命令 kubectl 在 shell 中的自动补全配置
  18. flannel ip 地址段扩容方法
  19. kubernetes 组件 kube-proxy 的 IPVS 功能的使用
  20. lxcfs 是什么? 怎样通过 lxcfs 在容器内显示容器的 CPU、内存状态
  21. kubernetes initializer 功能的使用方法: 在 Pod 等 Resource 落地前进行修改
  22. kubernetes 版本特性: 新特性支持版本和组件兼容版本
  23. kubernetes API 与 Operator: 不为人知的开发者战争(完整篇)
  24. kubernetes 1.12 从零开始(七): kubernetes开发资源
  25. kubernetes 1.12 从零开始(六): 从代码编译到自动部署
  26. kubernetes 网络方案 Flannel 的学习笔记
  27. kubernetes 1.12 从零开始(五): 自己动手部署 kubernetes
  28. kubernetes 1.12 从零开始(四): 必须先讲一下基本概念
  29. kubernetes 1.12 从零开始(三): 用 kubeadm 部署多节点集群
  30. kubernetes 1.12 从零开始(二): 用 minikube 部署开发测试环境
  31. kubernetes 1.12 从零开始(一): 部署环境准备
  32. kubernetes 1.12 从零开始(零): 遇到的问题与解决方法
  33. kubernetes 1.12 从零开始(初): 课程介绍与官方文档汇总
  34. 通过Prometheus查询计算Kubernetes集群中的容器CPU、内存使用率等指标
  35. 使用 grafana 和 prometheus 监控 kubernetes 集群状态
  36. 一些比较有意思的Kubernetes周边产品
  37. Borg论文阅读笔记
  38. kubelet下载pod镜像时,docker口令文件的查找顺序
  39. kubernetes 的 Client Libraries 的使用
  40. kubernetes的网络隔离networkpolicy
  41. kube-router的源码走读
  42. 使用calico的ipip模式解决k8s的跨网段通信
  43. kubernetes的调试方法
  44. kubernetes 与 calico 的衔接过程
  45. 怎样理解 kubernetes 以及微服务?
  46. kubernetes中部署有状态的复杂分布式系统
  47. kubernetes的apiserver的启动过程
  48. kubernetes的api定义与装载
  49. kubernetes的federation部署,跨区Service
  50. kubernetes的编译、打包、发布
  51. kubernetes的第三方包的使用
  52. kubernetes的Storage的实现
  53. kubernetes 的 Apiserver 的 storage 使用
  54. kubernetes的Controller-manager的工作过程
  55. kubernetes的Client端Cache
  56. kubernetes 的 Apiserver 的工作过程
  57. kubernetes的CNI插件初始化与Pod网络设置
  58. kubernetes的Pod变更过程
  59. kubernetes的kubelet的工作过程
  60. kuberntes 的 Cmdline 实现
  61. kubernetes的Pod内挂载的Service Account的使用方法
  62. kubernetes的社区资源与项目参与方式
  63. kubernetes的Kube-proxy的转发规则分析
  64. kubernetes的基本操作
  65. kubernetes在CentOS上的集群部署
  66. kubernetes在CentOS上的All In One部署
  67. 怎样选择集群管理系统?

推荐阅读

Copyright @2011-2019 All rights reserved. 转载请添加原文连接,合作请加微信lijiaocn或者发送邮件: [email protected],备注网站合作

友情链接:  李佶澳的博客  小鸟笔记  软件手册  编程手册  运营手册  网络课程  收藏文章  发现知识星球  百度搜索 谷歌搜索