当前位置:网站首页>深入解析Kubebuilder
深入解析Kubebuilder
2022-07-06 22:13:00 【chenxy02】
参考网址:
Introduction - The Kubebuilder Book
深入解析 Kubebuilder:让编写 CRD 变得更简单 - 知乎
导读
推荐朋友们多看上面官方提供的文档。
前面我转发一篇博客 基于Kubebuilder开发Operator(入门使用)_chenxy02的博客-CSDN博客 记录了对kubebuilder的入门使用,本文旨在加深对kubebuilder深圳,进一步熟悉kubebuilder的项目代码。
核心概念
GVKs & GVRs
GVK = GroupVersionKind,GVR = GroupVersionResource
- 在编码过程中,资源数据的存储都是以结构体存储(称为Go type)
- 由于多版本version的存在(alpha1, beta1, v1等),不同版本中存储结构体的存在着差异,但是我们都会给其相同的Kind名字(比如Deployment)
- 因此,我们编码中只用kind名(如Deployment), 并不能准确获取到其使用哪个版本结构体
- 所以,采用GVK获取到一个具体的存储结构体,也就是GVK的三个信息(group/version/kind)确定一个Go type(结构体)
如何获取呢? —— 通过Scheme,Scheme存储了GVK和Go Type的映射关系
- 在创建资源过程中,我们编写yaml,提交请求:
- 编写yaml过程中,我们会写apiversion和kind,其实就是GVK
- 而客户端(也就是我们)与apiserver通信是http形式,就是将请求发送到某一http path
发送到哪个http path呢?——这个http path其实就是GVR
- /apis/batch/v1/nampspaces/default/job 这个就是表示default 命名空间的job资源
- 我们 kubectl get po 时 也是请求的路径,也可以称为GVR
- 其实 GVR 是由 GVK 转化而来 —— 通过REST映射的RESTMappers实现
Scheme
每一组Controllers都需要一个Scheme,提供了Kinds与对应的Go types的映射,也就是说给定Go type就知道他的GVK,给定GVK就知道他的Go type
Manager
Kubebuilder 的核心组件,具有 3 个职责:
负责运行所有的Controllers;
初始化共享cashes,包含listAndWath功能
初始化clients用于与Api Server通信
Cache
Kubebuilder的核心组件,负责在Controller进程里面根据Scheme同步Api Server中所有该Controller关心GVKs的GVRs,其核心是GVK->Informer的映射,Informer会负责监听对应GVK的GVRs的创建/删除/更新操作,以触发Controller的Reconcile逻辑。
Controller
Kubebuilder为我们生成的脚手架文件,我们只需要实现Reconcile方法即可。
Clients
在实现Controller的时候不可避免地需要对某些资源类型进行创建/删除/更新,就是通过该Clients实现的,其中查询功能实际查询是本地的Cache,写操作直接访问Api Server
Index
由于Controller经常要对Cache进行查询,Kubebuilder提供Index utility给Cache加索引提升查询效率。
Finalizer
在一般情况下,如果资源被删除之后,我们虽然能够触发删除事件,但是这个时候从Cache里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行。
K8s的Finalizer字段用于处理这种情况。在K8s中,只要对象ObjectMeta里面的Finalizers不为空,对该对象的delete操作就会转变为update操作,具体说就是update deletionTimestamp字段,其意义就是告诉K8s的GC“在deletionTimestamp 这个时刻之后,只要Finalizer为空,就立马删除掉该对象“。
所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。
OwnerReference
k8s GC在删除一个对象时,任何ownerReference是该对象的对象都会被清除,与此同时,kubebuilder支持所有对象的变更 都会触发Owner对象controller的Reconcile方法。
所有概念集合在一起如下图所示:
源码阅读
下面代码来源于 基于Kubebuilder开发Operator(入门使用)_chenxy02的博客-CSDN博客
从main.go开始
Kubebuilder 创建的 main.go 是整个项目的入口,逻辑十分简单:
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(webappv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
...
// 1、Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "ecaf1259.my.domain",
})
...
// 2、init Reconciler(Controller)
if err = (&controllers.GuestbookReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Guestbook")
os.Exit(1)
}
...
// 3、start Manager
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
可以看到在 init方法里面我们将webappv1注册到Scheme里面去了,这样一来Cache就知道watch谁了,main方法里面的逻辑基本都是Manager的:
- 初始化了一个Manager;
- 将Manager 的 Client 传给 Controller,并且调用 SetupWithManager 方法传入 Manager 进行 Controller 的初始化;
- 启动Manager
Manager初始化
Manager初始化代码如下:
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Set default values for options fields
options = setOptionsDefaults(options)
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.NewCache = options.NewCache
clusterOptions.ClientBuilder = options.ClientBuilder
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster
})
...
return &controllerManager{
cluster: cluster,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
}, nil
}
可以看到 主要是创建Cache与Clients等等等实例:
创建Cache
Cache 初始化代码如下:
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}
可以看到Cache主要就是创建了InformersMap, Scheme 里面的每个 GVK 都创建了对应的 Informer,通过 informersByGVK 这个 map 做 GVK 到 Informer 的映射,每个Informer会根据ListWatch函数对对应的GVK进行List和Watch。
创建 Clients
创建 Clients 很简单:
// defaultNewClient creates the default caching client
func defaultNewClient(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return &client.DelegatingClient{
Reader: &client.DelegatingReader{
CacheReader: cache,
ClientReader: c,
},
Writer: c,
StatusClient: c,
}, nil
}
读操作使用上面创建的 Cache,写操作使用 K8s go-client 直连。
Controller初始化
下面看看Controller的启动:
// SetupWithManager sets up the controller with the Manager.
func (r *GuestbookReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&webappv1.Guestbook{}).
Complete(r)
}
使用的中Builder模式,NewControllerManagerBy和For方法都给Builder传参,最重要的是最后一个方法Complete,其逻辑是:
func (blder *Builder) Build(r reconcile.Reconciler) (manager.Manager, error) {
...
// Set the Manager
if err := blder.doManager(); err != nil {
return nil, err
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
...
return blder.mgr, nil
}
主要是看看doController和doWatch方法:
doController方法
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
...
// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
}
// Create controller with dependencies set
return &controller.Controller{
Do: options.Reconciler,
MakeQueue: func() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
}, nil
}
该方法初始化了一个Controller,传入了一些很重要的参数:
- Do: Reconcile 逻辑;
- Cache:找Informer注册Watch
- Queue:Watch资源的CUD事件缓存
doWatch方法
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}
可以看到该方法对本Controller负责的CRD进行了watch,同时底下还会watch本CRD管理的其它资源,这个managedObjects可以通过Controller初始化Builder的Owns方法传入,说到Watch我们关心两个逻辑:
1、注册的handler
type EnqueueRequestForObject struct{}
// Create implements EventHandler
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
// Update implements EventHandler
func (e *EnqueueRequestForObject) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
if evt.MetaOld != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaOld.GetName(),
Namespace: evt.MetaOld.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no old metadata", "event", evt)
}
if evt.MetaNew != nil {
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.MetaNew.GetName(),
Namespace: evt.MetaNew.GetNamespace(),
}})
} else {
enqueueLog.Error(nil, "UpdateEvent received with no new metadata", "event", evt)
}
}
// Delete implements EventHandler
func (e *EnqueueRequestForObject) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
...
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Meta.GetName(),
Namespace: evt.Meta.GetNamespace(),
}})
}
可以看到Kubebuilder为注册的Handler就是将发生变更的对象的NamespacedName入队列,如果在Reconcile逻辑需要判断创建/更新/删除,需要有自己的判断逻辑。
2、注册的流程
// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
...
log.Info("Starting EventSource", "controller", c.Name, "source", src)
return src.Start(evthdler, c.Queue, prct...)
}
// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (is *Informer) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
...
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}
我们的Handler实际注册到Informer上面,这样整个逻辑就串起来了,通过Cache我们创建了所有Scheme里面GVKs的Informers,然后对应GVK的Controller注册了Watch Handler到对应的Informer,这样一来对应的GVK里面的资源有变更都会触发Handler,将变更事件写到Controller的事件队列中,之后触发我们的Reconcile方法。
边栏推荐
- Acl2022 | decomposed meta learning small sample named entity recognition
- ACL2022 | 分解的元学习小样本命名实体识别
- 微信能开小号了,拼多多“砍一刀”被判侵权,字节VR设备出货量全球第二,今日更多大新闻在此
- EasyCVR平台接入RTMP协议,接口调用提示获取录像错误该如何解决?
- [on automation experience] the growth path of automated testing
- 抖音或将推出独立种草社区平台:会不会成为第二个小红书
- Intel and Xinbu technology jointly build a machine vision development kit to jointly promote the transformation of industrial intelligence
- 关于01背包个人的一些理解
- 一图看懂!为什么学校教了你Coding但还是不会的原因...
- 这项15年前的「超前」技术设计,让CPU在AI推理中大放光彩
猜你喜欢
See Gardenia minor
DFS和BFS概念及实践+acwing 842 排列数字(dfs) +acwing 844. 走迷宫(bfs)
EasyCVR集群重启导致其他服务器设备通道状态离线情况的优化
Easycvr cannot be played using webrtc. How to solve it?
Five years of automated testing, and finally into the ByteDance, the annual salary of 30W is not out of reach
kivy教程之设置窗体大小和背景(教程含源码)
buildroot的根文件系统提示“depmod:applt not found”
Intel David tuhy: the reason for the success of Intel aoten Technology
各路行业大佬称赞的跨架构开发“神器”,你get同款了吗?
[team learning] [34 issues] scratch (Level 2)
随机推荐
namespace基础介绍
AI 落地新题型 RPA + AI =?
The worse the AI performance, the higher the bonus? Doctor of New York University offered a reward for the task of making the big model perform poorly
Break the memory wall with CPU scheme? Learn from PayPal to expand the capacity of aoteng, and the volume of missed fraud transactions can be reduced to 1/30
Structure actual training camp | after class homework | module 6
Case reward: Intel brings many partners to promote the innovation and development of multi domain AI industry
树与图的深度优先遍历模版原理
The request request is encapsulated in uni app, which is easy to understand
What if win11 pictures cannot be opened? Repair method of win11 unable to open pictures
主设备号和次设备号均为0
What about the collapse of win11 playing pubg? Solution to win11 Jedi survival crash
论文上岸攻略 | 如何快速入门学术论文写作
NanopiNEO使用开发过程记录
Surpassing postman, the new generation of domestic debugging tool apifox is elegant enough to use
See Gardenia minor
[system management] clear the icon cache of deleted programs in the taskbar
广告归因:买量如何做价值衡量?
食堂用户菜品关系系统(C语言课设)
Acl2022 | decomposed meta learning small sample named entity recognition
Camera calibration (I): robot hand eye calibration