python 10flask接口开发案例

2018-07-19  本文已影响22人  6c0fe9142f09

目录

- bin      # 存储启动文件
  - start.py
- conf    # 存储配置文件
  - setting.py
- lib       # 存储代码
  - interface.py
  - tool.py
- logs    # 日志
  - logs

-bin-start.py

import sys
import os

# Project root: the parent of the directory that contains this file.
# It is placed first on sys.path so the `lib` and `conf` imports below resolve.
base_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0,base_path)

from lib.interface import server
from conf.setting import SERVER_PORT

# Take the port from conf.setting (imported above) instead of repeating the
# literal here, so changing SERVER_PORT in the config actually takes effect.
server.run(port=SERVER_PORT, debug=True)

-conf-setting.py

# MySQL connection settings (keyword arguments for the database connection).
MYSQL_INFO = dict(
    host='XXXXXXXXXXX',
    user='root',
    password='XXXXXXXXX',
    port=3307,
    db='shuiping',
    charset='utf8',
    autocommit=True,
)

# HTTP server settings.
SERVER_HOST = "0.0.0.0"
SERVER_PORT = 8999

# Redis connection settings (keyword arguments for the Redis client).
REDIS_INFO = dict(
    host="XXXXXXXXXXXX",
    port="XXXXXXX",
    password="XXXXXX",
    db=10,
)

# mongo配置

-lib-interface.py

import flask,json,time
from tools.tool import my_db
from tools.tool import my_md5
from tools.tool import my_redis_hash
from tools.tool import my_redis_expire
from tools.tool import isLowNumber

def isUser(username,pw):
    """Check a username/password pair against the ``user`` table.

    :param username: login name supplied by the client (untrusted input).
    :param pw: plain-text password supplied by the client.
    :return: True when a row exists for ``username`` and its stored password
        equals ``my_md5(pw)``; False otherwise, including for unknown users.
    """
    # Local import: this module does not import pymysql at file level.
    from pymysql.converters import escape_string

    # The value is spliced into a quoted SQL literal, so it must be escaped;
    # otherwise a quote in the username rewrites the query.
    sql = "SELECT password from user WHERE username = '{username}'".format(
        username=escape_string(str(username)))
    rows = my_db(sql)
    # An unknown username yields an empty result; indexing it would raise
    # IndexError instead of reporting a failed login.
    if not rows:
        return False
    return rows[0]["password"] == my_md5(pw)

def getSession(username):
    """Create a new session token for a successful login.

    The token is 32 lowercase hexadecimal characters, the same shape as the
    MD5 hex digest produced previously, but it is drawn from the operating
    system's CSPRNG. Hashing ``username + <epoch seconds>`` was guessable by
    anyone who knew the username and the approximate login time, and gave the
    same token to two logins within one second.

    :param username: kept for interface compatibility; the token no longer
        depends on it (the caller stores the token under the username).
    :return: a random 32-character hexadecimal string.
    """
    import secrets

    return secrets.token_hex(16)

# Flask application object; the route handlers below are registered on it.
server = flask.Flask(__name__)

@server.route('/login',methods=["get","post"])
def api_login():
    """Log a user in and issue a session token.

    Accepts ``username`` and ``password`` either as a JSON object body or as
    form/query parameters. On success the session token is stored in the
    Redis hash named after the user (field ``session``) with a 24 hour expiry.

    :return: a JSON string ``{"code": ..., "message": ...}``; ``code`` 2000
        also carries ``session``. 5001 = wrong method or missing fields,
        5002 = bad credentials, 5000 = any other method.
    """
    res = {"code":5000,"message":"请求失败,未知异常"}
    request = flask.request
    if request.method == "GET":
        res = {"code":5001,"message":"请使用POST请求进行登录"}
    elif request.method == "POST":
        # is_json looks at the mimetype only, so "application/json; charset=utf-8"
        # is recognised too (an exact header comparison missed it).
        if request.is_json:
            payload = request.get_json(silent=True)
            # A malformed body or a non-object JSON value (list, number, null)
            # has no fields to read; treat it as "nothing supplied".
            if not isinstance(payload, dict):
                payload = {}
        else:
            payload = request.values
        username = payload.get("username")
        pw = payload.get("password")
        if username and pw:
            # JSON clients may send numbers; the helpers below expect text.
            username, pw = str(username), str(pw)
            if isUser(username,pw):
                session = getSession(username)
                my_redis_hash(username,"session",session)
                my_redis_expire(username,86400)
                res = {"code":2000,"message":"登录成功","session":session}
            else:
                res = {"code":5002,"message":"用户名或密码错误"}
        else:
            res = {"code":5001,"message":"用户名或密码为空"}
    return json.dumps(res,ensure_ascii=False)

