实战项目(二)-嘉宾发布会签到系统

2021-02-02  本文已影响0人  巴鶴

先前学习了虫师老师的接口自动化测试教程,总结一下项目实战思路

近期分享: 接口自动化测试从零开始学习集锦: https://www.jianshu.com/nb/49125734 --欢迎探讨!!

1、Django开发一个发布会嘉宾签到系统;
2、学习Html前端优化展示;
3、保证开发的功能点测试通过;
4、开发接口视图层代码;
5、安装Pymsql,搭建本地数据库, 设计测试数据初始化,并对数据的插入删除做了封装
6、安装Requests库调用接口;
7、unittest单元测试框架运行测试;
8、HTMLTestRunner生成接口测试报告;

步骤1: db_fixture.mysql_db.py

Created on 2017年8月29日
@author: Yvon_早安阳光

from pymysql import  connect,cursors
from pymysql.err import OperationalError
import os
import configparser as cparser
 
#============读取db_config.ini文件设置============
'''获取当前路径'''
base_dir = str(os.path.dirname(os.path.dirname(__file__)))
print(base_dir)
'''转换路径格式'''
base_dir = base_dir.replace('\\', '/')
'''获取数据库配置文件的路径'''
file_path = base_dir + '/db_config.ini'
print(file_path)
 
cf = cparser.ConfigParser()
cf.read(file_path)
host = cf.get("mysqlconf", "host")
print(host)
port = cf.get("mysqlconf", "port")
user = cf.get("mysqlconf", "user")
print(user)
password = cf.get("mysqlconf", "password")
db = cf.get("mysqlconf", "db_name")
 
#============封装MySQL基本操作============ 
class DB:
    def __init__(self):    
        try:
            #连接数据库
            self.connection = connect(host=host,
                                port=int(port),
                                user=user,
                                password=password,
                                db=db,
                                charset='utf8mb4',
                                cursorclass=cursors.DictCursor)
        except OperationalError as e:
            print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
             
    #清除表数据
    def clear(self,table_name):
        real_sql = 'delete from ' + table_name + ';'
        print(real_sql)
        with self.connection.cursor() as cursor:
            cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
            cursor.execute(real_sql)
        self.connection.commit()
         
    #插入数据
    def insert(self,table_name,table_data):
        for key in table_data:
            table_data[key] = "'" + str(table_data[key]) + "'"
        key = ','.join(table_data.keys())
        value = ','.join(table_data.values())
        real_sql = 'INSERT INTO ' + table_name + " (" + key + ") VALUES (" + value + ") "
        print(real_sql)
        with self.connection.cursor() as cursor:   
            cursor.execute(real_sql)
        self.connection.commit()
 
    #关闭数据库连接
    def close(self):
        self.connection.close()
         
    # 插入数据
    def init_data(self, datas):
        for table, data in datas.items():
            self.clear(table)
            for d in data:
                self.insert(table, d)
        self.close()
 
if __name__ == '__main__':
    db =DB()
    table_name = "sign_event"
    data = {'id':2,'name':'土豆','`limit`':3000,'status':1,'address':'杭州博物馆','start_time':'2018-05-15 14:25:42','create_time':'2016-07-20 00:25:42'}
    table_name2 = "sign_guest"
    data2 = {'realname':'alen','phone':12312341234,'email':'alen@mail.com','sign':0,'event_id':1}
  
    db.clear(table_name)
    db.insert(table_name, data)
    db.close()

步骤2: db_fixture.test_data.py

'''
Created on 2017年8月29日
@author: Yvon_早安阳光
'''
import sys
sys.path.append('../db_fixture')
try:
    from mysql_db import DB
except ImportError:
    from .mysql_db import DB
 
