当前位置:网站首页>深入解析Kubebuilder
深入解析Kubebuilder
2022-07-07 04:45:00 【chenxy02】
參考網址:
Introduction - The Kubebuilder Book
深入解析 Kubebuilder:讓編寫 CRD 變得更簡單 - 知乎
導讀
推薦朋友們多看上面官方提供的文檔。
前面我轉發一篇博客 基於Kubebuilder開發Operator(入門使用)_chenxy02的博客-CSDN博客 記錄了對kubebuilder的入門使用,本文旨在加深對kubebuilder深圳,進一步熟悉kubebuilder的項目代碼。
核心概念
GVKs & GVRs
GVK = GroupVersionKind,GVR = GroupVersionResource
- 在編碼過程中,資源數據的存儲都是以結構體存儲(稱為Go type)
- 由於多版本version的存在(alpha1, beta1, v1等),不同版本中存儲結構體的存在著差异,但是我們都會給其相同的Kind名字(比如Deployment)
- 因此,我們編碼中只用kind名(如Deployment), 並不能准確獲取到其使用哪個版本結構體
- 所以,采用GVK獲取到一個具體的存儲結構體,也就是GVK的三個信息(group/version/kind)確定一個Go type(結構體)
如何獲取呢? —— 通過Scheme,Scheme存儲了GVK和Go Type的映射關系
- 在創建資源過程中,我們編寫yaml,提交請求:
- 編寫yaml過程中,我們會寫apiversion和kind,其實就是GVK
- 而客戶端(也就是我們)與apiserver通信是http形式,就是將請求發送到某一http path
發送到哪個http path呢?——這個http path其實就是GVR
- /apis/batch/v1/nampspaces/default/job 這個就是錶示default 命名空間的job資源
- 我們 kubectl get po 時 也是請求的路徑,也可以稱為GVR
- 其實 GVR 是由 GVK 轉化而來 —— 通過REST映射的RESTMappers實現
Scheme
每一組Controllers都需要一個Scheme,提供了Kinds與對應的Go types的映射,也就是說給定Go type就知道他的GVK,給定GVK就知道他的Go type
Manager
Kubebuilder 的核心組件,具有 3 個職責:
負責運行所有的Controllers;
初始化共享cashes,包含listAndWath功能
初始化clients用於與Api Server通信
Cache
Kubebuilder的核心組件,負責在Controller進程裏面根據Scheme同步Api Server中所有該Controller關心GVKs的GVRs,其核心是GVK->Informer的映射,Informer會負責監聽對應GVK的GVRs的創建/删除/更新操作,以觸發Controller的Reconcile邏輯。
Controller
Kubebuilder為我們生成的脚手架文件,我們只需要實現Reconcile方法即可。
Clients
在實現Controller的時候不可避免地需要對某些資源類型進行創建/删除/更新,就是通過該Clients實現的,其中查詢功能實際查詢是本地的Cache,寫操作直接訪問Api Server
Index
由於Controller經常要對Cache進行查詢,Kubebuilder提供Index utility給Cache加索引提昇查詢效率。
Finalizer
在一般情况下,如果資源被删除之後,我們雖然能够觸發删除事件,但是這個時候從Cache裏面無法讀取任何被删除對象的信息,這樣一來,導致很多垃圾清理工作因為信息不足無法進行。
K8s的Finalizer字段用於處理這種情况。在K8s中,只要對象ObjectMeta裏面的Finalizers不為空,對該對象的delete操作就會轉變為update操作,具體說就是update deletionTimestamp字段,其意義就是告訴K8s的GC“在deletionTimestamp 這個時刻之後,只要Finalizer為空,就立馬删除掉該對象“。
所以一般的使用姿勢就是在創建對象時把 Finalizers 設置好(任意 string),然後處理 DeletionTimestamp 不為空的 update 操作(實際是 delete),根據 Finalizers 的值執行完所有的 pre-delete hook(此時可以在 Cache 裏面讀取到被删除對象的任何信息)之後將 Finalizers 置為空即可。
OwnerReference
k8s GC在删除一個對象時,任何ownerReference是該對象的對象都會被清除,與此同時,kubebuilder支持所有對象的變更 都會觸發Owner對象controller的Reconcile方法。
所有概念集合在一起如下圖所示:
源碼閱讀
下面代碼來源於 基於Kubebuilder開發Operator(入門使用)_chenxy02的博客-CSDN博客
從main.go開始
Kubebuilder 創建的 main.go 是整個項目的入口,邏輯十分簡單:
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(webappv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
...
// 1、Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "ecaf1259.my.domain",
})
...
// 2、init Reconciler(Controller)
if err = (&controllers.GuestbookReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
os.Exit(1)
}
...
// 3、start Manager
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
可以看到在 init方法裏面我們將webappv1注册到Scheme裏面去了,這樣一來Cache就知道watch誰了,main方法裏面的邏輯基本都是Manager的:
- 初始化了一個Manager;
- 將Manager 的 Client 傳給 Controller,並且調用 SetupWithManager 方法傳入 Manager 進行 Controller 的初始化;
- 啟動Manager
Manager初始化
Manager初始化代碼如下:
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Set default values for options fields
options = setOptionsDefaults(options)
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.NewCache = options.NewCache
clusterOptions.ClientBuilder = options.ClientBuilder
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster
})
...
return &controllerManager{
cluster: cluster,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
}, nil
}
可以看到 主要是創建Cache與Clients等等等實例:
創建Cache
Cache 初始化代碼如下:
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}
可以看到Cache主要就是創建了InformersMap, Scheme 裏面的每個 GVK 都創建了對應的 Informer,通過 informersByGVK 這個 map 做 GVK 到 Informer 的映射,每個Informer會根據ListWatch函數對對應的GVK進行List和Watch。
創建 Clients
創建 Clients 很簡單:
// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cache,
ClientReader: c,
},
Writer: c,
StatusClient: c,
}, nil
}
讀操作使用上面創建的 Cache,寫操作使用 K8s go-client 直連。
Controller初始化
下面看看Controller的啟動:
// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.Guestbook{}).
Complete(r)
}
使用的中Builder模式,NewControllerManagerBy和For方法都給Builder傳參,最重要的是最後一個方法Complete,其邏輯是:
func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
// Set the Manager
if err := blder.doManager(); err != nil {
return nil, err
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
...
return blder.mgr, nil
}
主要是看看doController和doWatch方法:
doController方法
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
...
// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}
// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
}, nil
}
該方法初始化了一個Controller,傳入了一些很重要的參數:
- Do: Reconcile 邏輯;
- Cache:找Informer注册Watch
- Queue:Watch資源的CUD事件緩存
doWatch方法
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}
可以看到該方法對本Controller負責的CRD進行了watch,同時底下還會watch本CRD管理的其它資源,這個managedObjects可以通過Controller初始化Builder的Owns方法傳入,說到Watch我們關心兩個邏輯:
1、注册的handler
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.MetaOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaOld.GetName(),
Namespace: evt.MetaOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}
if evt.MetaNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaNew.GetName(),
Namespace: evt.MetaNew.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
可以看到Kubebuilder為注册的Handler就是將發生變更的對象的NamespacedName入隊列,如果在Reconcile邏輯需要判斷創建/更新/删除,需要有自己的判斷邏輯。
2、注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
...
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}
我們的Handler實際注册到Informer上面,這樣整個邏輯就串起來了,通過Cache我們創建了所有Scheme裏面GVKs的Informers,然後對應GVK的Controller注册了Watch Handler到對應的Informer,這樣一來對應的GVK裏面的資源有變更都會觸發Handler,將變更事件寫到Controller的事件隊列中,之後觸發我們的Reconcile方法。
边栏推荐
- A picture to understand! Why did the school teach you coding but still not
- Why does WordPress open so slowly?
- 一度辍学的数学差生,获得今年菲尔兹奖
- acwing 843. n-皇后问题
- Station B boss used my world to create convolutional neural network, Lecun forwarding! Burst the liver for 6 months, playing more than one million
- Code source de la fonction [analogique numérique] MATLAB allcycles () (non disponible avant 2021a)
- 关于01背包个人的一些理解
- ESG Global Leaders Summit | Intel Wang Rui: coping with global climate challenges with the power of science and technology
- Easycvr cannot be played using webrtc. How to solve it?
- A detailed explanation of head pose estimation [collect good articles]
猜你喜欢
这项15年前的「超前」技术设计,让CPU在AI推理中大放光彩
acwing 843. n-皇后问题
C#使用西门子S7 协议读写PLC DB块
[multi threading exercise] write a multi threading example of the producer consumer model.
How to open win11 remote desktop connection? Five methods of win11 Remote Desktop Connection
Practice Guide for interface automation testing (middle): what are the interface testing scenarios
Have you got the same "artifact" of cross architecture development praised by various industry leaders?
Introduction to the PureMVC series
DFS和BFS概念及实践+acwing 842 排列数字(dfs) +acwing 844. 走迷宫(bfs)
Video fusion cloud platform easycvr video Plaza left column list style optimization
随机推荐
Common methods of list and map
軟件測試之網站測試如何進行?測試小攻略走起!
Tiktok may launch an independent grass planting community platform: will it become the second little red book
一度辍学的数学差生,获得今年菲尔兹奖
Introduction to the PureMVC series
VM virtual machine operating system not found and NTLDR is missing
Why does WordPress open so slowly?
用CPU方案打破内存墙?学PayPal堆傲腾扩容量,漏查欺诈交易量可降至1/30
深入解析Kubebuilder
ESG Global Leaders Summit | Intel Wang Rui: coping with global climate challenges with the power of science and technology
You can't sell the used lithography machine to China! The United States unreasonably pressured the Dutch ASML, and domestic chips were suppressed again
[multi threading exercise] write a multi threading example of the producer consumer model.
Zero knowledge private application platform aleo (1) what is aleo
Mathematical analysis_ Notes_ Chapter 10: integral with parameters
[untitled]
窗口可不是什么便宜的东西
leetcode 53. Maximum Subarray 最大子数组和(中等)
组织实战攻防演练的5个阶段
EasyCVR平台接入RTMP协议,接口调用提示获取录像错误该如何解决?
Unit test asp Net MVC 4 Application - unit testing asp Net MVC 4 apps thoroughly