@server.route("/pay",methods=["get","post"])
def api_pay():
    """Deduct ``money`` from the logged-in user's balance in ``salary``.

    Accepts ``username``, ``session`` and ``money`` either as a JSON object
    body or as form/query parameters. ``session`` must equal the value stored
    in the user's Redis hash under the field ``session``.

    :return: a JSON string ``{"code": ..., "message": ...}``. 2000 = paid,
        5001 = wrong method, 5002 = bad parameters, 5003 = no salary row,
        5004 = insufficient balance, 5005 = not logged in, 5000 = other.
    """
    res = {"code":5000,"message":"支付失败,未知异常"}
    request = flask.request
    if request.method == "GET":
        # The previous text was copied from the login endpoint ("...进行登录").
        res = {"code":5001,"message":"请使用POST请求进行支付"}
    elif request.method == "POST":
        # is_json looks at the mimetype only, so "application/json; charset=utf-8"
        # is recognised too (an exact header comparison missed it).
        if request.is_json:
            payload = request.get_json(silent=True)
            # Malformed or non-object JSON has no fields to read.
            if not isinstance(payload, dict):
                payload = {}
        else:
            payload = request.values
        username = payload.get("username")
        session = payload.get("session")
        money = payload.get("money")

        # Parse the amount once. The textual check accepts negative decimals
        # and a few digit characters float() cannot parse, so the parsed
        # value must additionally exist and be strictly positive; a negative
        # "payment" would otherwise increase the balance.
        amount = None
        if money and (str.isdigit(str(money)) or isLowNumber(money)):
            try:
                amount = float(money)
            except ValueError:
                amount = None

        if username and session and amount is not None and amount > 0:
            user_session = my_redis_hash(username,"session")
            if user_session and session == user_session:
                # Local import: this module does not import pymysql at file level.
                from pymysql.converters import escape_string

                # username is untrusted and lands inside a quoted SQL literal.
                sql = "SELECT id,money FROM salary WHERE username = '{username}'".format(
                    username=escape_string(str(username)))
                rows = my_db(sql)
                if rows:
                    balance = float(rows[0]["money"])
                    # ">=": spending exactly the whole balance is allowed.
                    if balance >= amount:
                        # Perform the payment.
                        sql = "UPDATE salary SET money = {money} WHERE id = {id}".format(
                            money=balance-amount,id=rows[0]["id"])
                        my_db(sql)
                        res = {"code":2000,"message":"支付成功"}
                    else:
                        res = {"code":5004,"message":"支付失败,余额不足"}
                else:
                    res = {"code":5003,"message":"支付失败,无此用户的工资单"}
            else:
                res = {"code":5005,"message":"支付失败,请先登录"}
        else:
            res= {"code":5002,"message":"请输入正确的参数"}

    return json.dumps(res,ensure_ascii=False)

-lib-tool.py

import pymysql,hashlib,redis,time
from conf.setting import MYSQL_INFO
from conf.setting import REDIS_INFO

def my_db(sql,type=None,args=None):
    """Run one SQL statement on a fresh MySQL connection and return its result.

    A new connection is opened from ``MYSQL_INFO`` for every call and is
    always closed again, even when the statement raises.

    :param sql: the statement to execute; surrounding whitespace is ignored.
    :param type: pass ``"list"`` to use the connection's default cursor;
        any other value uses ``DictCursor`` (rows as dicts).
    :param args: optional parameters bound to ``%s`` / ``%(name)s``
        placeholders in ``sql`` by the driver. Prefer this over formatting
        values into the SQL text. ``None`` executes ``sql`` verbatim.
    :return: ``fetchall()`` for statements starting with ``select`` or
        ``show`` (case-insensitive); the string ``'ok'`` for anything else.
    """
    coon = pymysql.connect(**MYSQL_INFO)
    try:
        if type=="list":
            cur = coon.cursor()
        else:
            cur = coon.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            sql = sql.strip()
            cur.execute(sql,args)
            # Only row-producing statements have something to fetch.
            if sql.lower().startswith(('select','show')):
                return cur.fetchall()
            return 'ok'
        finally:
            cur.close()
    finally:
        # Previously a failing execute() skipped close() and leaked the connection.
        coon.close()

def my_md5(s):
    m = hashlib.md5(s.encode())
    return m.hexdigest()

def my_redis_str(k,v=None,time=None):
    """Set or get a Redis string value.

    :param k: the key.
    :param v: value to store. ``None`` (the default) means "read the key".
        Falsy values such as ``0`` or ``""`` are stored; previously they were
        mistaken for "no value" and silently turned the call into a read.
    :param time: optional expiry in seconds, used only when writing and only
        when positive.
    :return: when writing, the result of Redis ``SET``; when reading, the
        decoded string, or ``None`` if the key does not exist.
    """
    r = redis.Redis(**REDIS_INFO)
    if v is not None:
        if time and time > 0:
            return r.set(k,v,ex=int(time))
        return r.set(k,v)
    value = r.get(k)
    # Compare with None so a stored empty string is returned as "".
    if value is None:
        return None
    return value.decode()

