Kubernetes —— ReplicaSet 控制器源码剖析

2022-11-06

ReplicaSet 控制器

ReplicaSet 确保在任何给定时间运行指定数量的 pod 副本,这一组 pod 副本的集合,称之为 ReplicaSet。Deployment 构建于 ReplicaSet 之上。

ReplicaSet 控制器的启动是在 controller manager 中完成

创建 ReplicaSet 控制器

源码位于 kubernetes 项目 pkg/controller/replicaset/replica_set.go

1func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, 2    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController { 3    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { 4        ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter()) 5    } 6 7    rsc := &ReplicaSetController{ 8        GroupVersionKind: gvk, 9        kubeClient:       kubeClient,10        podControl:       podControl,11        burstReplicas:    burstReplicas,12        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),13        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),14    }1516  // ①17    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{18        AddFunc:    rsc.addRS, // ②19        UpdateFunc: rsc.updateRS,20        DeleteFunc: rsc.deleteRS,21    })2223  // ...24  // 省去部分代码25  // ...2627    rsc.syncHandler = rsc.syncReplicaSet2829    return rsc30}3132func (rsc *ReplicaSetController) addRS(obj interface{}) { // ②33    rs := obj.(*apps.ReplicaSet)34    klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)35    rsc.enqueueRS(rs) // ③36}

① 向 Informer 注册 ReplicaSet 的添加、修改及删除对应的回调函数。Informer 在 Watch 的基础之上提供了高层次的能力,如本地内存缓存和通过唯一的key来查询缓存对象。如果每次获取一个 k8s 资源对象就向 kube-api-server 发起一次接口调用,会给其带来较大的压力,缓存可以减少 API server 的压力。Infomer 可以对对象的变更进行近乎实时的响应,不需要反复去轮询资源对象的状态。Infomer 还提供了出错时的处理能力,如 Watch 的长连接断开,会通过一个新的 Watch 请求来进行尝试等^[1]^。

开发者在实现自定义的 k8s Operator 时,通常会借助 operator sdk 这样的工具来生成 Informer 脚手架代码。

② 添加 ReplicaSet 对象的回调函数。③ ReplicaSet 控制器监听到发生了 replicaset 对象被添加时,将 replicaset 对象添加至工作队列待进一步处理。

启动 ReplicaSet 控制器

1// Run begins watching and syncing. 2func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) { 3    defer utilruntime.HandleCrash() 4    defer rsc.queue.ShutDown() 5 6    controllerName := strings.ToLower(rsc.Kind) 7    klog.Infof("Starting %v controller", controllerName) 8    defer klog.Infof("Shutting down %v controller", controllerName) 910    if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {11        return12    }1314    for i := 0; i < workers; i++ {15        go wait.UntilWithContext(ctx, rsc.worker, time.Second) // ① 16    }1718    <-ctx.Done()19}

① 运行指定数量的 gorutine 并行执行 worker 逻辑。gorutine 类似协程的概念,可理解成轻量级的“线程”。

ReplicaSet 控制器 worker 逻辑

1// worker runs a worker thread that just dequeues items, processes them, and marks them done. 2// It enforces that the syncHandler is never invoked concurrently with the same key. 3func (rsc *ReplicaSetController) worker(ctx context.Context) { 4    for rsc.processNextWorkItem(ctx) { // ① 5    } 6} 7 8func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool { 9    key, quit := rsc.queue.Get() // ② 10    if quit {11        return false12    }1314    defer rsc.queue.Done(key)   // ⑥ 1516    err := rsc.syncHandler(ctx, key.(string)) // ③17    if err == nil {18        rsc.queue.Forget(key) // ⑤ 19        return true20    }2122    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))2324    rsc.queue.AddRateLimited(key) // ④ 2526    return true27}

ReplicaSet 控制器同步逻辑

