当前位置:网站首页>深入解析Kubebuilder

深入解析Kubebuilder

2022-07-07 04:45:00 chenxy02

參考網址:​​​​​​​​​​​​​​​​​

​​​​​​Introduction - The Kubebuilder Book

深入解析 Kubebuilder:讓編寫 CRD 變得更簡單 - 知乎

​​​​​​​導讀

推薦朋友們多看上面官方提供的文檔。

前面我轉發一篇博客 基於Kubebuilder開發Operator(入門使用)_chenxy02的博客-CSDN博客 記錄了對kubebuilder的入門使用,本文旨在加深對kubebuilder深圳,進一步熟悉kubebuilder的項目代碼。

核心概念

GVKs & GVRs

GVK = GroupVersionKind,GVR = GroupVersionResource

  • 在編碼過程中,資源數據的存儲都是以結構體存儲(稱為Go type)
    • 由於多版本version的存在(alpha1, beta1, v1等),不同版本中存儲結構體的存在著差异,但是我們都會給其相同的Kind名字(比如Deployment)
    • 因此,我們編碼中只用kind名(如Deployment), 並不能准確獲取到其使用哪個版本結構體
    • 所以,采用GVK獲取到一個具體的存儲結構體,也就是GVK的三個信息(group/version/kind)確定一個Go type(結構體)

如何獲取呢? —— 通過Scheme,Scheme存儲了GVK和Go Type的映射關系

  • 在創建資源過程中,我們編寫yaml,提交請求:
    • 編寫yaml過程中,我們會寫apiversion和kind,其實就是GVK
    • 而客戶端(也就是我們)與apiserver通信是http形式,就是將請求發送到某一http path

發送到哪個http path呢?——這個http path其實就是GVR

  • /apis/batch/v1/nampspaces/default/job 這個就是錶示default 命名空間的job資源
  • 我們 kubectl get po 時 也是請求的路徑,也可以稱為GVR
  • 其實 GVR 是由 GVK 轉化而來 —— 通過REST映射的RESTMappers實現

Scheme

每一組Controllers都需要一個Scheme,提供了Kinds與對應的Go types的映射,也就是說給定Go type就知道他的GVK,給定GVK就知道他的Go type

Manager

Kubebuilder 的核心組件,具有 3 個職責:

  • 負責運行所有的Controllers;

  • 初始化共享cashes,包含listAndWath功能

  • 初始化clients用於與Api Server通信

Cache

Kubebuilder的核心組件,負責在Controller進程裏面根據Scheme同步Api Server中所有該Controller關心GVKs的GVRs,其核心是GVK->Informer的映射,Informer會負責監聽對應GVK的GVRs的創建/删除/更新操作,以觸發Controller的Reconcile邏輯。

Controller

Kubebuilder為我們生成的脚手架文件,我們只需要實現Reconcile方法即可。

Clients

在實現Controller的時候不可避免地需要對某些資源類型進行創建/删除/更新,就是通過該Clients實現的,其中查詢功能實際查詢是本地的Cache,寫操作直接訪問Api Server

Index

由於Controller經常要對Cache進行查詢,Kubebuilder提供Index utility給Cache加索引提昇查詢效率。

Finalizer

在一般情况下,如果資源被删除之後,我們雖然能够觸發删除事件,但是這個時候從Cache裏面無法讀取任何被删除對象的信息,這樣一來,導致很多垃圾清理工作因為信息不足無法進行。

K8s的Finalizer字段用於處理這種情况。在K8s中,只要對象ObjectMeta裏面的Finalizers不為空,對該對象的delete操作就會轉變為update操作,具體說就是update deletionTimestamp字段,其意義就是告訴K8s的GC“在deletionTimestamp 這個時刻之後,只要Finalizer為空,就立馬删除掉該對象“。

所以一般的使用姿勢就是在創建對象時把 Finalizers 設置好(任意 string),然後處理 DeletionTimestamp 不為空的 update 操作(實際是 delete),根據 Finalizers 的值執行完所有的 pre-delete hook(此時可以在 Cache 裏面讀取到被删除對象的任何信息)之後將 Finalizers 置為空即可。

