package org.apache.nifi.processors.elasticsearch;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

@CapabilityDescription("Queries Elasticsearch using the specified connection properties. Note that the full body of each page of documents will be read into memory before being written to Flow Files for transfer.  Also note that the Elasticsearch max_result_window index setting is the upper bound on the number of records that can be retrieved using this query.  To retrieve more records, use the ScrollElasticsearchHttp processor.")
@DynamicProperty(name = "A URL query parameter", value = "The value to set it to", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY, description = "Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing")
@Deprecated
@SupportsBatching
@WritesAttributes({@WritesAttribute(attribute = "filename", description = "The filename attribute is set to the document identifier"), @WritesAttribute(attribute = "es.query.hitcount", description = "The number of hits for a query"), @WritesAttribute(attribute = "es.id", description = "The Elasticsearch document identifier"), @WritesAttribute(attribute = "es.index", description = "The Elasticsearch index containing the document"), @WritesAttribute(attribute = "es.query.url", description = "The Elasticsearch query that was built"), @WritesAttribute(attribute = "es.type", description = "The Elasticsearch document type"), @WritesAttribute(attribute = "es.result.*", description = "If Target is 'Flow file attributes', the JSON attributes of each result will be placed into corresponding attributes with this prefix.")})
@DeprecationNotice(classNames = {"org.apache.nifi.processors.elasticsearch.PaginatedJsonQueryElasticsearch"}, reason = "This processor is deprecated and may be removed in future releases.")
@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
@EventDriven
@Tags({"elasticsearch", "query", "read", "get", "http"})
/* loaded from: input_file:org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp.class */
public class QueryElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
    private static final String FROM_QUERY_PARAM = "from";
    private static final String ATTRIBUTE_PREFIX = "es.result.";
    private static final List<PropertyDescriptor> propertyDescriptors;
    static final AllowableValue ALWAYS = new AllowableValue(QueryInfoRouteStrategy.ALWAYS.name(), "Always", "Always route Query Info");
    static final AllowableValue NEVER = new AllowableValue(QueryInfoRouteStrategy.NEVER.name(), "Never", "Never route Query Info");
    static final AllowableValue NO_HITS = new AllowableValue(QueryInfoRouteStrategy.NOHIT.name(), "No Hits", "Route Query Info if the Query returns no hits");
    static final AllowableValue APPEND_AS_ATTRIBUTES = new AllowableValue(QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES.name(), "Append as Attributes", "Always append Query Info as attributes, using the existing relationships (does not add the Query Info relationship).");
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("All FlowFiles that are read from Elasticsearch are routed to this relationship.").build();
    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure").description("All FlowFiles that cannot be read from Elasticsearch are routed to this relationship. Note that only incoming flow files will be routed to failure.").build();
    public static final Relationship REL_RETRY = new Relationship.Builder().name("retry").description("A FlowFile is routed to this relationship if the document cannot be fetched but attempting the operation again may succeed. Note that if the processor has no incoming connections, flow files may still be sent to this relationship based on the processor properties and the results of the fetch operation.").build();
    public static final Relationship REL_QUERY_INFO = new Relationship.Builder().name("query-info").description("Depending on the setting of the Routing Strategy for Query Info property, a FlowFile is routed to this relationship with the incoming FlowFile's attributes (if present), the number of hits, and the Elasticsearch query").build();
    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder().name("query-es-query").displayName("Query").description("The Lucene-style query to run against ElasticSearch (e.g., genre:blues AND -artist:muddy)").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor INDEX = new PropertyDescriptor.Builder().name("query-es-index").displayName("Index").description("The name of the index to read from. If the property is unset or set to _all, the query will match across all indexes.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor TYPE = new PropertyDescriptor.Builder().name("query-es-type").displayName("Type").description("The type of document (if unset, the query will be against all types in the _index). This should be unset or '_doc' for Elasticsearch 7.0+.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor FIELDS = new PropertyDescriptor.Builder().name("query-es-fields").displayName("Fields").description("A comma-separated list of fields to retrieve from the document. If the Fields property is left blank, then the entire document's source will be retrieved.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor SORT = new PropertyDescriptor.Builder().name("query-es-sort").displayName("Sort").description("A sort parameter (e.g., timestamp:asc). If the Sort property is left blank, then the results will be retrieved in document order.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder().name("query-es-size").displayName("Page Size").defaultValue("20").description("Determines how many documents to return per page during scrolling.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor LIMIT = new PropertyDescriptor.Builder().name("query-es-limit").displayName("Limit").description("If set, limits the number of results that will be returned.").required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).build();
    public static final String TARGET_FLOW_FILE_CONTENT = "Flow file content";
    public static final String TARGET_FLOW_FILE_ATTRIBUTES = "Flow file attributes";
    public static final PropertyDescriptor TARGET = new PropertyDescriptor.Builder().name("query-es-target").displayName("Target").description("Indicates where the results should be placed.  In the case of 'Flow file content', the JSON response will be written as the content of the flow file.  In the case of 'Flow file attributes', the original flow file (if applicable) will be cloned for each result, and all return fields will be placed in a flow file attribute of the same name, but prefixed by 'es.result.'").required(true).expressionLanguageSupported(ExpressionLanguageScope.NONE).defaultValue(TARGET_FLOW_FILE_CONTENT).allowableValues(new String[]{TARGET_FLOW_FILE_CONTENT, TARGET_FLOW_FILE_ATTRIBUTES}).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor ROUTING_QUERY_INFO_STRATEGY = new PropertyDescriptor.Builder().name("routing-query-info-strategy").displayName("Routing Strategy for Query Info").description("Specifies when to generate and route Query Info after a successful query").expressionLanguageSupported(ExpressionLanguageScope.NONE).allowableValues(new AllowableValue[]{ALWAYS, NEVER, NO_HITS, APPEND_AS_ATTRIBUTES}).defaultValue(NEVER.getValue()).required(false).build();
    private volatile Set<Relationship> relationships = new HashSet(Arrays.asList(REL_SUCCESS, REL_FAILURE, REL_RETRY));
    private QueryInfoRouteStrategy queryInfoRouteStrategy = QueryInfoRouteStrategy.NEVER;

    /* loaded from: input_file:org/apache/nifi/processors/elasticsearch/QueryElasticsearchHttp$QueryInfoRouteStrategy.class */
    public enum QueryInfoRouteStrategy {
        NEVER,
        ALWAYS,
        NOHIT,
        APPEND_AS_ATTRIBUTES
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    @Override // org.apache.nifi.processors.elasticsearch.AbstractElasticsearchProcessor
    @OnScheduled
    public void setup(ProcessContext processContext) {
        super.setup(processContext);
    }

    public void onPropertyModified(PropertyDescriptor propertyDescriptor, String str, String str2) {
        if (ROUTING_QUERY_INFO_STRATEGY.equals(propertyDescriptor)) {
            HashSet hashSet = new HashSet();
            hashSet.add(REL_SUCCESS);
            hashSet.add(REL_FAILURE);
            hashSet.add(REL_RETRY);
            if (ALWAYS.getValue().equalsIgnoreCase(str2) || NO_HITS.getValue().equalsIgnoreCase(str2)) {
                hashSet.add(REL_QUERY_INFO);
            }
            this.queryInfoRouteStrategy = QueryInfoRouteStrategy.valueOf(str2);
            this.relationships = hashSet;
        }
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = null;
        if (processContext.hasIncomingConnection()) {
            flowFile = processSession.get();
            if (flowFile == null && processContext.hasNonLoopConnection()) {
                return;
            }
        }
        OkHttpClient client = getClient();
        String value = processContext.getProperty(INDEX).evaluateAttributeExpressions(flowFile).getValue();
        String value2 = processContext.getProperty(QUERY).evaluateAttributeExpressions(flowFile).getValue();
        String value3 = processContext.getProperty(TYPE).evaluateAttributeExpressions(flowFile).getValue();
        int intValue = processContext.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asInteger().intValue();
        Integer asInteger = processContext.getProperty(LIMIT).isSet() ? processContext.getProperty(LIMIT).evaluateAttributeExpressions(flowFile).asInteger() : null;
        String value4 = processContext.getProperty(FIELDS).isSet() ? processContext.getProperty(FIELDS).evaluateAttributeExpressions(flowFile).getValue() : null;
        String value5 = processContext.getProperty(SORT).isSet() ? processContext.getProperty(SORT).evaluateAttributeExpressions(flowFile).getValue() : null;
        boolean equals = processContext.getProperty(TARGET).getValue().equals(TARGET_FLOW_FILE_CONTENT);
        String value6 = processContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
        String value7 = processContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
        Charset forName = Charset.forName(processContext.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue());
        ComponentLog logger = getLogger();
        int i = 0;
        int i2 = 0;
        try {
            logger.debug("Querying {}/{} from Elasticsearch: {}", new Object[]{value, value3, value2});
            long nanoTime = System.nanoTime();
            String trimToEmpty = StringUtils.trimToEmpty(processContext.getProperty(ES_URL).evaluateAttributeExpressions().getValue());
            boolean z = false;
            do {
                int i3 = intValue;
                if (asInteger != null && asInteger.intValue() <= i + intValue) {
                    i3 = asInteger.intValue() - i;
                    z = true;
                }
                URL buildRequestURL = buildRequestURL(trimToEmpty, value2, value, value3, value4, value5, i3, i, processContext);
                Response sendRequestToElasticsearch = sendRequestToElasticsearch(client, buildRequestURL, value6, value7, "GET", null);
                i2 = getPage(sendRequestToElasticsearch, buildRequestURL, processContext, processSession, flowFile, logger, nanoTime, equals, i2, forName);
                i += intValue;
                sendRequestToElasticsearch.close();
                if (i2 <= 0) {
                    break;
                }
            } while (!z);
            if (flowFile != null) {
                processSession.remove(flowFile);
            }
        } catch (IOException e) {
            logger.error("Failed to read from Elasticsearch due to {}, this may indicate an error in configuration (hosts, username/password, etc.). Routing to retry", new Object[]{e.getLocalizedMessage()}, e);
            if (flowFile != null) {
                processSession.transfer(flowFile, REL_RETRY);
            }
            processContext.yield();
        } catch (RetryableException e2) {
            logger.error(e2.getMessage(), new Object[]{e2.getLocalizedMessage()}, e2);
            if (flowFile != null) {
                processSession.transfer(flowFile, REL_RETRY);
            }
            processContext.yield();
        } catch (Exception e3) {
            logger.error("Failed to read {} from Elasticsearch due to {}", new Object[]{flowFile, e3.getLocalizedMessage()}, e3);
            if (flowFile != null) {
                processSession.transfer(flowFile, REL_FAILURE);
            }
            processContext.yield();
        }
    }

    private int getPage(Response response, URL url, ProcessContext processContext, ProcessSession processSession, FlowFile flowFile, ComponentLog componentLog, long j, boolean z, int i, Charset charset) throws IOException {
        FlowFile putAllAttributes;
        String asText;
        ArrayList arrayList = new ArrayList();
        int code = response.code();
        if (isSuccess(code)) {
            JsonNode parseJsonResponse = parseJsonResponse(new ByteArrayInputStream(response.body().bytes()));
            JsonNode jsonNode = parseJsonResponse.get("hits").get("hits");
            if ((jsonNode.size() == 0 && i == 0 && this.queryInfoRouteStrategy == QueryInfoRouteStrategy.NOHIT) || this.queryInfoRouteStrategy == QueryInfoRouteStrategy.ALWAYS) {
                processSession.transfer(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(flowFile == null ? processSession.create() : processSession.create(flowFile), "es.query.url", url.toExternalForm()), "es.query.hitcount", String.valueOf(jsonNode.size())), CoreAttributes.MIME_TYPE.key(), "application/json"), REL_QUERY_INFO);
            }
            for (int i2 = 0; i2 < jsonNode.size(); i2++) {
                JsonNode jsonNode2 = jsonNode.get(i2);
                String asText2 = jsonNode2.get("_id").asText();
                String asText3 = jsonNode2.get("_index").asText();
                String asText4 = jsonNode2.get("_type").asText();
                FlowFile create = flowFile != null ? z ? processSession.create(flowFile) : processSession.clone(flowFile) : processSession.create();
                if (this.queryInfoRouteStrategy == QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES) {
                    create = processSession.putAttribute(create, "es.query.hitcount", String.valueOf(jsonNode.size()));
                }
                JsonNode jsonNode3 = jsonNode2.get("_source");
                FlowFile putAttribute = processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(processSession.putAttribute(create, "es.id", asText2), "es.index", asText3), "es.type", asText4), "es.query.url", url.toExternalForm());
                if (z) {
                    putAllAttributes = processSession.write(processSession.putAttribute(processSession.putAttribute(putAttribute, "filename", asText2), "mime.type", "application/json"), outputStream -> {
                        outputStream.write(jsonNode3.toString().getBytes(charset));
                    });
                } else {
                    HashMap hashMap = new HashMap();
                    Iterator fields = jsonNode3.fields();
                    while (fields.hasNext()) {
                        Map.Entry entry = (Map.Entry) fields.next();
                        if (((JsonNode) entry.getValue()).isArray()) {
                            ArrayList arrayList2 = new ArrayList();
                            Iterator it = ((JsonNode) entry.getValue()).iterator();
                            while (it.hasNext()) {
                                arrayList2.add(((JsonNode) it.next()).asText());
                            }
                            asText = StringUtils.join(arrayList2, ',');
                        } else {
                            asText = ((JsonNode) entry.getValue()).asText();
                        }
                        hashMap.put("es.result." + ((String) entry.getKey()), asText);
                    }
                    putAllAttributes = processSession.putAllAttributes(putAttribute, hashMap);
                }
                arrayList.add(putAllAttributes);
            }
            componentLog.debug("Elasticsearch retrieved " + parseJsonResponse.size() + " documents, routing to success");
            if (this.queryInfoRouteStrategy == QueryInfoRouteStrategy.APPEND_AS_ATTRIBUTES && arrayList.isEmpty() && flowFile != null) {
                processSession.transfer(processSession.putAttribute(processSession.putAttribute(z ? processSession.create(flowFile) : processSession.clone(flowFile), "es.query.hitcount", String.valueOf(jsonNode.size())), "es.query.url", url.toExternalForm()), REL_SUCCESS);
            } else {
                processSession.transfer(arrayList, REL_SUCCESS);
            }
        } else {
            try {
                if (code / 100 == 5) {
                    throw new RetryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to retry. This is likely a server problem, yielding...", Integer.valueOf(code), response.message()));
                }
                if (processContext.hasIncomingConnection()) {
                    throw new UnretryableException(String.format("Elasticsearch returned code %s with message %s, transferring flow file to failure", Integer.valueOf(code), response.message()));
                }
                componentLog.warn("Elasticsearch returned code {} with message {}", new Object[]{Integer.valueOf(code), response.message()});
                if (!arrayList.isEmpty()) {
                    processSession.remove(arrayList);
                    arrayList.clear();
                }
            } catch (Throwable th) {
                if (!arrayList.isEmpty()) {
                    processSession.remove(arrayList);
                    arrayList.clear();
                }
                throw th;
            }
        }
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - j);
        if (!arrayList.isEmpty()) {
            if (processContext.hasNonLoopConnection()) {
                arrayList.forEach(flowFile2 -> {
                    processSession.getProvenanceReporter().fetch(flowFile2, url.toExternalForm(), millis);
                });
            } else {
                arrayList.forEach(flowFile3 -> {
                    processSession.getProvenanceReporter().receive(flowFile3, url.toExternalForm(), millis);
                });
            }
        }
        return arrayList.size();
    }

    private URL buildRequestURL(String str, String str2, String str3, String str4, String str5, String str6, int i, int i2, ProcessContext processContext) throws MalformedURLException {
        if (StringUtils.isEmpty(str)) {
            throw new MalformedURLException("Base URL cannot be null");
        }
        HttpUrl.Builder newBuilder = HttpUrl.parse(str).newBuilder();
        newBuilder.addPathSegment(StringUtils.isEmpty(str3) ? "_all" : str3);
        if (StringUtils.isNotBlank(str4)) {
            newBuilder.addPathSegment(str4);
        }
        newBuilder.addPathSegment("_search");
        newBuilder.addQueryParameter("q", str2);
        newBuilder.addQueryParameter("size", String.valueOf(i));
        newBuilder.addQueryParameter(FROM_QUERY_PARAM, String.valueOf(i2));
        if (!StringUtils.isEmpty(str5)) {
            newBuilder.addQueryParameter("_source", (String) Stream.of((Object[]) str5.split(",")).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.joining(",")));
        }
        if (!StringUtils.isEmpty(str6)) {
            newBuilder.addQueryParameter("sort", (String) Stream.of((Object[]) str6.split(",")).map((v0) -> {
                return v0.trim();
            }).collect(Collectors.joining(",")));
        }
        for (Map.Entry entry : processContext.getProperties().entrySet()) {
            PropertyDescriptor propertyDescriptor = (PropertyDescriptor) entry.getKey();
            if (propertyDescriptor.isDynamic() && entry.getValue() != null) {
                newBuilder.addQueryParameter(propertyDescriptor.getName(), processContext.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue());
            }
        }
        return newBuilder.build().url();
    }

    static {
        ArrayList arrayList = new ArrayList(COMMON_PROPERTY_DESCRIPTORS);
        arrayList.add(QUERY);
        arrayList.add(PAGE_SIZE);
        arrayList.add(INDEX);
        arrayList.add(TYPE);
        arrayList.add(FIELDS);
        arrayList.add(SORT);
        arrayList.add(LIMIT);
        arrayList.add(TARGET);
        arrayList.add(ROUTING_QUERY_INFO_STRATEGY);
        propertyDescriptors = Collections.unmodifiableList(arrayList);
    }
}
