当前位置:网站首页>In depth analysis of kubernetes controller runtime
In depth analysis of kubernetes controller runtime
2022-07-03 06:08:00 【xuhss_ com】
High quality resource sharing
Learning route guidance ( Click unlock ) | Knowledge orientation | Crowd positioning |
---|---|---|
🧡 Python Actual wechat ordering applet 🧡 | Progressive class | This course is python flask+ Perfect combination of wechat applet , From the deployment of Tencent to the launch of the project , Create a full stack ordering system . |
Python Quantitative trading practice | beginner | Take you hand in hand to create an easy to expand 、 More secure 、 More efficient quantitative trading system |
Overview
controller-runtime yes Kubernetes The community provides a way to quickly build a set of Realized controller Functional tools , You don't have to do it yourself Controller The function of ; stay Kubebuilder
And Operator SDK
Is also used controller-runtime
. This article will controller-runtime
The working principle of the system and its usage in different scenarios are briefly summarized and introduced .
controller-runtime structure
controller-runtime
The main components need to be created by users Manager
and Reconciler
as well as Controller Runtime
I started it myself Cache
and Controller
.
- Manager: Created by the user during initialization , Used to start
Controller Runtime
Components - Reconciler: It is a component that users need to provide to handle their own business logic ( That is through
code-generator
Generated api-like The implementation of the controller The business processing part of ). - Cache: A cache , To establish
Informer
ToApiServer
To listen for resources and push the monitored objects to queue in . - Controller: On the one hand, to Informer register
eventHandler
, On the other hand, get data from the queue .controller The data will be retrieved from the queue and the user-definedReconciler
function .
chart :controller-runtime structure
chart :controller-runtime flowchart
It can be seen from the picture that ,Controller Will send to Informer Register some columns eventHandler; then Cache start-up Informer(informer Belong to cache In bag ), And ApiServer Set up to monitor ; When Informer When a resource change is detected , Add object to queue,Controller Take the element out and execute on the client Reconciler.
Controller introduce
We from controller-rumtime Project example Take a look at , How the whole architecture is implemented .
You can see example The following actually implements a reconciler
The structure of the body , Realized Reconciler
Abstract and Client
Structure
type reconciler struct {
client.Client
scheme *runtime.Scheme
}
So let's see In the abstract Reconciler What is it? , You can see that it is abstract Reconcile
Method , This is the logical process of concrete processing
type Reconciler interface {
Reconcile(context.Context, Request) (Result, error)
}
Now let's see who has achieved this Reconciler abstract
type Controller interface {
reconcile.Reconciler // Specific steps for coordination , adopt ns/name\
// adopt predicates To evaluate source data , And add queue in ( Put in the queue is reconcile.Requests)
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// start-up controller, Similar to custom Run()
Start(ctx context.Context) error
GetLogger() logr.Logger
}
controller structure
stay controller-runtime\pkg\internal\controller\controller.go This is achieved in Controller
type Controller struct {
Name string // controller The logo of
MaxConcurrentReconciles int // Run concurrently Reconciler The number of , Default 1
// Realized reconcile.Reconciler The regulator of , Default DefaultReconcileFunc
Do reconcile.Reconciler
// makeQueue A corresponding queue will be built , Is to return to a speed limit queue
MakeQueue func() workqueue.RateLimitingInterface
// MakeQueue Created , In and out of the queue is the operation
Queue workqueue.RateLimitingInterface
// For injecting other content
// Have been abandoned
SetFields func(i interface{}) error
mu sync.Mutex
// Identify the status of the start
Started bool
// Context passed at startup , Used to stop the controller
ctx context.Context
// Time waiting for cache synchronization Default 2 minute
CacheSyncTimeout time.Duration
// Maintained eventHandler predicates, Start when the controller is started
startWatches []watchDescription
// Log builder , Output to log
LogConstructor func(request *reconcile.Request) logr.Logger
// RecoverPanic Is it right reconcile Caused by the panic recovery
RecoverPanic bool
}
It's over controller Of structure, So let's see controller How it is used
injection
Controller.Watch Realized the injection action , You can see watch()
By parameter The corresponding event function is passed into the internal
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
c.mu.Lock()
defer c.mu.Unlock()
// Use SetFields To complete the injection operation
if err := c.SetFields(src); err != nil {
return err
}
if err := c.SetFields(evthdler); err != nil {
return err
}
for _, pr := range prct {
if err := c.SetFields(pr); err != nil {
return err
}
}
// If Controller Not started yet , Cache these actions locally
if !c.Started {
c.startWatches = append(c.startWatches, watchDescription{src: src, handler: evthdler, predicates: prct})
return nil
}
c.LogConstructor(nil).Info("Starting EventSource", "source", src)
return src.Start(c.ctx, evthdler, c.Queue, prct...)
}
The start operation is actually informer Inject event functions
type Source interface {
// start yes Controller call , Used to Informer register EventHandler, take reconcile.Requests( A queued action ) Queue .
Start(context.Context, handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}
func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, queue workqueue.RateLimitingInterface,
prct ...predicate.Predicate) error {
// Informer should have been specified by the user.
if is.Informer == nil {
return fmt.Errorf("must specify Informer.Informer")
}
is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
return nil
}
We know that eventHandler, Actually it should be a onAdd
,onUpdate
This type of function ,queue It is workqueue, that Predicates
What is it? ?
The definition can be seen through tracing Predicate abstract , It can be seen that Predicate yes Watch What kind of event is there , When for each type of event , The corresponding function is true, stay eventHandler in , These are used as , Filtering of events .
// Predicate filters events before enqueuing the keys.
type Predicate interface {
// Create returns true if the Create event should be processed
Create(event.CreateEvent) bool
// Delete returns true if the Delete event should be processed
Delete(event.DeleteEvent) bool
// Update returns true if the Update event should be processed
Update(event.UpdateEvent) bool
// Generic returns true if the Generic event should be processed
Generic(event.GenericEvent) bool
}
In the corresponding action , You can see here as a filter operation
func (e EventHandler) OnAdd(obj interface{}) {
c := event.CreateEvent{}
// Pull Object out of the object
if o, ok := obj.(client.Object); ok {
c.Object = o
} else {
log.Error(nil, "OnAdd missing Object",
"object", obj, "type", fmt.Sprintf("%T", obj))
return
}
for _, p := range e.Predicates {
if !p.Create(c) {
return
}
}
// Invoke create handler
e.EventHandler.Create(c, e.Queue)
}
You can see it above , The corresponding is EventHandler.Create
To add , So what exactly are these actions doing ?
In the code pkg/handler , You can see these operations , Be similar to create, There will be ns/name Put in the queue .
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
}
unqueue
I can see , The action of joining the team is actually to ns/name
Join the queue , So what did you do when you left the queue ?
adopt controller.Start()
You can see controller What actions have been taken after startup
func (c *Controller) Start(ctx context.Context) error {
c.mu.Lock()
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
c.initMetrics()
// Set the internal context.
c.ctx = ctx
c.Queue = c.MakeQueue() // initialization queue
go func() { // Exit time , Give Way queue close
<-ctx.Done()
c.Queue.ShutDown()
}()
wg := &sync.WaitGroup{}
err := func() error {
defer c.mu.Unlock()
defer utilruntime.HandleCrash()
// start-up informer front , We're going to have evnetHandle predictates source register
for _, watch := range c.startWatches {
c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch.src))
// We have seen it above ,start Is the real registration action
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.LogConstructor(nil).Info("Starting Controller")
// startWatches We can see above , yes evnetHandle predictates source Is cached in ,
// Here is to take it out and start it
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := func() error {
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
return err
}
return nil
}(); err != nil {
return err
}
}
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil
// Launch workers to process resources
c.LogConstructor(nil).Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
wg.Add(c.MaxConcurrentReconciles)
// start-up controller Consumer thread
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go func() {
defer wg.Done()
for c.processNextWorkItem(ctx) {
}
}()
}
c.Started = true
return nil
}()
if err != nil {
return err
}
<-ctx.Done() // Blocking , Until the context closes
c.LogConstructor(nil).Info("Shutdown signal received, waiting for all workers to finish")
wg.Wait() // Wait for all threads to shut down
c.LogConstructor(nil).Info("All workers finished")
return nil
}
Through the above analysis , You can see , For each consumption worker Threads , It's actually calling theta processNextWorkItem Now let's see what he has done ?
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := c.Queue.Get() // Get data from the queue
if shutdown {
return false
}
defer c.Queue.Done(obj)
// The following should be prometheus Some things about indicators
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
// Get the object through reconcileHandler Handle
c.reconcileHandler(ctx, obj)
return true
}
So let's see reconcileHandler What did you do
func (c *Controller) reconcileHandler(ctx context.Context, obj interface{}) {
// Update metrics after processing each item
reconcileStartTS := time.Now()
defer func() {
c.updateMetrics(time.Since(reconcileStartTS))
}()
// Check whether the extracted data is reconcile.Request, Before enqueue You will know that the value of this type is inserted
req, ok := obj.(reconcile.Request)
if !ok {
// If you are wrong, forget
c.Queue.Forget(obj)
c.LogConstructor(nil).Error(nil, "Queue item was not a Request", "type", fmt.Sprintf("%T", obj), "value", obj)
return
}
log := c.LogConstructor(&req)
log = log.WithValues("reconcileID", uuid.NewUUID())
ctx = logf.IntoContext(ctx, log)
// Here we call our own implementation controller Realized Reconcile The action of
result, err := c.Reconcile(ctx, req)
switch {
case err != nil:
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
log.Error(err, "Reconciler error")
case result.RequeueAfter > 0:
c.Queue.Forget(obj)
c.Queue.AddAfter(req, result.RequeueAfter)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
case result.Requeue:
c.Queue.AddRateLimited(req)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
default:
c.Queue.Forget(obj)
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelSuccess).Inc()
}
}
Through to example Medium Reconcile Find its use , You can see , Calling him is what we said above reconcileHandler
, We'll know when we get here ,controller The running flow of is Controller.Start()
> Controller.processNextWorkItem
> Controller.reconcileHandler
> Controller.Reconcile
Finally, it reaches our customized business logic processing Reconcile
Manager
Learn from it controller-runtime
I learned that , There is one Manager
The components of , What does this component do ? Let's analyze .
Manager
Is used to create and start controller
Of ( Allow multiple controller
And One manager
relation ),Manager Will activate all assigned to him controller, And other bootable objects .
stay example notice , Will initialize a ctrl.NewManager
func main() {
ctrl.SetLogger(zap.New())
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// in a real controller, we'd create a new scheme for this
err = api.AddToScheme(mgr.GetScheme())
if err != nil {
setupLog.Error(err, "unable to add scheme")
os.Exit(1)
}
err = ctrl.NewControllerManagedBy(mgr).
For(&api.ChaosPod{}).
Owns(&corev1.Pod{}).
Complete(&reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
})
if err != nil {
setupLog.Error(err, "unable to create controller")
os.Exit(1)
}
err = ctrl.NewWebhookManagedBy(mgr).
For(&api.ChaosPod{}).
Complete()
if err != nil {
setupLog.Error(err, "unable to create webhook")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
This manager
Namely controller-runtime\pkg\manager\manager.go Under the Manager
, Manager By initializing Caches and Clients And so on , And provide them to Runnables.
type Manager interface {
// Provided with APIServer The way of interaction , Such as incluster,indexer,cache etc.
cluster.Cluster
// Runnable Is arbitrarily permissible cm Components in , Such as webhook,controller,Caches, stay new When called, ,
// You can see that the incoming is a controller, What can be started here is with Start() Methodical , By calling Start()
// To start components
Add(Runnable) error
// Implement the election method . When elected close , Then the election is leader
Elected() <-chan struct{}
// This is a method for a number of health checks and indicators , It doesn't have much to do with what we care about
AddMetricsExtraHandler(path string, handler http.Handler) error
AddHealthzCheck(name string, check healthz.Checker) error
AddReadyzCheck(name string, check healthz.Checker) error
// Start All registered controllers will be started , until ctx Cancel . If there is any controller Report errors , Then quit immediately
// If used LeaderElection, You must exit the binary file immediately after this return ,
Start(ctx context.Context) error
// GetWebhookServer returns a webhook.Server
GetWebhookServer() *webhook.Server
// GetLogger returns this manager's logger.
GetLogger() logr.Logger
// GetControllerOptions returns controller global configuration options.
GetControllerOptions() v1alpha1.ControllerConfigurationSpec
}
controller-manager
controllerManager This is achieved manager The abstraction of
type controllerManager struct {
sync.Mutex
started bool
stopProcedureEngaged *int64
errChan chan error
runnables *runnables
cluster cluster.Cluster
// recorderProvider Used to record eventhandler source predictate
recorderProvider *intrec.Provider
// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface
// Whether to close the election lease when exiting
leaderElectionReleaseOnCancel bool
// Some indicators , There is no need to pay attention to
metricsListener net.Listener
metricsExtraHandlers map[string]http.Handler
healthProbeListener net.Listener
readinessEndpointName string
livenessEndpointName string
readyzHandler *healthz.Handler
healthzHandler *healthz.Handler
// of controller Global parameter
controllerOptions v1alpha1.ControllerConfigurationSpec
logger logr.Logger
// Used to turn off LeaderElection.Run(...) The signal of
leaderElectionStopped chan struct{}
// Cancel the election , After losing the election , Must be delayed until gracefulShutdown after os.exit()
leaderElectionCancel context.CancelFunc
// leader Cancel the election
elected chan struct{}
port int
host string
certDir string
webhookServer *webhook.Server
webhookServerOnce sync.Once
// Not leader Node enforcement leader The waiting time of
leaseDuration time.Duration
// renewDeadline is the duration that the acting controlplane will retry
// refreshing leadership before giving up.
renewDeadline time.Duration
// LeaderElector Time to re operate
retryPeriod time.Duration
// gracefulShutdownTimeout Is in manager Let... Before stopping runnables The duration of the stop .
gracefulShutdownTimeout time.Duration
// onStoppedLeading is callled when the leader election lease is lost.
// It can be overridden for tests.
onStoppedLeading func()
shutdownCtx context.Context
internalCtx context.Context
internalCancel context.CancelFunc
internalProceduresStop chan struct{}
}
workflow
To understand the ControllerManager after , We go through example Let's see ControllerManager Of workflow
func main() {
ctrl.SetLogger(zap.New())
// New One manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
// in a real controller, we'd create a new scheme for this
err = api.AddToScheme(mgr.GetScheme())
if err != nil {
setupLog.Error(err, "unable to add scheme")
os.Exit(1)
}
err = ctrl.NewControllerManagedBy(mgr).
For(&api.ChaosPod{}).
Owns(&corev1.Pod{}).
Complete(&reconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
})
if err != nil {
setupLog.Error(err, "unable to create controller")
os.Exit(1)
}
err = ctrl.NewWebhookManagedBy(mgr).
For(&api.ChaosPod{}).
Complete()
if err != nil {
setupLog.Error(err, "unable to create webhook")
os.Exit(1)
}
setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
- adopt
manager.New()
Initialize a manager, Some columns will be initialized manager Parameters of - adopt
ctrl.NewControllerManagedBy
register controller To manager inctrl.NewControllerManagedBy
yes builder An alias for , Build a builder Type of controllerbuilder
Mediumctrl
Namely controller
- start-up manager
builder
Let's take a look builder What was done at build time
// Builder builds a Controller.
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
}
We see example It's called in For()
action , So this For()
What is it? ?
By annotating , We can see For() Provides Mediation object type ,ControllerManagedBy adopt reconciling object To correspond to create/delete/update
event . call For()
Equivalent to calling Watches(&source.Kind{Type: apiType}, &handler.EnqueueRequestForObject{})
.
func (blder *Builder) For(object client.Object, opts ...ForOption) *Builder {
if blder.forInput.object != nil {
blder.forInput.err = fmt.Errorf("For(...) should only be called once, could not assign multiple objects for reconciliation")
return blder
}
input := ForInput{object: object}
for _, opt := range opts {
opt.ApplyToFor(&input) // Finally, each of the objects we want to listen to opts Sign up
}
blder.forInput = input
return blder
}
The next step is to call Owns() ,Owns()
Looks like For()
The function is similar . Just say belong to different , It's through Owns Method set
func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
input := OwnsInput{object: object}
for _, opt := range opts {
opt.ApplyToOwns(&input)
}
blder.ownsInput = append(blder.ownsInput, input)
return blder
}
Finally arrived. Complete(),Complete
Is to finish this controller The construction of
// Complete builds the Application Controller.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
_, err := blder.Build(r)
return err
}
// Build Create controller and return
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
if r == nil {
return nil, fmt.Errorf("must provide a non-nil Reconciler")
}
if blder.mgr == nil {
return nil, fmt.Errorf("must provide a non-nil Manager")
}
if blder.forInput.err != nil {
return nil, blder.forInput.err
}
// Checking the reconcile type exist or not
if blder.forInput.object == nil {
return nil, fmt.Errorf("must provide an object for reconciliation")
}
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
return blder.ctrl, nil
}
You can see , Will complete doController and doWatch
doController Will initialize this controller And back to
func (blder *Builder) doController(r reconcile.Reconciler) error {
globalOpts := blder.mgr.GetControllerOptions()
ctrlOptions := blder.ctrlOptions
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
}
// By retrieving GVK Get the default name
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
if err != nil {
return err
}
// Set up concurrency , If the maximum concurrency is 0 Find one
// Tracking seems to be when there is no setting , For example, according to app group Medium ReplicaSet Set up
// Is in the For() The number of types passed to determine the number of concurrency
if ctrlOptions.MaxConcurrentReconciles == 0 {
groupKind := gvk.GroupKind().String()
if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
ctrlOptions.MaxConcurrentReconciles = concurrency
}
}
// Setup cache sync timeout.
if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
}
// to controller One name, If the delivery is not initialized , Then use Kind Make a name
controllerName := blder.getControllerName(gvk)
// Setup the logger.
if ctrlOptions.LogConstructor == nil {
log := blder.mgr.GetLogger().WithValues(
"controller", controllerName,
"controllerGroup", gvk.Group,
"controllerKind", gvk.Kind,
)
lowerCamelCaseKind := strings.ToLower(gvk.Kind[:1]) + gvk.Kind[1:]
ctrlOptions.LogConstructor = func(req *reconcile.Request) logr.Logger {
log := log
if req != nil {
log = log.WithValues(
lowerCamelCaseKind, klog.KRef(req.Namespace, req.Name),
"namespace", req.Namespace, "name", req.Name,
)
}
return log
}
}
// Here is how to build a new controller , That's what I said manager.New()
blder.ctrl, err = newController(controllerName, blder.mgr, ctrlOptions)
return err
}
start Manager
Next is manager Start of , Which is the corresponding start()
And doWatch()
We can see from the following code , about doWatch()
Is to put compete()
The event functions of some previous resources are injected into controller in
func (blder *Builder) doWatch() error {
// Mediation type , This is also for For Of obj Come on , What structure do we need , Such as unstructured data or metadata-only
// metadata-only It is configured as a GVK schema.GroupVersionKind
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}&source.Kind{}
// Some preparatory work , Encapsulate an object as &source.Kind{}
//
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{} // That is to say, it contains obj An event queue for
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
// Here comes what I said before controller watch 了
// Inject a series of preparatory actions into cache Such as source eventHandler predicate
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Repeat again ownsInput action
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// In the face of ownsInput Repeat the operation
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}
Because the first two builder
Your actions will mgr The pointer is passed to builder in , And operated complete()
, That is to say, the operation build()
, This represents the right to controller
Finished initializing , And event injection (watch
) The operation of , therefore Start(), Will be controller start-up
func (cm *controllerManager) Start(ctx context.Context) (err error) {
cm.Lock()
if cm.started {
cm.Unlock()
return errors.New("manager already started")
}
var ready bool
defer func() {
if !ready {
cm.Unlock()
}
}()
// Initialize the internal context.
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
// This channel On behalf of controller The stop of
stopComplete := make(chan struct{})
defer close(stopComplete)
// This must be deferred after closing stopComplete, otherwise we deadlock.
defer func() {
stopErr := cm.engageStopProcedure(stopComplete)
if stopErr != nil {
if err != nil {
err = kerrors.NewAggregate([]error{err, stopErr})
} else {
err = stopErr
}
}
}()
// Add the cluster runnable.
if err := cm.add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
// Indicators
if cm.metricsListener != nil {
cm.serveMetrics()
}
if cm.healthProbeListener != nil {
cm.serveHealthProbes()
}
if err := cm.runnables.Webhooks.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// wait for informer Synchronization complete
if err := cm.runnables.Caches.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Non electoral mode ,runnable Will be in cache Start after synchronization
if err := cm.runnables.Others.Start(cm.internalCtx); err != nil {
if !errors.Is(err, wait.ErrWaitTimeout) {
return err
}
}
// Start the leader election and all required runnables.
{
ctx, cancel := context.WithCancel(context.Background())
cm.leaderElectionCancel = cancel
go func() {
if cm.resourceLock != nil {
if err := cm.startLeaderElection(ctx); err != nil {
cm.errChan <- err
}
} else {
// Treat not having leader election enabled the same as being elected.
if err := cm.startLeaderElectionRunnables(); err != nil {
cm.errChan <- err
}
close(cm.elected)
}
}()
}
ready = true
cm.Unlock()
select {
case <-ctx.Done():
// We are done
return nil
case err := <-cm.errChan:
// Error starting or running a runnable
return err
}
}
You can see that it starts up 4 Types of runnable, It's actually about this runnable To start the , for example controller,cache etc. .
Take a look back. , We were using code-generator
Generate , And customize controller when , We also start by informer.Start()
, Otherwise, an error will be reported .
Finally, it can be represented by a diagram ,client-go And controller-manager The relationship between
Reference
边栏推荐
- [teacher Zhao Yuqiang] use the catalog database of Oracle
- Deep learning, thinking from one dimensional input to multi-dimensional feature input
- Merge and migrate data from small data volume, sub database and sub table Mysql to tidb
- Complete set of C language file operation functions (super detailed)
- Why should there be a firewall? This time xiaowai has something to say!!!
- Beandefinitionregistrypostprocessor
- [teacher Zhao Yuqiang] Cassandra foundation of NoSQL database
- Leetcode solution - 01 Two Sum
- 智牛股--03
- Kubernetes notes (VII) kuberetes scheduling
猜你喜欢
Synthetic keyword and NBAC mechanism
[teacher Zhao Yuqiang] MySQL flashback
[teacher Zhao Yuqiang] use the catalog database of Oracle
项目总结--04
Oauth2.0 - Introduction and use and explanation of authorization code mode
Project summary --01 (addition, deletion, modification and query of interfaces; use of multithreading)
Kubernetes notes (VII) kuberetes scheduling
pytorch DataLoader实现miniBatch(未完成)
深入解析kubernetes controller-runtime
[teacher Zhao Yuqiang] Cassandra foundation of NoSQL database
随机推荐
Use telnet to check whether the port corresponding to the IP is open
为什么网站打开速度慢?
Virtual memory technology sharing
[video of Teacher Zhao Yuqiang's speech on wot] redis high performance cache and persistence
Solve the problem of automatic disconnection of SecureCRT timeout connection
智牛股项目--05
Yum is too slow to bear? That's because you didn't do it
理解 期望(均值/估计值)和方差
Maximum likelihood estimation, divergence, cross entropy
Exportation et importation de tables de bibliothèque avec binaires MySQL
PMP笔记记录
pytorch 多分类中的损失函数
Txt document download save as solution
[teacher Zhao Yuqiang] index in mongodb (Part 1)
pytorch DataLoader实现miniBatch(未完成)
Project summary --01 (addition, deletion, modification and query of interfaces; use of multithreading)
从小数据量 MySQL 迁移数据到 TiDB
Cesium 点击获三维坐标(经纬度高程)
Skywalking8.7 source code analysis (II): Custom agent, service loading, witness component version identification, transform workflow
SVN分支管理