Netty4.1 HTTP开发入门

2022-01-01  本文已影响0人  肥兔子爱豆畜子

(一)服务端

开发了一个简单的Http Server,使用的是Netty 4.1.46.Final版本。

服务器类
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import lombok.extern.slf4j.Slf4j;

/**
 * 一个使用netty实现的简单的http服务器
 * HttpServerCodec是能同时处理入站和出站的编解码器
 * */
@Slf4j
public class SimpleHttpServer {

    private static final int _1M = 1024 * 1024;

    public static void main(String[] args) {
        log.info("启动SimpleHttpServer服务器...");
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(2);
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.option(ChannelOption.SO_BACKLOG, 1024).group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) {

                            ch.pipeline().addLast("codec", new HttpServerCodec());
                            ch.pipeline().addLast("aggregate", new HttpObjectAggregator(_1M));
                            ch.pipeline().addLast("msg", new MyHttpMsgHandler());
                        }

                    });
            ChannelFuture future = bootstrap.bind(8080).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
            log.error("http服务器错误:" + e.getMessage(), e);
        } finally {
            log.info("关闭http服务器");
            try {
                boss.shutdownGracefully().sync();
                worker.shutdownGracefully().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
自定义Handler,用来处理Http Request并向客户端写Response
package com.wangan.netty_httpserver;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

/**
 * 用来处理http消息,包括请求与响应
 * 入站byte由HttpServerCodec转为请求消息,而出站响应同样由HttpServerCodec转为出站byte
 * */
@Slf4j
public class MyHttpMsgHandler extends SimpleChannelInboundHandler<HttpObject> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
        
        if (msg instanceof FullHttpRequest) {
            log.info("收到HTTP请求,聚合为FullHttpRequest");

            FullHttpRequest request = (FullHttpRequest) msg;
            log.info("HTTP Headers:{}", request.headers().toString());
            String requestId = request.headers().get("RequestID");
            log.info("Http RequestID:{}", requestId);
            String requestContent = request.content().toString(CharsetUtil.UTF_8);
            log.info("HTTP Content:{}", requestContent);

            ByteBuf responseContent = Unpooled.copiedBuffer("已收到,请求内容为" + requestContent, CharsetUtil.UTF_8);
            HttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,
                    responseContent);
            if (requestId != null)
                response.headers().add("RequestID", requestId);
            ctx.write(response);
            ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            future.addListener(ChannelFutureListener.CLOSE);//写完Response,关闭了Channel
            log.info("写回response");
        }

    }

}
关于HttpObjectAggregator

Http协议底层传输是基于TCP对数据进行字节传输的,传输的时候对端需要知道何时是一个完整的请求结束,一般会有如下几个方式:

当使用的是分块传输的话,如果不使用HttpObjectAggregator的话,我们需要在channelRead0处理多个HttpContent,以及最后的LastHttpContent,每个HttpContent都是变长的数据块,而通过在pipeline上的HttpServerCodec后面添加HttpObjectAggregator,可以将多个块聚合成一个完整的FullHttpRequest,方便以上层Http应用层协议的方式进行统一处理,而不用考虑底层数据传输的细节了。

参考

https://www.cnblogs.com/nxlhero/p/11670942.html
https://www.cnblogs.com/xuehaoyue/p/6639029.html

(二)客户端

在前一篇中Netty4.1 Http开发入门(一)服务端,实现了一个简单的Http Server,这次来实现一个Http客户端。
为了方便进行测试,这次把这个Http客户端整合到SpringBoot里边,调用流程是:PostMan -> SpringBoot Controller -> Http客户端 -> Http Server

简单Http连接

每次请求:客户端创建到服务端的连接,发请求,收相应,然后关闭连接。
笔者这里在SpringBoot中调用客户端使用的是同步模型,但我们知道Netty本身是异步事件驱动模型,所以需要一点线程同步技巧,笔者思路是做一个Response的容器类,即SynResponse,里边用Latch(1)做一个同步锁,Controller里边已同步方式发送请求给服务端之后,通过SynResponse在Latch上await获取响应的Content,等Netty回调Handler时对Latch countDown,这样Controller就会继续执行并把Response Content返回给前端了。代码如下:
SynResponse

public class SynResponse<T> {
    
    private T responseContent;
    private CountDownLatch latch = new CountDownLatch(1);
    
    
    public T getResponseContent() throws InterruptedException {
        latch.await();
        return responseContent;
    }
    
    public void setResponseContent(T responseContent) {
        this.responseContent = responseContent;
        latch.countDown();
    }
    
}

SimpleNettyHttpClient:

@Slf4j
@Component
public class SimpleNettyHttpClient {

    private static final int _1M = 1024 * 1024;

    private EventLoopGroup workerGroup;

    @PostConstruct
    public void init() {
        workerGroup = new NioEventLoopGroup();
    }

