/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.spring;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.TriggerWhenEmpty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.spring.SpringContextFactory;
import org.apache.nifi.spring.SpringDataExchanger;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Processor that exchanges data with an application defined in a Spring
 * Application Context.
 *
 * <p>On every trigger the processor first forwards the next queued FlowFile
 * (if there is one) to the Spring side through a {@link SpringDataExchanger},
 * and then polls the exchanger for a message coming back from Spring. A
 * received message becomes a new FlowFile routed to {@link #REL_SUCCESS};
 * a FlowFile that could not be sent is routed to {@link #REL_FAILURE}.
 *
 * <p>The exchanger is created when the processor is scheduled and closed when
 * it is stopped.
 */
@TriggerWhenEmpty
@Tags(value={"Spring", "Message", "Get", "Put", "Integration"})
@CapabilityDescription(value="A Processor that supports sending and receiving data from application defined in Spring Application Context via predefined in/out MessageChannels.")
public class SpringContextProcessor
extends AbstractProcessor {
    /** Static so that the static validation helpers can report problems too. */
    private static final Logger logger = LoggerFactory.getLogger(SpringContextProcessor.class);

    /** Classpath-relative location of the Spring configuration file. */
    public static final PropertyDescriptor CTX_CONFIG_PATH = new PropertyDescriptor.Builder()
            .name("Application Context config path")
            .description("The path to the Spring Application Context configuration file relative to the classpath")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    /** Directory whose content is put on the classpath of the Application Context. */
    public static final PropertyDescriptor CTX_LIB_PATH = new PropertyDescriptor.Builder()
            .name("Application Context class path")
            .description("Path to the directory with resources (i.e., JARs, configuration files etc.) required to be on the classpath of the ApplicationContext.")
            .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.DIRECTORY, new ResourceType[0])
            .required(true)
            .build();

    /** Optional time period to wait while sending; treated as 0 when unset. */
    public static final PropertyDescriptor SEND_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Send Timeout")
            .description("Timeout for sending data to Spring Application Context. Defaults to 0.")
            .required(false)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    /** Optional time period to wait while receiving; treated as 0 when unset. */
    public static final PropertyDescriptor RECEIVE_TIMEOUT = new PropertyDescriptor.Builder()
            .name("Receive Timeout")
            .description("Timeout for receiving data from Spring context. Defaults to 0.")
            .required(false)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("All FlowFiles that are successfully received from Spring Application Context are routed to this relationship")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("All FlowFiles that cannot be sent to Spring Application Context are routed to this relationship")
            .build();

    private static final Set<Relationship> relationships;
    private static final List<PropertyDescriptor> propertyDescriptors;

    static {
        List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(CTX_CONFIG_PATH);
        descriptors.add(CTX_LIB_PATH);
        descriptors.add(SEND_TIMEOUT);
        descriptors.add(RECEIVE_TIMEOUT);
        propertyDescriptors = Collections.unmodifiableList(descriptors);

        Set<Relationship> rels = new HashSet<>();
        rels.add(REL_SUCCESS);
        rels.add(REL_FAILURE);
        relationships = Collections.unmodifiableSet(rels);
    }

    // Snapshot of the configuration, captured when the processor is scheduled.
    private volatile String applicationContextConfigFileName;
    private volatile String applicationContextLibPath;
    private volatile long sendTimeout;
    private volatile long receiveTimeout;
    private volatile SpringDataExchanger exchanger;

    /**
     * @return the unmodifiable set containing {@link #REL_SUCCESS} and {@link #REL_FAILURE}
     */
    public Set<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Captures the configured property values and creates the
     * {@link SpringDataExchanger} for the configured Application Context.
     *
     * @param processContext context the property values are read from
     * @throws IllegalStateException if the exchanger cannot be created; the
     *         original failure is preserved as the cause
     */
    @OnScheduled
    public void initializeSpringContext(ProcessContext processContext) {
        this.applicationContextConfigFileName = processContext.getProperty(CTX_CONFIG_PATH).getValue();
        this.applicationContextLibPath = processContext.getProperty(CTX_LIB_PATH).getValue();
        this.sendTimeout = resolveTimeoutMillis(processContext, SEND_TIMEOUT);
        this.receiveTimeout = resolveTimeoutMillis(processContext, RECEIVE_TIMEOUT);

        logger.debug("Initializing Spring Application Context defined in {}", this.applicationContextConfigFileName);
        try {
            this.exchanger = SpringContextFactory.createSpringContextDelegate(this.applicationContextLibPath, this.applicationContextConfigFileName);
        }
        catch (Exception e) {
            throw new IllegalStateException("Failed while initializing Spring Application Context", e);
        }
        logger.info("Successfully initialized Spring Application Context defined in {}", this.applicationContextConfigFileName);
    }

    /**
     * Closes the exchanger created by {@link #initializeSpringContext(ProcessContext)},
     * if there is one. A failure to close is logged as a warning and not rethrown.
     *
     * @param processContext not used
     */
    @OnStopped
    public void closeSpringContext(ProcessContext processContext) {
        SpringDataExchanger current = this.exchanger;
        if (current == null) {
            return;
        }
        try {
            logger.debug("Closing Spring Application Context defined in {}", this.applicationContextConfigFileName);
            current.close();
            logger.info("Successfully closed Spring Application Context defined in {}", this.applicationContextConfigFileName);
        }
        catch (IOException e) {
            this.getLogger().warn("Failed while closing Spring Application Context", e);
        }
    }

    /**
     * Sends the next queued FlowFile (when one is available) to Spring and then
     * attempts to receive a message from Spring. The receive step runs whether
     * or not a FlowFile was available.
     */
    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
        FlowFile flowFileToProcess = processSession.get();
        if (flowFileToProcess != null) {
            this.sendToSpring(flowFileToProcess, context, processSession);
        }
        this.receiveFromSpring(processSession);
    }

    /**
     * Validates that the configured Spring configuration can be located on the
     * configured classpath directory.
     *
     * @return a single-element collection holding the validator's verdict
     */
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        SpringContextConfigValidator validator = new SpringContextConfigValidator();
        return Collections.singletonList(validator.validate(CTX_CONFIG_PATH.getName(), null, validationContext));
    }

    /**
     * @return the unmodifiable list of the four properties this processor supports
     */
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return propertyDescriptors;
    }

    /**
     * Reads a time-period property as milliseconds.
     *
     * @return the configured duration in milliseconds, or 0 when the property has no value
     */
    private static long resolveTimeoutMillis(ProcessContext processContext, PropertyDescriptor timeoutProperty) {
        String configured = processContext.getProperty(timeoutProperty).getValue();
        return configured == null ? 0L : FormatUtils.getTimeDuration(configured, TimeUnit.MILLISECONDS);
    }

    /**
     * Hands the content and attributes of a FlowFile to the exchanger.
     *
     * <p>Outcome per case:
     * <ul>
     *   <li>sent: a provenance SEND event is reported and the FlowFile is removed;</li>
     *   <li>not sent within the timeout: the FlowFile is penalized and routed to failure;</li>
     *   <li>exception while sending: the FlowFile is routed to failure;</li>
     *   <li>content larger than a byte array can hold: the FlowFile is routed to failure.</li>
     * </ul>
     * The processor yields in the timeout and exception cases.
     */
    private void sendToSpring(FlowFile flowFileToProcess, ProcessContext context, ProcessSession processSession) {
        long contentSize = flowFileToProcess.getSize();
        if (contentSize > Integer.MAX_VALUE) {
            // The payload is passed as one byte[]; narrowing the size to int would
            // silently truncate the content or produce a negative array length.
            this.getLogger().error("FlowFile of size " + contentSize + " bytes is too large to be sent to Spring Application Context " + this.applicationContextConfigFileName);
            processSession.transfer(flowFileToProcess, REL_FAILURE);
            return;
        }

        byte[] payload = this.extractMessage(flowFileToProcess, processSession);
        try {
            boolean sent = this.exchanger.send(payload, flowFileToProcess.getAttributes(), this.sendTimeout);
            if (sent) {
                processSession.getProvenanceReporter().send(flowFileToProcess, this.applicationContextConfigFileName);
                processSession.remove(flowFileToProcess);
            } else {
                processSession.transfer(processSession.penalize(flowFileToProcess), REL_FAILURE);
                this.getLogger().error("Timed out while sending FlowFile to Spring Application Context " + this.applicationContextConfigFileName);
                context.yield();
            }
        }
        catch (Exception e) {
            processSession.transfer(flowFileToProcess, REL_FAILURE);
            this.getLogger().error("Failed while sending FlowFile to Spring Application Context " + this.applicationContextConfigFileName + "; " + e.getMessage(), e);
            context.yield();
        }
    }

    /**
     * Polls the exchanger for a message and, when one arrives, turns it into a
     * new FlowFile: the payload becomes the content, String-valued headers
     * become attributes, and the FlowFile is routed to success with a
     * provenance RECEIVE event.
     */
    private void receiveFromSpring(ProcessSession processSession) {
        final SpringDataExchanger.SpringResponse msgFromSpring = this.exchanger.receive(this.receiveTimeout);
        if (msgFromSpring == null) {
            return;
        }
        FlowFile received = processSession.create();
        received = processSession.write(received, new OutputStreamCallback(){

            public void process(OutputStream out) throws IOException {
                Object payload = msgFromSpring.getPayload();
                // Encode text explicitly as UTF-8 so the written bytes do not depend
                // on the platform default charset of the host running NiFi.
                byte[] payloadBytes = payload instanceof String
                        ? ((String)payload).getBytes(StandardCharsets.UTF_8)
                        : (byte[])payload;
                out.write(payloadBytes);
            }
        });
        received = processSession.putAllAttributes(received, this.extractFlowFileAttributesFromMessageHeaders(msgFromSpring.getHeaders()));
        processSession.transfer(received, REL_SUCCESS);
        processSession.getProvenanceReporter().receive(received, this.applicationContextConfigFileName);
    }

    /**
     * Copies the headers whose value is a {@link String} into a new attribute
     * map; headers with any other value type are skipped.
     */
    private Map<String, String> extractFlowFileAttributesFromMessageHeaders(Map<String, Object> messageHeaders) {
        Map<String, String> attributes = new HashMap<>();
        for (Map.Entry<String, Object> header : messageHeaders.entrySet()) {
            Object value = header.getValue();
            if (value instanceof String) {
                attributes.put(header.getKey(), (String)value);
            }
        }
        return attributes;
    }

    /**
     * Reads the entire content of the FlowFile into memory.
     *
     * <p>The caller must ensure the FlowFile size fits into an {@code int};
     * {@code sendToSpring} checks this before calling.
     */
    private byte[] extractMessage(FlowFile flowFile, ProcessSession processSession) {
        final byte[] messageContent = new byte[(int)flowFile.getSize()];
        processSession.read(flowFile, new InputStreamCallback(){

            public void process(InputStream in) throws IOException {
                StreamUtils.fillBuffer(in, messageContent, true);
            }
        });
        return messageContent;
    }

    /**
     * Appends an explanation to the given builder when {@code libDirPath} does
     * not exist or is not a directory; appends nothing otherwise.
     */
    private static void validateClassPath(String libDirPath, StringBuilder invalidationMessageBuilder) {
        File libDir = new File(libDirPath);
        if (!libDir.exists()) {
            invalidationMessageBuilder.append("'Application Context class path' does not exist. Was '" + libDir.getAbsolutePath() + "'.");
        } else if (!libDir.isDirectory()) {
            invalidationMessageBuilder.append("'Application Context class path' must point to a directory. Was '" + libDir.getAbsolutePath() + "'.");
        }
    }

    /**
     * Checks whether {@code configPath} can be found as a resource by a
     * short-lived class loader built from the URLs gathered for the given
     * directory, with this class's loader as parent.
     *
     * @return {@code true} when the resource lookup returns a non-null URL
     */
    private static boolean isConfigResolvable(String configPath, File libDirPathFile) {
        ClassLoader parentLoader = SpringContextProcessor.class.getClassLoader();
        URL[] urls = SpringContextFactory.gatherAdditionalClassPathUrls(libDirPathFile.getAbsolutePath()).toArray(new URL[0]);
        boolean resolvable = false;
        try (URLClassLoader throwawayCl = new URLClassLoader(urls, parentLoader)) {
            resolvable = throwawayCl.getResource(configPath) != null;
        }
        catch (IOException e) {
            // Only close() can raise this; the lookup result assigned above is still
            // valid, so the failure is recorded for diagnostics and otherwise ignored.
            logger.debug("Failed to close the class loader used for validating '{}'", configPath, e);
        }
        return resolvable;
    }

    /**
     * Validator that checks the config path and class path properties together:
     * both must be set, the class path must be an existing directory, and the
     * config path must be resolvable against it.
     */
    static class SpringContextConfigValidator
    implements Validator {

        /**
         * @param subject subject recorded in the result
         * @param input input recorded in the result (not inspected)
         * @param context context the two property values are read from
         * @return a valid result when the configuration is resolvable, otherwise
         *         an invalid result whose explanation carries the reason
         */
        public ValidationResult validate(String subject, String input, ValidationContext context) {
            String configPath = context.getProperty(CTX_CONFIG_PATH).getValue();
            String libDirPath = context.getProperty(CTX_LIB_PATH).getValue();
            StringBuilder invalidationMessageBuilder = new StringBuilder();

            // Emptiness is checked first so that a null or empty value always gets
            // the dedicated "must not be empty" message.
            if (StringUtils.isEmpty(configPath)) {
                invalidationMessageBuilder.append("'Application Context config path' must not be empty.");
            } else if (StringUtils.isEmpty(libDirPath)) {
                invalidationMessageBuilder.append("'Application Context class path' must not be empty.");
            } else {
                SpringContextProcessor.validateClassPath(libDirPath, invalidationMessageBuilder);
                // Only attempt resolution when the directory itself passed validation.
                if (invalidationMessageBuilder.length() == 0 && !SpringContextProcessor.isConfigResolvable(configPath, new File(libDirPath))) {
                    invalidationMessageBuilder.append("'Application Context config path' can not be located in the provided classpath.");
                }
            }

            String invalidationMessage = invalidationMessageBuilder.toString();
            boolean valid = invalidationMessage.isEmpty();
            String explanation = valid
                    ? "Spring configuration '" + configPath + "' is resolvable against provided classpath '" + libDirPath + "'."
                    : "Spring configuration '" + configPath + "' is NOT resolvable against provided classpath '" + libDirPath + "'. Validation message: " + invalidationMessage;
            return new ValidationResult.Builder().subject(subject).input(input).explanation(explanation).valid(valid).build();
        }
    }
}

