JAVA-BigFileDownloader

2019-12-06  本文已影响0人  jiahzhon

一.BigFileDownloader.class

import utils.Tools;

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Large-file downloader.
 *
 * <p>Splits the remote resource into contiguous byte ranges, downloads each range on its
 * own worker thread via {@link DownloadTask}, and reports overall progress on the calling
 * thread until the download completes or is canceled.
 */
public class BigFileDownloader {

    protected final URL requestURL;
    protected final long fileSize;
    /**
     * Stores the data downloaded so far.
     */
    protected final Storage storage;
    protected final AtomicBoolean taskCanceled = new AtomicBoolean(false);

    /**
     * Queries the size of the remote resource and prepares local storage for it.
     *
     * @param strURL URL of the resource to download; the text after the last '/' is used
     *               as the local file name
     * @throws Exception if the URL is malformed, the size cannot be determined, or the
     *                   local storage cannot be created
     */
    public BigFileDownloader(String strURL) throws Exception {
        requestURL = new URL(strURL);
        // Size of the resource to download, in bytes.
        fileSize = retrieveFileSize(requestURL);
        System.out.println("file total size:" + fileSize);
        String fileName = strURL.substring(strURL.lastIndexOf('/') + 1);
        // Object responsible for persisting the downloaded data.
        storage = new Storage(fileSize, fileName);
    }

    /**
     * Starts the download and blocks until it completes or is canceled.
     *
     * @param taskCount      number of worker threads to use; must be positive
     * @param reportInterval pause between progress reports, in milliseconds
     * @throws IllegalArgumentException if {@code taskCount} is not positive
     * @throws Exception                if the calling thread is interrupted while waiting
     */
    public void download(int taskCount, long reportInterval) throws Exception {
        if (taskCount <= 0) {
            throw new IllegalArgumentException("taskCount must be positive: " + taskCount);
        }
        if (fileSize <= 0) {
            // Nothing to transfer; also avoids dividing by zero when computing progress.
            doCleanup();
            return;
        }

        // Never use more tasks than there are bytes, otherwise the per-task chunk size
        // would be 0 and the non-final tasks would get a negative upper bound.
        final int effectiveTaskCount = (int) Math.min((long) taskCount, fileSize);
        final long chunkSizePerThread = fileSize / effectiveTaskCount;

        for (int i = effectiveTaskCount - 1; i >= 0; i--) {
            // First byte of this task's range.
            long lowerBound = i * chunkSizePerThread;
            // Last byte of this task's range. The final task absorbs the remainder of the
            // integer division and is given fileSize as its bound.
            long upperBound = (i == effectiveTaskCount - 1)
                    ? fileSize
                    : lowerBound + chunkSizePerThread - 1;
            DownloadTask dt = new DownloadTask(lowerBound, upperBound, requestURL, storage, taskCanceled);
            dispatchWork(dt, i);
        }

        try {
            // Periodically report download progress.
            reportProgress(reportInterval);
        } catch (InterruptedException e) {
            // Stop the workers and release the storage instead of leaving them running.
            cancelDownload();
            throw e;
        }
        // Release the resources held by this downloader.
        doCleanup();
    }

    /** Closes the storage, ignoring any failure. */
    protected void doCleanup() {
        Tools.silentClose(storage);
    }

    /** Marks the download as canceled (once) and releases the storage. */
    protected void cancelDownload() {
        if (taskCanceled.compareAndSet(false, true)) {
            doCleanup();
        }
    }

