CFlaskProj-CNetSpy的API模拟(服务端)

2020-07-10  本文已影响0人  码农朱同学

项目初衷:

因为公司后台服务“水比较深”,接口环境有 release gray beta dev dark apptest。。。各种环境。更麻烦的是有时,某个页面只有某个环境有数据,而某个功能只在另一个环境有数据。常常环境还会出现挂了等情况。之前我看公司的处理方法都是采取在本地模拟数据的方法。在实现接口的同时,还要写一套模拟的数据,代码看起来非常难看,而且还曾发生过带着模拟数据上线的奇葩错误。而我前一段时间解决身份证识别时顺便重新学了Python(之前学的差不多忘光了),就尝试用Python语言借助flask框架自己搭建一套api服务。

项目分析

1,项目结构
项目结构

数据库采用的轻量级sqlite数据库,并使用SQLAlchemy库辅助数据操作。

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app_watch01.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
app.secret_key = "zhxh"

api = Api(app)
api.add_resource(ApiRecordList, '/api/records')
api.add_resource(ApiRecord, '/<path>')
api.add_resource(BugRecordList, '/bug/records')
api.add_resource(BugRecord, '/bug/item/<timestamp>')
api.add_resource(CodeRecordList, '/code/records')
api.add_resource(CodeRecord, '/code/item/<code_id>')

db.init_app(app)

考虑到手机屏幕太小,为了展示和修改数据方便,也写了两个h5页面来展示所有的api接口以及,api修改,效果如下:


API列表

点进API修改页面:


API修改页面

(关于h5和js的知识已经还给老师了,遇坑无数,加班了2个周末才搞起来这两个页面。)


@app.route('/mock/records.html')
def api_path_list():
    return render_template('api_list.html')


@app.route('/mock/item/<path>')
def api_show(path):
    return render_template('api_item.html', path=path.replace("__", "/"))

1,客户端关键代码

上一篇文章在CNetSpy中我已经介绍了ApiMockHelper和ApiMockInterceptor

        HttpUrl oldHttpUrl = request.url();
        HttpUrl newHttpUrl = oldHttpUrl
                .newBuilder()
                .scheme("http")
                .host(ApiMockHelper.host)
                .port(5000)
                .encodedPath("/" + oldHttpUrl.encodedPath().replace("/", "__").substring(2) + pathParams.toString())
                .build();
        Request.Builder builder = request.newBuilder();

将之前请求的url,和参数转换一下让url请求自己api镜像。

2,前端关键代码
<script>
  $.ajax({
    url: "/api/records",
    dataType: 'json',
    success: function (res) {

      res.map(item => {
        let path = getShowPath(item.path)

        let liNode = document.createElement("li");
        liNode.className = "my_record_list_li"

        let labelNode = document.createElement("label")
        labelNode.className = "my_record_list_label"
        let spanNode = document.createElement("span")
        spanNode.innerHTML = getShowTime(item.timestamp)
        labelNode.append(spanNode)
        let delNode = document.createElement("a");
        delNode.className = "delete_record"
        delNode.href = "/mock/item/" + item.path
        delNode.innerHTML = "删除"
        labelNode.append(delNode)


        let aNode = document.createElement("a");
        aNode.className = "my_record_list_a"
        aNode.href = "/mock/item/" + item.path
        aNode.innerHTML = path

        liNode.append(aNode)
        liNode.append(labelNode)

        $("#record_list_container").append(liNode)
      })

      document.getElementById('record_num_value').innerHTML = "共" + res.length + "条"
    }
  })

</script>
3,服务端关键代码
class ApiRecord(Resource):

    @staticmethod
    def get():
        return {"You should have a path": 404}, 403

    @staticmethod
    def get(path):
        connection = sqlite3.connect("app_watch01.db")
        cursor = connection.cursor()
        query = "select * from api_record where path=?"
        row = cursor.execute(query, (path,)).fetchone()
        if row:
            if row[1] == "0":
                return str2json(row[3]), 200
            elif row[1] == "-1":
                return str2json(row[4]), 200
            else:
                return str2json(row[2]), 200
        else:
            item = None
        connection.close()
        return item

    def post(self, path):
        connection = sqlite3.connect("app_watch01.db")
        cursor = connection.cursor()
        query = "select * from api_record where path=?"
        row = cursor.execute(query, (path,)).fetchone()
        if row:
            if row[1] == "0":
                return str2json(row[3]), 200
            elif row[1] == "-1":
                return str2json(row[4]), 200
            else:
                return str2json(row[2]), 200
        else:
            item = None
        connection.close()
        return item

    @staticmethod
    def delete(path):
        item = ApiRecordModel.find_by_path(path)
        if item:
            item.delete_from_db()

        return {"message": "item deleted"}


class ApiRecordList(Resource):
    parser = reqparse.RequestParser()
    parser.add_argument('path')
    parser.add_argument('show_type')
    parser.add_argument('resp_data')
    parser.add_argument('resp_empty')
    parser.add_argument('resp_error')

    @staticmethod
    def get():
        return [x.json() for x in ApiRecordModel.query.order_by(ApiRecordModel.timestamp.desc()).all()]

    def post(self):
        args = self.parser.parse_args()
        path = args['path'].strip().lstrip('/').replace('/', '__')

        show_type = "" if args['show_type'] is None else args['show_type']
        resp_data = "" if args['resp_data'] is None else args['resp_data']
        resp_empty = "" if args['resp_empty'] is None else args['resp_empty']
        resp_error = "" if args['resp_error'] is None else args['resp_error']

        show_type = re.sub(r'\s+', '', show_type)
        resp_data = re.sub(r'\s+', '', resp_data)
        resp_empty = re.sub(r'\s+', '', resp_empty)
        resp_error = re.sub(r'\s+', '', resp_error)

        item = ApiRecordModel.find_by_path(path)
        if item is None:
            item = ApiRecordModel(path, show_type, resp_data, resp_empty, resp_error)
            item.save_to_db()
            return item.json(), 201
        else:
            if show_type in {"1", "0", "-1"}:
                item.show_type = show_type
            else:
                item.show_type = '1'

            if resp_data != '':
                item.resp_data = resp_data
            if resp_empty != '':
                item.resp_empty = resp_empty
            if resp_error != '':
                item.resp_error = resp_error
            item.timestamp = get_millisecond()
            ApiRecordModel.db_session_commit()
            return item.json()

    @staticmethod
    def get_paths():
        return [x.path for x in ApiRecordModel.query.order_by(ApiRecordModel.timestamp.desc()).all()]
上一篇下一篇

猜你喜欢

热点阅读