/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hadoop;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
import org.apache.nifi.annotation.behavior.Stateful;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.configuration.DefaultSchedule;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractHadoopProcessor;
import org.apache.nifi.processors.hadoop.FetchHDFS;
import org.apache.nifi.processors.hadoop.GetHDFS;
import org.apache.nifi.processors.hadoop.PutHDFS;
import org.apache.nifi.processors.hadoop.util.FileStatusIterable;
import org.apache.nifi.processors.hadoop.util.FileStatusManager;
import org.apache.nifi.processors.hadoop.util.FilterMode;
import org.apache.nifi.processors.hadoop.util.writer.HadoopFileStatusWriter;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.serialization.RecordSetWriterFactory;

@PrimaryNodeOnly
@TriggerSerially
@TriggerWhenEmpty
@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"hadoop", "HCFS", "HDFS", "get", "list", "ingest", "source", "filesystem"})
@SeeAlso(value={GetHDFS.class, FetchHDFS.class, PutHDFS.class})
@CapabilityDescription(value="Retrieves a listing of files from HDFS. For each file that is listed in HDFS, this processor creates a FlowFile that represents the HDFS file to be fetched in conjunction with FetchHDFS. This Processor is designed to run on Primary Node only in a cluster. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating all of the data. Unlike GetHDFS, this Processor does not delete any data from HDFS.")
@WritesAttributes(value={@WritesAttribute(attribute="filename", description="The name of the file that was read from HDFS."), @WritesAttribute(attribute="path", description="The path is set to the absolute path of the file's directory on HDFS. For example, if the Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to \"./\". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to \"/tmp/abc/1/2/3\"."), @WritesAttribute(attribute="hdfs.owner", description="The user that owns the file in HDFS"), @WritesAttribute(attribute="hdfs.group", description="The group that owns the file in HDFS"), @WritesAttribute(attribute="hdfs.lastModified", description="The timestamp of when the file in HDFS was last modified, as milliseconds since midnight Jan 1, 1970 UTC"), @WritesAttribute(attribute="hdfs.length", description="The number of bytes in the file in HDFS"), @WritesAttribute(attribute="hdfs.replication", description="The number of HDFS replicas for hte file"), @WritesAttribute(attribute="hdfs.permissions", description="The permissions for the file in HDFS. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r--")})
@Stateful(scopes={Scope.CLUSTER}, description="After performing a listing of HDFS files, the latest timestamp of all the files listed is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run, without having to store all of the actual filenames/paths which could lead to performance problems. State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected, the new node can pick up where the previous node left off, without duplicating the data.")
@DefaultSchedule(strategy=SchedulingStrategy.TIMER_DRIVEN, period="1 min")
public class ListHDFS
extends AbstractHadoopProcessor {
    private static final String NON_HIDDEN_FILES_REGEX = "[^\\.].*";
    private static final String HDFS_ATTRIBUTE_PREFIX = "hdfs";
    public static final PropertyDescriptor RECURSE_SUBDIRS = new PropertyDescriptor.Builder().name("Recurse Subdirectories").description("Indicates whether to list files from subdirectories of the HDFS directory").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the Record Writer to use for creating the listing. If not specified, one FlowFile will be created for each entity that is listed. If the Record Writer is specified, all entities will be written to a single FlowFile.").required(false).identifiesControllerService(RecordSetWriterFactory.class).build();
    public static final PropertyDescriptor FILE_FILTER = new PropertyDescriptor.Builder().name("File Filter").description("Only files whose names match the given regular expression will be picked up").required(true).defaultValue("[^\\.].*").addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    public static final PropertyDescriptor FILE_FILTER_MODE = new PropertyDescriptor.Builder().name("file-filter-mode").displayName("File Filter Mode").description("Determines how the regular expression in  " + FILE_FILTER.getDisplayName() + " will be used when retrieving listings.").required(true).allowableValues(FilterMode.class).defaultValue(FilterMode.FILTER_DIRECTORIES_AND_FILES.getValue()).addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR).build();
    public static final PropertyDescriptor MINIMUM_FILE_AGE = new PropertyDescriptor.Builder().name("minimum-file-age").displayName("Minimum File Age").description("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (based on last modification date) will be ignored").required(false).addValidator(StandardValidators.createTimePeriodValidator((long)0L, (TimeUnit)TimeUnit.MILLISECONDS, (long)Long.MAX_VALUE, (TimeUnit)TimeUnit.NANOSECONDS)).build();
    public static final PropertyDescriptor MAXIMUM_FILE_AGE = new PropertyDescriptor.Builder().name("maximum-file-age").displayName("Maximum File Age").description("The maximum age that a file must be in order to be pulled; any file older than this amount of time (based on last modification date) will be ignored. Minimum value is 100ms.").required(false).addValidator(StandardValidators.createTimePeriodValidator((long)100L, (TimeUnit)TimeUnit.MILLISECONDS, (long)Long.MAX_VALUE, (TimeUnit)TimeUnit.NANOSECONDS)).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles are transferred to this relationship").build();
    public static final String LEGACY_EMITTED_TIMESTAMP_KEY = "emitted.timestamp";
    public static final String LEGACY_LISTING_TIMESTAMP_KEY = "listing.timestamp";
    public static final String LATEST_TIMESTAMP_KEY = "latest.timestamp";
    public static final String LATEST_FILES_KEY = "latest.file.%d";
    private static final List<PropertyDescriptor> LIST_HDFS_PROPERTIES = Arrays.asList(DIRECTORY, RECURSE_SUBDIRS, RECORD_WRITER, FILE_FILTER, FILE_FILTER_MODE, MINIMUM_FILE_AGE, MAXIMUM_FILE_AGE);
    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
    private Pattern fileFilterRegexPattern;
    private volatile boolean resetState = false;

    protected void preProcessConfiguration(Configuration config, ProcessContext context) {
        super.preProcessConfiguration(config, context);
        this.fileFilterRegexPattern = Pattern.compile(context.getProperty(FILE_FILTER).getValue());
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>(this.properties);
        props.addAll(LIST_HDFS_PROPERTIES);
        return props;
    }

    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext context) {
        long maximumAge;
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>(super.customValidate(context));
        Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        long minimumAge = minAgeProp == null ? 0L : minAgeProp;
        long l = maximumAge = maxAgeProp == null ? Long.MAX_VALUE : maxAgeProp;
        if (minimumAge > maximumAge) {
            problems.add(new ValidationResult.Builder().valid(false).subject("ListHDFS Configuration").explanation(MINIMUM_FILE_AGE.getDisplayName() + " cannot be greater than " + MAXIMUM_FILE_AGE.getDisplayName()).build());
        }
        return problems;
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        super.onPropertyModified(descriptor, oldValue, newValue);
        if (this.isConfigurationRestored() && (descriptor.equals((Object)DIRECTORY) || descriptor.equals((Object)FILE_FILTER))) {
            this.resetState = true;
        }
    }

    @OnScheduled
    public void resetStateIfNecessary(ProcessContext context) throws IOException {
        if (this.resetState) {
            this.getLogger().debug("Property has been modified. Resetting the state values.");
            context.getStateManager().clear(Scope.CLUSTER);
            this.resetState = false;
        }
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        ArrayList<String> latestFiles;
        long latestTimestamp;
        try {
            StateMap stateMap = session.getState(Scope.CLUSTER);
            String latestTimestampString = stateMap.get(LATEST_TIMESTAMP_KEY);
            String legacyLatestListingTimestampString = stateMap.get(LEGACY_LISTING_TIMESTAMP_KEY);
            String legacyLatestEmittedTimestampString = stateMap.get(LEGACY_EMITTED_TIMESTAMP_KEY);
            if (legacyLatestListingTimestampString != null) {
                long legacyLatestEmittedTimestamp;
                long legacyLatestListingTimestamp = Long.parseLong(legacyLatestListingTimestampString);
                latestTimestamp = legacyLatestListingTimestamp == (legacyLatestEmittedTimestamp = Long.parseLong(legacyLatestEmittedTimestampString)) ? legacyLatestListingTimestamp + 1L : legacyLatestListingTimestamp;
                latestFiles = new ArrayList();
                this.getLogger().debug("Transitioned from legacy state to new state. 'legacyLatestListingTimestamp': {}, 'legacyLatestEmittedTimeStamp': {}','latestTimestamp': {}", new Object[]{legacyLatestListingTimestamp, legacyLatestEmittedTimestamp, latestTimestamp});
            } else if (latestTimestampString != null) {
                latestTimestamp = Long.parseLong(latestTimestampString);
                latestFiles = stateMap.toMap().entrySet().stream().filter(entry -> ((String)entry.getKey()).startsWith("latest.file")).map(Map.Entry::getValue).collect(Collectors.toList());
            } else {
                latestTimestamp = 0L;
                latestFiles = new ArrayList();
            }
        }
        catch (IOException e) {
            this.getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
            context.yield();
            return;
        }
        FileSystem hdfs = this.getFileSystem();
        boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
        PathFilter pathFilter = this.createPathFilter(context);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        FileStatusManager fileStatusManager = new FileStatusManager(latestTimestamp, latestFiles);
        Path rootPath = this.getNormalizedPath(context, DIRECTORY);
        FileStatusIterable fileStatusIterable = new FileStatusIterable(rootPath, recursive, hdfs, this.getUserGroupInformation());
        Long minAgeProp = context.getProperty(MINIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        long minimumAge = minAgeProp == null ? Long.MIN_VALUE : minAgeProp;
        Long maxAgeProp = context.getProperty(MAXIMUM_FILE_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
        long maximumAge = maxAgeProp == null ? Long.MAX_VALUE : maxAgeProp;
        HadoopFileStatusWriter writer = HadoopFileStatusWriter.builder().session(session).successRelationship(this.getSuccessRelationship()).fileStatusIterable(fileStatusIterable).fileStatusManager(fileStatusManager).pathFilter(pathFilter).minimumAge(minimumAge).maximumAge(maximumAge).previousLatestTimestamp(latestTimestamp).previousLatestFiles(latestFiles).writerFactory(writerFactory).hdfsPrefix(this.getAttributePrefix()).logger(this.getLogger()).build();
        writer.write();
        this.getLogger().debug("Found a total of {} files in HDFS, {} are listed", new Object[]{fileStatusIterable.getTotalFileCount(), writer.getListedFileCount()});
        if (writer.getListedFileCount() > 0L) {
            HashMap<String, String> updatedState = new HashMap<String, String>();
            updatedState.put(LATEST_TIMESTAMP_KEY, String.valueOf(fileStatusManager.getCurrentLatestTimestamp()));
            List<String> files = fileStatusManager.getCurrentLatestFiles();
            for (int i = 0; i < files.size(); ++i) {
                String currentFilePath = files.get(i);
                updatedState.put(String.format(LATEST_FILES_KEY, i), currentFilePath);
            }
            this.getLogger().debug("New state map: {}", new Object[]{updatedState});
            this.updateState(session, updatedState);
            this.getLogger().info("Successfully created listing with {} new files from HDFS", new Object[]{writer.getListedFileCount()});
        } else {
            this.getLogger().debug("There is no data to list. Yielding.");
            context.yield();
        }
    }

    private PathFilter createPathFilter(ProcessContext context) {
        FilterMode filterMode = FilterMode.forName(context.getProperty(FILE_FILTER_MODE).getValue());
        boolean recursive = context.getProperty(RECURSE_SUBDIRS).asBoolean();
        switch (filterMode) {
            case FILTER_MODE_FILES_ONLY: {
                return path -> this.fileFilterRegexPattern.matcher(path.getName()).matches();
            }
            case FILTER_MODE_FULL_PATH: {
                return path -> this.fileFilterRegexPattern.matcher(path.toString()).matches() || this.fileFilterRegexPattern.matcher(Path.getPathWithoutSchemeAndAuthority((Path)path).toString()).matches();
            }
        }
        return path -> Stream.of(Path.getPathWithoutSchemeAndAuthority((Path)path).toString().split("/")).skip(this.getPathSegmentsToSkip(recursive)).allMatch(v -> this.fileFilterRegexPattern.matcher((CharSequence)v).matches());
    }

    private int getPathSegmentsToSkip(boolean recursive) {
        return recursive ? 2 : 1;
    }

    private void updateState(ProcessSession session, Map<String, String> newState) {
        try {
            session.setState(newState, Scope.CLUSTER);
        }
        catch (IOException e) {
            this.getLogger().warn("Failed to save cluster-wide state. If NiFi is restarted, data duplication may occur", (Throwable)e);
        }
    }

    protected Relationship getSuccessRelationship() {
        return REL_SUCCESS;
    }

    protected String getAttributePrefix() {
        return HDFS_ATTRIBUTE_PREFIX;
    }
}

