实习总结:文件迁移

2017-11-28  本文已影响0人  都让你们叫老了

根据项目需求,将文件从FDFS,SASS迁移到azure。此文章记录了java对三个服务器文件上传下载简单的实现。

@Controller
@RequestMapping("/file")
public class FileFloadMigrationController extends BaseController {

    private static Logger log = LoggerFactory.getLogger(FileServiceImpl.class);
    @Autowired
    @Qualifier("fileUploadServiceImpl")
    private FileService fileService;

    @Autowired
    private FileUploadedToDao fileUploadedToDao;

    @Autowired(required = false)
    protected MongoTemplate mongoTemplate;

    @Autowired
    private FileMapDao fileMapDao;

    @Autowired
    private FileMapService fileMapService;

    @Autowired
    private FileUploadedService fileUploadedService;

    /**
     * 文件迁移
     * 
     * @Title: fileMigration
     * @param state
     *            状态:fail:失败 upload:上传
     * @param module
     *            模块名 module=all:所有模块
     * @param lastModifyTime
     *            最后修改时间:存在该项时证明要导入新增数据
     * @return
     * @throws Exception
     */
    @RequestMapping(value = "/filefload/{state}/{module}", method = RequestMethod.GET)
    public @ResponseBody Rest fileMigration(@PathVariable String state, @PathVariable String module, String lastModifyTime) throws Exception {
        new HttpClient().getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(0, false));
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
        Map<Integer, Object> result = new HashMap<Integer, Object>();
        // 上传失败后
        if (state.equals("fail")) {
            // 从新上传所有模块中失败数据
            if (module.equals("all")) {
                findFileUploadedTos = getFileUploadsFail();
            } else {
                // 按模块上传
                findFileUploadedTos = getFileUploadsFail(module);
            }

            result = uploadFileFail(findFileUploadedTos);

        } else if (state.equals("upload")) {
            if (module.equals("all")) {
                // 上传所有模块中数据
                fileUploadeds = getFileUploadsNomal();
            } else {
                // 按模块上传数据
                fileUploadeds = getFileUploadsNomal(module);
            }
            // 最后修改时间不为空,需要上传新文件
            // 比如2017-11-09日转移一次后,11日需要在转移。此时lastModifyTime 需要写为2017-11-09
            if (lastModifyTime != null && !lastModifyTime.equals("")) {
                fileUploadeds = getFileUploadTosByLastModify(lastModifyTime);
            }

            result = uploadFileNomal(fileUploadeds);
        }

