flume taildir 不兼容windows服务器问题解决

2019-09-25  本文已影响0人  VIAE

最初使用flume时,由于需要读取的是多个文件夹内的所有文件,我选择了flume的 Spooling Directory Source 来监控并读取数据源。后来发现,除了已有的历史文件以外,还需要读取正在实时写入的.log文件,而 Spooling Directory Source 只能读取已经写入完成的完整文件,并不支持读取仍在实时写入的文件。
查阅官方文档后,我选择了Taildir Source。但我们的服务器是多台windows服务器,而apache-flume-1.9.0的Taildir Source只支持Linux服务器。从在windows中使用Taildir Source时的报错可以发现,之所以不能在windows中使用,是因为Taildir Source是按照Linux的方式来识别文件的(见下文的getInode方法:通过"unix:ino"属性读取文件的inode),而windows的文件系统不支持这个属性,所以无法读取windows中的文件。
知道了问题所在,接下来就来解决问题:找到flume的Taildir Source源码,在apache-flume-1.9.0-src/flume-ng-sources中将flume-taildir-source复制一份,命名为flume-win-taildir-source。
文件目录


目录.PNG

在apache-flume-1.9.0-src/flume-ng-sources/pom.xml中新增

<module>flume-win-taildir-source</module>

在flume-win-taildir-source/src/main/java/org.apache.flume.source.wintaildir 中新增util文件夹,
新增 util/Kernel32

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source.wintaildir.util;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Structure;
import com.sun.jna.platform.win32.WinBase.FILETIME;
import com.sun.jna.platform.win32.WinDef.DWORD;
import com.sun.jna.platform.win32.WinNT.HANDLE;
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIFunctionMapper;
import com.sun.jna.win32.W32APITypeMapper;
/**
 * Minimal JNA binding for the two {@code Kernel32} entry points this module
 * declares itself: {@code GetLastError} and {@code GetFileInformationByHandle}.
 *
 * <p>The library is loaded with the Unicode function and type mappers, see
 * {@link #WIN32API_OPTIONS}.
 */
public interface Kernel32 extends StdCallLibrary {

    /** Load options selecting the Unicode function mapper and type mapper. */
    Map<String, Object> WIN32API_OPTIONS = new HashMap<String, Object>() {
        private static final long serialVersionUID = 1L;

        {
            put(Library.OPTION_FUNCTION_MAPPER, W32APIFunctionMapper.UNICODE);
            put(Library.OPTION_TYPE_MAPPER, W32APITypeMapper.UNICODE);
        }
    };

    /** Shared proxy for the native "Kernel32" library, created when this interface is initialised. */
    Kernel32 INSTANCE =
            (Kernel32) Native.loadLibrary("Kernel32", Kernel32.class, WIN32API_OPTIONS);

    /** Mapping of the native {@code GetLastError} function. */
    int GetLastError();

    /**
     * Structure filled in by {@link #GetFileInformationByHandle}.
     *
     * <p>The fields are public and listed in {@link #getFieldOrder()} in exactly
     * their declaration order; both must stay in sync because JNA derives the
     * native layout from them.
     */
    class BY_HANDLE_FILE_INFORMATION extends Structure {
        public DWORD dwFileAttributes;
        public FILETIME ftCreationTime;
        public FILETIME ftLastAccessTime;
        public FILETIME ftLastWriteTime;
        public DWORD dwVolumeSerialNumber;
        public DWORD nFileSizeHigh;
        public DWORD nFileSizeLow;
        public DWORD nNumberOfLinks;
        public DWORD nFileIndexHigh;
        public DWORD nFileIndexLow;

        /** Variant tagged with {@link Structure.ByReference}. */
        public static class ByReference extends BY_HANDLE_FILE_INFORMATION
                implements Structure.ByReference {
        }

        /** Variant tagged with {@link Structure.ByValue}. */
        public static class ByValue extends BY_HANDLE_FILE_INFORMATION
                implements Structure.ByValue {
        }

        /**
         * @return the field names in declaration order, as a fresh mutable list
         */
        @Override
        protected List getFieldOrder() {
            return new ArrayList<String>(Arrays.asList(
                    "dwFileAttributes",
                    "ftCreationTime",
                    "ftLastAccessTime",
                    "ftLastWriteTime",
                    "dwVolumeSerialNumber",
                    "nFileSizeHigh",
                    "nFileSizeLow",
                    "nNumberOfLinks",
                    "nFileIndexHigh",
                    "nFileIndexLow"));
        }
    }

    /**
     * Mapping of the native {@code GetFileInformationByHandle} function.
     *
     * @param hFile             handle of the file to query
     * @param lpFileInformation structure that receives the file information
     * @return the boolean result reported by the native call
     */
    boolean GetFileInformationByHandle(HANDLE hFile,
                                       BY_HANDLE_FILE_INFORMATION lpFileInformation);
}

新增 util/WinFileUtil

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source.wintaildir.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinBase;
import com.sun.jna.platform.win32.WinNT.HANDLE;
/**
 * Windows counterpart of the Unix inode lookup used by the taildir source.
 *
 * <p>Produces a numeric identifier for a file from the volume serial number
 * and the low 32 bits of the file index reported by
 * {@code GetFileInformationByHandle}.
 */
public class WinFileUtil {
    private static final Logger logger = LoggerFactory.getLogger(WinFileUtil.class);

    /** {@code dwShareMode} value passed to {@code CreateFile}. */
    private static final int FILE_SHARE_READ = 0x00000001;
    /** {@code dwCreationDisposition} value: only open a file that already exists. */
    private static final int OPEN_EXISTING = 3;
    /** {@code dwFlagsAndAttributes} value passed to {@code CreateFile}. */
    private static final int FILE_ATTRIBUTE_ARCHIVE = 0x20;
    /** Keeps only the low 32 bits, so a DWORD is always treated as unsigned. */
    private static final long DWORD_MASK = 0xFFFFFFFFL;

    /**
     * Returns an identifier for the file at {@code filepath}.
     *
     * <p>The identifier is the decimal form of a signed 64-bit value whose
     * upper 32 bits are the volume serial number and whose lower 32 bits are
     * {@code nFileIndexLow}. This fixed-width packing always parses with
     * {@link Long#parseLong(String)} (it may be negative) and cannot collide
     * the way a decimal concatenation can ("1"+"23" versus "12"+"3").
     *
     * <p>NOTE(review): identifiers differ from those produced by the earlier
     * decimal concatenation, so entries in an existing position file will not
     * match after upgrading.
     *
     * @param filepath path of an existing file
     * @return the identifier as a decimal string
     * @throws RuntimeException if the file cannot be opened, queried or closed
     */
    public static String getFileId(String filepath) {
        final WinBase.SECURITY_ATTRIBUTES noSecurityAttributes = null;

        // Desired access 0: the handle is used only to query file information.
        HANDLE hFile = Kernel32.INSTANCE.CreateFile(filepath, 0,
                FILE_SHARE_READ, noSecurityAttributes, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE,
                null);

        // Judge success by the returned handle; the thread's last-error value
        // is not reset by successful calls and may be stale.
        if (hFile == null || WinBase.INVALID_HANDLE_VALUE.equals(hFile)) {
            logger.error("打开文件发生错误:{}", filepath);
            throw new RuntimeException("打开文件发生错误:" + filepath);
        }

        final String fileId;
        boolean closed;
        try {
            org.apache.flume.source.wintaildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION info =
                    new org.apache.flume.source.wintaildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION();

            boolean queried = org.apache.flume.source.wintaildir.util.Kernel32.INSTANCE
                    .GetFileInformationByHandle(hFile, info);
            if (!queried) {
                logger.error("读取文件信息发生错误:{}", filepath);
                throw new RuntimeException("读取文件信息发生错误:" + filepath);
            }

            long volumeSerial = info.dwVolumeSerialNumber.longValue() & DWORD_MASK;
            long fileIndexLow = info.nFileIndexLow.longValue() & DWORD_MASK;
            fileId = Long.toString((volumeSerial << 32) | fileIndexLow);
        } finally {
            // Always release the handle, including when the query above throws.
            closed = Kernel32.INSTANCE.CloseHandle(hFile);
        }

        if (!closed) {
            logger.error("关闭文件发生错误:{}", filepath);
            throw new RuntimeException("关闭文件发生错误:" + filepath);
        }

        logger.debug("inode:{}", fileId);
        return fileId;
    }
}

