/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.mongodb;

import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mongodb.AbstractMongoQueryProcessor;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;

/**
 * Record-oriented MongoDB query processor.
 *
 * <p>On each trigger the processor evaluates the database, collection, query, sort, projection
 * and limit properties (optionally against an incoming FlowFile), runs the query, and streams
 * every resulting document through the configured {@link RecordSetWriterFactory} into a single
 * output FlowFile. The output is routed to {@code REL_SUCCESS}; an incoming FlowFile, if any, is
 * routed to {@code REL_ORIGINAL} on success or {@code REL_FAILURE} when writing fails.
 */
@CapabilityDescription(value="A record-based version of GetMongo that uses the Record writers to write the MongoDB result set.")
@Tags(value={"mongo", "mongodb", "get", "fetch", "record", "json"})
@InputRequirement(value=InputRequirement.Requirement.INPUT_ALLOWED)
@WritesAttributes(value={@WritesAttribute(attribute="mongo.database.name", description="The database where the results came from."), @WritesAttribute(attribute="mongo.collection.name", description="The collection where the results came from.")})
public class GetMongoRecord
extends AbstractMongoQueryProcessor {
    /** Controller service used to create the record writer for the result set. */
    public static final PropertyDescriptor WRITER_FACTORY = new PropertyDescriptor.Builder()
            .name("get-mongo-record-writer-factory")
            .displayName("Record Writer")
            .description("The record writer to use to write the result sets.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

    /** Schema name handed to the record writer (as the {@code schema.name} variable) when there is no incoming FlowFile. */
    public static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
            .name("mongodb-schema-name")
            .displayName("Schema Name")
            .description("The name of the schema in the configured schema registry to use for the query results.")
            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .defaultValue("${schema.name}")
            .required(true)
            .build();

    private static final List<PropertyDescriptor> DESCRIPTORS;
    private static final Set<Relationship> RELATIONSHIPS;

    // Resolved once per scheduling in onEnabled(); volatile so trigger threads see the assignment.
    private volatile MongoDBClientService clientService;
    private volatile RecordSetWriterFactory writerFactory;

    /**
     * @return the immutable list of properties this processor exposes
     */
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    /**
     * @return the immutable set of relationships (success, failure, original)
     */
    public Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    /**
     * Resolves the MongoDB client service and the record writer factory from the processor
     * configuration when the processor is scheduled.
     *
     * @param context the process context holding the configured controller services
     */
    @OnScheduled
    public void onEnabled(ProcessContext context) {
        this.clientService = context.getProperty(CLIENT_SERVICE).asControllerService(MongoDBClientService.class);
        this.writerFactory = context.getProperty(WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
    }

    /**
     * Runs the configured query and writes the result set as records to a new FlowFile.
     *
     * <p>The output FlowFile receives the attributes returned by {@code getAttributes(...)} plus
     * a {@code record.count} attribute holding the number of documents written. If anything
     * fails after the output FlowFile has been created, the error is logged, the output is
     * removed and the incoming FlowFile (when present) is routed to {@code REL_FAILURE}.
     * Failures while evaluating properties or opening the cursor propagate to the caller.
     *
     * @param context the process context used to evaluate properties
     * @param session the session used to obtain, create, write and transfer FlowFiles
     * @throws ProcessException declared by the processor contract
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile input = null;
        if (context.hasIncomingConnection()) {
            input = session.get();
            // Nothing queued on a real upstream connection: wait for the next trigger.
            if (input == null && context.hasNonLoopConnection()) {
                return;
            }
        }

        final String database = context.getProperty(DATABASE_NAME).evaluateAttributeExpressions(input).getValue();
        final String collection = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions(input).getValue();
        final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(input).getValue();
        final Document query = this.getQuery(context, session, input);
        final MongoCollection<Document> mongoCollection = this.clientService.getDatabase(database).getCollection(collection);
        final FindIterable<Document> find = this.buildFind(context, input, mongoCollection, query);

        // Variables handed to the writer factory for schema resolution.
        // NOTE(review): when an input FlowFile exists its attributes are used as-is and the
        // evaluated SCHEMA_NAME value is not injected; confirm this is the intended contract.
        final Map<String, String> writerVariables;
        if (input != null) {
            writerVariables = input.getAttributes();
        } else {
            writerVariables = new HashMap<>();
            writerVariables.put("schema.name", schemaName);
        }

        // The cursor is a Closeable; release it on every exit path.
        try (MongoCursor<Document> cursor = find.iterator()) {
            FlowFile output = input != null ? session.create(input) : session.create();
            try {
                final Map<String, String> attributes = this.getAttributes(context, input, query, mongoCollection);
                long count = 0L;
                try (OutputStream out = session.write(output)) {
                    final RecordSchema schema = this.writerFactory.getSchema(writerVariables, null);
                    // The writer is closed before the stream, even if writing a record fails.
                    try (RecordSetWriter writer = this.writerFactory.createWriter(this.getLogger(), schema, out, writerVariables)) {
                        writer.beginRecordSet();
                        while (cursor.hasNext()) {
                            final Document next = cursor.next();
                            final Object id = next.get("_id");
                            // ObjectId values are replaced by their string form before being handed to the writer.
                            if (id instanceof ObjectId) {
                                next.put("_id", id.toString());
                            }
                            final Record record = new MapRecord(schema, next);
                            writer.write(record);
                            ++count;
                        }
                        writer.finishRecordSet();
                    }
                }
                attributes.put("record.count", String.valueOf(count));

                output = session.putAllAttributes(output, attributes);
                session.getProvenanceReporter().fetch(output, this.getURI(context));
                session.transfer(output, REL_SUCCESS);
                if (input != null) {
                    session.transfer(input, REL_ORIGINAL);
                }
            } catch (Exception ex) {
                this.getLogger().error("Error writing record set from Mongo query.", (Throwable)ex);
                session.remove(output);
                if (input != null) {
                    session.transfer(input, REL_FAILURE);
                }
            }
        }
    }

    /**
     * Builds the find operation for the query, applying the optional sort, projection and limit
     * properties when they are set.
     *
     * <p>NOTE(review): BATCH_SIZE is listed among the supported properties but is not applied
     * here; confirm whether it should be.
     */
    private FindIterable<Document> buildFind(ProcessContext context, FlowFile input, MongoCollection<Document> mongoCollection, Document query) {
        FindIterable<Document> find = mongoCollection.find((Bson)query);
        if (context.getProperty(SORT).isSet()) {
            final String sort = context.getProperty(SORT).evaluateAttributeExpressions(input).getValue();
            find = find.sort((Bson)Document.parse(sort));
        }
        if (context.getProperty(PROJECTION).isSet()) {
            final String projection = context.getProperty(PROJECTION).evaluateAttributeExpressions(input).getValue();
            find = find.projection((Bson)Document.parse(projection));
        }
        if (context.getProperty(LIMIT).isSet()) {
            find = find.limit(context.getProperty(LIMIT).evaluateAttributeExpressions(input).asInteger().intValue());
        }
        return find;
    }

    static {
        // Order here is the order in which the properties are listed for this processor.
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CLIENT_SERVICE);
        descriptors.add(WRITER_FACTORY);
        descriptors.add(DATABASE_NAME);
        descriptors.add(COLLECTION_NAME);
        descriptors.add(SCHEMA_NAME);
        descriptors.add(QUERY_ATTRIBUTE);
        descriptors.add(QUERY);
        descriptors.add(PROJECTION);
        descriptors.add(SORT);
        descriptors.add(LIMIT);
        descriptors.add(BATCH_SIZE);
        DESCRIPTORS = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<>();
        relationships.add(REL_SUCCESS);
        relationships.add(REL_FAILURE);
        relationships.add(REL_ORIGINAL);
        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
    }
}

