Kubernetes —— ReplicaSet 控制器源码剖析

网友投稿 953 2022-11-06

本站部分文章、图片属于网络上可搜索到的公开信息,均用于学习和交流用途,不能代表睿象云的观点、立场或意见。我们接受网民的监督,如发现任何违法内容或侵犯了您的权益,请第一时间联系小编邮箱jiasou666@gmail.com 处理。

Kubernetes —— ReplicaSet 控制器源码剖析

Kubernetes —— ReplicaSet 控制器源码剖析

ReplicaSet 控制器

ReplicaSet 确保在任何给定时间运行指定数量的 pod 副本,这一组 pod 副本的集合,称之为 ReplicaSet。Deployment 构建于 ReplicaSet 之上。

ReplicaSet 控制器的启动是在 controller manager 中完成

创建 ReplicaSet 控制器

源码位于 kubernetes 项目 pkg/controller/replicaset/replica_set.go

1func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int, 2    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController { 3    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil { 4        ratelimiter.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter()) 5    } 6 7    rsc := &ReplicaSetController{ 8        GroupVersionKind: gvk, 9        kubeClient:       kubeClient,10        podControl:       podControl,11        burstReplicas:    burstReplicas,12        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),13        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),14    }1516  // ①17    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{18        AddFunc:    rsc.addRS, // ②19        UpdateFunc: rsc.updateRS,20        DeleteFunc: rsc.deleteRS,21    })2223  // ...24  // 省去部分代码25  // ...2627    rsc.syncHandler = rsc.syncReplicaSet2829    return rsc30}3132func (rsc *ReplicaSetController) addRS(obj interface{}) { // ②33    rs := obj.(*apps.ReplicaSet)34    klog.V(4).Infof("Adding %s %s/%s", rsc.Kind, rs.Namespace, rs.Name)35    rsc.enqueueRS(rs) // ③36}

① 向 Informer 注册 ReplicaSet 的添加、修改及删除对应的回调函数。Informer 在 Watch 的基础之上提供了高层次的能力,如本地内存缓存和通过唯一的key来查询缓存对象。如果每次获取一个 k8s 资源对象就向 kube-api-server 发起一次接口调用,会给其带来较大的压力,缓存可以减少 API server 的压力。Infomer 可以对对象的变更进行近乎实时的响应,不需要反复去轮询资源对象的状态。Infomer 还提供了出错时的处理能力,如 Watch 的长连接断开,会通过一个新的 Watch 请求来进行尝试等^[1]^。

开发者在实现自定义的 k8s Operator 时,通常会借助 operator sdk 这样的工具来生成 Informer 脚手架代码。

② 添加 ReplicaSet 对象的回调函数。③ ReplicaSet 控制器监听到发生了 replicaset 对象被添加时,将 replicaset 对象添加至工作队列待进一步处理。

启动 ReplicaSet 控制器

1// Run begins watching and syncing. 2func (rsc *ReplicaSetController) Run(ctx context.Context, workers int) { 3    defer utilruntime.HandleCrash() 4    defer rsc.queue.ShutDown() 5 6    controllerName := strings.ToLower(rsc.Kind) 7    klog.Infof("Starting %v controller", controllerName) 8    defer klog.Infof("Shutting down %v controller", controllerName) 910    if !cache.WaitForNamedCacheSync(rsc.Kind, ctx.Done(), rsc.podListerSynced, rsc.rsListerSynced) {11        return12    }1314    for i := 0; i < workers; i++ {15        go wait.UntilWithContext(ctx, rsc.worker, time.Second) // ① 16    }1718    <-ctx.Done()19}

① 运行指定数量的 gorutine 并行执行 worker 逻辑。gorutine 类似协程的概念,可理解成轻量级的“线程”。

ReplicaSet 控制器 worker 逻辑

1// worker runs a worker thread that just dequeues items, processes them, and marks them done. 2// It enforces that the syncHandler is never invoked concurrently with the same key. 3func (rsc *ReplicaSetController) worker(ctx context.Context) { 4    for rsc.processNextWorkItem(ctx) { // ① 5    } 6} 7 8func (rsc *ReplicaSetController) processNextWorkItem(ctx context.Context) bool { 9    key, quit := rsc.queue.Get() // ② 10    if quit {11        return false12    }1314    defer rsc.queue.Done(key)   // ⑥ 1516    err := rsc.syncHandler(ctx, key.(string)) // ③17    if err == nil {18        rsc.queue.Forget(key) // ⑤ 19        return true20    }2122    utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err))2324    rsc.queue.AddRateLimited(key) // ④ 2526    return true27}

