/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.hive;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.regex.Pattern;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.hcatalog.streaming.ConnectionError;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.components.resource.ResourceCardinality;
import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hadoop.KerberosProperties;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.pattern.DiscontinuedException;
import org.apache.nifi.processor.util.pattern.ErrorTypes;
import org.apache.nifi.processor.util.pattern.ExceptionHandler;
import org.apache.nifi.processor.util.pattern.RollbackOnFailure;
import org.apache.nifi.processor.util.pattern.RoutingResult;
import org.apache.nifi.processors.hive.AbstractHiveQLProcessor;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosLoginException;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.util.hive.AuthenticationFailedException;
import org.apache.nifi.util.hive.HiveConfigurator;
import org.apache.nifi.util.hive.HiveOptions;
import org.apache.nifi.util.hive.HiveUtils;
import org.apache.nifi.util.hive.HiveWriter;
import org.apache.nifi.util.hive.ValidationResources;
import org.xerial.snappy.Snappy;

@Tags(value={"hive", "streaming", "put", "database", "store"})
@CapabilityDescription(value="This processor uses Hive Streaming to send flow file data to an Apache Hive table. The incoming flow file is expected to be in Avro format and the table must exist in Hive. Please see the Hive documentation for requirements on the Hive table (format, partitions, etc.). The partition values are extracted from the Avro record based on the names of the partition columns as specified in the processor. NOTE: If multiple concurrent tasks are configured for this processor, only one table can be written to at any time by a single thread. Additional tasks intending to write to the same table will wait for the current task to finish writing to the table.")
@WritesAttributes(value={@WritesAttribute(attribute="hivestreaming.record.count", description="This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the number of records from the incoming flow file written successfully and unsuccessfully, respectively."), @WritesAttribute(attribute="query.output.tables", description="This attribute is written on the flow files routed to the 'success' and 'failure' relationships, and contains the target table name in 'databaseName.tableName' format.")})
@RequiresInstanceClassLoading
public class PutHiveStreaming
extends AbstractSessionFactoryProcessor {
    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
    public static final String HIVE_STREAMING_RECORD_COUNT_ATTR = "hivestreaming.record.count";
    private static final String CLIENT_CACHE_DISABLED_PROPERTY = "hcatalog.hive.client.cache.disabled";
    private static final Validator GREATER_THAN_ONE_VALIDATOR = (subject, value, context) -> {
        if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
            return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
        }
        String reason = null;
        try {
            int intVal = Integer.parseInt(value);
            if (intVal < 2) {
                reason = "value is less than 2";
            }
        }
        catch (NumberFormatException e) {
            reason = "value is not a valid integer";
        }
        return new ValidationResult.Builder().subject(subject).input(value).explanation(reason).valid(reason == null).build();
    };
    private static final Set<String> RESERVED_METADATA;
    public static final PropertyDescriptor METASTORE_URI;
    public static final PropertyDescriptor HIVE_CONFIGURATION_RESOURCES;
    public static final PropertyDescriptor DB_NAME;
    public static final PropertyDescriptor TABLE_NAME;
    public static final PropertyDescriptor PARTITION_COLUMNS;
    public static final PropertyDescriptor AUTOCREATE_PARTITIONS;
    public static final PropertyDescriptor MAX_OPEN_CONNECTIONS;
    public static final PropertyDescriptor HEARTBEAT_INTERVAL;
    public static final PropertyDescriptor TXNS_PER_BATCH;
    public static final PropertyDescriptor RECORDS_PER_TXN;
    public static final PropertyDescriptor CALL_TIMEOUT;
    public static final PropertyDescriptor ROLLBACK_ON_FAILURE;
    static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE;
    public static final Relationship REL_SUCCESS;
    public static final Relationship REL_FAILURE;
    public static final Relationship REL_RETRY;
    private List<PropertyDescriptor> propertyDescriptors;
    private Set<Relationship> relationships;
    protected KerberosProperties kerberosProperties;
    private volatile File kerberosConfigFile = null;
    protected volatile HiveConfigurator hiveConfigurator = new HiveConfigurator();
    protected volatile UserGroupInformation ugi;
    protected final AtomicReference<KerberosUser> kerberosUserReference = new AtomicReference();
    protected volatile HiveConf hiveConfig;
    protected final AtomicBoolean sendHeartBeat = new AtomicBoolean(false);
    protected volatile int callTimeout;
    protected ExecutorService callTimeoutPool;
    protected transient Timer heartBeatTimer;
    protected volatile ConcurrentLinkedQueue<Map<HiveEndPoint, HiveWriter>> threadWriterList = new ConcurrentLinkedQueue();
    protected volatile ConcurrentHashMap<String, Semaphore> tableSemaphoreMap = new ConcurrentHashMap();
    private final AtomicReference<ValidationResources> validationResourceHolder = new AtomicReference();

    protected void init(ProcessorInitializationContext context) {
        ArrayList<PropertyDescriptor> props = new ArrayList<PropertyDescriptor>();
        props.add(METASTORE_URI);
        props.add(HIVE_CONFIGURATION_RESOURCES);
        props.add(DB_NAME);
        props.add(TABLE_NAME);
        props.add(PARTITION_COLUMNS);
        props.add(AUTOCREATE_PARTITIONS);
        props.add(MAX_OPEN_CONNECTIONS);
        props.add(HEARTBEAT_INTERVAL);
        props.add(TXNS_PER_BATCH);
        props.add(RECORDS_PER_TXN);
        props.add(CALL_TIMEOUT);
        props.add(ROLLBACK_ON_FAILURE);
        props.add(KERBEROS_CREDENTIALS_SERVICE);
        this.kerberosConfigFile = context.getKerberosConfigurationFile();
        this.kerberosProperties = new KerberosProperties(this.kerberosConfigFile);
        props.add(this.kerberosProperties.getKerberosPrincipal());
        props.add(this.kerberosProperties.getKerberosKeytab());
        props.add(this.kerberosProperties.getKerberosPassword());
        this.propertyDescriptors = Collections.unmodifiableList(props);
        HashSet<Relationship> _relationships = new HashSet<Relationship>();
        _relationships.add(REL_SUCCESS);
        _relationships.add(REL_FAILURE);
        _relationships.add(REL_RETRY);
        this.relationships = Collections.unmodifiableSet(_relationships);
    }

    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.propertyDescriptors;
    }

    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        boolean confFileProvided = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).isSet();
        ArrayList<ValidationResult> problems = new ArrayList<ValidationResult>();
        if (confFileProvided) {
            String resolvedKeytab;
            String resolvedPrincipal;
            String explicitPrincipal = validationContext.getProperty(this.kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
            String explicitKeytab = validationContext.getProperty(this.kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
            String explicitPassword = validationContext.getProperty(this.kerberosProperties.getKerberosPassword()).getValue();
            KerberosCredentialsService credentialsService = (KerberosCredentialsService)validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
            if (credentialsService == null) {
                resolvedPrincipal = explicitPrincipal;
                resolvedKeytab = explicitKeytab;
            } else {
                resolvedPrincipal = credentialsService.getPrincipal();
                resolvedKeytab = credentialsService.getKeytab();
            }
            String configFiles = validationContext.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
            problems.addAll(this.hiveConfigurator.validate(configFiles, resolvedPrincipal, resolvedKeytab, explicitPassword, this.validationResourceHolder, this.getLogger()));
            if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null || explicitPassword != null)) {
                problems.add(new ValidationResult.Builder().subject("Kerberos Credentials").valid(false).explanation("Cannot specify a Kerberos Credentials Service while also specifying a Kerberos Principal, Kerberos Keytab, or Kerberos Password").build());
            }
            if (!this.isAllowExplicitKeytab() && explicitKeytab != null) {
                problems.add(new ValidationResult.Builder().subject("Kerberos Credentials").valid(false).explanation("The 'NIFI_ALLOW_EXPLICIT_KEYTAB' system environment variable is configured to forbid explicitly configuring Kerberos Keytab in processors. The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.").build());
            }
        }
        return problems;
    }

    @OnScheduled
    public void setup(ProcessContext context) {
        ComponentLog log = this.getLogger();
        Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
        String configFiles = context.getProperty(HIVE_CONFIGURATION_RESOURCES).evaluateAttributeExpressions().getValue();
        this.hiveConfig = this.hiveConfigurator.getConfigurationFromFiles(configFiles);
        if (context.getMaxConcurrentTasks() > 1) {
            this.hiveConfig.setBoolean(CLIENT_CACHE_DISABLED_PROPERTY, true);
        }
        for (Map.Entry entry : context.getProperties().entrySet()) {
            PropertyDescriptor descriptor = (PropertyDescriptor)entry.getKey();
            if (!descriptor.isDynamic()) continue;
            this.hiveConfig.set(descriptor.getName(), (String)entry.getValue());
        }
        this.hiveConfigurator.preload((Configuration)this.hiveConfig);
        if (SecurityUtil.isSecurityEnabled((Configuration)this.hiveConfig)) {
            String resolvedKeytab;
            String resolvedPrincipal;
            String explicitPrincipal = context.getProperty(this.kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
            String explicitKeytab = context.getProperty(this.kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
            String explicitPassword = context.getProperty(this.kerberosProperties.getKerberosPassword()).getValue();
            KerberosCredentialsService credentialsService = (KerberosCredentialsService)context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
            if (credentialsService == null) {
                resolvedPrincipal = explicitPrincipal;
                resolvedKeytab = explicitKeytab;
            } else {
                resolvedPrincipal = credentialsService.getPrincipal();
                resolvedKeytab = credentialsService.getKeytab();
            }
            if (resolvedKeytab != null) {
                this.kerberosUserReference.set((KerberosUser)new KerberosKeytabUser(resolvedPrincipal, resolvedKeytab));
                log.info("Hive Security Enabled, logging in as principal {} with keytab {}", new Object[]{resolvedPrincipal, resolvedKeytab});
            } else if (explicitPassword != null) {
                this.kerberosUserReference.set((KerberosUser)new KerberosPasswordUser(resolvedPrincipal, explicitPassword));
                log.info("Hive Security Enabled, logging in as principal {} with password", new Object[]{resolvedPrincipal});
            } else {
                throw new ProcessException("Unable to authenticate with Kerberos, no keytab or password was provided");
            }
            try {
                this.ugi = this.hiveConfigurator.authenticate((Configuration)this.hiveConfig, this.kerberosUserReference.get());
            }
            catch (AuthenticationFailedException ae) {
                throw new ProcessException("Kerberos authentication failed for Hive Streaming", (Throwable)ae);
            }
            log.info("Successfully logged in as principal " + resolvedPrincipal);
        } else {
            this.ugi = null;
            this.kerberosUserReference.set(null);
        }
        this.callTimeout = context.getProperty(CALL_TIMEOUT).evaluateAttributeExpressions().asInteger() * 1000;
        String timeoutName = "put-hive-streaming-%d";
        this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
        this.sendHeartBeat.set(true);
        this.heartBeatTimer = new Timer();
        this.setupHeartBeatTimer(heartbeatInterval);
    }

    private ExceptionHandler.OnError<FunctionContext, List<HiveStreamingRecord>> onHiveRecordsError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
        return RollbackOnFailure.createOnError((fc, input, res, e) -> {
            if (res.penalty() == ErrorTypes.Penalty.Yield) {
                context.yield();
            }
            switch (res.destination()) {
                case Failure: {
                    this.getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), (Throwable)e);
                    fc.appendRecordsToFailure(session, (List<HiveStreamingRecord>)input);
                    break;
                }
                case Retry: {
                    this.abortAndCloseWriters(writers);
                    throw new ShouldRetryException("Hive Streaming connect/write error, flow file will be penalized and routed to retry. " + e, e);
                }
                case Self: {
                    this.getLogger().error(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), (Throwable)e);
                    this.abortAndCloseWriters(writers);
                    break;
                }
                default: {
                    this.abortAndCloseWriters(writers);
                    if (e instanceof ProcessException) {
                        throw (ProcessException)((Object)((Object)e));
                    }
                    throw new ProcessException(String.format("Error writing %s to Hive Streaming transaction due to %s", input, e), (Throwable)e);
                }
            }
        });
    }

    private ExceptionHandler.OnError<FunctionContext, HiveStreamingRecord> onHiveRecordError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
        return (fc, input, res, e) -> this.onHiveRecordsError(context, session, writers).apply((Object)fc, Collections.singletonList(input), res, e);
    }

    private ExceptionHandler.OnError<FunctionContext, GenericRecord> onRecordError(ProcessContext context, ProcessSession session, Map<HiveEndPoint, HiveWriter> writers) {
        return (fc, input, res, e) -> this.onHiveRecordError(context, session, writers).apply((Object)fc, (Object)new HiveStreamingRecord(null, (GenericRecord)input), res, e);
    }

    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
        FunctionContext functionContext = new FunctionContext(context.getProperty(ROLLBACK_ON_FAILURE).asBoolean(), this.getLogger());
        RollbackOnFailure.onTrigger((ProcessContext)context, (ProcessSessionFactory)sessionFactory, (RollbackOnFailure)functionContext, (ComponentLog)this.getLogger(), session -> this.onTrigger(context, session, functionContext));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void onTrigger(final ProcessContext context, final ProcessSession session, final FunctionContext functionContext) throws ProcessException {
        List partitionColumnList;
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String dbName = context.getProperty(DB_NAME).evaluateAttributeExpressions(flowFile).getValue();
        String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        Semaphore newSemaphore = new Semaphore(1);
        Semaphore semaphore = this.tableSemaphoreMap.putIfAbsent(dbName + "." + tableName, newSemaphore);
        if (semaphore == null) {
            semaphore = newSemaphore;
        }
        boolean gotSemaphore = false;
        try {
            gotSemaphore = semaphore.tryAcquire(0L, TimeUnit.SECONDS);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        if (!gotSemaphore) {
            session.rollback();
            return;
        }
        final ComponentLog log = this.getLogger();
        String metastoreUri = context.getProperty(METASTORE_URI).evaluateAttributeExpressions(flowFile).getValue();
        boolean autoCreatePartitions = context.getProperty(AUTOCREATE_PARTITIONS).asBoolean();
        Integer maxConnections = context.getProperty(MAX_OPEN_CONNECTIONS).asInteger();
        Integer heartbeatInterval = context.getProperty(HEARTBEAT_INTERVAL).evaluateAttributeExpressions().asInteger();
        Integer txnsPerBatch = context.getProperty(TXNS_PER_BATCH).evaluateAttributeExpressions(flowFile).asInteger();
        final Integer recordsPerTxn = context.getProperty(RECORDS_PER_TXN).evaluateAttributeExpressions(flowFile).asInteger();
        final ConcurrentHashMap myWriters = new ConcurrentHashMap();
        this.threadWriterList.add(myWriters);
        HiveOptions o = new HiveOptions(metastoreUri, dbName, tableName).withTxnsPerBatch(txnsPerBatch).withAutoCreatePartitions(autoCreatePartitions).withMaxOpenConnections(maxConnections).withHeartBeatInterval(heartbeatInterval).withCallTimeout(this.callTimeout);
        if (SecurityUtil.isSecurityEnabled((Configuration)this.hiveConfig)) {
            String resolvedKeytab;
            String resolvedPrincipal;
            String explicitPrincipal = context.getProperty(this.kerberosProperties.getKerberosPrincipal()).evaluateAttributeExpressions().getValue();
            String explicitKeytab = context.getProperty(this.kerberosProperties.getKerberosKeytab()).evaluateAttributeExpressions().getValue();
            KerberosCredentialsService credentialsService = (KerberosCredentialsService)context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
            if (credentialsService == null) {
                resolvedPrincipal = explicitPrincipal;
                resolvedKeytab = explicitKeytab;
            } else {
                resolvedPrincipal = credentialsService.getPrincipal();
                resolvedKeytab = credentialsService.getKeytab();
            }
            o = o.withKerberosPrincipal(resolvedPrincipal).withKerberosKeytab(resolvedKeytab);
        }
        final HiveOptions options = o;
        ClassLoader originalClassloader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(((Object)((Object)this)).getClass().getClassLoader());
        String partitionColumns = context.getProperty(PARTITION_COLUMNS).evaluateAttributeExpressions().getValue();
        if (partitionColumns == null || partitionColumns.isEmpty()) {
            partitionColumnList = Collections.emptyList();
        } else {
            String[] partitionCols = partitionColumns.split(",");
            partitionColumnList = new ArrayList(partitionCols.length);
            for (String col : partitionCols) {
                partitionColumnList.add(col.trim());
            }
        }
        final AtomicReference successfulRecords = new AtomicReference();
        successfulRecords.set(new ArrayList());
        final FlowFile inputFlowFile = flowFile;
        final RoutingResult result = new RoutingResult();
        final ExceptionHandler exceptionHandler = new ExceptionHandler();
        exceptionHandler.mapException(s -> {
            try {
                if (s == null) {
                    return ErrorTypes.PersistentFailure;
                }
                throw s;
            }
            catch (IllegalArgumentException | AvroRuntimeException | SerializationError | HiveWriter.WriteFailure inputError) {
                return ErrorTypes.InvalidInput;
            }
            catch (HiveWriter.CommitFailure | HiveWriter.TxnBatchFailure | HiveWriter.TxnFailure writerTxError) {
                return ErrorTypes.TemporalInputFailure;
            }
            catch (ConnectionError | HiveWriter.ConnectFailure connectionError) {
                log.error("Error connecting to Hive endpoint: table {} at {}", new Object[]{options.getTableName(), options.getMetaStoreURI()});
                return ErrorTypes.TemporalFailure;
            }
            catch (IOException | InterruptedException tempError) {
                return ErrorTypes.TemporalFailure;
            }
            catch (Exception t) {
                return ErrorTypes.UnknownFailure;
            }
        });
        final BiFunction adjustError = RollbackOnFailure.createAdjustError((ComponentLog)this.getLogger());
        exceptionHandler.adjustError(adjustError);
        functionContext.setFlowFiles(session.create(inputFlowFile), session.create(inputFlowFile));
        try {
            session.read(inputFlowFile, new InputStreamCallback(){

                public void process(InputStream in) throws IOException {
                    try (DataFileStream reader = new DataFileStream(in, (DatumReader)new GenericDatumReader());){
                        GenericRecord currRecord = null;
                        String codec = reader.getMetaString("avro.codec") == null ? "null" : reader.getMetaString("avro.codec");
                        functionContext.initAvroWriters(session, codec, (DataFileStream<GenericRecord>)reader);
                        Runnable flushSuccessfulRecords = () -> {
                            functionContext.appendRecordsToSuccess(session, (List)successfulRecords.get());
                            successfulRecords.set(new ArrayList());
                        };
                        while (reader.hasNext()) {
                            HiveWriter hiveWriter;
                            AtomicReference hiveWriterRef;
                            HiveStreamingRecord record;
                            currRecord = (GenericRecord)reader.next();
                            functionContext.recordCount.incrementAndGet();
                            ArrayList<String> partitionValues = new ArrayList<String>();
                            if (!exceptionHandler.execute((Object)functionContext, (Object)currRecord, input -> {
                                for (String partition : partitionColumnList) {
                                    Object partitionValue = input.get(partition);
                                    if (partitionValue == null) {
                                        throw new IllegalArgumentException("Partition column '" + partition + "' not found in Avro record");
                                    }
                                    partitionValues.add(partitionValue.toString());
                                }
                            }, PutHiveStreaming.this.onRecordError(context, session, myWriters)) || !exceptionHandler.execute((Object)functionContext, (Object)(record = new HiveStreamingRecord(partitionValues, currRecord)), arg_0 -> this.lambda$process$2(record, options, myWriters, hiveWriterRef = new AtomicReference(), successfulRecords, arg_0), PutHiveStreaming.this.onHiveRecordError(context, session, myWriters)) || (hiveWriter = (HiveWriter)hiveWriterRef.get()).getTotalRecords() < recordsPerTxn) continue;
                            exceptionHandler.execute((Object)functionContext, (Object)((List)successfulRecords.get()), input -> {
                                hiveWriter.flush(true);
                                functionContext.proceed();
                                flushSuccessfulRecords.run();
                            }, PutHiveStreaming.this.onHiveRecordsError(context, session, myWriters).andThen((fc, input, res, commitException) -> {
                                switch (res.destination()) {
                                    case Failure: 
                                    case Retry: {
                                        try {
                                            hiveWriter.abort();
                                            break;
                                        }
                                        catch (Exception e) {
                                            throw new ProcessException((Throwable)e);
                                        }
                                    }
                                }
                            }));
                        }
                        exceptionHandler.execute((Object)functionContext, (Object)((List)successfulRecords.get()), input -> {
                            PutHiveStreaming.this.flushAllWriters(myWriters, true);
                            PutHiveStreaming.this.closeAllWriters(myWriters);
                            flushSuccessfulRecords.run();
                        }, PutHiveStreaming.this.onHiveRecordsError(context, session, myWriters));
                    }
                    catch (IOException ioe) {
                        ErrorTypes.Result adjusted = (ErrorTypes.Result)adjustError.apply(functionContext, ErrorTypes.InvalidInput);
                        String msg = "The incoming flow file can not be read as an Avro file";
                        switch (adjusted.destination()) {
                            case Failure: {
                                log.error("The incoming flow file can not be read as an Avro file", (Throwable)ioe);
                                result.routeTo(inputFlowFile, REL_FAILURE);
                                break;
                            }
                            case ProcessException: {
                                throw new ProcessException("The incoming flow file can not be read as an Avro file", (Throwable)ioe);
                            }
                        }
                    }
                }

                private /* synthetic */ void lambda$process$2(HiveStreamingRecord record, HiveOptions options2, Map myWriters2, AtomicReference hiveWriterRef, AtomicReference successfulRecords2, HiveStreamingRecord input) throws Exception {
                    HiveEndPoint endPoint = PutHiveStreaming.this.makeHiveEndPoint(record.getPartitionValues(), options2);
                    HiveWriter hiveWriter = PutHiveStreaming.this.getOrCreateWriter(myWriters2, options2, endPoint);
                    hiveWriterRef.set(hiveWriter);
                    hiveWriter.write(record.getRecord().toString().getBytes(StandardCharsets.UTF_8));
                    ((List)successfulRecords2.get()).add(record);
                }
            });
            if (result.getRoutedFlowFiles().values().stream().noneMatch(routed -> routed.contains(inputFlowFile))) {
                session.remove(inputFlowFile);
            }
        }
        catch (DiscontinuedException e) {
            this.getLogger().warn("Discontinued processing for {} due to {}", new Object[]{flowFile, e}, (Throwable)e);
            result.routeTo(flowFile, Relationship.SELF);
        }
        catch (ShouldRetryException e) {
            this.getLogger().error(e.getMessage(), (Throwable)e);
            flowFile = session.penalize(flowFile);
            result.routeTo(flowFile, REL_RETRY);
        }
        finally {
            this.threadWriterList.remove(myWriters);
            functionContext.transferFlowFiles(session, result, options);
            Thread.currentThread().setContextClassLoader(originalClassloader);
            semaphore.release();
        }
    }

    @OnStopped
    public void cleanup() {
        this.validationResourceHolder.set(null);
        ComponentLog log = this.getLogger();
        this.sendHeartBeat.set(false);
        for (Map<HiveEndPoint, HiveWriter> allWriters : this.threadWriterList) {
            for (Map.Entry<HiveEndPoint, HiveWriter> entry : allWriters.entrySet()) {
                try {
                    HiveWriter w = entry.getValue();
                    w.flushAndClose();
                }
                catch (Exception ex) {
                    log.warn("Error while closing writer to " + entry.getKey() + ". Exception follows.", (Throwable)ex);
                    if (!(ex instanceof InterruptedException)) continue;
                    Thread.currentThread().interrupt();
                }
            }
            allWriters.clear();
        }
        if (this.callTimeoutPool != null) {
            this.callTimeoutPool.shutdown();
            try {
                while (!this.callTimeoutPool.isTerminated()) {
                    this.callTimeoutPool.awaitTermination(this.callTimeout, TimeUnit.MILLISECONDS);
                }
            }
            catch (Throwable t) {
                log.warn("shutdown interrupted on " + this.callTimeoutPool, t);
            }
            this.callTimeoutPool = null;
        }
        this.ugi = null;
        this.kerberosUserReference.set(null);
    }

    private void setupHeartBeatTimer(final int heartbeatInterval) {
        if (heartbeatInterval > 0) {
            final ComponentLog log = this.getLogger();
            this.heartBeatTimer.schedule(new TimerTask(){

                @Override
                public void run() {
                    try {
                        if (PutHiveStreaming.this.sendHeartBeat.get()) {
                            log.debug("Start sending heartbeat on all writers");
                            PutHiveStreaming.this.sendHeartBeatOnAllWriters();
                            PutHiveStreaming.this.setupHeartBeatTimer(heartbeatInterval);
                        }
                    }
                    catch (Exception e) {
                        log.warn("Failed to heartbeat on HiveWriter ", (Throwable)e);
                    }
                }
            }, heartbeatInterval * 1000);
        }
    }

    private void sendHeartBeatOnAllWriters() throws InterruptedException {
        for (Map<HiveEndPoint, HiveWriter> allWriters : this.threadWriterList) {
            for (HiveWriter writer : allWriters.values()) {
                writer.heartBeat();
            }
        }
    }

    private void flushAllWriters(Map<HiveEndPoint, HiveWriter> writers, boolean rollToNext) throws HiveWriter.CommitFailure, HiveWriter.TxnBatchFailure, HiveWriter.TxnFailure, InterruptedException {
        for (HiveWriter writer : writers.values()) {
            writer.flush(rollToNext);
        }
    }

    private void abortAndCloseWriters(Map<HiveEndPoint, HiveWriter> writers) {
        try {
            this.abortAllWriters(writers);
            this.closeAllWriters(writers);
        }
        catch (Exception ie) {
            this.getLogger().warn("unable to close hive connections. ", (Throwable)ie);
        }
    }

    private void abortAllWriters(Map<HiveEndPoint, HiveWriter> writers) throws InterruptedException, StreamingException, HiveWriter.TxnBatchFailure {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            try {
                entry.getValue().abort();
            }
            catch (Exception e) {
                this.getLogger().error("Failed to abort hive transaction batch, HiveEndPoint " + entry.getValue() + " due to exception ", (Throwable)e);
            }
        }
    }

    private void closeAllWriters(Map<HiveEndPoint, HiveWriter> writers) {
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            try {
                entry.getValue().close();
            }
            catch (Exception e) {
                this.getLogger().warn("unable to close writers. ", (Throwable)e);
            }
        }
        writers.clear();
    }

    private HiveWriter getOrCreateWriter(Map<HiveEndPoint, HiveWriter> writers, HiveOptions options, HiveEndPoint endPoint) throws HiveWriter.ConnectFailure, InterruptedException {
        ComponentLog log = this.getLogger();
        try {
            HiveWriter writer = writers.get(endPoint);
            if (writer == null) {
                log.debug("Creating Writer to Hive end point : " + endPoint);
                writer = this.makeHiveWriter(endPoint, this.callTimeoutPool, this.getUgi(), options);
                if (writers.size() > options.getMaxOpenConnections() - 1) {
                    log.info("cached HiveEndPoint size {} exceeded maxOpenConnections {} ", new Object[]{writers.size(), options.getMaxOpenConnections()});
                    int retired = this.retireIdleWriters(writers, options.getIdleTimeout());
                    if (retired == 0) {
                        this.retireEldestWriter(writers);
                    }
                }
                writers.put(endPoint, writer);
                HiveUtils.logAllHiveEndPoints(writers);
            }
            return writer;
        }
        catch (HiveWriter.ConnectFailure e) {
            log.error("Failed to create HiveWriter for endpoint: " + endPoint, (Throwable)e);
            throw e;
        }
    }

    private void retireEldestWriter(Map<HiveEndPoint, HiveWriter> writers) {
        ComponentLog log = this.getLogger();
        log.info("Attempting close eldest writers");
        long oldestTimeStamp = System.currentTimeMillis();
        HiveEndPoint eldest = null;
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            if (entry.getValue().getLastUsed() >= oldestTimeStamp) continue;
            eldest = entry.getKey();
            oldestTimeStamp = entry.getValue().getLastUsed();
        }
        try {
            log.info("Closing least used Writer to Hive end point : " + eldest);
            writers.remove(eldest).flushAndClose();
        }
        catch (IOException e) {
            log.warn("Failed to close writer for end point: " + eldest, (Throwable)e);
        }
        catch (InterruptedException e) {
            log.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            log.warn("Interrupted when attempting to close writer for end point: " + eldest, (Throwable)e);
        }
    }

    private int retireIdleWriters(Map<HiveEndPoint, HiveWriter> writers, int idleTimeout) {
        ComponentLog log = this.getLogger();
        log.info("Attempting to close idle HiveWriters");
        int count = 0;
        long now = System.currentTimeMillis();
        ArrayList<HiveEndPoint> retirees = new ArrayList<HiveEndPoint>();
        for (Map.Entry<HiveEndPoint, HiveWriter> entry : writers.entrySet()) {
            if (now - entry.getValue().getLastUsed() <= (long)idleTimeout) continue;
            ++count;
            retirees.add(entry.getKey());
        }
        for (HiveEndPoint ep : retirees) {
            try {
                log.info("Closing idle Writer to Hive end point : {}", new Object[]{ep});
                writers.remove(ep).flushAndClose();
            }
            catch (IOException e) {
                log.warn("Failed to close HiveWriter for end point: {}. Error: " + ep, (Throwable)e);
            }
            catch (InterruptedException e) {
                log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, (Throwable)e);
                Thread.currentThread().interrupt();
            }
            catch (Exception e) {
                log.warn("Interrupted when attempting to close HiveWriter for end point: " + ep, (Throwable)e);
            }
        }
        return count;
    }

    protected HiveEndPoint makeHiveEndPoint(List<String> partitionValues, HiveOptions options) throws ConnectionError {
        return HiveUtils.makeEndPoint(partitionValues, options);
    }

    protected HiveWriter makeHiveWriter(HiveEndPoint endPoint, ExecutorService callTimeoutPool, UserGroupInformation ugi, HiveOptions options) throws HiveWriter.ConnectFailure, InterruptedException {
        return HiveUtils.makeHiveWriter(endPoint, callTimeoutPool, ugi, options, this.hiveConfig);
    }

    protected KerberosProperties getKerberosProperties() {
        return this.kerberosProperties;
    }

    UserGroupInformation getUgi() {
        this.getLogger().trace("getting UGI instance");
        if (this.kerberosUserReference.get() != null) {
            KerberosUser kerberosUser = this.kerberosUserReference.get();
            this.getLogger().debug("kerberosUser is " + kerberosUser);
            try {
                this.getLogger().debug("checking TGT on kerberosUser [{}]", new Object[]{kerberosUser});
                kerberosUser.checkTGTAndRelogin();
            }
            catch (KerberosLoginException e) {
                throw new ProcessException("Unable to relogin with kerberos credentials for " + kerberosUser.getPrincipal(), (Throwable)e);
            }
        } else {
            this.getLogger().debug("kerberosUser was null, will not refresh TGT with KerberosUser");
        }
        return this.ugi;
    }

    boolean isAllowExplicitKeytab() {
        return Boolean.parseBoolean(System.getenv(ALLOW_EXPLICIT_KEYTAB));
    }

    static {
        try {
            Snappy.compress((String)"");
        }
        catch (IOException iOException) {
            // empty catch block
        }
        HashSet<String> reservedMetadata = new HashSet<String>();
        reservedMetadata.add("avro.schema");
        reservedMetadata.add("avro.codec");
        RESERVED_METADATA = Collections.unmodifiableSet(reservedMetadata);
        METASTORE_URI = new PropertyDescriptor.Builder().name("hive-stream-metastore-uri").displayName("Hive Metastore URI").description("The URI location for the Hive Metastore. Note that this is not the location of the Hive Server. The default port for the Hive metastore is 9043.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.URI_VALIDATOR).addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("(^[^/]+.*[^/]+$|^[^/]+$|^$)"))).build();
        HIVE_CONFIGURATION_RESOURCES = new PropertyDescriptor.Builder().name("hive-config-resources").displayName("Hive Configuration Resources").description("A file or comma separated list of files which contains the Hive configuration (hive-site.xml, e.g.). Without this, Hadoop will search the classpath for a 'hive-site.xml' file or will revert to a default configuration. Note that to enable authentication with Kerberos e.g., the appropriate properties must be set in the configuration files. Also note that if Max Concurrent Tasks is set to a number greater than one, the 'hcatalog.hive.client.cache.disabled' property will be forced to 'true' to avoid concurrency issues. Please see the Hive documentation for more details.").required(false).identifiesExternalResource(ResourceCardinality.MULTIPLE, ResourceType.FILE, new ResourceType[0]).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        DB_NAME = new PropertyDescriptor.Builder().name("hive-stream-database-name").displayName("Database Name").description("The name of the database in which to put the data.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        TABLE_NAME = new PropertyDescriptor.Builder().name("hive-stream-table-name").displayName("Table Name").description("The name of the database table in which to put the data.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
        PARTITION_COLUMNS = new PropertyDescriptor.Builder().name("hive-stream-partition-cols").displayName("Partition Columns").description("A comma-delimited list of column names on which the table has been partitioned. The order of values in this list must correspond exactly to the order of partition columns specified during the table creation.").required(false).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.createRegexMatchingValidator((Pattern)Pattern.compile("[^,]+(,[^,]+)*"))).build();
        AUTOCREATE_PARTITIONS = new PropertyDescriptor.Builder().name("hive-stream-autocreate-partition").displayName("Auto-Create Partitions").description("Flag indicating whether partitions should be automatically created").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
        MAX_OPEN_CONNECTIONS = new PropertyDescriptor.Builder().name("hive-stream-max-open-connections").displayName("Max Open Connections").description("The maximum number of open connections that can be allocated from this pool at the same time, or negative for no limit.").defaultValue("8").required(true).addValidator(StandardValidators.INTEGER_VALIDATOR).sensitive(false).build();
        HEARTBEAT_INTERVAL = new PropertyDescriptor.Builder().name("hive-stream-heartbeat-interval").displayName("Heartbeat Interval").description("Indicates that a heartbeat should be sent when the specified number of seconds has elapsed. A value of 0 indicates that no heartbeat should be sent. Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.").defaultValue("60").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        TXNS_PER_BATCH = new PropertyDescriptor.Builder().name("hive-stream-transactions-per-batch").displayName("Transactions per Batch").description("A hint to Hive Streaming indicating how many transactions the processor task will need. This value must be greater than 1.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(GREATER_THAN_ONE_VALIDATOR).defaultValue("100").build();
        RECORDS_PER_TXN = new PropertyDescriptor.Builder().name("hive-stream-records-per-transaction").displayName("Records per Transaction").description("Number of records to process before committing the transaction. This value must be greater than 1.").required(true).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(GREATER_THAN_ONE_VALIDATOR).defaultValue("10000").build();
        CALL_TIMEOUT = new PropertyDescriptor.Builder().name("hive-stream-call-timeout").displayName("Call Timeout").description("The number of seconds allowed for a Hive Streaming operation to complete. A value of 0 indicates the processor should wait indefinitely on operations. Note that although this property supports Expression Language, it will not be evaluated against incoming FlowFile attributes.").defaultValue("0").required(true).addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).build();
        ROLLBACK_ON_FAILURE = RollbackOnFailure.createRollbackOnFailureProperty((String)"NOTE: When an error occurred after a Hive streaming transaction which is derived from the same input FlowFile is already committed, (i.e. a FlowFile contains more records than 'Records per Transaction' and a failure occurred at the 2nd transaction or later) then the succeeded records will be transferred to 'success' relationship while the original input FlowFile stays in incoming queue. Duplicated records can be created for the succeeded ones when the same FlowFile is processed again.");
        KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder().name("kerberos-credentials-service").displayName("Kerberos Credentials Service").description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos").identifiesControllerService(KerberosCredentialsService.class).required(false).build();
        REL_SUCCESS = new Relationship.Builder().name("success").description("A FlowFile containing Avro records routed to this relationship after the record has been successfully transmitted to Hive.").build();
        REL_FAILURE = new Relationship.Builder().name("failure").description("A FlowFile containing Avro records routed to this relationship if the record could not be transmitted to Hive.").build();
        REL_RETRY = new Relationship.Builder().name("retry").description("The incoming FlowFile is routed to this relationship if its records cannot be transmitted to Hive. Note that some records may have been processed successfully, they will be routed (as Avro flow files) to the success relationship. The combination of the retry, success, and failure relationships indicate how many records succeeded and/or failed. This can be used to provide a retry capability since full rollback is not possible.").build();
    }

    protected class HiveStreamingRecord {
        private List<String> partitionValues;
        private GenericRecord record;

        public HiveStreamingRecord(List<String> partitionValues, GenericRecord record) {
            this.partitionValues = partitionValues;
            this.record = record;
        }

        public List<String> getPartitionValues() {
            return this.partitionValues;
        }

        public GenericRecord getRecord() {
            return this.record;
        }
    }

    private static class ShouldRetryException
    extends RuntimeException {
        private ShouldRetryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private static class FunctionContext
    extends RollbackOnFailure {
        private AtomicReference<FlowFile> successFlowFile;
        private AtomicReference<FlowFile> failureFlowFile;
        private final DataFileWriter<GenericRecord> successAvroWriter = new DataFileWriter((DatumWriter)new GenericDatumWriter());
        private final DataFileWriter<GenericRecord> failureAvroWriter = new DataFileWriter((DatumWriter)new GenericDatumWriter());
        private byte[] successAvroHeader;
        private byte[] failureAvroHeader;
        private final AtomicInteger recordCount = new AtomicInteger(0);
        private final AtomicInteger successfulRecordCount = new AtomicInteger(0);
        private final AtomicInteger failedRecordCount = new AtomicInteger(0);
        private final ComponentLog logger;

        private FunctionContext(boolean rollbackOnFailure, ComponentLog logger) {
            super(rollbackOnFailure, false);
            this.logger = logger;
        }

        private void setFlowFiles(FlowFile successFlowFile, FlowFile failureFlowFile) {
            this.successFlowFile = new AtomicReference<FlowFile>(successFlowFile);
            this.failureFlowFile = new AtomicReference<FlowFile>(failureFlowFile);
        }

        private byte[] initAvroWriter(ProcessSession session, String codec, DataFileStream<GenericRecord> reader, DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef) {
            writer.setCodec(CodecFactory.fromString((String)codec));
            for (String metaKey : reader.getMetaKeys()) {
                if (RESERVED_METADATA.contains(metaKey)) continue;
                writer.setMeta(metaKey, reader.getMeta(metaKey));
            }
            ByteArrayOutputStream avroHeader = new ByteArrayOutputStream();
            flowFileRef.set(session.append(flowFileRef.get(), out -> {
                writer.create(reader.getSchema(), (OutputStream)avroHeader);
                writer.close();
                byte[] header = avroHeader.toByteArray();
                out.write(header);
            }));
            return avroHeader.toByteArray();
        }

        private void initAvroWriters(ProcessSession session, String codec, DataFileStream<GenericRecord> reader) {
            this.successAvroHeader = this.initAvroWriter(session, codec, reader, this.successAvroWriter, this.successFlowFile);
            this.failureAvroHeader = this.initAvroWriter(session, codec, reader, this.failureAvroWriter, this.failureFlowFile);
        }

        private void appendAvroRecords(ProcessSession session, byte[] avroHeader, DataFileWriter<GenericRecord> writer, AtomicReference<FlowFile> flowFileRef, List<HiveStreamingRecord> hRecords) {
            flowFileRef.set(session.append(flowFileRef.get(), out -> {
                if (hRecords != null) {
                    writer.appendTo((SeekableInput)new SeekableByteArrayInput(avroHeader), out);
                    try {
                        for (HiveStreamingRecord hRecord : hRecords) {
                            writer.append((Object)hRecord.getRecord());
                        }
                    }
                    catch (IOException ioe) {
                        this.logger.error("Error writing Avro records (which were sent successfully to Hive Streaming) to the flow file, " + ioe, (Throwable)ioe);
                    }
                }
                writer.close();
            }));
        }

        private void appendRecordsToSuccess(ProcessSession session, List<HiveStreamingRecord> records) {
            this.appendAvroRecords(session, this.successAvroHeader, this.successAvroWriter, this.successFlowFile, records);
            this.successfulRecordCount.addAndGet(records.size());
        }

        private void appendRecordsToFailure(ProcessSession session, List<HiveStreamingRecord> records) {
            this.appendAvroRecords(session, this.failureAvroHeader, this.failureAvroWriter, this.failureFlowFile, records);
            this.failedRecordCount.addAndGet(records.size());
        }

        private void transferFlowFiles(ProcessSession session, RoutingResult result, HiveOptions options) {
            HashMap<String, String> updateAttributes;
            if (this.successfulRecordCount.get() > 0) {
                updateAttributes = new HashMap<String, String>();
                updateAttributes.put(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(this.successfulRecordCount.get()));
                updateAttributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
                this.successFlowFile.set(session.putAllAttributes(this.successFlowFile.get(), updateAttributes));
                session.getProvenanceReporter().send(this.successFlowFile.get(), options.getMetaStoreURI());
                result.routeTo(this.successFlowFile.get(), REL_SUCCESS);
            } else {
                session.remove(this.successFlowFile.get());
            }
            if (this.failedRecordCount.get() > 0) {
                updateAttributes = new HashMap();
                updateAttributes.put(PutHiveStreaming.HIVE_STREAMING_RECORD_COUNT_ATTR, Integer.toString(this.failedRecordCount.get()));
                updateAttributes.put(AbstractHiveQLProcessor.ATTR_OUTPUT_TABLES, options.getQualifiedTableName());
                this.failureFlowFile.set(session.putAllAttributes(this.failureFlowFile.get(), updateAttributes));
                result.routeTo(this.failureFlowFile.get(), REL_FAILURE);
            } else {
                session.remove(this.failureFlowFile.get());
            }
            result.getRoutedFlowFiles().forEach((relationship, flowFiles) -> session.transfer((Collection)flowFiles, relationship));
        }
    }
}