在flume-win-taildir-source/pom.xml中新增

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>flume-ng-sources</artifactId>
        <groupId>org.apache.flume</groupId>
        <version>1.9.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flume.flume-ng-sources</groupId>
    <artifactId>flume-win-taildir-source</artifactId>
    <name>Flume Window Taildir Source</name>

    <properties>
        <!-- TODO fix spotbugs/pmd violations -->
        <spotbugs.maxAllowedViolations>14</spotbugs.maxAllowedViolations>
        <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
        </dependency>
<!-- Added: JNA core (com.sun.jna.*), used by the util/Kernel32 binding -->
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.2.2</version>
        </dependency>
<!-- Added: JNA platform (com.sun.jna.platform.win32.*), used by util/WinFileUtil -->
        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna-platform</artifactId>
            <version>4.2.2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>

ReliableTaildirEventReader 文件中修改

public class ReliableTaildirEventReader implements ReliableEventReader {
    private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);
    public static final String OS_NAME = System.getProperty("os.name").toLowerCase();
/**
 * Returns the identifier used as the key for a tailed file.
 *
 * <p>On Windows the identifier comes from {@code WinFileUtil.getFileId};
 * on every other OS it is the {@code "unix:ino"} file attribute.
 *
 * @param file the file to identify
 * @return the file identifier
 * @throws NoSuchFileException if the file no longer exists (so that callers
 *         which skip vanished files behave the same on Windows)
 * @throws IOException if the attribute cannot be read
 */
private long getInode(File file) throws IOException {
        if (!OS_NAME.contains("windows")) {
            return (long) Files.getAttribute(file.toPath(), "unix:ino");
        }
        try {
            return Long.parseLong(WinFileUtil.getFileId(file.toPath().toString()));
        } catch (RuntimeException e) {
            // WinFileUtil reports every failure as a RuntimeException. If the file has
            // simply disappeared, report it the same way the Unix branch would.
            if (!file.exists()) {
                NoSuchFileException gone = new NoSuchFileException(file.getPath());
                gone.initCause(e);
                throw gone;
            }
            throw e;
        }
    }

整个ReliableTaildirEventReader 文件

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source.wintaildir;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.gson.stream.JsonReader;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flume.source.wintaildir.util.WinFileUtil;

