springboot

SpringBoot集成camel-ftp读取ftp文件并解压

2018-07-12  本文已影响1179人  搁浅_Jay

SpringBoot集成camel-ftp读取ftp文件并解压

1、在pom.xml文件中添加依赖

这里使用的是spring boot版本是1.5.9 JDk1.7

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-spring-boot-starter</artifactId>
    <version>2.17.1</version>
</dependency>

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-ftp</artifactId>
    <version>2.17.1</version>
</dependency>

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

2、在application.yml中添加配置

使用filter=#filterName添加过滤器; 配置ftp服务连接的用户名和密码,还有要消费的路径

camel:
  springboot:
    main-run-controller: true #监听Ftp服务器的时候,为了让java进程在后台运行
ftp:
  wifi-info:
    url: 127.0.0.1
    username: test
    password: 123456
    dir: /wifi_info
    server-info: ftp://${ftp.wifi-info.url}:21${ftp.wifi-info.dir}?username=${ftp.wifi-info.username}&password=${ftp.wifi-info.password}&delay=2s&readLock=rename&include=.*zip&filter=#wifiDownloadFileFilter
    local-save-dir: E:/ftpdata/mac/save
    unzip-temp-dir: E:/ftpdata/mac/temp
    local-files-prefix: /localFilesList-  #已下载文件列表的前缀
    local-files-suffix: .txt   #已下载文件列表的后缀

3、创建过滤器 实现org.apache.camel.component.file.GenericFileFilter接口,实现accept方法;并声明到Spring容器中;

import com.lilian.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * FTP文件路由过滤器,
 * 将只下载当天,且下载目录中不存在的文件
 */
@Slf4j
@Component
public class WifiDownloadFileFilter implements GenericFileFilter<Object> {

    private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");

    @Value("${ftp.wifi-info.unzip-temp-dir}")
    private String tempPath;

    @Value("${ftp.wifi-info.local-files-prefix}")
    private String filePrefix;

    @Value("${ftp.wifi-info.local-files-suffix}")
    private String fileSuffix;

    /**
     * 过滤下载文件
     * @param genericFile
     * @return true=下载,false=不下载
     */
    @Override
    public boolean accept(GenericFile<Object> genericFile) {
        long lastModified = genericFile.getLastModified();
        String fileName = genericFile.getFileName();
        return isLatestFile(lastModified) && !isInLocalDir(fileName) ? true : false;
    }

    /**
     * 文件是否已在本地目录中
     * @param fileName
     * @return  true=不存在   false=已存在了
     */
    private boolean isInLocalDir(String fileName) {
        try {

            //获取本地文件夹中已下载的文件名
            File fileDir = new File(tempPath);
            if (!fileDir.exists()) {
                fileDir.mkdir();
            }
            String path = tempPath + filePrefix + simpleDateFormat.format(new Date()) + fileSuffix;
            File file = new File(path);

            if (!file.exists()) {//如果不存在就创建
                file.createNewFile();
                FileUtil.appendMethod(path, fileName + "\r\n");
                return true;
            }

            List<String> localFileNames = FileUtil.readFileByLines(file);
            if (localFileNames.contains(fileName)) {
                return false;
            } else {
                FileUtil.appendMethod(path, fileName + "\r\n");
                return true;
            }
        } catch (Exception e) {
            log.error("获取本地已下载文件列表出错", e);
            return false;
        }
    }
    /**
     * 文件是否为今天的数据
     * @param lastModified
     * @return true=是今天的文件 false=不是今天的文件
     */
    public boolean isLatestFile(long lastModified) {
        Date lastDate = new Date(lastModified);
        String lastDateStr = simpleDateFormat.format(lastDate);

        String todayStr = simpleDateFormat.format(new Date());
        return todayStr.equals(lastDateStr);
    }

}

4、创建下载路由

继承 RouteBuilder,重写configure方法,使用@Value 将配置文件中的路由地址和下载文件存放路径注入。

import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * wifi信息Zip文件下载
 *
 * @Author: 孙龙
 * @Date: 2018/4/24.
 */
@Component
public class WifiFileDownLoadRoute extends RouteBuilder {

    @Value("${ftp.wifi-info.server-info}")
    private String wifiDataUrl;

    @Value("${ftp.mobile-info.local-save-dir}")
    private String localDir;

    @Autowired
    private WifiFileProcessor wifiFileProcessor;

    @Override
    public void configure() throws Exception {
        from(wifiDataUrl).to("file:" + localDir).process(wifiFileProcessor);
    }
}

5、创建本地文件路由解析类

import com.alibaba.fastjson.JSON;
import com.lilian.service.IEqMobileMacService;
import com.lilian.utils.FileHandlerMap;
import com.lilian.utils.FileUtil;
import com.lilian.utils.ZipUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFileMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * WiFi文件路由解析器
 * 解析从ftp服务器路由下来的wifi数据压缩文件
 *
 * @Author: 孙龙
 * @Date: 2018/4/24.
 */
@Slf4j
@Component
public class WifiFileProcessor implements Processor {

    public static final String SIGNALCHANNEL_MAC = "signalChannel_mac";

