SpringBoot集成camel-ftp读取ftp文件并解压
2018-07-12 本文已影响1179人
搁浅_Jay
SpringBoot集成camel-ftp读取ftp文件并解压
1、在pom.xml文件中添加依赖
这里使用的是spring boot版本是1.5.9 JDk1.7
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-ftp</artifactId>
<version>2.17.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2、在application.yml中添加配置
使用filter=#filterName添加过滤器; 配置ftp服务连接的用户名和密码,还有要消费的路径
camel:
springboot:
main-run-controller: true #监听Ftp服务器的时候,为了让java进程在后台运行
ftp:
wifi-info:
url: 127.0.0.1
username: test
password: 123456
dir: /wifi_info
server-info: ftp://${ftp.wifi-info.url}:21${ftp.wifi-info.dir}?username=${ftp.wifi-info.username}&password=${ftp.wifi-info.password}&delay=2s&readLock=rename&include=.*zip&filter=#wifiDownloadFileFilter
local-save-dir: E:/ftpdata/mac/save
unzip-temp-dir: E:/ftpdata/mac/temp
local-files-prefix: /localFilesList- #已下载文件列表的前缀
local-files-suffix: .txt #已下载文件列表的后缀
3、创建过滤器 实现org.apache.camel.component.file.GenericFileFilter接口,实现accept方法;并声明到Spring容器中;
import com.lilian.utils.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileFilter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
* FTP文件路由过滤器,
* 将只下载当天,且下载目录中不存在的文件
*/
@Slf4j
@Component
public class WifiDownloadFileFilter implements GenericFileFilter<Object> {
private static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
@Value("${ftp.wifi-info.unzip-temp-dir}")
private String tempPath;
@Value("${ftp.wifi-info.local-files-prefix}")
private String filePrefix;
@Value("${ftp.wifi-info.local-files-suffix}")
private String fileSuffix;
/**
* 过滤下载文件
* @param genericFile
* @return true=下载,false=不下载
*/
@Override
public boolean accept(GenericFile<Object> genericFile) {
long lastModified = genericFile.getLastModified();
String fileName = genericFile.getFileName();
return isLatestFile(lastModified) && !isInLocalDir(fileName) ? true : false;
}
/**
* 文件是否已在本地目录中
* @param fileName
* @return true=不存在 false=已存在了
*/
private boolean isInLocalDir(String fileName) {
try {
//获取本地文件夹中已下载的文件名
File fileDir = new File(tempPath);
if (!fileDir.exists()) {
fileDir.mkdir();
}
String path = tempPath + filePrefix + simpleDateFormat.format(new Date()) + fileSuffix;
File file = new File(path);
if (!file.exists()) {//如果不存在就创建
file.createNewFile();
FileUtil.appendMethod(path, fileName + "\r\n");
return true;
}
List<String> localFileNames = FileUtil.readFileByLines(file);
if (localFileNames.contains(fileName)) {
return false;
} else {
FileUtil.appendMethod(path, fileName + "\r\n");
return true;
}
} catch (Exception e) {
log.error("获取本地已下载文件列表出错", e);
return false;
}
}
/**
* 文件是否为今天的数据
* @param lastModified
* @return true=是今天的文件 false=不是今天的文件
*/
public boolean isLatestFile(long lastModified) {
Date lastDate = new Date(lastModified);
String lastDateStr = simpleDateFormat.format(lastDate);
String todayStr = simpleDateFormat.format(new Date());
return todayStr.equals(lastDateStr);
}
}
4、创建下载路由
继承 RouteBuilder,重写configure方法,使用@Value 将配置文件中的路由地址和下载文件存放路径注入。
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* wifi信息Zip文件下载
*
* @Author: 孙龙
* @Date: 2018/4/24.
*/
@Component
public class WifiFileDownLoadRoute extends RouteBuilder {
@Value("${ftp.wifi-info.server-info}")
private String wifiDataUrl;
@Value("${ftp.mobile-info.local-save-dir}")
private String localDir;
@Autowired
private WifiFileProcessor wifiFileProcessor;
@Override
public void configure() throws Exception {
from(wifiDataUrl).to("file:" + localDir).process(wifiFileProcessor);
}
}
5、创建本地文件路由解析类
import com.alibaba.fastjson.JSON;
import com.lilian.service.IEqMobileMacService;
import com.lilian.utils.FileHandlerMap;
import com.lilian.utils.FileUtil;
import com.lilian.utils.ZipUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.file.GenericFileMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.io.File;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
/**
* WiFi文件路由解析器
* 解析从ftp服务器路由下来的wifi数据压缩文件
*
* @Author: 孙龙
* @Date: 2018/4/24.
*/
@Slf4j
@Component
public class WifiFileProcessor implements Processor {
public static final String SIGNALCHANNEL_MAC = "signalChannel_mac";
@Value("${ftp.wifi-info.unzip-temp-dir}")
private String unZipTempPath;
@Value("${ftp.mobile-info.local-save-dir}")
private String localDir;
@Autowired
private IEqMobileMacService eqMobileMacService;
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void process(Exchange exchange) throws Exception {
log.info("开始解析wifi下载的文件。。。");
GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
// gf.getFile().
String fileName = inFileMessage.getGenericFile().getFileName();
File zipFile = new File(localDir + File.separator + fileName);
// Message message = exchange.getIn();
// GenericFile<?> gf = (GenericFile<?>) message.getBody();
// File zipFile = (File) gf.getFile();//两种File对象获取方式,这一种有可能会报异常,类型转换异常
//解压文件,
List<File> files = null;
String zipName = zipFile.getName();
if (Pattern.matches(ZipUtil.ZIP_PATTERN, zipName)) {
if (FileHandlerMap.readedFilesWithMobiles.get(zipFile.getName()) != null) {
return;
}
files = ZipUtil.unZip(zipFile, unZipTempPath, ZipUtil.BCP_PATTERN);
} else {
log.debug("下载到不是zip的压缩文件:" + zipFile.getName());
return;
}
if (files == null) {
log.error("压缩文件中没有解析到bcp文件:" + zipFile.getName());
} else {
//读取文件
List<String> textList = ZipUtil.batchReadFile(files, "GBK");
//批量解析文件中数据
List<EqMobileMac> eqMobileMacList = this.batchStrToEntity(textList);
//删除临时解压文件
FileUtil.batchDeleteFile(files);
//批量存入数据库
if (eqMobileMacList.size() > 0) {
eqMobileMacService.batchSave(eqMobileMacList);
for (EqMobileMac eqMobileMac : eqMobileMacList) {
redisTemplate.convertAndSend(SIGNALCHANNEL_MAC, JSON.toJSONString(eqMobileMac));
}
} else {
log.debug("文件中没有解析到任何数据!" + unZipTempPath + zipFile.getName());
}
}
}
}
6、粘贴两个文件处理工具类
ZipUtil.java只用到了两个解压方法和一个文件读取方法;
package com.lilian.utils;
import lombok.extern.slf4j.Slf4j;
import net.lingala.zip4j.exception.ZipException;
import net.lingala.zip4j.model.FileHeader;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
/**
* 文件解压缩工具类
*
* @Author: 孙龙
* @Date: 2018/4/24.
*/
@Slf4j
public class ZipUtil {
public static final String BCP_PATTERN = ".*?\\.bcp";
public static final String HP_PATTERN = ".*?\\.hp";
public static final String ZIP_PATTERN = ".*?\\.zip";
/**
* 此方法描述的是:ZIP压缩
*
* @param source
* 源文件
* @param dest
* 压缩文件
* @version: 2015年3月2日 上午9:17:26
*/
public static String zip(File source, File dest) {
ZipOutputStream out = null;
BufferedOutputStream bo = null;
try {
File zipParent = dest.getParentFile();
if (!zipParent.exists()) {
zipParent.mkdirs();
}
out = new ZipOutputStream(new FileOutputStream(dest));
bo = new BufferedOutputStream(out);
zip(out, source, source.getName(), bo);
return dest.getAbsolutePath();
} catch (Exception e) {
log.error("文件读取错误:" + e.getMessage());
} finally {
if (bo != null) {
try {
bo.close();
} catch (IOException e) {
}
}
if (out != null) {
try {
out.close(); // 输出流关闭
} catch (IOException e) {
}
}
}
return null;
}
public static void zip(ZipOutputStream out, File f, String base, BufferedOutputStream bo) throws Exception { // 方法重载
out.putNextEntry(new ZipEntry(base)); // 创建zip压缩进入点base
FileInputStream in = new FileInputStream(f);
BufferedInputStream bi = new BufferedInputStream(in);
int b;
try {
while ((b = bi.read()) != -1) {
bo.write(b); // 将字节流写入当前zip目录
}
} finally {
bi.close();
in.close(); // 输入流关闭
bo.close();
}
}
/**
* 解压加密压缩文件
*
* @param zipFile
* @param outPath
* @param passwd
* @return
* @throws ZipException
* @throws IOException
*/
public static List<File> unEncryptZip(File zipFile, String outPath, String fileNameRegexp, String passwd) {
List<File> extractedFileList = null;
try {
net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);
if (!zFile.isValidZipFile()) {
throw new ZipException("压缩文件不合法,可能被损坏.文件名:" + zipFile.getName());
}
File destDir = new File(outPath);
if (destDir.isDirectory() && !destDir.exists()) {
destDir.mkdir();
}
if (zFile.isEncrypted()) {
zFile.setPassword(passwd);
}
zFile.extractAll(outPath);
List<FileHeader> headerList = zFile.getFileHeaders();
extractedFileList = new ArrayList<File>();
for (FileHeader fileHeader : headerList) {
if (!fileHeader.isDirectory()) {
if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
//解压到没用的文件,直接删除
new File(destDir, fileHeader.getFileName()).delete();
continue;
}
extractedFileList.add(new File(destDir, fileHeader.getFileName()));
}
}
FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
} catch (ZipException e) {
log.error("解压文件时出错:" + e.getMessage());
e.printStackTrace();
}
return extractedFileList;
}
/**
* 解压文件,并将匹配表达式的文件读取到内存
*
* @param zipFile
* @param outPath
* @return
* @throws ZipException
* @throws IOException
*/
public static List<File> unZip(File zipFile, String outPath, String fileNameRegexp) {
List<File> extractedFileList = null;
try {
net.lingala.zip4j.core.ZipFile zFile = new net.lingala.zip4j.core.ZipFile(zipFile);
zFile.setFileNameCharset("GBK");
if (!zFile.isValidZipFile()) {
throw new ZipException("压缩文件不合法,可能被损坏.文件名:" + zipFile.getName());
}
File destDir = new File(outPath);
if (destDir.isDirectory() && !destDir.exists()) {
destDir.mkdir();
}
zFile.extractAll(outPath);
List<FileHeader> headerList = zFile.getFileHeaders();
extractedFileList = new ArrayList<>();
for (FileHeader fileHeader : headerList) {
if (!fileHeader.isDirectory()) {
if (!Pattern.matches(fileNameRegexp, fileHeader.getFileName())) {
new File(destDir, fileHeader.getFileName()).delete();//匹配错误直接删除
continue;
}
extractedFileList.add(new File(destDir, fileHeader.getFileName()));
}
}
FileHandlerMap.readedFilesWithMobiles.put(zipFile.getName(), 1);
} catch (ZipException e) {
log.error("解压文件时出错:" + e.getMessage());
e.printStackTrace();
} finally {
}
return extractedFileList;
}
/**
* 此方法描述的是:获得ZIP文件同名解压目录
*
* @version: 2015年3月5日 下午2:10:41
*/
public static String getUnpackForder(File zipFile) {
String filePath = zipFile.getAbsolutePath();
return filePath.substring(0, filePath.lastIndexOf("."));
}
/**
* 此方法描述的是:获得ZIP文件指定解压目录
*
* @version: 2015年3月6日 下午10:28:36
*/
public static String getUnpackForder(File zipFile, String subDir) {
return zipFile.getParent() + File.separator + subDir;
}
/**
* 批量读取文件,每一行解析为一个字符串;
*
* @param files
* @return
*/
public static List<String> batchReadFile(List<File> files, String charset) throws Exception {
List<String> textList = new ArrayList<>();
for (File file : files) {
List<String> list = readTextFile(file, charset);
textList.addAll(list);
}
return textList;
}
/**
* 将文件中内容读取到内存中
*
* @param file
* @return
*/
public static List<String> readTextFile(File file, String charset) throws Exception {
List<String> list = new ArrayList<>();
InputStreamReader reader = new InputStreamReader(new FileInputStream(file), charset);
BufferedReader bufferedReader = new BufferedReader(reader);
String lineText;
while ((lineText = bufferedReader.readLine()) != null) {
list.add(lineText);
}
return list;
}
}
FileUtil.java
package com.lilian.utils;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class FileUtil {
/**
* 以行为单位读取文件,常用于读面向行的格式化文件
*/
public static List<String> readFileByLines(File file) {
BufferedReader reader = null;
List<String> strList = new ArrayList<>();
try {
// System.out.println("以行为单位读取文件内容,一次读一整行:");
reader = new BufferedReader(new FileReader(file));
String tempString = null;
// 一次读入一行,直到读入null为文件结束
while ((tempString = reader.readLine()) != null) {
// 显示行号
strList.add(tempString);
}
reader.close();
} catch (IOException e) {
log.error("文件读取时出现错误!");
e.printStackTrace();
} finally {
if (reader != null) {
try {
reader.close();
} catch (IOException e1) {
log.error("流关闭异常!");
}
}
}
return strList;
}
/**
* 方法追加文件:使用FileWriter
* @param fileName
* @param content
*/
public static void appendMethod(String fileName, String content) {
try {
//打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件
FileWriter writer = new FileWriter(fileName, true);
writer.write(content);
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 删除单个文件
* @param file 要删除的文件对象
* @return 单个文件删除成功返回true,否则返回false
*/
public static boolean deleteFile(File file) {
if (file.exists() && file.isFile()) {
if (file.delete()) {
return true;
} else {
return false;
}
} else {
return false;
}
}
/**
* 批量删除文件
* @param files
*/
public static void batchDeleteFile(List<File> files) {
for (File file : files) {
deleteFile(file);
}
}
}
随后我会将我的一个完整Demo整理出来开源到github上面,希望能够帮到你!
参考:https://blog.csdn.net/tiantian12234/article/details/77942598 https://blog.csdn.net/sunknew/article/details/79374781