OSS 多线程分片方式上传文件
/**
 * Factory for {@link OSSClient} instances bound to a bucket that is guaranteed to exist.
 */
public class OssClientFactory {

    /**
     * Builds an OSS client and makes sure the given bucket exists.
     *
     * <p>The bucket is only created when it does not exist yet, so repeated calls do not
     * issue a create request for an already existing bucket. If the existence check or the
     * creation fails, the freshly built client is shut down before the exception is
     * rethrown so that its resources are not leaked.
     *
     * @param endpoint        OSS endpoint passed to the client builder
     * @param accessKeyId     access key id passed to the client builder
     * @param accessKeySecret access key secret passed to the client builder
     * @param bucket          name of the bucket to look up and, if missing, create
     * @return a ready-to-use client; the caller is responsible for calling {@code shutdown()}
     */
    public static OSSClient createOssClient(String endpoint,
                                            String accessKeyId,
                                            String accessKeySecret,
                                            String bucket) {
        OSSClientBuilder ossClientBuilder = new OSSClientBuilder();
        // Create the OSSClient instance.
        OSSClient ossClient = (OSSClient) ossClientBuilder.build(endpoint, accessKeyId, accessKeySecret);
        try {
            // NOTE: check-then-create is not atomic; a concurrent creator can still make
            // createBucket fail, in which case the exception is propagated to the caller.
            if (!ossClient.doesBucketExist(bucket)) {
                ossClient.createBucket(bucket);
            }
        } catch (RuntimeException e) {
            // Do not leak the client when the bucket cannot be prepared.
            ossClient.shutdown();
            throw e;
        }
        return ossClient;
    }
}
package com.jielu.aliyun.oss;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.*;
import com.jielu.leetcode.NamedThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
/**
 * Uploads the parts of an OSS multipart upload concurrently on a dedicated thread pool.
 *
 * <p>{@link #genPartTag(List)} shuts the pool down when it finishes, so an instance is
 * intended for one upload. Tasks submitted after the shutdown are executed on the calling
 * thread by the rejection handler.
 */
public class OssMultipleThreadUploadExecutor {

    private final OSSClient ossClient;

    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            10,
            50,
            1000 * 60,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            new NamedThreadFactory("thread-oss-upload-file"),
            // Rejected tasks (e.g. after shutdown) run synchronously on the submitting thread.
            (r, executor) -> r.run()
    );

    /**
     * @param ossClient client used to upload every part
     */
    public OssMultipleThreadUploadExecutor(OSSClient ossClient) {
        this.ossClient = ossClient;
    }

    /**
     * Uploads every given part and returns the resulting part tags sorted by part number.
     *
     * <p>All requests, including the first one, are submitted to the pool. The method
     * returns only after every upload has actually completed.
     *
     * @param uploadPartResultList the part upload requests to execute
     * @return the tags of all uploaded parts in ascending part-number order
     * @throws InterruptedException if the calling thread is interrupted while waiting;
     *                              uploads that are still pending are cancelled
     * @throws RuntimeException     wrapping the failure of the first part whose upload failed
     */
    public List<PartETag> genPartTag(List<UploadPartRequest> uploadPartResultList) throws InterruptedException {
        List<Future<PartETag>> pendingUploads = new ArrayList<>(uploadPartResultList.size());
        List<PartETag> partETags = new ArrayList<>(uploadPartResultList.size());
        try {
            for (UploadPartRequest uploadPartRequest : uploadPartResultList) {
                Callable<PartETag> uploadTask = () -> ossClient.uploadPart(uploadPartRequest).getPartETag();
                pendingUploads.add(threadPoolExecutor.submit(uploadTask));
            }
            // Results are collected on the calling thread only, so the list needs no locking.
            for (Future<PartETag> pendingUpload : pendingUploads) {
                try {
                    partETags.add(pendingUpload.get());
                } catch (ExecutionException e) {
                    // Surface the worker failure instead of losing it on the pool thread.
                    throw new RuntimeException(e.getCause());
                }
            }
        } finally {
            // No-op for finished uploads; stops the remaining ones after a failure or interrupt.
            for (Future<PartETag> pendingUpload : pendingUploads) {
                pendingUpload.cancel(true);
            }
            threadPoolExecutor.shutdown();
        }
        // Integer.compare avoids the overflow risk of subtracting part numbers.
        partETags.sort((left, right) -> Integer.compare(left.getPartNumber(), right.getPartNumber()));
        return partETags;
    }
}
package com.jielu.aliyun.oss;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.*;
import org.apache.commons.lang3.time.FastDateFormat;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;
/**
 * Spring component that uploads multipart files to OSS using multipart (sliced) uploads.
 */
