我爱编程

Netty 简单应用

2018-05-09  本文已影响0人  备货仓66

Java 后端服务

启动类

package netty;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.buffer.ByteBuf;

import io.netty.buffer.Unpooled;

import io.netty.channel.*;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.http.*;

import io.netty.handler.codec.http.websocketx.*;

import io.netty.handler.stream.ChunkedWriteHandler;

import io.netty.util.CharsetUtil;

import java.util.Map;

import java.util.concurrent.ConcurrentHashMap;

/**

* Created by yehan on 2018/4/27.

*/

public class WebSocketServer {

private final EventLoopGroupworkerGroup =new NioEventLoopGroup();

    private final EventLoopGroupbossGroup =new NioEventLoopGroup();

    private WebSocketServerHandshakerhandshaker;

    private MapchannelMap =new ConcurrentHashMap();

    public void run() {

ServerBootstrap boot =new ServerBootstrap();

        boot.group(bossGroup, workerGroup)

.channel(NioServerSocketChannel.class)

.childHandler(new ChannelInitializer() {

@Override

                    protected void initChannel(Channel ch)throws Exception {

ChannelPipeline pipeline = ch.pipeline();

                        channelMap.put(ch.id(),ch);

                        pipeline.addLast("http-codec", new HttpServerCodec());

                        pipeline.addLast("aggregator", new HttpObjectAggregator(65536));

                        pipeline.addLast("http-chunked", new ChunkedWriteHandler());

                        pipeline.addLast("handler", new WebSocketServerHandler(WebSocketServer.this));

                    }

});

        try {

Channel ch = boot.bind(2048).sync().channel();

            System.out.println("websocket server start at port:2048");

            ch.closeFuture().sync();

        }catch (InterruptedException e) {

e.printStackTrace();

        }finally {

bossGroup.shutdownGracefully();

            workerGroup.shutdownGracefully();

        }

}

public void handlerWebSocketFrame(ChannelHandlerContext ctx,

                                      WebSocketFrame frame) {

/**

* 判断是否关闭链路的指令

*/

        if (frameinstanceof CloseWebSocketFrame) {

handshaker.close(ctx.channel(),

                    (CloseWebSocketFrame) frame.retain());

return;

        }

/**

* 判断是否ping消息

*/

        if (frameinstanceof PingWebSocketFrame) {

ctx.channel().write(

new PongWebSocketFrame(frame.content().retain()));

return;

        }

/**

* 本例程仅支持文本消息,不支持二进制消息

*/

        if (frameinstanceof BinaryWebSocketFrame) {

throw new UnsupportedOperationException(String.format(

"%s frame types not supported", frame.getClass().getName()));

        }

if(frameinstanceof TextWebSocketFrame){

// 返回应答消息

            String request = ((TextWebSocketFrame) frame).text();

            System.out.println("服务端收到:" + request);

//            ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到并返回:"+request));

            for (Channel ch :channelMap.values()) {

if (ctx.channel().equals(ch)) {

ch.writeAndFlush(new TextWebSocketFrame("我发送的:"+request));

continue;

                }

ch.writeAndFlush(new TextWebSocketFrame("收到:"+request));

            }

}

}

public void handleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req){

if (!req.getDecoderResult().isSuccess()

|| (!"websocket".equals(req.headers().get("Upgrade")))) {

sendHttpResponse(ctx, req, new DefaultFullHttpResponse(

HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));

return;

        }

WebSocketServerHandshakerFactory wsFactory =new WebSocketServerHandshakerFactory(

"ws://localhost:2048/ws", null, false);

        handshaker = wsFactory.newHandshaker(req);

        if (handshaker ==null) {

WebSocketServerHandshakerFactory

.sendUnsupportedWebSocketVersionResponse(ctx.channel());

        }else {

handshaker.handshake(ctx.channel(), req);

        }

}

private static void sendHttpResponse(ChannelHandlerContext ctx,

                                        FullHttpRequest req, DefaultFullHttpResponse res) {

// 返回应答给客户端

        if (res.getStatus().code() !=200) {

ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(),

                    CharsetUtil.UTF_8);

            res.content().writeBytes(buf);

            buf.release();

        }

// 如果是非Keep-Alive,关闭连接

        ChannelFuture f = ctx.channel().writeAndFlush(res);

        if (!isKeepAlive(req) || res.getStatus().code() !=200) {

f.addListener(ChannelFutureListener.CLOSE);

        }

}

private static boolean isKeepAlive(FullHttpRequest req) {

return false;

    }

public static void main(String[] args) {

new WebSocketServer().run();

    }

}


处理类

package netty;

import io.netty.channel.ChannelHandlerContext;

import io.netty.channel.SimpleChannelInboundHandler;

import io.netty.handler.codec.http.FullHttpRequest;

import io.netty.handler.codec.http.websocketx.WebSocketFrame;

/**

* Created by yehan on 2018/4/27.

*/

public class WebSocketServerHandlerextends SimpleChannelInboundHandler {

private  WebSocketServerwebSocketServer;

    public WebSocketServerHandler(WebSocketServer webSocketServer) {

super();

        this.webSocketServer = webSocketServer;

    }

@Override

    protected void channelRead0(ChannelHandlerContext ctx, Object msg)

throws Exception {

/**

* HTTP接入,WebSocket第一次连接使用HTTP连接,用于握手

*/

        if(msginstanceof FullHttpRequest){

webSocketServer.handleHttpRequest(ctx, (FullHttpRequest)msg);

            System.out.println("http请求");

        }

/**

* Websocket 接入

*/

        else if(msginstanceof WebSocketFrame){

webSocketServer.handlerWebSocketFrame(ctx, (WebSocketFrame)msg);

        }

}

@Override

    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception {

ctx.flush();

    }

}

客户端

上一篇 下一篇

猜你喜欢

热点阅读