/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.gcp.bigquery;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.io.InputStream;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SystemResource;
import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.bigquery.AbstractBigQueryProcessor;
import org.apache.nifi.processors.gcp.bigquery.PutBigQuery;
import org.apache.nifi.processors.gcp.bigquery.PutBigQueryBatch;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;

/**
 * Deprecated processor that loads the records of a FlowFile into a BigQuery table through a
 * single streaming {@code insertAll} request.
 *
 * <p>Every record of the incoming FlowFile is parsed with the configured {@link #RECORD_READER},
 * converted to a plain map and added to one in-memory request, so the whole FlowFile content is
 * held in memory before it is sent.
 */
@InputRequirement(value=InputRequirement.Requirement.INPUT_REQUIRED)
@DeprecationNotice(alternatives={PutBigQuery.class}, reason="This processor is deprecated and may be removed in future releases.")
@Tags(value={"google", "google cloud", "bq", "gcp", "bigquery", "record"})
@CapabilityDescription(value="Please be aware this processor is deprecated and may be removed in the near future. Use PutBigQuery instead. Load data into Google BigQuery table using the streaming API. This processor is not intended to load large flow files as it will load the full content into memory. If you need to insert large flow files, consider using PutBigQueryBatch instead.")
@SeeAlso(value={PutBigQueryBatch.class})
@SystemResourceConsideration(resource=SystemResource.MEMORY)
@WritesAttributes(value={@WritesAttribute(attribute="bq.records.count", description="Number of records successfully inserted")})
@Deprecated
public class PutBigQueryStreaming extends AbstractBigQueryProcessor {
    /** Renders {@link Timestamp} values as {@code yyyy-MM-dd HH:mm:ss} plus six fractional digits. */
    private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
    /** Renders {@link Time} values as {@code HH:mm:ss} plus six fractional digits. */
    private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSS");

