OSS 多线程分片方式上传文件

2022-08-01  本文已影响0人  艺术类架构师

/**
 * Factory for Aliyun OSS client instances.
 */
public class OssClientFactory {

    /**
     * Builds an OSS client for the given endpoint and credentials, then issues a
     * create-bucket request for {@code bucket} before handing the client back.
     *
     * @param endpoint        OSS endpoint passed to the client builder
     * @param accessKeyId     access key id passed to the client builder
     * @param accessKeySecret access key secret passed to the client builder
     * @param bucket          name of the bucket to create through the new client
     * @return the client produced by {@code OSSClientBuilder}, cast to {@code OSSClient}
     */
    public static OSSClient createOssClient(String endpoint,
                                            String accessKeyId,
                                            String accessKeySecret,
                                            String bucket) {
        // Create the OSSClient instance.
        final OSSClient client =
                (OSSClient) new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
        client.createBucket(bucket);
        return client;
    }
}

package com.jielu.aliyun.oss;

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.*;
import com.jielu.leetcode.NamedThreadFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;

/**

 /**
  * Uploads every part request on the thread pool and returns the resulting part tags.
  *
  * <p>Each request is submitted as its own task. The method blocks until all tasks
  * have finished, so the returned list holds exactly one tag per request, sorted by
  * ascending part number. The executor is shut down before the method returns,
  * whether it succeeds or fails, so this instance cannot be reused afterwards.
  *
  * @param uploadPartResultList the part requests to upload; every element is uploaded
  * @return the part tags of all uploaded parts, ordered by part number
  * @throws InterruptedException if the calling thread is interrupted while waiting
  * @throws RuntimeException     if any part upload fails; the cause is the upload failure
  */
 public List<PartETag> genPartTag(List<UploadPartRequest> uploadPartResultList) throws InterruptedException {

     // One future per part: waiting on them guarantees completion and surfaces failures,
     // and results are collected only on the calling thread (no shared mutable list).
     List<Future<PartETag>> pendingParts = new ArrayList<>(uploadPartResultList.size());
     List<PartETag> partETags = new ArrayList<>(uploadPartResultList.size());
     try {
         // Iterate over every request, including the first one.
         for (UploadPartRequest uploadPartRequest : uploadPartResultList) {
             Callable<PartETag> uploadTask =
                     () -> ossClient.uploadPart(uploadPartRequest).getPartETag();
             pendingParts.add(threadPoolExecutor.submit(uploadTask));
         }
         for (Future<PartETag> pendingPart : pendingParts) {
             partETags.add(pendingPart.get());
         }
     } catch (ExecutionException e) {
         throw new RuntimeException("Failed to upload part", e.getCause());
     } finally {
         // No-op for finished tasks; stops remaining uploads after a failure or interrupt.
         for (Future<PartETag> pendingPart : pendingParts) {
             pendingPart.cancel(true);
         }
         threadPoolExecutor.shutdown();
     }
     // Integer.compare avoids the overflow risk of comparing by subtraction.
     partETags.sort((left, right) -> Integer.compare(left.getPartNumber(), right.getPartNumber()));
     return partETags;

 }

}

package com.jielu.aliyun.oss;

import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.*;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;

/**
 * Uploads Spring {@link MultipartFile}s to Aliyun OSS using multipart (sliced) upload,
 * with the parts of each file uploaded by {@code OssMultipleThreadUploadExecutor}.
 */
@Component
public class OssUtil {

    /**
     * Size of each uploaded slice in bytes. OSS rejects multipart uploads whose
     * non-final parts are smaller than 100 KB, so this must stay above that limit.
     */
    private static final int PART_SIZE_BYTES = 1024 * 1024;

    /** Key prefix for uploaded objects; OSS object keys must not start with '/'. */
    private static final String OBJECT_KEY_PREFIX = "lycol/upload/oss/";

