locust 开源示例

2020-04-20  本文已影响0人  Canon_2020

示例1 locustfile.py 和 run_test.py

#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date    : 2018-04-15 09:00:00
# @Author  : Canon
# @Link    : https://www.python.org
# @Version : 3.6.1

from locust import HttpLocust
from locust import TaskSet
from locust import task


class BlogDemo(TaskSet):
    '''用户行为:打开我的博客首页demo'''
    @task(1)
    def open_blog(self):
        # 定义requests的请求头
        header = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}
        
        r = self.client.get("/s?ie=UTF-8&wd=python",  headers=header, verify=False)
        print(r.status_code)
        assert r.status_code == 200


class websitUser(HttpLocust):
    task_set = BlogDemo
    min_wait = 3000  # 单位毫秒
    max_wait = 6000  # 单位毫秒

#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date    : 2018-04-15 09:00:00
# @Author  : Canon
# @Link    : https://www.python.org
# @Version : 3.6.1


if __name__ == "__main__":
    import os
    os.system("locust -f E:\PythonCase\PyScript\LocustTest\Example\locustfile.py --host=https://www.baidu.com")

示例2 locustfile.py 和 run_locust.py

#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date    : 2018-04-15 09:00:00
# @Author  : Canon
# @Link    : https://www.python.org
# @Version : 3.6.1


import time
from locust import Locust, TaskSet, events, task
import sys
import os
import requests
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))


class test_demo(object):
    """接口测试demo"""

    def __init__(self, ):
        pass

    @staticmethod
    def baidu():
        url = "https://www.baidu.com/s?ie=UTF-8&wd=python"
        response = requests.get(url)
        response.raise_for_status()


class HttpClient(object):
    def __init__(self):
        pass

    def test_demo_test_httpbin_get(self):
        test_demo().baidu()

    def test_demo_test_httpbin_post(self):
        pass

    def test_demo_test_webservice(self):
        pass


class HttpLocust(Locust):
    def __init__(self, *args, **kwargs):
        super(HttpLocust, self).__init__(*args, **kwargs)
        self.client = HttpClient()


class ApiUser(HttpLocust):
    min_wait = 10
    max_wait = 100

    class task_set(TaskSet):
        # @task(1)
        # def test_demo_test_webservice(self):
        #     self.client.test_demo_test_webservice()
        # @task(1)
        # def test_demo_test_httpbin_post(self):
        #     self.client.test_demo_test_httpbin_post()
        @task(2)
        def test_demo_httpbin_get(self):
            self.client.test_demo_test_httpbin_get()

#!/usr/local/python3
# -*- coding: utf-8 -*-
# @Date    : 2018-04-15 09:00:00
# @Author  : Canon
# @Link    : https://www.python.org
# @Version : 3.6.1


import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), os.path.pardir)))
import multiprocessing

"""
Please Visit http://localhost:8089/
"""


def start_slave(locust_file, master_port):
    print('start slave pid:\t{0}'.format(os.getpid()))
    os.system('locust -f {0} --slave --master-port {1}'.format(locust_file, master_port))


if __name__ == '__main__':
    multiprocessing.freeze_support()

    # locustfile.py
    locust_file = "E:\PythonCase\PyScript\LocustTest\Example\locustfile.py"

    # locust 运行命令
    locust_command = '--no-web -c 10 -r 2 --run-time 5m'

    # 运行locust
    if 'master' in locust_command:
        # 分布式
        num = 2
        master_port = 5557
        record = []
        for i in range(num):
            process = multiprocessing.Process(target=start_slave, args=(locust_file, master_port))
            process.start()
            record.append(process)

        print('start master pid:\t{0}'.format(os.getpid()))
        cmd = 'locust -f {0} {1}'.format(locust_file, locust_command)
        print('cmd:\t{0}'.format(cmd))
        os.system(cmd)
    else:
        # 单例模式
        cmd = 'locust -f {0} {1}'.format(locust_file, locust_command)
        print('cmd:\t{0}'.format(cmd))
        os.system(cmd)

上一篇 下一篇

猜你喜欢

热点阅读