接口测试

2017-09-14  本文已影响111人  qianyewhy

1、使用脚本测试接口与使用接口工具的对比

 Database
# https://docs.djangoproject.com/en/1.11/ref/settings/#databases
# 原来的数据库配置
DATABASES = {
    'default': {
        # 驱动
        'ENGINE': 'django.db.backends.mysql',
        # 主机地址
        'HOST': '127.0.0.1',
        # 端口
        'PORT': '3306',
        # 数据库名称
        'NAME': 'guest',
        # 用户名
        'USER': 'root',
        # 密码
        'PASSWORD': 'root',
        'OPTIONS': {
            'init_command': "SET sql_mode='STRICT_TRANS_TABLES'",
        },
    }
}
# coding=utf-8
import pymysql.cursors
import os
import configparser as cparser


# ======== Reading db_config.ini setting ===========
base_dir = str(os.path.dirname(os.path.dirname(__file__)))
base_dir = base_dir.replace('\\', '/')
file_path = base_dir + "/db_config.ini"

cf = cparser.ConfigParser()

cf.read(file_path,encoding='utf8')
host = cf.get("mysqlconf", "host")
port = cf.get("mysqlconf", "port")
db   = cf.get("mysqlconf", "db_name")
user = cf.get("mysqlconf", "user")
password = cf.get("mysqlconf", "password")


# ======== MySql base operating ===================
class DB:

    def __init__(self):
        try:
            # Connect to the database
            self.connection = pymysql.connect(host=host,
                                              port=int(port),
                                              user=user,
                                              password=password,
                                              db=db,
                                              charset='utf8mb4',
                                              cursorclass=pymysql.cursors.DictCursor)
        except pymysql.err.OperationalError as e:
            print("Mysql Error %d: %s" % (e.args[0], e.args[1]))

    # clear table data
    def clear(self, table_name):
        # real_sql = "truncate table " + table_name + ";"
        real_sql = "delete from " + table_name + ";"
        with self.connection.cursor() as cursor:
            cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
            cursor.execute(real_sql)
        self.connection.commit()

    # insert sql statement
    def insert(self, table_name, table_data):
        for key in table_data:
            table_data[key] = "'"+str(table_data[key])+"'"
        key   = ','.join(table_data.keys())
        value = ','.join(table_data.values())
        real_sql = "INSERT INTO " + table_name + " (" + key + ") VALUES (" + value + ")"
        #print(real_sql)

        with self.connection.cursor() as cursor:
            cursor.execute(real_sql)

        self.connection.commit()

    # close database
    def close(self):
        self.connection.close()

    # init data
    def init_data(self, datas):
        for table, data in datas.items():
            self.clear(table)
            for d in data:
                self.insert(table, d)
        self.close()

"""
if __name__ == '__main__':

    db = DB()
    table_name = "sign_event"
    data = {'id':1,'name':'红米','`limit`':2000,'status':1,'address':'北京会展中心','start_time':'2016-08-20 00:25:42'}
    table_name2 = "sign_guest"
    data2 = {'realname':'alen','phone':12312341234,'email':'alen@mail.com','sign':0,'event_id':1}

    db.clear(table_name)
    db.insert(table_name, data)
    db.close()
"""

首先要读取数据库配置,获取数据库信息,如主机地址,用户名,密码,端口等
其次通过pymysql库中的connect方法连接数据库
然后封装clear方法和insert方法
8、创建测试数据

import sys

sys.path.append('D:\\pydj\\guest\\pyrequest\\db_fixture')
try:
    from mysql_db import DB
except ImportError:
    from .mysql_db import DB

