Locust performance test 4 (custom load policy)
2022-07-07 09:08:00 【Song_ Lun】
Preface
Sometimes a load test has to be fully customized in a way that cannot be achieved by simply setting or changing the user count and spawn rate. For example, you may want to generate a load spike, or ramp the load up and down at custom times. With the LoadTestShape class, you can fully control the user count and spawn rate at any point in the test.
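As a rough illustration of that contract, the sketch below mimics what a shape's `tick()` method returns, without importing Locust: a `(user_count, spawn_rate)` tuple while the test runs, and `None` to stop it. The function name `spike_tick` and all of the numbers (a baseline of 10 users, a spike to 100 users between 60 s and 90 s, a 180 s limit) are assumptions chosen for the example, not values from this article.

```python
from typing import Optional, Tuple

# Hypothetical numbers, chosen only for illustration.
BASE_USERS = 10
SPIKE_USERS = 100
SPIKE_START = 60   # seconds
SPIKE_END = 90     # seconds
TIME_LIMIT = 180   # seconds
SPAWN_RATE = 10    # users started or stopped per second


def spike_tick(run_time: float) -> Optional[Tuple[int, int]]:
    """Mimic LoadTestShape.tick(): return (user_count, spawn_rate) or None."""
    if run_time >= TIME_LIMIT:
        return None  # returning None tells Locust to stop the test
    if SPIKE_START <= run_time < SPIKE_END:
        return SPIKE_USERS, SPAWN_RATE
    return BASE_USERS, SPAWN_RATE


if __name__ == "__main__":
    for second in (0, 59, 60, 89, 90, 179, 180):
        print(second, spike_tick(second))
```

In a real locustfile the same body would live in the `tick()` method of a `LoadTestShape` subclass, with `run_time` taken from `self.get_run_time()`.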
Time based peak strategy
Suppose the requirement is this: 10 users log in through a login endpoint at the same time, and the load lasts for 180 seconds. How should that be set up? This is a job for the LoadTestShape class.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/26 10:11 AM
# @Name : peilun
# @File : Testshape_test.py
"""Custom load policy: hold 10 users, spawned at 10 users per second, for 180 s."""
import json

from locust import HttpUser, LoadTestShape, constant, task


class MyUser(HttpUser):
    # Wait 1 second between tasks
    wait_time = constant(1)
    host = 'https://www.baidu.com'
    url = '/api/user/login'
    headers = {'Content-Type': 'application/json'}

    # The login request is the task under test. It is named login rather than
    # on_start, because Locust also calls on_start once per user as a setup hook.
    @task
    def login(self):
        print("Log in and get token")
        data = {"username": "123456", "password": "123456"}
        # catch_response=True is required before failure() can be called
        with self.client.post(self.url, headers=self.headers, data=json.dumps(data),
                              name='User login', verify=False, timeout=10,
                              catch_response=True) as respon:
            resp_dict = respon.json()
            if respon.status_code == 200:
                print(f'The response data is: {resp_dict}')
                # Keep the token on the user instead of in a global
                self.authorization = respon.headers.get('Authorization')
            else:
                respon.failure(str(resp_dict.get('msg', respon.status_code)))


class MyCustomShape(LoadTestShape):
    # Duration of the load test, in seconds
    time_limit = 180
    # Target number of concurrent users
    target_users = 10
    # Users started or stopped per second
    spawn_rate = 10

    def tick(self):
        """Return a (user_count, spawn_rate) tuple, or None to stop the load test."""
        # Elapsed time of the load test, in seconds
        run_time = self.get_run_time()
        # Keep running while within the time limit
        if run_time < self.time_limit:
            return self.target_users, self.spawn_rate
        return None


if __name__ == '__main__':
    import os
    os.system("locust -f Testshape_test.py")
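The shape logic can also be checked without starting Locust. The sketch below is a Locust-free stand-in, offered as one possible approach: `FakeClockShape` is a hypothetical class that only imitates `get_run_time()`, `ConstantShape` restates the requirement above (10 users, spawn rate 10, 180 seconds), and `preview` is a hypothetical helper. None of these names come from Locust.

```python
class FakeClockShape:
    """Hypothetical stand-in for locust.LoadTestShape with a settable clock."""

    def __init__(self):
        self.now = 0.0  # pretend elapsed seconds since the test started

    def get_run_time(self):
        return self.now


class ConstantShape(FakeClockShape):
    target_users = 10  # users held for the whole run
    spawn_rate = 10    # users started or stopped per second
    time_limit = 180   # seconds

    def tick(self):
        if self.get_run_time() < self.time_limit:
            return self.target_users, self.spawn_rate
        return None  # stop the test


def preview(shape, times):
    """Return the tick() result at each pretend elapsed time."""
    results = []
    for t in times:
        shape.now = t
        results.append((t, shape.tick()))
    return results


if __name__ == "__main__":
    for t, result in preview(ConstantShape(), [0, 90, 179, 180]):
        print(f"{t:>4}s -> {result}")
```

Swapping `FakeClockShape` for `LoadTestShape` in the class definition should be enough to reuse the same `tick()` body in a locustfile.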
Step based load policy
Add 5 users every 30 seconds, for 10 minutes in total.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/26 10:11 AM
# @Name : peilun
# @File : Testshape_test2.py
"""Custom load policy: add 5 users every 30 seconds for 10 minutes."""
import json
import math

from locust import HttpUser, LoadTestShape, constant, task


class MyUser(HttpUser):
    # Wait 1 second between tasks
    wait_time = constant(1)
    host = 'https://www.baidu.com'
    url = '/api/user/login'
    headers = {'Content-Type': 'application/json'}

    # The login request is the task under test. It is named login rather than
    # on_start, because Locust also calls on_start once per user as a setup hook.
    @task
    def login(self):
        print("Log in and get token")
        data = {"username": "123456", "password": "123456"}
        # catch_response=True is required before failure() can be called
        with self.client.post(self.url, headers=self.headers, data=json.dumps(data),
                              name='User login', verify=False, timeout=10,
                              catch_response=True) as respon:
            resp_dict = respon.json()
            if respon.status_code == 200:
                print(f'The response data is: {resp_dict}')
                # Keep the token on the user instead of in a global
                self.authorization = respon.headers.get('Authorization')
            else:
                respon.failure(str(resp_dict.get('msg', respon.status_code)))


class MyCustomShape(LoadTestShape):
    """
    step_time -- seconds between steps
    step_load -- users added at each step
    spawn_rate -- users started or stopped per second within a step
    time_limit -- total duration, in seconds
    """
    step_time = 30
    step_load = 5
    spawn_rate = 10
    time_limit = 600

    def tick(self):
        # Elapsed time of the load test, in seconds
        run_time = self.get_run_time()
        # Stop once the run time exceeds the time limit
        if run_time > self.time_limit:
            return None
        current_step = math.floor(run_time / self.step_time) + 1
        return current_step * self.step_load, self.spawn_rate


if __name__ == '__main__':
    import os
    os.system("locust -f Testshape_test2.py")
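To make the step arithmetic concrete, the following sketch evaluates the same formula, `(floor(run_time / step_time) + 1) * step_load`, at a few moments. It is plain Python with no Locust import, and the function name `step_users` is an assumption made for this example.

```python
import math

STEP_TIME = 30    # seconds between steps
STEP_LOAD = 5     # users added at each step
TIME_LIMIT = 600  # seconds


def step_users(run_time):
    """Return the target user count at run_time, or None once past the limit."""
    if run_time > TIME_LIMIT:
        return None
    current_step = math.floor(run_time / STEP_TIME) + 1
    return current_step * STEP_LOAD


if __name__ == "__main__":
    for t in (0, 29, 30, 299, 599, 601):
        print(t, step_users(t))
```

The first step starts immediately with 5 users, and the last full step (570 s to 600 s) targets 100 users.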
Load policy based on time period
The user count is 10 for the first 10 s, 50 from 10 s to 20 s, 100 from 20 s to 50 s, and 30 from 50 s to 80 s; after 80 s the test stops.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2021/11/26 10:11 AM
# @Name : peilun
# @File : Testshape_test3.py
"""Custom load policy: 10 users for 0-10 s, 50 for 10-20 s, 100 for 20-50 s, 30 for 50-80 s, then stop."""
import json

from locust import HttpUser, LoadTestShape, constant, task


class MyUser(HttpUser):
    # Wait 1 second between tasks
    wait_time = constant(1)
    host = 'https://www.baidu.com'
    url = '/api/user/login'
    headers = {'Content-Type': 'application/json'}

    # The login request is the task under test. It is named login rather than
    # on_start, because Locust also calls on_start once per user as a setup hook.
    @task
    def login(self):
        print("Log in and get token")
        data = {"username": "123456", "password": "123456"}
        # catch_response=True is required before failure() can be called
        with self.client.post(self.url, headers=self.headers, data=json.dumps(data),
                              name='User login', verify=False, timeout=10,
                              catch_response=True) as respon:
            resp_dict = respon.json()
            if respon.status_code == 200:
                print(f'The response data is: {resp_dict}')
                # Keep the token on the user instead of in a global
                self.authorization = respon.headers.get('Authorization')
            else:
                respon.failure(str(resp_dict.get('msg', respon.status_code)))


class MyCustomShape(LoadTestShape):
    """
    duration -- elapsed seconds at which this stage ends and the next one begins
    users -- target number of users during the stage
    spawn_rate -- users started or stopped per second
    """
    stages = [
        {"duration": 10, "users": 10, "spawn_rate": 10},
        {"duration": 20, "users": 50, "spawn_rate": 10},
        {"duration": 50, "users": 100, "spawn_rate": 10},
        {"duration": 80, "users": 30, "spawn_rate": 10},
    ]

    def tick(self):
        run_time = self.get_run_time()
        # Use the first stage whose end time has not been reached yet
        for stage in self.stages:
            if run_time < stage["duration"]:
                return stage["users"], stage["spawn_rate"]
        # Past the last stage: stop the load test
        return None


if __name__ == '__main__':
    import os
    os.system("locust -f Testshape_test3.py")
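Because `tick()` compares the total run time with each `duration`, every `duration` acts as the moment a stage ends, counted from the start of the test, and not as the length of that stage. If it is easier to think in stage lengths, they can be converted to end times first. The sketch below does that in plain Python; `to_end_times` and `users_at` are hypothetical helper names, not part of Locust.

```python
from itertools import accumulate

# (stage length in seconds, target users) pairs.
# These reproduce the end times 10, 20, 50 and 80 used in the stages list.
STAGE_LENGTHS = [(10, 10), (10, 50), (30, 100), (30, 30)]
SPAWN_RATE = 10


def to_end_times(stage_lengths, spawn_rate):
    """Turn (length, users) pairs into stage dicts keyed by cumulative end time."""
    lengths = [length for length, _ in stage_lengths]
    return [
        {"duration": end, "users": users, "spawn_rate": spawn_rate}
        for end, (_, users) in zip(accumulate(lengths), stage_lengths)
    ]


def users_at(stages, run_time):
    """Same lookup as tick(): first stage whose end time is still ahead."""
    for stage in stages:
        if run_time < stage["duration"]:
            return stage["users"], stage["spawn_rate"]
    return None  # past the last stage: the test stops


if __name__ == "__main__":
    stages = to_end_times(STAGE_LENGTHS, SPAWN_RATE)
    for t in (0, 10, 20, 50, 80):
        print(t, users_at(stages, t))
```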