/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller.kudu;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.security.auth.login.LoginException;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduPredicate;
import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduScannerIterator;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.ReplicaSelection;
import org.apache.kudu.client.RowResult;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.DeprecationNotice;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.kerberos.KerberosUserService;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;

/**
 * Record lookup service backed by a single Apache Kudu table.
 *
 * <p>When the service is enabled it opens the configured table, resolves the list of columns to
 * return and derives a NiFi {@link RecordSchema} for them. Each {@link #lookup(Map)} call then runs
 * a scan limited to one row, with one equality predicate per coordinate, and returns the first
 * matching row (if any) as a {@link Record}. Values of {@code BINARY} columns are returned as
 * base64 encoded strings.
 *
 * <p>If a Kerberos User Service or Kerberos Credentials Service is configured, client creation and
 * every lookup are executed through a {@link KerberosAction} for the logged-in user.
 */
@CapabilityDescription("Lookup a record from Kudu Server associated with the specified key. Binary columns are base64 encoded. Only one matched row will be returned")
@Tags({"lookup", "enrich", "key", "value", "kudu"})
@DeprecationNotice(reason = "This component is deprecated and will be removed in NiFi 2.x.")
public class KuduLookupService extends AbstractControllerService implements RecordLookupService {

    /** Comma separated list of Kudu master addresses. */
    public static final PropertyDescriptor KUDU_MASTERS = new PropertyDescriptor.Builder()
            .name("kudu-lu-masters")
            .displayName("Kudu Masters")
            .description("Comma separated addresses of the Kudu masters to connect to.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Optional Kerberos credentials (principal + keytab) service; mutually exclusive with {@link #KERBEROS_USER_SERVICE}. */
    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
            .name("kudu-lu-kerberos-credentials-service")
            .displayName("Kerberos Credentials Service")
            .description("Specifies the Kerberos Credentials to use for authentication")
            .required(false)
            .identifiesControllerService(KerberosCredentialsService.class)
            .build();

    /** Optional Kerberos user service; mutually exclusive with {@link #KERBEROS_CREDENTIALS_SERVICE}. */
    public static final PropertyDescriptor KERBEROS_USER_SERVICE = new PropertyDescriptor.Builder()
            .name("kudu-lu-kerberos-user-service")
            .displayName("Kerberos User Service")
            .description("Specifies the Kerberos User to use for authentication")
            .required(false)
            .identifiesControllerService(KerberosUserService.class)
            .build();

    /** Default operation timeout handed to the Kudu client builder. */
    public static final PropertyDescriptor KUDU_OPERATION_TIMEOUT_MS = new PropertyDescriptor.Builder()
            .name("kudu-lu-operations-timeout-ms")
            .displayName("Kudu Operation Timeout")
            .description("Default timeout used for user operations (using sessions and scanners)")
            .required(false)
            .defaultValue("30000ms")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    public static final AllowableValue CLOSEST_REPLICA = new AllowableValue(
            ReplicaSelection.CLOSEST_REPLICA.toString(),
            ReplicaSelection.CLOSEST_REPLICA.name(),
            "Select the closest replica to the client. Replicas are classified from closest to furthest as follows: 1) Local replicas 2) Replicas whose tablet server has the same location as the client 3) All other replicas");

    public static final AllowableValue LEADER_ONLY = new AllowableValue(
            ReplicaSelection.LEADER_ONLY.toString(),
            ReplicaSelection.LEADER_ONLY.name(),
            "Select the LEADER replica");

    /** Replica selection policy applied to every lookup scan. */
    public static final PropertyDescriptor KUDU_REPLICA_SELECTION = new PropertyDescriptor.Builder()
            .name("kudu-lu-replica-selection")
            .displayName("Kudu Replica Selection")
            .description("Policy with which to choose amongst multiple replicas")
            .required(true)
            .defaultValue(CLOSEST_REPLICA.getValue())
            .allowableValues(new AllowableValue[]{CLOSEST_REPLICA, LEADER_ONLY})
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .build();

    /** Name of the Kudu table that lookups are executed against. */
    public static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("kudu-lu-table-name")
            .displayName("Kudu Table Name")
            .description("Name of the table to access.")
            .required(true)
            .defaultValue("default")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Columns to project in the lookup scan; {@code *} selects every column of the table. */
    public static final PropertyDescriptor RETURN_COLUMNS = new PropertyDescriptor.Builder()
            .name("kudu-lu-return-cols")
            .displayName("Kudu Return Columns")
            .description("A comma-separated list of columns to return when scanning. To return all columns set to \"*\"")
            .required(true)
            .defaultValue("*")
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
            .build();

    /** Unmodifiable list of supported properties, populated by {@link #init}. */
    protected List<PropertyDescriptor> properties;
    /** Logged-in Kerberos user, or {@code null} when no Kerberos service is configured. */
    private volatile KerberosUser kerberosUser;
    /** Kudu client created on enable and closed on disable. */
    protected KuduClient kuduClient;
    protected ReplicaSelection replicaSelection;
    protected volatile String tableName;
    protected volatile KuduTable table;
    /** Names of the columns projected by the lookup scan. */
    protected volatile List<String> columnNames;
    /** NiFi schema describing the records returned by {@link #lookup(Map)}. */
    protected volatile RecordSchema resultSchema;
    protected volatile Schema tableSchema;

    /**
     * Builds the list of supported property descriptors, giving subclasses a chance to contribute
     * additional ones through {@link #addProperties(List)}.
     *
     * @param context the initialization context (not used here)
     */
    @Override
    protected void init(ControllerServiceInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<>();
        descriptors.add(KUDU_MASTERS);
        descriptors.add(KERBEROS_USER_SERVICE);
        descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
        descriptors.add(KUDU_OPERATION_TIMEOUT_MS);
        descriptors.add(KUDU_REPLICA_SELECTION);
        descriptors.add(TABLE_NAME);
        descriptors.add(RETURN_COLUMNS);
        this.addProperties(descriptors);
        this.properties = Collections.unmodifiableList(descriptors);
    }

    /**
     * Extension hook called from {@link #init} before the property list is frozen.
     * The default implementation adds nothing.
     *
     * @param properties mutable list of descriptors that subclasses may append to
     */
    protected void addProperties(List<PropertyDescriptor> properties) {
    }

    /**
     * Rejects configurations in which both Kerberos services are set at the same time.
     *
     * @param validationContext the context providing the configured property values
     * @return the validation failures found; empty when the configuration is acceptable
     */
    @Override
    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        final List<ValidationResult> problems = new ArrayList<>();
        final boolean userServiceSet = validationContext.getProperty(KERBEROS_USER_SERVICE).isSet();
        final boolean credentialsServiceSet = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).isSet();
        if (userServiceSet && credentialsServiceSet) {
            problems.add(new ValidationResult.Builder()
                    .valid(false)
                    .subject("Kerberos configuration")
                    .explanation("Kerberos User Service and Kerberos Credentials Service cannot be configured at the same time")
                    .build());
        }
        return problems;
    }

    /**
     * Logs in the configured Kerberos user (if any) and creates the Kudu client, running the
     * client construction as that user when Kerberos is in use.
     *
     * @param context the configuration the masters and Kerberos services are read from
     * @throws LoginException if the Kerberos login fails
     */
    protected void createKuduClient(ConfigurationContext context) throws LoginException {
        final String kuduMasters = context.getProperty(KUDU_MASTERS).evaluateAttributeExpressions().getValue();
        final KerberosUserService kerberosUserService =
                context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
        final KerberosCredentialsService kerberosCredentialsService =
                context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);

        // The user service takes precedence; customValidate() prevents both from being set.
        if (kerberosUserService != null) {
            this.kerberosUser = kerberosUserService.createKerberosUser();
            this.kerberosUser.login();
        } else if (kerberosCredentialsService != null) {
            final String keytab = kerberosCredentialsService.getKeytab();
            final String principal = kerberosCredentialsService.getPrincipal();
            this.kerberosUser = this.loginKerberosUser(principal, keytab);
        }

        if (this.kerberosUser == null) {
            this.kuduClient = this.buildClient(kuduMasters, context);
        } else {
            final KerberosAction<KuduClient> createClient =
                    new KerberosAction<>(this.kerberosUser, () -> this.buildClient(kuduMasters, context), this.getLogger());
            this.kuduClient = createClient.execute();
        }
    }

    /**
     * Creates a keytab based Kerberos user and logs it in.
     *
     * @param principal the Kerberos principal
     * @param keytab    the keytab location
     * @return the logged-in user
     * @throws LoginException if the login fails
     */
    protected KerberosUser loginKerberosUser(String principal, String keytab) throws LoginException {
        final KerberosKeytabUser keytabUser = new KerberosKeytabUser(principal, keytab);
        keytabUser.login();
        return keytabUser;
    }

    /**
     * Builds a Kudu client for the given masters using the configured operation timeout.
     *
     * @param masters comma separated master addresses
     * @param context the configuration the operation timeout is read from
     * @return a new Kudu client
     */
    protected KuduClient buildClient(String masters, ConfigurationContext context) {
        // The property declares VARIABLE_REGISTRY expression language support, so it has to be
        // evaluated before it is parsed as a time period. The value is kept as a long to avoid
        // narrowing it to an int on the way to the builder.
        final long operationTimeoutMs = context.getProperty(KUDU_OPERATION_TIMEOUT_MS)
                .evaluateAttributeExpressions()
                .asTimePeriod(TimeUnit.MILLISECONDS);
        return new KuduClient.KuduClientBuilder(masters)
                .defaultOperationTimeoutMs(operationTimeoutMs)
                .build();
    }

    /**
     * Creates the Kudu client if necessary, opens the configured table and prepares the projected
     * column list and result schema used by {@link #lookup(Map)}.
     *
     * @param context the configuration of this service
     * @throws InitializationException  if the Kudu client cannot be created
     * @throws IllegalArgumentException if the table cannot be opened or a requested column does
     *                                  not exist in the table schema
     */
    @OnEnabled
    public void onEnabled(ConfigurationContext context) throws InitializationException {
        try {
            if (this.kuduClient == null) {
                this.getLogger().debug("Setting up Kudu connection...");
                this.createKuduClient(context);
                this.getLogger().debug("Kudu connection successfully initialized");
            }
        } catch (Exception ex) {
            this.getLogger().error("Exception occurred while interacting with Kudu due to " + ex.getMessage(), ex);
            throw new InitializationException(ex);
        }

        this.replicaSelection = ReplicaSelection.valueOf(context.getProperty(KUDU_REPLICA_SELECTION).getValue());
        this.tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions().getValue();
        try {
            this.table = this.kuduClient.openTable(this.tableName);
            this.tableSchema = this.table.getSchema();
            // RETURN_COLUMNS supports the variable registry, so evaluate it like the other
            // expression-language enabled properties instead of using the raw value.
            final String returnColumns = context.getProperty(RETURN_COLUMNS).evaluateAttributeExpressions().getValue();
            this.columnNames = this.getColumns(returnColumns);
            this.resultSchema = this.kuduSchemaToNiFiSchema(this.tableSchema, this.columnNames);
        } catch (KuduException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * @return an empty set; this service does not require any particular coordinate key
     */
    @Override
    public Set<String> getRequiredKeys() {
        return new HashSet<>();
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.properties;
    }

    /**
     * Looks up the first row whose columns equal the given coordinates.
     *
     * @param coordinates column name to value pairs; each entry becomes an equality predicate
     * @return the matching row as a record, or an empty optional when no row matches or
     *         {@code coordinates} is {@code null}
     */
    @Override
    public Optional<Record> lookup(Map<String, Object> coordinates) {
        if (coordinates == null) {
            return Optional.empty();
        }
        // Read the volatile field once so the null check and the use see the same user.
        final KerberosUser user = this.kerberosUser;
        if (user == null) {
            return this.getRecord(coordinates);
        }
        final KerberosAction<Optional<Record>> lookupAction =
                new KerberosAction<>(user, () -> this.getRecord(coordinates), this.getLogger());
        return lookupAction.execute();
    }

    /**
     * Runs the single-row scan for the given coordinates and converts the result to a record.
     *
     * @param coordinates column name to value pairs used as equality predicates
     * @return the first matching row, or an empty optional when the scan yields nothing
     */
    private Optional<Record> getRecord(Map<String, Object> coordinates) {
        final KuduScanner.KuduScannerBuilder scannerBuilder = this.kuduClient.newScannerBuilder(this.table);
        scannerBuilder.setProjectedColumnNames(this.columnNames);
        scannerBuilder.replicaSelection(this.replicaSelection);
        scannerBuilder.limit(1L);
        for (Map.Entry<String, Object> coordinate : coordinates.entrySet()) {
            final ColumnSchema column = this.tableSchema.getColumn(coordinate.getKey());
            scannerBuilder.addPredicate(
                    KuduPredicate.newComparisonPredicate(column, KuduPredicate.ComparisonOp.EQUAL, coordinate.getValue()));
        }

        final KuduScanner scanner = scannerBuilder.build();
        try {
            final KuduScannerIterator rows = scanner.iterator();
            if (!rows.hasNext()) {
                return Optional.empty();
            }
            final RowResult row = rows.next();
            final Map<String, Object> values = new HashMap<>();
            for (String columnName : this.columnNames) {
                values.put(columnName, readColumn(row, columnName));
            }
            return Optional.of(new MapRecord(this.resultSchema, values));
        } finally {
            // Only one row is ever read, so release the scanner explicitly instead of relying on
            // it being cleaned up later. A failure to close must not mask the lookup result.
            try {
                scanner.close();
            } catch (KuduException e) {
                this.getLogger().warn("Failed to close Kudu scanner", e);
            }
        }
    }

    /**
     * Reads one column of a row, base64 encoding {@code BINARY} values.
     *
     * @return the column value, or {@code null} when the cell is null
     */
    private static Object readColumn(RowResult row, String columnName) {
        // Check for null first: the typed binary getter is not expected to accept null cells
        // (see the Kudu RowResult API documentation).
        if (row.isNull(columnName)) {
            return null;
        }
        if (row.getColumnType(columnName) == Type.BINARY) {
            return Base64.getEncoder().encodeToString(row.getBinaryCopy(columnName));
        }
        return row.getObject(columnName);
    }

    /**
     * Resolves the configured "return columns" value to a list of column names.
     *
     * @param columns {@code *} for every column of the table, otherwise a comma separated list
     * @return the column names, with surrounding whitespace removed and blank entries dropped
     */
    private List<String> getColumns(String columns) {
        if ("*".equals(columns.trim())) {
            return this.tableSchema.getColumns().stream()
                    .map(ColumnSchema::getName)
                    .collect(Collectors.toList());
        }
        // Trim each entry so that a list written as "a, b" resolves to the columns "a" and "b".
        return Arrays.stream(columns.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    /**
     * Derives the NiFi record schema for the given columns of a Kudu table.
     *
     * @param kuduTableSchema the schema of the Kudu table
     * @param columnNames     the columns to include, in order
     * @return the record schema describing those columns
     * @throws IllegalArgumentException if a column does not exist in the table schema
     */
    private RecordSchema kuduSchemaToNiFiSchema(Schema kuduTableSchema, List<String> columnNames) {
        final List<RecordField> fields = new ArrayList<>(columnNames.size());
        for (String columnName : columnNames) {
            if (!kuduTableSchema.hasColumn(columnName)) {
                throw new IllegalArgumentException("Column not found in Kudu table schema " + columnName);
            }
            final RecordField field = toRecordField(kuduTableSchema.getColumn(columnName));
            if (field != null) {
                fields.add(field);
            }
        }
        return new SimpleRecordSchema(fields);
    }

    /**
     * Maps a Kudu column to the NiFi record field used to represent it.
     *
     * @return the record field, or {@code null} for a Kudu type that has no mapping here
     *         (such a column is left out of the result schema)
     */
    private static RecordField toRecordField(ColumnSchema cs) {
        final String name = cs.getName();
        switch (cs.getType()) {
            case INT8:
                return new RecordField(name, RecordFieldType.BYTE.getDataType());
            case INT16:
                return new RecordField(name, RecordFieldType.SHORT.getDataType());
            case INT32:
                return new RecordField(name, RecordFieldType.INT.getDataType());
            case INT64:
                return new RecordField(name, RecordFieldType.LONG.getDataType());
            case DECIMAL: {
                final ColumnTypeAttributes attributes = cs.getTypeAttributes();
                return new RecordField(name,
                        RecordFieldType.DECIMAL.getDecimalDataType(attributes.getPrecision(), attributes.getScale()));
            }
            case UNIXTIME_MICROS:
                return new RecordField(name, RecordFieldType.TIMESTAMP.getDataType());
            case BINARY: // returned base64 encoded, hence a string field
            case STRING:
            case VARCHAR:
                return new RecordField(name, RecordFieldType.STRING.getDataType());
            case DOUBLE:
                return new RecordField(name, RecordFieldType.DOUBLE.getDataType());
            case BOOL:
                return new RecordField(name, RecordFieldType.BOOLEAN.getDataType());
            case FLOAT:
                return new RecordField(name, RecordFieldType.FLOAT.getDataType());
            case DATE:
                return new RecordField(name, RecordFieldType.DATE.getDataType());
            default:
                return null;
        }
    }

    /**
     * Closes the Kudu client and logs out the Kerberos user. The logout is attempted even when
     * closing the client fails.
     *
     * @throws Exception if closing the client or logging out fails
     */
    @OnDisabled
    public void onDisabled() throws Exception {
        try {
            if (this.kuduClient != null) {
                this.getLogger().debug("Closing KuduClient");
                this.kuduClient.close();
                this.kuduClient = null;
            }
        } finally {
            if (this.kerberosUser != null) {
                this.kerberosUser.logout();
                this.kerberosUser = null;
            }
        }
    }
}