    @Value("${ftp.wifi-info.unzip-temp-dir}")
    private String unZipTempPath;

    @Value("${ftp.mobile-info.local-save-dir}")
    private String localDir;

    @Autowired
    private IEqMobileMacService eqMobileMacService;
    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Override
    public void process(Exchange exchange) throws Exception {
        log.info("开始解析wifi下载的文件。。。");
        GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
//        gf.getFile().
        String fileName = inFileMessage.getGenericFile().getFileName();
        File zipFile = new File(localDir + File.separator + fileName);
//        Message message = exchange.getIn();
//        GenericFile<?> gf = (GenericFile<?>) message.getBody();
//        File zipFile = (File) gf.getFile();//两种File对象获取方式,这一种有可能会报异常,类型转换异常
        //解压文件,
        List<File> files = null;
        String zipName = zipFile.getName();
        if (Pattern.matches(ZipUtil.ZIP_PATTERN, zipName)) {
            if (FileHandlerMap.readedFilesWithMobiles.get(zipFile.getName()) != null) {
                return;
            }
            files = ZipUtil.unZip(zipFile, unZipTempPath, ZipUtil.BCP_PATTERN);
        } else {
            log.debug("下载到不是zip的压缩文件:" + zipFile.getName());
            return;
        }
        if (files == null) {
            log.error("压缩文件中没有解析到bcp文件:" + zipFile.getName());
        } else {
            //读取文件
            List<String> textList = ZipUtil.batchReadFile(files, "GBK");
            //批量解析文件中数据
            List<EqMobileMac> eqMobileMacList = this.batchStrToEntity(textList);
            //删除临时解压文件
            FileUtil.batchDeleteFile(files);
            //批量存入数据库
            if (eqMobileMacList.size() > 0) {
                eqMobileMacService.batchSave(eqMobileMacList);
                for (EqMobileMac eqMobileMac : eqMobileMacList) {
                    redisTemplate.convertAndSend(SIGNALCHANNEL_MAC, JSON.toJSONString(eqMobileMac));
                }
            } else {
                log.debug("文件中没有解析到任何数据!" + unZipTempPath + zipFile.getName());
            }
        }

    }
}

6、粘贴两个文件处理工具类

ZipUtil.java只用到了两个解压方法和一个文件读取方法;

package com.lilian.utils;

import lombok.extern.slf4j.Slf4j;
import net.lingala.zip4j.exception.ZipException;
import net.lingala.zip4j.model.FileHeader;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/**
 * 文件解压缩工具类
 * 
 * @Author: 孙龙
 * @Date: 2018/4/24.
 */
@Slf4j
public class ZipUtil {
    public static final String BCP_PATTERN = ".*?\\.bcp";
    public static final String HP_PATTERN = ".*?\\.hp";
    public static final String ZIP_PATTERN = ".*?\\.zip";

    /**
     * 此方法描述的是:ZIP压缩
     * 
     * @param source
     *            源文件
     * @param dest
     *            压缩文件
     * @version: 2015年3月2日 上午9:17:26
     */
    public static String zip(File source, File dest) {
        ZipOutputStream out = null;
        BufferedOutputStream bo = null;
        try {
            File zipParent = dest.getParentFile();
            if (!zipParent.exists()) {
                zipParent.mkdirs();
            }
            out = new ZipOutputStream(new FileOutputStream(dest));
            bo = new BufferedOutputStream(out);
            zip(out, source, source.getName(), bo);
            return dest.getAbsolutePath();
        } catch (Exception e) {
            log.error("文件读取错误:" + e.getMessage());
        } finally {
            if (bo != null) {
                try {
                    bo.close();
                } catch (IOException e) {
                }
            }
            if (out != null) {
                try {
                    out.close(); // 输出流关闭
                } catch (IOException e) {
                }
            }
        }
        return null;
    }

    public static void zip(ZipOutputStream out, File f, String base, BufferedOutputStream bo) throws Exception { // 方法重载
        out.putNextEntry(new ZipEntry(base)); // 创建zip压缩进入点base
        FileInputStream in = new FileInputStream(f);
        BufferedInputStream bi = new BufferedInputStream(in);
        int b;
        try {
            while ((b = bi.read()) != -1) {
                bo.write(b); // 将字节流写入当前zip目录
            }
        } finally {
            bi.close();
            in.close(); // 输入流关闭
            bo.close();
        }
    }

