登陆方式-JWT
2022-03-05 本文已影响0人
skoll
传统token和jwt的区别
1 .传统token:服务端会对登陆成功产生一个随机Token返回,同时也会在本地包留对应的token,这里要存数据库.再次访问的时候,会携带之前的token来这里进行校验,服务端则通过与本地包留的token进行对比,寻找到符合条件的token数据,则校验成功
2 .JWT:服务端会对登陆成功的用户产生一个随机token返回,但是不会在服务端本地保留.当用户再次访问的时候,服务端会基于jwt对token进行校验和解码.
1 .一旦用户登陆,后面每个请求都会包含JWT,从而允许用户访问改令牌所允许的路由,服务,资源,
2 .通过使用公钥和私钥对JWT进行签名认证,由于签名是使用head和payload计算的,还可以检查内容是否遭到篡改,并切可以根据用户权限来做权限操作
JWT格式
image.png最简单的令牌组成可以是这样
1 .Header:JWT的标头
{
"alg": "HS256",
"typ": "JWT"
}
1 .typ:令牌的类型
2 .alg:使用的签名算法
3 .指定类型和签名算法后,这个json被Base64Url编码成JWT的第一部分
2 .Payload:包含一些数据
payload={
'exp':exp,
'data':{
'name':name,
},
'iat':time.time()
}
1 .iss:定义签发人
2 .exp:定义过期日期
3 .sub:一个主题,jwt所面向的用户
4 .aud:受众:接收jwt的一方
5 .nbf:令牌生效的时间
6 .iat:签发时间
7 .jti:编号
8 .data里面可以存一些自定义消息
9 .这些消息被Base64Url编码形成JWT第二部分
3 .Signature:一个签证消息
1 .把header,payload(这俩都是base64之后)和secret
2 .用HMAC SHA256算法进行签名
4 .拼接:把上面的三个点分隔的base64-url字符串组成在一起,这个字符串将会在HTML中传递
生成令牌
def create_token(name):
# 基于jwt创建token的函数
global SECRET_KEY
headers={
'alg':"HS256",
'typ':'JWT'
},
exp=datetime.datetime.utcnow() + datetime.timedelta(days=100)
payload={
'exp':exp,
'data':{
'name':name,
},
'iat':time.time()
}
token=jwt.encode(
payload=payload,
key=SECRET_KEY,
algorithm='HS256',
headers=headers
)
return token
JWT和Session的不同
1 .JWT具有加密签名,session cookie没有
2 .JWT是无状态的,被存在在客户端,不在服务端内存中.
3 .身份验证可以在本地进行,不是必须通过服务器数据库进行验证.有些资源消耗不必要
4 .不需要服务器存,因此具有超级强的可扩展性
验证
1 .每次浏览器都会将 token 通过请求头 authorization 带给服务器,请求头的值为 Bearer token,这是 JWT 规定的,服务器取出 token 使用 decode 方法进行解码
2 .axios.interceptors.request.use(config => {
// 在 localStorage 获取 token
let token = localStorage.getItem("token");
// 如果存在则设置请求头
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
})
3 .服务器每次收到信息都会对它的前两部分进行加密,然后比对加密后的结果是否跟客户端传送过来的第三部分相同,如果相同则验证通过,否则失败。
4 .jwt仅仅是将payload的内容做一次base64编码,所以那些攻击者老哥要改你的内容还是很简单的,但是老哥们不知道你的私钥啊,改了之后没法生成正确的签名,用加密的方式再次对请求进来的header.payload进行校验,发现跟signature对不上,这时候你就可以很清楚知道,有人在搞事情,直接返回个500假装服务器挂了
全部代码
from logging import exception
from multiprocessing.pool import MapResult
from unittest import result
from flask import Flask,request,jsonify,abort,Blueprint,redirect,url_for
from flask.json import JSONEncoder
from flask_cors import *
import pymongo
from bson.objectid import ObjectId
import json
from bson.json_util import dumps
import time
import jwt
from jwt import exceptions
import datetime
app=Flask(__name__)
CORS(app,supports_credentials=True)
class JSONEncoder(json.JSONEncoder):
def default(self,o):
if isinstance(o,ObjectId):
return str(o)
return json.JSONEncoder.default(self,o)
# jwt相关
SECRET_KEY="w6jiayixieqitadeyan"
def create_token(name):
# 基于jwt创建token的函数
global SECRET_KEY
headers={
'alg':"HS256",
'typ':'JWT'
},
exp=datetime.datetime.utcnow() + datetime.timedelta(days=100)
payload={
'exp':exp,
'data':{
'name':name,
},
'iat':time.time()
}
token=jwt.encode(
payload=payload,
key=SECRET_KEY,
algorithm='HS256',
headers=headers
)
return token
def validate_token(token):
# 解析token
global SECRET_KEY
payload=None
msg=None
try:
payload=jwt.decode(token,SECRET_KEY,verify=False, algorithms=['HS256'])
except exceptions.ExpiredSignatureError:
msg="token失效"
except jwt.DecodeError:
msg="token认证失败"
except jwt.InvalidTokenError:
msg="非法的token"
return (payload,msg)
@app.route('/')
def hello():
return app.send_static_file('./index.html')
# 路径重定向
auth=Blueprint('auth',__name__)
# 登陆相关
@auth.route('/login',methods=['POST','GET'])
def login():
if request.method=="POST":
data=json.loads(str(request.data,'utf-8'))
print('登陆的信息',data)
username=data['username']
password=data['password']
remberValue=data['remember']
find=json.loads(dumps(myUserCol.find({'username':data['username']})))
if len(json.loads(dumps(find))):
currPsd=find[0]['password']
if password==currPsd:
token=create_token(username)
return {
'status':'ok',
'msg':'登陆成功',
'token':token
}
else:
return {
'status':'no',
'msg':"密码不正确"
}
else:
return {
'status':'no',
'msg':'用户没有注册,需要先注册用户才能登陆'
}
@auth.route('/logout',methods=['POST','GET'])
# 必须登陆之后才能访问这个路由
def logout():
return {
'status':'ok',
'msg':"成功退出"
}
@app.route('/test')
def test():
return {
"data":"测试手机"
}
@app.route('/create',methods=['POST'])
def create():
if request.method=='POST':
data=json.loads(str(request.data,'utf-8'))
# 先找一下吧
with open('./login.json','r') as f:
load_dict=json.load(f)
print(load_dict)
if data['username'] in load_dict['success']:
print('可以被注册')
else:
print('用户没有权限注册')
return {
'status':"no",
'msg':'用户没有权限注册'
}
find=myUserCol.find({'username':data['username']})
if len(json.loads(dumps(find))):
return {
'status':'no',
'msg':'用户名以被注册,必须换一个'
}
result=myUserCol.insert_one(data)
if result.inserted_id:
return {
'status':'ok',
'msg':"创建用户成功"
}
else:
return {
'status':'no',
'msg':"用户创建失败"
}
# 新增一条数据
@app.route('/insert',methods=['POST'])
# @login_required
def postDir():
if request.method=="POST":
# data=ast.literal_eval(request.data.decode("UTF-8"))
data=json.loads(str(request.data,'utf-8'))
result=mycol.insert_one(data)
print('插入结果',result.inserted_id)
writer={
'name':'测试名字',
'time':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
'comment':"第一条数据",
'id':result.inserted_id
}
dataList={}
dataList['name']=data['name']
dataList['list']=[writer]
mapsResult=myMap.insert_one(dataList)
if result.inserted_id and mapsResult.inserted_id:
return {
"status":"ok",
"msg":"数据插入成功"
}
else:
return {
"status":"no",
"msg":'数据插入失败'
}
# 更新一条数据:
@app.route('/update_one',methods=["POST"])
def update():
if request.method=='POST':
data=json.loads(str(request.data,'utf-8'))
# 这里要把拿到的数据转换一下下
token=data['token']
print(validate_token(token))
if validate_token(token):
data.pop('token')
result=mycol.insert_one(data)
# 更新的时候也是新数据全部插入
writer={
'name':data['writer'],
'time':time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
'comment':data['comment'],
'id':result.inserted_id
}
mapResult=myMap.update_one({"name":data['name']},{"$addToSet":{'list':writer}
})
else:
return {
'status':'no',
'msg':"请先登陆,在上传数据"
}
if result and mapResult:
return {
"status":'ok',
'msg':'数据更新成功',
'name':data['name']
}
else:
return {
'status':"no",
'msg':"数据更新失败"
}
# 查询集合内所有数据:只需要知道具体数据,后续根据这个数据来查找
@app.route('/find',methods=["GET"])
def find():
if request.method=="GET":
# data=json.loads(str(request.data,'utf-8'))
result=myMap.find({})
if result:
list=[]
for x in result:
list.append(x)
return {
'status':'ok',
"data":json.loads(dumps(list))
}
else:
return {
"statud":"no",
"data":"当前没有数据"
}
# find()
# 查询单条数据
@app.route('/find_one',methods=["POST"])
def find_one():
if request.method=="POST":
data=json.loads(str(request.data,'utf-8'))
result=mycol.find_one({"_id":ObjectId(data['id'])})
# 以key为主键查,又不需要转义.ObjectId类型的数据
print('result',type(result))
if result:
# list=[]
# for x in result:
# list.append(x)
# print(x,'list')
return {
'status':'ok',
"data":json.loads(dumps(result))
}
else:
return {
"statud":"no",
"data":"当前没有数据"
}
@app.route('/delete_one',methods=["POST"])
def delete_one():
if request.method=="POST":
data=json.loads(str(request.data,'utf-8'))
# result=mycol.delete_one({"_id":ObjectId(data['id'])})
# 以key为主键查,又不需要转义.ObjectId类型的数据
print('删除',data['id'],data['name'])
result=myMap.update({'name':data['name']},{
'$pull':{'list':{'_id':ObjectId(data['id'])}}
})
if result:
return {
'status':'ok',
"data":{"value":"删除完毕"}
}
else:
return {
"statud":"no",
"data":"当前没有数据"
}
# 武将战斗相关
if __name__ == '__main__':
# 测试某个函数
token=create_token('123')
print(token,validate_token(token))
app.config['JSON_AS_ASCII'] = False
app.register_blueprint(auth,url_prefix='/auth')
app.run(host='10.252.60.55',port=8080,debug=True)
# cursor转为dict
# objetcId不能转换