        return Rest.item(StateCode.STATUS_CODE_SUCCESS, result);
    }

    /**
     * 正常情况下文件迁移,不按照模块。所有模块
     * 
     * @return List<FileUploaded>
     */
    private List<FileUploaded> getFileUploadsNomal() {
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        Query query = new Query();
        fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
        return fileUploadeds;
    }

    /**
     * 正常情况下文件迁移,按照模块
     * 
     * @Title: getFileUploadsNomal
     * @param module
     *            模块名字
     * @return List<FileUploaded>
     */
    private List<FileUploaded> getFileUploadsNomal(String module) {
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        Query query = new Query();
        Criteria criteria = Criteria.where("module").is(module);
        query.addCriteria(criteria);
        fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
        return fileUploadeds;
    }

    /**
     * 上传过程中出现失败情况。从新上传。所有模块
     * 
     * @Title: getFileUploadsFail
     * @return List<FileUploadedTo>
     */
    private List<FileUploadedTo> getFileUploadsFail() {
        List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
        Query query = new Query();
        Criteria criteria = Criteria.where("isSuccess").is(false);
        query.addCriteria(criteria);
        findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
        return findFileUploadedTos;
    }

    /**
     * 上传过程中出现失败情况。从新上传。按模块进行上传
     * 
     * @Title: getFileUploadsFail
     * @param module
     *            模块名
     * @return List<FileUploadedTo>
     */
    private List<FileUploadedTo> getFileUploadsFail(String module) {
        List<FileUploadedTo> findFileUploadedTos = new ArrayList<FileUploadedTo>();
        Query query = new Query();
        Criteria criteria = Criteria.where("fileUploaded.module").is(module).and("isSuccess").is(false);
        query.addCriteria(criteria);
        findFileUploadedTos = mongoTemplate.find(query, FileUploadedTo.class);
        return findFileUploadedTos;
    }

    /**
     * 上传部分后,出现新增数据。根据文件最后更新时间,进行操作
     * 
     * @Title: getFileUploadTosByLastModify
     * @param lastModifyTime
     *            最后修改时间
     * @return List<FileUploadedTo>
     * @throws ParseException 
     */
    private List<FileUploaded> getFileUploadTosByLastModify(String lastModifyTime) throws ParseException {
        List<FileUploaded> fileUploadeds = new ArrayList<FileUploaded>();
        Query query = new Query();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        Criteria criteria = Criteria.where("lastModify").gte(sdf.parse(lastModifyTime));
        query.addCriteria(criteria);
        fileUploadeds = mongoTemplate.find(query, FileUploaded.class);
        return fileUploadeds;
    }

    /**
     * 正常情况下上传文件主方法
     * 
     * @Title: uploadFileNomal
     * @param fileUploadeds
     * @return
     * @throws Exception
     */
    private Map<Integer, Object> uploadFileNomal(List<FileUploaded> fileUploadeds) throws Exception {
        Map<Integer, Object> map = new HashMap<Integer, Object>();
        List<FileUploadedTo> fileUploadedTos = new ArrayList<FileUploadedTo>();
        if (fileUploadeds == null || fileUploadeds.size() == 0) {
            return map;
        }
        System.out.println("共有文件:" + fileUploadeds.size() + "条");
        long startTime = System.currentTimeMillis();
        // 将表中的数据复制到扩展表中(将文件所有信息复制、设置org为要迁移文件的org、设置迁移失败(false))
        for (FileUploaded fileUploaded : fileUploadeds) {
            FileUploadedTo fileUploadedTo = new FileUploadedTo();
            fileUploadedTo.setFileUploaded(fileUploaded);
            fileUploadedTo.setOrg(fileUploaded.getOrg());
            fileUploadedTo.setSuccess(false);
            fileUploadedTos.add(fileUploadedTo);
        }
        // 保存复制表
        fileUploadedToDao.save(fileUploadedTos);

        uploadFileCurrency(fileUploadedTos);

        long endTime = System.currentTimeMillis();
        System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
        return map;
    }

    /**
     * 上传失败后重新上传主方法
     * 
     * @Title: uploadFileFail
     * @param fileUploadedTos
     * @return
     */
    private Map<Integer, Object> uploadFileFail(List<FileUploadedTo> fileUploadedTos) throws Exception {
        Map<Integer, Object> map = new HashMap<Integer,Object>();
        
        uploadFileCurrency(fileUploadedTos);
        
        return map;
    }

    /**
     * 上传文件通用方法
     * 
     * @Title: uploadFileCurrency
     * @param fileUploadedTos
     */

    private void uploadFileCurrency(List<FileUploadedTo> fileUploadedTos) {
        int i = 0;
        for (FileUploadedTo fileUploadedTo : fileUploadedTos) {
            String path = fileUploadedTo.getFileUploaded().getPath();
            
            if (path != null && path.indexOf("group") != -1) {
                byte fileByte[] = fileService.getFileBytes(path);
                if (fileByte == null) {
                    System.out.println("第" + i++ + "个 fileByte不存在,path是" + path);
                } else {
                    if (path.indexOf(".pdf.") == -1) {// 不是从文件
                        uploadFile(fileUploadedTo, fileByte);
                    } else {// 是从文件
                        uploadSlaveFile(fileByte, "file.pdf", fileUploadedTo);
                    }
                }
            }
        }
    }

    /**
     * 上传文件到分布式系统,并写入文件 文件限制大小 上传的文件可以直接通过id使用特定接口访问 上传失败return null
     * 
     * @param fileBytes
     *            文件
     * @param name
     *            文件名称
     * @param module
     *            模块标示
     * @return
     */
    private void uploadFile(FileUploadedTo fileUploadedTo, byte[] fileBytes) {
        // 获取fileUpload
        FileUploaded fileUploaded = fileUploadedTo.getFileUploaded();
        // 获取模板名
        String module = fileUploaded.getModule();
        // 获取当前文件信息内的org
        String org = fileUploaded.getOrg();
        // 获取文件名
        String name = fileUploaded.getName();
        // 获取文件ID
        String fileId = fileUploaded.getId();
        if (!isLegal(module)) {
            log.error("module ilegal: 模块名称非法");
        }
        try {
            // 长度小于3报错
            if (org.length() < 4) {
                org = "org" + org;
            }
            CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, module, name);
            // 如果上传成功会返回一个Blob对象(不为空)
            // 那么此时就将上传后的新路径以及是否上传成功状态进行更改。
            if (blob != null) {
                // 设置更新成功
                fileUploadedTo.setSuccess(true);
                // 设置新路径
                String newPath = BlobClient.getPath(org, blob.getName());
                fileUploadedTo.setNewPath(newPath);
                fileUploadedToDao.update(fileUploadedTo);

                // 更新fileUpload路径。
                fileUploaded.setPath(newPath);
                mongoTemplate.save(fileUploaded);

                // 更新filemap路径
                FileMap fileMap = findByFileId(fileId);
                if (fileMap != null) {
                    fileMap.setFilePath(newPath);
                    fileMapService.update(fileMap);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    /**
     * 根据fileId 获取 fileMap。
     * 
     * @Title: findByFileId
     * @param id
     *            文件ID
     * @return
     * @throws
     */
    private FileMap findByFileId(String id) {
        Query query = new Query(Criteria.where("fileId").is(id));
        return fileMapDao.findOne(query);
    }

    /**
     * 校验模块是否合法<br/>
     * 模块名在properties中存在即合法
     * 
     * @param module
     *            模块名称
     * @return 合法返回true ,非法返回false
     */
    private boolean isLegal(String module) {
        if (module.indexOf('.') >= 0) {
            return false;
        }
        String v = PropertiesUtils.getString("file.path." + module);
        if (v == null || v.length() < 1) {
            return false;
        } else {
            return true;
        }
    }

    /**
     * 上传从文件主方法
     * 
     * @Title: uploadSlaveFile
     * @param fileBytes
     *            从文件字节大小
     * @param prefix_name
     *            从文件前缀
     * @param fileUploadedTo
     *            fileUploaded 复制表
     */
    private void uploadSlaveFile(byte[] fileBytes, String prefix_name, FileUploadedTo fileUploadedTo) {
        String org = fileUploadedTo.getFileUploaded().getOrg();
        FileUploaded master = fileUploadedTo.getFileUploaded();
        String preName = master.getName().substring(0, master.getName().lastIndexOf("."));
        prefix_name = BlobClient.getPath(preName.concat("_cut"), preName.concat(".pdf"));
        System.out.println(prefix_name);
        try {
            CloudBlockBlob blob = BlobClient.uploadFile(fileBytes, org, master.getModule(), prefix_name);
            if (blob != null) {
                fileUploadedTo.setSuccess(true);
            }
            fileUploadedTo.setNewPath(BlobClient.getPath(org, blob.getName()));
        } catch (InvalidKeyException | URISyntaxException | StorageException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
        fileUploadedToDao.update(fileUploadedTo);
    }
}

上面的类首先获取数据库中的文件,然后根据文件的路径获取文件的字节。如果路径符合规范且字节不为空,那么就将该文件上传。下面的方法是上传到Azure的方法。

Azure Storage 是微软 Azure 云提供的云端存储解决方案,当前支持的存储类型有 Blob、Queue、File 和 Table。

Azure Blob Storage 是用来存放大量的像文本、图片、视频等非结构化数据的存储服务。我们可以在任何地方通过互联网协议 http 或者 https 访问 Blob Storage。简单说,就是把文件放在云上,给它一个 URL,通过这个 URL 来访问文件。这就涉及到一个问题:如何控制访问权限?答案是我们可以根据自己的需要,设置 Blob 对象是只能被自己访问,还是可以被所有人访问。

下面是 Blog Storage 典型的应用场景:

    static final String connectionString = String.format(
            "DefaultEndpointsProtocol=https;AccountName=%s;AccountKey=%s;EndpointSuffix=core.chinacloudapi.cn",
            storageAccountName, storageAccountKey);
    /**
     * 通过容器的方式实现上传二进制文件
     * @throws InvalidKeyException 
     * @time:2017年9月22日 上午10:40:19
     */
    public static CloudBlockBlob uploadFile(byte[] fileBytes,String org,String module,String fileName) throws URISyntaxException, StorageException, IOException, InvalidKeyException{
        // CloudStorageAccount 类表示一个 Azure Storage Account,我们需要先创建它的实例,才能访问属于它的资源。
        // 注意连接字符串中的xxx和yyy,分别对应Access keys中的Storage account name 和 key。
        CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);

        // Create the blob client object.
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer container = blobClient.getContainerReference(org);
        container.createIfNotExists(); // 创建一个容器(如果该容器不存在)
        // 保存到azure上的文件的名称,由于azure服务器对重名文件进行覆盖处理,
        // 上传时对文件名称进行唯一标识处理,缺点是在文件服务器看不到文件的真识名称了。
        UUID token = UUID.randomUUID();
        int postFixIndex = fileName.lastIndexOf(".");
        String fileFullName = "";
        String postFix = "";
        String startName = "";
        if (postFixIndex > 0) {
            postFix = fileName.substring(postFixIndex + 1);
            startName = fileName.substring(0, postFixIndex);
        }else {
            startName = fileName;
        }

        /*azure服务器存储相同文件名的文件会覆盖之前的,因此需要对文件名进行处理
        存储到azure上的文件名格试为:原文件名+"_"+uuid+扩展名
        */
        fileFullName = startName.concat("_").concat(token.toString()).concat(".").concat(postFix);


        /*getPath(module,fileFullName) 为放在 container 中的 Blob 的连接路径。
          getBlockBlobReference 方法获得一个 Block 类型的 Blob 对象的引用。
          您可以根据应用的需要,分别调用 getBlobReference,getAppendBlobReference 或 getPageBlobReference 来创建不同类型的 Blob 对象。
        */
        CloudBlockBlob blob = container.getBlockBlobReference(getPath(module,fileFullName));
        try
        {
            InputStream stream = new ByteArrayInputStream(fileBytes);
          //blob.upload(stream, (long) fileBytes.length);
            PutBlock(blob,stream);
        }
        catch (StorageException e)
        {
            return null;
        }
      return blob;
    }

大文件分块上传:

  /**
    *文件过大,分块上传
    * @Title: PutBlock
    * @param blob
    * @param stream
    * @throws StorageException
    * @throws IOException
     */
@SuppressWarnings({ "unchecked", "rawtypes" })
    public static void PutBlock(CloudBlockBlob blob,InputStream stream) throws StorageException, IOException  
    {  
        Iterable<BlockEntry> blockList = new ArrayList<BlockEntry>();
        int len = stream.available();
        int bytesRead = 0;
        int cur_read_len = STEP_LENGTH;
        byte[] b = null;
        int index = 0;
        if (len <= STEP_LENGTH) {// 如果文件没有超过设定长度,一次上传上即可
            blob.upload(stream, (long) len);
        } else {// 如果文件太大,分批上传
            while (true) {
                if (len - bytesRead > STEP_LENGTH) {
                    cur_read_len = STEP_LENGTH;
                } else {
                    cur_read_len = len - bytesRead;
                }
                b = new byte[cur_read_len + 1];
                int bytesReadLength = 0;
                bytesReadLength = stream.read(b, 0, cur_read_len);
                if (bytesReadLength == -1){// end of InputStream
                    blob.commitBlockList(blockList);
                    break;
                }
                bytesRead += bytesReadLength;
                if (bytesRead <= len) {
                    try {
                        String blockId = Base64.getEncoder().encodeToString(String.format("%08d",index).getBytes(StandardCharsets.UTF_8));
                        blob.uploadBlock(blockId, new ByteArrayInputStream(b),Long.valueOf(bytesReadLength));
                        ((AbstractCollection) blockList).add(new BlockEntry(blockId));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    
                } 
                index++;
            }
        }
      
    }
上一篇下一篇

猜你喜欢

热点阅读