/**
 * Reads events from the files matched by a set of file groups and tracks,
 * per file identifier, how far each file has been read.
 *
 * <p>Files are keyed by the value returned from {@link #getInode(File)}:
 * the {@code "unix:ino"} attribute, or on Windows the id computed by
 * {@link WinFileUtil}.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableTaildirEventReader implements ReliableEventReader {
    private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);
    /** Lower-cased {@code os.name} system property, used to pick the file-id strategy. */
    public static final String OS_NAME = System.getProperty("os.name").toLowerCase();

    private final List<TaildirMatcher> taildirCache;
    private final Table<String, String, String> headerTable;

    private TailFile currentFile = null;
    private Map<Long, TailFile> tailFiles = Maps.newHashMap();
    private long updateTime;
    private boolean addByteOffset;
    private boolean cachePatternMatching;
    private boolean committed = true;
    private final boolean annotateFileName;
    private final String fileNameHeader;

    /**
     * Create a ReliableTaildirEventReader to watch the given directory.
     *
     * <p>Scans the file groups once, then applies positions stored in
     * {@code positionFilePath} to the files that were found.
     */
    private ReliableTaildirEventReader(Map<String, String> filePaths,
                                       Table<String, String, String> headerTable, String positionFilePath,
                                       boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
                                       boolean annotateFileName, String fileNameHeader) throws IOException {
        // Sanity checks
        Preconditions.checkNotNull(filePaths);
        Preconditions.checkNotNull(positionFilePath);

        if (logger.isDebugEnabled()) {
            // One argument per placeholder; the third one used to be missing.
            logger.debug("Initializing {} with directory={}, metaDir={}",
                    new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths,
                        positionFilePath });
        }

        List<TaildirMatcher> taildirCache = Lists.newArrayList();
        for (Entry<String, String> e : filePaths.entrySet()) {
            taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching));
        }
        logger.info("taildirCache: " + taildirCache.toString());
        logger.info("headerTable: " + headerTable.toString());

        this.taildirCache = taildirCache;
        this.headerTable = headerTable;
        this.addByteOffset = addByteOffset;
        this.cachePatternMatching = cachePatternMatching;
        this.annotateFileName = annotateFileName;
        this.fileNameHeader = fileNameHeader;
        updateTailFiles(skipToEnd);

        logger.info("Updating position from position file: " + positionFilePath);
        loadPositionFile(positionFilePath);
    }

    /**
     * Load a position file which has the last read position of each file.
     * If the position file exists, update tailFiles mapping.
     *
     * <p>The file is a JSON array of objects with the keys {@code inode},
     * {@code pos} and {@code file}. Any other key is skipped. A missing file
     * or an I/O error is logged and otherwise ignored.
     *
     * @param filePath path of the position file
     * @throws NullPointerException if an entry lacks one of the three keys
     */
    public void loadPositionFile(String filePath) {
        try (FileReader fr = new FileReader(filePath);
             JsonReader jr = new JsonReader(fr)) {
            jr.beginArray();
            while (jr.hasNext()) {
                Long inode = null;
                Long pos = null;
                String path = null;
                jr.beginObject();
                while (jr.hasNext()) {
                    switch (jr.nextName()) {
                        case "inode":
                            inode = jr.nextLong();
                            break;
                        case "pos":
                            pos = jr.nextLong();
                            break;
                        case "file":
                            path = jr.nextString();
                            break;
                        default:
                            // Consume the value of an unknown key; otherwise the next
                            // read would hit the unconsumed value and fail.
                            jr.skipValue();
                            break;
                    }
                }
                jr.endObject();

                for (Object v : Arrays.asList(inode, pos, path)) {
                    Preconditions.checkNotNull(v, "Detected missing value in position file. "
                            + "inode: " + inode + ", pos: " + pos + ", path: " + path);
                }
                TailFile tf = tailFiles.get(inode);
                if (tf != null && tf.updatePos(path, inode, pos)) {
                    tailFiles.put(inode, tf);
                } else {
                    logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
                }
            }
            jr.endArray();
        } catch (FileNotFoundException e) {
            logger.info("File not found: " + filePath + ", not updating position");
        } catch (IOException e) {
            logger.error("Failed loading positionFile: " + filePath, e);
        }
    }

    /** @return the live map from file identifier to tailed file */
    public Map<Long, TailFile> getTailFiles() {
        return tailFiles;
    }

    /** Selects the file that subsequent {@code readEvents} calls read from. */
    public void setCurrentFile(TailFile currentFile) {
        this.currentFile = currentFile;
    }

    /**
     * Reads a single event from the current file.
     *
     * @return the event, or {@code null} if none is available
     */
    @Override
    public Event readEvent() throws IOException {
        List<Event> events = readEvents(1);
        if (events.isEmpty()) {
            return null;
        }
        return events.get(0);
    }

    /** Reads up to {@code numEvents} events from the current file. */
    @Override
    public List<Event> readEvents(int numEvents) throws IOException {
        return readEvents(numEvents, false);
    }

    /** Makes {@code tf} the current file and reads up to {@code numEvents} events from it. */
    @VisibleForTesting
    public List<Event> readEvents(TailFile tf, int numEvents) throws IOException {
        setCurrentFile(tf);
        return readEvents(numEvents, true);
    }

    /**
     * Reads up to {@code numEvents} events from the current file.
     *
     * <p>If the previous read was never committed, the file is first reset to
     * its last committed position. File headers, and the file path when
     * annotation is enabled, are added to every returned event.
     *
     * @param numEvents        maximum number of events to read
     * @param backoffWithoutNL passed through to {@code TailFile.readEvents}
     * @return the events read, possibly empty
     * @throws IllegalStateException if an uncommitted read exists but no
     *         current file is set
     */
    public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
            throws IOException {
        if (!committed) {
            if (currentFile == null) {
                // Do not dereference currentFile here: it is null on this path.
                throw new IllegalStateException("current file does not exist.");
            }
            logger.info("Last read was never committed - resetting position");
            long lastPos = currentFile.getPos();
            currentFile.updateFilePos(lastPos);
        }
        List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
        if (events.isEmpty()) {
            return events;
        }

        Map<String, String> headers = currentFile.getHeaders();
        if (annotateFileName || (headers != null && !headers.isEmpty())) {
            for (Event event : events) {
                if (headers != null && !headers.isEmpty()) {
                    event.getHeaders().putAll(headers);
                }
                if (annotateFileName) {
                    event.getHeaders().put(fileNameHeader, currentFile.getPath());
                }
            }
        }
        committed = false;
        return events;
    }

    /** Closes the underlying file of every tailed file that has one open. */
    @Override
    public void close() throws IOException {
        for (TailFile tf : tailFiles.values()) {
            if (tf.getRaf() != null) tf.getRaf().close();
        }
    }

    /** Commit the last lines which were read. */
    @Override
    public void commit() throws IOException {
        if (!committed && currentFile != null) {
            long pos = currentFile.getLineReadPos();
            currentFile.setPos(pos);
            currentFile.setLastUpdated(updateTime);
            committed = true;
        }
    }

    /**
     * Update tailFiles mapping if a new file is created or appends are detected
     * to the existing file.
     *
     * @param skipToEnd if true, newly discovered files start at their current length
     * @return the identifiers of all matching files seen in this scan
     */
    public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
        updateTime = System.currentTimeMillis();
        List<Long> updatedInodes = Lists.newArrayList();

        for (TaildirMatcher taildir : taildirCache) {
            Map<String, String> headers = headerTable.row(taildir.getFileGroup());

            for (File f : taildir.getMatchingFiles()) {
                long inode;
                try {
                    inode = getInode(f);
                } catch (NoSuchFileException e) {
                    logger.info("File has been deleted in the meantime: " + e.getMessage());
                    continue;
                }
                TailFile tf = tailFiles.get(inode);
                if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
                    // Unknown identifier, or the same identifier under a different path.
                    long startPos = skipToEnd ? f.length() : 0;
                    tf = openFile(f, headers, inode, startPos);
                } else {
                    boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
                    if (updated) {
                        if (tf.getRaf() == null) {
                            tf = openFile(f, headers, inode, tf.getPos());
                        }
                        if (f.length() < tf.getPos()) {
                            logger.info("Pos " + tf.getPos() + " is larger than file size! "
                                    + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
                            tf.updatePos(tf.getPath(), inode, 0);
                        }
                    }
                    tf.setNeedTail(updated);
                }
                tailFiles.put(inode, tf);
                updatedInodes.add(inode);
            }
        }
        return updatedInodes;
    }

    /** Same as {@link #updateTailFiles(boolean)} with {@code skipToEnd = false}. */
    public List<Long> updateTailFiles() throws IOException {
        return updateTailFiles(false);
    }


    /**
     * Returns the identifier used as the key for a tailed file.
     *
     * <p>On Windows the identifier comes from {@link WinFileUtil#getFileId(String)};
     * on every other OS it is the {@code "unix:ino"} file attribute.
     *
     * @param file the file to identify
     * @return the file identifier
     * @throws NoSuchFileException if the file no longer exists, so that
     *         {@link #updateTailFiles(boolean)} can skip it on Windows too
     * @throws IOException if the attribute cannot be read
     */
    private long getInode(File file) throws IOException {
        if (!OS_NAME.contains("windows")) {
            return (long) Files.getAttribute(file.toPath(), "unix:ino");
        }
        try {
            return Long.parseLong(WinFileUtil.getFileId(file.toPath().toString()));
        } catch (RuntimeException e) {
            // WinFileUtil reports every failure as a RuntimeException. If the file has
            // simply disappeared, report it the same way the Unix branch would.
            if (!file.exists()) {
                NoSuchFileException gone = new NoSuchFileException(file.getPath());
                gone.initCause(e);
                throw gone;
            }
            throw e;
        }
    }

    /**
     * Opens {@code file} as a {@link TailFile} at position {@code pos}.
     *
     * @throws FlumeException if the file cannot be opened
     */
    private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
        try {
            logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
            return new TailFile(file, headers, inode, pos);
        } catch (IOException e) {
            throw new FlumeException("Failed opening file: " + file, e);
        }
    }

    /**
     * Special builder class for ReliableTaildirEventReader
     */
    public static class Builder {
        private Map<String, String> filePaths;
        private Table<String, String, String> headerTable;
        private String positionFilePath;
        private boolean skipToEnd;
        private boolean addByteOffset;
        private boolean cachePatternMatching;
        private Boolean annotateFileName =
                TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
        private String fileNameHeader =
                TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;

        public Builder filePaths(Map<String, String> filePaths) {
            this.filePaths = filePaths;
            return this;
        }

        public Builder headerTable(Table<String, String, String> headerTable) {
            this.headerTable = headerTable;
            return this;
        }

        public Builder positionFilePath(String positionFilePath) {
            this.positionFilePath = positionFilePath;
            return this;
        }

        public Builder skipToEnd(boolean skipToEnd) {
            this.skipToEnd = skipToEnd;
            return this;
        }

        public Builder addByteOffset(boolean addByteOffset) {
            this.addByteOffset = addByteOffset;
            return this;
        }

        public Builder cachePatternMatching(boolean cachePatternMatching) {
            this.cachePatternMatching = cachePatternMatching;
            return this;
        }

        public Builder annotateFileName(boolean annotateFileName) {
            this.annotateFileName = annotateFileName;
            return this;
        }

        public Builder fileNameHeader(String fileNameHeader) {
            this.fileNameHeader = fileNameHeader;
            return this;
        }

        /** Creates the reader; this performs the initial scan and loads the position file. */
        public ReliableTaildirEventReader build() throws IOException {
            return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
                    addByteOffset, cachePatternMatching,
                    annotateFileName, fileNameHeader);
        }
    }

}

