Locust

locust实现压力测试_将locust作为第三方库

2020-08-24  本文已影响0人  bonnie_xing

在将locust作为第三方库使用前,先对其中涉及的对象进行介绍。
在了解关系下,便可更好的使用

一、locust架构和类关系

介绍locust中涉及的对象,以及他们之间的关系
参考链接 https://testerhome.com/topics/24300
locust官方参考文档 https://docs.locust.io/en/stable/

1.1. locust架构

核心架构.png

1.2. 主要类关系

先来一个关系图,看到locust主要类之间的关系


关系图.png

简单来说,Locust的代码分为以下模块:

1.3 核心类

核心类图.png

主要结构介绍完了,接下来看下具体的类和对应的方法

二、用户行为User task TaskSet

2.1. User

一个User代表一个压测用户。locust将为每个正在模拟的用户生成User类的一个实例。
【User可定义公共属性】

class MyUser(User):
    last_wait_time = 0

    def wait_time(self):
        self.last_wait_time += 1
        return self.last_wait_time

    ...
class WebUser(User):
    weight = 3
    ...

class MobileUser(User):
    weight = 1
    ...

如果文件中存在多个用户类,并且在命令行上未指定任何用户类,则Locust将产生相等数量的每个用户类
可以通过将它们作为命令行参数传递,来指定要从同一locustfile中使用哪些用户类

locust -f locust_file.py WebUser MobileUser
self.environment.runner.quit()

如果在独立蝗虫实例上运行,则将停止整个运行。如果在工作程序节点上运行,它将停止该特定节点

2.2 task

from locust import User, task, between

class MyUser(User):
    wait_time = between(5, 15)

    @task(3)
    def task1(self):
        pass

    @task(6)
    def task2(self):
        pass
from locust import User, constant, task, tag

class MyUser(User):
    wait_time = constant(1)

    @tag('tag1')
    @task
    def task1(self):
        pass

    @tag('tag1', 'tag2')
    @task
    def task2(self):
        pass

    @tag('tag3')
    @task
    def task3(self):
        pass

    @task
    def task4(self):
        pass

2.3 TaskSet

用于模拟现实用户分级操作的场景,单接口直接用User就可以。

TaskSet是蝗虫任务的集合,将像直接在User类上声明的任务一样执行,使用户在两次任务执行之间处于休眠状态.

class ForumSection(TaskSet):
    @task(10)
    def view_thread(self):
        pass

    @task(1)
    def create_thread(self):
        pass

    @task(1)
    def stop(self):
        self.interrupt()

class LoggedInUser(User):
    wait_time = between(5, 120)
    tasks = {ForumSection:2}

    @task
    def index_page(self):
        pass
class MyUser(User):
    @task(1)
    class MyTaskSet(TaskSet):
        ...
- Main user behaviour
  - Index page
  - Forum page
    - Read thread
      - Reply
    - New thread
    - View next page
  - Browse categories
    - Watch movie
    - Filter movies
  - About page

当正在运行的用户线程选择TaskSet类执行时,将创建该类的实例,然后执行将进入该TaskSet。
然后发生的事情是,将拾取并执行TaskSet的任务之一
然后线程将进入用户的wait_time函数指定的持续时间(除非wait_time直接在TaskSet类上声明了该函数,在这种情况下,它将使用该函数)
然后从TaskSet的任务中选择一个新任务,然后再次等待,依此类推。

2.4 User和TaskSet的关系

至此,已经了解了locust主要类之间的关系,以及主要类的功能。
接下来将以第三方库的方式,将locust引入到项目工程中

三、 以库的方式引入locust

3.1. 创建一个 Environment 实例

from locust.env import Environment

env = Environment(user_classes=[MyTestUser])

3.2. 创建 create_master_runner 或 create_worker_runner启动Runner

env.create_local_runner()
env.runner.start(5000, hatch_rate=20)
env.runner.greenlet.join()

3.3. start a Web UI

env.create_local_runner()
env.create_web_ui()
env.web_ui.greenlet.join()

3.4. 完整Demo

import gevent
from locust import HttpUser, task, between
from locust.env import Environment
from locust.stats import stats_printer
from locust.log import setup_logging

setup_logging("INFO", None)


class User(HttpUser):
    wait_time = between(1, 3)
    host = "https://docs.locust.io"

    @task
    def my_task(self):
        self.client.get("/")

    @task
    def task_404(self):
        self.client.get("/non-existing-path")

# setup Environment and Runner
env = Environment(user_classes=[User])
env.create_local_runner()

# start a WebUI instance
env.create_web_ui("127.0.0.1", 8089)

# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))

# start the test
env.runner.start(1, hatch_rate=10)

# in 60 seconds stop the runner
gevent.spawn_later(60, lambda: env.runner.quit())

# wait for the greenlets
env.runner.greenlet.join()

# stop the web server for good measures
env.web_ui.stop()

