java 使用FileAlterationMonitor监控目录

2020-12-09  本文已影响0人  早点起床晒太阳

需求

最近做一个导数服务,支持oracle的。然后先是使用oracle的sqllur 生成数据,然后往hdfs上传。这个时候有个问题,比如oracle对应的表有1T,这个时候对磁盘的依赖就很大,磁盘可能就会放满了。所以就需要将其导一部分上传一部分,然后上传完删除才可以解决这个问题。那么这个时候就需要来监控文件的生成。

依赖

<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>2.6</version>
</dependency>

实现

需要一个类来继承FileAlterationListenerAdaptor,重写他的方法,然后再create事件发生的时候在方法内部执行相应的动作

public class FileListener extends FileAlterationListenerAdaptor {
    @Override
    public void onFileCreate(File file) {
    }
}

然后执行监控

//轮询间隔1秒
long interval = TimeUnit.SECONDS.toMillis(1);
FileAlterationObserver observer = new FileAlterationObserver(new File(oraclePath));
FileListener listener = new FileListener(fs, hdfsPath, ex);
observer.addListener(listener);
FileAlterationMonitor monitor = new FileAlterationMonitor(interval, observer);
monitor.start();
executeOracleShell(shellPath);
Thread.sleep(2000);//这么做的目的防止生成文件过快 而没有执行监控直接退出
monitor.stop(0);
if (ex.get() != null) {
    throw ex.get();
}

上述这些代码网上一搜一大堆,但是基本都是直接start,唯独没有什么时候stop的。毕竟你异步启动了线程也是需要关闭的。

调用stop方法

上述代码之前没有Thread.sleep(2000)的时候,我发现监控没有起作用,生成的文件根本没有上传到hdfs。然后我看了下FileAlterationMonitor
的源码,看到如下

   public synchronized void stop(long stopInterval) throws Exception {
        if (running == false) {
            throw new IllegalStateException("Monitor is not running");
        }
        running = false;
        try {
            thread.join(stopInterval);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        for (FileAlterationObserver observer : observers) {
            observer.destroy();
        }
    }
   public void run() {
        while (running) {
            for (FileAlterationObserver observer : observers) {
                observer.checkAndNotify();
            }
            if (!running) {
                break;
            }
            try {
                Thread.sleep(interval);
            } catch (final InterruptedException ignored) {
            }
        }
    }

发现调用stop方法的时候,会将running指定为false。那么这个时候再去调用异步线程的run方法时候,在它sleep的时候,还没来及扫描第二次的时候,就直接退出(针对落地文件特别快),没有起到监控的作用。所以这里Thread.sleep的作用是应对非常快速落地文件的情景,让其能够扫描到第二次起到监控的作用的。

还有一点 stop(0) 可以等待线程执行知道线程的任务关闭(线程指的是oncreate 出发的线程。

上一篇下一篇

猜你喜欢

热点阅读