JavaJava学习之路Java学习笔记

基于WebSocket的在线聊天室(一)

2016-04-28  本文已影响14649人  anyesu
效果预览

前言


去年在tomcat7自带的例子中发现了两个有趣的demo,贪食蛇游戏和画板。很有意思的是打开的几个窗口内容都是一样的,而且还会同步更新,如果换做以往做web开发的套路来实现这个效果还是比较费劲的。于是心血来潮就去查了一些关于websocket的资料并做了这么一个文字聊天室。前段时间应别人的需要又把它翻了出来加上了视频和语音功能,瞬间高大上了很多。做完之后当然得趁热打铁总结下,顺便作为第一次写文章的素材。(∩_∩)

简介


WebSocket 是 HTML5 一种新的协议。它实现了浏览器与服务器全双工通信,能更好的节省服务器资源和带宽并达到实时通讯,它建立在 TCP 之上,同 HTTP 一样通过 TCP 来传输数据。

说起实时通讯就不得不提一些“服务器推”技术。

客户端以一定的时间间隔发送Ajax请求,优点实现起来比较简单、省事,不过缺点也很明显,请求有很大一部分是无用的,而且需要频繁建立和释放TCP连接,很消耗带宽和服务器资源。

与普通轮询不同的地方在于,服务端接收到请求后会保持住不立即返回响应,等到有消息更新才返回响应并关闭连接,客户端处理完响应再重新发起请求。较之普通轮询没有无用的请求,但服务器保持连接也是有消耗的,如果服务端数据变化频繁的话和普通轮询并无两样。

在页面中嵌入一个隐藏的iframe,将其src设为一个长连接的请求,这样服务端就能不断向客户端发送数据。优缺点与长轮询相仿。

这些技术都明显存在两个相同的缺点:1. 服务器需要很大的开销 2. 都做不到真正意义上的“主动推送”,服务端只能“被动”地响应,于是就轮到正主出场了。在websocket中,只需要做一个握手动作就可以在客户端和服务器之间建立连接,之后通过数据帧的形式在这个连接上进行通讯,并且,由于连接是双向的,在连接建立之后服务端随时可以主动向客户端发送消息(前提是连接没有断开)。

实现


以前一些websocket的例子都是基于某个特定的容器(如Tomcat,Jetty),在Oracle发布了JSR356规范之后,websocket的JavaAPI得到了统一,所以只要Web容器支持JSR356,那么我们写websocket时,代码都是一样的了.Tomcat从7.0.47开始支持JSR356.另外有一点要说明的是JDK的要求是7及以上。
我本地的环境为jdk1.7,nginx1.7.8(反向代理),tomcat7.0.52(需要在buildpath中还要添加tomcat7的library),chrome。

废话不多说,先上代码

消息结构Message类
public class Message {
    
        private int type;//消息类型

        private String msg;//消息主题

        private String host;// 发送者

        private String[] dests;// 接受者

        private RoomInfo roomInfo;//聊天室信息

        public class MsgConstant {

            public final static int Open = 1;// 新连接

            public final static int Close = 2;// 连接断开

            public final static int MsgToAll = 3;// 发送给所有人

            public final static int MsgToPoints = 4;// 发送给指定用户

            public final static int RequireLogin = 5;// 需要登录

            public final static int setName = 6;// 设置用户名
        }

        public static class RoomInfo {
    
            private String name;// 聊天室名称

            private String creater;//创建人

            private String createTime;// 创建时间

            public RoomInfo(String creater, String createTime) {
                this.creater = creater;
                this.createTime = createTime;
            }
    
            public RoomInfo(String name) {
                this.name = name;
            }
            // 省略set get
        }

        public Message() {
            setType(MsgConstant.MsgToAll);
        }
    
        public Message(String host, int type) {
            setHost(host);
            setType(type);
        }
    
        public Message(String host, int type, String msg) {
            this(host, type);
            setMsg(msg);
        }