TaildirMatcher.java代码

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source.wintaildir;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Identifies and caches the files matched by a single file pattern for the {@code TAILDIR} source.
 * <p></p>
 * Since file patterns only apply to the file names and not to the parent directories, this
 * implementation checks the parent directory for modification (additional or removed files
 * update modification time of parent dir).
 * If no modification happened to the parent dir that means the underlying files could only be
 * written to, so there is no need to rerun the pattern matching on file names.
 * <p></p>
 * This implementation provides lazy caching or no caching. Instances of this class keep the
 * result file list from the last successful execution of {@linkplain #getMatchingFiles()}
 * function invocation, and may serve the content without hitting the FileSystem for performance
 * optimization.
 * <p></p>
 * <b>IMPORTANT:</b> It is assumed that the hosting system provides at least second granularity
 * for both {@code System.currentTimeMillis()} and {@code File.lastModified()}. Also
 * that system clock is used for file system timestamps. If it is not the case then configure it
 * as uncached. Class is solely for package only usage. Member functions are not thread safe.
 *
 * @see TaildirSource
 * @see ReliableTaildirEventReader
 * @see TaildirSourceConfigurationConstants
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class TaildirMatcher {
    private static final Logger logger = LoggerFactory.getLogger(TaildirMatcher.class);
    private static final FileSystem FS = FileSystems.getDefault();

    // flag from configuration to switch off caching completely
    private final boolean cachePatternMatching;
    // id from configuration
    private final String fileGroup;
    // plain string of the desired files from configuration
    private final String filePattern;

    // directory monitored for changes
    private final File parentDir;
    // cached instance for filtering files based on filePattern
    private final DirectoryStream.Filter<Path> fileFilter;

    // system time in milliseconds, stores the last modification time of the
    // parent directory seen by the last check, rounded to seconds
    // initial value is used in first check only when it will be replaced instantly
    // (system time is positive)
    private long lastSeenParentDirMTime = -1;
    // system time in milliseconds, time of the last check, rounded to seconds
    // initial value is used in first check only when it will be replaced instantly
    // (system time is positive)
    private long lastCheckedTime = -1;
    // cached content, files which matched the pattern within the parent directory
    private List<File> lastMatchedFiles = Lists.newArrayList();

    /**
     * Package accessible constructor. From configuration context it represents a single
     * <code>filegroup</code> and encapsulates the corresponding <code>filePattern</code>.
     * <code>filePattern</code> consists of two parts: first part has to be a valid path to an
     * existing parent directory, second part has to be a valid regex
     * {@link java.util.regex.Pattern} that is matched against the names of the entries within the
     * parent directory (directories are never accepted). A valid example for filePattern is
     * <code>/dir0/dir1/.*</code> given <code>/dir0/dir1</code> is an existing directory structure
     * readable by the running user.
     * <p></p>
     * An instance of this class is created for each fileGroup
     *
     * @param fileGroup arbitrary name of the group given by the config
     * @param filePattern parent directory plus regex pattern. No wildcards are allowed in directory
     *                    name
     * @param cachePatternMatching default true, recommended in every setup especially with huge
     *                             parent directories. Don't set when local system clock is not used
     *                             for stamping mtime (eg: remote filesystems)
     * @throws IllegalStateException if the pattern has no parent directory part or the parent
     *                               directory does not exist
     * @see TaildirSourceConfigurationConstants
     */
    TaildirMatcher(String fileGroup, String filePattern, boolean cachePatternMatching) {
        // store whatever came from configuration
        this.fileGroup = fileGroup;
        this.filePattern = filePattern;
        this.cachePatternMatching = cachePatternMatching;

        // calculate final members: everything before the last path separator is the directory,
        // the remainder is the regex applied to file names.
        // NOTE(review): java.io.File treats '\' as a path separator on Windows, so a regex that
        // contains backslash escapes (e.g. ".*\.log") is presumably split at the wrong place
        // there - confirm on the target platform.
        File f = new File(filePattern);
        this.parentDir = f.getParentFile();
        // getParentFile() returns null when the pattern names no directory at all; fail with a
        // readable message instead of a NullPointerException in the existence check below
        Preconditions.checkState(parentDir != null,
                "File pattern has no parent directory: " + filePattern);

        String regex = f.getName();
        final PathMatcher matcher = FS.getPathMatcher("regex:" + regex);
        this.fileFilter = new DirectoryStream.Filter<Path>() {
            @Override
            public boolean accept(Path entry) throws IOException {
                // match on the bare file name only and never return directories
                return matcher.matches(entry.getFileName()) && !Files.isDirectory(entry);
            }
        };

        // sanity check
        Preconditions.checkState(parentDir.exists(),
                "Directory does not exist: " + parentDir.getAbsolutePath());
    }

    /**
     * Lists those files within the parentDir that match regex pattern passed in during object
     * instantiation. Designed for frequent periodic invocation
     * {@link org.apache.flume.source.PollableSourceRunner}.
     * <p></p>
     * Based on the modification of the parentDir this function may trigger cache recalculation by
     * calling {@linkplain #getMatchingFilesNoCache()} or
     * return the value stored in {@linkplain #lastMatchedFiles}.
     * Parentdir is allowed to be a symbolic link.
     * <p></p>
     * Files returned by this call are weakly consistent (see {@link DirectoryStream}).
     * It does not freeze the directory while iterating,
     * so it may (or may not) reflect updates to the directory that occur during the call,
     * In which case next call
     * will return those files (as mtime is increasing it won't hit cache but trigger recalculation).
     * It is guaranteed that invocation reflects every change which was observable at the time of
     * invocation.
     * <p></p>
     * Matching file list recalculation is triggered when caching was turned off or
     * if mtime is greater than the previously seen mtime
     * (including the case of cache hasn't been calculated before).
     * Additionally if a constantly updated directory was configured as parentDir
     * then multiple changes to the parentDir may happen
     * within the same second so in such case (assuming at least second granularity of reported mtime)
     * it is impossible to tell whether a change of the dir happened before the check or after
     * (unless the check happened after that second).
     * Having said that implementation also stores system time of the previous invocation and previous
     * invocation has to happen strictly after the current mtime to avoid further cache refresh
     * (because then it is guaranteed that previous invocation resulted in valid cache content).
     * If system clock hasn't passed the second of
     * the current mtime then logic expects more changes as well
     * (since it cannot be sure that there won't be any further changes still in that second
     * and it would like to avoid data loss in first place)
     * hence it recalculates matching files. If system clock finally
     * passed actual mtime then a subsequent invocation guarantees that it picked up every
     * change from the passed second so
     * any further invocations can be served from cache associated with that second
     * (given mtime is not updated again).
     *
     * @return List of files matching the pattern sorted by last modification time. No recursion.
     * No directories. If nothing matches then returns an empty list. If I/O issue occurred then
     * returns the list collected to the point when exception was thrown.
     *
     * @see #getMatchingFilesNoCache()
     */
    List<File> getMatchingFiles() {
        // current time truncated to whole seconds (still expressed in milliseconds)
        long now = TimeUnit.SECONDS.toMillis(
                TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()));
        long currentParentDirMTime = parentDir.lastModified();

        // calculate matched files if
        // - we don't want to use cache (recalculate every time) OR
        // - directory was clearly updated after the last check OR
        // - last mtime change wasn't already checked for sure
        //   (system clock hasn't passed that second yet)
        if (!cachePatternMatching ||
                lastSeenParentDirMTime < currentParentDirMTime ||
                !(currentParentDirMTime < lastCheckedTime)) {
            lastMatchedFiles = sortByLastModifiedTime(getMatchingFilesNoCache());
            lastSeenParentDirMTime = currentParentDirMTime;
            lastCheckedTime = now;
        }

        return lastMatchedFiles;
    }

    /**
     * Provides the actual files within the parentDir which
     * files are matching the regex pattern. Each invocation uses {@link DirectoryStream}
     * to identify matching files.
     *
     * Files returned by this call are weakly consistent (see {@link DirectoryStream}).
     * It does not freeze the directory while iterating, so it may (or may not) reflect updates
     * to the directory that occur during the call. In which case next call will return those files.
     *
     * @return List of files matching the pattern unsorted. No recursion. No directories.
     * If nothing matches then returns an empty list. If I/O issue occurred then returns the list
     * collected to the point when exception was thrown.
     *
     * @see DirectoryStream
     * @see DirectoryStream.Filter
     */
    private List<File> getMatchingFilesNoCache() {
        List<File> result = Lists.newArrayList();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(parentDir.toPath(), fileFilter)) {
            for (Path entry : stream) {
                result.add(entry.toFile());
            }
        } catch (IOException e) {
            // best effort: log and hand back whatever was collected before the failure
            logger.error("I/O exception occurred while listing parent directory. " +
                    "Files already matched will be returned. " + parentDir.toPath(), e);
        }
        return result;
    }

    /**
     * Utility function to sort matched files based on last modification time.
     * Sorting itself use only a snapshot of last modification times captured before the sorting
     * to keep the number of stat system calls to the required minimum.
     *
     * @param files list of files in any order; it is sorted in place
     * @return the same list instance, sorted by ascending last modification time
     */
    private static List<File> sortByLastModifiedTime(List<File> files) {
        // snapshot the mtimes once so the comparator sees stable values during the sort
        final HashMap<File, Long> lastModificationTimes = new HashMap<File, Long>(files.size());
        for (File f: files) {
            lastModificationTimes.put(f, f.lastModified());
        }
        Collections.sort(files, new Comparator<File>() {
            @Override
            public int compare(File o1, File o2) {
                return lastModificationTimes.get(o1).compareTo(lastModificationTimes.get(o2));
            }
        });

        return files;
    }

    @Override
    public String toString() {
        return "{" +
                "filegroup='" + fileGroup + '\'' +
                ", filePattern='" + filePattern + '\'' +
                ", cached=" + cachePatternMatching +
                '}';
    }

    /**
     * Two matchers are equal when they belong to the same file group; the pattern and the
     * caching flag are not part of the identity.
     */
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TaildirMatcher that = (TaildirMatcher) o;

        return fileGroup.equals(that.fileGroup);

    }

    @Override
    public int hashCode() {
        return fileGroup.hashCode();
    }

    /**
     * @return the name of the file group this matcher was created for
     */
    public String getFileGroup() {
        return fileGroup;
    }

}

