locust
简介
https://www.jianshu.com/p/30cbbc000d67
https://blog.csdn.net/BearStarX/article/details/91791898
https://debugtalk.com/post/head-first-locust-advanced-script/
https://cloud.tencent.com/developer/article/1594240 (Locust官方文档(API)解读(全)
https://www.axihe.com/tools/locust/increase-performance.html
Locust是一款易于使用的分布式用户负载测试工具。它用于对网站(或其他系统)进行负载测试,并确定系统可以处理多少并发用户。
Locust完全基于事件,因此可以在一台计算机上支持数千个并发用户。与许多其他基于事件的应用程序相比,它不使用回调。相反,它通过协程(gevent)机制使用轻量级过程。每个蝗虫蜂拥到你的网站实际上是在自己的进程内运行(或者是greenlet)。
Locust的并发机制摒弃了进程和线程,采用协程(gevent)的机制。采用多线程来模拟多用户时,线程数会随着并发数的增加而增加,而线程之间的切换是需要占用资源的,IO的阻塞和线程的sleep会不可避免的导致并发效率下降;正因如此,LoadRunner和Jmeter这类采用进程和线程的测试工具,都很难在单机上模拟出较高的并发压力。而协程和线程的区别在于,协程避免了系统级资源调度,由此大幅提高了性能。正常情况下,单台普通配置的测试机可以生产数千并发压力,这是LoadRunner和Jmeter都无法实现的。
注意:locust 在wins 上的性能比较差 ,所以测试时候 ,压力机最好是Linux的
1. 安装
在装好python的机器上 使用 pip install locust 安装
2. Locust 入门使用
Locust 主要使用的类有 2个 : 1)Taskset 2)HttpUser(http请求使用)。 locust 是基于requests库的,所以使用起来比较简单,由于基于requests 所以请求可以保留session ,自动保留登录态,token也可以脚本上保存。 但是requests的并发能力不行 ,官方建议大并发时使用 FastHttpLocust做为客户端
例子:
from locust import HttpLocust, TaskSet, task
class WebsiteTasks(TaskSet):
def on_start(self):
self.client.post("/login", {
"username": "test",
"password": "123456"
})
@task(2)
def index(self):
self.client.get("/")
@task(1)
def about(self):
self.client.get("/about/")
class WebsiteUser(HttpLocust):
task_set = WebsiteTasks
host = "http://debugtalk.com"
min_wait = 1000
max_wait = 5000
class HttpLocust(Locust)
client : 对应着虚拟用户作为客户端所具备的请求能力,也就是请求发起器,基于requests
task_set: 指向一个TaskSet类,TaskSet类定义了用户的任务信息,该属性为必填;
max_wait/min_wait: 每个用户执行两个任务间隔时间的上下限(毫秒),具体数值在上下限中随机取值,若不指定则默认间隔时间固定为1秒;
host:被测系统的host,当在终端中启动locust时没有指定--host参数时才会用到;
weight:同时运行多个Locust类时会用到,用于控制不同类型任务的执行权重。
class TaskSet
性能测试工具要模拟用户的业务操作,就需要通过脚本模拟用户的行为。在前面的比喻中说到,TaskSet类好比蝗虫的大脑,控制着蝗虫的具体行为。
TaskSet类实现了虚拟用户所执行任务的调度算法,包括规划任务执行顺序(schedule_task)、挑选下一个任务(execute_next_task)、执行任务(execute_task)、休眠等待(wait)、中断控制(interrupt)等等。在此基础上,就可以在TaskSet子类中采用非常简洁的方式来描述虚拟用户的业务测试场景,对虚拟用户的所有行为(任务)进行组织和描述,并可以对不同任务的权重进行配置。
在TaskSet子类中定义任务信息时,可以采取两种方式,@task装饰器和tasks属性。
from locust import TaskSet, task
class UserBehavior(TaskSet):
@task(1)
def test_job1(self):
self.client.get('/job1')
@task(2)
def test_job2(self):
self.client.get('/job2')
from locust import TaskSet
def test_job1(obj):
obj.client.get('/job1')
def test_job2(obj):
obj.client.get('/job2')
class UserBehavior(TaskSet):
tasks = {test_job1:1, test_job2:2}
# tasks = [(test_job1,1), (test_job1,2)] # 两种方式等价
on_start 和 on_stop 方法
User(和 TaskSet)可以声明 on _ start 方法、 on _ stop 方法。用户在开始运行时调用 on _ start 方法,在停止运行时调用 on _ stop 方法。对于 TaskSet,当模拟用户开始执行 TaskSet 时调用 on _ start 方法,当模拟用户停止执行 TaskSet 时调用 on _ stop。 类似setUp 和 teardown方法,在初始化的时候只执行一次。
关联
参数化 (可使用python 的list 和quere实现)
检查点
集合点
具体做法是,在WebsiteUser定义一个数据集,然后所有虚拟用户在WebsiteTasks中就可以共享该数据集了。如果不要求数据唯一性,数据集选择list数据结构,从头到尾循环遍历即可;如果要求数据唯一性,数据集选择queue数据结构,取数据时进行queue.get()操作即可,并且这也不会循环取数据;至于涉及到需要循环取数据的情况,那也简单,每次取完数据后再将数据插入到队尾即可,queue.put_nowait(data)。
如下为一个例子:
from locust import TaskSet, task, HttpLocust
import queue
#保证并发测试数据唯一性,循环取数据:模拟3用户并发登录账号,总共有90个账号,要求并发登录账号不相同,但数据可循环使用。
class UserBehavior(TaskSet):
@task
def test_register(self):
try:
data = self.locust.user_data_queue.get()
except queue.Empty:
print('account data run out, test ended.')
exit(0)
print('register with user: {}, pwd: {}'\
.format(data['username'], data['password']))
payload = {
'username': data['username'],
'password': data['password']
}
self.client.post('/register', data=payload)
self.locust.user_data_queue.put_nowait(data)
class WebsiteUser(HttpLocust):
host = 'https://debugtalk.com'
task_set = UserBehavior
user_data_queue = queue.Queue()
for index in range(100):
data = {
"username": "test%04d" % index,
"password": "pwd%04d" % index,
"email": "test%04d@debugtalk.test" % index,
"phone": "186%08d" % index,
}
user_data_queue.put_nowait(data)
min_wait = 1000
max_wait = 3000
Locust运行模式
1. 单进程
2. 单台机器 开启 多个slave (按cpu的核数,可开启等于N的数量(包括master))
3. 分布式多台机器
多进程分布式运行
1. 先启动一个 master
2. 在启动多个slave
开启master, 用--master
$ locust -H http://yingzi-dataflow-smart-gateway.stage.yingzi.com -f locusttest.py --web-host=127.0.0.1 --master --port=8089
启动slaver ,用 --slave
$ locust -H http://yingzi-dataflow-smart-gateway.stage.yingzi.com -f locusttest.py --slave
如果slave与master不在同一台机器上,还需要通过--master-host参数再指定master的IP地址。
$ locust -H http://yingzi-dataflow-smart-gateway.stage.yingzi.com
-f locusttest.py --slave --master-host=<locust_machine_ip>
demo
from locust import TaskSet,task,HttpUser,User,between
import os
from locust.contrib.fasthttp import FastHttpUser
import json
# import gevent
# from gevent import monkey
# gevent.monkey.patch_all()
class TestCase(TaskSet):
_token=''
def on_start(self):
url='/api/v1/auth/token'
data = {
"accessType": "RESTFUL",
"appKey": "yingzi-idp-share",
"clientIp": "172.19.61.254",
"deviceId": "",
"secretKey": "7he9JB74675993tz260yMZyik79iK2au"
}
headers = {
'Content-Type': 'application/json;charset=UTF-8',
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36'
}
res = self.client.post(url, data=json.dumps(data), headers=headers,catch_response=True)
self._token=res.json()['data']['access_token']
print(self._token)
@task(10)
def getDataBySql(self):
access_token= self._token
print(access_token)
url='/api/v1/data/common/getDataBySql?access_token=%s'%access_token
headers = {
'Content-Type': 'application/json;charset=UTF-8',
'Accept': 'application/json, text/plain, */*',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,zh;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36'
}
data={
"act": "ihp.stage",
"isPage": True,
"page": 0,
"requestId": "string",
"searchId": 1,
"size": 0,
"sql": "select * from ihp_share.ihp_quarantine_onsite_appendix_ear_tag"
}
res=self.client.post(url,data=json.dumps(data),headers=headers,catch_response=True)
print(res.status_code)
class TestLocust(FastHttpUser):
host='http://yingzi-dataflow-smart-gateway.stage.yingzi.com'
tasks = [TestCase]
min_wait=1000
max_wait=2000
# 有三个内置的等待时间函数:
# constant 固定时间
# between 在最小值和最大值之间的随机时间 #wait_time = between(5, 15)
# constant_pacing一个自适应时间,确保任务每x秒运行一次
if __name__ == "__main__":
os.system("locust -f locusttest.py --web-host=127.0.0.1")
#os.system('locust -f locust_study.py --csv=result --no-web -c2000 -r2000 -t120s --loglevel=INFO --logfile=test.log')
# $ locust -f examples/basic.py --csv=example --no-web -t10m
# --no-web 表示不使用web界面运行测试
# -c 设置虚拟用户数
# -r 设置每秒启动虚拟用户数
# -t 设置运行时间
# -P, --port:指定web端口,默认为8089.
# -n, --num - request:指定总执行测试;
# -r, --hatch - rate:指定并发加压速率,默认值位1。即用户数增加的速度
用例管理
@ tag 装饰器
通过使用@tag 装饰器标记任务,您可以使用 -- tags 和 -- exclude-tags 参数来挑选测试期间执行的任务。例子:
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag('tag1')
@task
def task1(self):
pass
@tag('tag1', 'tag2')
@task
def task2(self):
pass
@tag('tag3')
@task
def task3(self):
pass
@task
def task4(self):
pass
如果您使用 -- tag1开始这个测试,那么在测试期间只会执行 task1和 task2。如果以 -- tag2 tag3开始,那么只执行 task2和 task3。
--exclude-tags 则相反,是做排除选择。因此,如果以 --exclude-tags tag3开始测试,则只执行 task1、 task2和 task4。排除总是优先于包含,因此如果一个任务包含一个您已经包含的标记和一个您已经排除的标记,那么它将不会被执行。
增加断言
对于测试来说,每个自动化测试用例都应该有个断言判断,这样才能知道测试用例的成功/失败。
在Python+Locust中,可以使用catch_response参数,with-statement和对response.failure()的调用将请求标记为失败。
with self.client.post(url=url, data=params, timeout=10, catch_response=True) as response:
if response.status_code == 200:
response.success()
else:
response.failure('Failed!')
注意:catch_response=True这个参数是必须要加的,否则在性能测试时,后台会一直报错,提示AttributeError: 'Response' object has no attribute 'success'。
刚刚的样例中,是举例断言status_code是否等于200,如果是,则返回成功,反之返回失败。
TaskSets可嵌套
TaskSet的一个非常重要的特性是它们可以嵌套,因为真实的网站通常以分层的方式构建,包含多个子部分。因此,嵌套任务集将允许我们定义一种行为,以更现实的方式来模拟用户。
TaskSet还可以嵌套:参考下面的代码
第一种:
class ForumPage(TaskSet):
@task(20)
def read_thread(self):
pass
@task(1)
def new_thread(self):
pass
@task(5)
def stop(self):
self.interrupt()
class UserBehaviour(TaskSet):
tasks = {ForumPage:10}
@task
def index(self):
pass
嵌套TaskSet的方式就像使用task属性指定任务时一样,但不是引用python函数,而是引用另一个TaskSet:
在上面的示例中,如果在执行UserBehaviour TaskSet时选择了要执行的ForumPage,则ForumPage TaskSet将开始执行。然后,ForumPage TaskSet将选择其自己的任务之一,执行它,等待,依此类推。
关于上述示例,需要注意一点,那就是在ForumPage的stop方法中调用 self.interrupt()。这样做实际上是停止执行ForumPage任务集,并在UserBehaviour实例中继续执行。如果我们在ForumPage的某处没有调用interrupt()方法,Locust将永远不会停止运行已经启动的ForumPage任务。但是通过使用中断功能,我们可以与任务权重一起定义模拟用户离开论坛的可能性。
也可以使用@task装饰器在类中内联声明嵌套的TaskSet,就像声明普通任务时一样:
class MyTaskSet(TaskSet):
@task
class SubTaskSet(TaskSet):
@task
def my_task(self):
pass
顺序执行
TaskSequence类
TaskSequence类是一个TaskSet,但是它的任务是按顺序执行的。
@seq_task(1)
def first_task(self):
pass
@seq_task(2)
def second_task(self):
pass
@seq_task(3)
@task(10)
def third_task(self):
pass
在上面的示例中,执行顺序定义为先执行first_task,然后执行second_task,最后执行Third_task 10次。可以看到,可以使用@task装饰器组合@seq_task,当然也可以将TaskSet嵌套在TaskSequences中,反之亦然。
请求结果统一判断
设置容错,错误信息属正常返回,此类情况可将返回指定错误信息的请求结果设置成success
设置超时,请求消耗时间大于预期的情况,可将次请求设置为fail,并在错误信息中标注失败原因
规范输出格式,请求失败是打印请求头,请求体,url等详细信息
def request_result(response, *args, **kwargs):
"""
响应结果判断
:param response:响应体
:param args: 自定义 -- 可用来设置容错信息 格式[{"status_code": 400, "message": "error_1"}, {"status_code": 400, "message": "error_2"}]
:param kwargs: 自定义 -- 可用来保存请求详细信息,请求失败时输出详细内容
:return:
"""
# 判断当前请求状态码是否为200
if response.status_code == 200:
try:
response_json = response.json()
except JSONDecodeError as e:
logging.error(e.__repr__())
# 返回的数据不是json格式
request_failure(response, "HTTP标准响应状态码不是200!!!", **kwargs)
else:
# 容错判断代码
TODO: 容错代码
if 执行通过:
# 结果校验通过执行一下代码:
events.request_success.fire(
request_type=response.locust_request_meta["method"],
name=response.locust_request_meta["name"],
response_time=response.locust_request_meta["response_time"],
response_length=response.locust_request_meta["content_size"],
)
response._is_reported = True
else:
# 结果校验失败调用request_failure
# 失败有可能是response_time超预期时间
# if response.locust_request_meta["response_time"] > 6000: # 响应时间超6秒,结果设为失败
request_failure(response, "error_message", **kwargs)
else:
request_failure(response, "HTTP标准响应状态码不是200!!!", **kwargs)
def request_failure(response, err_message="", **kwargs):
exception_message = ""
if err_message:
exception_message += "失败信息为:{},".format(err_message)
if "响应超时" not in err_message:
exception_message += "--- 接口返回结果: {}".format(response.text)
if kwargs:
for key in kwargs.keys():
exception_message += ", 请求{}为: {}".format(key, kwargs[key])
logging.error(exception_message)
events.request_failure.fire(
request_type=response.locust_request_meta["method"],
name=response.locust_request_meta["name"],
response_time=response.locust_request_meta["response_time"],
response_length=response.locust_request_meta["content_size"],
exception=exception_message
)
response._is_reported = True
其他补充
第一个压测实例 压测 MQTT:
https://blog.csdn.net/qq_39214101/article/details/107997414
开始第二个实例压kafka
import time
from locust import TaskSet, task, Locust, events
from kafka import KafkaProducer
import json
class UserBehavior(TaskSet):
def on_start(self):
self.producer = KafkaProducer(bootstrap_servers=['x.x.x.x:9092'])
def on_stop(self):
# 该方法当程序结束时每用户进行调用,关闭连接
self.producer.close()
@task(1)
def sendAddCmd(self):
start_time = time.time()
time_unique_id = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime())
print(time_unique_id)
print("===========================",start_time)
try:
timestamp = int(time.time())
message = {
'timestamp': timestamp,
'message': "121314"
}
msg = json.dumps(message)
msg = msg.encode('utf-8')
self.producer.send('preHandleTopic', msg)
except Exception as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="kafka", name="add", response_time=total_time,response_length=0, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="kafka", name="add", response_time=total_time,response_length=0)
class SocketUser(Locust):
task_set = UserBehavior
min_wait = 1000 # 单位毫秒
max_wait = 1000 # 单位毫秒
if __name__ == '__main__':
import os
os.system("locust -f SendKafka.py --host=x.x.x.x:9092")
开始第三个实例压tcp
from locust import HttpLocust, TaskSet, task
import socket # 导入 socket 模块
host = socket.gethostname() # 获取本地主机名
port = 8888 # 设置端口号
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((host, port))
class UserBehaviora(TaskSet):
def on_start(self):
print("start")
@task(1)
def bky_index(self):
sock.send(b'121314')
re_mes = sock.recv(1024).decode()
print(re_mes)
class WebsiteUser(HttpLocust):
task_set = UserBehaviora
min_wait = 1500
max_wait = 5000
if __name__ == "__main__":
import os
os.system("locust -f locust6.py --host=x.x.x.x:xxxx")
第三方工具
支持其他采样器协议,报告等。
Locust 插件:https://github.com/SvenskaSpel/locust-plugins/
无需手动步骤即可自动执行分布式运行
Locust集群:https://github.com/SvenskaSpel/locust-swarm/(蝗虫群)
使用其他语言
Locust主服务器和Locust从服务器通过交换msgpack消息进行通信,这是许多语言所支持的。因此,您可以使用任何喜欢的语言编写Locust任务。为了方便起见,一些库充当了从属运行程序。他们运行你的Locust任务,并定期向master报告。
Golang
Boomer:https://github.com/myzhan/boomer/
Java
Locust4j:https://github.com/myzhan/locust4j
Swarm: https://github.com/anhldbk/swarm
配置管理
部署Locust很容易,但是有些工具仍然可以提供一定程度的便利。
tinx.locust是Ansible的一个安装角色,用于配置和控制Locust系统服务,或使用ansible-container构建Locust docker映像。还管理locustfile和相应的测试数据。