Java 杂谈

基于 Java NIO WatchService 监控目录中文件变化的工具类

2018-11-07  本文已影响0人  sunshujie1990

把 Java NIO 的 WatchService 简单封装了一下,支持:

  1. 监控多个目录
  2. 设置监控类型(增、删、改)
  3. 设置线程池
  4. 设置多个过滤器
  5. 设置多个监听器(串行执行)
  6. 轻量级,不依赖第三方包
  7. fluent api
import java.io.IOException;
import java.nio.file.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


/**
 * Demo for {@link FileWatcher}.
 *
 * <p>Watches two directories for created and deleted entries whose file name
 * contains "test" and ends with ".txt", prints every match from two listeners,
 * and stops after five minutes.
 * <p> 2018/11/04
 *
 * @author ssj
 * @version 1.0.0
 */
public class Test {
    public static void main(String[] args) throws IOException, InterruptedException {

        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            new FileWatcher.FileWatcherBuilder()
                .addWatchPath("/home/ssj/Downloads/test")
                .addWatchPath("/home/ssj/Downloads/test/新建文件夹")
                .addWatchKind(StandardWatchEventKinds.ENTRY_CREATE)
                .addWatchKind(StandardWatchEventKinds.ENTRY_DELETE)
                .setExecutor(pool)
                .addFilter((p, k) -> p.getFileName().toString().contains("test"))
                .addFilter((p, k) -> p.getFileName().toString().endsWith(".txt"))
                .addListener((p, k) -> System.out.println("listener1:" + p.getFileName()))
                .addListener((p, k) -> System.out.println("listener2:" + p.getFileName()))
                .build()
                .startWatch();

            TimeUnit.MINUTES.sleep(5);
        } finally {
            // The pool's worker threads are non-daemon and each watch task blocks
            // waiting for events, so without an explicit shutdown the JVM would
            // never exit. shutdownNow() interrupts the blocked tasks so they stop.
            pool.shutdownNow();
        }
    }
}

