在线消息推送和离线消息推送(2.1)

2020-02-21  本文已影响0人  独步江雪

先稍微拆分下服务端的逻辑,原先处理发布者和接收者都写在一个连接里,有点杂乱了

推送服务器

#         coding : utf-8
#         author : ['Wang Suyin', ]
#           data : 2020/2/20 16:48
#       software : PyCharm
# python_version : '3.5.3 64bit'
#           file : 推送服务器.py
"""
说明文档:

"""


import json
from flask import Flask
from flask_sockets import Sockets


app = Flask(__name__)
sockets = Sockets(app)

ws_pool = []  # 推送目标池




def auth_admin_token(token):
    return True
def auth_listener_token(token):
    return True

@sockets.route('/admin')
def admin_socket(ws):
    print('admin接入')
    r_data = ws.receive()
    r_data = json.loads(r_data)
    token = r_data['data']['token']
    if not r_data['type']=='init' or not  auth_admin_token(token):
        ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
        ws.close()
        return
    ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))

    while not ws.closed:
        r_data = ws.receive()
        if not r_data:
            break
        ws.send(json.dumps({'type':'message_r','code':200, 'msg':'发布成功'}))

        data = json.loads(r_data)
        if data['type'] == 'message':
            print('将消息推送给{}'.format(ws_pool))
            print(ws_pool)
            #推送给在线用户
            for e in ws_pool:
                try:
                    e.send(json.dumps({'type':'message','data':{'message':data['data']['message']}}))
                except:
                    ws_pool.remove(e)
    try:
        ws.close()
    except:
        pass

@sockets.route('/listener')
def listener_socket(ws):
    print('listener接入')
    r_data = ws.receive()
    r_data = json.loads(r_data)
    token = r_data['data']['token']
    if not r_data['type']=='init' or not  auth_admin_token(token):
        ws.send(json.dumps({'type':'init_r','code': 400, 'msg': '鉴权失败'}))
        ws.close()
        return
    ws.send(json.dumps({'type':'init_r','code': 200, 'msg': '鉴权成功'}))

    ws_pool.append(ws)
    while not ws.closed:
        r_data = ws.receive()
        if not r_data:
            break
        #这里阻塞住就可以了,因为消息监听器只接收消息
    try:
        ws.close()
    except:
        pass
    finally:
        ws_pool.remove(ws)


if __name__ == '__main__':
    from gevent import pywsgi
    from geventwebsocket.handler import WebSocketHandler
    from gevent import monkey
    monkey.patch_all()

    server = pywsgi.WSGIServer(('0.0.0.0', 5003), app, handler_class = WebSocketHandler)
    print('web server start ... ')
    server.serve_forever()

消息发布器

import json

import websocket


# 这里就不写界面了,要推送的消息一并写在on_message里

def on_open(ws):
    token = 'admin'
    data = json.dumps({'type': 'init', 'data': {'token': token}})
    ws.send(data)


def on_message(ws, r_data):
    print('接收到消息:{}'.format(r_data))
    data = json.loads(r_data)
    if data['type'] == 'init_r':
        if data['code'] != 200:
            ws.close()
            print('鉴权失败')
            return
        ws.send(json.dumps({'type': 'message', 'data': {'message': '测试消息,这是一条公告'}}))


    elif data['type'] == 'message_r':
        if data['code'] == 200:
            print('发布成功')
        else:
            print('发布失败')
        ws.close()


def on_error(ws):
    print('连接异常')


def on_close(ws):
    print('连接关闭')


if __name__ == '__main__':

    websocket.enableTrace(True)
    ws = websocket.WebSocketApp('ws://127.0.0.1:5003/admin')
    ws.on_open = on_open
    ws.on_message = on_message
    ws.on_error = on_error
    ws.on_close = on_close
    ws.run_forever()

消息监听器

import json
import sys
import datetime
import threading

from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from PyQt5.QtCore import *

import websocket

websocket.enableTrace(True)

class ActionSet:
    @staticmethod
    def on_open( ws):
        token = '123456qwe'
        data = json.dumps({'type': 'init', 'data': {'token': token}})
        ws.send(data)

    @staticmethod
    def on_message(ws, r_data):
        print('接收到消息:{}'.format(r_data))
        data = json.loads(r_data)
        if data['type'] == 'init_r':
            if data['code'] != 200:
                ws.close()
                print('鉴权失败')
                return
        print('鉴权成功')
        if data['type'] == 'message':
            print(data['data']['message'])
            MainWindow._instance.new_message_got.emit(data['data']['message'])
    @staticmethod
    def on_error(ws):
        print('连接异常')

    @staticmethod
    def on_close(ws):
        print('连接关闭')


class MainWindow(QWidget):
    _instance=None
    new_message_got = pyqtSignal(str)
    def __init__(self, parent = None):
        super().__init__(parent)
        MainWindow._instance = self
        self.setWindowTitle('消息监听器')
        self.resize(800, 600)

        self.__btn = QPushButton('连接')
        self.__lw =QListWidget()

        main_layout = QVBoxLayout()
        main_layout.setSpacing(0)
        main_layout.setContentsMargins(0,0,0,0)
        self.setLayout(main_layout)
        main_layout.addWidget(self.__btn)
        main_layout.addWidget(self.__lw)

        self.__btn.clicked.connect(self.__on_btn_clicked)
        self.new_message_got.connect(self.__show_new_message)

    def __on_btn_clicked(self):
        url='ws://127.0.0.1:5003/listener'
        ws = websocket.WebSocketApp(url )

        ws.on_open = ActionSet.on_open
        ws.on_message = ActionSet.on_message
        ws.on_error = ActionSet.on_error
        ws.on_close = ActionSet.on_close

        wst = threading.Thread(target = ws.run_forever)
        wst.setDaemon(True)
        wst.start()

    def __show_new_message(self,text):
        time_ = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        text= '{}   {}'.format(time_, text)
        self.__lw.insertItem(0,text)


if __name__ == '__main__':

    app = QApplication(sys.argv)
    w = MainWindow()
    w.show()
    app.exec_()
上一篇下一篇

猜你喜欢

热点阅读