从图中可以看出本次实战的基本流程是客户端A请求服务端核心模块,核心模块生产一条消息到消息队列,然后服务端消息模块消费消息,消费完之后就将消息推送给客户端B,流程很简单,没有太多技巧,唯一的巧妙之处就在消息模块这边的处理上,本文的重点也主要讲解消息模块这一块,主要包括netty server、netty client、channel的存储等等。



<dependency >
 < groupId > io.netty</ groupId>
 < artifactId > netty - all</ artifactId>
 < version > 4.1 .6.Final</ version>
 < / dependency>


public class NettyServer {
    public void run( int port )
        new Thread()
            public void run()
                runServer( port );

    private void runServer( int port )
        Print.info( "===============Message服务端启动===============" );
        EventLoopGroup  bossGroup   = new NioEventLoopGroup();
        EventLoopGroup  workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group( bossGroup, workerGroup );                        b.channel( NioServerSocketChannel.class );
            b.childHandler( new ChannelInitializer<SocketChannel>()
                        protected void initChannel( SocketChannel ch ) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();                                        pipeline.addLast( "codec-http", new HttpServerCodec() );
                            pipeline.addLast( "aggregator", new HttpObjectAggregator( 65536 ) );
                            pipeline.addLast( "handler", new MyWebSocketServerHandler() );
                    } );                                    Channel ch = b.bind( port ).sync().channel();                       Print.info( "Message服务器启动成功:" + ch.toString() );
        } catch ( Exception e ) {
            Print.error( "Message服务运行异常:" + e.getMessage() );
        } finally {
            bossGroup.shutdownGracefully();                 workerGroup.shutdownGracefully();            Print.info( "Message服务已关闭" );


public class MyWebSocketServerHandler extends SimpleChannelInboundHandler<Object>{
    private static final String     WEBSOCKET_PATH = "";
    private WebSocketServerHandshaker   handshaker;         @Override protected void channelRead0( ChannelHandlerContext ctx, Object msg ) throws Exception
        if ( msg instanceof FullHttpRequest )
             * 以http请求形式接入,但是走的是websocket                        handleHttpRequest(ctx, (FullHttpRequest) msg);                }else if (msg instanceof  WebSocketFrame){
             * 处理websocket客户端的消息                        handleWebSocketFrame(ctx, (WebSocketFrame) msg);               }    }    @Override        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        ctx.flush();
        private void handleHttpRequest( ChannelHandlerContext ctx, FullHttpRequest req ) throws Exception /* 要求Upgrade为websocket,过滤掉get/Post                if (!req.decoderResult().isSuccess() */
            || (!"websocket".equals( req.headers().get( "Upgrade" ) ) ) ) {
                /* 若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端                        sendHttpResponse(ctx, req, new DefaultFullHttpResponse(                    HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));                       return; */
            }                WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:9502/websocket", null, false );
            handshaker = wsFactory.newHandshaker( req );                if ( handshaker == null )
                WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse( ctx.channel() );
            } else {
                handshaker.handshake( ctx.channel(), req );

        private void handleWebSocketFrame( ChannelHandlerContext ctx, WebSocketFrame frame )                                                               /* Check for closing frame        if (frame instanceof CloseWebSocketFrame) { */
            handshaker.close( ctx.channel(), (CloseWebSocketFrame) frame.retain() );            return;

        if ( frame instanceof PingWebSocketFrame )
            ctx.channel().write( new PongWebSocketFrame( frame.content().retain() ) );            return;
        if ( !(frame instanceof TextWebSocketFrame) )
            Print.error( "数据帧类型不支持!" );            throw new UnsupportedOperationException( String.format( "%s frame types not supported", frame.getClass().getName() ) );
        /* Send the uppercase string back.        String request = ((TextWebSocketFrame) frame).text();        Print.info("Netty服务器接收到的信息: " + request);        if (request.equals(Const.HEARTBEAT)){ */
        ctx.channel().write( new TextWebSocketFrame( request ) );            return;

    JSONObject jsonData = JSONObject.parseObject( request );        String eventType = jsonData.getString( "event_type" );        String apiToken = jsonData.getString( "api_token" );        if ( Const.FRONT.equals( eventType ) )
        Print.info( "front event" );            ChannelSupervise.updateChannel( apiToken, ctx.channel() );
    }else if ( Const.BEHIND.equals( eventType ) )
        Print.info( "behind event" );            Channel chan = ChannelSupervise.findChannel( apiToken );            if ( null == chan )
            Print.error( "目标用户不存在" );
        }else {
            JSONObject jsonMsg = new JSONObject();                jsonMsg.put( "type", jsonData.get( "type" ) );                jsonMsg.put( "child_type", jsonData.get( "child_type" ) );                jsonMsg.put( "title", jsonData.get( "title" ) );                jsonMsg.put( "body", jsonData.get( "body" ) );                ChannelSupervise.sendToSimple( apiToken, new TextWebSocketFrame( jsonMsg.toString() ) );                Print.info( "向目标用户发送成功" );
        Print.error( "event type error" );
}    private static void sendHttpResponse( ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res )        /* 返回应答给客户端        if (res.status().code() != 200) { */
    ByteBuf buf = Unpooled.copiedBuffer( res.status().toString(), CharsetUtil.UTF_8 );            res.content().writeBytes( buf );            buf.release();

