package org.apache.nifi.processors.gcp.bigquery;

import com.google.cloud.RetryOption;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.storage.DeleteGCSObject;
import org.apache.nifi.processors.gcp.storage.PutGCSObject;
import org.apache.nifi.util.StringUtils;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

@CapabilityDescription("Batch loads flow files content to a Google BigQuery table.")
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@Tags({"google", "google cloud", "bq", "bigquery"})
@SeeAlso({PutGCSObject.class, DeleteGCSObject.class})
@WritesAttributes({@WritesAttribute(attribute = BigQueryAttributes.JOB_CREATE_TIME_ATTR, description = BigQueryAttributes.JOB_CREATE_TIME_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_END_TIME_ATTR, description = BigQueryAttributes.JOB_END_TIME_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_START_TIME_ATTR, description = BigQueryAttributes.JOB_START_TIME_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_LINK_ATTR, description = BigQueryAttributes.JOB_LINK_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ID_ATTR, description = BigQueryAttributes.JOB_ID_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_MSG_ATTR, description = BigQueryAttributes.JOB_ERROR_MSG_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_REASON_ATTR, description = BigQueryAttributes.JOB_ERROR_REASON_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, description = BigQueryAttributes.JOB_ERROR_LOCATION_DESC), @WritesAttribute(attribute = BigQueryAttributes.JOB_NB_RECORDS_ATTR, description = BigQueryAttributes.JOB_NB_RECORDS_DESC)})
/* loaded from: input_file:org/apache/nifi/processors/gcp/bigquery/PutBigQueryBatch.class */
public class PutBigQueryBatch extends AbstractBigQueryProcessor {
    private static final List<String> TYPES = Arrays.asList(FormatOptions.json().getType(), FormatOptions.csv().getType(), FormatOptions.avro().getType());
    private static final Validator FORMAT_VALIDATOR = new Validator() { // from class: org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch.1
        public ValidationResult validate(String str, String str2, ValidationContext validationContext) {
            ValidationResult.Builder builder = new ValidationResult.Builder();
            builder.subject(str).input(str2);
            if (validationContext.isExpressionLanguageSupported(str) && validationContext.isExpressionLanguagePresent(str2)) {
                return builder.valid(true).explanation("Contains Expression Language").build();
            }
            if (PutBigQueryBatch.TYPES.contains(str2.toUpperCase())) {
                builder.valid(true);
            } else {
                builder.valid(false).explanation("Load File Type must be one of the following options: " + StringUtils.join(PutBigQueryBatch.TYPES, ", "));
            }
            return builder.build();
        }
    };
    public static final PropertyDescriptor READ_TIMEOUT = new PropertyDescriptor.Builder().name(BigQueryAttributes.JOB_READ_TIMEOUT_ATTR).displayName("Read Timeout").description(BigQueryAttributes.JOB_READ_TIMEOUT_DESC).required(true).defaultValue("5 minutes").expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.TIME_PERIOD_VALIDATOR).build();
    public static final PropertyDescriptor TABLE_SCHEMA = new PropertyDescriptor.Builder().name(BigQueryAttributes.TABLE_SCHEMA_ATTR).displayName("Table Schema").description(BigQueryAttributes.TABLE_SCHEMA_DESC).required(false).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor SOURCE_TYPE = new PropertyDescriptor.Builder().name(BigQueryAttributes.SOURCE_TYPE_ATTR).displayName("Load file type").description(BigQueryAttributes.SOURCE_TYPE_DESC).required(true).addValidator(FORMAT_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor CREATE_DISPOSITION = new PropertyDescriptor.Builder().name(BigQueryAttributes.CREATE_DISPOSITION_ATTR).displayName("Create Disposition").description(BigQueryAttributes.CREATE_DISPOSITION_DESC).required(true).allowableValues(new AllowableValue[]{BigQueryAttributes.CREATE_IF_NEEDED, BigQueryAttributes.CREATE_NEVER}).defaultValue(BigQueryAttributes.CREATE_IF_NEEDED.getValue()).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor WRITE_DISPOSITION = new PropertyDescriptor.Builder().name(BigQueryAttributes.WRITE_DISPOSITION_ATTR).displayName("Write Disposition").description(BigQueryAttributes.WRITE_DISPOSITION_DESC).required(true).allowableValues(new AllowableValue[]{BigQueryAttributes.WRITE_EMPTY, BigQueryAttributes.WRITE_APPEND, BigQueryAttributes.WRITE_TRUNCATE}).defaultValue(BigQueryAttributes.WRITE_EMPTY.getValue()).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).build();
    public static final PropertyDescriptor MAXBAD_RECORDS = new PropertyDescriptor.Builder().name(BigQueryAttributes.MAX_BADRECORDS_ATTR).displayName("Max Bad Records").description(BigQueryAttributes.MAX_BADRECORDS_DESC).required(true).defaultValue("0").addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).build();
    public static final PropertyDescriptor CSV_ALLOW_JAGGED_ROWS = new PropertyDescriptor.Builder().name(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_ATTR).displayName("CSV Input - Allow Jagged Rows").description(BigQueryAttributes.CSV_ALLOW_JAGGED_ROWS_DESC).required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final PropertyDescriptor CSV_ALLOW_QUOTED_NEW_LINES = new PropertyDescriptor.Builder().name(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_ATTR).displayName("CSV Input - Allow Quoted New Lines").description(BigQueryAttributes.CSV_ALLOW_QUOTED_NEW_LINES_DESC).required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();
    public static final PropertyDescriptor CSV_CHARSET = new PropertyDescriptor.Builder().name(BigQueryAttributes.CSV_CHARSET_ATTR).displayName("CSV Input - Character Set").description(BigQueryAttributes.CSV_CHARSET_DESC).required(true).allowableValues(new String[]{"UTF-8", "ISO-8859-1"}).defaultValue("UTF-8").build();
    public static final PropertyDescriptor CSV_FIELD_DELIMITER = new PropertyDescriptor.Builder().name(BigQueryAttributes.CSV_FIELD_DELIMITER_ATTR).displayName("CSV Input - Field Delimiter").description(BigQueryAttributes.CSV_FIELD_DELIMITER_DESC).required(true).defaultValue(",").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor CSV_QUOTE = new PropertyDescriptor.Builder().name(BigQueryAttributes.CSV_QUOTE_ATTR).displayName("CSV Input - Quote").description(BigQueryAttributes.CSV_QUOTE_DESC).required(true).defaultValue("\"").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor CSV_SKIP_LEADING_ROWS = new PropertyDescriptor.Builder().name(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_ATTR).displayName("CSV Input - Skip Leading Rows").description(BigQueryAttributes.CSV_SKIP_LEADING_ROWS_DESC).required(true).defaultValue("0").addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).build();
    public static final PropertyDescriptor AVRO_USE_LOGICAL_TYPES = new PropertyDescriptor.Builder().name(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_ATTR).displayName("Avro Input - Use Logical Types").description(BigQueryAttributes.AVRO_USE_LOGICAL_TYPES_DESC).required(true).allowableValues(new String[]{"true", "false"}).defaultValue("false").build();

    @Override // org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor, org.apache.nifi.processors.gcp.AbstractGCPProcessor
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList arrayList = new ArrayList(super.getSupportedPropertyDescriptors());
        arrayList.add(TABLE_SCHEMA);
        arrayList.add(READ_TIMEOUT);
        arrayList.add(SOURCE_TYPE);
        arrayList.add(CREATE_DISPOSITION);
        arrayList.add(WRITE_DISPOSITION);
        arrayList.add(MAXBAD_RECORDS);
        arrayList.add(CSV_ALLOW_JAGGED_ROWS);
        arrayList.add(CSV_ALLOW_QUOTED_NEW_LINES);
        arrayList.add(CSV_CHARSET);
        arrayList.add(CSV_FIELD_DELIMITER);
        arrayList.add(CSV_QUOTE);
        arrayList.add(CSV_SKIP_LEADING_ROWS);
        arrayList.add(AVRO_USE_LOGICAL_TYPES);
        return Collections.unmodifiableList(arrayList);
    }

    @Override // org.apache.nifi.processors.gcp.AbstractGCPProcessor
    @OnScheduled
    public void onScheduled(ProcessContext processContext) {
        super.onScheduled(processContext);
    }

    public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            return;
        }
        String value = processContext.getProperty(SOURCE_TYPE).evaluateAttributeExpressions(flowFile).getValue();
        try {
            TableDataWriteChannel writer = getCloudService().writer(WriteChannelConfiguration.newBuilder(getTableId(processContext, flowFile.getAttributes())).setCreateDisposition(JobInfo.CreateDisposition.valueOf(processContext.getProperty(CREATE_DISPOSITION).getValue())).setWriteDisposition(JobInfo.WriteDisposition.valueOf(processContext.getProperty(WRITE_DISPOSITION).getValue())).setIgnoreUnknownValues(processContext.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean()).setUseAvroLogicalTypes(processContext.getProperty(AVRO_USE_LOGICAL_TYPES).asBoolean()).setMaxBadRecords(processContext.getProperty(MAXBAD_RECORDS).asInteger()).setSchema(BigQueryUtils.schemaFromString(processContext.getProperty(TABLE_SCHEMA).evaluateAttributeExpressions(flowFile).getValue())).setFormatOptions(value.equals(FormatOptions.csv().getType()) ? FormatOptions.csv().toBuilder().setAllowJaggedRows(processContext.getProperty(CSV_ALLOW_JAGGED_ROWS).asBoolean().booleanValue()).setAllowQuotedNewLines(processContext.getProperty(CSV_ALLOW_QUOTED_NEW_LINES).asBoolean().booleanValue()).setEncoding(processContext.getProperty(CSV_CHARSET).getValue()).setFieldDelimiter(processContext.getProperty(CSV_FIELD_DELIMITER).evaluateAttributeExpressions(flowFile).getValue()).setQuote(processContext.getProperty(CSV_QUOTE).evaluateAttributeExpressions(flowFile).getValue()).setSkipLeadingRows(processContext.getProperty(CSV_SKIP_LEADING_ROWS).evaluateAttributeExpressions(flowFile).asInteger().intValue()).build() : FormatOptions.of(value)).build());
            try {
                processSession.read(flowFile, inputStream -> {
                    ReadableByteChannel newChannel = Channels.newChannel(inputStream);
                    ByteBuffer allocateDirect = ByteBuffer.allocateDirect(65536);
                    while (newChannel.read(allocateDirect) >= 0) {
                        allocateDirect.flip();
                        writer.write(allocateDirect);
                        allocateDirect.clear();
                    }
                });
                writer.close();
                Job waitFor = writer.getJob().waitFor(new RetryOption[]{RetryOption.totalTimeout(Duration.of(processContext.getProperty(READ_TIMEOUT).evaluateAttributeExpressions(flowFile).asTimePeriod(TimeUnit.SECONDS).longValue(), ChronoUnit.SECONDS))});
                if (waitFor != null) {
                    HashMap hashMap = new HashMap();
                    hashMap.put(BigQueryAttributes.JOB_CREATE_TIME_ATTR, Long.toString(waitFor.getStatistics().getCreationTime().longValue()));
                    hashMap.put(BigQueryAttributes.JOB_END_TIME_ATTR, Long.toString(waitFor.getStatistics().getEndTime().longValue()));
                    hashMap.put(BigQueryAttributes.JOB_START_TIME_ATTR, Long.toString(waitFor.getStatistics().getStartTime().longValue()));
                    hashMap.put(BigQueryAttributes.JOB_LINK_ATTR, waitFor.getSelfLink());
                    hashMap.put(BigQueryAttributes.JOB_ID_ATTR, waitFor.getJobId().getJob());
                    boolean z = waitFor.getStatus().getError() != null;
                    if (z) {
                        hashMap.put(BigQueryAttributes.JOB_ERROR_MSG_ATTR, waitFor.getStatus().getError().getMessage());
                        hashMap.put(BigQueryAttributes.JOB_ERROR_REASON_ATTR, waitFor.getStatus().getError().getReason());
                        hashMap.put(BigQueryAttributes.JOB_ERROR_LOCATION_ATTR, waitFor.getStatus().getError().getLocation());
                    } else {
                        flowFile = processSession.removeAttribute(processSession.removeAttribute(processSession.removeAttribute(flowFile, BigQueryAttributes.JOB_ERROR_MSG_ATTR), BigQueryAttributes.JOB_ERROR_REASON_ATTR), BigQueryAttributes.JOB_ERROR_LOCATION_ATTR);
                        if (waitFor.getStatistics() instanceof JobStatistics.LoadStatistics) {
                            hashMap.put(BigQueryAttributes.JOB_NB_RECORDS_ATTR, Long.toString(waitFor.getStatistics().getOutputRows().longValue()));
                        }
                    }
                    if (!hashMap.isEmpty()) {
                        flowFile = processSession.putAllAttributes(flowFile, hashMap);
                    }
                    if (z) {
                        getLogger().log(LogLevel.WARN, waitFor.getStatus().getError().getMessage());
                        processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
                    } else {
                        processSession.getProvenanceReporter().send(flowFile, waitFor.getSelfLink(), waitFor.getStatistics().getEndTime().longValue() - waitFor.getStatistics().getStartTime().longValue());
                        processSession.transfer(flowFile, REL_SUCCESS);
                    }
                }
                if (writer != null) {
                    writer.close();
                }
            } finally {
            }
        } catch (Exception e) {
            getLogger().log(LogLevel.ERROR, e.getMessage(), e);
            processSession.transfer(processSession.penalize(flowFile), REL_FAILURE);
        }
    }
}