TaildirSource.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.flume.source.wintaildir;

import static org.apache.flume.source.wintaildir.TaildirSourceConfigurationConstants.*;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.PollableSourceConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;

/**
 * Pollable Flume source that tails the files of the configured file groups, forwards the read
 * events to the channel processor in batches and periodically writes the read position of
 * every tailed file to a JSON position file.
 * <p></p>
 * Two background single-thread schedulers are started together with the source: one marks
 * files that were not updated for {@code idleTimeout} ms as idle (they are closed on the next
 * {@link #process()} call), the other one writes the position file.
 */
public class TaildirSource extends AbstractSource implements
        PollableSource, Configurable, BatchSizeSupported {

    private static final Logger logger = LoggerFactory.getLogger(TaildirSource.class);

    // file group name -> file pattern, selected from the configuration
    private Map<String, String> filePaths;
    // file group name x header key -> header value
    private Table<String, String, String> headerTable;
    private int batchSize;
    private String positionFilePath;
    private boolean skipToEnd;
    private boolean byteOffsetHeader;

    private SourceCounter sourceCounter;
    private ReliableTaildirEventReader reader;
    private ScheduledExecutorService idleFileChecker;
    private ScheduledExecutorService positionWriter;
    // sleep (ms) before retrying a failed channel write; doubled after every failure up to
    // maxRetryInterval and reset to 1000 after a successful write
    private int retryInterval = 1000;
    private int maxRetryInterval = 5000;
    private int idleTimeout;
    private int checkIdleInterval = 5000;
    private int writePosInitDelay = 5000;
    private int writePosInterval;
    private boolean cachePatternMatching;

    // inodes returned by the last reader.updateTailFiles() call
    private List<Long> existingInodes = new CopyOnWriteArrayList<Long>();
    // inodes flagged by the idle checker thread, consumed by closeTailFiles()
    private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
    private Long backoffSleepIncrement;
    private Long maxBackOffSleepInterval;
    private boolean fileHeader;
    private String fileHeaderKey;
    private Long maxBatchCount;

    /**
     * Builds the event reader from the configured values and schedules the idle file checker
     * and the position writer before starting the source itself.
     *
     * @throws FlumeException if the reader cannot be instantiated
     */
    @Override
    public synchronized void start() {
        logger.info("{} TaildirSource source starting with directory: {}", getName(), filePaths);
        try {
            reader = new ReliableTaildirEventReader.Builder()
                    .filePaths(filePaths)
                    .headerTable(headerTable)
                    .positionFilePath(positionFilePath)
                    .skipToEnd(skipToEnd)
                    .addByteOffset(byteOffsetHeader)
                    .cachePatternMatching(cachePatternMatching)
                    .annotateFileName(fileHeader)
                    .fileNameHeader(fileHeaderKey)
                    .build();
        } catch (IOException e) {
            throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
        }
        idleFileChecker = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("idleFileChecker").build());
        idleFileChecker.scheduleWithFixedDelay(new IdleFileCheckerRunnable(),
                idleTimeout, checkIdleInterval, TimeUnit.MILLISECONDS);

        positionWriter = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("positionWriter").build());
        positionWriter.scheduleWithFixedDelay(new PositionWriterRunnable(),
                writePosInitDelay, writePosInterval, TimeUnit.MILLISECONDS);

        super.start();
        logger.debug("TaildirSource started");
        sourceCounter.start();
    }

    /**
     * Stops the source, shuts the background schedulers down, writes the position file one last
     * time and closes the reader.
     * <p></p>
     * An interrupt received while waiting for a scheduler no longer skips the final position
     * write and the reader close; the interrupt status of the thread is restored before
     * returning. Schedulers and the reader are skipped when {@link #start()} never created them.
     */
    @Override
    public synchronized void stop() {
        boolean interrupted = false;
        try {
            super.stop();
            ExecutorService[] services = {idleFileChecker, positionWriter};
            for (ExecutorService service : services) {
                if (service == null) {
                    // start() failed before this scheduler was created
                    continue;
                }
                service.shutdown();
                try {
                    if (!service.awaitTermination(1, TimeUnit.SECONDS)) {
                        service.shutdownNow();
                    }
                } catch (InterruptedException e) {
                    logger.info("Interrupted while awaiting termination", e);
                    service.shutdownNow();
                    interrupted = true;
                }
            }
            if (reader != null) {
                // write the last position
                writePosition();
                reader.close();
            }
        } catch (IOException e) {
            logger.info("Failed: " + e.getMessage(), e);
        } finally {
            if (interrupted) {
                // hand the interrupt back to the caller instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
        sourceCounter.stop();
        logger.info("Taildir source {} stopped. Metrics: {}", getName(), sourceCounter);
    }

    @Override
    public String toString() {
        return String.format("Taildir source: { positionFile: %s, skipToEnd: %s, "
                        + "byteOffsetHeader: %s, idleTimeout: %s, writePosInterval: %s }",
                positionFilePath, skipToEnd, byteOffsetHeader, idleTimeout, writePosInterval);
    }

    /**
     * Reads every setting of the source from the given context and makes sure the parent
     * directory of the position file exists.
     *
     * @param context configuration of this source
     * @throws IllegalStateException if the file groups are missing, none of them has a path
     *                               mapping, or a header key is malformed
     * @throws FlumeException if the parent directories of the position file cannot be created
     */
    @Override
    public synchronized void configure(Context context) {
        String fileGroups = context.getString(FILE_GROUPS);
        Preconditions.checkState(fileGroups != null, "Missing param: " + FILE_GROUPS);

        filePaths = selectByKeys(context.getSubProperties(FILE_GROUPS_PREFIX),
                fileGroups.split("\\s+"));
        Preconditions.checkState(!filePaths.isEmpty(),
                "Mapping for tailing files is empty or invalid: '" + FILE_GROUPS_PREFIX + "'");

        // use forward slashes so the default position file path is well formed on Windows too
        String homePath = System.getProperty("user.home").replace('\\', '/');
        positionFilePath = context.getString(POSITION_FILE, homePath + DEFAULT_POSITION_FILE);
        Path positionFileParent = Paths.get(positionFilePath).getParent();
        // getParent() is null for a bare file name; there is nothing to create in that case
        if (positionFileParent != null) {
            try {
                Files.createDirectories(positionFileParent);
            } catch (IOException e) {
                throw new FlumeException("Error creating positionFile parent directories", e);
            }
        }
        headerTable = getTable(context, HEADERS_PREFIX);
        batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
        skipToEnd = context.getBoolean(SKIP_TO_END, DEFAULT_SKIP_TO_END);
        byteOffsetHeader = context.getBoolean(BYTE_OFFSET_HEADER, DEFAULT_BYTE_OFFSET_HEADER);
        idleTimeout = context.getInteger(IDLE_TIMEOUT, DEFAULT_IDLE_TIMEOUT);
        writePosInterval = context.getInteger(WRITE_POS_INTERVAL, DEFAULT_WRITE_POS_INTERVAL);
        cachePatternMatching = context.getBoolean(CACHE_PATTERN_MATCHING,
                DEFAULT_CACHE_PATTERN_MATCHING);

        backoffSleepIncrement = context.getLong(PollableSourceConstants.BACKOFF_SLEEP_INCREMENT,
                PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
        maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
                PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
        fileHeader = context.getBoolean(FILENAME_HEADER,
                DEFAULT_FILE_HEADER);
        fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
                DEFAULT_FILENAME_HEADER_KEY);
        maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
        if (maxBatchCount <= 0) {
            maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
            logger.warn("Invalid maxBatchCount specified, initializing source "
                    + "default maxBatchCount of {}", maxBatchCount);
        }

        if (sourceCounter == null) {
            sourceCounter = new SourceCounter(getName());
        }
    }

    @Override
    public long getBatchSize() {
        return batchSize;
    }

    /**
     * Returns the entries of {@code map} whose key is listed in {@code keys}; keys missing from
     * the map are silently ignored.
     */
    private Map<String, String> selectByKeys(Map<String, String> map, String[] keys) {
        Map<String, String> result = Maps.newHashMap();
        for (String key : keys) {
            if (map.containsKey(key)) {
                result.put(key, map.get(key));
            }
        }
        return result;
    }

    /**
     * Turns the sub-properties below {@code prefix} into a table. Every property key has to look
     * like {@code <row>.<column>}: the part before the first dot becomes the row key, the rest
     * the column key.
     *
     * @throws IllegalStateException if a property key contains no dot
     */
    private Table<String, String, String> getTable(Context context, String prefix) {
        Table<String, String, String> table = HashBasedTable.create();
        for (Entry<String, String> e : context.getSubProperties(prefix).entrySet()) {
            String[] parts = e.getKey().split("\\.", 2);
            // a key without a dot would otherwise fail with ArrayIndexOutOfBoundsException
            Preconditions.checkState(parts.length == 2,
                    "Invalid property '" + prefix + e.getKey() + "', expected '"
                            + prefix + "<filegroup>.<headerKey>'");
            table.put(parts[0], parts[1], e.getValue());
        }
        return table;
    }

    @VisibleForTesting
    protected SourceCounter getSourceCounter() {
        return sourceCounter;
    }

    /**
     * Refreshes the list of tailed files, reads every file that needs tailing and finally
     * closes the files flagged as idle.
     *
     * @return {@code READY} when at least one file stopped reading only because it reached
     * {@code maxBatchCount} batches, {@code BACKOFF} otherwise or when an error occurred
     */
    @Override
    public Status process() {
        Status status = Status.BACKOFF;
        try {
            existingInodes.clear();
            existingInodes.addAll(reader.updateTailFiles());
            for (long inode : existingInodes) {
                TailFile tf = reader.getTailFiles().get(inode);
                if (tf.needTail()) {
                    boolean hasMoreLines = tailFileProcess(tf, true);
                    if (hasMoreLines) {
                        status = Status.READY;
                    }
                }
            }
            closeTailFiles();
        } catch (Throwable t) {
            if (t instanceof InterruptedException) {
                // keep the interrupt visible to the thread that polls this source
                Thread.currentThread().interrupt();
            }
            logger.error("Unable to tail files", t);
            sourceCounter.incrementEventReadFail();
            status = Status.BACKOFF;
        }
        return status;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return backoffSleepIncrement;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return maxBackOffSleepInterval;
    }

    /**
     * Reads batches of events from the given file and hands them to the channel processor until
     * the file yields no more events, a batch is smaller than {@code batchSize}, or
     * {@code maxBatchCount} batches were delivered. A failed channel write is retried after a
     * sleep that doubles on every failure, capped at {@code maxRetryInterval}.
     *
     * @param tf file to read from
     * @param backoffWithoutNL passed through to {@code reader.readEvents}
     * @return {@code true} only when reading stopped because {@code maxBatchCount} was reached
     */
    private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL)
            throws IOException, InterruptedException {
        long batchCount = 0;
        while (true) {
            reader.setCurrentFile(tf);
            List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
            if (events.isEmpty()) {
                return false;
            }
            sourceCounter.addToEventReceivedCount(events.size());
            sourceCounter.incrementAppendBatchReceivedCount();
            try {
                getChannelProcessor().processEventBatch(events);
                reader.commit();
            } catch (ChannelException ex) {
                logger.warn("The channel is full or unexpected failure. "
                        + "The source will try again after {} ms", retryInterval);
                sourceCounter.incrementChannelWriteFail();
                TimeUnit.MILLISECONDS.sleep(retryInterval);
                // exponential backoff, capped
                retryInterval = Math.min(retryInterval << 1, maxRetryInterval);
                continue;
            }
            retryInterval = 1000;
            sourceCounter.addToEventAcceptedCount(events.size());
            sourceCounter.incrementAppendBatchAcceptedCount();
            if (events.size() < batchSize) {
                logger.debug("The events taken from {} is less than {}", tf.getPath(), batchSize);
                return false;
            }
            if (++batchCount >= maxBatchCount) {
                logger.debug("The batches read from the same file is larger than {}", maxBatchCount);
                return true;
            }
        }
    }

    /**
     * Reads the remaining events of every file flagged as idle, closes the file and clears the
     * idle list.
     */
    private void closeTailFiles() throws IOException, InterruptedException {
        for (long inode : idleInodes) {
            TailFile tf = reader.getTailFiles().get(inode);
            // tf == null: defensive, in case the reader no longer tracks this inode
            if (tf != null && tf.getRaf() != null) { // when file has not closed yet
                tailFileProcess(tf, false);
                tf.close();
                logger.info("Closed file: {}, inode: {}, pos: {}", tf.getPath(), inode, tf.getPos());
            }
        }
        idleInodes.clear();
    }

    /**
     * Runnable class that checks whether there are files which should be closed.
     */
    private class IdleFileCheckerRunnable implements Runnable {
        @Override
        public void run() {
            try {
                long now = System.currentTimeMillis();
                for (TailFile tf : reader.getTailFiles().values()) {
                    // still open and not updated within the idle timeout
                    if (tf.getLastUpdated() + idleTimeout < now && tf.getRaf() != null) {
                        idleInodes.add(tf.getInode());
                    }
                }
            } catch (Throwable t) {
                logger.error("Uncaught exception in IdleFileChecker thread", t);
                sourceCounter.incrementGenericProcessingFail();
            }
        }
    }

    /**
     * Runnable class that writes a position file which has the last read position
     * of each file.
     */
    private class PositionWriterRunnable implements Runnable {
        @Override
        public void run() {
            writePosition();
        }
    }

    /**
     * Rewrites the position file with the positions of the currently known inodes. Failures are
     * logged and counted, never thrown.
     * <p></p>
     * NOTE(review): the file is opened (and thereby truncated) even when no inode is known, which
     * leaves an empty position file behind - confirm this is intended.
     */
    private void writePosition() {
        File file = new File(positionFilePath);
        FileWriter writer = null;
        try {
            writer = new FileWriter(file);
            if (!existingInodes.isEmpty()) {
                String json = toPosInfoJson();
                writer.write(json);
            }
        } catch (Throwable t) {
            logger.error("Failed writing positionFile", t);
            sourceCounter.incrementGenericProcessingFail();
        } finally {
            try {
                if (writer != null) writer.close();
            } catch (IOException e) {
                logger.error("Error: " + e.getMessage(), e);
                sourceCounter.incrementGenericProcessingFail();
            }
        }
    }

    /**
     * Serializes inode, position and path of every known file into a JSON array.
     */
    private String toPosInfoJson() {
        @SuppressWarnings("rawtypes")
        List<Map> posInfos = Lists.newArrayList();
        for (Long inode : existingInodes) {
            TailFile tf = reader.getTailFiles().get(inode);
            posInfos.add(ImmutableMap.of("inode", inode, "pos", tf.getPos(), "file", tf.getPath()));
        }
        return new Gson().toJson(posInfos);
    }
}