    /**
     * Runs the given task on a new thread named {@code downloader-<workerIndex>}.
     * Any exception escaping the task cancels the whole download.
     *
     * @param dt          the range download task to run
     * @param workerIndex index used in the worker thread's name
     */
    protected void dispatchWork(final DownloadTask dt, int workerIndex) {
        Thread workerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    dt.run();
                } catch (Exception e) {
                    // Leave a trace of the failure before canceling the whole download.
                    System.err.println(Thread.currentThread().getName() + " failed: " + e);
                    cancelDownload();
                }
            }
        });
        workerThread.setName("downloader-" + workerIndex);
        workerThread.start();
    }

    /**
     * Determines the size of the resource at the given URL with an HTTP HEAD request.
     *
     * @param requestURL the resource to query
     * @return the value of the Content-Length response header
     * @throws Exception if the server does not answer 200, or the Content-Length header
     *                   is missing or not a number
     */
    private static long retrieveFileSize(URL requestURL) throws Exception {
        HttpURLConnection conn = null;
        try {
            conn = (HttpURLConnection) requestURL.openConnection();

            conn.setRequestMethod("HEAD");
            conn.setRequestProperty("Connection", "Keep-alive");
            conn.connect();
            int statusCode = conn.getResponseCode();
            if (HttpURLConnection.HTTP_OK != statusCode) {
                throw new Exception("Server exception,status code:" + statusCode);
            }

            String cl = conn.getHeaderField("Content-Length");
            if (null == cl) {
                throw new Exception("Server did not report Content-Length for " + requestURL);
            }
            try {
                return Long.parseLong(cl.trim());
            } catch (NumberFormatException e) {
                throw new Exception("Invalid Content-Length '" + cl + "' for " + requestURL, e);
            }
        } finally {
            if (null != conn) {
                conn.disconnect();
            }
        }
    }

    /**
     * Prints the completion percentage until the download finishes or is canceled.
     *
     * @param reportInterval pause between two checks, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private void reportProgress(long reportInterval) throws InterruptedException {
        long interval = reportInterval;
        int lastCompletion;
        int completion = 0;
        while (!taskCanceled.get()) {
            lastCompletion = completion;
            completion = (int) (storage.getTotalWrites() * 100 / fileSize);
            if (completion >= 100) {
                break;
            } else if (completion - lastCompletion >= 1) {
                System.out.println("Completion:" + completion + "%");
                if (completion >= 90) {
                    // Poll at least once per second near the end, without slowing down
                    // a caller that already asked for a shorter interval.
                    interval = Math.min(interval, 1000);
                }
            }
            Thread.sleep(interval);
        }
        System.out.println("Completion:" + completion + "%");
    }

}

二.Storage.class

import utils.Tools;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Random-access local file that receives the downloaded data.
 *
 * <p>The file is created under the directory named by the {@code java.io.tmpdir} system
 * property and pre-sized to the full download length, so callers can write any byte
 * range at its final offset.
 */
public class Storage implements Closeable,AutoCloseable {
    private final RandomAccessFile storeFile;
    private final FileChannel storeChannel;
    protected final AtomicLong totalWrites = new AtomicLong(0);

    /**
     * Creates (or resizes) the local file and opens it for writing.
     *
     * @param fileSize      length to give the local file, in bytes
     * @param fileShortName name of the file inside the temporary directory
     * @throws IOException if the file cannot be created, resized or opened
     */
    public Storage(long fileSize,String fileShortName) throws IOException{
        // Resolve the temp directory first, then append the file name. Looking up the
        // concatenated string as a property name would yield null.
        String tmpDir = System.getProperty("java.io.tmpdir");
        String fullFileName = new File(tmpDir, fileShortName).getPath();
        String localFileName = createStoreFile(fileSize,fullFileName);
        storeFile = new RandomAccessFile(localFileName,"rw");
        storeChannel = storeFile.getChannel();
    }

    /**
     * Writes the remaining content of the buffer into the file.
     *
     * @param offset
     *          position in the file at which the first byte is written
     * @param byteBuf
     *          data to write; {@code byteBuf.flip()} must have been called beforehand
     * @return number of bytes written to the file
     * @throws IOException if writing fails
     */
    public int store(long offset, ByteBuffer byteBuf) throws IOException{
        int written = 0;
        // A single channel write is allowed to write fewer bytes than requested, so keep
        // going until the buffer is drained.
        while (byteBuf.hasRemaining()) {
            written += storeChannel.write(byteBuf, offset + written);
        }
        totalWrites.addAndGet(written);
        return written;
    }

    /**
     * @return total number of bytes written through {@link #store} so far
     */
    public long getTotalWrites(){
        return totalWrites.get();
    }

    /**
     * Ensures the local file exists and has the requested length.
     *
     * @return {@code fullFileName}, unchanged
     */
    private String createStoreFile(final long fileSize,String fullFileName) throws IOException{
        File file = new File(fullFileName);
        System.out.println("create local file:"+fullFileName);
        // "rw" opens the file for reading and writing and creates it if it does not exist.
        try (RandomAccessFile raf = new RandomAccessFile(file,"rw")) {
            raf.setLength(fileSize);
        }
        return fullFileName;
    }

    /**
     * Closes the channel and the underlying file. Calling it again once closed has no
     * effect.
     *
     * @throws IOException if closing the channel or the file fails
     */
    @Override
    public synchronized void close() throws IOException {
        if(!storeChannel.isOpen()){
            return;
        }
        try {
            storeChannel.close();
        } finally {
            storeFile.close();
        }
    }
}

1.NIO的channel

