6. kubebuilder 实战: status & event

注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用

注:本文所有示例代码都可以在 blog-code 仓库中找到

在上篇文章当中我们实现了 NodePool Operator 基本的 CURD 功能,跑了一小段时间之后除了 CURD 之外我们有了更高的需求,想知道一个节点池有多少的节点,现在的资源占比是多少,这样可以清晰的知道我们现在的水位线是多少,除此之外也想知道节点池数量发生变化的相关事件信息,什么时候节点池增加或者是减少了一个节点等。

需求

我们先整理一下需求

能够通过 kubectl get Nodepool了解当前的节点池的以下信息

  • 节点池的状态,是否异常
  • 节点池现在包含多少个节点
  • 节点池的资源情况现在有多少 CPU、Memory

能够通过事件信息得知 controller 的错误情况以及节点池内节点的变化情况

实现

Status

先修改一下 status 对象,注意要确保下面的 //+kubebuilder:subresource:status注释存在,这个表示开启 status 子资源,status 对象修改好之后需要重新执行一遍 make install

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// NodePoolStatus defines the observed state of NodePool
type NodePoolStatus struct {
// status=200 说明正常,其他情况为异常情况
Status int `json:"status"`

// 节点的数量
NodeCount int `json:"nodeCount"`

// 允许被调度的容量
Allocatable corev1.ResourceList `json:"allocatable,omitempty" protobuf:"bytes,2,rep,name=allocatable,casttype=ResourceList,castkey=ResourceName"`
}

//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster
//+kubebuilder:subresource:status

// NodePool is the Schema for the nodepools API
type NodePool struct {

然后修改 Reconcile 中的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// ......

if len(nodes.Items) > 0 {
r.Log.Info("find nodes, will merge data", "nodes", len(nodes.Items))
+ pool.Status.Allocatable = corev1.ResourceList{}
+ pool.Status.NodeCount = len(nodes.Items)
for _, n := range nodes.Items {
n := n

// 更新节点的标签和污点信息
err := r.Update(ctx, pool.Spec.ApplyNode(n))
if err != nil {
return ctrl.Result{}, err
}

+ for name, quantity := range n.Status.Allocatable {
+ q, ok := pool.Status.Allocatable[name]
+ if ok {
+ q.Add(quantity)
+ pool.Status.Allocatable[name] = q
+ continue
+ }
+ pool.Status.Allocatable[name] = quantity
+ }
}
}

// ......

+ pool.Status.Status = 200
+ err = r.Status().Update(ctx, pool)
return ctrl.Result{}, err
}

修改好了之后我们提交一个 NodePool 测试一下

1
2
3
4
5
6
7
8
9
10
11
12
apiVersion: nodes.lailin.xyz/v1
kind: NodePool
metadata:
name: worker
spec:
taints:
- key: node-pool.lailin.xyz
value: worker
effect: NoSchedule
labels:
"node-pool.lailin.xyz/worker": "10"
handler: runc

可以看到我们现在是有两个 worker 节点

1
2
3
4
5
▶ kubectl get no 
NAME STATUS ROLES AGE VERSION
kind-control-plane Ready control-plane,master 29m v1.20.2
kind-worker Ready worker 28m v1.20.2
kind-worker2 Ready worker 28m v1.20.2

然后我们看看 NodePool,可以发现已经存在了预期的 status

1
2
3
4
5
6
7
8
9
10
status:
allocatable:
cpu: "8"
ephemeral-storage: 184026512Ki
hugepages-1Gi: "0"
hugepages-2Mi: "0"
memory: 6129040Ki
pods: "220"
nodeCount: 2
status: 200

现在这样只能通过查看 yaml 详情才能看到,当 NodePool 稍微多一些的时候就不太方便,我们现在给NodePool 增加一些 kubectl 展示的列

