- 概述
- StepByStep
- 项目初始化
- 什么是自定义控制器?
- 自定义控制器开发说明
- main 函数开发
- 自定义控制器定义
- 控制器业务逻辑开发
- 实战运行
概述
在 Kubernetes 中,大部分功能都是通过声明式 API 来描述的。而 Kubernetes 为了能够更加灵活,还支持了 CRD(自定义资源)类型,而针对自定义的资源对象的操作,则需要通过 Kubernetes Operator 来进行管理。在本文中,我们将会详细介绍如何从零开始开发一个 K8s Operator。
StepByStep
项目初始化
在本文中,我们希望实现如下一个 CRD 文件的处理,文件名称为 network.yaml,内容如下图所示:
apiVersion: apiextensions.k8s.io/v1beta1kind: CustomResourceDefinitionmetadata:name: networks.samplecrd.k8s.iospec:group: samplecrd.k8s.ioversion: v1names:kind: Networkplural: networksscope: Namespaced
可以看到,在这个 CRD 中,我指定了“group: samplecrd.k8s.io”“version: v1”这样的 API 信息,也指定了这个 CR 的资源类型叫作 Network,复数(plural)是 networks。然后,我还声明了它的 scope 是 Namespaced,即:我们定义的这个 Network 是一个属于 Namespace 的对象,类似于 Pod。这就是一个 Network API 资源类型的 API 部分的宏观定义了。接下来,我还需要让 Kubernetes“认识”这种 YAML 文件里描述的“网络”部分,比如“cidr”(网段),“gateway”(网关)这些字段的含义。这些内容就需要涉及到一定的代码开发了。我们需要创建一个如下目录结构的项目:
$ tree github.com/<your-name>/k8s-controller-custom-resource.├── controller.go├── crd│ └── network.yaml├── example│ └── example-network.yaml├── main.go└── pkg└── apis└── samplecrd├── register.go└── v1├── doc.go├── register.go└── types.go
其中,pkg/apis/samplecrd 就是 API 组的名字,v1 是版本,而 v1 下面的 types.go 文件里,则定义了 Network 对象的完整描述。然后,我在 pkg/apis/samplecrd 目录下创建了一个 register.go 文件,用来放置后面要用到的全局变量。这个文件的内容如下所示:
package samplecrdconst (GroupName = "samplecrd.k8s.io"Version = "v1")
接着,我需要在 pkg/apis/samplecrd 目录下添加一个 doc.go 文件(Golang 的文档源文件)。这个文件里的内容如下所示:
// +k8s:deepcopy-gen=package// +groupName=samplecrd.k8s.iopackage v1
在这个文件中,你会看到 +
package v1...// +genclient// +genclient:noStatus// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object// Network describes a Network resourcetype Network struct {// TypeMeta is the metadata for the resource, like kind and apiversionmetav1.TypeMeta `json:",inline"`// ObjectMeta contains the metadata for the particular object, including// things like...// - name// - namespace// - self link// - labels// - ... etc ...metav1.ObjectMeta `json:"metadata,omitempty"`Spec networkspec `json:"spec"`}// networkspec is the spec for a Network resourcetype networkspec struct {Cidr string `json:"cidr"`Gateway string `json:"gateway"`}// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object// NetworkList is a list of Network resourcestype NetworkList struct {metav1.TypeMeta `json:",inline"`metav1.ListMeta `json:"metadata"`Items []Network `json:"items"`}
在上面这部分代码里,你可以看到 Network 类型定义方法跟标准的 Kubernetes 对象一样,都包括了 TypeMeta(API 元数据)和 ObjectMeta(对象元数据)字段。而其中的 Spec 字段,就是需要我们自己定义的部分。所以,在 networkspec 里,我定义了 Cidr 和 Gateway 两个字段。其中,每个字段最后面的部分比如json:”cidr”,指的就是这个字段被转换成 JSON 格式之后的名字,也就是 YAML 文件里的字段名字。此外,除了定义 Network 类型,你还需要定义一个 NetworkList 类型,用来描述一组 Network 对象应该包括哪些字段。之所以需要这样一个类型,是因为在 Kubernetes 中,获取所有 X 对象的 List() 方法,返回值都是List 类型,而不是 X 类型的数组。这是不一样的。同样地,在 Network 和 NetworkList 类型上,也有代码生成注释。其中,+genclient 的意思是:请为下面这个 API 资源类型生成对应的 Client 代码(这个 Client,我马上会讲到)。而 +genclient:noStatus 的意思是:这个 API 资源类型定义里,没有 Status 字段。否则,生成的 Client 就会自动带上 UpdateStatus 方法。如果你的类型定义包括了 Status 字段的话,就不需要这句 +genclient:noStatus 注释了。比如下面这个例子:
// +genclient// Network is a specification for a Network resourcetype Network struct {metav1.TypeMeta `json:",inline"`metav1.ObjectMeta `json:"metadata,omitempty"`Spec NetworkSpec `json:"spec"`Status NetworkStatus `json:"status"`}
需要注意的是,+genclient 只需要写在 Network 类型上,而不用写在 NetworkList 上。因为 NetworkList 只是一个返回值类型,Network 才是“主类型”。
而由于我在 Global Tags 里已经定义了为所有类型生成 DeepCopy 方法,所以这里就不需要再显式地加上 +k8s:deepcopy-gen=true 了。当然,这也就意味着你可以用 +k8s:deepcopy-gen=false 来阻止为某些类型生成 DeepCopy。
你可能已经注意到,在这两个类型上面还有一句+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object的注释。它的意思是,请在生成 DeepCopy 的时候,实现 Kubernetes 提供的 runtime.Object 接口。否则,在某些版本的 Kubernetes 里,你的这个类型定义会出现编译错误。这是一个固定的操作,记住即可。
最后,我需要再编写一个 pkg/apis/samplecrd/v1/register.go 文件。在前面对 APIServer 工作原理的讲解中,我已经提到,“registry”的作用就是注册一个类型(Type)给 APIServer。其中,Network 资源类型在服务器端注册的工作,APIServer 会自动帮我们完成。但与之对应的,我们还需要让客户端也能“知道”Network 资源类型的定义。这就需要我们在项目里添加一个 register.go 文件。它最主要的功能,就是定义了如下所示的 addKnownTypes() 方法:
package v1...// addKnownTypes adds our types to the API scheme by registering// Network and NetworkListfunc addKnownTypes(scheme *runtime.Scheme) error {scheme.AddKnownTypes(SchemeGroupVersion,&Network{},&NetworkList{},)// register the type in the schememetav1.AddToGroupVersion(scheme, SchemeGroupVersion)return nil}
有了这个方法,Kubernetes 就能够在后面生成客户端的时候,“知道”Network 以及 NetworkList 类型的定义了。像上面这种 register.go 文件里的内容其实是非常固定的,你以后可以直接使用我提供的这部分代码做模板,然后把其中的资源类型、GroupName 和 Version 替换成你自己的定义即可。这样,Network 对象的定义工作就全部完成了。
接下来,我就要使用 Kubernetes 提供的代码生成工具,为上面定义的 Network 资源类型自动生成 clientset、informer 和 lister。
这个代码生成工具名为 k8s.io/code-generator,使用方法如下所示:
# 代码生成的工作目录,也就是我们的项目路径ROOT_PACKAGE="github.com/resouer/k8s-controller-custom-resource"# API GroupCUSTOM_RESOURCE_NAME="samplecrd"# API VersionCUSTOM_RESOURCE_VERSION="v1"# 安装k8s.io/code-generatorgo get -u k8s.io/code-generator/...cd $GOPATH/src/k8s.io/code-generator# 执行代码自动生成,其中pkg/client是生成目标目录,pkg/apis是类型定义目录./generate-groups.sh all "$ROOT_PACKAGE/pkg/client" "$ROOT_PACKAGE/pkg/apis" "$CUSTOM_RESOURCE_NAME:$CUSTOM_RESOURCE_VERSION"
代码生成工作完成之后,我们再查看一下这个项目的目录结构:
$ tree.├── controller.go├── crd│ └── network.yaml├── example│ └── example-network.yaml├── main.go└── pkg├── apis│ └── samplecrd│ ├── constants.go│ └── v1│ ├── doc.go│ ├── register.go│ ├── types.go│ └── zz_generated.deepcopy.go└── client├── clientset├── informers└── listers
其中,pkg/apis/samplecrd/v1 下面的 zz_generated.deepcopy.go 文件,就是自动生成的 DeepCopy 代码文件。而整个 client 目录,以及下面的三个包(clientset、informers、 listers),都是 Kubernetes 为 Network 类型生成的客户端库,这些库会在后面编写自定义控制器的时候用到。
什么是自定义控制器?
“声明式 API”并不像“命令式 API”那样有着明显的执行逻辑。这就使得基于声明式 API 的业务功能实现,往往需要通过控制器模式来“监视”API 对象的变化(比如,创建或者删除 Network),然后以此来决定实际要执行的具体工作。下面,我们来看看在 K8s 中自定义控制器的工作原理。在 Kubernetes 项目中,一个自定义控制器的工作原理,可以用下面这样一幅流程图来表示:
我们从左开始看起。首先,自定义控制器要做的第一件事就是从 Kubernetes 的 APIServer 里获取它所关心的对象,也就是我们自定义的资源对象。这个操作,依靠的是一个叫作 Informer(可以翻译为:通知器)的代码库完成的。Informer 与 API 对象是一一对应的。例如对于一个名为 Network 的 CRD而言,所以我们传递给自定义控制器的就是一个 Network 对象的 Informer(Network Informer)。
对于一个 Informer 对象而言,真正负责与 K8s APIServer 建立和维护连接的是 Informer 中所使用的 Reflector 包。更具体地说,Reflector 使用的是一种叫作 ListAndWatch 的方法,来“获取”并“监听”这些 Network 对象实例的变化。在 ListAndWatch 机制下,一旦 APIServer 端有新的 Network 实例被创建、删除或者更新,Reflector 都会收到“事件通知”。这时,该事件及它对应的 API 对象这个组合,就被称为增量(Delta),它会被放进一个 Delta FIFO Queue(即:增量先进先出队列)中。而另一方面,Informe 会不断地从这个 Delta FIFO Queue 里读取(Pop)增量。每拿到一个增量,Informer 就会判断这个增量里的事件类型,然后创建或者更新本地对象的缓存。这个缓存,在 Kubernetes 里一般被叫作 Store。比如,如果事件类型是 Added(添加对象),那么 Informer 就会通过一个叫作 Indexer 的库把这个增量里的 API 对象保存在本地缓存中,并为它创建索引。相反,如果增量的事件类型是 Deleted(删除对象),那么 Informer 就会从本地缓存中删除这个对象。
这个同步本地缓存的工作,是 Informer 的第一个职责,也是它最重要的职责。
而 Informer 的第二个职责,则是根据这些事件的类型,触发事先注册好的 ResourceEventHandler。这些 Handler,需要在创建控制器的时候注册给它对应的 Informer。
自定义控制器开发说明
编写自定义控制器代码的过程包括:编写 main 函数、编写自定义控制器的定义,以及编写控制器里的业务逻辑三个部分。
main 函数开发
main 函数的主要工作就是,定义并初始化一个自定义控制器(Custom Controller),然后启动它。这部分代码的主要内容如下所示:
func main() {...cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)...kubeClient, err := kubernetes.NewForConfig(cfg)...networkClient, err := clientset.NewForConfig(cfg)...networkInformerFactory := informers.NewSharedInformerFactory(networkClient, ...)controller := NewController(kubeClient, networkClient,networkInformerFactory.Samplecrd().V1().Networks())go networkInformerFactory.Start(stopCh)if err = controller.Run(2, stopCh); err != nil {glog.Fatalf("Error running controller: %s", err.Error())}}
这个 main 函数主要通过三步完成了初始化并启动一个自定义控制器的工作:
- 根据我提供的 Master 配置(APIServer 的地址端口和 kubeconfig 的路径),创建一个 Kubernetes 的 client(kubeClient)和 Network 对象的 client(networkClient)。
- main 函数为 Network 对象创建一个叫作 InformerFactory(即:networkInformerFactory)的工厂,并使用它生成一个 Network 对象的 Informer,传递给控制器。
- main 函数启动上述的 Informer,然后执行 controller.Run,启动自定义控制器。
自定义控制器定义
接下来,我们就来编写这个控制器的定义,它的主要内容如下所示:
func NewController(kubeclientset kubernetes.Interface,networkclientset clientset.Interface,networkInformer informers.NetworkInformer) *Controller {...controller := &Controller{kubeclientset: kubeclientset,networkclientset: networkclientset,networksLister: networkInformer.Lister(),networksSynced: networkInformer.Informer().HasSynced,workqueue: workqueue.NewNamedRateLimitingQueue(..., "Networks"),...}networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{AddFunc: controller.enqueueNetwork,UpdateFunc: func(old, new interface{}) {oldNetwork := old.(*samplecrdv1.Network)newNetwork := new.(*samplecrdv1.Network)if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {return}controller.enqueueNetwork(new)},DeleteFunc: controller.enqueueNetworkForDelete,return controller}
我们前面在 main 函数里创建了两个 client(kubeclientset 和 networkclientset),然后在这段代码里,使用这两个 client 和前面创建的 Informer,初始化了自定义控制器。值得注意的是,在这个自定义控制器里,我还设置了一个工作队列(work queue),它正是处于示意图中间位置的 WorkQueue。这个工作队列的作用是,负责同步 Informer 和控制循环之间的数据。
然后,我们为 networkInformer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc),分别对应 API 对象的“添加”“更新”和“删除”事件。而具体的处理操作,都是将该事件对应的 API 对象加入到工作队列中。需要注意的是,实际入队的并不是 API 对象本身,而是它们的 Key,即:该 API 对象的namespace/name。而我们后面即将编写的控制循环,则会不断地从这个工作队列里拿到这些 Key,然后开始执行真正的控制逻辑。
综合上面的讲述,你现在应该就能明白,所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client。它是自定义控制器跟 APIServer 进行数据同步的重要组件。
更具体地说,Informer 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。其中,ListAndWatch 方法的含义是:首先,通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化。而通过监听到的事件变化,Informer 就可以实时地更新本地缓存,并且调用这些事件对应的 EventHandler 了。此外,在这个过程中,每经过 resyncPeriod 指定的时间,Informer 维护的本地缓存,都会使用最近一次 LIST 返回的结果强制更新一次,从而保证缓存的有效性。在 Kubernetes 中,这个缓存强制更新的操作就叫作:resync。需要注意的是,这个定时 resync 操作,也会触发 Informer 注册的“更新”事件。但此时,这个“更新”事件对应的 Network 对象实际上并没有发生变化,即:新、旧两个 Network 对象的 ResourceVersion 是一样的。在这种情况下,Informer 就不需要对这个更新事件再做进一步的处理了。
控制器业务逻辑开发
接下来,我们就来到了示意图中最后面的控制循环(Control Loop)部分,也正是我在 main 函数最后调用 controller.Run() 启动的“控制循环”。它的主要内容如下所示:
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {...if ok := cache.WaitForCacheSync(stopCh, c.networksSynced); !ok {return fmt.Errorf("failed to wait for caches to sync")}...for i := 0; i < threadiness; i++ {go wait.Until(c.runWorker, time.Second, stopCh)}...return nil}
可以看到,启动控制循环的逻辑非常简单:
- 首先,等待 Informer 完成一次本地缓存的数据同步操作;
- 然后,直接通过 goroutine 启动一个(或者并发启动多个)“无限循环”的任务。
而这个“无限循环”任务的每一个循环周期,执行的正是我们真正关心的业务逻辑。
所以接下来,我们就来编写这个自定义控制器的业务逻辑,它的主要内容如下所示:
func (c *Controller) runWorker() {for c.processNextWorkItem() {}}func (c *Controller) processNextWorkItem() bool {obj, shutdown := c.workqueue.Get()...err := func(obj interface{}) error {...if err := c.syncHandler(key); err != nil {return fmt.Errorf("error syncing '%s': %s", key, err.Error())}c.workqueue.Forget(obj)...return nil}(obj)...return true}func (c *Controller) syncHandler(key string) error {namespace, name, err := cache.SplitMetaNamespaceKey(key)...network, err := c.networksLister.Networks(namespace).Get(name)if err != nil {if errors.IsNotFound(err) {glog.Warningf("Network does not exist in local cache: %s/%s, will delete it from Neutron ...",namespace, name)glog.Warningf("Network: %s/%s does not exist in local cache, will delete it from Neutron ...",namespace, name)// FIX ME: call Neutron API to delete this network by name.//// neutron.Delete(namespace, name)return nil}...return err}glog.Infof("[Neutron] Try to process network: %#v ...", network)// FIX ME: Do diff().//// actualNetwork, exists := neutron.Get(namespace, name)//// if !exists {// neutron.Create(namespace, name)// } else if !reflect.DeepEqual(actualNetwork, network) {// neutron.Update(namespace, name)// }return nil}
可以看到,在这个执行周期里(processNextWorkItem),我们:
- 首先从工作队列里出队(workqueue.Get)了一个成员,也就是一个 Key(Network 对象的:namespace/name)。
- 然后,在 syncHandler 方法中,我使用这个 Key,尝试从 Informer 维护的缓存中拿到了它所对应的 Network 对象。可以看到,在这里,我使用了 networksLister 来尝试获取这个 Key 对应的 Network 对象。这个操作,其实就是在访问本地缓存的索引。实际上,在 Kubernetes 的源码中,你会经常看到控制器从各种 Lister 里获取对象,比如:podLister、nodeLister 等等,它们使用的都是 Informer 和缓存机制。而如果控制循环从缓存中拿不到这个对象(即:networkLister 返回了 IsNotFound 错误),那就意味着这个 Network 对象的 Key 是通过前面的“删除”事件添加进工作队列的。所以,尽管队列里有这个 Key,但是对应的 Network 对象已经被删除了。这时候,我就需要调用 Neutron 的 API,把这个 Key 对应的 Neutron 网络从真实的集群里删除掉。而如果能够获取到对应的 Network 对象,我就可以执行控制器模式里的对比“期望状态”和“实际状态”的逻辑了。其中,自定义控制器“千辛万苦”拿到的这个 Network 对象,正是 APIServer 里保存的“期望状态”,即:用户通过 YAML 文件提交到 APIServer 里的信息。当然,在我们的例子里,它已经被 Informer 缓存在了本地。
- 那么,“实际状态”又从哪里来呢?当然是来自于实际的集群了。所以,我们的控制循环需要通过 Neutron API 来查询实际的网络情况。比如,我可以先通过 Neutron 来查询这个 Network 对象对应的真实网络是否存在。
- 如果不存在,这就是一个典型的“期望状态”与“实际状态”不一致的情形。这时,我就需要使用这个 Network 对象里的信息(比如:CIDR 和 Gateway),调用 Neutron API 来创建真实的网络。
- 如果存在,那么,我就要读取这个真实网络的信息,判断它是否跟 Network 对象里的信息一致,从而决定我是否要通过 Neutron 来更新这个已经存在的真实网络。
- 这样,我就通过对比“期望状态”和“实际状态”的差异,完成了一次调协(Reconcile)的过程。
至此,一个完整的自定义 API 对象和它所对应的自定义控制器,就编写完毕了。
实战运行







评论