/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.bigquery;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.CivilTimeEncoder;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.Status;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor;
import org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch;
import org.apache.nifi.processors.gcp.bigquery.PutBigQueryStreaming;
import org.apache.nifi.processors.gcp.bigquery.proto.ProtoUtils;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.threeten.bp.LocalTime;

@TriggerSerially
@EventDriven
@Tags(value={"google", "google cloud", "bq", "bigquery"})
@CapabilityDescription(value="Unified processor for batch and stream flow files content to a Google BigQuery table via the Storage Write API.The processor is record based so the used schema is driven by the RecordReader. Attributes that are not matched to the target schemaare skipped. Exactly once delivery semantics are achieved via stream offsets. The Storage Write API is more efficient than the older insertAll method because it uses gRPC streaming rather than REST over HTTP")
@SeeAlso(value={PutBigQueryBatch.class, PutBigQueryStreaming.class})
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttributes(value={@WritesAttribute(attribute="bq.records.count", description="Number of records successfully inserted")})
public class PutBigQuery
extends AbstractBigQueryProcessor {
    static final String STREAM = "STREAM";
    static final String BATCH = "BATCH";
    static final AllowableValue STREAM_TYPE = new AllowableValue("STREAM", "STREAM", "Use streaming record handling strategy");
    static final AllowableValue BATCH_TYPE = new AllowableValue("BATCH", "BATCH", "Use batching record handling strategy");
    private static final String APPEND_RECORD_COUNT_NAME = "bq.append.record.count";
    private static final String APPEND_RECORD_COUNT_DESC = "The number of records to be appended to the write stream at once. Applicable for both batch and stream types";
    private static final String TRANSFER_TYPE_NAME = "bq.transfer.type";
    private static final String TRANSFER_TYPE_DESC = "Defines the preferred transfer type streaming or batching";
    private static final List<Status.Code> RETRYABLE_ERROR_CODES = Arrays.asList(Status.Code.INTERNAL, Status.Code.ABORTED, Status.Code.CANCELLED);
    private final AtomicReference<Exception> error = new AtomicReference();
    private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
    private final Phaser inflightRequestCount = new Phaser(1);
    private TableName tableName = null;
    private BigQueryWriteClient writeClient = null;
    private StreamWriter streamWriter = null;
    private String transferType;
    private int maxRetryCount;
    private int recordBatchCount;
    private boolean skipInvalidRows;
    public static final PropertyDescriptor PROJECT_ID = new PropertyDescriptor.Builder().fromPropertyDescriptor(AbstractBigQueryProcessor.PROJECT_ID).required(true).build();
    static final PropertyDescriptor TRANSFER_TYPE = new PropertyDescriptor.Builder().name("bq.transfer.type").displayName("Transfer Type").description("Defines the preferred transfer type streaming or batching").required(true).defaultValue(STREAM_TYPE.getValue()).allowableValues(new AllowableValue[]{STREAM_TYPE, BATCH_TYPE}).build();
    static final PropertyDescriptor APPEND_RECORD_COUNT = new PropertyDescriptor.Builder().name("bq.append.record.count").displayName("Append Record Count").description("The number of records to be appended to the write stream at once. Applicable for both batch and stream types").required(true).defaultValue("20").addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR).build();
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("bq.record.reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder().name("bq.skip.invalid.rows").displayName("Skip Invalid Rows").description("Sets whether to insert all valid rows of a request, even if invalid rows exist. If not set the entire insert request will fail if it contains an invalid row.").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("false").build();
    private static final List<PropertyDescriptor> DESCRIPTORS = Stream.of(GCP_CREDENTIALS_PROVIDER_SERVICE, PROJECT_ID, DATASET, TABLE_NAME, RECORD_READER, TRANSFER_TYPE, APPEND_RECORD_COUNT, RETRY_COUNT, SKIP_INVALID_ROWS).collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList));

    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
        this.transferType = context.getProperty(TRANSFER_TYPE).getValue();
        this.maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
        this.skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
        this.recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger();
        this.tableName = TableName.of((String)context.getProperty(PROJECT_ID).getValue(), (String)context.getProperty(DATASET).getValue(), (String)context.getProperty(TABLE_NAME).getValue());
        this.writeClient = this.createWriteClient(this.getGoogleCredentials(context));
    }

    @OnUnscheduled
    public void onUnscheduled() {
        this.writeClient.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onTrigger(ProcessContext context, ProcessSession session) {
        Descriptors.Descriptor protoDescriptor;
        WriteStream writeStream;
        try {
            writeStream = this.createWriteStream();
            protoDescriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor((TableSchema)writeStream.getTableSchema());
            this.streamWriter = this.createStreamWriter(writeStream.getName(), protoDescriptor, this.getGoogleCredentials(context));
        }
        catch (Descriptors.DescriptorValidationException | IOException e) {
            this.getLogger().error("Failed to create Big Query Stream Writer for writing", e);
            context.yield();
            return;
        }
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        try {
            int recordNumWritten;
            try (InputStream in = session.read(flowFile);){
                RecordReaderFactory readerFactory = (RecordReaderFactory)context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
                try (RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger());){
                    recordNumWritten = this.writeRecordsToStream(reader, protoDescriptor);
                }
            }
            flowFile = session.putAttribute(flowFile, "bq.records.count", Integer.toString(recordNumWritten));
        }
        catch (Exception e) {
            this.getLogger().error("Writing Records failed", (Throwable)e);
            this.error.set(e);
        }
        finally {
            this.finishProcessing(session, flowFile, this.streamWriter, writeStream.getName(), this.tableName.toString());
        }
    }

    private int writeRecordsToStream(RecordReader reader, Descriptors.Descriptor descriptor) throws Exception {
        Record currentRecord;
        int offset = 0;
        int recordNum = 0;
        ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
        while ((currentRecord = reader.nextRecord()) != null) {
            DynamicMessage message = this.recordToProtoMessage(currentRecord, descriptor);
            if (message == null) continue;
            rowsBuilder.addSerializedRows(message.toByteString());
            if (++recordNum % this.recordBatchCount != 0) continue;
            this.append(new AppendContext(rowsBuilder.build(), offset));
            rowsBuilder = ProtoRows.newBuilder();
            offset = recordNum;
        }
        if (recordNum > offset) {
            this.append(new AppendContext(rowsBuilder.build(), offset));
        }
        return recordNum;
    }

    private DynamicMessage recordToProtoMessage(Record record, Descriptors.Descriptor descriptor) {
        DynamicMessage message;
        block2: {
            Map<String, Object> valueMap = PutBigQuery.convertMapRecord(record.toMap());
            message = null;
            try {
                message = ProtoUtils.createMessage(descriptor, valueMap);
            }
            catch (RuntimeException e) {
                this.getLogger().error("Cannot convert record to message", (Throwable)e);
                if (this.skipInvalidRows) break block2;
                throw e;
            }
        }
        return message;
    }

    private void append(AppendContext appendContext) throws Exception {
        if (this.error.get() != null) {
            throw this.error.get();
        }
        ApiFuture future = this.streamWriter.append(appendContext.getData(), appendContext.getOffset());
        ApiFutures.addCallback((ApiFuture)future, (ApiFutureCallback)new AppendCompleteCallback(appendContext), Runnable::run);
        this.inflightRequestCount.register();
    }

    private void finishProcessing(ProcessSession session, FlowFile flowFile, StreamWriter streamWriter, String streamName, String parentTable) {
        this.inflightRequestCount.arriveAndAwaitAdvance();
        streamWriter.close();
        if (this.error.get() != null) {
            this.getLogger().error("Stream processing failed", (Throwable)this.error.get());
            flowFile = session.putAttribute(flowFile, "bq.records.count", this.isBatch() ? "0" : String.valueOf(this.appendSuccessCount.get() * this.recordBatchCount));
            session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        } else {
            if (this.isBatch()) {
                this.writeClient.finalizeWriteStream(streamName);
                BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder().setParent(parentTable).addWriteStreams(streamName).build();
                BatchCommitWriteStreamsResponse commitResponse = this.writeClient.batchCommitWriteStreams(commitRequest);
                if (!commitResponse.hasCommitTime()) {
                    for (StorageError err : commitResponse.getStreamErrorsList()) {
                        this.getLogger().error("Commit Storage Error Code: {} with message {}", new Object[]{err.getCode().name(), err.getErrorMessage()});
                    }
                    session.penalize(flowFile);
                    session.transfer(flowFile, REL_FAILURE);
                    return;
                }
                this.getLogger().info("Appended and committed all records successfully.");
            }
            session.transfer(flowFile, REL_SUCCESS);
        }
    }

    private WriteStream createWriteStream() {
        WriteStream.Type type = this.isBatch() ? WriteStream.Type.PENDING : WriteStream.Type.COMMITTED;
        CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder().setParent(this.tableName.toString()).setWriteStream(WriteStream.newBuilder().setType(type).build()).build();
        return this.writeClient.createWriteStream(createWriteStreamRequest);
    }

    protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
        BigQueryWriteClient client;
        try {
            client = BigQueryWriteClient.create((BigQueryWriteSettings)((BigQueryWriteSettings.Builder)BigQueryWriteSettings.newBuilder().setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)credentials))).build());
        }
        catch (Exception e) {
            throw new ProcessException("Failed to create Big Query Write Client for writing", (Throwable)e);
        }
        return client;
    }

    protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException {
        ProtoSchema protoSchema = ProtoSchemaConverter.convert((Descriptors.Descriptor)descriptor);
        return StreamWriter.newBuilder((String)streamName).setWriterSchema(protoSchema).setCredentialsProvider((CredentialsProvider)FixedCredentialsProvider.create((Credentials)credentials)).build();
    }

    private boolean isBatch() {
        return BATCH_TYPE.getValue().equals(this.transferType);
    }

    private static Map<String, Object> convertMapRecord(Map<String, Object> map) {
        HashMap<String, Object> result = new HashMap<String, Object>();
        for (String key : map.keySet()) {
            Object obj = map.get(key);
            if (obj instanceof MapRecord) {
                result.put(key, PutBigQuery.convertMapRecord(((MapRecord)obj).toMap()));
                continue;
            }
            if (obj instanceof Object[] && ((Object[])obj).length > 0 && ((Object[])obj)[0] instanceof MapRecord) {
                ArrayList<Map<String, Object>> lmapr = new ArrayList<Map<String, Object>>();
                for (Object mapr : (Object[])obj) {
                    lmapr.add(PutBigQuery.convertMapRecord(((MapRecord)mapr).toMap()));
                }
                result.put(key, lmapr);
                continue;
            }
            if (obj instanceof Timestamp) {
                result.put(key, ((Timestamp)obj).getTime() * 1000L);
                continue;
            }
            if (obj instanceof Time) {
                java.time.LocalTime time = ((Time)obj).toLocalTime();
                LocalTime localTime = LocalTime.of((int)time.getHour(), (int)time.getMinute(), (int)time.getSecond());
                result.put(key, CivilTimeEncoder.encodePacked64TimeMicros((LocalTime)localTime));
                continue;
            }
            if (obj instanceof Date) {
                result.put(key, (int)((Date)obj).toLocalDate().toEpochDay());
                continue;
            }
            result.put(key, obj);
        }
        return result;
    }

    private static class AppendContext {
        private final ProtoRows data;
        private final long offset;
        private int retryCount;

        AppendContext(ProtoRows data, long offset) {
            this.data = data;
            this.offset = offset;
            this.retryCount = 0;
        }

        public ProtoRows getData() {
            return this.data;
        }

        public int getRetryCount() {
            return this.retryCount;
        }

        public void incrementRetryCount() {
            ++this.retryCount;
        }

        public long getOffset() {
            return this.offset;
        }
    }

    class AppendCompleteCallback
    implements ApiFutureCallback<AppendRowsResponse> {
        private final AppendContext appendContext;

        public AppendCompleteCallback(AppendContext appendContext) {
            this.appendContext = appendContext;
        }

        public void onSuccess(AppendRowsResponse response) {
            PutBigQuery.this.getLogger().info("Append success with offset: {}", new Object[]{this.appendContext.getOffset()});
            PutBigQuery.this.appendSuccessCount.incrementAndGet();
            PutBigQuery.this.inflightRequestCount.arriveAndDeregister();
        }

        public void onFailure(Throwable throwable) {
            Status status = Status.fromThrowable((Throwable)throwable);
            if (this.appendContext.getRetryCount() < PutBigQuery.this.maxRetryCount && RETRYABLE_ERROR_CODES.contains(status.getCode())) {
                this.appendContext.incrementRetryCount();
                try {
                    PutBigQuery.this.append(this.appendContext);
                    PutBigQuery.this.inflightRequestCount.arriveAndDeregister();
                    return;
                }
                catch (Exception e) {
                    PutBigQuery.this.getLogger().error("Failed to retry append", (Throwable)e);
                }
            }
            PutBigQuery.this.error.compareAndSet(null, Optional.ofNullable(Exceptions.toStorageException((Throwable)throwable)).map(RuntimeException.class::cast).orElse(new RuntimeException(throwable)));
            PutBigQuery.this.getLogger().error("Failure during appending data", throwable);
            PutBigQuery.this.inflightRequestCount.arriveAndDeregister();
        }
    }
}

