当前位置:网站首页>深入解析Kubebuilder
深入解析Kubebuilder
2022-07-07 04:45:00 【chenxy02】
參考網址:
Introduction - The Kubebuilder Book
深入解析 Kubebuilder:讓編寫 CRD 變得更簡單 - 知乎
導讀
推薦朋友們多看上面官方提供的文檔。
前面我轉發一篇博客 基於Kubebuilder開發Operator(入門使用)_chenxy02的博客-CSDN博客 記錄了對kubebuilder的入門使用,本文旨在加深對kubebuilder深圳,進一步熟悉kubebuilder的項目代碼。
核心概念
GVKs & GVRs
GVK = GroupVersionKind,GVR = GroupVersionResource
- 在編碼過程中,資源數據的存儲都是以結構體存儲(稱為Go type)
- 由於多版本version的存在(alpha1, beta1, v1等),不同版本中存儲結構體的存在著差异,但是我們都會給其相同的Kind名字(比如Deployment)
- 因此,我們編碼中只用kind名(如Deployment), 並不能准確獲取到其使用哪個版本結構體
- 所以,采用GVK獲取到一個具體的存儲結構體,也就是GVK的三個信息(group/version/kind)確定一個Go type(結構體)
如何獲取呢? —— 通過Scheme,Scheme存儲了GVK和Go Type的映射關系
- 在創建資源過程中,我們編寫yaml,提交請求:
- 編寫yaml過程中,我們會寫apiversion和kind,其實就是GVK
- 而客戶端(也就是我們)與apiserver通信是http形式,就是將請求發送到某一http path
發送到哪個http path呢?——這個http path其實就是GVR
- /apis/batch/v1/nampspaces/default/job 這個就是錶示default 命名空間的job資源
- 我們 kubectl get po 時 也是請求的路徑,也可以稱為GVR
- 其實 GVR 是由 GVK 轉化而來 —— 通過REST映射的RESTMappers實現
Scheme
每一組Controllers都需要一個Scheme,提供了Kinds與對應的Go types的映射,也就是說給定Go type就知道他的GVK,給定GVK就知道他的Go type
Manager
Kubebuilder 的核心組件,具有 3 個職責:
負責運行所有的Controllers;
初始化共享cashes,包含listAndWath功能
初始化clients用於與Api Server通信
Cache
Kubebuilder的核心組件,負責在Controller進程裏面根據Scheme同步Api Server中所有該Controller關心GVKs的GVRs,其核心是GVK->Informer的映射,Informer會負責監聽對應GVK的GVRs的創建/删除/更新操作,以觸發Controller的Reconcile邏輯。
Controller
Kubebuilder為我們生成的脚手架文件,我們只需要實現Reconcile方法即可。
Clients
在實現Controller的時候不可避免地需要對某些資源類型進行創建/删除/更新,就是通過該Clients實現的,其中查詢功能實際查詢是本地的Cache,寫操作直接訪問Api Server
Index
由於Controller經常要對Cache進行查詢,Kubebuilder提供Index utility給Cache加索引提昇查詢效率。
Finalizer
在一般情况下,如果資源被删除之後,我們雖然能够觸發删除事件,但是這個時候從Cache裏面無法讀取任何被删除對象的信息,這樣一來,導致很多垃圾清理工作因為信息不足無法進行。
K8s的Finalizer字段用於處理這種情况。在K8s中,只要對象ObjectMeta裏面的Finalizers不為空,對該對象的delete操作就會轉變為update操作,具體說就是update deletionTimestamp字段,其意義就是告訴K8s的GC“在deletionTimestamp 這個時刻之後,只要Finalizer為空,就立馬删除掉該對象“。
所以一般的使用姿勢就是在創建對象時把 Finalizers 設置好(任意 string),然後處理 DeletionTimestamp 不為空的 update 操作(實際是 delete),根據 Finalizers 的值執行完所有的 pre-delete hook(此時可以在 Cache 裏面讀取到被删除對象的任何信息)之後將 Finalizers 置為空即可。
OwnerReference
k8s GC在删除一個對象時,任何ownerReference是該對象的對象都會被清除,與此同時,kubebuilder支持所有對象的變更 都會觸發Owner對象controller的Reconcile方法。
所有概念集合在一起如下圖所示:
源碼閱讀
下面代碼來源於 基於Kubebuilder開發Operator(入門使用)_chenxy02的博客-CSDN博客
從main.go開始
Kubebuilder 創建的 main.go 是整個項目的入口,邏輯十分簡單:
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(webappv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
...
// 1、Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "ecaf1259.my.domain",
})
...
// 2、init Reconciler(Controller)
if err = (&controllers.GuestbookReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
os.Exit(1)
}
...
// 3、start Manager
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
可以看到在 init方法裏面我們將webappv1注册到Scheme裏面去了,這樣一來Cache就知道watch誰了,main方法裏面的邏輯基本都是Manager的:
- 初始化了一個Manager;
- 將Manager 的 Client 傳給 Controller,並且調用 SetupWithManager 方法傳入 Manager 進行 Controller 的初始化;
- 啟動Manager
Manager初始化
Manager初始化代碼如下:
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Set default values for options fields
options = setOptionsDefaults(options)
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.NewCache = options.NewCache
clusterOptions.ClientBuilder = options.ClientBuilder
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster
})
...
return &controllerManager{
cluster: cluster,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
}, nil
}
可以看到 主要是創建Cache與Clients等等等實例:
創建Cache
Cache 初始化代碼如下:
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}
可以看到Cache主要就是創建了InformersMap, Scheme 裏面的每個 GVK 都創建了對應的 Informer,通過 informersByGVK 這個 map 做 GVK 到 Informer 的映射,每個Informer會根據ListWatch函數對對應的GVK進行List和Watch。
創建 Clients
創建 Clients 很簡單:
// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cache,
ClientReader: c,
},
Writer: c,
StatusClient: c,
}, nil
}
讀操作使用上面創建的 Cache,寫操作使用 K8s go-client 直連。
Controller初始化
下面看看Controller的啟動:
// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.Guestbook{}).
Complete(r)
}
使用的中Builder模式,NewControllerManagerBy和For方法都給Builder傳參,最重要的是最後一個方法Complete,其邏輯是:
func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
// Set the Manager
if err := blder.doManager(); err != nil {
return nil, err
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
...
return blder.mgr, nil
}
主要是看看doController和doWatch方法:
doController方法
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
...
// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}
// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
}, nil
}
該方法初始化了一個Controller,傳入了一些很重要的參數:
- Do: Reconcile 邏輯;
- Cache:找Informer注册Watch
- Queue:Watch資源的CUD事件緩存
doWatch方法
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}
可以看到該方法對本Controller負責的CRD進行了watch,同時底下還會watch本CRD管理的其它資源,這個managedObjects可以通過Controller初始化Builder的Owns方法傳入,說到Watch我們關心兩個邏輯:
1、注册的handler
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.MetaOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaOld.GetName(),
Namespace: evt.MetaOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}
if evt.MetaNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaNew.GetName(),
Namespace: evt.MetaNew.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
可以看到Kubebuilder為注册的Handler就是將發生變更的對象的NamespacedName入隊列,如果在Reconcile邏輯需要判斷創建/更新/删除,需要有自己的判斷邏輯。
2、注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
...
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}
我們的Handler實際注册到Informer上面,這樣整個邏輯就串起來了,通過Cache我們創建了所有Scheme裏面GVKs的Informers,然後對應GVK的Controller注册了Watch Handler到對應的Informer,這樣一來對應的GVK裏面的資源有變更都會觸發Handler,將變更事件寫到Controller的事件隊列中,之後觸發我們的Reconcile方法。
边栏推荐
- C # use Siemens S7 protocol to read and write PLC DB block
- leetcode 53. Maximum Subarray 最大子数组和(中等)
- 软件测试之网站测试如何进行?测试小攻略走起!
- Organize five stages of actual attack and defense drill
- Break the memory wall with CPU scheme? Learn from PayPal to expand the capacity of aoteng, and the volume of missed fraud transactions can be reduced to 1/30
- Vscode 如何使用内置浏览器?
- 掌握软件安全测试方法秘笈,安全测试报告信手捏来
- Data security -- 12 -- Analysis of privacy protection
- Video fusion cloud platform easycvr video Plaza left column list style optimization
- Introduction to namespace Basics
猜你喜欢
[multi threading exercise] write a multi threading example of the producer consumer model.
Chapter 9 Yunji datacanvas company won the highest honor of the "fifth digital finance innovation competition"!
On the 110th anniversary of Turing's birth, has the prediction of intelligent machine come true?
[on automation experience] the growth path of automated testing
Easycvr cannot be played using webrtc. How to solve it?
[team learning] [phase 34] Baidu PaddlePaddle AI talent Creation Camp
深入解析Kubebuilder
Introduction to the PureMVC series
Win11图片打不开怎么办?Win11无法打开图片的修复方法
Common methods of list and map
随机推荐
掌握软件安全测试方法秘笈,安全测试报告信手捏来
微信能开小号了,拼多多“砍一刀”被判侵权,字节VR设备出货量全球第二,今日更多大新闻在此
Win11图片打不开怎么办?Win11无法打开图片的修复方法
Meaning of 'n:m' and '1:n' in database design
Up to 5million per person per year! Choose people instead of projects, focus on basic scientific research, and scientists dominate the "new cornerstone" funded by Tencent to start the application
一度辍学的数学差生,获得今年菲尔兹奖
Organize five stages of actual attack and defense drill
acwing 843. N-queen problem
广告归因:买量如何做价值衡量?
计数排序基础思路
Both primary and secondary equipment numbers are 0
AI表现越差,获得奖金越高?纽约大学博士拿出百万重金,悬赏让大模型表现差劲的任务
Data security -- 12 -- Analysis of privacy protection
Jetson nano configures pytorch deep learning environment / / to be improved
【线段树实战】最近的请求次数 + 区域和检索 - 数组可修改+我的日程安排表Ⅰ/Ⅲ
kivy教程之设置窗体大小和背景(教程含源码)
How to conduct website testing of software testing? Test strategy let's go!
sscanf,sscanf_ S and its related usage "suggested collection"
The root file system of buildreoot prompts "depmod:applt not found"
各路行业大佬称赞的跨架构开发“神器”,你get同款了吗?