    /** Controller service used to parse the incoming FlowFile content into records. */
    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder().name("bq.record.reader").displayName("Record Reader").description("Specifies the Controller Service to use for parsing incoming data.").identifiesControllerService(RecordReaderFactory.class).required(true).build();
    /** Whether the valid rows of a request are inserted even when other rows are invalid. */
    public static final PropertyDescriptor SKIP_INVALID_ROWS = new PropertyDescriptor.Builder().name("bq.skip.invalid.rows").displayName("Skip Invalid Rows").description("Sets whether to insert all valid rows of a request, even if invalid rows exist. If not set the entire insert request will fail if it contains an invalid row.").required(true).addValidator(StandardValidators.BOOLEAN_VALIDATOR).expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES).defaultValue("false").build();

    /**
     * Returns the superclass descriptors followed by {@link #RECORD_READER} and
     * {@link #SKIP_INVALID_ROWS}.
     *
     * @return an unmodifiable list of the supported property descriptors
     */
    @Override
    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        List<PropertyDescriptor> descriptors = new ArrayList<>(super.getSupportedPropertyDescriptors());
        descriptors.add(RECORD_READER);
        descriptors.add(SKIP_INVALID_ROWS);
        return Collections.unmodifiableList(descriptors);
    }

    /**
     * Delegates to the superclass implementation.
     *
     * @param context the process context passed on to the superclass
     */
    @Override
    @OnScheduled
    public void onScheduled(ProcessContext context) {
        super.onScheduled(context);
    }

    /**
     * Reads every record of the next FlowFile, sends them to BigQuery in one {@code insertAll}
     * request and routes the FlowFile according to the outcome.
     *
     * <ul>
     *   <li>No insert errors: {@code bq.records.count} is set to the number of records read and
     *       the FlowFile goes to {@code REL_SUCCESS}.</li>
     *   <li>Insert errors reported: {@code bq.records.count} is set to the number of records read
     *       minus the number of rows that reported errors; the FlowFile is penalized and goes to
     *       {@code REL_FAILURE}.</li>
     *   <li>Any exception while reading or inserting: the exception is logged, the FlowFile is
     *       penalized and goes to {@code REL_FAILURE}.</li>
     * </ul>
     *
     * @param context provides the processor properties
     * @param session provides the FlowFile and is used to read, update and transfer it
     * @throws ProcessException declared by the signature; exceptions raised while reading or
     *         inserting are caught and handled here
     */
    @Override
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
        TableId tableId = this.getTableId(context, flowFile.getAttributes());
        try {
            InsertAllRequest.Builder request = InsertAllRequest.newBuilder(tableId);
            int nbrecord = 0;
            try (InputStream in = session.read(flowFile)) {
                RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
                try (RecordReader reader = readerFactory.createRecordReader(flowFile, in, this.getLogger())) {
                    Record currentRecord;
                    while ((currentRecord = reader.nextRecord()) != null) {
                        request.addRow(this.convertMapRecord(currentRecord.toMap()));
                        ++nbrecord;
                    }
                }
            }
            request.setIgnoreUnknownValues(context.getProperty(IGNORE_UNKNOWN).evaluateAttributeExpressions(flowFile).asBoolean().booleanValue());
            request.setSkipInvalidRows(context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean().booleanValue());
            InsertAllResponse response = ((BigQuery)this.getCloudService()).insertAll(request.build());
            Map<String, String> attributes = new HashMap<>();
            if (response.hasErrors()) {
                // Keyed by row index; one entry per row that reported at least one error.
                Map<Long, List<BigQueryError>> insertErrors = response.getInsertErrors();
                this.getLogger().log(LogLevel.WARN, "Failed to insert {} of {} records into BigQuery {} table.", new Object[]{insertErrors.size(), nbrecord, tableName});
                if (this.getLogger().isDebugEnabled()) {
                    this.logInsertErrors(insertErrors);
                }
                attributes.put("bq.records.count", Long.toString(nbrecord - insertErrors.size()));
                flowFile = session.penalize(flowFile);
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.transfer(flowFile, REL_FAILURE);
            } else {
                attributes.put("bq.records.count", Long.toString(nbrecord));
                flowFile = session.putAllAttributes(flowFile, attributes);
                session.transfer(flowFile, REL_SUCCESS);
            }
        }
        catch (Exception ex) {
            this.getLogger().log(LogLevel.ERROR, ex.getMessage(), ex);
            flowFile = session.penalize(flowFile);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    /**
     * Logs, at DEBUG level, every error reported for every failed row.
     *
     * @param insertErrors errors reported by BigQuery, keyed by row index
     */
    private void logInsertErrors(Map<Long, List<BigQueryError>> insertErrors) {
        for (Map.Entry<Long, List<BigQueryError>> rowErrors : insertErrors.entrySet()) {
            for (BigQueryError error : rowErrors.getValue()) {
                this.getLogger().log(LogLevel.DEBUG, "Failed to insert record #{}: {}", new Object[]{rowErrors.getKey(), error.getMessage()});
            }
        }
    }

    /**
     * Converts the field map of a record into a map of values suitable for a BigQuery row.
     *
     * <ul>
     *   <li>{@link MapRecord} values are converted recursively into nested maps.</li>
     *   <li>Non-empty arrays whose first element is a {@link MapRecord} become a list of nested
     *       maps.</li>
     *   <li>{@link Timestamp} and {@link Time} values are formatted as strings in UTC.</li>
     *   <li>{@link Date} values are rendered with {@code toString()}.</li>
     *   <li>Every other value, including {@code null}, is passed through unchanged.</li>
     * </ul>
     *
     * @param map field name to value mapping of one record
     * @return a new map with the same keys and converted values
     */
    private Map<String, Object> convertMapRecord(Map<String, Object> map) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> field : map.entrySet()) {
            String key = field.getKey();
            Object obj = field.getValue();
            if (obj instanceof MapRecord) {
                result.put(key, this.convertMapRecord(((MapRecord)obj).toMap()));
            } else if (obj instanceof Object[] && ((Object[])obj).length > 0 && ((Object[])obj)[0] instanceof MapRecord) {
                // Only the first element is inspected; the remaining ones are assumed to be MapRecords too.
                List<Map<String, Object>> lmapr = new ArrayList<>();
                for (Object mapr : (Object[])obj) {
                    lmapr.add(this.convertMapRecord(((MapRecord)mapr).toMap()));
                }
                result.put(key, lmapr);
            } else if (obj instanceof Timestamp) {
                // toInstant() keeps the nanos field, so the six fractional digits of the pattern
                // carry real sub-millisecond data instead of zero padding.
                LocalDateTime dateTime = LocalDateTime.ofInstant(((Timestamp)obj).toInstant(), ZoneOffset.UTC);
                result.put(key, dateTime.format(timestampFormatter));
            } else if (obj instanceof Time) {
                // java.sql.Time#toInstant() throws UnsupportedOperationException, so go through epoch millis.
                LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(((Time)obj).getTime()), ZoneOffset.UTC);
                result.put(key, dateTime.format(timeFormatter));
            } else if (obj instanceof Date) {
                result.put(key, obj.toString());
            } else {
                result.put(key, obj);
            }
        }
        return result;
    }
}