    public String sendRequest(String url, String requestBody) throws Exception {
        Bootstrap boot = new Bootstrap();
        boot.group(workerGroup);
        boot.channel(NioSocketChannel.class);
        boot.remoteAddress("127.0.0.1", 8080);

        SynResponse<String> synResponse = new SynResponse<>();

        boot.handler(new ChannelInitializer<NioSocketChannel>() {

            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ch.pipeline().addLast(new StringEncoder());
                ch.pipeline().addLast(new HttpClientCodec());
                ch.pipeline().addLast(new HttpObjectAggregator(_1M));
                ch.pipeline().addLast(new TestHandler(synResponse));
            }
        });

        ChannelFuture f = boot.connect().sync();
        log.info("已连接");

        URI uri = new URI(url);

        DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,
                uri.toASCIIString(), Unpooled.wrappedBuffer(requestBody.getBytes("UTF-8")));
        request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
        log.info("组装Request");

        Channel channel = f.channel();
        channel.writeAndFlush(request);
        log.info("写Request");

        channel.closeFuture().sync();
        log.info("关闭Channel");

        //workerGroup.shutdownGracefully();

        log.info("同步阻塞获取Response Content");
        return synResponse.getResponseContent();
    }

    public static class TestHandler extends ChannelInboundHandlerAdapter {

        private SynResponse<String> synResponse;

        public TestHandler(SynResponse<String> synResponse) {
            this.synResponse = synResponse;
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

            FullHttpResponse response = (FullHttpResponse) msg;
            String content = response.content().toString(CharsetUtil.UTF_8);
            log.info("收到服务端response:{}", content);

            synResponse.setResponseContent(content);
            log.info("Response content放入result");

        }
    }
}

Controller:

@Autowired
private SimpleNettyHttpClient simpleNettyClient;

@RequestMapping(value = "/postStringSimple", method = RequestMethod.POST)
    public String postStringSimple(@RequestBody String data) throws Exception {
        
        String res = simpleNettyClient.sendRequest("http://127.0.0.1:8080/test", data);
        logger.info("Controller返回:{}", res);
        return res;
    }
Http连接池

上面的简单Http连接是调用线程每次创建一个连接,在连接的生命周期内,都是为这一个调用线程所用。而如果要使用连接池模式,那么连接在生命周期内是可能被多个线程重复使用的,多个线程通过1个连接发出去的请求,在收到服务端响应、Netty回调Handler时必须知道这个Response对应的是哪个Request,笔者这里采用的是对每个连接Channel在池里创建的时候绑定一个Map,用来存放<RequestID, SynResponse>这样的一个KV对,即发送请求的时候生成唯一的Request存入Map,Handler收到Response时把响应内容封装为SynContent并与RequestID建立关联放入Map,为了防止内存溢出,需要在本次请求使用完之后remove掉对应的key值。连接池的实现笔者使用的是Netty提供的实现,即ChannelPoolMapChannelPoolHandler两个接口。代码如下:

PoolingNettyClient 使用ChannelPoolMap接口来做连接池的初始化。对外提供sendRequest方法:

@Slf4j
@Component
public class PoolingNettyClient {
    
    //key是remote地址,每个remote地址对应一个FixedChannelPool
    private ChannelPoolMap<String, FixedChannelPool> poolMap;
    
    @PostConstruct
    public void init() {
        log.info("pooling netty client 初始化...");
        Bootstrap boot = new Bootstrap();
        boot.channel(NioSocketChannel.class);
        boot.group(new NioEventLoopGroup());
        boot.remoteAddress("127.0.0.1", 8080);
        
        poolMap = new AbstractChannelPoolMap<String, FixedChannelPool>() {
            
            ChannelPoolHandler handler = new PoolingChannelHandler();

            @Override
            protected FixedChannelPool newPool(String key) {
                return new FixedChannelPool(boot, handler, 50);
            }};
    }
    
    //发送请求
    public String sendRequest(String url, String requestBody) throws InterruptedException, ExecutionException {
        log.info("发送请求,url:{},requestbody:{}", url, requestBody);
        
        RequestWraper requestWraper = new RequestWraper(url, requestBody);
        String requestId = requestWraper.getRequest().headers().get("RequestID");
        //AttributeKey<Map> resMapKey = AttributeKey.valueOf("ResponseMap");
        
        FixedChannelPool pool= poolMap.get("127.0.0.1");
        Future<Channel> channelAcquireDone = pool.acquire().sync(); //从池中获取连接
        Channel channel = channelAcquireDone.get();
        
        log.info("已从连接池获取连接...");
        
        //准备response容器,并放入ResponseMap
        Map<String, SynResponse<String>> responseMap = getResponseMap(channel);
        log.info(JSON.toJSONString(responseMap));
        SynResponse<String> synResponse = new SynResponse<>();
        if(responseMap != null)
            responseMap.put(requestId, synResponse);
        //log.info(JSON.toJSONString(getResponseMap(channel)));
        
        
        //写请求
        log.info("写请求...");
        ChannelFuture writeRequestDone = channel.writeAndFlush(requestWraper.getRequest()).sync();
        if(writeRequestDone.isSuccess()) {
            log.info("requestData已发送,ChannelID:{}, requestId:{}", channel.id(), requestId);
        }else {
            log.error("requestData发送失败,ChannelID:{}, requestId:{}", channel.id(), requestId);
        }
        
        pool.release(channel); //把连接还回池
        log.info("把连接还回池...");
        
        //从Response容器阻塞获取content,返回前端
        return synResponse.getResponseContent();
    }
    
