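API Gateway Kong Study Notes (8): Implementation of the Kong Ingress Controller

Author: 李佶澳   Please keep the link to the original when reposting   Last updated: 2018-11-05 10:52:44 +0800

Overview

This is one of a series of study notes on the API gateway Kong. Problems encountered while using Kong, and their solutions, are recorded in "API Gateway Kong: Problems Encountered in Use and Their Solutions".

Kong Ingress Controller integrates Kong with Kubernetes so that operations performed in Kubernetes are automatically synced to Kong. "API Gateway Kong (2): Integrating Kong with Kubernetes" introduced it; this note looks at it at the code level. For deployment and usage, refer to that article.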

CustomResourceDefinitions

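The CustomResourceDefinitions section of "API Gateway Kong (2): Integrating Kong with Kubernetes" describes the CRDs that Kong Ingress Controller defines in Kubernetes.

Records in these CRDs must be synced to Kong. Kong Ingress Controller watches these CRDs, along with other related resources in the Kubernetes cluster, and syncs any change to Kong as soon as it is detected.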

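Building the code

Kong kubernetes ingress controller manages its dependencies with dep. After downloading the code, first fetch the dependencies with the following command: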

make deps

Then build:

make build

Program startup

The main flow is implemented in ngx, which holds two clients: kubeClient for accessing Kubernetes and kongClient for accessing Kong.

//kubernetes-ingress-controller/cli/ingress-controller/main.go
func main() {
...
    conf.KubeClient = kubeClient
    conf.KubeConf = kubeCfg
    conf.Kong.Client = kongClient

    ngx := controller.NewNGINXController(conf, fs)
    ...
    ngx.Start()
...

Watching resources in Kubernetes

ngx has a store member:

//kubernetes-ingress-controller/internal/ingress/controller/run.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
...
	n.store = store.New(
		config.EnableSSLChainCompletion,
		config.Namespace,
		"",
		"",
		"",
		"",
		config.ResyncPeriod,
		config.KubeClient,
		config.KubeConf,
		fs,
		n.updateCh)
...

n.store holds informers implemented by Kubernetes' client-go. When a resource in Kubernetes changes, the handlers registered with the informer are called:

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
	...
	store := &k8sStore{
		isOCSPCheckEnabled: checkOCSP,
		informers:          &Informer{},
		listers:            &Lister{},
		sslStore:           NewSSLCertTracker(),
		filesystem:         fs,
		updateCh:           updateCh,
		mu:                 &sync.Mutex{},
		secretIngressMap:   NewObjectRefMap(),
	}
	...
	ingEventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		ing := obj.(*extensions.Ingress)
		...
		recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
		store.updateSecretIngressMap(ing)
		store.syncSecrets(ing)
		updateCh.In() <- Event{
			Type: CreateEvent,
			Obj:  obj,
		}
	},
	...
	store.informers.Ingress.AddEventHandler(ingEventHandler)
	...
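The handler pattern above can be sketched without client-go. This is a minimal model, not the controller's code: handlerFuncs, event, and ingress are hypothetical stand-ins for cache.ResourceEventHandlerFuncs, the store's Event, and extensions.Ingress, and a buffered Go channel takes the place of the RingChannel.

```go
package main

import "fmt"

// eventType and event are hypothetical stand-ins for the store's Event type.
type eventType string

const createEvent eventType = "CREATE"

type event struct {
	Type eventType
	Obj  interface{}
}

// ingress is a simplified stand-in for extensions.Ingress.
type ingress struct {
	Namespace, Name string
}

// handlerFuncs mimics the shape of cache.ResourceEventHandlerFuncs:
// optional callbacks that an informer invokes when a resource changes.
type handlerFuncs struct {
	AddFunc func(obj interface{})
}

// newIngressHandler returns a handler that forwards every added object
// to updateCh, the way the handlers in store.New() do.
func newIngressHandler(updateCh chan<- event) handlerFuncs {
	return handlerFuncs{
		AddFunc: func(obj interface{}) {
			updateCh <- event{Type: createEvent, Obj: obj}
		},
	}
}

func main() {
	updateCh := make(chan event, 8)
	h := newIngressHandler(updateCh)

	// Simulate the informer noticing a new Ingress.
	h.AddFunc(&ingress{Namespace: "default", Name: "demo"})

	evt := <-updateCh
	ing := evt.Obj.(*ingress)
	fmt.Printf("%s %s/%s\n", evt.Type, ing.Namespace, ing.Name) // prints "CREATE default/demo"
}
```

The point of the design is that the handler does no real work itself; it only turns a Kubernetes change into an event for whoever reads the channel.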

There are five handlers in total:

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
ingEventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		ing := obj.(*extensions.Ingress)
	...
secrEventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		sec := obj.(*corev1.Secret)
	...
epEventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		updateCh.In() <- Event{
			Type: CreateEvent,
			Obj:  obj,
		}
	},
	...
serviceEventHandler := cache.ResourceEventHandlerFuncs{
	UpdateFunc: func(old, cur interface{}) {
		updateCh.In() <- Event{
			Type: ConfigurationEvent,
			Obj:  cur,
		}
	},
	...
crdEventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		updateCh.In() <- Event{
			Type: ConfigurationEvent,
			Obj:  obj,
		}
	},
	...

These five handlers are registered with eight informers, which watch the Kubernetes ingress, endpoint, secret, and service resources and the Kong plugin, consumer, credential, and configuration resources:

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.Service.AddEventHandler(serviceEventHandler)
store.informers.Kong.Plugin.AddEventHandler(crdEventHandler)
store.informers.Kong.Consumer.AddEventHandler(crdEventHandler)
store.informers.Kong.Credential.AddEventHandler(crdEventHandler)
store.informers.Kong.Configuration.AddEventHandler(crdEventHandler)

Pay attention to how the CRDs are watched, namely Kong.Plugin, Kong.Consumer, Kong.Credential, and Kong.Configuration, because they are not native Kubernetes types.

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...

pluginClient, _ := pluginclientv1.NewForConfig(clientConf)
pluginFactory := plugininformer.NewFilteredSharedInformerFactory(pluginClient, resyncPeriod, namespace, func(*metav1.ListOptions) {})

store.informers.Kong.Plugin = pluginFactory.Configuration().V1().KongPlugins().Informer()
store.listers.Kong.Plugin = store.informers.Kong.Plugin.GetStore()
store.informers.Kong.Plugin.AddEventHandler(crdEventHandler)

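Their informers are somewhat more complex: they are built by wrapping the Kubernetes REST client, and are analyzed separately later.

The other informers are created directly with client-go methods: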

infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()

Implementation of the CRD informers

The CRD informers are implemented by wrapping the Kubernetes REST client:

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
pluginClient, _ := pluginclientv1.NewForConfig(clientConf)
pluginFactory := plugininformer.NewFilteredSharedInformerFactory(pluginClient, resyncPeriod, namespace, func(*metav1.ListOptions) {})

store.informers.Kong.Plugin = pluginFactory.Configuration().V1().KongPlugins().Informer()
...

pluginclientv1.NewForConfig() wraps the native REST client from client-go:

//kubernetes-ingress-controller/internal/client/plugin/clientset/versioned/clientset.go
func NewForConfig(c *rest.Config) (*Clientset, error) {
	...
	var cs Clientset
	cs.configurationV1, err = configurationv1.NewForConfig(&configShallowCopy)
	...
	cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)

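As you can see, two clients are created: cs.configurationV1 and cs.DiscoveryClient.

cs.DiscoveryClient is created through the client-go interface and is nothing special; the important one is cs.configurationV1.

cs.configurationV1 is implemented as follows: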

//kubernetes-ingress-controller/internal/client/plugin/clientset/versioned/typed/plugin/v1/plugin_client.go
func NewForConfig(c *rest.Config) (*ConfigurationV1Client, error) {
	config := *c
	if err := setConfigDefaults(&config); err != nil {
		return nil, err
	}
	client, err := rest.RESTClientFor(&config)
	if err != nil {
		return nil, err
	}
	return &ConfigurationV1Client{client}, nil
}

The key part is setConfigDefaults(&config), which fills in config:

//kubernetes-ingress-controller/internal/client/plugin/clientset/versioned/typed/plugin/v1/plugin_client.go
func setConfigDefaults(config *rest.Config) error {
	gv := v1.SchemeGroupVersion
	config.GroupVersion = &gv
	config.APIPath = "/apis"
	config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}
	if config.UserAgent == "" {
		config.UserAgent = rest.DefaultKubernetesUserAgent()
	}
	return nil
}

The value of config.GroupVersion can be traced as follows:

config.GroupVersion --> &gv, where gv is a copy of v1.SchemeGroupVersion

Following v1.SchemeGroupVersion further shows that it holds the CRD's group name and version:

//kubernetes-ingress-controller/internal/apis/plugin/v1/register.go
var SchemeGroupVersion = info.SchemeGroupVersion

//kubernetes-ingress-controller/internal/apis/group/info.go
var GroupName = "configuration.konghq.com"
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1"}

This GroupName and Version correspond to the fields in the CRD definition:

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: kongconsumers.configuration.konghq.com
spec:
  group: configuration.konghq.com
  version: v1
  scope: Namespaced
  names:
    kind: KongConsumer
    plural: kongconsumers
    shortNames:
    - kc
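To make the link between SchemeGroupVersion and the CRD concrete, the sketch below composes the URL path that a REST client configured this way would request for a namespaced custom resource. The layout /apis/<group>/<version>/namespaces/<namespace>/<plural> is the standard Kubernetes API path for namespaced resources in a named API group. The groupVersion type and the resourcePath helper are hypothetical, written only for illustration, and are not part of client-go or the controller.

```go
package main

import (
	"fmt"
	"path"
)

// groupVersion is a simplified stand-in for schema.GroupVersion.
type groupVersion struct {
	Group   string
	Version string
}

// resourcePath builds the API path for a namespaced resource:
// <apiPath>/<group>/<version>/namespaces/<namespace>/<plural>
func resourcePath(apiPath string, gv groupVersion, namespace, plural string) string {
	return path.Join(apiPath, gv.Group, gv.Version, "namespaces", namespace, plural)
}

func main() {
	// Values taken from setConfigDefaults() and the CRD definition above.
	gv := groupVersion{Group: "configuration.konghq.com", Version: "v1"}
	fmt.Println(resourcePath("/apis", gv, "default", "kongconsumers"))
	// prints "/apis/configuration.konghq.com/v1/namespaces/default/kongconsumers"
}
```

If the group or version in the client did not match the CRD's spec.group and spec.version, the API server would have no resource registered at the resulting path.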

Configuration updates

Every handler registered in the previous section ends by writing an event to updateCh:

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
	...
	updateCh *channels.RingChannel) Storer {
	...
	ingEventHandler := cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		ing := obj.(*extensions.Ingress)
		...
		updateCh.In() <- Event{
			Type: CreateEvent,
			Obj:  obj,
		}
	},
	...

This updateCh is the same channel as ngx.updateCh; it is passed in when n.store is created:

//kubernetes-ingress-controller/internal/ingress/controller/run.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
...
	n := &NGINXController{
		...
		}
	n.store = store.New( config.EnableSSLChainCompletion,
			config.Namespace,
			"",
			"",
			"",
			"",
			config.ResyncPeriod,
			config.KubeClient,
			config.KubeConf,
			fs,
			n.updateCh)
...

After ngx is started with Start(), each event received from updateCh adds one task to the task queue:

//kubernetes-ingress-controller/internal/ingress/controller/run.go
func (n *NGINXController) Start() {
	glog.Infof("starting Ingress controller")
	...
	n.store.Run(n.stopCh)
	...
	go n.syncQueue.Run(time.Second, n.stopCh)
	// force initial sync
	n.syncQueue.Enqueue(&extensions.Ingress{})
	for {
		select {
		...
		case event := <-n.updateCh.Out():
			...
			if evt, ok := event.(store.Event); ok {
				...
				n.syncQueue.Enqueue(evt.Obj)
			...
		case <-n.stopCh:
			break
		}
	}
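The loop in Start() follows a common Go pattern: select over an event channel and a stop channel. Below is a minimal stdlib-only sketch of that pattern; the event type and the run function are hypothetical and much simpler than the controller's. One Go detail worth knowing when reading loops of this shape: a bare break inside a select leaves only the select statement, not the enclosing for, so this sketch uses return to exit.

```go
package main

import "fmt"

// event is a hypothetical stand-in for store.Event.
type event struct {
	Obj interface{}
}

// run forwards every event from updateCh to enqueue until stopCh is
// closed, and returns the number of events it forwarded.
func run(updateCh <-chan event, stopCh <-chan struct{}, enqueue func(obj interface{})) int {
	n := 0
	for {
		select {
		case evt := <-updateCh:
			enqueue(evt.Obj)
			n++
		case <-stopCh:
			// return, not break: a bare break would only leave the select.
			return n
		}
	}
}

func main() {
	updateCh := make(chan event) // unbuffered: a send completes once run() has received it
	stopCh := make(chan struct{})
	done := make(chan int)

	var queued []interface{}
	go func() {
		done <- run(updateCh, stopCh, func(obj interface{}) {
			queued = append(queued, obj)
		})
	}()

	for _, name := range []string{"ingress-a", "service-b", "plugin-c"} {
		updateCh <- event{Obj: name}
	}
	close(stopCh)

	fmt.Println(<-done, queued) // prints "3 [ingress-a service-b plugin-c]"
}
```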

Creating the task queue

The task queue is also created when ngx is created:

//kubernetes-ingress-controller/internal/ingress/controller/run.go
func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
...
	n.syncQueue = task.NewTaskQueue(n.syncIngress)

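The argument passed to n.syncIngress(interface{}) is a string key generated from event.Obj, as can be seen from the queue implementation in kubernetes-ingress-controller/internal/task/queue.go.

n.syncIngress() is the argument given to task.NewTaskQueue(). Its job is to sync configuration to Kong: it runs whenever the queue holds an event and does not care what kind of event it is.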
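The relationship between the queue and its sync callback can be illustrated with a toy queue. Everything here is hypothetical: taskQueue, object, and keyOf are written for this sketch, the "namespace/name" key format is an assumption, and the real queue in internal/task/queue.go is not reproduced. The sketch only shows the shape: objects go in, keys come out, and the callback ignores the key.

```go
package main

import "fmt"

// object is a hypothetical stand-in for a Kubernetes object with metadata.
type object struct {
	Namespace, Name string
}

// keyOf derives a "namespace/name" key (format assumed for this sketch).
func keyOf(o object) string { return o.Namespace + "/" + o.Name }

// taskQueue is a toy queue: it stores keys and passes each one to sync.
type taskQueue struct {
	keys []string
	sync func(key interface{}) error
}

func newTaskQueue(sync func(interface{}) error) *taskQueue {
	return &taskQueue{sync: sync}
}

// Enqueue converts the object to its key and appends the key.
func (q *taskQueue) Enqueue(o object) { q.keys = append(q.keys, keyOf(o)) }

// Drain calls sync once per queued key and stops at the first error.
func (q *taskQueue) Drain() error {
	for len(q.keys) > 0 {
		key := q.keys[0]
		q.keys = q.keys[1:]
		if err := q.sync(key); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fullSyncs := 0
	q := newTaskQueue(func(key interface{}) error {
		// The callback ignores the key and resyncs everything.
		fullSyncs++
		fmt.Printf("event for %v -> full sync #%d\n", key, fullSyncs)
		return nil
	})
	q.Enqueue(object{"default", "web"})
	q.Enqueue(object{"kube-system", "dns"})
	if err := q.Drain(); err != nil {
		fmt.Println("sync failed:", err)
	}
}
```

With a callback of this kind, two unrelated events cost two full syncs, which is the trade-off of treating the key as a mere trigger.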

n.syncIngress()

n.syncIngress() works in two steps: first it reads the ings from n.store, then it passes the result to n.OnUpdate():

//kubernetes-ingress-controller/internal/ingress/controller/controller.go:
func (n *NGINXController) syncIngress(interface{}) error {
	...
	ings := n.store.ListIngresses()
	...
	upstreams, servers := n.getBackendServers(ings)
	pcfg := ingress.Configuration{
		Backends: upstreams,
		Servers:  servers,
	}
	err := n.OnUpdate(&pcfg)
	n.runningConfig = &pcfg
	...

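Note that the current implementation of syncIngress (version 0.2.0) performs a full update no matter what the input argument is. This is a potential hazard; it should be changed to a partial update based on the input argument.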

ings

The ings live in n.store. n.store.ListIngresses() reads them through the informer created with client-go and its backing store:

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func (s k8sStore) ListIngresses() []*extensions.Ingress {
	...
	var ingresses []*extensions.Ingress
	for _, item := range s.listers.Ingress.List() {
		ing := item.(*extensions.Ingress)
		...
		ingresses = append(ingresses, ing)
	}
	
	return ingresses
}

//kubernetes-ingress-controller/internal/ingress/controller/store/store.go
func New(checkOCSP bool,
...
	infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})

	store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
	store.listers.Ingress.Store = store.informers.Ingress.GetStore()
...

So the ings returned by the query are standard Kubernetes Ingress objects:

//k8s.io/api/extensions/v1beta1/types.go
type Ingress struct {
	metav1.TypeMeta 
	metav1.ObjectMeta 
	
	Spec IngressSpec 
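	//The IngressSpec definition is nested inline here. This is not valid Go syntax; it is only for easier reading (same below)
	type IngressSpec struct {
		//Default Backend; at least one Backend or one Rule is required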
		Backend *IngressBackend 
		
		TLS []IngressTLS 
			type IngressTLS struct {
				//Hostnames the certificate is bound to
				Hosts []string
				//Name of the Secret that holds the certificate
				SecretName string 
			}
			
		Rules []IngressRule 
			type IngressRule struct {
				Host string 
				IngressRuleValue 
				type IngressRuleValue struct {
					HTTP *HTTPIngressRuleValue 
					type HTTPIngressRuleValue struct {
						Paths []HTTPIngressPath 
						type HTTPIngressPath struct {
							Path string 
							Backend IngressBackend 
								type IngressBackend struct {
									ServiceName string 
									ServicePort intstr.IntOrString 
								}
						}
					}
				}
			}
	}
	Status IngressStatus 
}

Iterating over all the ingresses yields the server and backend lists:

//kubernetes-ingress-controller/internal/ingress/controller/controller.go:
func (n *NGINXController) syncIngress(interface{}) error {
	...
	upstreams, servers := n.getBackendServers(ings)
	...
	pcfg := ingress.Configuration{
		Backends: upstreams,
		Servers:  servers,
	}
	err := n.OnUpdate(&pcfg)
	...
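As a rough illustration of what "iterating over all the ingresses" means, the sketch below flattens the nested Rules/Paths structure into one route per path. The types are simplified, hypothetical mirrors of the Ingress fields listed earlier, and flatten is not getBackendServers(): the real function does considerably more (for example, it also has to deal with the default backend and TLS), none of which is modeled here.

```go
package main

import "fmt"

// Simplified, hypothetical mirrors of the Ingress types.
type backend struct {
	ServiceName string
	ServicePort int
}

type httpPath struct {
	Path    string
	Backend backend
}

type rule struct {
	Host  string
	Paths []httpPath
}

type ingress struct {
	Name  string
	Rules []rule
}

// route is one host+path -> service mapping.
type route struct {
	Host, Path, Service string
	Port                int
}

// flatten walks every rule and path of every ingress and returns
// one route per path, in the order they appear.
func flatten(ings []ingress) []route {
	var routes []route
	for _, ing := range ings {
		for _, r := range ing.Rules {
			for _, p := range r.Paths {
				routes = append(routes, route{
					Host:    r.Host,
					Path:    p.Path,
					Service: p.Backend.ServiceName,
					Port:    p.Backend.ServicePort,
				})
			}
		}
	}
	return routes
}

func main() {
	ings := []ingress{{
		Name: "demo",
		Rules: []rule{{
			Host: "echo.example.com",
			Paths: []httpPath{
				{Path: "/", Backend: backend{"echo", 8080}},
				{Path: "/admin", Backend: backend{"echo-admin", 9000}},
			},
		}},
	}}
	for _, rt := range flatten(ings) {
		fmt.Printf("%s%s -> %s:%d\n", rt.Host, rt.Path, rt.Service, rt.Port)
	}
}
```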

The update itself is carried out by the function n.OnUpdate():

//kubernetes-ingress-controller/internal/ingress/controller/controller.go:
func (n *NGINXController) OnUpdate(ingressCfg *ingress.Configuration) error {
	...
	err := n.syncCertificates(ingressCfg.Servers)
	for _, server := range ingressCfg.Servers {
		...
		err := n.syncUpstreams(server.Locations, ingressCfg.Backends)
		...
	}
	err = n.syncConsumers()
	...
	err = n.syncCredentials()
	...
	err = n.syncGlobalPlugins()
	...
	checkServices, err := n.syncServices(ingressCfg)
	...
	checkRoutes, err := n.syncRoutes(ingressCfg)
	...
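The excerpt shows a fixed order of sync calls: certificates, upstreams, consumers, credentials, global plugins, services, routes. A sequence like this can be modeled as an ordered list of named steps. The sketch below is hypothetical throughout (step, runSteps, and failing are invented for illustration), and stopping at the first error is an assumption of the sketch; the elided parts of the source do not show how OnUpdate() reacts to each error.

```go
package main

import (
	"errors"
	"fmt"
)

// step is a named unit of sync work.
type step struct {
	name string
	run  func() error
}

// errUnreachable and failing simulate a step that cannot reach Kong.
var errUnreachable = errors.New("kong admin API unreachable")

func failing() error { return errUnreachable }

// runSteps executes steps in order and stops at the first failure,
// wrapping the error with the name of the step that failed.
// It returns the names of the steps that completed.
func runSteps(steps []step) ([]string, error) {
	var done []string
	for _, s := range steps {
		if err := s.run(); err != nil {
			return done, fmt.Errorf("%s: %w", s.name, err)
		}
		done = append(done, s.name)
	}
	return done, nil
}

func main() {
	ok := func() error { return nil }
	steps := []step{
		{"certificates", ok},
		{"upstreams", ok},
		{"consumers", ok},
		{"credentials", failing},
		{"global plugins", ok},
		{"services", ok},
		{"routes", ok},
	}
	done, err := runSteps(steps)
	fmt.Println("completed:", done)
	fmt.Println("error:", err)
}
```

Ordering matters in a pipeline like this whenever a later object refers to an earlier one, which is presumably why services are synced before routes.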

References

  1. GitHub: Kong kubernetes ingress controller
  2. API Gateway Kong (2): Integrating Kong with Kubernetes
  3. API Gateway Kong (2): Integrating Kong with Kubernetes: CustomResourceDefinitions

This article was originally published on www.lijiaocn.com
