Django

在线消息推送和离线消息推送(3)

2020-02-21  本文已影响0人  独步江雪

参考资料
https://blog.csdn.net/z50l2o08e2u4aftor9a/article/details/80276522

首先介绍一下具体的功能设计。
整个系统由服务端、客户端组成。客户端包含前端(浏览器端、pyqt桌面端)和后台(用于添加要推送的消息)。

因为浏览器端不容易注意到实时提醒,所以我将实时提醒功能放到pyqt桌面端中实现(如托盘区闪烁等醒目的方式)。
浏览器端中只对消息进行拉取操作(刷新网页时),而不采取实时推的方式。
pyqt桌面端主要接收新消息提示即可,通知用户到网页端查看最新消息。(登录时主动拉取一次消息判断有无新消息,或者服务器主动推送所有新消息)

实时推送消息和主动拉取消息两个功能实际上是完全分离的,可以独立行使各自的职能。

不过下面我就不写浏览器端的代码了, 直接把相关的逻辑也放到pyqt里实现。

然后再来讲一下拉取服务端消息的逻辑。
每个用户拥有自己的消息库。当消息库为空时,一次性拉取服务端所有消息入用户库;当消息库内有消息时,每次拉取只拉取用户库内最新的消息发布时间之后的消息(将未入用户库的消息入库后再从用户库拉取)。

代码共5部分
1.鉴权服务器:用于服务和服务之间对用户提供的token做权限检查
2.消息数据服务器:用于读取和处理用户消息在数据库中的状态。
3.消息在线推送服务器:将管理员发布的消息在线实时推送给所有在线用户(若使用发布器的在线推送功能)
4.管理员消息发布器:分为在线推送和离线入库两个功能,可独立或组合使用。在线推送消息不会入消息库。
5.用户消息监听器:登陆后可接收管理员推送的在线消息,也可主动拉取用户对应消息库中的所有已读、未读消息,并标记消息的已读、未读状态。

下面是代码部分(为了简化代码逻辑,后面代码中的token即用户名。)

1.鉴权服务器

from flask import Flask, request, jsonify

app = Flask(__name__)


class Lib:
    @staticmethod
    def auth_admin_token(token):
        return True

    @staticmethod
    def auth_user_token(token):
        return True


@app.route('/admin', methods = ['POST'])
def admin_auth_token():
    data = request.json
    if Lib.auth_admin_token(data['token']):
        return jsonify({'code': 200, 'msg': '鉴权成功'})
    return jsonify({'code': 400, 'msg': '鉴权失败'})


@app.route('/user', methods = ['POST'])
def user_auth_token():
    data = request.json
    if Lib.auth_user_token(data['token']):
        return jsonify({'code': 200, 'msg': '鉴权成功'})
    return jsonify({'code':400, 'msg': '鉴权失败'})


if __name__ == '__main__':
    app.run(host = '0.0.0.0', port = 5009)

2.消息数据服务器

#         coding : utf-8
#         author : ['Wang Suyin', ]
#           data : 2020/2/21 14:14
#       software : PyCharm
# python_version : '3.5.3 64bit'
#           file : 2.消息数据服务器.py
"""
说明文档:

"""
import datetime
import requests
import json

from flask import Flask,request,jsonify
app=Flask(__name__)

# 虚拟数据
db = {
    'message_table': [
        {'id': 1, 'message': '消息1', 'publish_time': (2020, 1, 20, 3, 4, 5)},
        {'id': 2, 'message': '消息2', 'publish_time': (2020, 1, 20, 3, 4, 6)},
        {'id': 3, 'message': '消息3', 'publish_time': (2020, 1, 20, 3, 4, 7)},
    ],
    'user_to_messages_table': [
        {'username': 'tester', 'message_id': 1, 'read': True},
        {'username': 'tester', 'message_id': 2, 'read': False},
    ],
}

class Utils:
    @staticmethod
    def auth_admin_token(token):
        data = {'token': token}
        r = requests.post(url = 'http://127.0.0.1:5009/admin', json = data)
        r = json.loads(r.text)
        if r['code'] == 200:
            return True
        return False

    @staticmethod
    def add_message(message):
        t = datetime.datetime.now()
        publish_time = (t.year, t.month, t.day, t.hour, t.minute, t.second)
        db['message_table'].append({'id':len(db['message_table'])+1,'message': message, 'publish_time': publish_time})

    @staticmethod
    def get_user_messages(username):
        print(db['message_table'])
        # 拉取新消息入用户库
        # 原先考虑的是拉取最新时间后的消息,目前看来求消息ID表的差集更简单些,也更保险些(防止遗漏)。当然,要根据具体场景优化,代码也要更换为数据库代码,这里仅为示意。
        for message_id in list(
                set([e['id'] for e in db['message_table']])
                - set([e['message_id'] for e in [e_ for e_ in db['user_to_messages_table'] if e_['username']==username]])
        ):
            db['user_to_messages_table'].append({'username': username, 'message_id': message_id, 'read': False})

        # 在通知量少的情况下,可一次性返回所有内容。
        # 在通知量多的情况下,可分页返回,也可只返回未读消息,已读消息由客户端本地保存。
        # 简单起见,以下一次性返回所有内容
        res = []
        for e in db['user_to_messages_table']:
            if e['username'] == username:
                d = [e_ for e_ in db['message_table'] if e_['id'] == e['message_id']][0]
                d.update({'read': e['read']})
                res.append(d)
        return res
    @staticmethod
    def mark_as_read(username, message_id,read_status):
        #将消息标记为已读、未读状态,比较简单,就不具体谢写了
        pass