| 区别 | stream | channel |
| --- | --- | --- |
| 是否支持异步 | 不支持 | 支持 |
| 是否可双向传输数据 | 不能,只能单向 | 可以,既可以从通道读取数据,也可以向通道写入数据 |
| 是否结合Buffer使用 | 否 | 必须结合buffer使用 |
| 性能 | 较低 | 较高 |

2.channel简易文件复制

/**
 * Copies a file through NIO channels using a 1 KiB intermediate buffer.
 *
 * @param src path of the file to read
 * @param dst path of the file to create or overwrite
 * @throws IOException if either file cannot be opened, read or written
 */
public static void copyFileUseNIO(String src,String dst) throws IOException{
    // try-with-resources closes both streams and both channels even when a read or
    // write fails part-way through.
    try (FileInputStream fi = new FileInputStream(new File(src));
         FileOutputStream fo = new FileOutputStream(new File(dst));
         FileChannel inChannel = fi.getChannel();
         FileChannel outChannel = fo.getChannel()) {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        // read() returns -1 once the end of the source file is reached.
        while (inChannel.read(buffer) != -1) {
            // Switch to draining: limit = current position, position = 0.
            buffer.flip();
            // A single write may consume only part of the buffer.
            while (buffer.hasRemaining()) {
                outChannel.write(buffer);
            }
            // Switch back to filling: position = 0, limit = capacity.
            buffer.clear();
        }
    }
}

三.DownloadTask.class

import utils.Tools;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Downloads one byte range of a remote resource and hands the received data to a
 * {@link DownloadBuffer}.
 */
public class DownloadTask implements Runnable{
    private final long lowerBound;
    private final long upperBound;
    private final DownloadBuffer xbuf;
    private final URL requestURL;
    private final AtomicBoolean cancelFlag;

    /**
     * @param lowerBound first byte of the range to download
     * @param upperBound upper bound of the range, sent as the last-byte position of the
     *                   HTTP Range header
     * @param requestURL resource to download from
     * @param storage    destination passed on to the download buffer
     * @param cancelFlag shared flag; once it is {@code true} the task stops reading
     */
    public DownloadTask(long lowerBound,long upperBound,URL requestURL,
                        Storage storage, AtomicBoolean cancelFlag){
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
        this.requestURL = requestURL;
        this.xbuf = new DownloadBuffer(lowerBound,upperBound,storage);
        this.cancelFlag = cancelFlag;
    }

    /**
     * Issues an HTTP range request against the given URL.
     *
     * <p>Connect and read timeouts default to 60000 ms and can be overridden with the
     * system properties {@code x.dt.conn.timeout} and {@code x.dt.read.timeout}.
     *
     * @return a stream over the response body; closing it also disconnects the connection
     * @throws IOException if the request fails or the server does not answer 206
     */
    private static InputStream issueRequest(URL requestURL, long lowerBound,
                                            long upperBound) throws IOException{
        Thread me = Thread.currentThread();
        System.out.println(me + "->[" + lowerBound + "," + upperBound + "]");
        final HttpURLConnection conn = (HttpURLConnection) requestURL.openConnection();
        try {
            String strConnTimeout = System.getProperty("x.dt.conn.timeout");
            int connTimeout = null == strConnTimeout ? 60000 : Integer
                    .parseInt(strConnTimeout);
            conn.setConnectTimeout(connTimeout);

            String strReadTimeout = System.getProperty("x.dt.read.timeout");
            int readTimeout = null == strReadTimeout ? 60000 : Integer
                    .parseInt(strReadTimeout);
            conn.setReadTimeout(readTimeout);

            conn.setRequestMethod("GET");
            conn.setRequestProperty("Connection", "Keep-alive");
            // Range: bytes=0-1024
            conn.setRequestProperty("Range", "bytes=" + lowerBound + "-" + upperBound);
            conn.setDoInput(true);
            conn.connect();

            int statusCode = conn.getResponseCode();
            if (HttpURLConnection.HTTP_PARTIAL != statusCode) {
                throw new IOException("Server exception,status code:" + statusCode);
            }
            System.out.println(me + "-Content-Range:" + conn.getHeaderField("Content-Range")
                    + ",connection:" + conn.getHeaderField("connection"));
            return new BufferedInputStream(conn.getInputStream()) {
                @Override
                public void close() throws IOException {
                    try {
                        super.close();
                    } finally {
                        conn.disconnect();
                    }
                }
            };
        } catch (IOException | RuntimeException e) {
            // No stream was handed out, so nobody else will ever disconnect this
            // connection; release it here before reporting the failure.
            conn.disconnect();
            throw e;
        }
    }