OwnerReference

k8s GC在删除一個對象時,任何ownerReference是該對象的對象都會被清除,與此同時,kubebuilder支持所有對象的變更 都會觸發Owner對象controller的Reconcile方法。

所有概念集合在一起如下圖所示:

源碼閱讀

下面代碼來源於 基於Kubebuilder開發Operator(入門使用)_chenxy02的博客-CSDN博客

從main.go開始

Kubebuilder 創建的 main.go 是整個項目的入口,邏輯十分簡單:

var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
	utilruntime.Must(webappv1.AddToScheme(scheme))
	//+kubebuilder:scaffold:scheme
}

func main() {
    ...	

	// 1、Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		MetricsBindAddress:     metricsAddr,
		Port:                   9443,
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "ecaf1259.my.domain",
	})
    ...

	// 2、init Reconciler(Controller)
	if err = (&controllers.GuestbookReconciler{
		Client: mgr.GetClient(),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
		os.Exit(1)
	}
    ...	

	
	// 3、start Manager
	setupLog.Info("starting manager")
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

可以看到在 init方法裏面我們將webappv1注册到Scheme裏面去了,這樣一來Cache就知道watch誰了,main方法裏面的邏輯基本都是Manager的:

  1. 初始化了一個Manager;
  2. 將Manager 的 Client 傳給 Controller,並且調用 SetupWithManager 方法傳入 Manager 進行 Controller 的初始化;
  3. 啟動Manager

Manager初始化

Manager初始化代碼如下:

// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
	// Set default values for options fields
	options = setOptionsDefaults(options)

	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		clusterOptions.Scheme = options.Scheme
		clusterOptions.MapperProvider = options.MapperProvider
		clusterOptions.Logger = options.Logger
		clusterOptions.SyncPeriod = options.SyncPeriod
		clusterOptions.Namespace = options.Namespace
		clusterOptions.NewCache = options.NewCache
		clusterOptions.ClientBuilder = options.ClientBuilder
		clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
		clusterOptions.DryRunClient = options.DryRunClient
		clusterOptions.EventBroadcaster = options.EventBroadcaster
	})
	
    ...    

	return &controllerManager{
		cluster:                 cluster,
		recorderProvider:        recorderProvider,
		resourceLock:            resourceLock,
		metricsListener:         metricsListener,
		metricsExtraHandlers:    metricsExtraHandlers,
		logger:                  options.Logger,
		elected:                 make(chan struct{}),
		port:                    options.Port,
		host:                    options.Host,
		certDir:                 options.CertDir,
		leaseDuration:           *options.LeaseDuration,
		renewDeadline:           *options.RenewDeadline,
		retryPeriod:             *options.RetryPeriod,
		healthProbeListener:     healthProbeListener,
		readinessEndpointName:   options.ReadinessEndpointName,
		livenessEndpointName:    options.LivenessEndpointName,
		gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
		internalProceduresStop:  make(chan struct{}),
		leaderElectionStopped:   make(chan struct{}),
	}, nil
}

可以看到 主要是創建Cache與Clients等等等實例:

創建Cache

Cache 初始化代碼如下:

// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}

// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string) *InformersMap {

	return &InformersMap{
		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

		Scheme: scheme,
	}
}

可以看到Cache主要就是創建了InformersMap, Scheme 裏面的每個 GVK 都創建了對應的 Informer,通過 informersByGVK 這個 map 做 GVK 到 Informer 的映射,每個Informer會根據ListWatch函數對對應的GVK進行List和Watch。

創建 Clients

創建 Clients 很簡單:

// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
    // Create the Client for Write operations.
    c, err := client.New(config, options)
    if err != nil {
        return nil, err
    }
    return &client.DelegatingClient{
        Reader: &client.DelegatingReader{
            CacheReader:  cache,
            ClientReader: c,
        },
        Writer:       c,
        StatusClient: c,
    }, nil
}

