当前位置:网站首页>Keda 2.7.1 brief analysis of scaledjob code
KEDA 2.7.1: a brief analysis of the ScaledJob code
2022-06-26 02:05:00 【weixin_ forty million four hundred and fifty-five thousand one 】
The official documentation describes the purpose of ScaledJob as follows: "As an alternate to scaling event-driven code as deployments you can also run and scale your code as Kubernetes Jobs."
In essence, it controls the number of Jobs based on various metrics. The core code lives in the following Go files:
- scale_jobs.go
- scaledjob_controller.go
scaledjob_controller is similar to scaledobject_controller; it goes through the requestScaleLoop function:
// requestScaleLoop hands the given ScaledJob over to the reconciler's scale
// handler and returns whatever error HandleScalableObject reports.
//
// A verbosity-1 log line is written before the hand-off.
func (r *ScaledJobReconciler) requestScaleLoop(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob) error {
	logger.V(1).Info("Starting a new ScaleLoop")

	err := r.scaleHandler.HandleScalableObject(ctx, scaledJob)
	return err
}
This calls the HandleScalableObject function in scale_handler.go. HandleScalableObject handles both ScaledObject and ScaledJob:
// Dispatch on the concrete type of the scalable object. Both supported kinds
// launch the same two goroutines (push scalers and the scale loop); each
// goroutine receives its own deep copy of the object and the shared
// scalingMutex. Any other type falls through without starting anything.
switch obj := scalableObject.(type) {
case *kedav1alpha1.ScaledObject:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
case *kedav1alpha1.ScaledJob:
go h.startPushScalers(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
go h.startScaleLoop(ctx, withTriggers, obj.DeepCopy(), scalingMutex)
}
// The goroutines are started asynchronously; nil is returned without waiting for them.
return nil
The core of scale_jobs.go is RequestJobScale and createJobs, which ultimately create Jobs / adjust the number of Jobs through batchv1 ("k8s.io/api/batch/v1").
// RequestJobScale performs one scaling pass for scaledJob.
//
// It logs the running and pending Job counts, asks the scaling strategy for
// the effective maximum scale (clamped to zero if negative) and then:
//   - when isActive is true, records the current time as LastActiveTime,
//     persists it via updateLastActiveTime (a failure is only logged) and
//     calls createJobs with scaleTo and the effective maximum;
//   - when isActive is false, only logs that activity did not change.
//
// Afterwards the Active condition is updated if it is unknown or disagrees
// with isActive. If that update fails the error is logged and the function
// returns immediately, without running cleanUp. Otherwise cleanUp is invoked
// and any error from it is logged.
func (e *scaleExecutor) RequestJobScale(ctx context.Context, scaledJob *kedav1alpha1.ScaledJob, isActive bool, scaleTo int64, maxScale int64) {
	logger := e.logger.WithValues("scaledJob.Name", scaledJob.Name, "scaledJob.Namespace", scaledJob.Namespace)

	running := e.getRunningJobCount(ctx, scaledJob)
	pending := e.getPendingJobCount(ctx, scaledJob)
	logger.Info("Scaling Jobs", "Number of running Jobs", running)
	logger.Info("Scaling Jobs", "Number of pending Jobs ", pending)

	strategy := NewScalingStrategy(logger, scaledJob)
	effectiveMaxScale := strategy.GetEffectiveMaxScale(maxScale, running, pending, scaledJob.MaxReplicaCount())
	if effectiveMaxScale < 0 {
		effectiveMaxScale = 0
	}

	if !isActive {
		logger.V(1).Info("No change in activity")
	} else {
		logger.V(1).Info("At least one scaler is active")

		lastActive := metav1.Now()
		scaledJob.Status.LastActiveTime = &lastActive
		if err := e.updateLastActiveTime(ctx, logger, scaledJob); err != nil {
			// Best effort: job creation proceeds even if the timestamp could not be stored.
			logger.Error(err, "Failed to update last active time")
		}

		e.createJobs(ctx, logger, scaledJob, scaleTo, effectiveMaxScale)
	}

	// Only touch the Active condition when it is unset or out of date.
	active := scaledJob.Status.Conditions.GetActiveCondition()
	if active.IsUnknown() || active.IsTrue() != isActive {
		var (
			err     error
			failure string
		)
		if isActive {
			err = e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionTrue, "ScalerActive", "Scaling is performed because triggers are active")
			failure = "Error setting active condition when triggers are active"
		} else {
			err = e.setActiveCondition(ctx, logger, scaledJob, metav1.ConditionFalse, "ScalerNotActive", "Scaling is not performed because triggers are not active")
			failure = "Error setting active condition when triggers are not active"
		}
		if err != nil {
			logger.Error(err, failure)
			return
		}
	}

	if err := e.cleanUp(ctx, scaledJob); err != nil {
		logger.Error(err, "Failed to cleanUp jobs")
	}
}
// createJobs creates new Jobs for scaledJob from its JobTargetRef template.
//
// The number of Jobs requested is scaleTo, capped at maxScale. When the capped
// value is zero or negative nothing is created and no event is recorded.
//
// Side effects on the passed-in scaledJob: the template's GenerateName is set
// to "<name>-" and the template labels gain "scaledjob.keda.sh/name".
//
// Every created Job carries a set of default labels; labels defined on the
// ScaledJob itself are copied afterwards and therefore win on key conflicts.
// Individual creation failures are logged and skipped. The final log line and
// the KEDAJobsCreated event report the number of Jobs that were actually
// created, and the event is only emitted when at least one Job was created.
func (e *scaleExecutor) createJobs(ctx context.Context, logger logr.Logger, scaledJob *kedav1alpha1.ScaledJob, scaleTo int64, maxScale int64) {
	scaledJob.Spec.JobTargetRef.Template.GenerateName = scaledJob.GetName() + "-"
	if scaledJob.Spec.JobTargetRef.Template.Labels == nil {
		scaledJob.Spec.JobTargetRef.Template.Labels = map[string]string{}
	}
	scaledJob.Spec.JobTargetRef.Template.Labels["scaledjob.keda.sh/name"] = scaledJob.GetName()

	logger.Info("Creating jobs", "Effective number of max jobs", maxScale)
	if scaleTo > maxScale {
		scaleTo = maxScale
	}
	logger.Info("Creating jobs", "Number of jobs", scaleTo)

	if scaleTo <= 0 {
		// Nothing to create: avoid a misleading "Created 0 jobs" event on
		// every pass while the ScaledJob is already at its effective maximum.
		return
	}

	labels := map[string]string{
		"app.kubernetes.io/name":       scaledJob.GetName(),
		"app.kubernetes.io/version":    version.Version,
		"app.kubernetes.io/part-of":    scaledJob.GetName(),
		"app.kubernetes.io/managed-by": "keda-operator",
		"scaledjob.keda.sh/name":       scaledJob.GetName(),
	}
	// Labels set on the ScaledJob override the defaults above.
	for key, value := range scaledJob.ObjectMeta.Labels {
		labels[key] = value
	}

	var created int64
	for i := int64(0); i < scaleTo; i++ {
		job := &batchv1.Job{
			ObjectMeta: metav1.ObjectMeta{
				GenerateName: scaledJob.GetName() + "-",
				Namespace:    scaledJob.GetNamespace(),
				Labels:       labels,
			},
			Spec: *scaledJob.Spec.JobTargetRef.DeepCopy(),
		}

		// Job doesn't allow RestartPolicyAlways, it seems like this value is set by the client as a default one,
		// we should set this property to allowed value in that case
		if job.Spec.Template.Spec.RestartPolicy == "" {
			logger.V(1).Info("Job RestartPolicy is not set, setting it to 'OnFailure', to avoid setting it to the client's default value 'Always'")
			job.Spec.Template.Spec.RestartPolicy = corev1.RestartPolicyOnFailure
		}

		// Set ScaledJob instance as the owner and controller. A failure here is
		// logged only; the Job is still submitted (best effort, as before).
		if err := controllerutil.SetControllerReference(scaledJob, job, e.reconcilerScheme); err != nil {
			logger.Error(err, "Failed to set ScaledJob as the owner of the new Job")
		}

		if err := e.client.Create(ctx, job); err != nil {
			logger.Error(err, "Failed to create a new Job")
			continue
		}
		created++
	}

	logger.Info("Created jobs", "Number of jobs", created)
	if created > 0 {
		e.recorder.Eventf(scaledJob, corev1.EventTypeNormal, eventreason.KEDAJobsCreated, "Created %d jobs", created)
	}
}
Other useful URLs:
https://livewyer.io/blog/2021/06/17/keda-showcase-autoscaling-based-on-prometheus-redis/
https://blog.csdn.net/github_19391267/article/details/109634935
边栏推荐
- Tengwenze, a hot-blooded boy, was invited to serve as the image ambassador of the global finals of the sixth season perfect children's model
- UN make (6) conditional execution of makefile
- One minute to understand the difference between synchronous, asynchronous, blocking and non blocking
- readv & writev
- Redis7.0 installation steps
- PTA class a simulated ninth bullet: 1114-1117
- 接口测试用例设计
- It's better to finish one than start thousands of times (reprinted from Douban)
- Abnova actn4 DNA probe solution
- How to set achievable medium - and long-term goals?
猜你喜欢

