/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.lookup;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.lookup.AbstractCSVLookupService;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;

@Tags(value={"lookup", "cache", "enrich", "join", "csv", "reloadable", "key", "value", "record"})
@CapabilityDescription(value="A reloadable CSV file-based lookup service. When the lookup key is found in the CSV file, the columns are returned as a Record. All returned fields will be strings. The first line of the csv file is considered as header.")
public class CSVRecordLookupService
extends AbstractCSVLookupService
implements RecordLookupService {
    private static final Set<String> REQUIRED_KEYS = Collections.unmodifiableSet(Stream.of("key").collect(Collectors.toSet()));
    static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder().fromPropertyDescriptor(AbstractCSVLookupService.CSV_FORMAT).name("csv-format").build();
    private volatile ConcurrentMap<String, Record> cache;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void loadCache() throws IllegalStateException, IOException {
        if (this.lock.tryLock()) {
            try {
                ComponentLog logger = this.getLogger();
                if (logger.isDebugEnabled()) {
                    logger.debug("Loading lookup table from file: " + this.csvFile);
                }
                ConcurrentHashMap<String, Record> cache = new ConcurrentHashMap<String, Record>();
                try (FileInputStream is = new FileInputStream(this.csvFile);
                     InputStreamReader reader = new InputStreamReader((InputStream)is, this.charset);){
                    CSVParser records = this.csvFormat.withFirstRecordAsHeader().parse((Reader)reader);
                    SimpleRecordSchema lookupRecordSchema = null;
                    for (CSVRecord record : records) {
                        String key = record.get(this.lookupKeyColumn);
                        if (StringUtils.isBlank((CharSequence)key)) {
                            throw new IllegalStateException("Empty lookup key encountered in: " + this.csvFile);
                        }
                        if (!this.ignoreDuplicates && cache.containsKey(key)) {
                            throw new IllegalStateException("Duplicate lookup key encountered: " + key + " in " + this.csvFile);
                        }
                        if (this.ignoreDuplicates && cache.containsKey(key)) {
                            logger.warn("Duplicate lookup key encountered: {} in {}", new Object[]{key, this.csvFile});
                        }
                        HashMap<String, Object> properties = new HashMap<String, Object>();
                        record.toMap().forEach((k, v) -> {
                            if (!this.lookupKeyColumn.equals(k)) {
                                properties.put((String)k, v);
                            }
                        });
                        if (lookupRecordSchema == null) {
                            ArrayList recordFields = new ArrayList(properties.size());
                            properties.forEach((k, v) -> recordFields.add(new RecordField(k, RecordFieldType.STRING.getDataType())));
                            lookupRecordSchema = new SimpleRecordSchema(recordFields);
                        }
                        cache.put(key, (Record)new MapRecord(lookupRecordSchema, properties));
                    }
                }
                this.cache = cache;
                if (cache.isEmpty()) {
                    logger.warn("Lookup table is empty after reading file: " + this.csvFile);
                }
            }
            finally {
                this.lock.unlock();
            }
        }
    }

    @Override
    @OnEnabled
    public void onEnabled(ConfigurationContext context) throws InitializationException, IOException {
        super.onEnabled(context);
        try {
            this.loadCache();
        }
        catch (IllegalStateException e) {
            throw new InitializationException(e.getMessage(), (Throwable)e);
        }
    }

    public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFailureException {
        if (coordinates == null) {
            return Optional.empty();
        }
        Object key = coordinates.get("key");
        if (key == null) {
            return Optional.empty();
        }
        String keyString = key.toString();
        if (StringUtils.isBlank((CharSequence)keyString)) {
            return Optional.empty();
        }
        try {
            if (this.watcher.checkAndReset()) {
                this.loadCache();
            }
        }
        catch (IOException | IllegalStateException e) {
            throw new LookupFailureException(e.getMessage(), (Throwable)e);
        }
        return Optional.ofNullable((Record)this.cache.get(keyString));
    }

    public Set<String> getRequiredKeys() {
        return REQUIRED_KEYS;
    }

    @OnDisabled
    public void onDisabled() {
        this.cache = null;
    }

    boolean isCaching() {
        return this.cache != null;
    }
}