    private Map<String, SynResponse<String>> getResponseMap(Channel ch) {
        AttributeKey<Map<String, SynResponse<String>>> resMapKey = AttributeKey.valueOf("ResponseMap");
        return ch.attr(resMapKey).get();
    }
    
}

PoolingChannelHandler 连接池里每个channel的handler,处理channel相应的各个事件create、acquire、release等:

@Slf4j
public class PoolingChannelHandler implements ChannelPoolHandler{
    
    private static final int _1M = 1024 * 1024;

    @Override
    public void channelReleased(Channel ch) throws Exception {
        log.info("释放连接...");
        
    }

    @Override
    public void channelAcquired(Channel ch) throws Exception {
        log.info("获取到连接...");
        
    }

    @Override
    public void channelCreated(Channel ch) throws Exception {
        log.info("已创建连接...");
        //channel上绑定个Map,用来放响应result
        AttributeKey<Map> key = AttributeKey.valueOf("ResponseMap");
        ch.attr(key).set(new ConcurrentHashMap<String, SynResponse<String>>());
        
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new StringEncoder())
                .addLast(new HttpClientCodec())
                .addLast(new HttpObjectAggregator(_1M))
                .addLast(new ResponseHandler());
    }

}

RequestWraper 请求包装类,包装了一个DefaultFullHttpRequest,设置了RequestID Header ,并且设置Content-Length Header,TCP层面属于指定长度的传输:

public class RequestWraper {

    private DefaultFullHttpRequest request;

    public RequestWraper(String url, String requestBody) {
        String requestId = genRequestId();
        request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url,
                Unpooled.wrappedBuffer(requestBody.getBytes(CharsetUtil.UTF_8)));
        request.headers().add("RequestID", requestId);
        request.headers().add(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
    }

    private String genRequestId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public DefaultFullHttpRequest getRequest() {
        return request;
    }

}

ResponseHandler 业务逻辑Handler,这里用来处理服务端返回的Response:

@Slf4j
public class ResponseHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("读Response");
        FullHttpResponse response = (FullHttpResponse) msg;
        String requestId = response.headers().get("RequestID");
        ByteBuf content = response.content();
        String responseContent = content.toString(CharsetUtil.UTF_8);
        log.info("收到服务端response:{}", content.toString(CharsetUtil.UTF_8));

        //将Response content放入到ReponseMap里的SynResponse容器中
        Channel ch = ctx.channel();
        AttributeKey<Map> key = AttributeKey.valueOf("ResponseMap");
        Map responseMap = ch.attr(key).get();
        SynResponse<String> synResponse = (SynResponse) responseMap.remove(requestId);
        log.info("从ResponseMap获得SynResponse, 准备写Content");

        synResponse.setResponseContent(responseContent);
        log.info("已向SynResponse写入content");
    }

}

这里提一句,上面的Netty连接池属于“连接复用”且是“独享模式”,连接取用之后放回池里给其他线程使用,这是复用,同一个时刻一个连接只给一个线程所独占使用,这是独享,JDBC,Apache HttpClient所用的连接池都是这种模式。
连接“共享模式”的例子就是Redis的Lettuce连接池,同一个时刻一个连接是被多个线程一起使用的。Redis Server处理命令用的是一个单线程,而TCP协议本身保证顺序性,这样一来每个连接上的发出的请求的顺序一定跟返回的响应的顺序就一致了,所以这种情况下多个线程共用一个连接是线程安全的。
我们上面的连接池如果要改成这种模式的实现,也是需要RequestId来保证响应与请求的对应关系,因为服务端是多线程乱序处理的,无法保证同一个连接上响应返回的顺序与请求顺序一致。
btw,我们上面是为了转同步而不得不用的RequestId,本来独享模式是不需要的,注意体会一下。

总结
参考:

基于 Netty 如何实现高性能的 HTTP Client 的连接池 - 云+社区 - 腾讯云 (tencent.com)

Netty客户端连接池ChannelPool应用 【支持https请求】 - harara-小念 - 博客园 (cnblogs.com)

Netty Client实战——高并发连接池方案 | EGNOD'S BLOG (itboyer.github.io)

(5条消息) netty实战-netty client连接池设计Sam_Deep_Thinking-CSDN博客netty客户端连接池

Netty自带连接池的使用 - Ruthless - 博客园 (cnblogs.com)

Netty 中的粘包和拆包 - rickiyang - 博客园 (cnblogs.com)

Netty系列之Netty编解码框架分析-InfoQ

上一篇 下一篇

猜你喜欢

热点阅读