/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.geohash;

import ch.hsr.geohash.GeoHash;
import ch.hsr.geohash.WGS84Point;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.RecordPathResult;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.record.path.validation.RecordPathValidator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;

@EventDriven
@SideEffectFree
@SupportsBatching
@Tags(value={"geo", "geohash", "record"})
@CapabilityDescription(value="A record-based processor that encodes and decodes Geohashes from and to latitude/longitude coordinates.")
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes(value={@WritesAttribute(attribute="mime.type", description="The MIME type indicated by the record writer"), @WritesAttribute(attribute="record.count", description="The number of records in the resulting flow file")})
public class GeohashRecord
extends AbstractProcessor {
    public static final PropertyDescriptor MODE = new PropertyDescriptor.Builder().name("mode").displayName("Mode").description("Specifies whether to encode latitude/longitude to geohash or decode geohash to latitude/longitude").required(true).allowableValues((Enum[])ProcessingMode.values()).defaultValue(ProcessingMode.ENCODE.name()).build();
    public static final PropertyDescriptor ROUTING_STRATEGY = new PropertyDescriptor.Builder().name("routing-strategy").displayName("Routing Strategy").description("Specifies how to route flowfiles after encoding or decoding being performed. SKIP will enrich those records that can be enriched and skip the rest. The SKIP strategy will route a flowfile to failure only if unable to parse the data. Otherwise, it will route the enriched flowfile to success, and the original input to original. SPLIT will separate the records that have been enriched from those that have not and send them to matched, while unenriched records will be sent to unmatched; the original input flowfile will be sent to original. The SPLIT strategy will route a flowfile to failure only if unable to parse the data. REQUIRE will route a flowfile to success only if all of its records are enriched, and the original input will be sent to original. The REQUIRE strategy will route the original input flowfile to failure if any of its records cannot be enriched or unable to be parsed").required(true).allowableValues((Enum[])RoutingStrategy.values()).defaultValue(RoutingStrategy.SKIP.name()).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("record-reader").displayName("Record Reader").description("Specifies the record reader service to use for reading incoming data").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("record-writer").displayName("Record Writer").description("Specifies the record writer service to use for writing data").required(true).identifiesControllerService(RecordSetWriterFactory.class).build();
    public static final PropertyDescriptor LATITUDE_RECORD_PATH = new PropertyDescriptor.Builder().name("latitude-record-path").displayName("Latitude Record Path").description("In the ENCODE mode, this property specifies the record path to retrieve the latitude values. Latitude values should be in the range of [-90, 90]; invalid values will be logged at warn level. In the DECODE mode, this property specifies the record path to put the latitude value").required(true).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor LONGITUDE_RECORD_PATH = new PropertyDescriptor.Builder().name("longitude-record-path").displayName("Longitude Record Path").description("In the ENCODE mode, this property specifies the record path to retrieve the longitude values; Longitude values should be in the range of [-180, 180]; invalid values will be logged at warn level. In the DECODE mode, this property specifies the record path to put the longitude value").required(true).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEOHASH_RECORD_PATH = new PropertyDescriptor.Builder().name("geohash-record-path").displayName("Geohash Record Path").description("In the ENCODE mode, this property specifies the record path to put the geohash value; in the DECODE mode, this property specifies the record path to retrieve the geohash value").required(true).addValidator((Validator)new RecordPathValidator()).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEOHASH_FORMAT = new PropertyDescriptor.Builder().name("geohash-format").displayName("Geohash Format").description("In the ENCODE mode, this property specifies the desired format for encoding geohash; in the DECODE mode, this property specifies the format of geohash provided").required(true).allowableValues((Enum[])GeohashFormat.values()).defaultValue(GeohashFormat.BASE32.name()).build();
    public static final PropertyDescriptor GEOHASH_LEVEL = new PropertyDescriptor.Builder().name("geohash-level").displayName("Geohash Level").description("The integer precision level(1-12) desired for encoding geohash").required(true).addValidator(StandardValidators.createLongValidator((long)1L, (long)12L, (boolean)true)).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).dependsOn(MODE, ProcessingMode.ENCODE.name(), new String[0]).build();
    public static final Relationship REL_NOT_MATCHED = new Relationship.Builder().name("not matched").description("Using the SPLIT strategy, flowfiles that cannot be encoded or decoded due to the lack of lat/lon or geohashes will be routed to not matched").build();
    public static final Relationship REL_MATCHED = new Relationship.Builder().name("matched").description("Using the SPLIT strategy, flowfiles with lat/lon or geohashes provided that are successfully encoded or decoded will be routed to matched").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("Flowfiles that cannot be encoded or decoded will be routed to failure").build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Flowfiles that are successfully encoded or decoded will be routed to success").build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original input flowfile will be sent to this relationship").build();
    private static final List<PropertyDescriptor> RECORD_PATH_PROPERTIES = Collections.unmodifiableList(Arrays.asList(LATITUDE_RECORD_PATH, LONGITUDE_RECORD_PATH, GEOHASH_RECORD_PATH));
    private static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_SUCCESS, REL_ORIGINAL, REL_FAILURE)));
    private static final Set<Relationship> SPLIT_RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<Relationship>(Arrays.asList(REL_MATCHED, REL_NOT_MATCHED, REL_ORIGINAL, REL_FAILURE)));
    private RoutingStrategyExecutor routingStrategyExecutor;
    private static boolean isSplit;
    private static Integer enrichedCount;
    private static Integer unenrichedCount;
    private final RecordPathCache cache = new RecordPathCache(100);
    private List<PropertyDescriptor> descriptors;

    protected void init(ProcessorInitializationContext context) {
        this.descriptors = new ArrayList<PropertyDescriptor>();
        this.descriptors.add(MODE);
        this.descriptors.add(RECORD_READER);
        this.descriptors.add(RECORD_WRITER);
        this.descriptors.add(ROUTING_STRATEGY);
        this.descriptors.add(LATITUDE_RECORD_PATH);
        this.descriptors.add(LONGITUDE_RECORD_PATH);
        this.descriptors.add(GEOHASH_RECORD_PATH);
        this.descriptors.add(GEOHASH_FORMAT);
        this.descriptors.add(GEOHASH_LEVEL);
        this.descriptors = Collections.unmodifiableList(this.descriptors);
    }

    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
        if (descriptor.equals((Object)ROUTING_STRATEGY)) {
            isSplit = RoutingStrategy.SPLIT.name().equals(newValue);
        }
    }

    public Set<Relationship> getRelationships() {
        return isSplit ? SPLIT_RELATIONSHIPS : RELATIONSHIPS;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        RoutingStrategy routingStrategy = RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
        switch (routingStrategy) {
            case REQUIRE: {
                this.routingStrategyExecutor = new RequireRoutingStrategyExecutor();
                break;
            }
            case SKIP: {
                this.routingStrategyExecutor = new SkipRoutingStrategyExecutor();
                break;
            }
            case SPLIT: {
                this.routingStrategyExecutor = new SplitRoutingStrategyExecutor();
                break;
            }
            default: {
                throw new AssertionError();
            }
        }
        enrichedCount = 0;
        unenrichedCount = 0;
    }

    public void onTrigger(ProcessContext context, ProcessSession session) {
        FlowFile input = session.get();
        if (input == null) {
            return;
        }
        RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
        RecordSetWriterFactory writerFactory = (RecordSetWriterFactory)context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
        boolean encode = context.getProperty(MODE).getValue().equalsIgnoreCase(ProcessingMode.ENCODE.toString());
        RoutingStrategy routingStrategy = RoutingStrategy.valueOf(context.getProperty(ROUTING_STRATEGY).getValue());
        GeohashFormat format = GeohashFormat.valueOf(context.getProperty(GEOHASH_FORMAT).getValue());
        FlowFile output = session.create(input);
        FlowFile notMatched = routingStrategy == RoutingStrategy.SPLIT ? session.create(input) : null;
        try (InputStream is = session.read(input);
             RecordReader reader = readerFactory.createRecordReader(input, is, this.getLogger());
             OutputStream os = session.write(output);
             OutputStream osNotFound = routingStrategy == RoutingStrategy.SPLIT ? session.write(notMatched) : null;){
            Record record;
            RecordSetWriter writer = writerFactory.createWriter(this.getLogger(), writerFactory.getSchema(input.getAttributes(), reader.getSchema()), os, output);
            RecordSetWriter notMatchedWriter = routingStrategy == RoutingStrategy.SPLIT ? writerFactory.createWriter(this.getLogger(), reader.getSchema(), osNotFound, notMatched) : null;
            HashMap<PropertyDescriptor, RecordPath> paths = new HashMap<PropertyDescriptor, RecordPath>();
            for (PropertyDescriptor descriptor : RECORD_PATH_PROPERTIES) {
                String rawRecordPath = context.getProperty(descriptor).evaluateAttributeExpressions(input).getValue();
                RecordPath compiled = this.cache.getCompiled(rawRecordPath);
                paths.put(descriptor, compiled);
            }
            writer.beginRecordSet();
            if (notMatchedWriter != null) {
                notMatchedWriter.beginRecordSet();
            }
            int level = context.getProperty(GEOHASH_LEVEL).evaluateAttributeExpressions(input).asInteger();
            String rawLatitudePath = context.getProperty(LATITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
            RecordPath latitudePath = this.cache.getCompiled(rawLatitudePath);
            String rawLongitudePath = context.getProperty(LONGITUDE_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
            RecordPath longitudePath = this.cache.getCompiled(rawLongitudePath);
            String rawGeohashPath = context.getProperty(GEOHASH_RECORD_PATH).evaluateAttributeExpressions(input).getValue();
            RecordPath geohashPath = this.cache.getCompiled(rawGeohashPath);
            while ((record = reader.nextRecord()) != null) {
                boolean updated = false;
                try {
                    if (encode) {
                        Object encodedGeohash = this.getEncodedGeohash(latitudePath, longitudePath, record, format, level);
                        updated = this.updateRecord(GEOHASH_RECORD_PATH, encodedGeohash, record, paths);
                    } else {
                        WGS84Point decodedPoint = this.getDecodedPointFromGeohash(geohashPath, record, format);
                        if (decodedPoint != null) {
                            updated = this.updateRecord(LATITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLatitude()), record, paths) && this.updateRecord(LONGITUDE_RECORD_PATH, String.valueOf(decodedPoint.getLongitude()), record, paths);
                        }
                    }
                }
                catch (IllegalArgumentException e) {
                    this.getLogger().warn("Unable to " + (encode ? "encode" : "decode"), (Throwable)e);
                }
                this.routingStrategyExecutor.writeFlowFiles(record, writer, notMatchedWriter, updated);
            }
            WriteResult writeResult = writer.finishRecordSet();
            writer.close();
            output = session.putAllAttributes(output, this.buildAttributes(writeResult.getRecordCount(), writer.getMimeType(), writeResult));
            if (notMatchedWriter != null) {
                WriteResult notMatchedWriterResult = notMatchedWriter.finishRecordSet();
                notMatchedWriter.close();
                if (notMatchedWriterResult.getRecordCount() > 0) {
                    notMatched = session.putAllAttributes(notMatched, this.buildAttributes(notMatchedWriterResult.getRecordCount(), writer.getMimeType(), notMatchedWriterResult));
                }
            }
        }
        catch (IOException | SchemaNotFoundException | MalformedRecordException e) {
            this.getLogger().error("Cannot parse the incoming data", e);
            session.remove(output);
            if (notMatched != null) {
                session.remove(notMatched);
            }
            session.transfer(input, REL_FAILURE);
            return;
        }
        this.routingStrategyExecutor.transferFlowFiles(session, input, output, notMatched);
    }

    private Object getEncodedGeohash(RecordPath latitudePath, RecordPath longitudePath, Record record, GeohashFormat format, int level) {
        RecordPathResult latitudeResult = latitudePath.evaluate(record);
        RecordPathResult longitudeResult = longitudePath.evaluate(record);
        Optional latitudeField = latitudeResult.getSelectedFields().findFirst();
        Optional longitudeField = longitudeResult.getSelectedFields().findFirst();
        if (!latitudeField.isPresent() || !longitudeField.isPresent()) {
            return null;
        }
        FieldValue latitudeValue = (FieldValue)latitudeField.get();
        FieldValue longitudeValue = (FieldValue)longitudeField.get();
        Object latitudeVal = latitudeValue.getValue();
        Object longitudeVal = longitudeValue.getValue();
        if (latitudeVal == null || longitudeVal == null) {
            return null;
        }
        double realLatValue = Double.parseDouble(latitudeVal.toString());
        double realLongValue = Double.parseDouble(longitudeVal.toString());
        GeoHash gh = GeoHash.withCharacterPrecision((double)realLatValue, (double)realLongValue, (int)level);
        switch (format) {
            case BINARY: {
                return gh.toBinaryString();
            }
            case LONG: {
                return gh.longValue();
            }
        }
        return gh.toBase32();
    }

    private WGS84Point getDecodedPointFromGeohash(RecordPath geohashPath, Record record, GeohashFormat format) {
        GeoHash decodedHash;
        RecordPathResult geohashResult = geohashPath.evaluate(record);
        Optional geohashField = geohashResult.getSelectedFields().findFirst();
        if (!geohashField.isPresent()) {
            return null;
        }
        FieldValue geohashFieldValue = (FieldValue)geohashField.get();
        Object geohashVal = geohashFieldValue.getValue();
        if (geohashVal == null) {
            return null;
        }
        String geohashString = geohashVal.toString();
        switch (format) {
            case BINARY: {
                decodedHash = GeoHash.fromBinaryString((String)geohashString);
                break;
            }
            case LONG: {
                String binaryString = Long.toBinaryString(Long.parseLong(geohashString));
                decodedHash = GeoHash.fromBinaryString((String)binaryString);
                break;
            }
            default: {
                decodedHash = GeoHash.fromGeohashString((String)geohashString);
            }
        }
        return decodedHash.getBoundingBoxCenter();
    }

    private boolean updateRecord(PropertyDescriptor descriptor, Object newValue, Record record, Map<PropertyDescriptor, RecordPath> cached) {
        if (!cached.containsKey(descriptor) || newValue == null) {
            return false;
        }
        RecordPath path = cached.get(descriptor);
        RecordPathResult result = path.evaluate(record);
        Optional fieldValueOption = result.getSelectedFields().findFirst();
        if (!fieldValueOption.isPresent()) {
            return false;
        }
        FieldValue fieldValue = (FieldValue)fieldValueOption.get();
        if (!fieldValue.getParent().isPresent() || ((FieldValue)fieldValue.getParent().get()).getValue() == null) {
            return false;
        }
        fieldValue.updateValue(newValue);
        return true;
    }

    private Map<String, String> buildAttributes(int recordCount, String mimeType, WriteResult writeResult) {
        HashMap<String, String> retVal = new HashMap<String, String>();
        retVal.put(CoreAttributes.MIME_TYPE.key(), mimeType);
        retVal.put("record.count", String.valueOf(recordCount));
        retVal.putAll(writeResult.getAttributes());
        return retVal;
    }

    private class RequireRoutingStrategyExecutor
    implements RoutingStrategyExecutor {
        private RequireRoutingStrategyExecutor() {
        }

        @Override
        public void writeFlowFiles(Record record, RecordSetWriter writer, RecordSetWriter notMatchedWriter, boolean updated) throws IOException {
            if (updated) {
                writer.write(record);
            } else {
                Integer n = unenrichedCount;
                Integer n2 = unenrichedCount = Integer.valueOf(unenrichedCount + 1);
            }
        }

        @Override
        public void transferFlowFiles(ProcessSession session, FlowFile input, FlowFile output, FlowFile notMatched) {
            if (unenrichedCount > 0) {
                session.remove(output);
                GeohashRecord.this.getLogger().error("There exists some records that cannot be enriched or parsed. The original input flowfile is routed to failure using the REQUIRE strategy");
                session.transfer(input, REL_FAILURE);
            } else {
                session.transfer(output, REL_SUCCESS);
                session.transfer(input, REL_ORIGINAL);
            }
        }
    }

    private class SplitRoutingStrategyExecutor
    implements RoutingStrategyExecutor {
        private SplitRoutingStrategyExecutor() {
        }

        @Override
        public void writeFlowFiles(Record record, RecordSetWriter writer, RecordSetWriter notMatchedWriter, boolean updated) throws IOException {
            if (updated) {
                Integer n = enrichedCount;
                Integer n2 = enrichedCount = Integer.valueOf(enrichedCount + 1);
                writer.write(record);
            } else {
                Integer n = unenrichedCount;
                Integer n3 = unenrichedCount = Integer.valueOf(unenrichedCount + 1);
                notMatchedWriter.write(record);
            }
        }

        @Override
        public void transferFlowFiles(ProcessSession session, FlowFile input, FlowFile output, FlowFile notMatched) {
            if (unenrichedCount > 0) {
                session.transfer(notMatched, REL_NOT_MATCHED);
            } else {
                session.remove(notMatched);
            }
            if (enrichedCount > 0) {
                session.transfer(output, REL_MATCHED);
            } else {
                session.remove(output);
            }
            session.transfer(input, REL_ORIGINAL);
        }
    }

    private class SkipRoutingStrategyExecutor
    implements RoutingStrategyExecutor {
        private SkipRoutingStrategyExecutor() {
        }

        @Override
        public void writeFlowFiles(Record record, RecordSetWriter writer, RecordSetWriter notMatchedWriter, boolean updated) throws IOException {
            writer.write(record);
        }

        @Override
        public void transferFlowFiles(ProcessSession session, FlowFile input, FlowFile output, FlowFile notMatched) {
            session.transfer(output, REL_SUCCESS);
            session.transfer(input, REL_ORIGINAL);
        }
    }

    private static interface RoutingStrategyExecutor {
        public void writeFlowFiles(Record var1, RecordSetWriter var2, RecordSetWriter var3, boolean var4) throws IOException;

        public void transferFlowFiles(ProcessSession var1, FlowFile var2, FlowFile var3, FlowFile var4);
    }

    public static enum RoutingStrategy {
        SKIP,
        SPLIT,
        REQUIRE;

    }

    public static enum GeohashFormat {
        BASE32,
        BINARY,
        LONG;

    }

    public static enum ProcessingMode {
        ENCODE,
        DECODE;

    }
}