ChannelFuture f = ctx.channel().writeAndFlush( res );                                                                           /* 如果是非Keep-Alive,关闭连接        if (!isKeepAlive(req) || res.status().code() != 200) { */
f.addListener( ChannelFutureListener.CLOSE );        } }    @Override public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception
    cause.printStackTrace();        ctx.close();

private static String getWebSocketLocation( FullHttpRequest req )
    return("ws://" + req.headers().get( HOST ) + WEBSOCKET_PATH);

/**     * 接收客户端连接事件     */ @Override public void channelActive( ChannelHandlerContext ctx ) throws Exception
    Print.info( "客户端与服务端连接开启:" + ctx.channel() );        ChannelSupervise.addChannel( null, ctx.channel() );

/**     * 接收客户端关闭事件     */ @Override public void channelInactive( ChannelHandlerContext ctx ) throws Exception
    Print.info( "客户端与服务端连接关闭:" + ctx.channel() );        ChannelSupervise.removeChannel( ctx.channel() );


public class ChannelSupervise {
    private static ChannelGroup         GlobalGroup = new DefaultChannelGroup( GlobalEventExecutor.INSTANCE );
    private static ConcurrentMap<String, ChannelId> ChannelMap  = new ConcurrentHashMap();
    public static void addChannel( String apiToken, Channel channel )
        GlobalGroup.add( channel );
        if ( null != apiToken )
            ChannelMap.put( apiToken, channel.id() );

    public static void updateChannel( String apiToken, Channel channel )
        Channel chan = GlobalGroup.find( channel.id() );                if ( null == chan )
            addChannel( apiToken, channel );
        }else {
            ChannelMap.put( apiToken, channel.id() );

    public static void removeChannel( Channel channel )
        GlobalGroup.remove( channel );
        Collection<ChannelId> values = ChannelMap.values();                values.remove( channel.id() );

    public static Channel findChannel( String apiToken )
        ChannelId chanId = ChannelMap.get( apiToken );
        if ( null == chanId )
        return(GlobalGroup.find( ChannelMap.get( apiToken ) ) );

    public static void sendToAll( TextWebSocketFrame tws )
        GlobalGroup.writeAndFlush( tws );

    public static void sendToSimple( String apiToken, TextWebSocketFrame tws )
        GlobalGroup.find( ChannelMap.get( apiToken ) ).writeAndFlush( tws );


@Servicepublic class NettyClient {
    private Channel channel;
    public void run( String strUri )
        new Thread()
            public void run()
                runClient( strUri );
        }.start();            private void runClient( String strUri )
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap   b       = new Bootstrap();
                URI     uri     = new URI( strUri );
                String      protocol    = uri.getScheme();               if ( !"ws".equals( protocol ) )
                    throw new IllegalArgumentException( "Unsupported protocol: " + protocol );
                HttpHeaders customHeaders = new DefaultHttpHeaders();
                customHeaders.add( "MyHeader", "MyValue" );
                 * Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
                 * If you change it to V00, ping is not supported and remember to change
                 * HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
                final MyWebSocketClientHandler handler =
                    new MyWebSocketClientHandler(
                        WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, false, customHeaders ) );            b.group( group );            b.channel( NioSocketChannel.class );            b.handler( new ChannelInitializer<SocketChannel>()
                                                                                                                                      @Overpublic void initChannel( SocketChannel ch ) throws Exception {
                                                                                                                                          ChannelPipeline pipeline = ch.pipeline();                    pipeline.addLast( "http-codec", new HttpClientCodec() );                    pipeline.addLast( "aggregator", new HttpObjectAggregator( 8192 ) );                    pipeline.addLast( "ws-handler", handler );
                                                                                                                                  } );            Print.info( "===============Message客户端启动===============" );            channel = b.connect( uri.getHost(), uri.getPort() ).sync().channel();            handler.handshakeFuture().sync();            channel.closeFuture().sync();
            } catch ( Exception e ) {
                Print.error( e.getMessage() );
            } finally {
    private Channel channel;
    public void run( String strUri )
        new Thread()
            public void run()
                runClient( strUri );
        }.start();            private void runClient( String strUri )
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap   b       = new Bootstrap();
                URI     uri     = new URI( strUri );
                String      protocol    = uri.getScheme();               if ( !"ws".equals( protocol ) )
                    throw new IllegalArgumentException( "Unsupported protocol: " + protocol );
                HttpHeaders customHeaders = new DefaultHttpHeaders();
                customHeaders.add( "MyHeader", "MyValue" );
                 * Connect with V13 (RFC 6455 aka HyBi-17). You can change it to V08 or V00.
                 * If you change it to V00, ping is not supported and remember to change
                 * HttpResponseDecoder to WebSocketHttpResponseDecoder in the pipeline.
                final MyWebSocketClientHandler handler =
                    new MyWebSocketClientHandler(
                        WebSocketClientHandshakerFactory.newHandshaker( uri, WebSocketVersion.V13, null, false, customHeaders ) );            b.group( group );            b.channel( NioSocketChannel.class );            b.handler( new ChannelInitializer<SocketChannel>()
                                                                                                                                      @Overpublic void initChannel( SocketChannel ch ) throws Exception {
                                                                                                                                          ChannelPipeline pipeline = ch.pipeline();                    pipeline.addLast( "http-codec", new HttpClientCodec() );                    pipeline.addLast( "aggregator", new HttpObjectAggregator( 8192 ) );                    pipeline.addLast( "ws-handler", handler );
                                                                                                                                  } );            Print.info( "===============Message客户端启动===============" );            channel = b.connect( uri.getHost(), uri.getPort() ).sync().channel();            handler.handshakeFuture().sync();            channel.closeFuture().sync();
            } catch ( Exception e ) {
                Print.error( e.getMessage() );
            } finally {


public class MyWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private final WebSocketClientHandshaker handshaker;
    private ChannelPromise          handshakeFuture;
    public MyWebSocketClientHandler( WebSocketClientHandshaker handshaker )
        this.handshaker = handshaker;
    }    public ChannelFuture handshakeFuture()

    public void handlerAdded( ChannelHandlerContext ctx ) throws Exception
        handshakeFuture = ctx.newPromise();

    public void channelActive( ChannelHandlerContext ctx ) throws Exception
        handshaker.handshake( ctx.channel() );

    public void channelInactive( ChannelHandlerContext ctx ) throws Exception
        Print.info( "webSocket client disconnected!" );

    public void channelRead( ChannelHandlerContext ctx, Object msg ) throws Exception
        Channel ch = ctx.channel();                if ( !handshaker.isHandshakeComplete() )
            handshaker.finishHandshake( ch, (FullHttpResponse) msg );                       Print.info( "websocket client connected!" );
            handshakeFuture.setSuccess();                      return;
        if ( msg instanceof FullHttpResponse )
            FullHttpResponse response = (FullHttpResponse) msg;                        throw new Exception( "Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString( CharsetUtil.UTF_8 ) + ')' );
        WebSocketFrame frame = (WebSocketFrame) msg;                if ( frame instanceof TextWebSocketFrame )
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;                        Print.info( "客户端收到消息: " + textFrame.text() );
        } else if ( frame instanceof PongWebSocketFrame )
            Print.info( "websocket client received pong" );
        } else if ( frame instanceof CloseWebSocketFrame )
            Print.info( "websocket client received closing" );

    protected void channelRead0( ChannelHandlerContext channelHandlerContext, Object o ) throws Exception

    public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception
        cause.printStackTrace();               if ( !handshakeFuture.isDone() )
            handshakeFuture.setFailure( cause );