@app.route('/add_message', methods=['POST'])
def add_message():
    data = request.json
    token = data['token']

    r = requests.post(url='http://127.0.0.1:5009/admin',json={  'token':token  } )
    r =  json.loads(r.text)
    if not r['code'] == 200:
        return jsonify({'code':400,'msg':'鉴权失败'})
    Utils.add_message(data['message'])
    print('新的消息入库成功')
    return jsonify({'code': 200, 'msg': '消息发布成功'})

@app.route('/get_user_messages', methods=['POST'])
def get_user_messages():
    data = request.json
    token = data['token']

    if not Utils.auth_admin_token(token):
        return jsonify({'code':400,'msg':'鉴权失败'})

    username = token
    data = Utils.get_user_messages(username)
    return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})


@app.route('/mark_as_read', methods=['POST'])
def mark_as_read():
    data = request.json
    token = data['token']

    if not Utils.auth_admin_token(token):
        return jsonify({'code':400,'msg':'鉴权失败'})
    username = token
    message_id =  data['message_id']
    read_status = data['read_status']
    data = Utils.mark_as_read(username, message_id,read_status)
    return jsonify({'code': 200, 'msg': '拉取用户消息成功','data':data})


if __name__ == '__main__':
    app.run(host = '0.0.0.0', port = 5008)

3.消息在线推送服务器

#         coding : utf-8
#         author : ['Wang Suyin', ]
#           data : 2020/2/20 16:48
#       software : PyCharm
# python_version : '3.5.3 64bit'
#           file : 3.消息在线推送服务器.py
"""
说明文档:

"""
import requests

import json
from flask import Flask
from flask_sockets import Sockets


app = Flask(__name__)
sockets = Sockets(app)

ws_pool = []  # 推送目标池



def auth_admin_token(token):
    data = {  'token':token  }
    r = requests.post(url='http://127.0.0.1:5009/admin',json=data )
    r =  json.loads(r.text)
    if  r['code'] == 200:
        return True
    return False

def auth_user_token(token):
    data = {  'token':token  }
    r = requests.post(url='http://127.0.0.1:5009/user',json=data )
    r =  json.loads(r.text)
    if  r['code'] == 200:
        return True
    return False

#新消息的插入可以通过ws也可以通过http
@sockets.route('/admin')
def admin_socket(ws):
    print('admin接入')
    r_data = ws.receive()
    r_data = json.loads(r_data)
    token = r_data['data']['token']
    if not r_data['type']=='init' or not  auth_admin_token(token):
        ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
        ws.close()
        return
    ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))

    while not ws.closed:
        r_data = ws.receive()
        if not r_data:
            break
        ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))

        data = json.loads(r_data)
        if data['type'] == 'message':
            print('将消息推送给{}'.format(ws_pool))
            print(ws_pool)
            #推送给在线用户
            for e in ws_pool:
                try:
                    e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))
                except:
                    ws_pool.remove(e)
    try:
        ws.close()
    except:
        pass

@sockets.route('/listener')
def listener_socket(ws):
    print('listener接入')
    r_data = ws.receive()
    r_data = json.loads(r_data)
    token = r_data['data']['token']
    if not r_data['type']=='init' or not  auth_user_token(token):
        ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
        ws.close()
        return
    ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))

    ws_pool.append(ws)
    while not ws.closed:
        r_data = ws.receive()
        if not r_data:
            break
        #这里阻塞住就可以了,因为消息监听器只接收消息
    try:
        ws.close()
    except:
        pass
    finally:
        ws_pool.remove(ws)


if __name__ == '__main__':
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler
    from gevent import monkey
    monkey.patch_all()

    server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
    print('web server start ... ')
    server.serve_forever()

4.管理员消息发布器

import json
import requests

import websocket

websocket.enableTrace(True)


# 这里就不写界面了,要推送的消息一并写在on_message里


