系统的性能压测

2020-07-22  本文已影响0人  翻身小白菜

性能测试

为了保证系统在线上稳定运行,需要对系统做性能测试,看是否可抗住线上流量。
性能测试是一个总称,具体可以细分为 性能测试、负载测试、压力测试和稳定性测试。

可以看出 性能测试、负载测试、压力测试是连续的三个阶段。

性能测试曲线

一般系统性能呈现如下趋势:



a->b阶段为性能测试,一般数据会呈线性增长。到达b后开始进入负载测试阶段,再增加并发数,性能不在显著增加,一直到c。c点系统到达了饱和,再施加压力,系统性能会迅速下降,达到d点,即系统崩溃不可用。
与上图对应,系统是响应时间变化如下:



a->b响应时间不变。b->c 响应时间增加。其实,b点是正常运行的点,c点本身就是崩溃点,维持一段时间,就很可能奔溃。到d点系统崩溃,不可用。
总之,性能测试就是不断增加并发,看TPS值。

性能测试小工具

写一个小程序,可以测试某一url的性能。并发数量和总发送的包数可以自己控制。返回响应的平均时间和95%响应时间。
创建文件performance_test.py,编写程序如下:

#/usr/bin python
# -*- encoding: utf-8 -*-
import argparse
import multiprocessing
import urllib2
import time
import logging

import conf

def performance_test(url, concurrency, max_request):
    """main"""
    manager = multiprocessing.Manager()
    request_num = manager.Value("d",0)
    resp_time=manager.list()
    process_pool = []
    
    # start performance test
    # request_process(url, request_num, max_request, resp_time)
    for num in xrange(0, concurrency):
        p = multiprocessing.Process(target=request_process, args=(url, request_num, max_request, resp_time,))
        process_pool.append(p)
    
    for p in process_pool:
        p.start()
    
    for p in process_pool:
        p.join()
        # p.close()
    print len(resp_time)
    resp_time_info = resp_time
    resp_time_info.sort()
    average = sum(resp_time_info) / len(resp_time_info)
    resp_95 = resp_time_info[int(0.95 * len(resp_time_info))]
    
    return average, resp_95


def request_process(url, request_num, max_request, resp_time):
    """get url until request_num large or equal max_request"""
    error_num = 0 
    while request_num.value <= max_request:
        request_num.value += 1
        start_time = time.time()
        header = {}
        try:
            req = urllib2.Request(url)
            resp = urllib2.urlopen(req, timeout=conf.TIMEOUT)
            
            if resp.code != 200:
                logging.warn("get url %s error, resp_code: %s" % (url, resp.code))
                error_num += 1
                request_num.value -= 1
            else:
                logging.debug("get url %s success" % url) 
                error_num = 0
                resp_time.append(time.time() - start_time)

        except Exception as err:
            logging.warn("get url %s error: %s" % (url, err))
            error_num += 1
            request_num.value -= 1
        
        if error_num > 3:
            logging.error("too much error...")
            break


def get_argparse():
    """get parse input args"""
    parser = argparse.ArgumentParser(description='Performance test tool. '
                                     'use like: python performance_test.py -c 10 -n 100 -u http://www.baidu.com')
    parser.add_argument('-c', '--concurrency', dest='concurrency', type=int, default=conf.CONCORRENCY, 
                        help='Number of multiple request to make at a time')
    parser.add_argument('-n', '--requests', dest='requests', type=int, default=conf.MAX_REQUESTS, 
                        help='Number of requests to perform')
    parser.add_argument('-u', '--url', dest='urls', type=str, required=True, 
                        help='url to request')
    
    args = parser.parse_args()
    return args


if __name__ == "__main__":
    args = get_argparse()
    concurrency = args.concurrency
    max_request = args.requests
    urls = args.urls
    print performance_test(urls, concurrency, max_request)

通过启动的进程数量控制并发。通过进程间共享变量实现总发送数据量的控制(由于没有严格的锁,因此,时间发送的数据会和设置的有一点出入)。
运行:

python performance_test.py -c 10 -n 100 -u http://www.baidu.com

得到结果:

(0.26014810571303737, 0.3801710605621338)

上一篇 下一篇

猜你喜欢

热点阅读