package org.apache.nifi.processors;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.path.FieldValue;
import org.apache.nifi.record.path.RecordPath;
import org.apache.nifi.record.path.util.RecordPathCache;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;

@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@CapabilityDescription("Looks up geolocation information for an IP address and adds the geo information to FlowFile attributes. The geo data is provided as a MaxMind database. This version uses the NiFi Record API to allow large scale enrichment of record-oriented data sets. Each field provided by the MaxMind database can be directed to a field of the user's choosing by providing a record path for that field configuration. ")
@Tags({"geo", "enrich", "ip", "maxmind", "record"})
/* loaded from: input_file:org/apache/nifi/processors/GeoEnrichIPRecord.class */
public class GeoEnrichIPRecord extends AbstractEnrichIP {
    public static final PropertyDescriptor READER = new PropertyDescriptor.Builder().name("geo-enrich-ip-record-reader").displayName("Record Reader").description("Record reader service to use for reading the flowfile contents.").required(true).identifiesControllerService(RecordReaderFactory.class).build();
    public static final PropertyDescriptor WRITER = new PropertyDescriptor.Builder().name("geo-enrich-ip-record-writer").displayName("Record Writer").description("Record writer service to use for enriching the flowfile contents.").required(true).identifiesControllerService(RecordSetWriterFactory.class).build();
    public static final PropertyDescriptor IP_RECORD_PATH = new PropertyDescriptor.Builder().name("geo-enrich-ip-ip-record-path").displayName("IP Address Record Path").description("The record path to retrieve the IP address for doing the lookup.").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).required(true).build();
    public static final PropertyDescriptor SPLIT_FOUND_NOT_FOUND = new PropertyDescriptor.Builder().name("geo-enrich-ip-split-found-not-found").displayName("Separate Enriched From Not Enriched").description("Separate records that have been enriched from ones that have not. Default behavior is to send everything to the found relationship if even one record is enriched.").allowableValues(new String[]{"true", "false"}).defaultValue("false").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).build();
    public static final PropertyDescriptor GEO_CITY = new PropertyDescriptor.Builder().name("geo-enrich-ip-city-record-path").displayName("City Record Path").description("Record path for putting the city identified for the IP address").required(false).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEO_LATITUDE = new PropertyDescriptor.Builder().name("geo-enrich-ip-latitude-record-path").displayName("Latitude Record Path").description("Record path for putting the latitude identified for this IP address").required(false).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEO_LONGITUDE = new PropertyDescriptor.Builder().name("geo-enrich-ip-longitude-record-path").displayName("Longitude Record Path").description("Record path for putting the longitude identified for this IP address").required(false).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEO_COUNTRY = new PropertyDescriptor.Builder().name("geo-enrich-ip-country-record-path").displayName("Country Record Path").description("Record path for putting the country identified for this IP address").required(false).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEO_COUNTRY_ISO = new PropertyDescriptor.Builder().name("geo-enrich-ip-country-iso-record-path").displayName("Country ISO Code Record Path").description("Record path for putting the ISO Code for the country identified").required(false).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor GEO_POSTAL_CODE = new PropertyDescriptor.Builder().name("geo-enrich-ip-country-postal-record-path").displayName("Country Postal Code Record Path").description("Record path for putting the postal code for the country identified").required(false).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original input flowfile goes to this relationship regardless of whether the content was enriched or not.").build();
    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet(Arrays.asList(REL_ORIGINAL, REL_FOUND, REL_NOT_FOUND)));
    public static final List<PropertyDescriptor> GEO_PROPERTIES = Collections.unmodifiableList(Arrays.asList(GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE));
    private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(GEO_DATABASE_FILE, READER, WRITER, SPLIT_FOUND_NOT_FOUND, IP_RECORD_PATH, GEO_CITY, GEO_LATITUDE, GEO_LONGITUDE, GEO_COUNTRY, GEO_COUNTRY_ISO, GEO_POSTAL_CODE));
    protected volatile RecordReaderFactory readerFactory;
    protected volatile RecordSetWriterFactory writerFactory;
    protected boolean splitOutput;

    @Override // org.apache.nifi.processors.AbstractEnrichIP
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    @Override // org.apache.nifi.processors.AbstractEnrichIP
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @Override // org.apache.nifi.processors.AbstractEnrichIP
    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        super.onScheduled(processContext);
        this.readerFactory = processContext.getProperty(READER).asControllerService(RecordReaderFactory.class);
        this.writerFactory = processContext.getProperty(WRITER).asControllerService(RecordSetWriterFactory.class);
        this.splitOutput = processContext.getProperty(SPLIT_FOUND_NOT_FOUND).asBoolean().booleanValue();
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        FlowFile create = processSession.create(flowFile);
        FlowFile create2 = this.splitOutput ? processSession.create(flowFile) : null;
        DatabaseReader databaseReader = this.databaseReaderRef.get();
        try {
            InputStream read = processSession.read(flowFile);
            try {
                OutputStream write = processSession.write(create);
                try {
                    OutputStream write2 = this.splitOutput ? processSession.write(create2) : null;
                    try {
                        RecordPathCache recordPathCache = new RecordPathCache(GEO_PROPERTIES.size() + 1);
                        HashMap hashMap = new HashMap();
                        for (PropertyDescriptor propertyDescriptor : GEO_PROPERTIES) {
                            if (processContext.getProperty(propertyDescriptor).isSet()) {
                                hashMap.put(propertyDescriptor, recordPathCache.getCompiled(processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions(flowFile).getValue()));
                            }
                        }
                        RecordPath compiled = recordPathCache.getCompiled(processContext.getProperty(IP_RECORD_PATH).evaluateAttributeExpressions(flowFile).getValue());
                        RecordReader createRecordReader = this.readerFactory.createRecordReader(flowFile, read, getLogger());
                        RecordSchema schema = this.writerFactory.getSchema(flowFile.getAttributes(), createRecordReader.getSchema());
                        RecordSetWriter createWriter = this.writerFactory.createWriter(getLogger(), schema, write, flowFile);
                        RecordSetWriter createWriter2 = this.splitOutput ? this.writerFactory.createWriter(getLogger(), schema, write2, flowFile) : null;
                        Relationship relationship = REL_NOT_FOUND;
                        createWriter.beginRecordSet();
                        if (createWriter2 != null) {
                            createWriter2.beginRecordSet();
                        }
                        int i = 0;
                        int i2 = 0;
                        while (true) {
                            Record nextRecord = createRecordReader.nextRecord();
                            if (nextRecord == null) {
                                break;
                            }
                            boolean enrichRecord = enrichRecord(geocode(compiled, nextRecord, databaseReader), nextRecord, hashMap);
                            if (enrichRecord) {
                                relationship = REL_FOUND;
                            }
                            if (!this.splitOutput || (this.splitOutput && enrichRecord)) {
                                createWriter.write(nextRecord);
                                i++;
                            } else {
                                createWriter2.write(nextRecord);
                                i2++;
                            }
                        }
                        createWriter.finishRecordSet();
                        createWriter.close();
                        if (createWriter2 != null) {
                            createWriter2.finishRecordSet();
                            createWriter2.close();
                        }
                        read.close();
                        write.close();
                        if (write2 != null) {
                            write2.close();
                        }
                        FlowFile putAllAttributes = processSession.putAllAttributes(create, buildAttributes(i, createWriter.getMimeType()));
                        if (this.splitOutput) {
                            if (i2 > 0) {
                                create2 = processSession.putAllAttributes(create2, buildAttributes(i2, createWriter.getMimeType()));
                                processSession.transfer(create2, REL_NOT_FOUND);
                            } else {
                                processSession.remove(create2);
                            }
                            processSession.transfer(putAllAttributes, REL_FOUND);
                            processSession.transfer(flowFile, REL_ORIGINAL);
                            processSession.getProvenanceReporter().modifyContent(create2);
                        } else {
                            processSession.transfer(putAllAttributes, relationship);
                            processSession.remove(flowFile);
                        }
                        processSession.getProvenanceReporter().modifyContent(putAllAttributes);
                        if (write2 != null) {
                            write2.close();
                        }
                        if (write != null) {
                            write.close();
                        }
                        if (read != null) {
                            read.close();
                        }
                    } catch (Throwable th) {
                        if (write2 != null) {
                            try {
                                write2.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    if (write != null) {
                        try {
                            write.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e) {
            getLogger().error("Error enriching records.", e);
            processSession.rollback();
            processContext.yield();
        }
    }

    private Map<String, String> buildAttributes(int i, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreAttributes.MIME_TYPE.key(), str);
        hashMap.put("record.count", String.valueOf(i));
        return hashMap;
    }

    private CityResponse geocode(RecordPath recordPath, Record record, DatabaseReader databaseReader) throws Exception {
        Object value;
        Optional findFirst = recordPath.evaluate(record).getSelectedFields().findFirst();
        if (!findFirst.isPresent() || (value = ((FieldValue) findFirst.get()).getValue()) == null) {
            return null;
        }
        return databaseReader.city(InetAddress.getByName(value.toString()));
    }

    private boolean enrichRecord(CityResponse cityResponse, Record record, Map<PropertyDescriptor, RecordPath> map) {
        if (cityResponse == null || cityResponse.getCity() == null) {
            return false;
        }
        return update(GEO_CITY, map, record, cityResponse.getCity().getName()) || update(GEO_COUNTRY, map, record, cityResponse.getCountry().getName()) || update(GEO_COUNTRY_ISO, map, record, cityResponse.getCountry().getIsoCode()) || update(GEO_LATITUDE, map, record, cityResponse.getLocation().getLatitude()) || update(GEO_LONGITUDE, map, record, cityResponse.getLocation().getLongitude()) || update(GEO_POSTAL_CODE, map, record, cityResponse.getPostal().getCode());
    }

    private boolean update(PropertyDescriptor propertyDescriptor, Map<PropertyDescriptor, RecordPath> map, Record record, Object obj) {
        if (!map.containsKey(propertyDescriptor) || obj == null) {
            return false;
        }
        FieldValue fieldValue = (FieldValue) map.get(propertyDescriptor).evaluate(record).getSelectedFields().findFirst().get();
        if (((FieldValue) fieldValue.getParent().get()).getValue() == null) {
            return false;
        }
        fieldValue.updateValue(obj.toString());
        return true;
    }
}