@Component
public class OssUtil {

    /**
     * Size of one uploaded part in bytes.
     * NOTE(review): OSS documents a minimum size for every part except the last one
     * (100 KB at the time of writing); 1 MB stays above it — confirm against current limits.
     */
    private static final int PART_SIZE_BYTES = 1024 * 1024;

    /**
     * Key prefix of uploaded objects. It deliberately has no leading slash because OSS
     * object names must not start with "/" or "\".
     */
    private static final String OBJECT_KEY_PREFIX = "lycol/upload/oss/";

    /**
     * Uploads every file to OSS and returns, per file, the location reported by OSS for the
     * completed upload. Each file is stored under its own object key
     * ({@code lycol/upload/oss/<yyyyMMdd>/<random uuid>}) and uses its own multipart upload.
     *
     * @param multipartFileList the files to upload; {@code null} or empty yields an empty list
     * @return the locations of the uploaded objects, in the order of the input files
     * @throws RuntimeException if reading a file or uploading it fails, or if the calling
     *                          thread is interrupted; the affected multipart upload is aborted
     */
    public List<String> uploadFileToOss(List<MultipartFile> multipartFileList) {
        List<String> res = new ArrayList<>();
        if (multipartFileList == null || multipartFileList.isEmpty()) {
            return res;
        }
        // TODO: placeholders — supply the real endpoint, credentials and bucket from configuration.
        String endPoint = "", accessKeyId = "", accessKeySecret = "", bucketName = "";
        String datedPrefix = OBJECT_KEY_PREFIX
                + FastDateFormat.getInstance("yyyyMMdd", Locale.CHINESE).format(new Date());
        OSSClient ossClient = OssClientFactory.createOssClient(endPoint, accessKeyId, accessKeySecret, bucketName);
        try {
            for (MultipartFile multipartFile : multipartFileList) {
                // A unique key per file keeps uploads from overwriting each other.
                String objectName = datedPrefix + "/" + java.util.UUID.randomUUID();
                res.add(uploadSingleFile(ossClient, bucketName, objectName, multipartFile));
            }
        } finally {
            ossClient.shutdown();
        }
        return res;
    }