3.5. 判断当前状态,退出locust压测

3.5.1 增加监听的代码

@events.quitting.add_listener
def results(environment, **kw):
    logging.error("------------bonnie--------------")
    if environment.stats.total.fail_ratio > 0.01:
        logging.error("Test failed due to failure ratio > 1%")
        environment.process_exit_code = 1
    elif environment.stats.total.avg_response_time > 10:
        logging.error("Test failed due to average response time ratio > 200 ms")
        environment.process_exit_code = 1
    elif environment.stats.total.get_response_time_percentile(0.95) > 300:
        logging.error("Test failed due to 95th percentile response time > 800 ms")
        environment.process_exit_code = 1
    else:
        environment.process_exit_code = 0

3.5.2. 修改监听,注册到init上

只用在init上被注册,在实际执行时才能被调用

from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner

def checker(environment):
    while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        # if environment.runner.stats.total.fail_ratio > 0.2:
        # print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
        if environment.stats.total.avg_response_time > 40:

            print(f"fail ratio was {environment.stats.total.avg_response_time}, quitting")
            environment.runner.quit()
            return


@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    # only run this on master & standalone
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(checker, environment)

# 需要在创建完env之后进行调用,才能起作用
on_locust_init(env)

四、一个完整的Demo

涉及到其他文件,需要加载才能正常运行

from locust import events
from locust.env import Environment
from locust.stats import stats_printer
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, WorkerRunner
from locust.log import setup_logging, logging

import gevent
import time
from config import ConfigStopCondition, ConfigLoadInfo

from flask import request, Response
from locust import stats as locust_stats, runners as locust_runners
from locust import events
from prometheus_client import Metric, REGISTRY, exposition

is_quitting = False


def checker(environment):
    global is_quitting
    while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
        time.sleep(1)
        if environment.runner.stats.total.fail_ratio > ConfigStopCondition.fail_ratio:
            logging.error(f"Test failed due to failure ratio > {ConfigStopCondition.fail_ratio}, quitting")
            print(f"Test failed due to failure ratio > {ConfigStopCondition.fail_ratio}, quitting")
            is_quitting = True
        elif environment.stats.total.avg_response_time > ConfigStopCondition.avg_response_time:
            logging.error(f"Test failed due to average response time ratio > {ConfigStopCondition.avg_response_time}, quitting")
            print(f"Test failed due to average response time ratio > {ConfigStopCondition.avg_response_time},quitting")
            is_quitting = True
        elif environment.stats.total.get_response_time_percentile(0.95) > ConfigStopCondition.response_time_95:
            logging.error(f"Test failed due to 95th percentile response time > {ConfigStopCondition.response_time_95}, ms quitting")
            print(f"Test failed due to 95th percentile response time > {ConfigStopCondition.response_time_95}, ms quitting")
            is_quitting = True

        if is_quitting:
            logging.error("Fail Ratio \t | Avg time \t | 95 time")
            logging.error(f" {environment.runner.stats.total.fail_ratio}  \t | "
                          f"{environment.stats.total.avg_response_time}  \t |  "
                          f"{environment.stats.total.get_response_time_percentile(0.95)} ")
            environment.runner.quit()
            return

@events.init.add_listener
def on_locust_init(environment, runner, **_kwargs):
    if not isinstance(environment.runner, WorkerRunner):
        gevent.spawn(checker, environment)

def run_load_test(my_user):
    global is_quitting
    # 通过for循环,实现分不同用户数量的压测
    for u, r, rtime in zip(ConfigLoadInfo.user_list, ConfigLoadInfo.rate_list, ConfigLoadInfo.runtime_list):
        if not is_quitting:
            print( f"Current user is {u}")
            logging.error(f"Current user is {u}")
            # setup Environment and Runner
            env = Environment(user_classes=[my_user], step_load=True, stop_timeout=rtime*60*2)

            env.create_local_runner()

            # start a WebUI instance
            env.create_web_ui("172.18.86.167", 8089)

            # start a greenlet that periodically outputs the current stats
            gevent.spawn(stats_printer(env.stats))

            on_locust_init(env, env.runner)

            # start the test
            env.runner.start(u, hatch_rate=r)

            # in 60 seconds stop the runner
            gevent.spawn_later(rtime*60, lambda: env.runner.quit())

            # wait for the greenlets
            env.runner.greenlet.join()

            # stop the web server for good measures
            env.web_ui.stop()

if __name__ == "__main__":
    run_load_test(MyUser)

此时,已经实现将locust作为第三方库在python工程中运行了
此时可以打开locust页面,查看运行状态

通过上述代码。实现了,以第三方库的形式,分阶段压测被测对象。
并在不满足判定条件时,结束压测。

下一小结,介绍通过Prometheus和Garapha对数据进行长久保存。
并且生成可视化图表

上一篇下一篇

猜你喜欢

热点阅读