
controller-runtime
Controller-runtime 是一个用于开发 Kubernetes Controller 的库,包含了各种Controller 常用的模块,兼顾了灵活性和模块化。
一开始做 Kubernetes Controller 开发时,是使用 client-go 进行开发,中间会有很多与业务无关的重复工作。后来社区推出了 kubebuilder 和 operatorSDK 这种脚手架,它可以方便的渲染出 Controller 的整个框架,让开发者只用专注 Controller 本身的业务逻辑,特别是在开发 CRD 时,极为方便,这两个脚手架就是基于 controller-runtime。
controller-runtime 就是 client-go 的高级封装。
1、主要模块
Client
:用于读写 Kubernetes 资源Cache
:本地缓存,可供 Client 直接读取资源。Manager
:可以管理协调多个 Controller,提供 Controller 共用的依赖。Controller
:“组装”多个模块(例如Source
、Queue
、Reconciler
),实现 Kubernetes Controller 的通用逻辑:1)监听 k8s 资源,缓存资源,并根据
EventHandler
入队事件;2)启动多个 goroutine,每个 goroutine 会从队列中获取 event,并调用
Reconciler
方法处理。
Reconciler
:状态同步的逻辑所在,是开发者需要实现的主要接口,供Controller 调用。Reconciler 的重点在于“状态同步”,由于 Reconciler 传入的参数是资源的Namespace
和Name
,而非 event,Reconciler 并非用于“处理事件”,而是根据指定资源的状态,来同步“预期集群状态”与“当前集群状态”。Webhook
:用于开发 webhook server,实现 Kubernetes Admission Webhooks 机制。Source
:source of event,Controller 从中获取 event。EventHandler
:顾名思义,event 的处理方法,决定了一个 event 是否需要入队列、如何入队列。Predicate
:相当于 event 的过滤器。
2、Controller 的生成与管理
2.1、manager
controller-runtime 库提供的第一个重要抽象是 Manager,它为在管理器内运行的所有控制器提供共享资源,包括:
读取和写入 Kubernetes 资源的 Kubernetes 客户端
用于从本地缓存中读取 Kubernetes 资源的缓存
用于注册所有 Kubernetes 本地和自定义资源的 scheme
要创建管理器,你需要使用提供的 New 函数,如下所示:
import (
"flag"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
flag.Parse() ❶
mgr, err := manager.New(
config.GetConfigOrDie(),
manager.Options{},
)
第一个参数是 rest.Config 对象,在这个例子中,选择了 controller-runtime 库提供的 GetConfigOrDie() 实用函数,而不是使用 client-go 库的函数。
GetConfigOrDie() 函数将尝试获得一个配置来连接到集群:
通过获取
--kubeconfig
标志的值,如果定义了的话,并在这个路径上读取 kubeconfig 文件。为此,首先你需要执行flag.Parse()
通过获取 KUBECONFIG 环境变量的值(如果定义了的话),并读取此路径下的 kubeconfig 文件
通过查看 in-cluster 配置,如果定义了的话
通过读取
$HOME/.kube/config
文件
如果前面的情况都不可行,该函数将使程序退出,代码为1。第二个参数是一个用于选项的结构体。
一个重要的选项是 “Scheme"。默认情况下,如果你没有为这个选项指定任何值,将使用 Client-go 库提供的 Scheme。如果控制器只需要访问本地 Kubernetes 资源,这就足够了。然而,如果你想让控制器访问自定义资源,你将需要提供一个能够解决自定义资源的 Scheme。
例如,如果你想让控制器访问自定义资源,你将需要在初始化时运行以下代码:
import (
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
mygroupv1alpha1 "github.com/myid/myresource-crd/pkg/apis/mygroup.example.com/v1alpha1"
)
scheme := runtime.NewScheme() ❶
clientgoscheme.AddToScheme(scheme) ❷
mygroupv1alpha1.AddToScheme(scheme) ❸
mgr, err := manager.New(
config.GetConfigOrDie(),
manager.Options{
Scheme: scheme, ❹
},
)
❶ 创建一个新的空 scheme。
❷ 使用 Client-go 库添加 Kubernetes 内置资源(k8s 内置资源都在 client-go/kubernetes 下面)。
❷ 将 mygroup/v1alpha1 的资源添加到 scheme 中,就是自定义资源。
❹ 在这个管理器中使用这个 scheme。
2.2、controller
第二个重要的抽象是 Controller。控制器负责实现特定 Kubernetes 资源的实例所给出的规范(Spec)。(在 operator 的情况下,自定义资源是由 operator 处理的)。
为此,控制器观察特定的资源,并接收这些资源的 watch 事件(即,创建、更新、删除)。当事件发生在资源上时,控制器用包含受事件影响的 “主要资源” 实例的名称和命名空间的请求填充一个队列。
请注意,入队的对象只是被 operator 监视的主要资源的实例。如果事件是由另一个资源的实例接收的,则通过跟踪所有者参考(ownerReference)找到主资源。例如,Deployment 控制器观察 Deployment 资源和 ReplicaSet 资源。所有 ReplicaSet 实例都包含一个指向 Deployment 实例的 ownerReference。
当 Deployment 被创建时,控制器会收到一个 Create 事件,刚刚创建的 Deployment 实例被入队
当 ReplicaSet 被修改时(例如,被一些用户修改),这个 ReplicaSet 会收到一个 Update 事件,控制器会使用 ReplicaSet 中包含的 ownerReference 找到被更新的 ReplicaSet 引用的 Deployment。然后,被引用的 Deployment 实例被入队。
控制器实现 Reconcile 方法,每当队列中出现一个 Request 时,它就会被调用。这个 Reconcile 方法接收 Request 作为参数,其中包含要调谐(Reconcile)的主要资源的名称和命名空间。
此外,由于多个事件可能在短时间内发生,并与同一主要资源有关,请求可以被批量处理,以限制入队的请求数量。
2.2.1、创建 Controller
要创建控制器,你需要使用提供的 New 函数:
import (
"sigs.k8s.io/controller-runtime/pkg/controller"
)
controller, err = controller.New(
"my-operator", mgr,
controller.Options{
Reconciler: myReconciler,
})
Reconciler 调谐器选项是必需的,其值是一个实现 Reconciler 接口的对象,定义为:
type Reconciler interface {
Reconcile(context.Context, Request) (Result, error)
}
2.2.2、监控资源
在控制器创建后,你需要向容器指出哪些资源需要观察,以及这些资源是主要资源还是被拥有的(owned)资源。
控制器上的 Watch 方法被用来添加一个 Watch。该方法定义如下:
Watch(
src source.Source,
eventhandler handler.EventHandler,
predicates ...predicate.Predicate,
) error
第一个参数表示什么是要观察的事件的来源,其类型是 source.Source
接口。controller-runtime 库为 Source 接口提供了两种实现方式:
Kind source 用于监视特定种类(kind)的 Kubernetes 对象的事件。Kind 结构体中的 Type 字段是必需的,其值是所需类型的对象。例如,如果我们想监视 Deployment,src 参数的值将是:
controller.Watch( &source.Kind{ Type: &appsv1.Deployment{}, }, ...
Channel source 是用来观察来自集群外的事件的。Channel 结构体的 Source 字段是必需的,它的值是一个发射
event.GenericEvent
类型对象的通道。
第二个参数是事件处理程序,其类型是 handler.EventHandler
接口。controller-runtime 库为 EventHandler 接口提供了两种实现方式:
EnqueueRequestForObject 事件处理程序用于控制器处理的主要资源。在这种情况下,控制器将把连接到事件的对象放入队列中。
controller.Watch( &source.Kind{ Type: &mygroupv1alpha1.MyResource{}, }, &handler.EnqueueRequestForObject{}, )
EnqueueRequestForOwner 事件处理程序用于由主资源拥有(owned)的资源。EnqueueRequestForOwner 的一个字段是必需的:OwnerType。这个字段的值是主资源类型的对象;控制器将跟踪 ownerReferences,直到找到这种类型的对象,并将这个对象放入队列。
例如,如果控制器处理 MyResource 主资源,并且正在创建 Pod 来实现 MyResource,它将希望使用这个事件处理程序来观察 Pod 资源,并指定一个 MyResource 对象作为 OwnerType。如果字段 IsController 被设置为true,控制器将只考虑
Controller: true
的 ownerReferences。controller.Watch( &source.Kind{ Type: &corev1.Pod{}, }, &handler.EnqueueRequestForOwner{ OwnerType: &mygroupv1alpha1.MyResource{}, IsController: true, }, )
第三个参数是一个可选的谓词列表,其类型为 predicate.Predicate
。controller-runtime 库为 Predicate 接口提供了几种实现:
Funcs 是最通用的实现。Funcs 的结构体定义如下:
type Funcs struct { // Create returns true if the Create event // should be processed CreateFunc func(event.CreateEvent) bool // Delete returns true if the Delete event // should be processed DeleteFunc func(event.DeleteEvent) bool // Update returns true if the Update event // should be processed UpdateFunc func(event.UpdateEvent) bool // Generic returns true if the Generic event // should be processed GenericFunc func(event.GenericEvent) bool }
你可以把这个结构体的一个实例传递给 Watch 方法,作为 Predicate。
未定义的字段将表示匹配类型的事件应该被处理。
对于非 nil 字段,匹配事件的函数将被调用(注意,当源是一个通道时,GenericFunc 将被调用;见前文),如果函数返回 true,事件将被处理。
使用 Predicate 的这种实现,你可以为每个事件类型定义一个特定的函数。
func NewPredicateFuncs( filter func(object client.Object) bool, ) Funcs
该函数接受一个 filter 函数,并返回一个 Funcs 结构体,该过滤器被应用于所有事件。使用这个函数,你可以定义一个适用于所有事件类型的单一过滤器。
ResourceVersionChangedPredicate
结构体将定义一个只用于 UpdateEvent 的 filter。使用这个 predicate ,所有的Create、Delete 和 Generic 事件将被处理,不需要过滤,而 Update 事件将被过滤,以便只处理有
metadata.resourceVersion
变化的更新。每次保存资源的新版本时,
metadata.resourceVersion
字段都会被更新,不管资源的变化是什么。GenerationChangedPredicate
结构体定义了一个只用于更新事件过滤器。使用这个谓词,所有的 Create、Delete 和 Generic 事件都将被处理,无需过滤,而 Update 事件将被过滤,因此只有具有
metadata.Generation
增量的更新才会被处理。每次发生资源的 Spec 部分的更新时,API 服务器都会按顺序递增
metadata.Generation
。请注意,有些资源并不尊重这个假设。例如,当 Annotations 字段被更新时,Deployment 的 Generation 也会被递增。
对于自定义资源,只有当状态子资源被启用时,生成才会被递增。
AnnotationChangedPredicate
结构体定义了一个只用于更新事件过滤器。使用这个谓词,所有的创建、删除和通用事件都将被处理,而更新事件将被过滤,因此只有元数据.注释变化的更新才会被处理。
用 controller-runtime,就可以直接用 watch 监听资源变化,自动入队了,不需要像之前 client-go 那样,用 informer 的 eventhandler,还要自己写入队操作了。
2.2.3、例子
在第一个例子中,你将创建管理器和控制器。控制器将管理主要的自定义资源 MyResource,并观察这个资源以及 Pod 资源。
Reconcile 函数将只显示 MyResource 实例的命名空间和名称来进行调节。
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
mygroupv1alpha1 "github.com/myid/myresource-crd/pkg/apis/mygroup.example.com/v1alpha1"
)
func main() {
scheme := runtime.NewScheme() ❶
clientgoscheme.AddToScheme(scheme)
mygroupv1alpha1.AddToScheme(scheme)
mgr, err := manager.New( ❷
config.GetConfigOrDie(),
manager.Options{
Scheme: scheme,
},
)
panicIf(err)
controller, err := controller.New( ❸
"my-operator", mgr,
controller.Options{
Reconciler: &MyReconciler{},
})
panicIf(err)
err = controller.Watch( ❹
&source.Kind{
Type: &mygroupv1alpha1.MyResource{},
},
&handler.EnqueueRequestForObject{},
)
panicIf(err)
err = controller.Watch( ❺
&source.Kind{
Type: &corev1.Pod{},
},
&handler.EnqueueRequestForOwner{
OwnerType: &corev1.Pod{},
IsController: true,
},
)
panicIf(err)
err = mgr.Start(context.Background()) ❻
panicIf(err)
}
type MyReconciler struct{} ➐
func (o *MyReconciler) Reconcile( ➑
ctx context.Context,
r reconcile.Request,
) (reconcile.Result, error) {
fmt.Printf("reconcile %v\n", r)
return reconcile.Result{}, nil
}
// panicIf panic if err is not nil
// Please call from main only!
func panicIf(err error) {
if err != nil {
panic(err)
}
}
➊ 用本地资源和自定义资源 MyResource 创建 scheme。
➋ 使用刚刚创建的 scheme 创建管理器。
➌ 创建控制器,连接到管理器,并传递 Reconciler 实现。
➍ 开始观察作为主要资源的 MyResource 实例。
➎ 开始观察 Pod 实例,作为自有(owned)资源。
➏ 启动管理器。这个函数是长期运行的,只有在发生错误时才会返回。
➐ 实现 Reconciler 接口的类型。
➑ 实现 Reconcile 方法。这将显示要 reconcile 的实例的名称空间和名称(reconcile.Request 就是请求资源的命名空间和名称)。
当资源变化时,就会自动从队列里拿出来,以 Request 形式传入,不需要像client-go一样手动从队列里拿 key 了。
2.2.4、Controller Builder
Controller-runtime 库提出了一个 Controller Builder,使控制器的创建更加简洁。
import (
"sigs.k8s.io/controller-runtime/pkg/builder"
)
func ControllerManagedBy(m manager.Manager) *Builder
ControllerManagedBy 函数被用来启动一个新的 ControllerBuilder。构建的控制器将被添加到 m manager 中。一个流畅的接口帮助配置构建:
For(object client.Object, opts ...ForOption) *Builder
- 该方法用于指示控制器处理的主要资源。它只能被调用一次,因为一个控制器只能有一个主要资源。这将在内部调用 Watch 函数与事件处理程序 EnqueueRequestForObject 。可以用 WithPredicates 函数为这个 watch 添加谓词(Predicates),其结果实现了 ForOption 接口。
Owns(object client.Object, opts ...OwnsOption) *Builder
- 这个方法用来表示控制器拥有(owned)的资源。这将在内部调用 Watch 函数与事件处理程序 EnqueueRequestForOwner。可以用 WithPredicates 函数为这个Watch添加谓词,该函数的结果实现了OwnsOption接口。
Watches(src source.Source, eventhandler handler.EventHandler, opts ...WatchesOption) *Builder
- 这个方法可以用来添加更多For或Owns方法没有涵盖的 watcher –例如,具有 Channel source 的观察者。可以用 WithPredicates 函数为该 watch 添加谓词,该函数的结果实现了 WatchesOption 接口。
WithEventFilter(p predicate.Predicate) *Builder
- 这个方法可以用来添加所有用 For、Owns 和 Watch 方法创建的观察者共有的谓词。WithOptions(options controller.Options) *Builder
- 此处设置将在内部传递给controller.New
函数的选项。WithLogConstructor(- 这设置了 logConstructor 选项。
func(*reconcile.Request) logr.Logger, ) *Builder
Named(name string) *Builder
- 这设置了构造函数的名称。它应该只使用下划线和字母数字字符。默认情况下,它是主要资源的 kind 的小写版本。build() 构建并返回控制器。
Build( r reconcile.Reconciler, ) (controller.Controller, error)
Complete(r reconcile.Reconciler) error
–这就建立了控制器。你一般不需要直接访问控制器,所以你可以使用这个不返回控制器值的方法,而不是Build。
2.2.5、controller builder 的例子
在这个例子中,你将使用 ControllerBuilder 构建控制器,而不是使用 Controller.New 函数和控制器上的 Watch 方法。
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
mygroupv1alpha1 "github.com/feloy/myresource-crd/pkg/apis/mygroup.example.com/v1alpha1"
)
func main() {
scheme := runtime.NewScheme()
clientgoscheme.AddToScheme(scheme)
mygroupv1alpha1.AddToScheme(scheme)
mgr, err := manager.New(
config.GetConfigOrDie(),
manager.Options{
Scheme: scheme,
},
)
panicIf(err)
err = builder.
ControllerManagedBy(mgr).
For(&mygroupv1alpha1.MyResource{}).
Owns(&corev1.Pod{}).
Complete(&MyReconciler{})
panicIf(err)
err = mgr.Start(context.Background())
panicIf(err)
}
type MyReconciler struct {}
func (a *MyReconciler) Reconcile(
ctx context.Context,
req reconcile.Request,
) (reconcile.Result, error) {
fmt.Printf("reconcile %v\n", req)
return reconcile.Result{}, nil
}
func panicIf(err error) {
if err != nil {
panic(err)
}
}
2.3、将 manager 资源注入 Reconciler 中
管理器为控制器提供共享资源,包括读取和写入 Kubernetes 资源的客户端、从本地缓存中读取资源的缓存以及解析资源的方案。Reconcile 函数需要访问这些共享资源。有两种方法来共享它们:
2.3.1、创建 Reconciler 结构体时传递数值
当控制器被创建时,你正在传递一个 Reconcile 结构体的实例,实现 Reconciler 接口:
type MyReconciler struct {}
err = builder.
ControllerManagedBy(mgr).
For(&mygroupv1alpha1.MyResource{}).
Owns(&corev1.Pod{}).
Complete(&MyReconciler{})
在这之前,管理器已经被创建,你可以使用管理器上的 Getters 来访问共享资源。
作为例子,下面是如何从新创建的管理器中获取客户端、缓存和 schema :
mgr, err := manager.New(
config.GetConfigOrDie(),
manager.Options{
manager.Options{
Scheme: scheme,
},
},
)
// handle err
mgrClient := mgr.GetClient()
mgrCache := mgr.GetCache()
mgrScheme := mgr.GetScheme()
你可以在 Reconciler 结构体中添加字段来传递这些值:
type MyReconciler struct {
client client.Client
cache cache.Cache
scheme *runtime.Scheme
}
最后,你可以在创建控制器的过程中传递这些值:
err = builder.
ControllerManagedBy(mgr).
For(&mygroupv1alpha1.MyResource{}).
Owns(&corev1.Pod{}).
Complete(&MyReconciler{
client: mgr.GetClient(),
cache: mgr.GetCache(),
scheme: mgr.GetScheme(),
})
2.3.2、使用 Injector
controller-runtime 库提供了一个注入器(Injector)系统,用于将共享资源注入 Reconcilers,以及其他结构体,如你自己实现的 Sources、EventHandlers 和 Predicates。
Reconciler 的实现需要实现 inject 包中的特定 Injector 接口:inject.Client
、inject.Cache
、inject.Scheme
,等等。
这些方法将在初始化时被调用,当你调用 controller.New
或 builder.Complete
。为此,需要为每个接口创建一个方法,例如:
type MyReconciler struct {
client client.Client
cache cache.Cache
scheme *runtime.Scheme
}
func (a *MyReconciler) InjectClient(c client.Client) error {
a.client = c
return nil
}
func (a *MyReconciler) InjectCache(c cache.Cache) error {
a.cache = c
return nil
}
func (a *MyReconciler) InjectScheme(s *runtime.Scheme) error {
a.scheme = s
return nil
}
3、使用客户端
客户端可以用来读取和写入集群上的资源,并更新资源的状态。
Read 方法在内部使用一个基于 Informers 和 Listers 的 Cache 系统,以限制对 API 服务器的读取访问。使用这个缓存,同一个管理器的所有控制器都有对资源的读取权限,同时限制对 API 服务器的请求。
必须注意:
由读操作返回的对象是指向缓存的值的指针。你决不能直接修改这些对象。相反,你必须在修改这些对象之前创建一个返回对象的深度拷贝。
3.1、获取资源信息
Get 方法用来获取资源的信息。
Get(
ctx context.Context,
key ObjectKey,
obj Object,
opts ...GetOption,
) error
它需要一个 ObjectKey 值作为参数,表示资源的名称空间和名称,以及一个Object,表示要获得的资源的类型,并存储结果。Object 必须是一个指向类型资源的指针-例如,一个 Pod 或 MyResource 结构体。ObjectKey 类型是 type.NamespacedName
的别名,定义在 API Machinery 库中。
NamespacedName 也是嵌入在作为参数传递给 Reconcile 函数的 Request 中的对象的类型。你可以直接将 req.NamespacedName 作为 ObjectKey 传递,以获得要调和(reconcile)的资源。例如,使用下面的方法来获取要调和(reconcile)的资源:
myresource := mygroupv1alpha1.MyResource{}
err := a.client.Get(
ctx,
req.NamespacedName,
&myresource,
)
可以向 Get 请求传递一个特定的 resourceVersion
值,传递一个 client.GetOptions
结构体实例作为最后一个参数。
GetOptions 结构体实现了 GetOption 接口,包含一个具有 metav1.GetOptions
值的单一Raw字段。例如,指定一个值为 “0 " 的 resourceVersion 来获取资源的任何版本:
err := a.client.Get(
ctx,
req.NamespacedName,
&myresource,
&client.GetOptions{
Raw: &metav1.GetOptions{
ResourceVersion: "0",
},
},
)
3.2、列出资源
List 方法用于列出特定种类的资源:
List(
ctx context.Context,
list ObjectList,
opts ...ListOption,
) error
list 参数是一个 ObjectList 值,表示要列出并存储结果的资源的种类(kind)。默认情况下,list 是在所有命名空间中进行的。
List 方法接受实现 ListOption 接口的对象的零个或多个参数。这些类型由以下支持:
InNamespace,string 的别名,用于返回特定命名空间的资源。
MatchingLabels,
map[string]string
的别名,用来表示标签的列表和它们的精确值,这些标签必须被定义为资源的返回。下面的例子建立了一个MatchingLabels 结构体来过滤带有标签 “app=myapp” 的资源。matchLabel := client.MatchingLabels{ "app": "myapp", }
HasLabels,别名为
[]string
,用来表示标签的列表,独立于它们的值,必须为一个资源的返回而定义。下面的例子建立了一个 HasLabels 结构体来过滤带有 “app” 和 “debug” 标签的资源。hasLabels := client.HasLabels{"app", “debug”}
MatchingLabelsSelector,嵌入了一个
labels.Selector
接口,用来传递更高级的标签选择器。关于如何建立一个选择器的更多信息,请参见第6章的 “过滤列表结果” 部分。下面的例子建立了一个 MatchingLabelsSelector 结构,它可以作为 List 的一个选项来过滤标签 mykey 不同于 ignore 的资源。selector := labels.NewSelector() require, err := labels.NewRequirement( "mykey", selection.NotEquals, []string{"ignore"}, ) // assert err is nil selector = selector.Add(*require) labSelOption := client.MatchingLabelsSelector{ Selector: selector, }
MatchingFields 是
fields.Set
的别名,它本身是map[string]string
的别名,用来指示要匹配的字段和它们的值。下面的例子建立了一个 MatchingFields 结构体,用来过滤字段 “status.phase” 为 “Running” 的资源:matchFields := client.MatchingFields{ "status.phase": "Running", }
MatchingFieldsSelector,嵌入了一个
fields.Selector
,用来传递更高级的字段选择器。下面的例子建立了一个 MatchingFieldsSelector 结构体来过滤字段 “status.phase” 与 “Running” 不同的资源:fieldSel := fields.OneTermNotEqualSelector( "status.phase", "Running", ) fieldSelector := client.MatchingFieldsSelector{ Selector: fieldSel, }
Limit(别名 int64)和Continue(别名 string)用于对结果进行分页。
3.3、创建资源
Create 方法用来在集群中创建一个新的资源。
Create(
ctx context.Context,
obj Object,
opts ...CreateOption,
) error
作为参数传递的 obj 定义了要创建的对象的种类(kind),以及它的定义。下面的例子将在集群中创建一个 Pod:
podToCreate := corev1.Pod{ [...] }
podToCreate.SetName("nginx")
podToCreate.SetNamespace("default")
err = a.client.Create(ctx, &podToCreate)
以下选项可以作为 CreateOption 传递,以使 Create 请求参数化。
DryRunAll 值表示所有的操作都应该被执行,除了那些将资源持久化到存储的操作。
FieldOwner,别名为字符串,表示创建操作的字段管理器的名称。这个信息对于服务器端应用操作的正常工作很有用。
3.4、删除资源
删除方法是用来从集群中删除资源。
Delete(
ctx context.Context,
obj Object, k
opts ...DeleteOption,
) error
作为参数传递的 obj 定义了要删除的对象的种类(kind),以及它的命名空间(如果资源有命名空间)和它的名字。下面的例子可以用来删除 Pod。
podToDelete := corev1.Pod{}
podToDelete.SetName("nginx")
podToDelete.SetNamespace("prj2")
err = a.client.Delete(ctx, &podToDelete)
以下选项可以作为 DeleteOption 被传递,以便对 Delete 请求进行参数化。
DryRunAll - 这个值表示所有的操作都应该被执行,除了那些将资源持久化到存储的操作。
GracePeriodSeconds, alias to int64 - 这个值只在删除 pod 时有用。它表示在删除 pod 之前的持续时间(秒)。
Preconditions / 前提条件,别名
metav1.Preconditions
- 这表明你期望删除的资源。PropagationPolicy,别名
metav1.DeletionPropagation
- 这表明是否以及如何进行垃圾回收。
4、编写 reconcile 调谐
Reconcile 函数包含 Operator 的所有业务逻辑。该函数将在一个单一的资源种类上工作 – 或者说 Operator 调和这个资源 – 并且可以在其他类型的对象触发事件时得到通知,通过使用所有者引用(Owner References)将这些其他类型映射到调和的对象上。
Reconcile 函数的作用是确保系统的状态与要 Reconcile 的资源中指定的内容相匹配。为此,它将创建 “低级” 资源来实现要调和的资源。这些资源反过来将由其他控制器或 Operator 进行调和。当这些资源的调和完成后,它们的状态将被更新以反映它们的新状态。此外,Operator 将能够检测到这些变化,并相应地调整被调和资源的状态。
Reconcile 函数从队列中接收一个要调谐的资源。第一个要做的操作是获得关于这个资源的信息来进行调谐。
事实上,Request只有资源的命名空间和名称(它的种类/kind 是由它的设计知道的,是由 operator 调和的种类),但你并没有收到资源的完整定义。
Get 操作被用来获取资源的定义。
该资源可能由于各种原因被入队:它被创建、修改或删除(或者另一个拥有(owned)的资源被创建、修改或删除)。在前两种情况下(创建或修改),Get 操作将成功,Reconcile 函数将知道这时资源的定义。在删除的情况下,获取操作将以 Notfound 错误失败,因为该资源现在已经被删除了。
Operator 的良好做法是,在调和资源时为其创建的资源添加 OwnerReferences。主要目的是当这些创建的资源被修改时,能够调和其所有者,而添加这些 OwnerReferences 的结果是,当所有者资源被删除时,这些拥有(owned)的资源将被 Kubernetes 垃圾收集器删除。
出于这个原因,当一个被调和的对象被删除时,集群中没有什么可做的,因为相关的创建资源的删除将由集群处理。
如果在集群中找到了要调和的资源,operator’s 的下一步就是创建 “低级” 资源来实现这个要调和的资源。
因为 Kubernetes 是一个声明式平台,创建这些资源的好方法是声明低级资源应该是什么,与集群中存在或不存在什么无关,并依靠 Kubernetes 控制器来接管和调和这些低级资源。
由于这个原因,不可能盲目地使用 Create 方法,因为我们不确定资源是否存在,如果资源已经存在,操作就会失败。
你可以检查资源是否存在,如果不存在则创建,如果存在则修改。正如前几章所示,服务器端应用方法非常适合这种情况:在运行 Apply 操作时,如果资源不存在,就会被创建;如果资源存在,就会被修补,在资源被其他参与者修改的情况下解决冲突。
使用服务器端的 apply 方法,Operator 不需要检查资源是否存在,或是否被另一个参与者修改过。Operator 只需要从 Operator 的角度,用资源的定义运行服务器端 Apply 操作。
在应用低级别的资源后,应该考虑两种可能性。
情况1:如果低层资源已经存在,并且没有被 Apply 修改,那么将不会为这些资源触发 MODIFIED 事件,并且 Reconcile 函数将不会被再次调用(至少对于这个 Apply 操作)。
情况2:如果低层资源被创建或修改,这将触发这些资源的 CREATED 或 MODIFIED 事件,并且 Reconcile 函数将因为这些事件而被再次调用。这个函数的新执行将再次应用低层资源,如果在此期间没有更新这些资源,Operator 将落入情况1。
新的底层资源最终将由其各自的 Operator 或控制器处理。反过来,他们将调和这些资源,并更新它们的状态,宣布它们的当前状态。
一旦这些低层资源的状态被更新,这些资源的 MODIFIED 事件将被触发,Reconcile 函数将被再次调用。再一次,Operator 将应用这些资源,案例1和2必须被考虑。
Operator 在某些时候需要读取它所创建的低级资源的状态,以便计算出调和后的资源的状态。在简单的情况下,这可以在执行低级资源的服务器端应用后完成。
为了说明这一点,这里有一个完整的 Reconcile 函数,该函数适用于 Operator,该 Operator 用 MyResource 实例中提供的镜像和内存信息创建 Deployment。
func (a *MyReconciler) Reconcile(
ctx context.Context,
req reconcile.Request,
) (reconcile.Result, error) {
log := log.FromContext(ctx)
log.Info("getting myresource instance")
myresource := mygroupv1alpha1.MyResource{}
err := a.client.Get( ❶
ctx,
req.NamespacedName,
&myresource,
&client.GetOptions{},
)
if err != nil {
if errors.IsNotFound(err) { ❷
log.Info("resource is not found")
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
ownerReference := metav1.NewControllerRef( ❸
&myresource,
mygroupv1alpha1.SchemeGroupVersion.
WithKind("MyResource"),
)
err = a.applyDeployment( ❹
ctx,
&myresource,
ownerReference,
)
if err != nil {
return reconcile.Result{}, err
}
status, err := a.computeStatus(ctx, &myresource) ❺
if err != nil {
return reconcile.Result{}, err
}
myresource.Status = *status
log.Info("updating status", "state", status.State)
err = a.client.Status().Update(ctx, &myresource) ❻
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
❶ 获取要调和的资源的定义
❷ 如果资源不存在,立即返回
❸ 建立指向要调和的资源的 ownerReference
❹ 使用服务器端应用于 “低层次” deployment
❺ 根据 “低级别” deployment 计算资源的状态
❻ 更新资源的状态以进行调和
下面是一个为 operator 创建的 deployment 实现服务器端 Apply 操作的例子:
func (a *MyReconciler) applyDeployment(
ctx context.Context,
myres *mygroupv1alpha1.MyResource,
ownerref *metav1.OwnerReference,
) error {
deploy := createDeployment(myres, ownerref)
err := a.client.Patch( ❼
ctx,
deploy,
client.Apply,
client.FieldOwner(Name),
client.ForceOwnership,
)
return err
}
func createDeployment(
myres *mygroupv1alpha1.MyResource,
ownerref *metav1.OwnerReference,
) *appsv1.Deployment {
deploy := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"myresource": myres.GetName(),
},
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"myresource": myres.GetName(),
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"myresource": myres.GetName(),
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "main",
Image: myres.Spec.Image, ❽
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: myres.Spec.Memory, ❾
},
},
},
},
},
},
},
}
deploy.SetName(myres.GetName() + "-deployment")
deploy.SetNamespace(myres.GetNamespace())
deploy.SetGroupVersionKind(
appsv1.SchemeGroupVersion.WithKind("Deployment"),
)
deploy.SetOwnerReferences([]metav1.OwnerReference{ ❿
*ownerref,
})
return deploy
}
❼ 使用补丁方法执行服务器端的应用操作
❾ 使用资源中定义的镜像来进行调和
❾ 使用资源中定义的内存来进行调和
❿ 设置 OwnerReference 指向要调和的资源
然后,下面是一个如何实现状态的计算和更新的例子:
const (
_buildingState = "Building"
_readyState = "Ready"
)
func (a *MyReconciler) computeStatus(
ctx context.Context,
myres *mygroupv1alpha1.MyResource,
) (*mygroupv1alpha1.MyResourceStatus, error) {
logger := log.FromContext(ctx)
result := mygroupv1alpha1.MyResourceStatus{
State: _buildingState,
}
deployList := appsv1.DeploymentList{}
err := a.client.List( ⓫
ctx,
&deployList,
client.InNamespace(myres.GetNamespace()),
client.MatchingLabels{
"myresource": myres.GetName(),
},
)
if err != nil {
return nil, err
}
if len(deployList.Items) == 0 {
logger.Info("no deployment found")
return &result, nil
}
if len(deployList.Items) > 1 {
logger.Info(
"too many deployments found", "count",
len(deployList.Items),
)
return nil, fmt.Errorf(
"%d deployment found, expected 1",
len(deployList.Items),
)
}
status := deployList.Items[0].Status ⓬
logger.Info(
"got deployment status",
"status", status,
)
if status.ReadyReplicas == 1 {
result.State = _readyState ⓭
}
return &result, nil
}
⓫ 获取为该资源创建的 deployment 以进行调谐
⓬ 获取找到的唯一 deployment 的状态
⓭ 当 replicas 为1时,为调和的资源设置就绪状态