Chunked File Download with netty in Practice

2020-09-01  西5d

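Background

I have been working with netty recently. netty is a popular networking framework that wraps a rich set of network-handling APIs, so it is fairly easy to build what you need with it, and many middleware projects use it for their network layer. Note that the line under active maintenance at the time of writing is netty 4; netty 5 has been abandoned. One reason is that it introduced ForkJoinPool without much measurable gain. For the rest, see the maintainers' discussion on GitHub, which is reachable from the official site.
I have also been dealing with large-file downloads lately, so this was a good opportunity to put netty into practice.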

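Rant

I have a copy of 《netty权威指南》 at hand. I had only read some of the conceptual chapters before, and when I recently got to this example the quality was hard to describe politely. Apart from the theory, I would not recommend buying the book. First, the code layout is very poor. Second, the example discussed here is adapted from the example on the official netty site (address given later), and a lot of code was removed, possibly for layout reasons, which leaves some parts baffling. The import statements were removed too, which is also confusing and meant it took me a long time to get the original example to start. On top of that, the book does not seem to state which netty version it uses. I will skip the other issues. In short:
《netty权威指南(第二版)》 (second edition): not recommended.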

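Code

A quick overview: the code has two parts, one that starts the server and one that handles the file download. It is adapted from the official example and the book's example, and I verified it locally with a 1.4 GB video, which gave the result I wanted. The important spots are commented, so read the code carefully. My local setup is Ubuntu + Java + Maven, with netty 4.1.50.Final as the dependency.

Starting the server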

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.junit.Test;

public class HttpFileServer {
    // Directory to serve files from; change it to suit your machine
    private static final String DEFAULT_URL = "/home/me/download";

    public void run(final int port, final String url) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //!! Be careful here: a wrong handler setup can stop the server from responding properly. Compare with the official example.
                            socketChannel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
                            socketChannel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
                            socketChannel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
                            socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            socketChannel.pipeline().addLast("fileServerHandler", new HttpFileServerHandler(url));
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            System.out.println("file server start ...");
            System.out.println("port:" + port + ", url:" + url);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }

    @Test
    public void runFileServer() throws Exception {
        int port = 9993;
        new HttpFileServer().run(port, DEFAULT_URL);

    }
}

Download handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;

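// Note: javax.activation is no longer bundled with the JDK from Java 11 on; add it as a dependency there.
import javax.activation.MimetypesFileTypeMap;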
import java.io.File;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.regex.Pattern;

import static io.netty.handler.codec.http.HttpVersion.HTTP_1_0;

public class HttpFileServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private static final Pattern INSECURE_URI = Pattern.compile(".*[<>&\"].*");
    private static final Pattern ALLOW_FILE_NAME = Pattern.compile("[A-Za-z0-9][-_A-Za-z0-9\\.]*");

    private final String url;

    public HttpFileServerHandler(String url) {
        this.url = url;
    }

    // Render the directory listing
    private static void sendListing(ChannelHandlerContext ctx, File dir) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
        StringBuilder builder = new StringBuilder();
        builder.append("<html><body>\r\n");
        builder.append("list:\r\n");
        builder.append("<ul>");
        builder.append("<li> link:<a href=\"..\">..</a></li>\r\n");
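        // listFiles() returns null when the directory cannot be read
        File[] files = dir.listFiles();
        for (File f : files == null ? new File[0] : files) {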
            if (!ALLOW_FILE_NAME.matcher(f.getName()).matches()) {
                continue;
            }
            builder.append("<li> link:<a href=\"").append(f.getAbsolutePath()).append("\">").append(f.getAbsolutePath()).append("</a></li>");
        }
        builder.append("</ul></body></html>\r\n");
        ByteBuf buffer = Unpooled.copiedBuffer(builder, StandardCharsets.UTF_8);
        response.content().writeBytes(buffer);
        buffer.release();
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        if (ctx.channel().isActive()) {
            sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (!request.decoderResult().isSuccess()) {
            sendError(ctx, HttpResponseStatus.BAD_REQUEST);
            return;
        }

        if (request.method() != HttpMethod.GET) {
            sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
            return;
        }

        final String uri = request.uri();
        final String path = sanitizeUri(uri);
        if (null == path) {
            sendError(ctx, HttpResponseStatus.FORBIDDEN);
            return;
        }

        File file = new File(path);
        if (!file.exists() || file.isHidden()) {
            sendError(ctx, HttpResponseStatus.NOT_FOUND);
            return;
        }

        if (file.isDirectory()) {
            if (uri.startsWith("/")) {
                sendListing(ctx, file);
            } else {
                sendRedirect(ctx, uri + "/");
            }
            return;
        }

        if (!file.isFile()) {
            sendError(ctx, HttpResponseStatus.FORBIDDEN);
            return;
        }

        RandomAccessFile randomAccessFile = null;
        try {
            randomAccessFile = new RandomAccessFile(file, "r");
        } catch (Exception e) {
            sendError(ctx, HttpResponseStatus.NOT_FOUND);
            return;
        }

        long length = randomAccessFile.length();
        HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        // Set the Content-Length header
        HttpUtil.setContentLength(httpResponse, length);
        setContentType(httpResponse, file);
        if (!HttpUtil.isKeepAlive(request)) {
            httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        } else if (request.protocolVersion().equals(HTTP_1_0)) {
            httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        // Write the initial line and the header.
        ctx.write(httpResponse);

        // Write the content.
        // Send the file in chunks of 8192 bytes
        ChannelFuture sendFileFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(randomAccessFile, 8192)), ctx.newProgressivePromise());
        // Print progress; try it with a reasonably large file
        sendFileFuture.addListener(new ChannelProgressiveFutureListener() {
            @Override
            public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
                if (total < 0) {
                    System.err.println("progress:" + progress);
                } else {
                    //!! Can be enabled for testing
                    //TimeUnit.SECONDS.sleep(1);
                    System.out.println(String.format("progress: %s\t%s", progress, total));
                }
            }

            @Override
            public void operationComplete(ChannelProgressiveFuture future) throws Exception {
                System.out.println(future.channel() + "complete.");
            }
        });
        if (!HttpUtil.isKeepAlive(request)) {
            sendFileFuture.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
        response.content().writeCharSequence("Failure: " + status + "\r\n", StandardCharsets.UTF_8);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    // Decode the URI and reject unsafe paths
    private String sanitizeUri(String uri) {
        try {
            uri = URLDecoder.decode(uri, StandardCharsets.UTF_8.name());
        } catch (UnsupportedEncodingException e) {
            try {
                uri = URLDecoder.decode(uri, StandardCharsets.ISO_8859_1.name());
            } catch (UnsupportedEncodingException e1) {
                throw new Error();
            }
        }

        if (!uri.startsWith(url)) {
            uri = url + uri;
        }

        if (!uri.startsWith("/")) {
            return null;
        }
        uri = uri.replace('/', File.separatorChar);
        // File.separator ("/"), not File.pathSeparator (":"), is the character that precedes a dot segment
        if (uri.contains(File.separator + ".")
                || uri.contains("." + File.separator)
                || uri.startsWith(".")
                || uri.endsWith(".")
                || INSECURE_URI.matcher(uri).matches()) {
            return null;
        }
        // Return the path directly here
        return uri;
        //        return System.getProperty("user.dir") + File.separator + uri;
    }

    private void setContentType(HttpResponse response, File file) {
        MimetypesFileTypeMap mimetypesFileTypeMap = new MimetypesFileTypeMap();
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, mimetypesFileTypeMap.getContentType(file.getPath()));
    }

    private void sendRedirect(ChannelHandlerContext ctx, String newUri) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FOUND);
        response.headers().set(HttpHeaderNames.LOCATION, newUri);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

}
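
As a side note, the string checks in sanitizeUri can also be written as a containment check: decode the URI, resolve it against the root directory, normalize away any ".." segments, and reject the result if it no longer lies under the root. The following is only a minimal sketch of that alternative using the JDK alone. It is not what the handler above does, and PathSanitizerSketch and resolve are names made up for illustration.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper, not part of netty or of the handler above.
class PathSanitizerSketch {

    /**
     * Resolves a request URI against a root directory.
     * Returns null when the URI is malformed or would escape the root.
     */
    static Path resolve(String root, String uri) {
        try {
            String decoded = URLDecoder.decode(uri, StandardCharsets.UTF_8.name());
            if (!decoded.startsWith("/")) {
                return null;
            }
            Path base = Paths.get(root).toAbsolutePath().normalize();
            // Drop the leading slash so the URI is treated as relative to the root,
            // then collapse "." and ".." segments.
            Path candidate = base.resolve(decoded.substring(1)).normalize();
            // Path.startsWith compares whole name elements, so "/srv/files2"
            // does not count as being inside "/srv/files".
            return candidate.startsWith(base) ? candidate : null;
        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
            // Bad percent-escape or a character the file system rejects.
            return null;
        }
    }
}
```

With a root of /srv/files (an example value), "/a/b.txt" resolves to a path under the root, while "/../etc/passwd" and its percent-encoded form "/%2e%2e/secret" both come back as null.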

Output when downloading a large file:

file server start ...
port:9993, url:/home/me/download
progress: 3088384   1433994604
progress: 3088384   1433994604
progress: 3088384   1433994604
progress: 3088384   1433994604
progress: 3088384   1433994604
progress: 3129344   1433994604
progress: 3129344   1433994604
progress: 3129344   1433994604
progress: 3129344   1433994604
progress: 3129344   1433994604
progress: 3170304   1433994604
progress: 3170304   1433994604
progress: 3170304   1433994604
[id: 0x18c57827, L:0.0.0.0/0.0.0.0:9993 ! R:/127.0.0.1:42060]complete.
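
To make the progress numbers above easier to read, here is a rough JDK-only sketch of the idea behind chunked sending: read at most chunkSize bytes, write them out, report how much has gone so far, and repeat, so that only one chunk is in memory at a time. This illustrates the concept and is not netty's implementation. ChunkedCopySketch, Progress and copy are hypothetical names, and the sketch reads from a plain InputStream rather than the RandomAccessFile used in the handler.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical illustration, not netty code.
class ChunkedCopySketch {

    /** Called after each chunk; loosely mirrors operationProgressed(progress, total). */
    interface Progress {
        void update(long sent, long total);
    }

    /**
     * Copies in to out in pieces of at most chunkSize bytes.
     * total is only passed through to the callback.
     * Returns the number of chunks written.
     */
    static int copy(InputStream in, long total, OutputStream out, int chunkSize, Progress progress) {
        byte[] buffer = new byte[chunkSize];
        long sent = 0;
        int chunks = 0;
        try {
            int n;
            // read() returns -1 at end of stream; a chunk may be shorter than chunkSize.
            while ((n = in.read(buffer, 0, chunkSize)) != -1) {
                out.write(buffer, 0, n);
                sent += n;
                chunks++;
                progress.update(sent, total);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunks;
    }
}
```

For a real file you could pass a FileInputStream together with File.length() as the total, and a callback that prints sent and total in the same format as the handler's listener.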

Official references

GitHub repository: netty
Example used in this post: example

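Summary

That is all for this post. In the end we reached the goal of downloading a large file in chunks, and it confirms that you only find out whether something works well by actually trying it. As an aside, when I was looking for related examples earlier, much of what I found on Chinese sites was the same content copied back and forth, with little original work and a lot of low quality. I hope a good content-sharing platform comes along.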