    /**
     * 解压加密压缩文件
     * 
     * @param zipFile
     * @param outPath
     * @param passwd
     * @return
     * @throws ZipException
     * @throws IOException
     */
    public static List<File> unEncryptZip(File zipFile, String outPath, String fileNameRegexp, String passwd) {
        List<File> extractedFileList = null;
        try {
            net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);

            if (!zFile.isValidZipFile()) {
                throw new ZipException("压缩文件不合法,可能被损坏.文件名:" + zipFile.getName());
            }
            File destDir = new File(outPath);
            if (destDir.isDirectory() && !destDir.exists()) {
                destDir.mkdir();
            }
            if (zFile.isEncrypted()) {
                zFile.setPassword(passwd);
            }
            zFile.extractAll(outPath);
            List<FileHeader> headerList = zFile.getFileHeaders();
            extractedFileList = new ArrayList<File>();
            for (FileHeader fileHeader : headerList) {
                if (!fileHeader.isDirectory()) {
                    if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
                        //解压到没用的文件,直接删除
                        new File(destDir, fileHeader.getFileName()).delete();
                        continue;
                    }
                    extractedFileList.add(new File(destDir, fileHeader.getFileName()));
                }
            }
            FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
        } catch (ZipException e) {
            log.error("解压文件时出错:" + e.getMessage());
            e.printStackTrace();
        }

        return extractedFileList;
    }

    /**
     * 解压文件,并将匹配表达式的文件读取到内存
     * 
     * @param zipFile
     * @param outPath
     * @return
     * @throws ZipException
     * @throws IOException
     */
    public static List<File> unZip(File zipFile, String outPath, String fileNameRegexp) {
        List<File> extractedFileList = null;
        try {
            net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);
            zFile.setFileNameCharset("GBK");
            if (!zFile.isValidZipFile()) {
                throw new ZipException("压缩文件不合法,可能被损坏.文件名:" + zipFile.getName());
            }
            File destDir = new File(outPath);
            if (destDir.isDirectory() && !destDir.exists()) {
                destDir.mkdir();
            }

            zFile.extractAll(outPath);

            List<FileHeader> headerList = zFile.getFileHeaders();
            extractedFileList = new ArrayList<>();
            for (FileHeader fileHeader : headerList) {
                if (!fileHeader.isDirectory()) {
                    if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
                        new File(destDir, fileHeader.getFileName()).delete();//匹配错误直接删除
                        continue;
                    }
                    extractedFileList.add(new File(destDir, fileHeader.getFileName()));
                }
            }
            FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
        } catch (ZipException e) {
            log.error("解压文件时出错:" + e.getMessage());
            e.printStackTrace();
        } finally {

        }
        return extractedFileList;
    }

    /**
     * 此方法描述的是:获得ZIP文件同名解压目录
     * 
     * @version: 2015年3月5日 下午2:10:41
     */
    public static String getUnpackForder(File zipFile) {
        String filePath = zipFile.getAbsolutePath();
        return filePath.substring(0, filePath.lastIndexOf("."));
    }

    /**
     * 此方法描述的是:获得ZIP文件指定解压目录
     * 
     * @version: 2015年3月6日 下午10:28:36
     */
    public static String getUnpackForder(File zipFile, String subDir) {
        return zipFile.getParent() + File.separator + subDir;
    }

    /**
     * 批量读取文件,每一行解析为一个字符串;
     * 
     * @param files
     * @return
     */
    public static List<String> batchReadFile(List<File> files, String charset) throws Exception {
        List<String> textList = new ArrayList<>();
        for (File file : files) {
            List<String> list = readTextFile(file, charset);
            textList.addAll(list);

        }
        return textList;
    }

    /**
     * 将文件中内容读取到内存中
     * 
     * @param file
     * @return
     */
    public static List<String> readTextFile(File file, String charset) throws Exception {
        List<String> list = new ArrayList<>();
        InputStreamReader reader = new InputStreamReader(new FileInputStream(file), charset);
        BufferedReader bufferedReader = new BufferedReader(reader);
        String lineText;
        while ((lineText = bufferedReader.readLine()) != null) {
            list.add(lineText);
        }
        return list;

    }

}

FileUtil.java

package com.lilian.utils;

import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

@Slf4j
public class FileUtil {

    /**
     * 以行为单位读取文件,常用于读面向行的格式化文件
     */
    public static List<String> readFileByLines(File file) {
        BufferedReader reader = null;
        List<String> strList = new ArrayList<>();
        try {
//            System.out.println("以行为单位读取文件内容,一次读一整行:");
            reader = new BufferedReader(new FileReader(file));
            String tempString = null;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                // 显示行号
                strList.add(tempString);
            }
            reader.close();

        } catch (IOException e) {
            log.error("文件读取时出现错误!");
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                    log.error("流关闭异常!");
                }
            }
        }
        return strList;
    }

    /**
     * 方法追加文件:使用FileWriter
     * @param fileName
     * @param content
     */
    public static void appendMethod(String fileName, String content) {
        try {
            //打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件
            FileWriter writer = new FileWriter(fileName, true);
            writer.write(content);
            writer.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 删除单个文件
     * @param file 要删除的文件对象
     * @return 单个文件删除成功返回true,否则返回false
     */
    public static boolean deleteFile(File file) {
        if (file.exists() && file.isFile()) {
            if (file.delete()) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    /**
     * 批量删除文件
     * @param files
     */
    public static void batchDeleteFile(List<File> files) {
        for (File file : files) {
            deleteFile(file);
        }
    }
}

随后我会将我的一个完整Demo整理出来开源到github上面,希望能够帮到你!
参考:https://blog.csdn.net/tiantian12234/article/details/77942598 https://blog.csdn.net/sunknew/article/details/79374781

上一篇下一篇

猜你喜欢

热点阅读