TaildirSourceConfigurationConstants.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package org.apache.flume.source.wintaildir;

public class TaildirSourceConfigurationConstants {

    // ---- file groups -------------------------------------------------------------------------

    /** Property holding the whitespace separated names of the file groups to tail. */
    public static final String FILE_GROUPS = "filegroups";
    /** Prefix of the per-group properties that map a group name to its file pattern. */
    public static final String FILE_GROUPS_PREFIX = FILE_GROUPS + ".";

    /** Prefix of the properties that define event headers per file group. */
    public static final String HEADERS_PREFIX = "headers.";

    // ---- position file -----------------------------------------------------------------------

    /** Property holding the path of the position file. */
    public static final String POSITION_FILE = "positionFile";
    /** Default location of the position file, appended to the user's home directory. */
    public static final String DEFAULT_POSITION_FILE = "/.flume/taildir_position.json";

    /** Property holding the interval (ms) at which the read positions are written out. */
    public static final String WRITE_POS_INTERVAL = "writePosInterval";
    public static final int DEFAULT_WRITE_POS_INTERVAL = 3_000;

    // ---- reading -----------------------------------------------------------------------------

    /** Property holding the number of events batched before they go to the ChannelProcessor. */
    public static final String BATCH_SIZE = "batchSize";
    public static final int DEFAULT_BATCH_SIZE = 100;

