CFlaskProj-CNetSpy的API模拟(服务端)
2020-07-10 本文已影响0人
码农朱同学
项目初衷:
因为公司后台服务“水比较深”,接口环境有 release gray beta dev dark apptest。。。各种环境。更麻烦的是有时,某个页面只有某个环境有数据,而某个功能只在另一个环境有数据。常常环境还会出现挂了等情况。之前我看公司的处理方法都是采取在本地模拟数据的方法。在实现接口的同时,还要写一套模拟的数据,代码看起来非常难看,而且还曾发生过带着模拟数据上线的奇葩错误。而我前一段时间解决身份证识别时顺便重新学了Python(之前学的差不多忘光了),就尝试用Python语言借助flask框架自己搭建一套api服务。
项目分析
1,项目结构
项目结构数据库采用的轻量级sqlite数据库,并使用SQLAlchemy库辅助数据操作。
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app_watch01.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.secret_key = "zhxh"
api = Api(app)
api.add_resource(ApiRecordList, '/api/records')
api.add_resource(ApiRecord, '/<path>')
api.add_resource(BugRecordList, '/bug/records')
api.add_resource(BugRecord, '/bug/item/<timestamp>')
api.add_resource(CodeRecordList, '/code/records')
api.add_resource(CodeRecord, '/code/item/<code_id>')
db.init_app(app)
考虑到手机屏幕太小,为了展示和修改数据方便,也写了两个h5页面来展示所有的api接口以及,api修改,效果如下:
API列表
点进API修改页面:
API修改页面
(关于h5和js的知识已经还给老师了,遇坑无数,加班了2个周末才搞起来这两个页面。)
@app.route('/mock/records.html')
def api_path_list():
return render_template('api_list.html')
@app.route('/mock/item/<path>')
def api_show(path):
return render_template('api_item.html', path=path.replace("__", "/"))
1,客户端关键代码
上一篇文章在CNetSpy中我已经介绍了ApiMockHelper和ApiMockInterceptor
HttpUrl oldHttpUrl = request.url();
HttpUrl newHttpUrl = oldHttpUrl
.newBuilder()
.scheme("http")
.host(ApiMockHelper.host)
.port(5000)
.encodedPath("/" + oldHttpUrl.encodedPath().replace("/", "__").substring(2) + pathParams.toString())
.build();
Request.Builder builder = request.newBuilder();
将之前请求的url,和参数转换一下让url请求自己api镜像。
2,前端关键代码
<script>
$.ajax({
url: "/api/records",
dataType: 'json',
success: function (res) {
res.map(item => {
let path = getShowPath(item.path)
let liNode = document.createElement("li");
liNode.className = "my_record_list_li"
let labelNode = document.createElement("label")
labelNode.className = "my_record_list_label"
let spanNode = document.createElement("span")
spanNode.innerHTML = getShowTime(item.timestamp)
labelNode.append(spanNode)
let delNode = document.createElement("a");
delNode.className = "delete_record"
delNode.href = "/mock/item/" + item.path
delNode.innerHTML = "删除"
labelNode.append(delNode)
let aNode = document.createElement("a");
aNode.className = "my_record_list_a"
aNode.href = "/mock/item/" + item.path
aNode.innerHTML = path
liNode.append(aNode)
liNode.append(labelNode)
$("#record_list_container").append(liNode)
})
document.getElementById('record_num_value').innerHTML = "共" + res.length + "条"
}
})
</script>
3,服务端关键代码
class ApiRecord(Resource):
@staticmethod
def get():
return {"You should have a path": 404}, 403
@staticmethod
def get(path):
connection = sqlite3.connect("app_watch01.db")
cursor = connection.cursor()
query = "select * from api_record where path=?"
row = cursor.execute(query, (path,)).fetchone()
if row:
if row[1] == "0":
return str2json(row[3]), 200
elif row[1] == "-1":
return str2json(row[4]), 200
else:
return str2json(row[2]), 200
else:
item = None
connection.close()
return item
def post(self, path):
connection = sqlite3.connect("app_watch01.db")
cursor = connection.cursor()
query = "select * from api_record where path=?"
row = cursor.execute(query, (path,)).fetchone()
if row:
if row[1] == "0":
return str2json(row[3]), 200
elif row[1] == "-1":
return str2json(row[4]), 200
else:
return str2json(row[2]), 200
else:
item = None
connection.close()
return item
@staticmethod
def delete(path):
item = ApiRecordModel.find_by_path(path)
if item:
item.delete_from_db()
return {"message": "item deleted"}
class ApiRecordList(Resource):
parser = reqparse.RequestParser()
parser.add_argument('path')
parser.add_argument('show_type')
parser.add_argument('resp_data')
parser.add_argument('resp_empty')
parser.add_argument('resp_error')
@staticmethod
def get():
return [x.json() for x in ApiRecordModel.query.order_by(ApiRecordModel.timestamp.desc()).all()]
def post(self):
args = self.parser.parse_args()
path = args['path'].strip().lstrip('/').replace('/', '__')
show_type = "" if args['show_type'] is None else args['show_type']
resp_data = "" if args['resp_data'] is None else args['resp_data']
resp_empty = "" if args['resp_empty'] is None else args['resp_empty']
resp_error = "" if args['resp_error'] is None else args['resp_error']
show_type = re.sub(r'\s+', '', show_type)
resp_data = re.sub(r'\s+', '', resp_data)
resp_empty = re.sub(r'\s+', '', resp_empty)
resp_error = re.sub(r'\s+', '', resp_error)
item = ApiRecordModel.find_by_path(path)
if item is None:
item = ApiRecordModel(path, show_type, resp_data, resp_empty, resp_error)
item.save_to_db()
return item.json(), 201
else:
if show_type in {"1", "0", "-1"}:
item.show_type = show_type
else:
item.show_type = '1'
if resp_data != '':
item.resp_data = resp_data
if resp_empty != '':
item.resp_empty = resp_empty
if resp_error != '':
item.resp_error = resp_error
item.timestamp = get_millisecond()
ApiRecordModel.db_session_commit()
return item.json()
@staticmethod
def get_paths():
return [x.path for x in ApiRecordModel.query.order_by(ApiRecordModel.timestamp.desc()).all()]