注:本文已发布超过一年,请注意您所使用工具的相关版本是否适用
在上篇文章当中我们实现了 NodePool Operator 基本的 CURD 功能,跑了一小段时间之后除了 CURD 之外我们有了更高的需求,想知道一个节点池有多少的节点,现在的资源占比是多少,这样可以清晰的知道我们现在的水位线是多少,除此之外也想知道节点池数量发生变化的相关事件信息,什么时候节点池增加或者是减少了一个节点等。
需求 我们先整理一下需求
能够通过 kubectl get Nodepool
了解当前的节点池的以下信息
节点池的状态,是否异常 节点池现在包含多少个节点 节点池的资源情况现在有多少 CPU、Memory 能够通过事件信息得知 controller 的错误情况以及节点池内节点的变化情况
实现 Status 先修改一下 status 对象,注意要确保下面的 //+kubebuilder:subresource:status
注释存在,这个表示开启 status 子资源,status 对象修改好之后需要重新执行一遍 make install
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 type NodePoolStatus struct { Status int `json:"status"` NodeCount int `json:"nodeCount"` Allocatable corev1.ResourceList `json:"allocatable,omitempty" protobuf:"bytes,2,rep,name=allocatable,casttype=ResourceList,castkey=ResourceName"` }type NodePool struct {
然后修改 Reconcile
中的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { // ...... if len(nodes.Items) > 0 { r.Log.Info("find nodes, will merge data", "nodes", len(nodes.Items))+ pool.Status.Allocatable = corev1.ResourceList{} + pool.Status.NodeCount = len(nodes.Items) for _, n := range nodes.Items { n := n // 更新节点的标签和污点信息 err := r.Update(ctx, pool.Spec.ApplyNode(n)) if err != nil { return ctrl.Result{}, err }+ for name, quantity := range n.Status.Allocatable { + q, ok := pool.Status.Allocatable[name] + if ok { + q.Add(quantity) + pool.Status.Allocatable[name] = q + continue + } + pool.Status.Allocatable[name] = quantity + } } } // ...... + pool.Status.Status = 200 + err = r.Status().Update(ctx, pool) return ctrl.Result{}, err }
修改好了之后我们提交一个 NodePool 测试一下
1 2 3 4 5 6 7 8 9 10 11 12 apiVersion: nodes.lailin.xyz/v1 kind: NodePool metadata: name: worker spec: taints: - key: node-pool.lailin.xyz value: worker effect: NoSchedule labels: "node-pool.lailin.xyz/worker": "10" handler: runc
可以看到我们现在是有两个 worker 节点
1 2 3 4 5 ▶ kubectl get no NAME STATUS ROLES AGE VERSION kind-control-plane Ready control-plane,master 29m v1.20.2 kind-worker Ready worker 28m v1.20.2 kind-worker2 Ready worker 28m v1.20.2
然后我们看看 NodePool,可以发现已经存在了预期的 status
1 2 3 4 5 6 7 8 9 10 status: allocatable: cpu: "8" ephemeral-storage: 184026512Ki hugepages-1Gi: "0" hugepages-2Mi: "0" memory: 6129040Ki pods: "220" nodeCount: 2 status: 200
现在这样只能通过查看 yaml 详情才能看到,当 NodePool 稍微多一些的时候就不太方便,我们现在给NodePool 增加一些 kubectl 展示的列
1 2 3 4 5 +//+kubebuilder:printcolumn:JSONPath=".status.status",name=Status,type=integer +//+kubebuilder:printcolumn:JSONPath=".status.nodeCount",name=NodeCount,type=integer //+kubebuilder:object:root=true //+kubebuilder:resource:scope=Cluster //+kubebuilder:subresource:status
如上所示只需要添加好对应的注释,然后执行 make install
即可
然后再执行 kubectl get NodePool
就可以看到对应的列了
1 2 3 ▶ kubectl get NodePool NAME STATUS NODECOUNT worker 200 2
Event 我们在 controller 当中添加 Recorder
用来记录事件,K8s 中事件有 Normal 和 Warning 两种类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 // NodePoolReconciler reconciles a NodePool object type NodePoolReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme+ Recorder record.EventRecorder } func (r *NodePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // 添加测试事件 + r.Recorder.Event(pool, corev1.EventTypeNormal, "test", "test") pool.Status.Status = 200 err = r.Status().Update(ctx, pool) return ctrl.Result{}, err }
添加好之后还需要在 main.go
中加上 Recorder
的初始化逻辑
1 2 3 4 5 6 7 8 9 if err = (&controllers.NodePoolReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("NodePool"), Scheme: mgr.GetScheme(),+ Recorder: mgr.GetEventRecorderFor("NodePool"), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "NodePool") os.Exit(1) }
加好之后我们运行一下,然后在 describe Nodepool 对象就能看到事件信息了
1 2 3 4 Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal test 4s NodePool test
监听更多资源 之前我们所有的代码都是围绕着 NodePool 的变化来展开的,但是我们如果修改了 Node 的相关标签,将 Node 添加到一个 NodePool,Node 上对应的属性和 NodePool 的 status 信息也不会改变。如果我们想要实现上面的效果就需要监听更多的资源变化。
在 controller 当中我们可以看到一个 SetupWithManager
方法,这个方法说明了我们需要监听哪些资源的变化
1 2 3 4 5 6 func (r *NodePoolReconciler) SetupWithManager (mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&nodesv1.NodePool{}). Complete(r) }
其中 NewControllerManagedBy
是一个建造者模式,返回的是一个 builder 对象,其包含了用于构建的 For
、Owns
、Watches
、WithEventFilter
等方法
这里我们就可以利用 ``Watches方法来监听 Node 的变化,我们这里使用
handler.Funcs`自定义了一个入队器
监听 Node 对象的更新事件,如果存在和 NodePool 关联的 node 对象更新就把对应的 NodePool 入队
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 func (r *NodePoolReconciler) SetupWithManager (mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&nodesv1.NodePool{}). Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}). Complete(r) }func (r *NodePoolReconciler) nodeUpdateHandler (e event.UpdateEvent, q workqueue.RateLimitingInterface) { ctx, cancel := context.WithTimeout(context.Background(), 5 *time.Second) defer cancel() oldPool, err := r.getNodePoolByLabels(ctx, e.ObjectOld.GetLabels()) if err != nil { r.Log.Error(err, "get node pool err" ) } if oldPool != nil { q.Add(reconcile.Request{ NamespacedName: types.NamespacedName{Name: oldPool.Name}, }) } newPool, err := r.getNodePoolByLabels(ctx, e.ObjectNew.GetLabels()) if err != nil { r.Log.Error(err, "get node pool err" ) } if newPool != nil { q.Add(reconcile.Request{ NamespacedName: types.NamespacedName{Name: newPool.Name}, }) } }func (r *NodePoolReconciler) getNodePoolByLabels (ctx context.Context, labels map [string ]string ) (*nodesv1.NodePool, error) { pool := &nodesv1.NodePool{} for k := range labels { ss := strings.Split(k, "node-role.kubernetes.io/" ) if len (ss) != 2 { continue } err := r.Client.Get(ctx, types.NamespacedName{Name: ss[1 ]}, pool) if err == nil { return pool, nil } if client.IgnoreNotFound(err) != nil { return nil , err } } return nil , nil }
总结 今天我们完善了 status & event 和自定义对象 watch 下一篇我们看一下如何对我们的 Operator 进行测试
关注我获取更新 猜你喜欢