    /** Property telling whether files unknown to the position file are read from their end. */
    public static final String SKIP_TO_END = "skipToEnd";
    public static final boolean DEFAULT_SKIP_TO_END = false;

    /** Property holding the time (ms) after which an idle file is closed. */
    public static final String IDLE_TIMEOUT = "idleTimeout";
    public static final int DEFAULT_IDLE_TIMEOUT = 120_000;

    /** Property holding the maximum number of batches read from one file in a single loop. */
    public static final String MAX_BATCH_COUNT = "maxBatchCount";
    public static final Long DEFAULT_MAX_BATCH_COUNT = Long.valueOf(Long.MAX_VALUE);

    /**
     * Property telling whether the list of files matching a pattern is cached until the parent
     * directory is modified.
     */
    public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching";
    public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true;

    // ---- event headers -----------------------------------------------------------------------

    /** Property telling whether the byte offset of a tailed line is added as a header. */
    public static final String BYTE_OFFSET_HEADER = "byteOffsetHeader";
    /** Name of the header that carries the byte offset. */
    public static final String BYTE_OFFSET_HEADER_KEY = "byteoffset";
    public static final boolean DEFAULT_BYTE_OFFSET_HEADER = false;

    /** Property telling whether the absolute path of the file is added as a header. */
    public static final String FILENAME_HEADER = "fileHeader";
    public static final boolean DEFAULT_FILE_HEADER = false;

    /** Property holding the name of the header that carries the absolute file path. */
    public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
    public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
}

TailFile.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.flume.source.wintaildir;

import com.google.common.collect.Lists;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.Map;

import static org.apache.flume.source.wintaildir.TaildirSourceConfigurationConstants.BYTE_OFFSET_HEADER_KEY;

/**
 * Read state for one tailed file, plus the logic that splits its bytes into
 * newline-terminated lines and wraps each line in a Flume {@link Event}.
 *
 * <p>The file is opened read-only through a {@link RandomAccessFile}. Data is
 * pulled in chunks of at most {@code BUFFER_SIZE} bytes; bytes of a line that
 * spans a chunk boundary are carried over in {@code oldBuffer} until the
 * terminating LF is seen. A CR immediately preceding the LF is treated as part
 * of the line separator and is not included in the line body.
 *
 * <p>This class does no synchronization of its own. After {@link #close()} the
 * underlying file handle is {@code null}; calling any method that reads or
 * seeks afterwards fails with a {@link NullPointerException}.
 */
public class TailFile {
    private static final Logger logger = LoggerFactory.getLogger(TailFile.class);

    private static final byte BYTE_NL = (byte) 10;
    private static final byte BYTE_CR = (byte) 13;

    /** Upper bound on the number of bytes fetched from the file per refill. */
    private static final int BUFFER_SIZE = 8192;
    /** Value of {@code bufferPos} meaning "the current chunk is used up, refill before parsing". */
    private static final int NEED_READING = -1;

    private RandomAccessFile raf;
    private final String path;
    private final long inode;
    private long pos;
    private long lastUpdated;
    private boolean needTail;
    private final Map<String, String> headers;
    /** Most recently fetched chunk of file data. */
    private byte[] buffer;
    /** Bytes of the current, still unterminated line carried over from earlier chunks. */
    private byte[] oldBuffer;
    /** Index of the next unparsed byte in {@code buffer}, or {@code NEED_READING}. */
    private int bufferPos;
    /** File offset of the first byte of the next line to be returned by {@link #readLine()}. */
    private long lineReadPos;

