当前位置:网站首页>深入解析Kubebuilder
深入解析Kubebuilder
2022-07-06 22:13:00 【chenxy02】
参考网址:
Introduction - The Kubebuilder Book
深入解析 Kubebuilder:让编写 CRD 变得更简单 - 知乎
导读
推荐朋友们多看上面官方提供的文档。
前面我转发一篇博客 基于Kubebuilder开发Operator(入门使用)_chenxy02的博客-CSDN博客 记录了对kubebuilder的入门使用,本文旨在加深对kubebuilder深圳,进一步熟悉kubebuilder的项目代码。
核心概念
GVKs & GVRs
GVK = GroupVersionKind,GVR = GroupVersionResource
- 在编码过程中,资源数据的存储都是以结构体存储(称为Go type)
- 由于多版本version的存在(alpha1, beta1, v1等),不同版本中存储结构体的存在着差异,但是我们都会给其相同的Kind名字(比如Deployment)
- 因此,我们编码中只用kind名(如Deployment), 并不能准确获取到其使用哪个版本结构体
- 所以,采用GVK获取到一个具体的存储结构体,也就是GVK的三个信息(group/version/kind)确定一个Go type(结构体)
如何获取呢? —— 通过Scheme,Scheme存储了GVK和Go Type的映射关系
- 在创建资源过程中,我们编写yaml,提交请求:
- 编写yaml过程中,我们会写apiversion和kind,其实就是GVK
- 而客户端(也就是我们)与apiserver通信是http形式,就是将请求发送到某一http path
发送到哪个http path呢?——这个http path其实就是GVR
- /apis/batch/v1/nampspaces/default/job 这个就是表示default 命名空间的job资源
- 我们 kubectl get po 时 也是请求的路径,也可以称为GVR
- 其实 GVR 是由 GVK 转化而来 —— 通过REST映射的RESTMappers实现
Scheme
每一组Controllers都需要一个Scheme,提供了Kinds与对应的Go types的映射,也就是说给定Go type就知道他的GVK,给定GVK就知道他的Go type
Manager
Kubebuilder 的核心组件,具有 3 个职责:
负责运行所有的Controllers;
初始化共享cashes,包含listAndWath功能
初始化clients用于与Api Server通信
Cache
Kubebuilder的核心组件,负责在Controller进程里面根据Scheme同步Api Server中所有该Controller关心GVKs的GVRs,其核心是GVK->Informer的映射,Informer会负责监听对应GVK的GVRs的创建/删除/更新操作,以触发Controller的Reconcile逻辑。
Controller
Kubebuilder为我们生成的脚手架文件,我们只需要实现Reconcile方法即可。
Clients
在实现Controller的时候不可避免地需要对某些资源类型进行创建/删除/更新,就是通过该Clients实现的,其中查询功能实际查询是本地的Cache,写操作直接访问Api Server
Index
由于Controller经常要对Cache进行查询,Kubebuilder提供Index utility给Cache加索引提升查询效率。
Finalizer
在一般情况下,如果资源被删除之后,我们虽然能够触发删除事件,但是这个时候从Cache里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行。
K8s的Finalizer字段用于处理这种情况。在K8s中,只要对象ObjectMeta里面的Finalizers不为空,对该对象的delete操作就会转变为update操作,具体说就是update deletionTimestamp字段,其意义就是告诉K8s的GC“在deletionTimestamp 这个时刻之后,只要Finalizer为空,就立马删除掉该对象“。
所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。
OwnerReference
k8s GC在删除一个对象时,任何ownerReference是该对象的对象都会被清除,与此同时,kubebuilder支持所有对象的变更 都会触发Owner对象controller的Reconcile方法。
所有概念集合在一起如下图所示:

源码阅读
下面代码来源于 基于Kubebuilder开发Operator(入门使用)_chenxy02的博客-CSDN博客
从main.go开始
Kubebuilder 创建的 main.go 是整个项目的入口,逻辑十分简单:
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(webappv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
...
// 1、Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "ecaf1259.my.domain",
})
...
// 2、init Reconciler(Controller)
if err = (&controllers.GuestbookReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
os.Exit(1)
}
...
// 3、start Manager
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}可以看到在 init方法里面我们将webappv1注册到Scheme里面去了,这样一来Cache就知道watch谁了,main方法里面的逻辑基本都是Manager的:
- 初始化了一个Manager;
- 将Manager 的 Client 传给 Controller,并且调用 SetupWithManager 方法传入 Manager 进行 Controller 的初始化;
- 启动Manager
Manager初始化
Manager初始化代码如下:
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Set default values for options fields
options = setOptionsDefaults(options)
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.NewCache = options.NewCache
clusterOptions.ClientBuilder = options.ClientBuilder
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster
})
...
return &controllerManager{
cluster: cluster,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
}, nil
}可以看到 主要是创建Cache与Clients等等等实例:
创建Cache
Cache 初始化代码如下:
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}可以看到Cache主要就是创建了InformersMap, Scheme 里面的每个 GVK 都创建了对应的 Informer,通过 informersByGVK 这个 map 做 GVK 到 Informer 的映射,每个Informer会根据ListWatch函数对对应的GVK进行List和Watch。
创建 Clients
创建 Clients 很简单:
// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cache,
ClientReader: c,
},
Writer: c,
StatusClient: c,
}, nil
}读操作使用上面创建的 Cache,写操作使用 K8s go-client 直连。
Controller初始化
下面看看Controller的启动:
// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.Guestbook{}).
Complete(r)
}使用的中Builder模式,NewControllerManagerBy和For方法都给Builder传参,最重要的是最后一个方法Complete,其逻辑是:
func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
// Set the Manager
if err := blder.doManager(); err != nil {
return nil, err
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
...
return blder.mgr, nil
}主要是看看doController和doWatch方法:
doController方法
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
...
// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}
// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
}, nil
}该方法初始化了一个Controller,传入了一些很重要的参数:
- Do: Reconcile 逻辑;
- Cache:找Informer注册Watch
- Queue:Watch资源的CUD事件缓存
doWatch方法
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}可以看到该方法对本Controller负责的CRD进行了watch,同时底下还会watch本CRD管理的其它资源,这个managedObjects可以通过Controller初始化Builder的Owns方法传入,说到Watch我们关心两个逻辑:
1、注册的handler
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.MetaOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaOld.GetName(),
Namespace: evt.MetaOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}
if evt.MetaNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaNew.GetName(),
Namespace: evt.MetaNew.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}可以看到Kubebuilder为注册的Handler就是将发生变更的对象的NamespacedName入队列,如果在Reconcile逻辑需要判断创建/更新/删除,需要有自己的判断逻辑。
2、注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
...
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}我们的Handler实际注册到Informer上面,这样整个逻辑就串起来了,通过Cache我们创建了所有Scheme里面GVKs的Informers,然后对应GVK的Controller注册了Watch Handler到对应的Informer,这样一来对应的GVK里面的资源有变更都会触发Handler,将变更事件写到Controller的事件队列中,之后触发我们的Reconcile方法。
边栏推荐
- Fiance donated 500million dollars to female PI, so that she didn't need to apply for projects, recruited 150 scientists, and did scientific research at ease!
- Wechat can play the trumpet. Pinduoduo was found guilty of infringement. The shipment of byte VR equipment ranks second in the world. Today, more big news is here
- [coded font series] opendyslexic font
- Advertising attribution: how to measure the value of buying volume?
- Detect when a tab bar item is pressed
- [written to the person who first published the paper] common problems in writing comprehensive scientific and Technological Papers
- Implementation of JSTL custom function library
- Deeply cultivate the developer ecosystem, accelerate the innovation and development of AI industry, and Intel brings many partners together
- Win11玩绝地求生(PUBG)崩溃怎么办?Win11玩绝地求生崩溃解决方法
- 软件测试之网站测试如何进行?测试小攻略走起!
猜你喜欢

C#使用西门子S7 协议读写PLC DB块

MySQL forgot how to change the password

Intel David tuhy: the reason for the success of Intel aoten Technology

Camera calibration (I): robot hand eye calibration

EasyCVR集群版本添加RTSP设备提示服务器ID错误,该如何解决?

Video fusion cloud platform easycvr video Plaza left column list style optimization

Break the memory wall with CPU scheme? Learn from PayPal to expand the capacity of aoteng, and the volume of missed fraud transactions can be reduced to 1/30

mpf2_线性规划_CAPM_sharpe_Arbitrage Pricin_Inversion Gauss Jordan_Statsmodel_Pulp_pLU_Cholesky_QR_Jacobi

Ssm+jsp realizes the warehouse management system, and the interface is called an elegant interface

程序员上班摸鱼,这么玩才高端!
随机推荐
[team learning] [phase 34] Baidu PaddlePaddle AI talent Creation Camp
Win11图片打不开怎么办?Win11无法打开图片的修复方法
视频融合云平台EasyCVR视频广场左侧栏列表样式优化
The worse the AI performance, the higher the bonus? Doctor of New York University offered a reward for the task of making the big model perform poorly
NFT meta universe chain diversified ecosystem development case
NanopiNEO使用开发过程记录
Meaning of 'n:m' and '1:n' in database design
主设备号和次设备号均为0
Data security -- 12 -- Analysis of privacy protection
Intel and Xinbu technology jointly build a machine vision development kit to jointly promote the transformation of industrial intelligence
[on automation experience] the growth path of automated testing
每人每年最高500万经费!选人不选项目,专注基础科研,科学家主导腾讯出资的「新基石」启动申报
抖音或将推出独立种草社区平台:会不会成为第二个小红书
[written to the person who first published the paper] common problems in writing comprehensive scientific and Technological Papers
leetcode 53. Maximum subarray maximum subarray sum (medium)
軟件測試之網站測試如何進行?測試小攻略走起!
Why does WordPress open so slowly?
Win11 control panel shortcut key win11 multiple methods to open the control panel
[untitled]
Common methods of list and map