无需二次开发,Cloud Alert 快速衔接您的IT事件和钉钉通知
953
2022-11-06
Kubernetes —— ReplicaSet 控制器源码剖析
Kubernetes —— ReplicaSet 控制器源码剖析
ReplicaSet 控制器
ReplicaSet 确保在任何给定时间运行指定数量的 pod 副本,这一组 pod 副本的集合,称之为 ReplicaSet。Deployment 构建于 ReplicaSet 之上。
ReplicaSet 控制器的启动是在 controller manager 中完成
创建 ReplicaSet 控制器
源码位于 kubernetes 项目 pkg/controller/replicaset/replica_set.go
1func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, 2 gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController { 3 if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { 4 ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter()) 5 } 6 7 rsc := &ReplicaSetController{ 8 GroupVersionKind: gvk, 9 kubeClient: kubeClient,10 podControl: podControl,11 burstReplicas: burstReplicas,12 expectations: controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),13 queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),14 }1516 // ①17 rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{18 AddFunc: rsc.addRS, // ②19 UpdateFunc: rsc.updateRS,20 DeleteFunc: rsc.deleteRS,21 })2223 // ...24 // 省去部分代码25 // ...2627 rsc.syncHandler = rsc.syncReplicaSet2829 return rsc30}3132func (rsc *ReplicaSetController) addRS(obj interface{}) { // ②33 rs := obj.(*apps.ReplicaSet)34 klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)35 rsc.enqueueRS(rs) // ③36}
① 向 Informer 注册 ReplicaSet 的添加、修改及删除对应的回调函数。Informer 在 Watch 的基础之上提供了更高层次的能力,如本地内存缓存,以及通过唯一的 key 来查询缓存对象。如果每次获取一个 k8s 资源对象都向 kube-apiserver 发起一次接口调用,会给其带来较大的压力,而缓存可以减轻 API server 的压力。Informer 可以对对象的变更进行近乎实时的响应,不需要反复轮询资源对象的状态。Informer 还提供了出错时的处理能力,例如 Watch 的长连接断开后,会发起一个新的 Watch 请求进行重试等^[1]^。
开发者在实现自定义的 k8s Operator 时,通常会借助 operator sdk 这样的工具来生成 Informer 脚手架代码。
② 添加 ReplicaSet 对象时触发的回调函数。③ ReplicaSet 控制器监听到有 ReplicaSet 对象被添加时,将该对象加入工作队列,等待进一步处理。
启动 ReplicaSet 控制器
1// Run begins watching and syncing. 2func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) { 3 defer utilruntime.HandleCrash() 4 defer rsc.queue.ShutDown() 5 6 controllerName := strings.ToLower(rsc.Kind) 7 klog.Infof("Starting %v controller", controllerName) 8 defer klog.Infof("Shutting down %v controller", controllerName) 910 if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {11 return12 }1314 for i := 0; i < workers; i++ {15 go wait.UntilWithContext(ctx, rsc.worker, time.Second) // ① 16 }1718 <-ctx.Done()19}
① 运行指定数量的 goroutine 并发执行 worker 逻辑。goroutine 类似协程的概念,可理解成轻量级的“线程”。
ReplicaSet 控制器 worker 逻辑
1// worker runs a worker thread that just dequeues items, processes them, and marks them done. 2// It enforces that the syncHandler is never invoked concurrently with the same key. 3func (rsc *ReplicaSetController) worker(ctx context.Context) { 4 for rsc.processNextWorkItem(ctx) { // ① 5 } 6} 7 8func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool { 9 key, quit := rsc.queue.Get() // ② 10 if quit {11 return false12 }1314 defer rsc.queue.Done(key) // ⑥ 1516 err := rsc.syncHandler(ctx, key.(string)) // ③17 if err == nil {18 rsc.queue.Forget(key) // ⑤ 19 return true20 }2122 utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))2324 rsc.queue.AddRateLimited(key) // ④ 2526 return true27}
ReplicaSet 控制器同步逻辑
1func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error { 2 startTime := time.Now() 3 defer func() { 4 klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) 5 }() 6 7 namespace, name, err := cache.SplitMetaNamespaceKey(key) 8 if err != nil { 9 return err10 }11 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)12 if apierrors.IsNotFound(err) { // ① ReplicaSet 已删除13 klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)14 rsc.expectations.DeleteExpectations(key)15 return nil16 }17 if err != nil {18 return err19 }2021 rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)22 selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) // ② ReplicaSet 对象上定义的标签选择器,用于选择相对应的 Pod23 if err != nil {24 utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))25 return nil26 }2728 // list all pods to include the pods that don't match the rs`s selector29 // anymore but has the stale controller ref.30 // TODO: Do the List and Filter in a single pass, or use an index.31 allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) // ③ 找出所有的 Pod 用后续的过滤32 if err != nil {33 return err34 }35 // Ignore inactive pods.36 filteredPods := controller.FilterActivePods(allPods) // ④ 过滤非活动状态中的 Pod (排除 Succeeded/Failed/删除的) 3738 // NOTE: filteredPods are pointing to objects from cache - if you need to39 // modify them, you need to copy it first.40 filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods) // ⑤ 根据标签选择器筛选出来的所有 Pods41 if err != nil {42 return err43 }4445 var manageReplicasErr error46 if rsNeedsSync && rs.DeletionTimestamp == nil {47 manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs) // ⑥ 进一步调协 RepliaSet48 }49 rs = rs.DeepCopy()50 newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)5152 // Always updates status as pods come up or die.53 updatedRS, err := 
updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) // ⑦ 更新 ReplicaSet 状态54 if err != nil {55 // Multiple things could lead to this update failing. Requeuing the replica set ensures56 // Returning an error causes a requeue without forcing a hotloop57 return err58 }59 // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.60 if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&61 updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&62 updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {63 rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)64 }65 return manageReplicasErr66}6768func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {69 var claimed []*v1.Pod70 var errlist []error7172 match := func(obj metav1.Object) bool { 73 pod := obj.(*v1.Pod)74 // Check selector first so filters only run on potentially matching Pods.75 // ⑤ 判断 pod 是否满足标签选择器76 if !m.Selector.Matches(labels.Set(pod.Labels)) { 77 return false78 }79 // ... 省略部分代码80 return true81 }8283 // ... 省略部分代码84 for _, pod := range pods {85 ok, err := m.ClaimObject(ctx, pod, match, adopt, release)86 if err != nil {87 errlist = append(errlist, err)88 continue89 }90 if ok {91 claimed = append(claimed, pod)92 }93 }94 return claimed, utilerrors.NewAggregate(errlist)95}
① 查询不到 key 对应的 ReplicaSet 对象,表明该 ReplicaSet 已被删除。这里不能返回 err:返回 err 会导致该 key 被重新加入工作队列,而此时已经不需要进一步处理。② ReplicaSet 对象上定义的标签选择器,用于筛选 ReplicaSet 关心的 pods。例^[2]^:
1apiVersion: apps/v1 2kind: ReplicaSet 3metadata: 4 name: frontend 5 labels: 6 app: guestbook 7 tier: frontend 8spec: 9 replicas: 310 # 选择器11 selector: 12 matchLabels:13 tier: frontend14 template:15 metadata:16 labels:17 tier: frontend18 spec:19 containers:20 - name: php-redis21 image: gcr.io/google_samples/gb-frontend:v3
③ 列出 ReplicaSet 所在命名空间下的所有 pods,用于后续的筛选。④ 过滤掉非活动状态的 pod,即排除处于 Succeeded、Failed 状态以及被删除的 pod。⑤ 根据 selector 筛选出与 ReplicaSet 相关的所有 pods。⑥ 进一步对 ReplicaSet 进行同步逻辑处理。⑦ 本次同步处理结束后,计算 ReplicaSet 的最新状态并更新。
Pod 副本管理
计算 ReplicaSet 当前 pod 副本数与期望副本数的差值。
1/ manageReplicas checks and updates replicas for the given ReplicaSet. 2// Does NOT modify
小结
参考资料
[1]《Kubernetes 编程》[2] https://kubernetes.io/zh-cn/docs/concepts/workloads/controllers/replicaset/
发表评论
暂时没有评论,来抢沙发吧~