    /**
     * Opens {@code file} for reading and, when {@code pos} is positive, seeks to it.
     *
     * @param file    file to tail
     * @param headers headers associated with this file; stored and returned by {@link #getHeaders()}
     * @param inode   identifier of the file as supplied by the caller
     * @param pos     byte offset to start reading from; values {@code <= 0} start at the beginning
     * @throws IOException if the file cannot be opened or the seek fails; in the latter case the
     *                     freshly opened handle is closed before the exception propagates
     */
    public TailFile(File file, Map<String, String> headers, long inode, long pos)
            throws IOException {
        this.raf = new RandomAccessFile(file, "r");
        if (pos > 0) {
            try {
                raf.seek(pos);
            } catch (IOException e) {
                // Do not leak the handle we just opened when the seek fails.
                try {
                    raf.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            lineReadPos = pos;
        }
        this.path = file.getAbsolutePath();
        this.inode = inode;
        this.pos = pos;
        this.lastUpdated = 0L;
        this.needTail = true;
        this.headers = headers;
        this.oldBuffer = new byte[0];
        this.bufferPos = NEED_READING;
    }

    /** @return the underlying file handle, or {@code null} once {@link #close()} has succeeded */
    public RandomAccessFile getRaf() {
        return raf;
    }

    /** @return absolute path of the file, captured at construction time */
    public String getPath() {
        return path;
    }

    /** @return the file identifier passed to the constructor */
    public long getInode() {
        return inode;
    }

    /** @return the position last stored via the constructor, {@link #setPos} or {@link #updatePos} */
    public long getPos() {
        return pos;
    }

    /** @return the last-updated timestamp; {@code 0} until set explicitly or by {@link #close()} */
    public long getLastUpdated() {
        return lastUpdated;
    }

    /** @return the need-tail flag; {@code true} after construction */
    public boolean needTail() {
        return needTail;
    }

    /** @return the header map passed to the constructor (same instance, not a copy) */
    public Map<String, String> getHeaders() {
        return headers;
    }

    /** @return file offset of the first byte of the next line {@link #readLine()} will return */
    public long getLineReadPos() {
        return lineReadPos;
    }

    /** Stores {@code pos} without moving the file pointer; see {@link #updateFilePos(long)}. */
    public void setPos(long pos) {
        this.pos = pos;
    }

    public void setLastUpdated(long lastUpdated) {
        this.lastUpdated = lastUpdated;
    }

    public void setNeedTail(boolean needTail) {
        this.needTail = needTail;
    }

    public void setLineReadPos(long lineReadPos) {
        this.lineReadPos = lineReadPos;
    }

    /**
     * Repositions this file to {@code pos}, but only if {@code path} and {@code inode}
     * both match the values this instance was created with.
     *
     * @return {@code true} if the position was updated, {@code false} if the identity did not match
     * @throws IOException if seeking the underlying file fails
     */
    public boolean updatePos(String path, long inode, long pos) throws IOException {
        if (this.inode == inode && this.path.equals(path)) {
            setPos(pos);
            updateFilePos(pos);
            logger.info("Updated position, file: {}, inode: {}, pos: {}", path, inode, pos);
            return true;
        }
        return false;
    }

    /**
     * Seeks the underlying file to {@code pos} and discards all buffered data so that
     * the next {@link #readLine()} starts parsing at exactly that offset.
     *
     * @throws IOException if the seek fails
     */
    public void updateFilePos(long pos) throws IOException {
        raf.seek(pos);
        lineReadPos = pos;
        bufferPos = NEED_READING;
        oldBuffer = new byte[0];
    }

    /**
     * Reads up to {@code numEvents} events, stopping early when no further event is available.
     *
     * @param numEvents        maximum number of events to return
     * @param backoffWithoutNL if {@code true}, trailing data without a line terminator is not
     *                         emitted and the read position is rewound to the start of that data
     * @param addByteOffset    if {@code true}, each event gets a header holding the file offset
     *                         at which its line started
     * @return the events read, possibly empty, never {@code null}
     * @throws IOException if reading from or seeking in the file fails
     */
    public List<Event> readEvents(int numEvents, boolean backoffWithoutNL,
                                  boolean addByteOffset) throws IOException {
        List<Event> events = Lists.newLinkedList();
        for (int i = 0; i < numEvents; i++) {
            Event event = readEvent(backoffWithoutNL, addByteOffset);
            if (event == null) {
                break;
            }
            events.add(event);
        }
        return events;
    }

    /** Reads one line and wraps it in an event; returns {@code null} when nothing is emitted. */
    private Event readEvent(boolean backoffWithoutNL, boolean addByteOffset) throws IOException {
        // Remember where this line starts so we can rewind or report the offset.
        long lineStart = getLineReadPos();
        LineResult line = readLine();
        if (line == null) {
            return null;
        }
        if (backoffWithoutNL && !line.lineSepInclude) {
            logger.info("Backing off in file without newline: {}, inode: {}, pos: {}",
                    path, inode, raf.getFilePointer());
            updateFilePos(lineStart);
            return null;
        }
        Event event = EventBuilder.withBody(line.line);
        if (addByteOffset) {
            event.getHeaders().put(BYTE_OFFSET_HEADER_KEY, Long.toString(lineStart));
        }
        return event;
    }

    /**
     * Refills {@code buffer} with the next chunk of the file (at most {@code BUFFER_SIZE} bytes)
     * and resets {@code bufferPos} to the start of it.
     *
     * <p>The buffer always ends up sized to the number of bytes actually obtained, so a short
     * read or a file that shrank concurrently never leaves unread filler bytes to be parsed.
     */
    private void readFile() throws IOException {
        long remaining = raf.length() - raf.getFilePointer();
        // Clamp: 'remaining' can be negative if the file was truncated after the caller's check.
        int wanted = (int) Math.max(0L, Math.min(remaining, (long) BUFFER_SIZE));
        byte[] chunk = new byte[wanted];
        int filled = 0;
        // A single read() may legally return fewer bytes than requested; keep going until
        // the chunk is full or end of file is reported.
        while (filled < wanted) {
            int n = raf.read(chunk, filled, wanted - filled);
            if (n < 0) {
                break;
            }
            filled += n;
        }
        if (filled < wanted) {
            byte[] trimmed = new byte[filled];
            System.arraycopy(chunk, 0, trimmed, 0, filled);
            chunk = trimmed;
        }
        buffer = chunk;
        bufferPos = 0;
    }

    /** Returns a new array holding {@code lenA} bytes of {@code a} followed by {@code lenB} bytes of {@code b}. */
    private byte[] concatByteArrays(byte[] a, int startIdxA, int lenA,
                                    byte[] b, int startIdxB, int lenB) {
        byte[] c = new byte[lenA + lenB];
        System.arraycopy(a, startIdxA, c, 0, lenA);
        System.arraycopy(b, startIdxB, c, lenA, lenB);
        return c;
    }

    /**
     * Reads the next line from the current line position.
     *
     * <p>A line ends at LF; a CR directly before that LF is dropped together with it. When end
     * of file is reached with pending bytes but no LF, those bytes are returned as a line whose
     * {@code lineSepInclude} flag is {@code false}. In both cases the line read position is
     * advanced past everything consumed.
     *
     * @return the line, or {@code null} if end of file is reached with no pending bytes
     * @throws IOException if reading the file fails
     */
    public LineResult readLine() throws IOException {
        LineResult lineResult = null;
        while (true) {
            if (bufferPos == NEED_READING) {
                if (raf.getFilePointer() < raf.length()) {
                    readFile();
                } else {
                    if (oldBuffer.length > 0) {
                        lineResult = new LineResult(false, oldBuffer);
                        oldBuffer = new byte[0];
                        setLineReadPos(lineReadPos + lineResult.line.length);
                    }
                    break;
                }
            }
            for (int i = bufferPos; i < buffer.length; i++) {
                if (buffer[i] == BYTE_NL) {
                    int oldLen = oldBuffer.length;
                    // Don't copy last byte(NEW_LINE)
                    int lineLen = i - bufferPos;
                    // For windows, check for CR. Only the byte immediately before the LF is
                    // part of the separator: it lives in the current buffer when this buffer
                    // contributes bytes to the line, and at the tail of oldBuffer only when
                    // the LF is the very first byte of the current buffer. A CR at the tail
                    // of oldBuffer followed by more line bytes is line content and is kept.
                    if (lineLen > 0) {
                        if (buffer[i - 1] == BYTE_CR) {
                            lineLen -= 1;
                        }
                    } else if (oldLen > 0 && oldBuffer[oldLen - 1] == BYTE_CR) {
                        oldLen -= 1;
                    }
                    lineResult = new LineResult(true,
                            concatByteArrays(oldBuffer, 0, oldLen, buffer, bufferPos, lineLen));
                    // Advance by the raw byte count consumed, separator bytes included.
                    setLineReadPos(lineReadPos + (oldBuffer.length + (i - bufferPos + 1)));
                    oldBuffer = new byte[0];
                    if (i + 1 < buffer.length) {
                        bufferPos = i + 1;
                    } else {
                        bufferPos = NEED_READING;
                    }
                    break;
                }
            }
            if (lineResult != null) {
                break;
            }
            // NEW_LINE not showed up at the end of the buffer
            oldBuffer = concatByteArrays(oldBuffer, 0, oldBuffer.length,
                    buffer, bufferPos, buffer.length - bufferPos);
            bufferPos = NEED_READING;
        }
        return lineResult;
    }

    /**
     * Closes the underlying file and records the current time as the last-updated timestamp.
     * A failure to close is logged, not thrown. Calling this on an already closed instance
     * is a no-op.
     */
    public void close() {
        if (raf == null) {
            return;
        }
        try {
            raf.close();
            raf = null;
            long now = System.currentTimeMillis();
            setLastUpdated(now);
        } catch (IOException e) {
            logger.error("Failed closing file: {}, inode: {}", path, inode, e);
        }
    }

    /** One line of file content plus whether it was terminated by a line separator. */
    private static final class LineResult {
        final boolean lineSepInclude;
        final byte[] line;

        public LineResult(boolean lineSepInclude, byte[] line) {
            this.lineSepInclude = lineSepInclude;
            this.line = line;
        }
    }
}

build出新的jar包,将新的jar包放入apache-flume-1.9.0-bin/lib中。

参考文章:https://blog.csdn.net/u012373815/article/details/62238079
https://stackoverflow.com/questions/8309199/get-unique-file-id-in-windows-with-java

上一篇下一篇

猜你喜欢

热点阅读