讀操作使用上面創建的 Cache,寫操作使用 K8s go-client 直連

Controller初始化

下面看看Controller的啟動:

// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&webappv1.Guestbook{}).
		Complete(r)
}

使用的中Builder模式,NewControllerManagerBy和For方法都給Builder傳參,最重要的是最後一個方法Complete,其邏輯是:

func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
    // Set the Manager
    if err := blder.doManager(); err != nil {
        return nil, err
    }
    // Set the ControllerManagedBy
    if err := blder.doController(r); err != nil {
        return nil, err
    }
    // Set the Watch
    if err := blder.doWatch(); err != nil {
        return nil, err
    }
...
    return blder.mgr, nil
}

主要是看看doController和doWatch方法:

doController方法

func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
    ...

	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// Create controller with dependencies set
	return &controller.Controller{
		Do: options.Reconciler,
		MakeQueue: func() workqueue.RateLimitingInterface {
			return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
		},
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		CacheSyncTimeout:        options.CacheSyncTimeout,
		SetFields:               mgr.SetFields,
		Name:                    name,
		Log:                     options.Log.WithName("controller").WithName(name),
	}, nil
}

該方法初始化了一個Controller,傳入了一些很重要的參數:

  • Do: Reconcile 邏輯;
  • Cache:找Informer注册Watch
  • Queue:Watch資源的CUD事件緩存

doWatch方法

func (blder *Builder) doWatch() error {
	// Reconcile type
	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
	if err != nil {
		return err
	}
	src := &source.Kind{Type: typeForSrc}
	hdler := &handler.EnqueueRequestForObject{}
	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
	if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
		return err
	}

	// Watches the managed types
	for _, own := range blder.ownsInput {
		typeForSrc, err := blder.project(own.object, own.objectProjection)
		if err != nil {
			return err
		}
		src := &source.Kind{Type: typeForSrc}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.forInput.object,
			IsController: true,
		}
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, own.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	// Do the watch requests
	for _, w := range blder.watchesInput {
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, w.predicates...)

		// If the source of this watch is of type *source.Kind, project it.
		if srckind, ok := w.src.(*source.Kind); ok {
			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
			if err != nil {
				return err
			}
			srckind.Type = typeForSrc
		}

		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
			return err
		}
	}
	return nil
}

可以看到該方法對本Controller負責的CRD進行了watch,同時底下還會watch本CRD管理的其它資源,這個managedObjects可以通過Controller初始化Builder的Owns方法傳入,說到Watch我們關心兩個邏輯:

1、注册的handler

type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
        ...
    q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
        Name:      evt.Meta.GetName(),
        Namespace: evt.Meta.GetNamespace(),
    }})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
    if evt.MetaOld != nil {
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
            Name:      evt.MetaOld.GetName(),
            Namespace: evt.MetaOld.GetNamespace(),
        }})
    } else {
        enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
    }
    if evt.MetaNew != nil {
        q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
            Name:      evt.MetaNew.GetName(),
            Namespace: evt.MetaNew.GetNamespace(),
        }})
    } else {
        enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
    }
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
        ...
    q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
        Name:      evt.Meta.GetName(),
        Namespace: evt.Meta.GetNamespace(),
    }})
}

可以看到Kubebuilder為注册的Handler就是將發生變更的對象的NamespacedName入隊列,如果在Reconcile邏輯需要判斷創建/更新/删除,需要有自己的判斷邏輯。

2、注册的流程

// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
    ...
    log.Info("Starting EventSource", "controller", c.Name, "source", src)
    return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
    ...
    is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
    return nil
}

我們的Handler實際注册到Informer上面,這樣整個邏輯就串起來了,通過Cache我們創建了所有Scheme裏面GVKs的Informers,然後對應GVK的Controller注册了Watch Handler到對應的Informer,這樣一來對應的GVK裏面的資源有變更都會觸發Handler,將變更事件寫到Controller的事件隊列中,之後觸發我們的Reconcile方法。​​​​​​​

原网站

版权声明
本文为[chenxy02]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/188/202207062213253142.html