class OnlinePublisher:
    _message = ''

    def __init__(self, url = 'ws://127.0.0.1:5003/admin'):

        self.__ws = ws = websocket.WebSocketApp(url)
        ws.on_open = self.on_open
        ws.on_message = self.on_message
        ws.on_error = self.on_error
        ws.on_close = self.on_close

    def publish(self, message):
        OnlinePublisher._message = message
        self.__ws.run_forever()

    @staticmethod
    def on_open(ws):
        token = 'admin'
        data = json.dumps({'type': 'init', 'data': {'token': token}})
        ws.send(data)

    @staticmethod
    def on_message(ws, r_data):
        print('接收到消息:{}'.format(r_data))
        data = json.loads(r_data)
        if data['type'] == 'init_r':
            if data['code'] != 200:
                ws.close()
                raise Exception('鉴权失败')
            ws.send(json.dumps({'type': 'message', 'data': {'message': OnlinePublisher._message}}))


        elif data['type'] == 'message_r':
            if data['code'] == 200:
                print('发布成功')
            else:
                raise Exception('发布失败')
            ws.close()

    @staticmethod
    def on_error(ws):
        print('连接异常')

    @staticmethod
    def on_close(ws):
        print('连接关闭')


def online_publish(message):
    OnlinePublisher().publish(message)


def offline_publish(message):
    data = {'token': 'admin', 'message': message}
    r = requests.post(url = 'http://127.0.0.1:5008/add_message', json = data)
    print(r.text)
    r = json.loads(r.text)
    if r['code'] == 400:
        raise Exception('鉴权失败')
    print('消息入库成功')


if __name__ == '__main__':
    message = '测试消息,这是一条公告'
    online_publish(message)
    offline_publish(message)

5.用户消息监听器

import json
import sys
import datetime
import threading
import requests

from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from PyQt5.QtCore import *

import websocket

websocket.enableTrace(True)


class ActionSet:
    @staticmethod
    def on_open(ws):
        token = 'user'
        data = json.dumps({'type': 'init', 'data': {'token': token}})
        ws.send(data)

    @staticmethod
    def on_message(ws, r_data):
        print('接收到消息:{}'.format(r_data))
        data = json.loads(r_data)
        if data['type'] == 'init_r':
            if data['code'] != 200:
                ws.close()
                print('鉴权失败')
                return
        print('鉴权成功')
        if data['type'] == 'message':
            print(data['data']['message'])
            MainWindow._instance.new_message_got.emit(data['data']['message'])

    @staticmethod
    def on_error(ws):
        print('连接异常')

    @staticmethod
    def on_close(ws):
        print('连接关闭')


class MainWindow(QWidget):
    _instance = None
    new_message_got = pyqtSignal(str)

    def __init__(self, parent = None):
        super().__init__(parent)
        MainWindow._instance = self
        self.setWindowTitle('消息监听器')
        self.resize(800, 600)

        self.__btn_online_connect = QPushButton('连接')
        self.__lw_online_message = QListWidget()

        self.__le_username = QLineEdit()
        self.__le_username.setPlaceholderText('要拉取消息的用户名')
        self.__btn_pull_all_messages = QPushButton('拉取所有消息/刷新')
        self.__lw_all_messages = QListWidget()


        main_layout = QVBoxLayout()
        main_layout.setSpacing(0)
        main_layout.setContentsMargins(0, 0, 0, 0)
        self.setLayout(main_layout)
        main_layout.addWidget(self.__btn_online_connect)
        main_layout.addWidget(self.__lw_online_message)
        main_layout.addWidget(self.__le_username)
        main_layout.addWidget(self.__btn_pull_all_messages)
        main_layout.addWidget(self.__lw_all_messages)
        self.__btn_online_connect.clicked.connect(self.__on_btn_online_connect_clicked)
        self.new_message_got.connect(self.__show_new_message)

        self.__btn_pull_all_messages.clicked.connect(self.__on_btn_pull_all_messages_clicked)


    def __on_btn_online_connect_clicked(self):
        url = 'ws://127.0.0.1:5003/listener'
        ws = websocket.WebSocketApp(url)

        ws.on_open = ActionSet.on_open
        ws.on_message = ActionSet.on_message
        ws.on_error = ActionSet.on_error
        ws.on_close = ActionSet.on_close

        wst = threading.Thread(target = ws.run_forever)
        wst.setDaemon(True)
        wst.start()

    def __on_btn_pull_all_messages_clicked(self):
        data = {'token': self.__le_username.text()}
        r = requests.post(url = 'http://127.0.0.1:5008/get_user_messages', json = data)
        r = json.loads(r.text)
        if r['code'] == 400:
            raise Exception('鉴权失败')
        print('消息拉取成功')
        message_datas = sorted(r['data'][:],key = lambda x:-x['id'])
        self.__lw_all_messages.clear()
        for  d in message_datas:
            self.__lw_all_messages.addItem('【{}】{}'.format({True:'已读',False:'未读'}[d['read']],d['message'] ) )

    def __show_new_message(self, text):
        time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        text = '{}   {}'.format(time_, text)
        self.__lw_online_message.insertItem(0, text)


if __name__ == '__main__':

    app = QApplication(sys.argv)
    w = MainWindow()
    w.show()
    app.exec_()

上一篇下一篇

猜你喜欢

热点阅读