    /**
     * Uploads each file as its own OSS object and returns the location reported by OSS
     * for every completed upload, in the same order as the input list.
     *
     * @param multipartFileList the files to upload; {@code null} or empty yields an empty result
     * @return the locations returned by the completed multipart uploads
     * @throws RuntimeException if reading a file or any OSS call fails, or if the thread is
     *                          interrupted while waiting for part uploads (the interrupt flag
     *                          is restored before throwing)
     */
    public List<String> uploadFileToOss(List<MultipartFile> multipartFileList) {
        List<String> res = new ArrayList<>();
        if (multipartFileList == null || multipartFileList.isEmpty()) {
            return res;
        }

        // NOTE(review): connection settings are placeholders and must come from configuration.
        String endPoint = "", accessKeyId = "", accessKeySecret = "", bucketName = "";
        String datePrefix = OBJECT_KEY_PREFIX
                + FastDateFormat.getInstance("yyyyMMdd", Locale.CHINESE).format(new Date());

        OSSClient ossClient = OssClientFactory.createOssClient(endPoint, accessKeyId, accessKeySecret, bucketName);
        try {
            for (MultipartFile multipartFile : multipartFileList) {
                // Every file gets its own object key and its own multipart upload session.
                String objectName = datePrefix + "/" + uniqueFileName(multipartFile);
                res.add(uploadSingleFile(ossClient, bucketName, objectName, multipartFile));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            ossClient.shutdown();
        }

        return res;
    }

    /** Runs one complete multipart upload for a single file and returns its location. */
    private String uploadSingleFile(OSSClient ossClient, String bucketName, String objectName,
                                    MultipartFile multipartFile) throws IOException, InterruptedException {
        // Initialise the multipart upload for this object.
        InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, objectName);
        String uploadId = ossClient.initiateMultipartUpload(request).getUploadId();

        try (InputStream inputStream = multipartFile.getInputStream()) {
            SliceInputStream sliceInputStream = new SliceInputStream(inputStream, PART_SIZE_BYTES);
            List<UploadPartRequest> uploadPartRequestList =
                    sliceInputStream.genUploadPartRequestList(bucketName, objectName, uploadId);

            // The executor shuts its pool down after one run, so use a fresh one per file.
            OssMultipleThreadUploadExecutor ossMultipleThreadUploadExecutor =
                    new OssMultipleThreadUploadExecutor(ossClient);
            List<PartETag> partETags = ossMultipleThreadUploadExecutor.genPartTag(uploadPartRequestList);

            CompleteMultipartUploadRequest completeMultipartUploadRequest =
                    new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, partETags);
            CompleteMultipartUploadResult completeMultipartUploadResult =
                    ossClient.completeMultipartUpload(completeMultipartUploadRequest);
            return completeMultipartUploadResult.getLocation();
        } catch (IOException | InterruptedException | RuntimeException e) {
            abortUpload(ossClient, bucketName, objectName, uploadId, e);
            throw e;
        }
    }

    /** Aborts a failed multipart upload so already-uploaded parts are not left behind. */
    private void abortUpload(OSSClient ossClient, String bucketName, String objectName,
                             String uploadId, Exception uploadFailure) {
        try {
            ossClient.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, objectName, uploadId));
        } catch (RuntimeException abortFailure) {
            // Keep the original failure as the primary error.
            uploadFailure.addSuppressed(abortFailure);
        }
    }

    /** Builds a collision-free file name from a random id and the client-supplied name. */
    private String uniqueFileName(MultipartFile multipartFile) {
        String uniqueId = java.util.UUID.randomUUID().toString();
        String originalName = multipartFile.getOriginalFilename();
        if (originalName == null) {
            return uniqueId;
        }
        // Drop any directory part sent by the client so it cannot alter the key layout.
        int lastSeparator = Math.max(originalName.lastIndexOf('/'), originalName.lastIndexOf('\\'));
        String baseName = originalName.substring(lastSeparator + 1);
        return baseName.isEmpty() ? uniqueId : uniqueId + "-" + baseName;
    }

}

public class SliceInputStream {

private final int partSize;
private InputStream inputStream;


public SliceInputStream(InputStream inputStream,int partSize) {
    this.inputStream = inputStream;
    this.partSize=partSize;
}

public List<UploadPartRequest> genUploadPartRequestList(String bucketName,String objectName,String uploadId
                                                        ) throws IOException {

    final long partSize = this.partSize;
    int fileLength = inputStream.available();
    int partCount = (int) (fileLength / partSize);
    if (fileLength % partSize != 0) {
        partCount++;
    }
    List<UploadPartRequest> uploadPartRequestList = new ArrayList<>();
    for (int i = 0; i < partCount; i++) {
        long startPos = i * partSize;
        long curPartSize = (i + 1 == partCount) ? (fileLength - startPos) : partSize;
        //skip position
        inputStream.skip(startPos);
        UploadPartRequest uploadPartRequest = new UploadPartRequest();
        uploadPartRequest.setBucketName(bucketName);
        uploadPartRequest.setKey(objectName);
        uploadPartRequest.setUploadId(uploadId);
        uploadPartRequest.setInputStream(inputStream);
        uploadPartRequest.setPartSize(curPartSize);
        uploadPartRequestList.add(uploadPartRequest);
    }
    return uploadPartRequestList;

}

/**
 * A {@link ThreadFactory} that gives its threads readable, sequentially numbered names
 * of the form {@code <prefix>-thread-<n>}.
 */
public class NamedThreadFactory implements ThreadFactory {

    /** Sequence shared by all factories, used to build the default {@code pool-<n>} prefix. */
    protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);

    /** Per-factory counter for the numeric suffix of the next thread name; starts at 1. */
    protected final AtomicInteger mThreadNum = new AtomicInteger(1);

    /** Thread name prefix, already including the {@code -thread-} separator. */
    protected final String mPrefix;

    /** Whether threads created by this factory are daemon threads. */
    protected final boolean mDaemon;

    /** Thread group that every created thread is placed in. */
    protected final ThreadGroup mGroup;

    /** Creates a factory for non-daemon threads with the prefix {@code pool-<n>}. */
    public NamedThreadFactory() {
        this("pool-" + POOL_SEQ.getAndIncrement(), false);
    }

    /**
     * Creates a factory for non-daemon threads.
     *
     * @param prefix leading part of every thread name
     */
    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    /**
     * Creates a factory with the given name prefix and daemon flag.
     *
     * @param prefix leading part of every thread name
     * @param daemon daemon flag applied to every created thread
     */
    public NamedThreadFactory(String prefix, boolean daemon) {
        mPrefix = prefix + "-thread-";
        mDaemon = daemon;
        // Use the security manager's thread group when one is installed,
        // otherwise the group of the thread that constructs the factory.
        SecurityManager s = System.getSecurityManager();
        mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    /**
     * Creates a new, unstarted thread for {@code runnable}, named with the next number.
     *
     * @param runnable the task the thread will run
     * @return the new thread, in this factory's thread group and with its daemon flag
     */
    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }

    /**
     * @return the thread group used for threads created by this factory
     */
    public ThreadGroup getThreadGroup() {
        return mGroup;
    }
}

详情请看 Git 地址：https://github.com/LycolLoveLucy/mixedMutiple

上一篇下一篇

猜你喜欢

热点阅读