1func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error { 2    startTime := time.Now() 3    defer func() { 4        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) 5    }() 6 7    namespace, name, err := cache.SplitMetaNamespaceKey(key) 8    if err != nil { 9        return err10    }11    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)12    if apierrors.IsNotFound(err) { // ① ReplicaSet 已删除13        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)14        rsc.expectations.DeleteExpectations(key)15        return nil16    }17    if err != nil {18        return err19    }2021    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)22    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) // ② ReplicaSet 对象上定义的标签选择器,用于选择相对应的 Pod23    if err != nil {24        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))25        return nil26    }2728    // list all pods to include the pods that don't match the rs`s selector29    // anymore but has the stale controller ref.30    // TODO: Do the List and Filter in a single pass, or use an index.31    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) // ③ 找出所有的 Pod 用后续的过滤32    if err != nil {33        return err34    }35    // Ignore inactive pods.36    filteredPods := controller.FilterActivePods(allPods) // ④ 过滤非活动状态中的 Pod (排除 Succeeded/Failed/删除的)  3738    // NOTE: filteredPods are pointing to objects from cache - if you need to39    // modify them, you need to copy it first.40    filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods) // ⑤ 根据标签选择器筛选出来的所有 Pods41    if err != nil {42        return err43    }4445    var manageReplicasErr error46    if rsNeedsSync && rs.DeletionTimestamp == nil {47        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs) // ⑥ 进一步调协 RepliaSet48    }49    rs = rs.DeepCopy()50    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)5152    // Always updates status as pods come up or die.53    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) // ⑦ 更新 ReplicaSet 状态54    if err != nil {55        // Multiple things could lead to this update failing. Requeuing the replica set ensures56        // Returning an error causes a requeue without forcing a hotloop57        return err58    }59    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.60    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&61        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&62        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {63        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)64    }65    return manageReplicasErr66}6768func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {69    var claimed []*v1.Pod70    var errlist []error7172    match := func(obj metav1.Object) bool { 73        pod := obj.(*v1.Pod)74        // Check selector first so filters only run on potentially matching Pods.75    // ⑤ 判断 pod 是否满足标签选择器76        if !m.Selector.Matches(labels.Set(pod.Labels)) { 77            return false78        }79        //  ... 省略部分代码80        return true81    }8283  //  ... 省略部分代码84    for _, pod := range pods {85        ok, err := m.ClaimObject(ctx, pod, match, adopt, release)86        if err != nil {87            errlist = append(errlist, err)88            continue89        }90        if ok {91            claimed = append(claimed, pod)92        }93    }94    return claimed, utilerrors.NewAggregate(errlist)95}

① 查询不到 key 对应的 ReplicaSet 对象,表明 ReplicaSet 已被删除。这里并不能返回 err,返回 err 将会被重新加入工作队列,此时并不需要进一步处理。② ReplicaSet 对象上定义的标签选择器,用于筛选 ReplicaSet 关心的 pods。例^[2]^:

1apiVersion: apps/v1 2kind: ReplicaSet 3metadata: 4  name: frontend 5  labels: 6    app: guestbook 7    tier: frontend 8spec: 9  replicas: 310  # 选择器11  selector: 12    matchLabels:13      tier: frontend14  template:15    metadata:16      labels:17        tier: frontend18    spec:19      containers:20      - name: php-redis21        image: gcr.io/google_samples/gb-frontend:v3

③ 找出所有的 pods 用后续的筛选。④ 过滤过非活动状态中的 pod,排除Succeeded、 Failed 状态的和被删除 pod。⑤ 根据selector筛选出 ReplicaSet 相关的所有 pods。⑥ 进一步对 RepliaSet 进行同步逻辑处理。⑦ 本次同步处理结束后,计算 ReplicaSet 的状态并更新其状态。

Pod 副本管理

计算 ReplicaSet 当前 pod 副本数与期望副本数的差值。

1/ manageReplicas checks and updates replicas for the given ReplicaSet. 2// Does NOT modify . 3// It will requeue the replica set in case of an error while creating/deleting pods. 4func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { 5    diff := len(filteredPods) - int(*(rs.Spec.Replicas)) // ① 计算实际 Pod 副本本与期望副本数的差值 6    rsKey, err := controller.KeyFunc(rs) 7    if err != nil { 8        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) 9        return nil10    }11    if diff < 0 { // ② 实际 Pods 副本数少于期望副本数,创建更多的 Pod12        diff *= -113        if diff > rsc.burstReplicas {14            diff = rsc.burstReplicas15        }16        rsc.expectations.ExpectCreations(rsKey, diff)1718        klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)1920        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {  // ③ 批量创建 Pods21            err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))22            if err != nil {23                if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {24                    // if the namespace is being terminated, we don't have to do25                    // anything because any creation will fail26                    return nil27                }28            }29            return err30        })3132        // Any skipped pods that we never attempted to start shouldn't be expected.33        // The skipped pods will be retried later. The next controller resync will34        // retry the slow start process.35        if skippedPods := diff - successfulCreations; skippedPods > 0 {36            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)37            for i := 0; i < skippedPods; i++ {38                // Decrement the expected number of creates because the informer won't observe this pod39                rsc.expectations.CreationObserved(rsKey)40            }41        }42        return err43    } else if diff > 0 { // ④ 实际 Pod 副本数大于期望副本数,删除多余的 Pods44        if diff > rsc.burstReplicas {45            diff = rsc.burstReplicas46        }47        klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)4849        relatedPods, err := rsc.getIndirectlyRelatedPods(rs)50        utilruntime.HandleError(err)5152        // Choose which Pods to delete, preferring those in earlier phases of startup.53        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff) // ⑤ 选择哪些 Pods 用于删除5455        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))5657        errCh := make(chan error, diff)58        var wg sync.WaitGroup59        wg.Add(diff)60        for _, pod := range podsToDelete {61            go func(targetPod *v1.Pod) {62                defer wg.Done()63                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil { // ⑥ 删除 Pod64                    // Decrement the expected number of deletes because the informer won't observe this deletion65                    podKey := controller.PodKey(targetPod)66                    rsc.expectations.DeletionObserved(rsKey, podKey)67                    if !apierrors.IsNotFound(err) {68                        klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)69                        errCh <- err70                    }71                }72            }(pod)73        }74        wg.Wait()7576        select {77        case err := <-errCh:78            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.79            if err != nil {80                return err81            }82        default:83        }84    }8586    return nil87}