import java.io.IOException;
import java.nio.file.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Watches directories for file changes using the NIO {@link WatchService}.
 *
 * <p>Only the direct entries of each registered directory are watched (one level,
 * no recursion). Several directories, event kinds (create / delete / modify),
 * filters and listeners can be configured through {@link FileWatcherBuilder}.
 * An event reaches the listeners only if every filter accepts it; the listeners
 * then run one after another, in the order they were added.
 *
 * <p>Every watched directory gets its own task on the supplied {@link Executor},
 * and that task blocks until the watch ends (interrupt, or the watch key becoming
 * invalid). The executor must therefore be able to run one task per directory at
 * the same time. Interrupting the watching threads stops the watch.
 *
 * <p>The {@code started} flag is a plain field, so {@link #startWatch()} is meant
 * to be called from a single thread.
 * <p> 2018/11/07
 *
 * @author ssj
 * @version 1.0.0
 */
public class FileWatcher {

    private static final Logger logger = Logger.getGlobal();

    private final Set<Path> paths;
    private final Set<WatchEvent.Kind> kinds;
    /** All listeners chained into one consumer, invoked in registration order. */
    private final BiConsumer<Path, WatchEvent.Kind> consumer;
    /** All filters AND-ed together; accepts everything when no filter was added. */
    private final BiPredicate<Path, WatchEvent.Kind> predicate;
    private final Executor pool;

    private boolean started = false;

    private FileWatcher(Set<Path> paths, Set<WatchEvent.Kind> kinds,
                        List<BiConsumer<Path, WatchEvent.Kind>> listeners, Executor pool,
                        List<BiPredicate<Path, WatchEvent.Kind>> filters) {
        // Copy the collections so that later changes to the builder cannot alter
        // a watcher that has already been built.
        this.paths = new HashSet<>(paths);
        this.kinds = new HashSet<>(kinds);
        this.pool = pool;
        this.consumer = chainListeners(listeners);
        this.predicate = combineFilters(filters);
    }

    /** Chains the listeners into a single consumer that calls them serially. */
    private static BiConsumer<Path, WatchEvent.Kind> chainListeners(
            List<BiConsumer<Path, WatchEvent.Kind>> listeners) {
        BiConsumer<Path, WatchEvent.Kind> chain = (p, k) -> { };
        for (BiConsumer<Path, WatchEvent.Kind> listener : listeners) {
            chain = chain.andThen(listener);
        }
        return chain;
    }

    /** Combines the filters with logical AND; {@code null} or empty means "accept all". */
    private static BiPredicate<Path, WatchEvent.Kind> combineFilters(
            List<BiPredicate<Path, WatchEvent.Kind>> filters) {
        BiPredicate<Path, WatchEvent.Kind> combined = (p, k) -> true;
        if (filters != null) {
            for (BiPredicate<Path, WatchEvent.Kind> filter : filters) {
                combined = combined.and(filter);
            }
        }
        return combined;
    }

    /**
     * Watches a single directory until the thread is interrupted or the watch key
     * can no longer be reset. Blocks the calling thread for the whole time.
     *
     * @param path directory to register with a new watch service
     * @throws IOException if the watch service cannot be created, the directory
     *                     cannot be registered, or closing the service fails
     */
    private void startWatch(Path path) throws IOException {
        // try-with-resources closes the service on every exit path, including a
        // failed register() call.
        try (WatchService watchService = FileSystems.getDefault().newWatchService()) {
            path.register(watchService, kinds.toArray(new WatchEvent.Kind[0]));
            for (; ; ) {
                WatchKey key;
                try {
                    key = watchService.take();
                } catch (InterruptedException e) {
                    // Restore the flag so the executor / caller can see the interrupt.
                    Thread.currentThread().interrupt();
                    logger.severe("监听中断");
                    break;
                }
                for (WatchEvent<?> watchEvent : key.pollEvents()) {
                    dispatch(watchEvent);
                }
                if (!key.reset()) {
                    logger.severe("watchkey重置失败,停止监听");
                    break;
                }
            }
        }
    }

    /**
     * Runs one event through the filters and, if accepted, through the listeners.
     * A failing filter or listener is logged and does not end the watch.
     */
    private void dispatch(WatchEvent<?> watchEvent) {
        WatchEvent.Kind k = watchEvent.kind();
        // OVERFLOW may be delivered even when it was not registered, and it carries
        // no path (null context). Do not hand it to callbacks that did not ask for it.
        if (k == StandardWatchEventKinds.OVERFLOW && !kinds.contains(k)) {
            logger.warning("监听事件溢出,部分事件可能丢失");
            return;
        }
        Path p = (Path) watchEvent.context();
        try {
            if (predicate.test(p, k)) {
                consumer.accept(p, k);
            }
        } catch (RuntimeException e) {
            logger.log(Level.SEVERE, "监听回调执行异常: " + p, e);
        }
    }

    /**
     * Starts watching all configured directories, one executor task per directory.
     * A second call logs an error and does nothing.
     */
    public void startWatch() {
        if (started) {
            logger.severe("监听已经开始, 无法重复监听");
            return;
        }
        started = true;

        for (Path path : paths) {
            pool.execute(() -> {
                try {
                    startWatch(path);
                } catch (IOException e) {
                    logger.log(Level.SEVERE, "监听失败: " + path, e);
                }
            });
        }
        logger.info("监听开始...");
    }


    /**
     * Fluent builder for {@link FileWatcher}. At least one path, one event kind,
     * one listener and an executor are required; filters are optional.
     */
    public static class FileWatcherBuilder {

        private final Set<Path> paths = new HashSet<>();
        private final Set<WatchEvent.Kind> kinds = new HashSet<>();
        private final List<BiConsumer<Path, WatchEvent.Kind>> listeners = new ArrayList<>();
        private final List<BiPredicate<Path, WatchEvent.Kind>> filters = new ArrayList<>();
        private Executor pool;

        /** Adds a directory to watch. */
        public FileWatcherBuilder addWatchPath(String path) {
            paths.add(Paths.get(path));
            return this;
        }

        /** Adds an event kind to watch for, e.g. {@code StandardWatchEventKinds.ENTRY_CREATE}. */
        public FileWatcherBuilder addWatchKind(WatchEvent.Kind kind) {
            kinds.add(kind);
            return this;
        }

        /** Adds a listener; listeners are called serially in the order they were added. */
        public FileWatcherBuilder addListener(BiConsumer<Path, WatchEvent.Kind> listener) {
            listeners.add(listener);
            return this;
        }

        /** Adds a filter; an event is delivered only if all filters accept it. */
        public FileWatcherBuilder addFilter(BiPredicate<Path, WatchEvent.Kind> filter) {
            filters.add(filter);
            return this;
        }

        /** Sets the executor on which the watch tasks run. */
        public FileWatcherBuilder setExecutor(Executor pool) {
            this.pool = pool;
            return this;
        }

        /**
         * Builds the watcher from a snapshot of the current configuration.
         *
         * @return a new, not yet started {@link FileWatcher}
         * @throws IllegalArgumentException if no path, no event kind, no listener
         *                                  or no executor has been configured
         */
        public FileWatcher build() {
            if (paths.isEmpty()) {
                throw new IllegalArgumentException("还未设置监听路径");
            }
            if (kinds.isEmpty()) {
                throw new IllegalArgumentException("还未设置监听类型");
            }
            if (listeners.isEmpty()) {
                throw new IllegalArgumentException("还未设置监听回调");
            }
            if (this.pool == null) {
                throw new IllegalArgumentException("未设置监控线程池");
            }
            return new FileWatcher(this.paths, this.kinds, this.listeners, this.pool, this.filters);
        }
    }


}

上一篇 下一篇

猜你喜欢

热点阅读