通俗易懂C語言關鍵字static

Differences and functions of TOS cos DSCP

Sweet cool girl jinshuyi was invited to be the spokesperson for the global finals of the sixth season perfect children's model

One stop solution EMQ for hundreds of millions of communication of Internet of things

Show spirit chenzitong was invited to be the chief experience officer of the global finals of the sixth season perfect children's model

readv & writev

Exploring temporary information for dynamic network embedding

Graphics rendering pipeline

-- EGFR FISH probe solution

A lost note for konjaku beginner
随机推荐
wifi 的理论速度计算方法
Pre ++, post ++ and pre -- and post -- (+a, a++ and --a, a--)
shell curl 执行脚本,带传参数,自定义参数
Input 3 integers and output them from large to small
Convert Weishi camera pictures
Create OpenGL window
Differences and functions of export set env in makefile
shell学习记录(三)
前置++,后置++与前置--与后置--(++a,a++与--a,a--)
Meaning of each state in TCP network communication
Redis7.0 installation steps
Dataframe extracts data from a column and converts it into a list
Distributed systems (II) understanding of distributed transactions
Abnova actn4 DNA probe solution
Calibration...
The answer skills and examples of practical cases of the second construction company are full of essence
关于strlen与sizeof的区别
NDK20b FFmpeg4.2.2 编译和集成
Three factors affecting personal growth
树莓派 + AWS IoT Greengrass