1
2
3
4
5
+//+kubebuilder:printcolumn:JSONPath=".status.status",name=Status,type=integer
+//+kubebuilder:printcolumn:JSONPath=".status.nodeCount",name=NodeCount,type=integer
//+kubebuilder:object:root=true
//+kubebuilder:resource:scope=Cluster
//+kubebuilder:subresource:status

如上所示只需要添加好对应的注释,然后执行 make install即可

然后再执行 kubectl get NodePool 就可以看到对应的列了

1
2
3
▶ kubectl get NodePool 
NAME STATUS NODECOUNT
worker 200 2

Event

我们在 controller 当中添加 Recorder 用来记录事件,K8s 中事件有 Normal 和 Warning 两种类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// NodePoolReconciler reconciles a NodePool object
type NodePoolReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
+ Recorder record.EventRecorder
}

func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

+ // 添加测试事件
+ r.Recorder.Event(pool, corev1.EventTypeNormal, "test", "test")

pool.Status.Status = 200
err = r.Status().Update(ctx, pool)
return ctrl.Result{}, err
}


添加好之后还需要在 main.go 中加上 Recorder的初始化逻辑

1
2
3
4
5
6
7
8
9
if err = (&controllers.NodePoolReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("NodePool"),
Scheme: mgr.GetScheme(),
+ Recorder: mgr.GetEventRecorderFor("NodePool"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "NodePool")
os.Exit(1)
}

加好之后我们运行一下,然后在 describe Nodepool 对象就能看到事件信息了

1
2
3
4
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal test 4s NodePool test

监听更多资源

之前我们所有的代码都是围绕着 NodePool 的变化来展开的,但是我们如果修改了 Node 的相关标签,将 Node 添加到一个 NodePool,Node 上对应的属性和 NodePool 的 status 信息也不会改变。如果我们想要实现上面的效果就需要监听更多的资源变化。

在 controller 当中我们可以看到一个 SetupWithManager方法,这个方法说明了我们需要监听哪些资源的变化

1
2
3
4
5
6
// SetupWithManager sets up the controller with the Manager.
func (r *NodePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nodesv1.NodePool{}).
Complete(r)
}

其中 NewControllerManagedBy是一个建造者模式,返回的是一个 builder 对象,其包含了用于构建的 ForOwnsWatchesWithEventFilter等方法

这里我们就可以利用 ``Watches方法来监听 Node 的变化,我们这里使用handler.Funcs`自定义了一个入队器

监听 Node 对象的更新事件,如果存在和 NodePool 关联的 node 对象更新就把对应的 NodePool 入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// SetupWithManager sets up the controller with the Manager.
func (r *NodePoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&nodesv1.NodePool{}).
Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
Complete(r)
}

func (r *NodePoolReconciler) nodeUpdateHandler(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

oldPool, err := r.getNodePoolByLabels(ctx, e.ObjectOld.GetLabels())
if err != nil {
r.Log.Error(err, "get node pool err")
}
if oldPool != nil {
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{Name: oldPool.Name},
})
}

newPool, err := r.getNodePoolByLabels(ctx, e.ObjectNew.GetLabels())
if err != nil {
r.Log.Error(err, "get node pool err")
}
if newPool != nil {
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{Name: newPool.Name},
})
}
}

func (r *NodePoolReconciler) getNodePoolByLabels(ctx context.Context, labels map[string]string) (*nodesv1.NodePool, error) {
pool := &nodesv1.NodePool{}
for k := range labels {
ss := strings.Split(k, "node-role.kubernetes.io/")
if len(ss) != 2 {
continue
}
err := r.Client.Get(ctx, types.NamespacedName{Name: ss[1]}, pool)
if err == nil {
return pool, nil
}

if client.IgnoreNotFound(err) != nil {
return nil, err
}
}
return nil, nil
}

总结

今天我们完善了 status & event 和自定义对象 watch 下一篇我们看一下如何对我们的 Operator 进行测试

关注我获取更新

wechat
知乎
github

猜你喜欢


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议,转载请注明出处,禁止全文转载