登陆方式-JWT

2022-03-05  本文已影响0人  skoll

传统token和jwt的区别

1 .传统token:服务端会对登陆成功产生一个随机Token返回,同时也会在本地包留对应的token,这里要存数据库.再次访问的时候,会携带之前的token来这里进行校验,服务端则通过与本地包留的token进行对比,寻找到符合条件的token数据,则校验成功

2 .JWT:服务端会对登陆成功的用户产生一个随机token返回,但是不会在服务端本地保留.当用户再次访问的时候,服务端会基于jwt对token进行校验和解码.

1 .一旦用户登陆,后面每个请求都会包含JWT,从而允许用户访问改令牌所允许的路由,服务,资源,
2 .通过使用公钥和私钥对JWT进行签名认证,由于签名是使用head和payload计算的,还可以检查内容是否遭到篡改,并切可以根据用户权限来做权限操作

JWT格式

image.png

最简单的令牌组成可以是这样
1 .Header:JWT的标头

{
  "alg": "HS256",
  "typ": "JWT"
}
1 .typ:令牌的类型
2 .alg:使用的签名算法
3 .指定类型和签名算法后,这个json被Base64Url编码成JWT的第一部分

2 .Payload:包含一些数据

payload={
        'exp':exp,
        'data':{
            'name':name,
        },
        'iat':time.time()
    }
1 .iss:定义签发人
2 .exp:定义过期日期
3 .sub:一个主题,jwt所面向的用户
4 .aud:受众:接收jwt的一方
5 .nbf:令牌生效的时间
6 .iat:签发时间
7 .jti:编号
8 .data里面可以存一些自定义消息
9 .这些消息被Base64Url编码形成JWT第二部分

3 .Signature:一个签证消息

1 .把header,payload(这俩都是base64之后)和secret
2 .用HMAC SHA256算法进行签名

4 .拼接:把上面的三个点分隔的base64-url字符串组成在一起,这个字符串将会在HTML中传递

生成令牌

def create_token(name):
    # 基于jwt创建token的函数
    global SECRET_KEY
    headers={
        'alg':"HS256",
        'typ':'JWT'
    },
    exp=datetime.datetime.utcnow() + datetime.timedelta(days=100)
    payload={
        'exp':exp,
        'data':{
            'name':name,
        },
        'iat':time.time()
    }
    token=jwt.encode(
        payload=payload,
        key=SECRET_KEY,
        algorithm='HS256',
        headers=headers
    )

    return token

JWT和Session的不同

1 .JWT具有加密签名,session cookie没有
2 .JWT是无状态的,被存在在客户端,不在服务端内存中.
3 .身份验证可以在本地进行,不是必须通过服务器数据库进行验证.有些资源消耗不必要
4 .不需要服务器存,因此具有超级强的可扩展性

验证

1 .每次浏览器都会将 token 通过请求头 authorization 带给服务器,请求头的值为 Bearer token,这是 JWT 规定的,服务器取出 token 使用 decode 方法进行解码

2 .axios.interceptors.request.use(config => {
    // 在 localStorage 获取 token
    let token = localStorage.getItem("token");

    // 如果存在则设置请求头
    if (token) {
        config.headers.Authorization = `Bearer ${token}`;
    }

    return config;
})


3 .服务器每次收到信息都会对它的前两部分进行加密,然后比对加密后的结果是否跟客户端传送过来的第三部分相同,如果相同则验证通过,否则失败。

4 .jwt仅仅是将payload的内容做一次base64编码,所以那些攻击者老哥要改你的内容还是很简单的,但是老哥们不知道你的私钥啊,改了之后没法生成正确的签名,用加密的方式再次对请求进来的header.payload进行校验,发现跟signature对不上,这时候你就可以很清楚知道,有人在搞事情,直接返回个500假装服务器挂了

全部代码

from logging import exception
from multiprocessing.pool import MapResult
from unittest import result
from flask import Flask,request,jsonify,abort,Blueprint,redirect,url_for
from flask.json import JSONEncoder
from flask_cors import *
import pymongo
from bson.objectid import ObjectId
import json
from bson.json_util import dumps
import time
import jwt
from jwt import exceptions
import datetime



app=Flask(__name__)
CORS(app,supports_credentials=True)

class JSONEncoder(json.JSONEncoder):
    def default(self,o):
        if isinstance(o,ObjectId):
            return str(o)
        return json.JSONEncoder.default(self,o)

# jwt相关
SECRET_KEY="w6jiayixieqitadeyan"

def create_token(name):
    # 基于jwt创建token的函数
    global SECRET_KEY
    headers={
        'alg':"HS256",
        'typ':'JWT'
    },
    exp=datetime.datetime.utcnow() + datetime.timedelta(days=100)
    payload={
        'exp':exp,
        'data':{
            'name':name,
        },
        'iat':time.time()
    }
    token=jwt.encode(
        payload=payload,
        key=SECRET_KEY,
        algorithm='HS256',
        headers=headers
    )

    return token

def validate_token(token):
    # 解析token
    global SECRET_KEY
    payload=None
    msg=None
    try:
        payload=jwt.decode(token,SECRET_KEY,verify=False, algorithms=['HS256'])
    except exceptions.ExpiredSignatureError:
        msg="token失效"
    except jwt.DecodeError:
        msg="token认证失败"
    except jwt.InvalidTokenError:
        msg="非法的token"
    return (payload,msg)

@app.route('/')
def hello():
    return app.send_static_file('./index.html')


# 路径重定向
auth=Blueprint('auth',__name__)