class MessageApplication {
    private NettyServer server;          @Autowired
    private NettyClient client;            public static void main( String[] args )
        SpringApplication.run( MessageApplication.class, args );

    public void initMessage()
        server.run( 9502 );
        try {
            Thread.sleep( 1000 );
        } catch ( InterruptedException e ) {
        }        client.run( "ws://localhost:" + 9502 );


<!DOCTYPE html >
 < html >
 < head >
 < meta charset = "UTF-8" >
          < title > WebSocket Chat</ title>
          < / head >
          < body >
          < script type = "text/javascript" >
                  var socket;
 if ( !window.WebSocket )
     window.WebSocket = window.MozWebSocket;
 if ( window.WebSocket )
     socket         = new WebSocket( "ws://localhost:9502" );
     socket.onmessage   = function( event )
         var ta = document.getElementById( 'responseText' );
         ta.value = ta.value + '\n' + event.data
     socket.onopen = function( event )
         var ta = document.getElementById( 'responseText' );
         ta.value = "连接开启!";
     socket.onclose = function( event )
         var ta = document.getElementById( 'responseText' );
         ta.value = ta.value + "连接被关闭";
 } else {
     alert( "你的浏览器不支持 WebSocket!" );

 function send( message )
     if ( !window.WebSocket )

     if ( socket.readyState == WebSocket.OPEN )
         socket.send( message );
     } else {
         alert( "连接没有开启." );

 < / script >
 < form onsubmit = "return false;" >
           < h3 > WebSocket : < / h3 >
           < textarea id = "responseText" style = "width: 500px; height: 300px;" > < / textarea >
                              < br >
                              < input type                                                                                                                                                      = "text" name = "message"  style = "width: 300px" value = "1" >
                                                                                                                              < input type                                                      = "button" value = "发送消息" onclick = "send(this.form.message.value)" >
                                                                                                                                                                                 < input type   = "button" onclick = "javascript:document.getElementById('responseText').value=''" value = "清空聊天记录" >
                                                                                                                                                                                                                                      < / form >
                                                                                                                                                                                                                                      < br >
                                                                                                                                                                                                                                      < / body >
                                                                                                                                                                                                                                      < / html>



2、打开测试页面,在底下的输入框输入:{"event_type":"front", "api_token":"11111"},表示客户端B连接上netty服务器