ReplicaSet 控制器同步逻辑

1func (rsc *ReplicaSetController) syncReplicaSet(ctx context.Context, key string) error { 2    startTime := time.Now() 3    defer func() { 4        klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) 5    }() 6 7    namespace, name, err := cache.SplitMetaNamespaceKey(key) 8    if err != nil { 9        return err10    }11    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name)12    if apierrors.IsNotFound(err) { // ① ReplicaSet 已删除13        klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key)14        rsc.expectations.DeleteExpectations(key)15        return nil16    }17    if err != nil {18        return err19    }2021    rsNeedsSync := rsc.expectations.SatisfiedExpectations(key)22    selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) // ② ReplicaSet 对象上定义的标签选择器,用于选择相对应的 Pod23    if err != nil {24        utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector for rs %v/%v: %v", namespace, name, err))25        return nil26    }2728    // list all pods to include the pods that don't match the rs`s selector29    // anymore but has the stale controller ref.30    // TODO: Do the List and Filter in a single pass, or use an index.31    allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) // ③ 找出所有的 Pod 用后续的过滤32    if err != nil {33        return err34    }35    // Ignore inactive pods.36    filteredPods := controller.FilterActivePods(allPods) // ④ 过滤非活动状态中的 Pod (排除 Succeeded/Failed/删除的)  3738    // NOTE: filteredPods are pointing to objects from cache - if you need to39    // modify them, you need to copy it first.40    filteredPods, err = rsc.claimPods(ctx, rs, selector, filteredPods) // ⑤ 根据标签选择器筛选出来的所有 Pods41    if err != nil {42        return err43    }4445    var manageReplicasErr error46    if rsNeedsSync && rs.DeletionTimestamp == nil {47        manageReplicasErr = rsc.manageReplicas(ctx, filteredPods, rs) // ⑥ 进一步调协 RepliaSet48    }49    rs = rs.DeepCopy()50    newStatus := calculateStatus(rs, filteredPods, manageReplicasErr)5152    // Always updates status as pods come up or die.53    updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) // ⑦ 更新 ReplicaSet 状态54    if err != nil {55        // Multiple things could lead to this update failing. Requeuing the replica set ensures56        // Returning an error causes a requeue without forcing a hotloop57        return err58    }59    // Resync the ReplicaSet after MinReadySeconds as a last line of defense to guard against clock-skew.60    if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 &&61        updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) &&62        updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) {63        rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second)64    }65    return manageReplicasErr66}6768func (m *PodControllerRefManager) ClaimPods(ctx context.Context, pods []*v1.Pod, filters ...func(*v1.Pod) bool) ([]*v1.Pod, error) {69    var claimed []*v1.Pod70    var errlist []error7172    match := func(obj metav1.Object) bool { 73        pod := obj.(*v1.Pod)74        // Check selector first so filters only run on potentially matching Pods.75    // ⑤ 判断 pod 是否满足标签选择器76        if !m.Selector.Matches(labels.Set(pod.Labels)) { 77            return false78        }79        //  ... 省略部分代码80        return true81    }8283  //  ... 省略部分代码84    for _, pod := range pods {85        ok, err := m.ClaimObject(ctx, pod, match, adopt, release)86        if err != nil {87            errlist = append(errlist, err)88            continue89        }90        if ok {91            claimed = append(claimed, pod)92        }93    }94    return claimed, utilerrors.NewAggregate(errlist)95}

① 查询不到 key 对应的 ReplicaSet 对象,表明 ReplicaSet 已被删除。这里并不能返回 err,返回 err 将会被重新加入工作队列,此时并不需要进一步处理。② ReplicaSet 对象上定义的标签选择器,用于筛选 ReplicaSet 关心的 pods。例^[2]^:

1apiVersion: apps/v1 2kind: ReplicaSet 3metadata: 4  name: frontend 5  labels: 6    app: guestbook 7    tier: frontend 8spec: 9  replicas: 310  # 选择器11  selector: 12    matchLabels:13      tier: frontend14  template:15    metadata:16      labels:17        tier: frontend18    spec:19      containers:20      - name: php-redis21        image: gcr.io/google_samples/gb-frontend:v3

③ 找出所有的 pods 用后续的筛选。④ 过滤过非活动状态中的 pod,排除Succeeded、 Failed 状态的和被删除 pod。⑤ 根据selector筛选出 ReplicaSet 相关的所有 pods。⑥ 进一步对 RepliaSet 进行同步逻辑处理。⑦ 本次同步处理结束后,计算 ReplicaSet 的状态并更新其状态。

Pod 副本管理

计算 ReplicaSet 当前 pod 副本数与期望副本数的差值。

1/ manageReplicas checks and updates replicas for the given ReplicaSet. 2// Does NOT modify . 3// It will requeue the replica set in case of an error while creating/deleting pods. 4func (rsc *ReplicaSetController) manageReplicas(ctx context.Context, filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { 5    diff := len(filteredPods) - int(*(rs.Spec.Replicas)) // ① 计算实际 Pod 副本本与期望副本数的差值 6    rsKey, err := controller.KeyFunc(rs) 7    if err != nil { 8        utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) 9        return nil10    }11    if diff < 0 { // ② 实际 Pods 副本数少于期望副本数,创建更多的 Pod12        diff *= -113        if diff > rsc.burstReplicas {14            diff = rsc.burstReplicas15        }16        rsc.expectations.ExpectCreations(rsKey, diff)1718        klog.V(2).InfoS("Too few replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "creating", diff)1920        successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {  // ③ 批量创建 Pods21            err := rsc.podControl.CreatePods(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))22            if err != nil {23                if apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {24                    // if the namespace is being terminated, we don't have to do25                    // anything because any creation will fail26                    return nil27                }28            }29            return err30        })3132        // Any skipped pods that we never attempted to start shouldn't be expected.33        // The skipped pods will be retried later. The next controller resync will34        // retry the slow start process.35        if skippedPods := diff - successfulCreations; skippedPods > 0 {36            klog.V(2).Infof("Slow-start failure. Skipping creation of %d pods, decrementing expectations for %v %v/%v", skippedPods, rsc.Kind, rs.Namespace, rs.Name)37            for i := 0; i < skippedPods; i++ {38                // Decrement the expected number of creates because the informer won't observe this pod39                rsc.expectations.CreationObserved(rsKey)40            }41        }42        return err43    } else if diff > 0 { // ④ 实际 Pod 副本数大于期望副本数,删除多余的 Pods44        if diff > rsc.burstReplicas {45            diff = rsc.burstReplicas46        }47        klog.V(2).InfoS("Too many replicas", "replicaSet", klog.KObj(rs), "need", *(rs.Spec.Replicas), "deleting", diff)4849        relatedPods, err := rsc.getIndirectlyRelatedPods(rs)50        utilruntime.HandleError(err)5152        // Choose which Pods to delete, preferring those in earlier phases of startup.53        podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff) // ⑤ 选择哪些 Pods 用于删除5455        rsc.expectations.ExpectDeletions(rsKey, getPodKeys(podsToDelete))5657        errCh := make(chan error, diff)58        var wg sync.WaitGroup59        wg.Add(diff)60        for _, pod := range podsToDelete {61            go func(targetPod *v1.Pod) {62                defer wg.Done()63                if err := rsc.podControl.DeletePod(ctx, rs.Namespace, targetPod.Name, rs); err != nil { // ⑥ 删除 Pod64                    // Decrement the expected number of deletes because the informer won't observe this deletion65                    podKey := controller.PodKey(targetPod)66                    rsc.expectations.DeletionObserved(rsKey, podKey)67                    if !apierrors.IsNotFound(err) {68                        klog.V(2).Infof("Failed to delete %v, decremented expectations for %v %s/%s", podKey, rsc.Kind, rs.Namespace, rs.Name)69                        errCh <- err70                    }71                }72            }(pod)73        }74        wg.Wait()7576        select {77        case err := <-errCh:78            // all errors have been reported before and they're likely to be the same, so we'll only return the first one we hit.79            if err != nil {80                return err81            }82        default:83        }84    }8586    return nil87}

小结

参考资料

[1]《Kubernetes 编程》[2] https://kubernetes.io/zh-cn/docs/concepts/workloads/controllers/replicaset/

上一篇:软件测试培训之并发性能测试前的准备工作
下一篇:软件测试培训之Bug管理的一般流程
相关文章

 发表评论

暂时没有评论,来抢沙发吧~