# 登陆相关
@auth.route('/login',methods=['POST','GET'])
def login():
    if request.method=="POST":
        data=json.loads(str(request.data,'utf-8'))
        print('登陆的信息',data)
        username=data['username']
        password=data['password']
        remberValue=data['remember']    
        find=json.loads(dumps(myUserCol.find({'username':data['username']})))

        if len(json.loads(dumps(find))):
            currPsd=find[0]['password']
            if password==currPsd:
                token=create_token(username)
                return {
                        'status':'ok',
                        'msg':'登陆成功',
                        'token':token
                }
            else:
                return {
                    'status':'no',
                    'msg':"密码不正确"
                }
        else:
            return {
                'status':'no',
                'msg':'用户没有注册,需要先注册用户才能登陆'
            }     
   

@auth.route('/logout',methods=['POST','GET'])
# 必须登陆之后才能访问这个路由
def logout():
    return {
        'status':'ok',
        'msg':"成功退出"
    }

@app.route('/test')
def test():
    return {
        "data":"测试手机"
}

@app.route('/create',methods=['POST'])
def create():
    if request.method=='POST':
        data=json.loads(str(request.data,'utf-8'))
        # 先找一下吧
        with open('./login.json','r') as f:
            load_dict=json.load(f)
            print(load_dict)
            if data['username'] in load_dict['success']:
                print('可以被注册')
            else:
                print('用户没有权限注册')
                return {
                    'status':"no",
                    'msg':'用户没有权限注册'
                }

        find=myUserCol.find({'username':data['username']})
        if len(json.loads(dumps(find))):
            return {
                'status':'no',
                'msg':'用户名以被注册,必须换一个'
            }
        
        result=myUserCol.insert_one(data)
        if result.inserted_id:
            return {
                'status':'ok',
                'msg':"创建用户成功"
            }
        else:
            return {
                'status':'no',
                'msg':"用户创建失败"
            }

# 新增一条数据
@app.route('/insert',methods=['POST'])
# @login_required
def postDir():
    if request.method=="POST": 
        # data=ast.literal_eval(request.data.decode("UTF-8"))
        data=json.loads(str(request.data,'utf-8'))
        result=mycol.insert_one(data)
        print('插入结果',result.inserted_id)

        writer={
            'name':'测试名字',
            'time':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            'comment':"第一条数据",
            'id':result.inserted_id
        }
        dataList={}
        dataList['name']=data['name']
        dataList['list']=[writer]
        mapsResult=myMap.insert_one(dataList)

        if result.inserted_id and mapsResult.inserted_id:
            return {
                "status":"ok",
                "msg":"数据插入成功"
            }
        else:
            return {
                "status":"no",
                "msg":'数据插入失败'
            }
# 更新一条数据:
@app.route('/update_one',methods=["POST"])
def update():
    if request.method=='POST':
        data=json.loads(str(request.data,'utf-8'))
        # 这里要把拿到的数据转换一下下
        token=data['token']
        print(validate_token(token))
        if validate_token(token):
            data.pop('token')
            result=mycol.insert_one(data)
            # 更新的时候也是新数据全部插入
            writer={
                'name':data['writer'],
                'time':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                'comment':data['comment'],
                'id':result.inserted_id
            }
            mapResult=myMap.update_one({"name":data['name']},{"$addToSet":{'list':writer}
            })
        else:
            return {
                'status':'no',
                'msg':"请先登陆,在上传数据"
            }

        if result  and mapResult:
            return {
                "status":'ok',
                'msg':'数据更新成功',
                'name':data['name']
            }
        else:
            return {
                'status':"no",
                'msg':"数据更新失败"
            }


# 查询集合内所有数据:只需要知道具体数据,后续根据这个数据来查找
@app.route('/find',methods=["GET"])
def find():
    if request.method=="GET":
        # data=json.loads(str(request.data,'utf-8'))
        result=myMap.find({})
        if result:
            list=[]
            for x in result:
                list.append(x)
            return {
                'status':'ok',
                "data":json.loads(dumps(list))
            }
        else:
            return {
                "statud":"no",
                "data":"当前没有数据"
            }

# find()
# 查询单条数据
@app.route('/find_one',methods=["POST"])
def find_one():
    if request.method=="POST":
        data=json.loads(str(request.data,'utf-8'))
        result=mycol.find_one({"_id":ObjectId(data['id'])})
        # 以key为主键查,又不需要转义.ObjectId类型的数据
        print('result',type(result))
        if result:
            # list=[]
            # for x in result:
            #     list.append(x)
            # print(x,'list')
            return {
                'status':'ok',
                "data":json.loads(dumps(result))
            }
        else:
            return {
                "statud":"no",
                "data":"当前没有数据"
            }

@app.route('/delete_one',methods=["POST"])
def delete_one():
    if request.method=="POST":
        data=json.loads(str(request.data,'utf-8'))
        
        # result=mycol.delete_one({"_id":ObjectId(data['id'])})
        # 以key为主键查,又不需要转义.ObjectId类型的数据
        print('删除',data['id'],data['name'])
        result=myMap.update({'name':data['name']},{
            '$pull':{'list':{'_id':ObjectId(data['id'])}}
        })
        if result:
            return {
                'status':'ok',
                "data":{"value":"删除完毕"}
            }
        else:
            return {
                "statud":"no",
                "data":"当前没有数据"
            }


# 武将战斗相关
if __name__ == '__main__':
    # 测试某个函数
    token=create_token('123')
    print(token,validate_token(token))
    app.config['JSON_AS_ASCII'] = False
    app.register_blueprint(auth,url_prefix='/auth')
    app.run(host='10.252.60.55',port=8080,debug=True)

# cursor转为dict
# objetcId不能转换

上一篇 下一篇

猜你喜欢

热点阅读