# 创建测试数据
datas = {
    #发布会表数据
    'sign_event':[
        #有效的发布会数据
        {'id':1,'name':'OPPO发布会','`limit`':2000,'status':1,'address':'杭州江干区','start_time':'2019-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
        {'id':2,'name':'联想发布会','`limit`':1500,'status':1,'address':'北京博远路','start_time':'2019-05-25 14:25:38','create_time':'2016-07-20 00:25:42'},
        #参加人数限制
        {'id':3,'name':'ViVo发布会','`limit`':0,'status':1,'address':'苏州通园路','start_time':'2019-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
        #发布会未开始,状态关闭
        {'id':5,'name':'三星发布会','`limit`':2000,'status':0,'address':'南京博物馆','start_time':'2019-07-25 14:00:00','create_time':'2016-07-20 00:25:42'},
        {'id':6,'name':'锤子发布会','`limit`':1000,'status':0,'address':'南通泗水路','start_time':'2019-05-10 14:00:00','create_time':'2016-07-20 00:25:42'},
        #发布会已结束
        {'id':7,'name':'小米发布会','`limit`':800,'status':1,'address':'北京水立方','start_time':'2016-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
        {'id':8,'name':'大米发布会','`limit`':1500,'status':1,'address':'淮安高教区','start_time':'2017-05-20 14:34:56','create_time':'2016-07-20 00:25:42'},
    ],
     #嘉宾表数据
    'sign_guest':[
        #未签到有效数据
        {'id':1,'realname':'张强','phone':18612341234,'email':'andy@mail.com','sign':0,'event_id':1,'create_time':'2016-07-20 00:25:42'},
        {'id':3,'realname':'刘强','phone':17701231234,'email':'marry@163.com','sign':0,'event_id':2,'create_time':'2016-07-20 00:25:42'},
        #已签到
        {'id':2,'realname':'李强','phone':15040013210,'email':'dugu@126.com','sign':1,'event_id':1,'create_time':'2016-07-20 00:25:42'},
        #发布会人数限制、发布会未激活,发布会已结束
        {'id':5,'realname':'郭强','phone':18108090705,'email':'xiaowu@mail.com','sign':0,'event_id':3,'create_time':'2016-07-20 00:25:42'},
        {'id':6,'realname':'金俊','phone':13064931234,'email':'jinj@163.com','sign':0,'event_id':5,'create_time':'2017-07-20 00:00:00'},
 
    ],
}
 
# 将测试数据插入表
def init_data():
    DB().init_data(datas)

步骤3: Views_interface.py

思路1: 创建新的一条发布会

访问地址:http://127.0.0.1:8000/api/add_event](http://127.0.0.1:8000/api/add_event)

1、首先,判断eid、name、limit、address、start_time 等字段均不能为空,否则JsonResponse()返回相应的状态码和提示。JsonResponse()是一个非常有用的方法,它可以直接将字典转化成Json 格式返回到客户端。
2、接下来,判断发布会id 是否存在,以及发布会名称(name)是否存在;如果存在将返回相应的状态码和提示信息。将数据字典作为整个返回字典中data 对应的值返回。
3、再接下来,判断发布会状态是否为空,如果为空,将状态设置为1(True)。
4、最后,将数据插入到Event表,在插入的过程中如果日期格式错误,将抛出ValidationError异常,接收该异常并返回相应的状态和提示,否则,插入成功,返回状态码200 和“add event success”的提示。

思路2: 查询发布会接口

访问地址: http://127.0.0.1:8000/api/get_event_list/?eid=4](http://127.0.0.1:8000/api/get_event_list/?eid=4
http://127.0.0.1:8000/api/get_event_list/?name=](http://127.0.0.1:8000/api/get_event_list/?name=发布会

1、通过GET 请求接收发布会id 和name 参数。两个参数都是可选的。首先,判断当两个参数同时为空,接口返回状态码10021,参数错误。
2、如果发布会id 不为空,优先通过id 查询,因为id 的唯一性,所以,查询结果只会有一条,将查询结果以key:value 对的方式存放到定义的event 字典中,并将数据字典作为整个返回字典中data 对应的值返回。
3、name 查询为模糊查询,查询数据可能会有多条,返回的数据稍显复杂;首先将查询的每一条数据放到一个字典event 中,再把每一个字典再放到数组datas 中,最后再将整个数组做为返回字典中data 对应的值返回。

思路3: 添加嘉宾接口

访问地址: http://127.0.0.1:8000/api/add_guest/](http://127.0.0.1:8000/api/add_guest/

通过POST 请求接收嘉宾参数:关联的发布会id、姓名、手机号和邮箱等参数。
1、首先,判断eid、realname、phone 等参数均不能为空。
2、接下来,判断嘉宾关联的发布会id 是否存在,以及关联的发布会状态是否为True(即1),如果不存在或不为True,将返回相应的状态码和提示信息。
3、再接下来的步骤是判断当前时间是否大于发布会时间,如果大于则说明发布已开始,或者早已经结束。那么该发布会就应该不能允许再添加嘉宾。
4、最后,插入嘉宾数据,如果发布会的手机号重复则抛IntegrityError 异常,接收该异常并返回相应的状态码和提示信息。如果添加成功,则返回状态码200 和“add guest success”的提示。

代码参考

<meta charset="utf-8">
from django.http.response import HttpResponse, JsonResponse

'''查询数据库'''
from sign.models import Event,Guest
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from django.db.utils import IntegrityError
import time
from django.template.context_processors import request

#添加发布会接口
def add_event(request):
    eid = request.POST.get('eid','')     #发布会id
    name = request.POST.get('name','')   #发布会标题
    limit = request.POST.get('limit','')  #限制人数
    status = request.POST.get('status','')  #状态
    address = request.POST.get('address','')  #地址
    start_time = request.POST.get('start_time','')  #发布会时间

    '''必填项为空'''
    if eid =='' or name == '' or limit == '' or address == '' or start_time == '':
        return JsonResponse({'status':207,'message':'parameter error'})
    '''event id重复'''
    results = Event.objects.filter(id = eid)
    if results:
        return JsonResponse({'status':201,'message':'event id already exists'})
    '''event name 重复'''
    result = Event.objects.filter(name = name)
    if result:
        return JsonResponse({'status':201,'message':'event name already exists'})

    '''创建新的一条发布会''' 
    if status == '': 
        status = 1  
    try:
        Event.objects.create(id=eid,name=name,limit=limit,address=address,status=int(status),start_time=start_time)
    except ValidationError as e:
        error = 'start_time format error. It must be in YYYY-MM-DD HH:MM:SS format.' 
        return JsonResponse({'status':202,'message':'start_tiem format error'})
    return JsonResponse({'status':200,'message':'add event success'})

#查询发布会接口
def get_event_list(request):
    eid = request.GET.get('eid','')
    name = request.GET.get('name','')
    if eid == '' and name == '':
        return JsonResponse({'status':207,'message':'parameter error'})
    if eid !='':
        event = {}
        try:
            result = Event.objects.get(id = eid)
        except ObjectDoesNotExist:
            return JsonResponse({'status':201,'message':'query result is empty'})
        else:
            event['name'] = result.name
            event['limit'] = result.limit
            event['status'] = result.status
            event['address'] = result.address
            event['start_time'] = result.start_time
            return JsonResponse({'status':200,'message':'success','data':event})

    if name !='':
        datas = []
        results = Event.objects.filter(name__contains = name)  #发布会名称模糊查询
        if results:
            for r in results:
                event = {}
                event['name'] = r.name
                event['limit'] = r.limit
                event['status'] = r.status
                event['address'] = r.address
                event['start_time'] = r.start_time
                datas.append(event)
            return JsonResponse({'status':200,'message':'success','data':datas})
        else:
            return JsonResponse({'status':201,'message':'query result is empty'})

#添加嘉宾接口
def add_guest(request):
    eid = request.POST.get('eid','')    #关联发布会id 
    realname = request.POST.get('realname','') #姓名
    phone = request.POST.get('phone','') #手机号
    email = request.POST.get('email','') #邮箱

    '''必填项为空'''
    if eid =='' or realname == '' or phone == '' :
        return JsonResponse({'status':207,'message':'parameter error'})
    '''eid不存在'''
    result = Event.objects.filter(id=eid)   
    if not result:
        return JsonResponse({'status':202,'message':'event id null'})
    '''status状态为False,不可再添加嘉宾'''
    result = Event.objects.get(id=eid).status
    if not result:
        return JsonResponse({'status':203,'message':'event status is not available'})

    '''限制发布会人数'''
    event_limit = Event.objects.get(id=eid).limit    #发布会限制人数
    guest_limit = Guest.objects.filter(event_id=eid) #发布会已添加的嘉宾数
    if len(guest_limit) >= event_limit:
        return JsonResponse({'status':204,'message':'event number is full'})
    '''发布会已结束'''
    event_time = Event.objects.get(id=eid).start_time #获取发布会开始时间
    print(event_time)
#     etime = str(event_time).split(".")[0]  #截取小数点左边的时间数据,精确到秒,并转成字符串
    etime = str(event_time).split('+')[0]  # 根据实际的打印结果截取右侧的数据,打印出来的结果是"+00:00",故截取+号右侧的数据
    print(etime)
    timeArray = time.strptime(str(etime), "%Y-%m-%d %H:%M:%S") #转换成时间戳的格式
    e_time = int(time.mktime(timeArray)) #获取最终的发布会的开始时间
    print(e_time)
    now_time = str(time.time())  #获取当前时间
    print(now_time)
    ntime = now_time.split(".")[0] #截取小数点左边的时间数据
    n_time = int(ntime)
    print(n_time)

    #当前时间大于发布会时间
    if n_time >= e_time:
        return JsonResponse({'status':205,'message':'event has started'})
    try:
        Guest.objects.create(realname=realname,phone=int(phone),email=email,sign=0,event_id=int(eid))
    except IntegrityError:
        return JsonResponse({'status':206,'message':'the event guest phone number repeat'}) #嘉宾重复添加手机号
    return JsonResponse({'status':200,'message':'add guest success'})

步骤4: interface.add_event_test.py

<meta charset="utf-8">
'''
Created on 2017年8月29日
@author: Yvon_早安阳光
'''
#coding:utf-8

import unittest
import requests
import os, sys

parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentdir)
from db_fixture import test_data

class AddEventTest(unittest.TestCase):
    ''' 添加发布会 '''
    def setUp(self):
        self.base_url = "[http://127.0.0.1:8000/api/add_event/](http://127.0.0.1:8000/api/add_event/)"
    def tearDown(self):
        print(self.result)
    def test_add_event_all_null(self):
        '''所有参数为空'''
        payload = {'eid':'','name':'','status':'','limit':'','address':"",'start_time':''}
        r = requests.post(self.base_url,data = payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 207)
        self.assertEqual(self.result['message'], 'parameter error')

    def test_add_event_eid_exist(self):
        '''eid 已存在'''
        payload = {'eid':1,'name':'小金理财发布会','status':'1','limit':'2000',
                   'address':"江苏建湖",'start_time':'2020-05-20 14:00:00'}
        r = requests.post(self.base_url,data = payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 201)
        self.assertEqual(self.result['message'], 'event id already exists')

    def test_add_event_name_exist(self):
        '''名称已存在'''
        payload = {'eid':9,'name':'大米发布会','status':'1','limit':'1500',
                   'address':"杭州江干庆春路",'start_time':'2019-05-20 15:00:00'}
        r = requests.post(self.base_url,data = payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 201)
        self.assertEqual(self.result['message'], 'event name already exists')

    def test_add_event_data_type_error(self):
        '''日期格式错误'''
        payload = {'eid':9,'name':'悟空理财发布会','status':'1','limit':'1000',
                   'address':"扬州中华路",'start_time':'2019'}

        r = requests.post(self.base_url,data = payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 202)
        self.assertEqual(self.result['message'],'start_tiem format error')

    def test_add_event_success(self):
        payload = {'eid':10,'name':'金立发布会','status':'1','limit':'1500',
                   'address':"盐城中华路",'start_time':'2020-05-10 13:00:00'}
        r = requests.post(self.base_url,data = payload)
        self.result = r.json()
        self.assertEqual(self.result['status'], 200)
        self.assertEqual(self.result['message'],'add event success')

if __name__ == '__main__':
    test_data.init_data() #初始化接口测试数据
    unittest.ma

步骤5: Send_mail_report.py

'''
Created on 2017年9月4日
@author: Yvon_早安阳光
'''
import time, sys, unittest, smtplib, os
sys.path.append('./interface')
sys.path.append('./db_fixture')
from HTMLTestRunner import HTMLTestRunner
from db_fixture import test_data
from email.mime.multipart import MIMEMultipart
from email.header import Header
# from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
 
#======================定义发送邮件======================
 
def send_mail(file_new):
    '''file_new是指最新的测试报告'''
    f = open(file_new,'rb')
    mail_body = f.read()
    f.close()
     
    msg = MIMEMultipart()
    '''邮件标题'''
    msg['Subject'] = Header('发布会嘉宾签到接口测试报告' ,'utf-8')
     
    '''邮件正文'''
    zhengwen = "各位,详情测试结果请见附件文档"
    msg.attach(MIMEText(zhengwen, 'plain')) 
     
    '''附件测试报告 '''
    att= MIMEText(mail_body,'html','utf-8')
    att.add_header('Content-Disposition', 'attachment', filename='Guest Manage System Interface_TestReport.html')
    msg.attach(att) 
        #发送邮件
    smtp = smtplib.SMTP()
    smtp.connect('smtp.exmail.qq.com')
    smtp.login('fajin@bertadata.com','36#20qa20A')
    smtp.sendmail('fajin@bertadata.com','huiselanse@yeah.net',msg.as_string())
    print('email has send out !')
    smtp.quit()
     
#======================查找测试报告的目录,找到最新生成的测试报告======================
'''按时间获取最新的测试报告'''
def new_report(testreport):
    lists = os.listdir(testreport)
    lists.sort(key = lambda fn: os.path.getatime(testreport + '\\' + fn))
    file_new = os.path.join(testreport,lists[-1])
    print(file_new)
    return file_new
 
#指定测试用例为当前文件夹下的目录
test_dir = './interface'
discover = unittest.defaultTestLoader.discover(test_dir, pattern='*_test.py')
 
if __name__ == '__main__':
    test_data.init_data() #初始化测试数据     
    #报告存放的路径
    result_report_dir= 'result_report/'
    '''实时当前的时间'''
    now = time.strftime("%Y-%m-%d %H_%M_%S")
    file_name = result_report_dir + '\\' + now +'result.html'
    fp = open(file_name,'wb')    
    #定义测试报告
    runner =HTMLTestRunner(stream = fp,title ='嘉宾签到接口测试报告',description='接口用例执行情况:')                          
    runner.run(discover)
    fp.close() #关闭报告文件
    new_report = new_report(result_report_dir)
    send_mail(new_report)
测试报告1.png
测试报告2.png
上一篇下一篇

猜你喜欢

热点阅读