实战项目(二)-嘉宾发布会签到系统
先前学习了虫师老师的接口自动化测试教程,总结一下项目实战思路
近期分享: 接口自动化测试从零开始学习集锦: https://www.jianshu.com/nb/49125734 --欢迎探讨!!
1、Django开发一个发布会嘉宾签到系统;
2、学习Html前端优化展示;
3、保证开发的功能点测试通过;
4、开发接口视图层代码;
5、安装Pymsql,搭建本地数据库, 设计测试数据初始化,并对数据的插入删除做了封装
6、安装Requests库调用接口;
7、unittest单元测试框架运行测试;
8、HTMLTestRunner生成接口测试报告;
步骤1: db_fixture.mysql_db.py
Created on 2017年8月29日
@author: Yvon_早安阳光
from pymysql import connect,cursors
from pymysql.err import OperationalError
import os
import configparser as cparser
#============读取db_config.ini文件设置============
'''获取当前路径'''
base_dir = str(os.path.dirname(os.path.dirname(__file__)))
print(base_dir)
'''转换路径格式'''
base_dir = base_dir.replace('\\', '/')
'''获取数据库配置文件的路径'''
file_path = base_dir + '/db_config.ini'
print(file_path)
cf = cparser.ConfigParser()
cf.read(file_path)
host = cf.get("mysqlconf", "host")
print(host)
port = cf.get("mysqlconf", "port")
user = cf.get("mysqlconf", "user")
print(user)
password = cf.get("mysqlconf", "password")
db = cf.get("mysqlconf", "db_name")
#============封装MySQL基本操作============
class DB:
def __init__(self):
try:
#连接数据库
self.connection = connect(host=host,
port=int(port),
user=user,
password=password,
db=db,
charset='utf8mb4',
cursorclass=cursors.DictCursor)
except OperationalError as e:
print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
#清除表数据
def clear(self,table_name):
real_sql = 'delete from ' + table_name + ';'
print(real_sql)
with self.connection.cursor() as cursor:
cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
cursor.execute(real_sql)
self.connection.commit()
#插入数据
def insert(self,table_name,table_data):
for key in table_data:
table_data[key] = "'" + str(table_data[key]) + "'"
key = ','.join(table_data.keys())
value = ','.join(table_data.values())
real_sql = 'INSERT INTO ' + table_name + " (" + key + ") VALUES (" + value + ") "
print(real_sql)
with self.connection.cursor() as cursor:
cursor.execute(real_sql)
self.connection.commit()
#关闭数据库连接
def close(self):
self.connection.close()
# 插入数据
def init_data(self, datas):
for table, data in datas.items():
self.clear(table)
for d in data:
self.insert(table, d)
self.close()
if __name__ == '__main__':
db =DB()
table_name = "sign_event"
data = {'id':2,'name':'土豆','`limit`':3000,'status':1,'address':'杭州博物馆','start_time':'2018-05-15 14:25:42','create_time':'2016-07-20 00:25:42'}
table_name2 = "sign_guest"
data2 = {'realname':'alen','phone':12312341234,'email':'alen@mail.com','sign':0,'event_id':1}
db.clear(table_name)
db.insert(table_name, data)
db.close()
步骤2: db_fixture.test_data.py
'''
Created on 2017年8月29日
@author: Yvon_早安阳光
'''
import sys
sys.path.append('../db_fixture')
try:
from mysql_db import DB
except ImportError:
from .mysql_db import DB
# 创建测试数据
datas = {
#发布会表数据
'sign_event':[
#有效的发布会数据
{'id':1,'name':'OPPO发布会','`limit`':2000,'status':1,'address':'杭州江干区','start_time':'2019-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
{'id':2,'name':'联想发布会','`limit`':1500,'status':1,'address':'北京博远路','start_time':'2019-05-25 14:25:38','create_time':'2016-07-20 00:25:42'},
#参加人数限制
{'id':3,'name':'ViVo发布会','`limit`':0,'status':1,'address':'苏州通园路','start_time':'2019-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
#发布会未开始,状态关闭
{'id':5,'name':'三星发布会','`limit`':2000,'status':0,'address':'南京博物馆','start_time':'2019-07-25 14:00:00','create_time':'2016-07-20 00:25:42'},
{'id':6,'name':'锤子发布会','`limit`':1000,'status':0,'address':'南通泗水路','start_time':'2019-05-10 14:00:00','create_time':'2016-07-20 00:25:42'},
#发布会已结束
{'id':7,'name':'小米发布会','`limit`':800,'status':1,'address':'北京水立方','start_time':'2016-05-20 14:00:00','create_time':'2016-07-20 00:25:42'},
{'id':8,'name':'大米发布会','`limit`':1500,'status':1,'address':'淮安高教区','start_time':'2017-05-20 14:34:56','create_time':'2016-07-20 00:25:42'},
],
#嘉宾表数据
'sign_guest':[
#未签到有效数据
{'id':1,'realname':'张强','phone':18612341234,'email':'andy@mail.com','sign':0,'event_id':1,'create_time':'2016-07-20 00:25:42'},
{'id':3,'realname':'刘强','phone':17701231234,'email':'marry@163.com','sign':0,'event_id':2,'create_time':'2016-07-20 00:25:42'},
#已签到
{'id':2,'realname':'李强','phone':15040013210,'email':'dugu@126.com','sign':1,'event_id':1,'create_time':'2016-07-20 00:25:42'},
#发布会人数限制、发布会未激活,发布会已结束
{'id':5,'realname':'郭强','phone':18108090705,'email':'xiaowu@mail.com','sign':0,'event_id':3,'create_time':'2016-07-20 00:25:42'},
{'id':6,'realname':'金俊','phone':13064931234,'email':'jinj@163.com','sign':0,'event_id':5,'create_time':'2017-07-20 00:00:00'},
],
}
# 将测试数据插入表
def init_data():
DB().init_data(datas)
步骤3: Views_interface.py
思路1: 创建新的一条发布会
访问地址:http://127.0.0.1:8000/api/add_event](http://127.0.0.1:8000/api/add_event)
1、首先,判断eid、name、limit、address、start_time 等字段均不能为空,否则JsonResponse()返回相应的状态码和提示。JsonResponse()是一个非常有用的方法,它可以直接将字典转化成Json 格式返回到客户端。
2、接下来,判断发布会id 是否存在,以及发布会名称(name)是否存在;如果存在将返回相应的状态码和提示信息。将数据字典作为整个返回字典中data 对应的值返回。
3、再接下来,判断发布会状态是否为空,如果为空,将状态设置为1(True)。
4、最后,将数据插入到Event表,在插入的过程中如果日期格式错误,将抛出ValidationError异常,接收该异常并返回相应的状态和提示,否则,插入成功,返回状态码200 和“add event success”的提示。
思路2: 查询发布会接口
访问地址: http://127.0.0.1:8000/api/get_event_list/?eid=4](http://127.0.0.1:8000/api/get_event_list/?eid=4
http://127.0.0.1:8000/api/get_event_list/?name=](http://127.0.0.1:8000/api/get_event_list/?name=发布会
1、通过GET 请求接收发布会id 和name 参数。两个参数都是可选的。首先,判断当两个参数同时为空,接口返回状态码10021,参数错误。
2、如果发布会id 不为空,优先通过id 查询,因为id 的唯一性,所以,查询结果只会有一条,将查询结果以key:value 对的方式存放到定义的event 字典中,并将数据字典作为整个返回字典中data 对应的值返回。
3、name 查询为模糊查询,查询数据可能会有多条,返回的数据稍显复杂;首先将查询的每一条数据放到一个字典event 中,再把每一个字典再放到数组datas 中,最后再将整个数组做为返回字典中data 对应的值返回。
思路3: 添加嘉宾接口
访问地址: http://127.0.0.1:8000/api/add_guest/](http://127.0.0.1:8000/api/add_guest/
通过POST 请求接收嘉宾参数:关联的发布会id、姓名、手机号和邮箱等参数。
1、首先,判断eid、realname、phone 等参数均不能为空。
2、接下来,判断嘉宾关联的发布会id 是否存在,以及关联的发布会状态是否为True(即1),如果不存在或不为True,将返回相应的状态码和提示信息。
3、再接下来的步骤是判断当前时间是否大于发布会时间,如果大于则说明发布已开始,或者早已经结束。那么该发布会就应该不能允许再添加嘉宾。
4、最后,插入嘉宾数据,如果发布会的手机号重复则抛IntegrityError 异常,接收该异常并返回相应的状态码和提示信息。如果添加成功,则返回状态码200 和“add guest success”的提示。
代码参考
<meta charset="utf-8">
from django.http.response import HttpResponse, JsonResponse
'''查询数据库'''
from sign.models import Event,Guest
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from django.db.utils import IntegrityError
import time
from django.template.context_processors import request
#添加发布会接口
def add_event(request):
eid = request.POST.get('eid','') #发布会id
name = request.POST.get('name','') #发布会标题
limit = request.POST.get('limit','') #限制人数
status = request.POST.get('status','') #状态
address = request.POST.get('address','') #地址
start_time = request.POST.get('start_time','') #发布会时间
'''必填项为空'''
if eid =='' or name == '' or limit == '' or address == '' or start_time == '':
return JsonResponse({'status':207,'message':'parameter error'})
'''event id重复'''
results = Event.objects.filter(id = eid)
if results:
return JsonResponse({'status':201,'message':'event id already exists'})
'''event name 重复'''
result = Event.objects.filter(name = name)
if result:
return JsonResponse({'status':201,'message':'event name already exists'})
'''创建新的一条发布会'''
if status == '':
status = 1
try:
Event.objects.create(id=eid,name=name,limit=limit,address=address,status=int(status),start_time=start_time)
except ValidationError as e:
error = 'start_time format error. It must be in YYYY-MM-DD HH:MM:SS format.'
return JsonResponse({'status':202,'message':'start_tiem format error'})
return JsonResponse({'status':200,'message':'add event success'})
#查询发布会接口
def get_event_list(request):
eid = request.GET.get('eid','')
name = request.GET.get('name','')
if eid == '' and name == '':
return JsonResponse({'status':207,'message':'parameter error'})
if eid !='':
event = {}
try:
result = Event.objects.get(id = eid)
except ObjectDoesNotExist:
return JsonResponse({'status':201,'message':'query result is empty'})
else:
event['name'] = result.name
event['limit'] = result.limit
event['status'] = result.status
event['address'] = result.address
event['start_time'] = result.start_time
return JsonResponse({'status':200,'message':'success','data':event})
if name !='':
datas = []
results = Event.objects.filter(name__contains = name) #发布会名称模糊查询
if results:
for r in results:
event = {}
event['name'] = r.name
event['limit'] = r.limit
event['status'] = r.status
event['address'] = r.address
event['start_time'] = r.start_time
datas.append(event)
return JsonResponse({'status':200,'message':'success','data':datas})
else:
return JsonResponse({'status':201,'message':'query result is empty'})
#添加嘉宾接口
def add_guest(request):
eid = request.POST.get('eid','') #关联发布会id
realname = request.POST.get('realname','') #姓名
phone = request.POST.get('phone','') #手机号
email = request.POST.get('email','') #邮箱
'''必填项为空'''
if eid =='' or realname == '' or phone == '' :
return JsonResponse({'status':207,'message':'parameter error'})
'''eid不存在'''
result = Event.objects.filter(id=eid)
if not result:
return JsonResponse({'status':202,'message':'event id null'})
'''status状态为False,不可再添加嘉宾'''
result = Event.objects.get(id=eid).status
if not result:
return JsonResponse({'status':203,'message':'event status is not available'})
'''限制发布会人数'''
event_limit = Event.objects.get(id=eid).limit #发布会限制人数
guest_limit = Guest.objects.filter(event_id=eid) #发布会已添加的嘉宾数
if len(guest_limit) >= event_limit:
return JsonResponse({'status':204,'message':'event number is full'})
'''发布会已结束'''
event_time = Event.objects.get(id=eid).start_time #获取发布会开始时间
print(event_time)
# etime = str(event_time).split(".")[0] #截取小数点左边的时间数据,精确到秒,并转成字符串
etime = str(event_time).split('+')[0] # 根据实际的打印结果截取右侧的数据,打印出来的结果是"+00:00",故截取+号右侧的数据
print(etime)
timeArray = time.strptime(str(etime), "%Y-%m-%d %H:%M:%S") #转换成时间戳的格式
e_time = int(time.mktime(timeArray)) #获取最终的发布会的开始时间
print(e_time)
now_time = str(time.time()) #获取当前时间
print(now_time)
ntime = now_time.split(".")[0] #截取小数点左边的时间数据
n_time = int(ntime)
print(n_time)
#当前时间大于发布会时间
if n_time >= e_time:
return JsonResponse({'status':205,'message':'event has started'})
try:
Guest.objects.create(realname=realname,phone=int(phone),email=email,sign=0,event_id=int(eid))
except IntegrityError:
return JsonResponse({'status':206,'message':'the event guest phone number repeat'}) #嘉宾重复添加手机号
return JsonResponse({'status':200,'message':'add guest success'})
步骤4: interface.add_event_test.py
<meta charset="utf-8">
'''
Created on 2017年8月29日
@author: Yvon_早安阳光
'''
#coding:utf-8
import unittest
import requests
import os, sys
parentdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentdir)
from db_fixture import test_data
class AddEventTest(unittest.TestCase):
''' 添加发布会 '''
def setUp(self):
self.base_url = "[http://127.0.0.1:8000/api/add_event/](http://127.0.0.1:8000/api/add_event/)"
def tearDown(self):
print(self.result)
def test_add_event_all_null(self):
'''所有参数为空'''
payload = {'eid':'','name':'','status':'','limit':'','address':"",'start_time':''}
r = requests.post(self.base_url,data = payload)
self.result = r.json()
self.assertEqual(self.result['status'], 207)
self.assertEqual(self.result['message'], 'parameter error')
def test_add_event_eid_exist(self):
'''eid 已存在'''
payload = {'eid':1,'name':'小金理财发布会','status':'1','limit':'2000',
'address':"江苏建湖",'start_time':'2020-05-20 14:00:00'}
r = requests.post(self.base_url,data = payload)
self.result = r.json()
self.assertEqual(self.result['status'], 201)
self.assertEqual(self.result['message'], 'event id already exists')
def test_add_event_name_exist(self):
'''名称已存在'''
payload = {'eid':9,'name':'大米发布会','status':'1','limit':'1500',
'address':"杭州江干庆春路",'start_time':'2019-05-20 15:00:00'}
r = requests.post(self.base_url,data = payload)
self.result = r.json()
self.assertEqual(self.result['status'], 201)
self.assertEqual(self.result['message'], 'event name already exists')
def test_add_event_data_type_error(self):
'''日期格式错误'''
payload = {'eid':9,'name':'悟空理财发布会','status':'1','limit':'1000',
'address':"扬州中华路",'start_time':'2019'}
r = requests.post(self.base_url,data = payload)
self.result = r.json()
self.assertEqual(self.result['status'], 202)
self.assertEqual(self.result['message'],'start_tiem format error')
def test_add_event_success(self):
payload = {'eid':10,'name':'金立发布会','status':'1','limit':'1500',
'address':"盐城中华路",'start_time':'2020-05-10 13:00:00'}
r = requests.post(self.base_url,data = payload)
self.result = r.json()
self.assertEqual(self.result['status'], 200)
self.assertEqual(self.result['message'],'add event success')
if __name__ == '__main__':
test_data.init_data() #初始化接口测试数据
unittest.ma
步骤5: Send_mail_report.py
'''
Created on 2017年9月4日
@author: Yvon_早安阳光
'''
import time, sys, unittest, smtplib, os
sys.path.append('./interface')
sys.path.append('./db_fixture')
from HTMLTestRunner import HTMLTestRunner
from db_fixture import test_data
from email.mime.multipart import MIMEMultipart
from email.header import Header
# from email.mime.application import MIMEApplication
from email.mime.text import MIMEText
#======================定义发送邮件======================
def send_mail(file_new):
'''file_new是指最新的测试报告'''
f = open(file_new,'rb')
mail_body = f.read()
f.close()
msg = MIMEMultipart()
'''邮件标题'''
msg['Subject'] = Header('发布会嘉宾签到接口测试报告' ,'utf-8')
'''邮件正文'''
zhengwen = "各位,详情测试结果请见附件文档"
msg.attach(MIMEText(zhengwen, 'plain'))
'''附件测试报告 '''
att= MIMEText(mail_body,'html','utf-8')
att.add_header('Content-Disposition', 'attachment', filename='Guest Manage System Interface_TestReport.html')
msg.attach(att)
#发送邮件
smtp = smtplib.SMTP()
smtp.connect('smtp.exmail.qq.com')
smtp.login('fajin@bertadata.com','36#20qa20A')
smtp.sendmail('fajin@bertadata.com','huiselanse@yeah.net',msg.as_string())
print('email has send out !')
smtp.quit()
#======================查找测试报告的目录,找到最新生成的测试报告======================
'''按时间获取最新的测试报告'''
def new_report(testreport):
lists = os.listdir(testreport)
lists.sort(key = lambda fn: os.path.getatime(testreport + '\\' + fn))
file_new = os.path.join(testreport,lists[-1])
print(file_new)
return file_new
#指定测试用例为当前文件夹下的目录
test_dir = './interface'
discover = unittest.defaultTestLoader.discover(test_dir, pattern='*_test.py')
if __name__ == '__main__':
test_data.init_data() #初始化测试数据
#报告存放的路径
result_report_dir= 'result_report/'
'''实时当前的时间'''
now = time.strftime("%Y-%m-%d %H_%M_%S")
file_name = result_report_dir + '\\' + now +'result.html'
fp = open(file_name,'wb')
#定义测试报告
runner =HTMLTestRunner(stream = fp,title ='嘉宾签到接口测试报告',description='接口用例执行情况:')
runner.run(discover)
fp.close() #关闭报告文件
new_report = new_report(result_report_dir)
send_mail(new_report)
测试报告1.png
测试报告2.png