        public Message(String host, int type, String[] dests) {
            this(host, type);
            setDests(dests);
        }

        @Override
        public String toString() {
            // 序列化成json串
            return JSONObject.toJSONString(this);
        }
    }

public class wsConfigurator extends ServerEndpointConfig.Configurator {
    @Override
    public void modifyHandshake(ServerEndpointConfig config, HandshakeRequest request, HandshakeResponse response) {
        //通过配置来获取httpsession
        HttpSession httpSession = (HttpSession) request.getHttpSession();
        config.getUserProperties().put(HttpSession.class.getName(), httpSession);
    }
}

@ServerEndpoint(value = "/websocket/chat/{uid}", configurator = wsConfigurator.class)
public class textController {

    private Session session;

    private LoginUser loginUser;

    private static RoomInfo roomInfo;

    //连接集合
    private static final Set<textController> connections = new CopyOnWriteArraySet<textController>();

    /**
     * websock连接建立后触发
     * 
     * @param session
     * @param config
     */
    @OnOpen
    public void OnOpen(Session session, EndpointConfig config, @PathParam(value = "uid") String uid) {
        //设置websock连接的session
        setSession(session);
        // 获取HttpSession
        HttpSession httpSession = (HttpSession) config.getUserProperties().get(HttpSession.class.getName());
        // 从HttpSession中取得当前登录的用户作为当前连接的用户
        setLoginUser((LoginUser) httpSession.getAttribute("LoginUser"));
        if (getLoginUser() == null) {
            requireLogin();// 未登录需要进行登录
            return;
        }
        // 设置聊天室信息
        if (getConnections().size() == 0) {// 如果当前聊天室为空,建立新的信息
            setRoomInfo(new RoomInfo(getUserName(), (new SimpleDateFormat("yyyy-MM-dd hh:mm:ss")).format(new Date())));
        }
        //加入连接集合
        getConnections().add(this);
        //广播通知所有连接有新用户加入
        broadcastToAll(new Message(getUserName(), MsgConstant.Open, getUsers()));
    }

    /**
     * websock连接断开后触发
     */
    @OnClose
    public void OnClose() {
        //从连接集合中移除
        getConnections().remove(this);
        //广播通知所有连接有用户退出
        broadcastToAll(new Message(getUserName(), MsgConstant.Close, getUsers()));
    }

    /**
     * 接受到客户端发送的字符串时触发
     * 
     * @param message
     */
    @OnMessage(maxMessageSize = 1000)
    public void OnMessage(String message) {
        //消息内容反序列化
        Message msg = JSONObject.parseObject(message, Message.class);
        msg.setHost(getUserName());
        //对html代码进行转义
        msg.setMsg(txt2htm(msg.getMsg()));
        if (msg.getDests() == null)
            broadcastToAll(msg);
        else
            broadcastToSpecia(msg);
    }

    @OnError
    public void onError(Throwable t) throws Throwable {
        System.err.println("Chat Error: " + t.toString());
    }

    /**
     * 广播给所有用户
     * 
     * @param msg
     */
    private static void broadcastToAll(Message msg) {
        for (textController client : getConnections())
            client.call(msg);
    }

    /**
     * 发送给指定的用户
     * 
     * @param msg
     */
    private static void broadcastToSpecia(Message msg) {
        for (textController client : getConnections())
            // 感觉用map进行映射会更好点
            if (Contains(msg.getDests(), client.getUserName()))
                client.call(msg);
    }

    private void call(Message msg) {
        try {
            synchronized (this) {
                if (getUserName().equals(msg.getHost()) && msg.getType() == MsgConstant.Open)
                    msg.setRoomInfo(getRoomInfo());
                this.getSession().getBasicRemote().sendText(msg.toString());
            }
        } catch (IOException e) {
            try {
                //断开连接
                this.getSession().close();
            } catch (IOException e1) {
            }
            OnClose();
        }
    }