    /** Runs one complete multipart upload for a single file and returns its location. */
    private String uploadSingleFile(OSSClient ossClient, String bucketName, String objectName,
                                    MultipartFile multipartFile) {
        // Initialize the multipart upload; an upload id is only valid for one object.
        InitiateMultipartUploadRequest request = new InitiateMultipartUploadRequest(bucketName, objectName);
        String uploadId = ossClient.initiateMultipartUpload(request).getUploadId();
        try (InputStream inputStream = multipartFile.getInputStream()) {
            SliceInputStream sliceInputStream = new SliceInputStream(inputStream, PART_SIZE_BYTES);
            List<UploadPartRequest> uploadPartRequestList =
                    sliceInputStream.genUploadPartRequestList(bucketName, objectName, uploadId);
            OssMultipleThreadUploadExecutor ossMultipleThreadUploadExecutor =
                    new OssMultipleThreadUploadExecutor(ossClient);
            List<PartETag> partETags = ossMultipleThreadUploadExecutor.genPartTag(uploadPartRequestList);
            CompleteMultipartUploadRequest completeMultipartUploadRequest =
                    new CompleteMultipartUploadRequest(bucketName, objectName, uploadId, partETags);
            CompleteMultipartUploadResult completeMultipartUploadResult =
                    ossClient.completeMultipartUpload(completeMultipartUploadRequest);
            return completeMultipartUploadResult.getLocation();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            abortQuietly(ossClient, bucketName, objectName, uploadId, e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            abortQuietly(ossClient, bucketName, objectName, uploadId, e);
            throw new RuntimeException(e);
        }
    }

    /** Aborts a failed multipart upload so its already uploaded parts do not linger in the bucket. */
    private static void abortQuietly(OSSClient ossClient, String bucketName, String objectName,
                                     String uploadId, Exception primaryFailure) {
        try {
            ossClient.abortMultipartUpload(new AbortMultipartUploadRequest(bucketName, objectName, uploadId));
        } catch (RuntimeException abortFailure) {
            // Keep the original failure as the primary one; record the abort problem alongside it.
            primaryFailure.addSuppressed(abortFailure);
        }
    }
}
public class SliceInputStream {
private final int partSize;
private InputStream inputStream;
/**
 * Creates a slicer over the given stream.
 *
 * @param inputStream the stream whose content is split into parts
 * @param partSize    the size of one part in bytes
 */
public SliceInputStream(InputStream inputStream, int partSize) {
    this.partSize = partSize;
    this.inputStream = inputStream;
}
/**
 * Splits the wrapped stream into consecutive parts of at most {@code partSize} bytes and
 * builds one upload request per part.
 *
 * <p>The stream is read sequentially until end of stream, so the result does not depend on
 * {@code InputStream.available()}. Every request gets its own in-memory stream holding
 * exactly the bytes of its part and a 1-based part number, which allows the requests to be
 * uploaded independently (for example concurrently). As a consequence the whole content is
 * buffered in memory.
 *
 * @param bucketName bucket set on every request
 * @param objectName object key set on every request
 * @param uploadId   multipart upload id set on every request
 * @return the requests in part order; empty when the stream has no content
 * @throws IOException              if reading the stream fails
 * @throws IllegalArgumentException if the configured part size is not positive
 */
public List<UploadPartRequest> genUploadPartRequestList(String bucketName, String objectName, String uploadId
) throws IOException {
    if (this.partSize <= 0) {
        throw new IllegalArgumentException("partSize must be positive: " + this.partSize);
    }
    List<UploadPartRequest> uploadPartRequestList = new ArrayList<>();
    int partNumber = 1;
    while (true) {
        byte[] chunk = new byte[this.partSize];
        int filled = 0;
        // A single read may return fewer bytes than requested; keep reading until the
        // chunk is full or the stream ends.
        while (filled < chunk.length) {
            int readCount = inputStream.read(chunk, filled, chunk.length - filled);
            if (readCount < 0) {
                break;
            }
            filled += readCount;
        }
        if (filled == 0) {
            // End of stream exactly on a part boundary (or an empty stream).
            break;
        }
        UploadPartRequest uploadPartRequest = new UploadPartRequest();
        uploadPartRequest.setBucketName(bucketName);
        uploadPartRequest.setKey(objectName);
        uploadPartRequest.setUploadId(uploadId);
        // Fully qualified to avoid depending on an import outside this method.
        uploadPartRequest.setInputStream(new java.io.ByteArrayInputStream(chunk, 0, filled));
        uploadPartRequest.setPartSize(filled);
        uploadPartRequest.setPartNumber(partNumber++);
        uploadPartRequestList.add(uploadPartRequest);
        if (filled < chunk.length) {
            // A short chunk means the stream ended inside this part.
            break;
        }
    }
    return uploadPartRequestList;
}
public class NamedThreadFactory implements ThreadFactory {
protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1);
protected final AtomicInteger mThreadNum = new AtomicInteger(1);
protected final String mPrefix;
protected final boolean mDaemon;
protected final ThreadGroup mGroup;
/**
 * Creates a non-daemon factory whose name prefix is "pool-" followed by the next value
 * of the shared {@code POOL_SEQ} counter.
 */
public NamedThreadFactory() {
this("pool-" + POOL_SEQ.getAndIncrement(), false);
}
/**
 * Creates a non-daemon factory using the given name prefix.
 *
 * @param prefix prefix for the names of created threads
 */
public NamedThreadFactory(String prefix) {
this(prefix, false);
}
/**
 * Creates a factory with the given name prefix and daemon setting.
 *
 * <p>Thread names start with {@code prefix + "-thread-"}. Threads are placed in the
 * security manager's thread group when a security manager is installed, otherwise in
 * the thread group of the thread that constructs this factory.
 *
 * @param prefix prefix for the names of created threads
 * @param daemon whether created threads are daemon threads
 */
public NamedThreadFactory(String prefix, boolean daemon) {
    this.mPrefix = prefix + "-thread-";
    this.mDaemon = daemon;
    final SecurityManager securityManager = System.getSecurityManager();
    if (securityManager != null) {
        this.mGroup = securityManager.getThreadGroup();
    } else {
        this.mGroup = Thread.currentThread().getThreadGroup();
    }
}
/**
 * Creates a thread in this factory's thread group, named with the prefix followed by a
 * per-factory sequence number, and applies the configured daemon flag.
 *
 * @param runnable the task the new thread will run
 * @return the new, not yet started thread
 */
@Override
public Thread newThread(Runnable runnable) {
    final int sequence = mThreadNum.getAndIncrement();
    final Thread thread = new Thread(mGroup, runnable, mPrefix + sequence, 0);
    thread.setDaemon(mDaemon);
    return thread;
}
/**
 * @return the thread group that threads created by this factory belong to
 */
public ThreadGroup getThreadGroup() {
    return this.mGroup;
}