Kubernetes资源类型发现

网友投稿 1010 2022-10-31

本站部分文章、图片属于网络上可搜索到的公开信息,均用于学习和交流用途,不能代表睿象云的观点、立场或意见。我们接受网民的监督,如发现任何违法内容或侵犯了您的权益,请第一时间联系小编邮箱jiasou666@gmail.com 处理。

Kubernetes资源类型发现

在基于Kubernetes平台开发过程中,经常需要访问不同Kuberntes版本的集群资源,这时就需要去获取所访问集群支持的资源版本信息。下面我们一起来看一下,如何来实现Kubernetes资源类型发现?文章分为下面几部分为大家介绍:1,kubectl命令查看Kubernetes集群资源2,client-go库获取Kubernetes集群资源3,controller-runtime动态获取Kubernetes集群资源kubectl命令kubectl命令行工具提供api-resources,api-versions命令来查看kubernetes集群当前支持的资源类型。kubectl api-resources查看k8s所有资源和版本列表。sukai@sukai:~$ kubectl api-resourcesNAME                              SHORTNAMES   APIVERSION                             NAMESPACED   KINDbindings                                       v1                                     true         Bindingcomponentstatuses                 cs           v1                                     false        ComponentStatusconfigmaps                        cm           v1                                     true         ConfigMapendpoints                         ep           v1                                     true         Endpointsevents                            ev           v1                                     true         Eventlimitranges                       limits       v1                                     true         LimitRangenamespaces                        ns           v1                                     false        Namespacenodes                             no           v1                                     false        Nodepersistentvolumeclaims            pvc          v1                                     true         PersistentVolumeClaimpersistentvolumes                 pv           v1                                     false        PersistentVolumepods                              po           v1                                     true         Pod...kubectl api-versions查看k8s的API组和版本列表sukai@sukai:~$ kubectl api-versionsadmissionregistration.k8s.io/v1apiextensions.k8s.io/v1apiregistration.k8s.io/v1apps/v1authentication.k8s.io/v1authorization.k8s.io/v1autoscaling/v1autoscaling/v2autoscaling/v2beta1autoscaling/v2beta2batch/v1batch/v1beta1certificates.k8s.io/v1cluster.clusterpedia.io/v1alpha2clusterpedia.io/v1beta1coordination.k8s.io/v1crd.projectcalico.org/v1dex.coreos.com/v1discovery.k8s.io/v1discovery.k8s.io/v1beta1events.k8s.io/v1events.k8s.io/v1beta1experiment.aiscope/v1alpha2flowcontrol.apiserver.k8s.io/v1beta1flowcontrol.apiserver.k8s.io/v1beta2iam.aiscope/v1alpha2networking.k8s.io/v1node.k8s.io/v1node.k8s.io/v1beta1policy/v1policy/v1beta1rbac.authorization.k8s.io/v1scheduling.k8s.io/v1storage.k8s.io/v1storage.k8s.io/v1beta1tenant.aiscope/v1alpha2traefik.containo.us/v1alpha1v1kubectl命令api-resources的代码1,创建一个discoveryclient2,discoveryclient.ServerPreferredResources()获取资源和首选版本的列表3,schema.ParseGroupVersion将"group/version"字符串转换为GroupVersion结构体,如果命令行参数指定了过滤条件,对APIResource资源进行过滤,过滤条件包括:资源组,全局资源还是命名空间资源,资源支持的动作如get,list,watch,create,update等。4,打印出资源列表// RunAPIResources does the workfunc (o *APIResourceOptions) RunAPIResources(cmd *cobra.Command, f cmdutil.Factory) error {   w := printers.GetNewTabWriter(o.Out)   defer w.Flush()   discoveryclient, err := f.ToDiscoveryClient()   if err != nil {      return err   }   if !o.Cached {      // Always request fresh data from the server      discoveryclient.Invalidate()   }   errs := []error{}   lists, err := discoveryclient.ServerPreferredResources()   if err != nil {      errs = append(errs, err)   }   resources := []groupResource{}   groupChanged := cmd.Flags().Changed("api-group")   nsChanged := cmd.Flags().Changed("namespaced")   for _, list := range lists {      if len(list.APIResources) == 0 {         continue      }      gv, err := schema.ParseGroupVersion(list.GroupVersion)      if err != nil {         continue      }      for _, resource := range list.APIResources {         if len(resource.Verbs) == 0 {            continue         }         // filter apiGroup         if groupChanged && o.APIGroup != gv.Group {            continue         }         // filter namespaced         if nsChanged && o.Namespaced != resource.Namespaced {            continue         }         // filter to resources that support the specified verbs         if len(o.Verbs) > 0 && !sets.NewString(resource.Verbs...).HasAll(o.Verbs...) {            continue         }         resources = append(resources, groupResource{            APIGroup:        gv.Group,            APIGroupVersion: gv.String(),            APIResource:     resource,         })      }   }   if o.NoHeaders == false && o.Output != "name" {      if err = printContextHeaders(w, o.Output); err != nil {         return err      }   }   sort.Stable(sortableResource{resources, o.SortBy})   for _, r := range resources {      switch o.Output {      case "name":         name := r.APIResource.Name         if len(r.APIGroup) > 0 {            name += "." + r.APIGroup         }         if _, err := fmt.Fprintf(w, "%s\n", name); err != nil {            errs = append(errs, err)         }      case "wide":         if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%v\t%s\t%v\n",            r.APIResource.Name,            strings.Join(r.APIResource.ShortNames, ","),            r.APIGroupVersion,            r.APIResource.Namespaced,            r.APIResource.Kind,            r.APIResource.Verbs); err != nil {            errs = append(errs, err)         }      case "":         if _, err := fmt.Fprintf(w, "%s\t%s\t%s\t%v\t%s\n",            r.APIResource.Name,            strings.Join(r.APIResource.ShortNames, ","),            r.APIGroupVersion,            r.APIResource.Namespaced,            r.APIResource.Kind); err != nil {            errs = append(errs, err)         }      }   }   if len(errs) > 0 {      return errors.NewAggregate(errs)   }   return nil}ToDiscoveryClient1,ToRESTConfig构造rest.Config2,配置突发流量和缓存目录,这里注释可以看到资源组请求是很频繁,需要配置速率限制,防止突发流量导致APIServer无法提供其他服务,另外这里还设置了两个本地缓存,一个用于缓存APIResource,一个用于HTTP缓存,缓存期限为10分钟。3,调用NewCachedDiscoveryClientForConfig创建CachedDiscoveryClient,CachedDiscoveryClient对DiscoveryClient做了一层封装cacheRoundTripper,cacheRoundTripper通过http.RoundTripper做本地缓存,如果本地缓存存在,那么直接返回给kubectl,不再请求k8s APIServer。// ToDiscoveryClient implements RESTClientGetter.// Expects the AddFlags method to have been called.// Returns a CachedDiscoveryInterface using a computed RESTConfig.func (f *ConfigFlags) ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error) {   config, err := f.ToRESTConfig()   if err != nil {      return nil, err   }   // The more groups you have, the more discovery requests you need to make.   // given 25 groups (our groups + a few custom resources) with one-ish version each, discovery needs to make 50 requests   // double it just so we don't end up here again for a while.  This config is only used for discovery.   config.Burst = f.discoveryBurst   cacheDir := defaultCacheDir   // retrieve a user-provided value for the "cache-dir"   // override httpCacheDir and discoveryCacheDir if user-value is given.   if f.CacheDir != nil {      cacheDir = *f.CacheDir   }   httpCacheDir := filepath.Join(cacheDir, "http")   discoveryCacheDir := computeDiscoverCacheDir(filepath.Join(cacheDir, "discovery"), config.Host)   return diskcached.NewCachedDiscoveryClientForConfig(config, discoveryCacheDir, httpCacheDir, time.Duration(10*time.Minute))}// NewCachedDiscoveryClientForConfig creates a new DiscoveryClient for the given config, and wraps// the created client in a CachedDiscoveryClient. The provided configuration is updated with a// custom transport that understands cache responses.// We receive two distinct cache directories for now, in order to preserve old behavior// which makes use of the --cache-dir flag value for storing cache data from the CacheRoundTripper,// and makes use of the hardcoded destination (~/.kube/cache/discovery/...) for storing// CachedDiscoveryClient cache data. If httpCacheDir is empty, the restconfig's transport will not// be updated with a roundtripper that understands cache responses.// If discoveryCacheDir is empty, cached server resource data will be looked up in the current directory.func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) { if len(httpCacheDir) > 0 {  // update the given restconfig with a custom roundtripper that  // understands how to handle cache responses.  config = restclient.CopyConfig(config)  config.Wrap(func(rt http.RoundTripper) http.RoundTripper {   return newCacheRoundTripper(httpCacheDir, rt)  }) } discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil {  return nil, err } return newCachedDiscoveryClient(discoveryClient, discoveryCacheDir, ttl), nil}ServerPreferredResources1,ServerGroups获取资源组列表,通过RestClient请求/api和/apis,得到k8s支持的GroupVersion列表2,fetchGroupVersionResources根据GroupVersion列表,通过RestClient并发请求/api/v1和/apis/groupVersion获取GroupVersion对应的Resources资源列表。3,将获取数据处理返回APIResourceList// ServerPreferredResources uses the provided discovery interface to look up preferred resourcesfunc ServerPreferredResources(d DiscoveryInterface) ([]*metav1.APIResourceList, error) {   serverGroupList, err := d.ServerGroups()   if err != nil {      return nil, err   }   groupVersionResources, failedGroups := fetchGroupVersionResources(d, serverGroupList)   result := []*metav1.APIResourceList{}   grVersions := map[schema.GroupResource]string{}                         // selected version of a GroupResource   grAPIResources := map[schema.GroupResource]*metav1.APIResource{}        // selected APIResource for a GroupResource   gvAPIResourceLists := map[schema.GroupVersion]*metav1.APIResourceList{} // blueprint for a APIResourceList for later grouping   for _, apiGroup := range serverGroupList.Groups {      for _, version := range apiGroup.Versions {         groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}         apiResourceList, ok := groupVersionResources[groupVersion]         if !ok {            continue         }         // create empty list which is filled later in another loop         emptyAPIResourceList := metav1.APIResourceList{            GroupVersion: version.GroupVersion,         }         gvAPIResourceLists[groupVersion] = &emptyAPIResourceList         result = append(result, &emptyAPIResourceList)         for i := range apiResourceList.APIResources {            apiResource := &apiResourceList.APIResources[i]            if strings.Contains(apiResource.Name, "/") {               continue            }            gv := schema.GroupResource{Group: apiGroup.Name, Resource: apiResource.Name}            if _, ok := grAPIResources[gv]; ok && version.Version != apiGroup.PreferredVersion.Version {               // only override with preferred version               continue            }            grVersions[gv] = version.Version            grAPIResources[gv] = apiResource         }      }   }   // group selected APIResources according to GroupVersion into APIResourceLists   for groupResource, apiResource := range grAPIResources {      version := grVersions[groupResource]      groupVersion := schema.GroupVersion{Group: groupResource.Group, Version: version}      apiResourceList := gvAPIResourceLists[groupVersion]      apiResourceList.APIResources = append(apiResourceList.APIResources, *apiResource)   }   if len(failedGroups) == 0 {      return result, nil   }   return result, &ErrGroupDiscoveryFailed{Groups: failedGroups}}// ServerGroups returns the supported groups, with information like supported versions and the// preferred version.func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) { // Get the groupVersions exposed at /api v := &metav1.APIVersions{} err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v) apiGroup := metav1.APIGroup{} if err == nil && len(v.Versions) != 0 {  apiGroup = apiVersionsToAPIGroup(v) } if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {  return nil, err } // Get the groupVersions exposed at /apis apiGroupList = &metav1.APIGroupList{} err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList) if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {  return nil, err } // to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {  apiGroupList = &metav1.APIGroupList{} } // prepend the group retrieved from /api to the list if not empty if len(v.Versions) != 0 {  apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...) } return apiGroupList, nil}// fetchServerResourcesForGroupVersions uses the discovery client to fetch the resources for the specified groups in parallel.func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) { groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList) failedGroups := make(map[schema.GroupVersion]error) wg := &sync.WaitGroup{} resultLock := &sync.Mutex{} for _, apiGroup := range apiGroups.Groups {  for _, version := range apiGroup.Versions {   groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}   wg.Add(1)   go func() {    defer wg.Done()    defer utilruntime.HandleCrash()    apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())    // lock to record results    resultLock.Lock()    defer resultLock.Unlock()    if err != nil {     // TODO: maybe restrict this to NotFound errors     failedGroups[groupVersion] = err    }    if apiResourceList != nil {     // even in case of error, some fallback might have been returned     groupVersionResources[groupVersion] = apiResourceList    }   }()  } } wg.Wait() return groupVersionResources, failedGroups}func (d *DiscoveryClient) ServerGroups() (apiGroupList *metav1.APIGroupList, err error) { // Get the groupVersions exposed at /api v := &metav1.APIVersions{} err = d.restClient.Get().AbsPath(d.LegacyPrefix).Do(context.TODO()).Into(v) apiGroup := metav1.APIGroup{} if err == nil && len(v.Versions) != 0 {  apiGroup = apiVersionsToAPIGroup(v) } if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {  return nil, err } // Get the groupVersions exposed at /apis apiGroupList = &metav1.APIGroupList{} err = d.restClient.Get().AbsPath("/apis").Do(context.TODO()).Into(apiGroupList) if err != nil && !errors.IsNotFound(err) && !errors.IsForbidden(err) {  return nil, err } // to be compatible with a v1.0 server, if it's a 403 or 404, ignore and return whatever we got from /api if err != nil && (errors.IsNotFound(err) || errors.IsForbidden(err)) {  apiGroupList = &metav1.APIGroupList{} } // prepend the group retrieved from /api to the list if not empty if len(v.Versions) != 0 {  apiGroupList.Groups = append([]metav1.APIGroup{apiGroup}, apiGroupList.Groups...) } return apiGroupList, nil}func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) { groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList) failedGroups := make(map[schema.GroupVersion]error) wg := &sync.WaitGroup{} resultLock := &sync.Mutex{} for _, apiGroup := range apiGroups.Groups {  for _, version := range apiGroup.Versions {   groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}   wg.Add(1)   go func() {    defer wg.Done()    defer utilruntime.HandleCrash()    apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())    // lock to record results    resultLock.Lock()    defer resultLock.Unlock()    if err != nil {     // TODO: maybe restrict this to NotFound errors     failedGroups[groupVersion] = err    }    if apiResourceList != nil {     // even in case of error, some fallback might have been returned     groupVersionResources[groupVersion] = apiResourceList    }   }()  } } wg.Wait() return groupVersionResources, failedGroups}client-gokubectl CachedDiscoveryClient对client-go的DiscoveryClient进行了封装,增加了Cache缓存层,减少DiscoveryClient请求资源列表时对APIServer的请求压力。client-go获取资源列表的过程包括:1,discovery.NewDiscoveryClientForConfig创建discoveryClient。2,ServerGroups返回GroupVersion列表,包括支持的资源组,支持的版本以及首选的版本,比如v1(core组),apps/v1等。3,ServerResourcesForGroupVersion返回APIResourceList,包括各个资源组版本下的资源信息,比如pods, deployments,CRD自定义资源,Aggregated API聚合API资源等。client-go还提供了GetAPIGroupResources方法,将上面步骤进行了整合,直接返回资源列表APIGroupResources,提供开发使用。那么获取了资源列表以后,可以做什么呢?在开发过程中经常需要进行资源操作,在调用API时需要GVR和GVK进行转换,或者只知道资源,不知道k8s实际资源的版本,那么就需要根据资源查询版本,然后再调用API。k8s定义了RESTMapper接口,根据资源列表APIGroupResources构建RESTMapper,提供资源映射查询,比如KindsFor根据资源GVR映射GVK,ResourcesFor根据GVR查询GVR优先级。k8s API支持多种RESTMapper,比如:DefaultRESTMapper资源无法匹配到版本时,返回runtime.Scheme定义的默认版本。MultiRESTMapper,一组RESTMapper,资源在一个RESTMapper无法匹配时,进行下一个RESTMapper匹配。PriorityRESTMapper基于某个RESTMapper封装,先从RESTMapper匹配出资源,然后根据优先级选择资源。client-go通过NewDiscoveryRESTMapper根据资源列表APIGroupResources,构造了一个PriorityRESTMapper。这个PriorityRESTMapper又是基于MultiRESTMapper进行封装的。这样就可以通过PriorityRESTMapper进行资源发现。controller-runtime下面我们再来看一下controller-runtime框架又是如何基于client-go来实现自己的资源发现的。NewDynamicRESTMapper运行时动态发现资源类型:1,NewDiscoveryClientForConfig创建DiscoveryClient类型的客户端2,构造dynamicRESTMapper,dynamicRESTMapper实现了RESTMapper接口。可以看到dynamicRESTMapper,包含了一个速率限制器,一个newMapper的函数。newMapper调用GetAPIGroupResources获取资源列表,NewDiscoveryRESTMapper根据资源列表,构造了一个dynamicRESTMapper。3,通过newMapper初始化dynamicRESTMapper.staticMapper,也就是请求了k8s APIServer获取资源并生成了dynamicRESTMapper保存到staticMapper。func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (meta.RESTMapper, error) {   client, err := discovery.NewDiscoveryClientForConfig(cfg)   if err != nil {      return nil, err   }   drm := &dynamicRESTMapper{      limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),      newMapper: func() (meta.RESTMapper, error) {         groupResources, err := restmapper.GetAPIGroupResources(client)         if err != nil {            return nil, err         }         return restmapper.NewDiscoveryRESTMapper(groupResources), nil      },   }   for _, opt := range opts {      if err = opt(drm); err != nil {         return nil, err      }   }   if !drm.lazy {      if err := drm.setStaticMapper(); err != nil {         return nil, err      }   }   return drm, nil}func (drm *dynamicRESTMapper) setStaticMapper() error { newMapper, err := drm.newMapper() if err != nil {  return err } drm.staticMapper = newMapper return nil}下面看一下dynamicRESTMapper如何进行资源的版本列表查询和组资源下资源列表查询的。流程比较简单,通过checkAndReload进行资源列表查询,如果返回NoResourceMatchError错误,那么就调用setStaticMapper重新请求k8s APIServer获取资源生成dynamicRESTMapper的staticMapper,再次进行查询,如果还查询不到那么返回错误,并且在这个过程中有了速率限制器的并发限制drm.limiter.Allow。也就是说,当资源查询不到时触发一次资源查询更新restmapper,再查一次。func (drm *dynamicRESTMapper) KindsFor(resource schema.GroupVersionResource) ([]schema.GroupVersionKind, error) {   if err := drm.init(); err != nil {      return nil, err   }   var gvks []schema.GroupVersionKind   err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {      var err error      gvks, err = drm.staticMapper.KindsFor(resource)      return err   })   return gvks, err}func (drm *dynamicRESTMapper) ResourcesFor(input schema.GroupVersionResource) ([]schema.GroupVersionResource, error) {   if err := drm.init(); err != nil {      return nil, err   }   var gvrs []schema.GroupVersionResource   err := drm.checkAndReload(&meta.NoResourceMatchError{}, func() error {      var err error      gvrs, err = drm.staticMapper.ResourcesFor(input)      return err   })   return gvrs, err}// checkAndReload attempts to call the given callback, which is assumed to be dependent// on the data in the restmapper.//// If the callback returns an error that matches the given error, it will attempt to reload// the RESTMapper's data and re-call the callback once that's occurred.// If the callback returns any other error, the function will return immediately regardless.//// It will take care of ensuring that reloads are rate-limited and that extraneous calls// aren't made. If a reload would exceed the limiters rate, it returns the error return by// the callback.// It's thread-safe, and worries about thread-safety for the callback (so the callback does// not need to attempt to lock the restmapper).func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error { // first, check the common path -- data is fresh enough // (use an IIFE for the lock's defer) err := func() error {  drm.mu.RLock()  defer drm.mu.RUnlock()  return checkNeedsReload() }() // NB(directxman12): `Is` and `As` have a confusing relationship -- // `Is` is like `== or does this implement .Is`, whereas `As` says // `can I type-assert into` needsReload := errors.As(err, &needsReloadErr) if !needsReload {  return err } // if the data wasn't fresh, we'll need to try and update it, so grab the lock... drm.mu.Lock() defer drm.mu.Unlock() // ... and double-check that we didn't reload in the meantime err = checkNeedsReload() needsReload = errors.As(err, &needsReloadErr) if !needsReload {  return err } // we're still stale, so grab a rate-limit token if we can... if !drm.limiter.Allow() {  // return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)  // so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError  return err } // ...reload... if err := drm.setStaticMapper(); err != nil {  return err } // ...and return the results of the closure regardless return checkNeedsReload()}总结Kubernetes的DiscoveryClient提供了Kubernetes支持资源列表查询,k8s API提供多种RESTMapper提供资源GVR/GVK映射查询,资源版本列表查询。client-go基于DiscoveryClient提供了GetAPIGroupResources方法直接返回APIGroupResources,NewDiscoveryRESTMapper通过APIGroupResources构造PriorityRESTMapper。而controller-runtime使用client-go的GetAPIGroupResources获取资源后,构造了dynamicRESTMapper,dynamicRESTMapper增加了速率限制和查询出错重试,进行了优化。

上一篇:HTML5的优化技巧
下一篇:浅论HTML5未来发展的10大优势
相关文章

 发表评论

暂时没有评论,来抢沙发吧~