    private void requireLogin() {
        Message msg = new Message();
        msg.setType(MsgConstant.RequireLogin);
        call(msg);
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public Session getSession() {
        return this.session;
    }

    public LoginUser getLoginUser() {
        return loginUser;
    }

    public void setLoginUser(LoginUser loginUser) {
        this.loginUser = loginUser;
    }

    /**
     * 设置聊天室信息
     */
    public static void setRoomInfo(RoomInfo info) {
        roomInfo = info;
    }

    public static RoomInfo getRoomInfo() {
        return roomInfo;
    }

    private String getUserName() {
        if (getLoginUser() == null)
            return "";
        return getLoginUser().getUserName();
    }

    public static Set<textController> getConnections() {
        return connections;
    }

    private String[] getUsers() {
        int i = 0;
        String[] destArrary = new String[getConnections().size()];
        for (textController client : getConnections())
            destArrary[i++] = client.getUserName();
        return destArrary;
    }

    /**
     * html代码转义
     * 
     * @param txt
     * @return
     */
    public static String txt2htm(String txt) {
        if (StringUtils.isBlank(txt)) {
            return txt;
        }
        return txt.replaceAll("&", "&").replaceAll("<", "<").replaceAll(">", ">").replaceAll("\"", """).replaceAll(" ", " ").replaceAll("\n", "<br/>").replaceAll("\'", "'");
    }

    /**
     * 字符串数组是否包含指定字符串
     * 
     * @param strs
     * @param str
     * @return
     */
    public static boolean Contains(String[] strs, String str) {
        if (StringUtils.isBlank(str) || strs.length == 0)
            return false;
        for (String s : strs)
            if (s.equals(str))
                return true;
        return false;
    }
}

服务端代码就这么三个类,还是比较简单的(>_<|||还是比别人的例子复杂好多)。

@ServerEndpoint

用来标记一个websocket服务器终端

@ClientEndpoint

用来标记一个websocket客户器

@OnOpen

websock连接建立后执行,主要进行一些初始化操作

@OnClose

websock连接断开后执行,没什么好说的

@OnMessage

接收到客户端发送端的消息后执行,值得注意的是,OnMessage注解的方法可以有多个重载,方法参数可以为String,ByteBuffer等类型,相应的,session有这么几个方法可以向客户端发送消息:sendText,sendBinary,sendObject等。另外,上面代码里向客户端发消息用的是session.getBasicRemote().sendText方法,这是阻塞的方式,还有一种异步方式session.getAsyncRemote().sendText,虽说是异步,不过高频率发送并没有出现错乱的情况,还有待研究。

@OnError

报错的时候会执行,不过试过各种异常下这个方法都没有执行,很奇怪

服务端功能比较简单,主要实现了几个注解的方法,对客户端传来的消息进行广播,并无其他额外操作。再来看下前端的代码:

(function(window) {
    Blob.prototype.appendAtFirst = function(blob) {
        return new Blob([blob, this]);
    };
    var WS_Open = 1,
        WS_Close = 2,
        WS_MsgToAll = 3,
        WS_MsgToPoints = 4,
        WS_RequireLogin = 5,
        WS_setName = 6,
        types = ["文本", "视频", "语音"],
        getWebSocket = function(host) {
            var socket;
            if ('WebSocket' in window) {
                socket = new WebSocket(host);
            } else if ('MozWebSocket' in window) {
                socket = new MozWebSocket(host);
            }
            return socket;
        },
        WSClient = function(option) {
            var isReady = false,
                init = function(client, option) {
                    client.socket = null;
                    client.online = false;
                    client.isUserClose = false;
                    client.option = option || {};
                };

            this.connect = function(host) {
                var client = this,
                    socket = getWebSocket(host);

                if (socket == null) {
                    console.log('错误: 当前浏览器不支持WebSocket,请更换其他浏览器', true);
                    alert('错误: 当前浏览器不支持WebSocket,请更换其他浏览器');
                    return;
                }

                socket.onopen = function() {
                    var onopen = client.option.onopen,
                        type = types[client.option.type];
                    console.log('WebSocket已连接.');
                    console.log("%c类型:" + type, "color:rgb(228, 186, 20)");
                    onopen && onopen();
                };

                socket.onclose = function() {
                    var onclose = client.option.onclose,
                        type = types[client.option.type];
                    client.online = false;
                    console.error('WebSocket已断开.');
                    console.error("%c类型:" + type, "color:rgb(228, 186, 20)");
                    onclose && onclose();
                    if (!client.isUserClose) {
                        client.initialize();
                    }
                };

                socket.onmessage = function(message) {
                    var option = client.option;
                    if (typeof(message.data) == "string") {
                        var msg = JSON.parse(message.data);
                        switch (msg.type) {
                        case WS_Open:
                            option.wsonopen && option.wsonopen(msg);
                            break;
                        case WS_Close:
                            option.wsonclose && option.wsonclose(msg);
                            break;
                        case WS_MsgToAll:
                        case WS_MsgToPoints:
                            option.wsonmessage && option.wsonmessage(msg);
                            break;
                        case WS_RequireLogin:
                            option.wsrequirelogin && option.wsrequirelogin();
                            break;
                        case WS_setName:
                            option.userName = msg.host;
                            option.wssetname && option.wssetname(msg);
                            break;
                        }
                    } else if (message.data instanceof Blob) {
                        option.wsonblob && option.wsonblob(message);
                    }

                };

                isReady = true;
                this.socket = socket;
                return this;
            };

            this.initialize = function(param) {
                return this.connect(this.option.host + (param ? "?" + param : ""));
            };

            this.sendString = function(message) {// 向服务端发送给字符串
                return isReady && this.socket.send(message);
            };

            this.sendBlob = function(blob) {// 向服务端发送二进制数据
                return isReady && this.socket.send(blob.appendAtFirst(this.option.userName));
            };

            this.close = function() {
                this.isReady = false;
                this.online = false;
                this.isUserClose = true;
                this.socket.close();
                return true;
            };

            this.isMe = function(name) {
                return this.option.userName == name;
            }

            init(this, option);
        };

    window.WSClient = WSClient;

})(window);

这里的代码我做了下粗劣的封装,就不给出具体实现了,调用的时候实现具体的逻辑即可。如下形式:

var textClient = new WSClient({
    host: "ws://" + window.location.host + "/websocket/chat/123",// 注意这里不是http协议
    type: MODE_TEXT,
    onopen: function() {
        console.log('WebSocket已连接.');
    },
    onclose: function() {
        console.log('Info: WebSocket已断开.');
    },
    wsonopen: function(msg) {
        console.log("***加入聊天室");
    },
    wsonclose: function(msg) {
        console.log("***退出了聊天室");
    },
    wsonmessage: function(msg) {
        console.log(“收到消息:” + msg.msg);
    },
    wsrequirelogin: function(msg) {
        document.location.href = "http://" + window.location.host + "/login.htm?to_url=" + document.location.href;
    },
    wssetname: function(msg) {
    }
});

和服务端要实现的几个方法类似,就不多说了。其中,socket.onmessage中message.data有两种类型:string和Blob,Blob表示二进制数据,比如图片和声音,文件就可以通过Blob对象来传输。另外,服务端发送消息的send方法是有好几种的,而这里WebSocket对象的send方法只有一个,参数可以是Blob或string。

最后附上websocket的nginx配置:

location /websocket/chat { 
    proxy_pass http://localhost:8080/websocket/chat; 
    include websocket.conf;
}

websocket.conf:

#避免nginx超时
proxy_read_timeout 86400;
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# WebSocket support
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

参考文章:


唉,第一次写文章,加上基础不扎实,磨磨蹭蹭写了一晚上才结束战斗,真是不容易。水平有限,有讲错的地方欢迎指出,之后会继续关于视频和音频通讯方式的总结。

上一篇下一篇

猜你喜欢

热点阅读