locust实现压力测试_将locust作为第三方库
在将locust作为第三方库使用前,先对其中涉及的对象进行介绍。
在了解关系下,便可更好的使用
一、locust架构和类关系
介绍locust中涉及的对象,以及他们之间的关系
参考链接 https://testerhome.com/topics/24300
locust官方参考文档 https://docs.locust.io/en/stable/
1.1. locust架构
核心架构.png- locust架构上使用master-slave模型,支持单机和分布式
- master和slave(即worker)使用 ZeroMQ 协议通讯
- 提供web页面管理master,从而控制slave,同时展示压测过程和汇总结果
- 可选no-web模式(headless 一般用于调试)
- 基于Python本身已经支持跨平台
1.2. 主要类关系
先来一个关系图,看到locust主要类之间的关系
关系图.png
简单来说,Locust的代码分为以下模块:
- User-压测用例:提供了HttpUser压测http协议,用户可以定义事务,断言等,也可以实现特定协议的User
- Runner-执行器:Locust的核心类,定义了整个框架的执行逻辑,实现了Master、Slave(worker)等执行器
- EventHook-事件钩子:通过预先定义的事件使得我们可以在这些事件发生时(比如slave上报)做一些额外的操作
- WebU:提供web界面的操作台和压测过程展示
- Socket-通信器:提供了分布式模式下master和slave的交互方式
- RequestStats-采集、分析器:定义了结果分析和数据上报格式
1.3 核心类
核心类图.png- 用户定义的User类作为Runner的user_classes传入
- TaskSet和User持有client,可以在类中直接发起客户端请求,client可以自己实现,Locust只实现了HttpUser
- master的client_listener监听施压端client消息
- slave的worker方法监听master消息
- slave的stats_reporter方法上报压测数据,默认3s上报一次
- slave的start启动协程,使用master分配的并发数开始压测
- slave默认1s上报一次心跳,如果master超过3s未收到某个slave的心跳则会将其标记为missing状态
主要结构介绍完了,接下来看下具体的类和对应的方法
二、用户行为User task TaskSet
2.1. User
一个User代表一个压测用户。locust将为每个正在模拟的用户生成User类的一个实例。
【User可定义公共属性】
-
2.1.1. wait_time属性:单位秒,两次执行task时间的间隔。between、constant、constant_pacing
eg:自定义wait_time下面的User类将休眠一秒钟,然后休眠两个,然后休眠三个,依此类推
class MyUser(User):
last_wait_time = 0
def wait_time(self):
self.last_wait_time += 1
return self.last_wait_time
...
-
2.1.2. weight属性:通过设置weight参数,设置用户比例
eg:网络用户的可能性是移动用户的三倍
class WebUser(User):
weight = 3
...
class MobileUser(User):
weight = 1
...
如果文件中存在多个用户类,并且在命令行上未指定任何用户类,则Locust将产生相等数量的每个用户类
可以通过将它们作为命令行参数传递,来指定要从同一locustfile中使用哪些用户类
locust -f locust_file.py WebUser MobileUser
-
2.1.3. host属性:要加载的主机的URL前缀(即“ http://google.com ”)
如果在用户类中声明了主机属性,则--host 在命令行或Web请求中未指定no的情况下将使用该属性。
可以在命令行、Web UI中修改该属性。
优先级: Web UI > 命令行(--host) > 代码 -
2.1.4. task属性: @task
详细内容见接下来的:2.2task -
2.1.5. 环境属性: environment
用户正在其中运行的引用
与环境或runner其所包含的环境进行交互
self.environment.runner.quit()
如果在独立蝗虫实例上运行,则将停止整个运行。如果在工作程序节点上运行,它将停止该特定节点
2.2 task
-
2.2.1. 宣告任务:
为用户类(或TaskSet)声明任务以使用task装饰器的典型方式。
@task采用可选的weight参数,该参数可用于指定任务的执行率
eg:task2被选择为task1的机会是两倍
from locust import User, task, between
class MyUser(User):
wait_time = between(5, 15)
@task(3)
def task1(self):
pass
@task(6)
def task2(self):
pass
-
2.2.2. 任务属性:
不常用 -
2.2.3. 标记任务:
通过使用标记<locust.tag>装饰器标记任务
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag('tag1')
@task
def task1(self):
pass
@tag('tag1', 'tag2')
@task
def task2(self):
pass
@tag('tag3')
@task
def task3(self):
pass
@task
def task4(self):
pass
2.3 TaskSet
用于模拟现实用户分级操作的场景,单接口直接用User就可以。
TaskSet是蝗虫任务的集合,将像直接在User类上声明的任务一样执行,使用户在两次任务执行之间处于休眠状态.
- 带有TaskSet的Demo:
class ForumSection(TaskSet):
@task(10)
def view_thread(self):
pass
@task(1)
def create_thread(self):
pass
@task(1)
def stop(self):
self.interrupt()
class LoggedInUser(User):
wait_time = between(5, 120)
tasks = {ForumSection:2}
@task
def index_page(self):
pass
-
使用@task装饰器:
直接在User / TaskSet类下内联TaskSet
class MyUser(User):
@task(1)
class MyTaskSet(TaskSet):
...
-
多层嵌套:
TaskSet类的任务可以是其他TaskSet类,从而可以将它们嵌套任何数量的级别。
例如,我们可以使用以下结构定义TaskSet:
- Main user behaviour
- Index page
- Forum page
- Read thread
- Reply
- New thread
- View next page
- Browse categories
- Watch movie
- Filter movies
- About page
当正在运行的用户线程选择TaskSet类执行时,将创建该类的实例,然后执行将进入该TaskSet。
然后发生的事情是,将拾取并执行TaskSet的任务之一
然后线程将进入用户的wait_time函数指定的持续时间(除非wait_time直接在TaskSet类上声明了该函数,在这种情况下,它将使用该函数)
然后从TaskSet的任务中选择一个新任务,然后再次等待,依此类推。
2.4 User和TaskSet的关系
- 在执行时传递的参数是对TaskSet实例的引用,而不是User实例
- 可以从TaskSet实例中通过访问User实例
- TaskSets还包含一个便捷 client 属性,该属性引用User实例上的client属性
- TaskSet实例的属性user指向其User实例,parent指向其父TaskSet实例
- 标记TaskSet会将标记自动应用于所有TaskSet的任务
- 如果您在嵌套的TaskSet中标记任务,那么即使未标记TaskSet,蝗虫也将执行该任务
至此,已经了解了locust主要类之间的关系,以及主要类的功能。
接下来将以第三方库的方式,将locust引入到项目工程中
三、 以库的方式引入locust
3.1. 创建一个 Environment 实例
from locust.env import Environment
env = Environment(user_classes=[MyTestUser])
3.2. 创建 create_master_runner 或 create_worker_runner启动Runner
env.create_local_runner()
env.runner.start(5000, hatch_rate=20)
env.runner.greenlet.join()
3.3. start a Web UI
env.create_local_runner()
env.create_web_ui()
env.web_ui.greenlet.join()
3.4. 完整Demo
import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer
from locust.log import setup_logging
setup_logging("INFO", None)
class User(HttpUser):
wait_time = between(1, 3)
host = "https://docs.locust.io"
@task
def my_task(self):
self.client.get("/")
@task
def task_404(self):
self.client.get("/non-existing-path")
# setup Environment and Runner
env = Environment(user_classes=[User])
env.create_local_runner()
# start a WebUI instance
env.create_web_ui("127.0.0.1", 8089)
# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))
# start the test
env.runner.start(1, hatch_rate=10)
# in 60 seconds stop the runner
gevent.spawn_later(60, lambda: env.runner.quit())
# wait for the greenlets
env.runner.greenlet.join()
# stop the web server for good measures
env.web_ui.stop()
3.5. 判断当前状态,退出locust压测
3.5.1 增加监听的代码
@events.quitting.add_listener
def results(environment, **kw):
logging.error("------------bonnie--------------")
if environment.stats.total.fail_ratio > 0.01:
logging.error("Test failed due to failure ratio > 1%")
environment.process_exit_code = 1
elif environment.stats.total.avg_response_time > 10:
logging.error("Test failed due to average response time ratio > 200 ms")
environment.process_exit_code = 1
elif environment.stats.total.get_response_time_percentile(0.95) > 300:
logging.error("Test failed due to 95th percentile response time > 800 ms")
environment.process_exit_code = 1
else:
environment.process_exit_code = 0
3.5.2. 修改监听,注册到init上
只用在init上被注册,在实际执行时才能被调用
from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner
def checker(environment):
while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
time.sleep(1)
# if environment.runner.stats.total.fail_ratio > 0.2:
# print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
if environment.stats.total.avg_response_time > 40:
print(f"fail ratio was {environment.stats.total.avg_response_time}, quitting")
environment.runner.quit()
return
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
# only run this on master & standalone
if not isinstance(environment.runner, WorkerRunner):
gevent.spawn(checker, environment)
# 需要在创建完env之后进行调用,才能起作用
on_locust_init(env)
四、一个完整的Demo
涉及到其他文件,需要加载才能正常运行
from locust import events
from locust.env import Environment
from locust.stats import stats_printer
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner
from locust.log import setup_logging, logging
import gevent
import time
from config import ConfigStopCondition, ConfigLoadInfo
from flask import request, Response
from locust import stats as locust_stats, runners as locust_runners
from locust import events
from prometheus_client import Metric, REGISTRY, exposition
is_quitting = False
def checker(environment):
global is_quitting
while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
time.sleep(1)
if environment.runner.stats.total.fail_ratio > ConfigStopCondition.fail_ratio:
logging.error(f"Test failed due to failure ratio > {ConfigStopCondition.fail_ratio}, quitting")
print(f"Test failed due to failure ratio > {ConfigStopCondition.fail_ratio}, quitting")
is_quitting = True
elif environment.stats.total.avg_response_time > ConfigStopCondition.avg_response_time:
logging.error(f"Test failed due to average response time ratio > {ConfigStopCondition.avg_response_time}, quitting")
print(f"Test failed due to average response time ratio > {ConfigStopCondition.avg_response_time},quitting")
is_quitting = True
elif environment.stats.total.get_response_time_percentile(0.95) > ConfigStopCondition.response_time_95:
logging.error(f"Test failed due to 95th percentile response time > {ConfigStopCondition.response_time_95}, ms quitting")
print(f"Test failed due to 95th percentile response time > {ConfigStopCondition.response_time_95}, ms quitting")
is_quitting = True
if is_quitting:
logging.error("Fail Ratio \t | Avg time \t | 95 time")
logging.error(f" {environment.runner.stats.total.fail_ratio} \t | "
f"{environment.stats.total.avg_response_time} \t | "
f"{environment.stats.total.get_response_time_percentile(0.95)} ")
environment.runner.quit()
return
@events.init.add_listener
def on_locust_init(environment, runner, **_kwargs):
if not isinstance(environment.runner, WorkerRunner):
gevent.spawn(checker, environment)
def run_load_test(my_user):
global is_quitting
# 通过for循环,实现分不同用户数量的压测
for u, r, rtime in zip(ConfigLoadInfo.user_list, ConfigLoadInfo.rate_list, ConfigLoadInfo.runtime_list):
if not is_quitting:
print( f"Current user is {u}")
logging.error(f"Current user is {u}")
# setup Environment and Runner
env = Environment(user_classes=[my_user], step_load=True, stop_timeout=rtime*60*2)
env.create_local_runner()
# start a WebUI instance
env.create_web_ui("172.18.86.167", 8089)
# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))
on_locust_init(env, env.runner)
# start the test
env.runner.start(u, hatch_rate=r)
# in 60 seconds stop the runner
gevent.spawn_later(rtime*60, lambda: env.runner.quit())
# wait for the greenlets
env.runner.greenlet.join()
# stop the web server for good measures
env.web_ui.stop()
if __name__ == "__main__":
run_load_test(MyUser)
此时,已经实现将locust作为第三方库在python工程中运行了
此时可以打开locust页面,查看运行状态
通过上述代码。实现了,以第三方库的形式,分阶段压测被测对象。
并在不满足判定条件时,结束压测。
下一小结,介绍通过Prometheus和Garapha对数据进行长久保存。
并且生成可视化图表