    /**
     * Downloads the range and pushes every received chunk into the download buffer.
     *
     * @throws RuntimeException wrapping any failure while requesting, reading, buffering
     *                          or flushing the data
     */
    @Override
    public void run() {
        if(cancelFlag.get()){
            return;
        }

        ReadableByteChannel channel = null;
        boolean tailFlushAttempted = false;
        try {
            channel = Channels.newChannel(issueRequest(requestURL,lowerBound,upperBound));
            ByteBuffer buf = ByteBuffer.allocate(1024);
            while(!cancelFlag.get()&&channel.read(buf)>0){
                // Hand the bytes just read from the network to the download buffer; buf
                // is passed un-flipped because write() takes buf.position() as the length.
                xbuf.write(buf);
                buf.clear();
            }
            if(!cancelFlag.get()){
                // Closing the buffer flushes the data still pending in it. Do it here so
                // that a failed final write is reported instead of silently dropped.
                tailFlushAttempted = true;
                xbuf.close();
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }finally {
            Tools.silentClose(channel);
            if(!tailFlushAttempted){
                // Error or cancellation: still try to persist what was received.
                Tools.silentClose(xbuf);
            }
        }
    }
}

四.DownloadBuffer.class

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * Accumulates downloaded chunks in a 1 MiB in-memory buffer and writes them to the
 * {@link Storage} in larger batches.
 */
public class DownloadBuffer implements Closeable {
    /**
     * Position, within the whole stored file, of the first byte currently held in the
     * buffer.
     */
    private long globalOffset;
    // Upper bound of the range handled through this buffer. Kept because it is part of
    // the constructor contract; pending data is tracked with 'offset' instead.
    private final long upperBound;
    // Number of bytes currently buffered and not yet written to storage.
    private int offset = 0;
    public final ByteBuffer byteBuf;
    private final Storage storage;

    /**
     * @param globalOffset position in the stored file where the first buffered byte goes
     * @param upperBound   upper bound of the range handled through this buffer
     * @param storage      destination of the buffered data
     */
    public DownloadBuffer(long globalOffset,long upperBound,final Storage storage){
        this.globalOffset = globalOffset;
        this.upperBound = upperBound;
        this.byteBuf = ByteBuffer.allocate(1024*1024);
        this.storage = storage;
    }

    /**
     * Appends the bytes held in {@code buf} to this buffer, flushing to storage first
     * when they would not fit.
     *
     * @param buf source buffer in "filling" state: its content is the bytes from index 0
     *            up to its current position. It is flipped and fully drained by this
     *            call, so the caller must {@code clear()} it before reusing it. It must
     *            not hold more bytes than this buffer's capacity.
     * @throws IOException if flushing to storage fails
     */
    public void write(ByteBuffer buf) throws IOException{
        int length = buf.position();
        final int capacity = byteBuf.capacity();

        // The buffer is full, or its free space cannot hold the new data.
        if((offset + length) > capacity || length ==capacity){
            // Write the buffered data to the file.
            flush();
        }

        // Copy the new bytes right after the data that is already buffered.
        byteBuf.position(offset);
        buf.flip();
        byteBuf.put(buf);
        offset += length;
    }

    /**
     * Writes all buffered bytes to storage at {@code globalOffset} and empties the buffer.
     *
     * @throws IOException if the storage write fails
     */
    public void flush() throws IOException{
        int length;
        byteBuf.flip();
        length = storage.store(globalOffset,byteBuf);
        byteBuf.clear();
        globalOffset += length;
        offset = 0;
    }

    /**
     * Flushes any bytes still pending in the buffer.
     *
     * @throws IOException if the final flush fails
     */
    @Override
    public void close() throws IOException {
        // Decide on the pending byte count rather than on globalOffset < upperBound,
        // which would skip the flush when exactly one byte of the range is pending.
        if(offset > 0){
            flush();
        }
    }
}

五.silentClose

/**
 * Closes every given resource on a best-effort basis.
 *
 * <p>A {@code null} array and {@code null} elements are skipped, and an exception thrown
 * by one resource's {@code close()} is discarded so the remaining resources are still
 * closed.
 *
 * @param closeable the resources to close; may be {@code null} or contain {@code null}
 */
public static void silentClose(Closeable... closeable) {
    if (closeable != null) {
        for (int i = 0; i < closeable.length; i++) {
            Closeable resource = closeable[i];
            if (resource != null) {
                try {
                    resource.close();
                } catch (Exception ignored) {
                    // Deliberately discarded: this helper must never throw.
                }
            }
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读