# create data
datas = {
    'sign_event': [
        {'id': 1, 'name': '红米Pro发布会', '`limit`': 2000, 'status': 1, 'address': '北京会展中心',
         'start_time': '2017-08-20 14:00:00'},
        {'id': 2, 'name': '可参加人数为0', '`limit`': 0, 'status': 1, 'address': '北京会展中心',
         'start_time': '2017-08-20 14:00:00'},
        {'id': 3, 'name': '当前状态为0关闭', '`limit`': 2000, 'status': 0, 'address': '北京会展中心',
         'start_time': '2017-08-20 14:00:00'},
        {'id': 4, 'name': '发布会已结束', '`limit`': 2000, 'status': 1, 'address': '北京会展中心',
         'start_time': '2001-08-20 14:00:00'},
        {'id': 5, 'name': '小米5发布会', '`limit`': 2000, 'status': 1, 'address': '北京国家会议中心',
         'start_time': '2017-08-20 14:00:00'},
    ],
    'sign_guest': [
        {'id': 1, 'realname': 'alen', 'phone': 13511001100, 'email': 'alen@mail.com', 'sign': 0, 'event_id': 1},
        {'id': 2, 'realname': 'has sign', 'phone': 13511001101, 'email': 'sign@mail.com', 'sign': 1, 'event_id': 1},
        {'id': 3, 'realname': 'tom', 'phone': 13511001102, 'email': 'tom@mail.com', 'sign': 0, 'event_id': 5},
    ],
}


# Inster table datas
def init_data():
    """
    db = DB()
    for table, data in datas.items():
        db.clear(table)
        for d in data:
            db.insert(table, d)
    db.close()
    """
    DB().init_data(datas)


if __name__ == '__main__':
    init_data()

1)创建一个字典datas,键表示表名,值表示表里的数据
2)init_data()函数用于读取datas 字典中的数据,调用DB 类中的clear()方法清除数据库,然后,调用insert()方法插入表数据。
3)之后的测试用例就已经这些数据进行编写
9、编写测试用例


image.png
import unittest
import requests
import os, sys

parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentdir)
from pyrequest.db_fixture import test_data


class AddEventTest(unittest.TestCase):
    """添加发布会"""

    def setUp(self):
        self.base_url = "http://127.0.0.1:8000/api/add_event/"

    def tearDown(self):
        print(self.result)

    def test_add_event_all_null(self):
        """ 所有参数为空 """
        payload = {'eid': '', '': '', 'limit': '', 'address': "", 'start_time': ''}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 10021)
        self.assertEqual(self.result['message'], 'parameter error')

    def test_add_event_eid_exist(self):
        """ id已经存在 """
        payload = {'eid': 1, 'name': '一加4发布会', 'limit': 2000, 'address': "深圳宝体", 'start_time': '2017'}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 10022)
        self.assertEqual(self.result['message'], 'event id already exists')

    def test_add_event_name_exist(self):
        """ 名称已经存在 """
        payload = {'eid': 16, 'name': '红米Pro发布会', 'limit': 2000, 'address': "深圳宝体", 'start_time': '2017'}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 10023)
        self.assertEqual(self.result['message'], 'event name already exists')

    def test_add_event_data_type_error(self):
        """ 日期格式错误 """
        payload = {'eid': 14, 'name': '一加5手机发布会', 'limit': 2000, 'address': "深圳宝体", 'start_time': '2017'}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 10024)
        self.assertIn('start_time format error.', self.result['message'])

    def test_add_event_success(self):
        """ 添加成功 """
        payload = {'eid': 17, 'name': '一加7手机发布会', 'limit': 2000, 'address': "深圳宝体", 'start_time': '2017-05-10 12:00:00'}
        r = requests.post(self.base_url, data=payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 200)
        self.assertEqual(self.result['message'], 'add event success')


if __name__ == '__main__':
    test_data.init_data()  # 初始化接口测试数据
    unittest.main()

10、集合测试用例并执行

from pyrequest.HTMLTestRunner import HTMLTestRunner
import unittest
from pyrequest.db_fixture import test_data
import time
import sys

sys.path.append('./interface')
sys.path.append('./db_fixture')

# 指定测试用例为当前文件夹下的 interface 目录
test_dir = 'D:\pydj\guest\pyrequest\interface'
print(test_dir)
discover = unittest.defaultTestLoader.discover(test_dir, pattern='*_test.py')

if __name__ == "__main__":
    # 初始化接口测试数据
    test_data.init_data()

    now = time.strftime("%Y-%m-%d %H_%M_%S")
    filename = 'D:/pydj/guest/pyrequest/report/' + now + '_result.html'
    fp = open(filename, 'wb')
    runner = HTMLTestRunner(stream=fp,
                            title='Guest Manage System Interface Test Report',
                            description='Implementation Example with: ')
    runner.run(discover)
    fp.close()
上一篇 下一篇

猜你喜欢

热点阅读