def my_redis_str_del(k):
    """Delete key ``k`` from Redis and return the client's ``delete`` result."""
    client = redis.Redis(**REDIS_INFO)
    outcome = client.delete(k)
    return outcome

def my_redis_hash(k1, k2=None, v=None):
    """Write a hash field, read a hash field, or read a whole Redis hash.

    :param k1: name of the hash.
    :param k2: field name. When falsy, the whole hash is returned.
    :param v: value to store in ``k2``. ``None`` (the default) means "read";
        falsy values such as ``0`` or ``""`` are stored.
    :return: with ``k2`` and ``v``: the result of ``HSET``; with ``k2`` only:
        the decoded field value or ``None`` if absent; without ``k2``: a dict
        mapping decoded field names to decoded values (empty if no such hash).
    """
    r = redis.Redis(**REDIS_INFO)
    if k2:
        if v is not None:
            return r.hset(k1,k2,v)
        value = r.hget(k1,k2)
        if value is None:
            return None
        return value.decode()
    # Build a new dict: the previous code popped and re-inserted keys while
    # iterating the same dict, which raises RuntimeError on current Python.
    return {field.decode(): value.decode() for field, value in r.hgetall(k1).items()}

def my_redis_hash_del(k1,k2=None):
    """Delete one field of a Redis hash, or the whole hash.

    :param k1: name of the hash.
    :param k2: field to remove. When falsy, the entire key ``k1`` is deleted.
    :return: the number of fields (or keys) removed, as reported by Redis.
    """
    r = redis.Redis(**REDIS_INFO)
    if k2:
        return r.hdel(k1,k2)
    # HDEL needs at least one field name; calling it with none is rejected by
    # the server. Removing the whole hash is a plain key delete.
    return r.delete(k1)

def my_redis_expire(k1,time=None):
    """Set or clear the expiry of key ``k1``.

    A falsy ``time`` (``None`` or ``0``) removes any expiry via ``persist``;
    a positive ``time`` sets the expiry to that many seconds; a negative
    ``time`` changes nothing and returns ``-1``.
    """
    client = redis.Redis(**REDIS_INFO)
    if not time:
        return client.persist(k1)
    if time > 0:
        return client.expire(k1,time)
    return -1



# Manual smoke test: runs only when this module is executed directly and talks
# to the Redis and MySQL servers configured in conf.setting.
if __name__ == '__main__':

    # String keys: set, read back, read a key this script never sets,
    # delete, then read the deleted key.
    my_redis_str("name", "大树先生")
    print(my_redis_str("name"))
    print(my_redis_str("sex"))
    print(my_redis_str_del("name"))
    print(my_redis_str("name"))

    # Hashes: write one field, read the whole hash and the single field, then
    # read a hash name ("niuhanyang") this script never writes.
    my_redis_hash(k1="gengzongyuan",k2="name",v="gengzongyuan")
    print(my_redis_hash("gengzongyuan"))
    print(my_redis_hash("gengzongyuan","name"))
    print(my_redis_hash("niuhanyang"))
    print(my_redis_hash("niuhanyang","name"))

    # MD5 helper.
    print(my_md5("123456"))

    # SELECT with the default (dict) cursor and with the "list" cursor.
    sql = r"SELECT * FROM user;"
    print(my_db(sql))
    print(my_db(sql,"list"))

    # A non-SELECT statement returns 'ok'.
    sql = r"INSERT INTO user(username,password) VALUE('大树先生','123456')"
    print(my_db(sql))

    sql = r"SELECT * FROM user;"
    print(my_db(sql))

    sql = r"SELECT * FROM user WHERE username = '大树'"
    print(my_db(sql))

    timeTrip = time.time()
    print(timeTrip)

    # Expiry: set a 20 second TTL on a hash, then clear it again (persist).
    my_redis_hash("zyl","name","zhaoyanling")
    print(my_redis_hash("zyl"))
    print(my_redis_expire("zyl",20))
    print(my_redis_expire("zyl"))


    print(my_redis_hash("dashu","sex"))

    sql = r"SELECT money FROM salary WHERE username = '{username}'".format(username="大树")
    res = my_db(sql)
    print(res)

def isLowNumber(number):
    """Return True when ``str(number)`` looks like a decimal such as ``1.5`` or ``-1.5``.

    Rules:
      1. The text must contain exactly one decimal point.
      2. Positive decimal: the parts left and right of the point are all digits.
      3. Negative decimal: the left part starts with a single minus sign and is
         all digits once that sign is removed; the right part is all digits.

    Integers without a decimal point (e.g. ``"15"``) are therefore rejected.
    """
    text = str(number)
    if text.count('.') != 1:
        return False
    whole, _, fraction = text.partition('.')
    if not fraction.isdigit():
        return False
    if whole.isdigit():
        return True
    return whole.startswith('-') and whole[1:].isdigit()
上一篇 下一篇

猜你喜欢

热点阅读