package org.apache.hadoop.hive.druid;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.druid.io.DruidOutputFormat;
import org.apache.hadoop.hive.druid.io.DruidQueryBasedInputFormat;
import org.apache.hadoop.hive.druid.io.DruidRecordWriter;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorIOConfig;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorReport;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorSpec;
import org.apache.hadoop.hive.druid.json.KafkaSupervisorTuningConfig;
import org.apache.hadoop.hive.druid.security.KerberosHttpClient;
import org.apache.hadoop.hive.druid.serde.DruidSerDe;
import org.apache.hadoop.hive.metastore.DefaultHiveMetaHook;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.StorageHandlerInfo;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.druid.com.google.common.annotations.VisibleForTesting;
import org.apache.hive.druid.com.google.common.base.Preconditions;
import org.apache.hive.druid.com.google.common.base.Strings;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Suppliers;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.ImmutableSet;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.metamx.common.CompressionUtils;
import org.apache.hive.druid.com.metamx.common.RetryUtils;
import org.apache.hive.druid.com.metamx.common.lifecycle.Lifecycle;
import org.apache.hive.druid.com.metamx.http.client.HttpClient;
import org.apache.hive.druid.com.metamx.http.client.HttpClientConfig;
import org.apache.hive.druid.com.metamx.http.client.HttpClientInit;
import org.apache.hive.druid.com.metamx.http.client.Request;
import org.apache.hive.druid.com.metamx.http.client.response.StatusResponseHandler;
import org.apache.hive.druid.com.metamx.http.client.response.StatusResponseHolder;
import org.apache.hive.druid.io.druid.data.input.impl.DimensionSchema;
import org.apache.hive.druid.io.druid.data.input.impl.DimensionsSpec;
import org.apache.hive.druid.io.druid.data.input.impl.JSONParseSpec;
import org.apache.hive.druid.io.druid.data.input.impl.StringInputRowParser;
import org.apache.hive.druid.io.druid.data.input.impl.TimestampSpec;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.hive.druid.io.druid.metadata.MetadataStorageTablesConfig;
import org.apache.hive.druid.io.druid.metadata.SQLMetadataConnector;
import org.apache.hive.druid.io.druid.metadata.storage.derby.DerbyConnector;
import org.apache.hive.druid.io.druid.metadata.storage.derby.DerbyMetadataStorage;
import org.apache.hive.druid.io.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import org.apache.hive.druid.io.druid.metadata.storage.mysql.MySQLConnector;
import org.apache.hive.druid.io.druid.metadata.storage.mysql.MySQLConnectorConfig;
import org.apache.hive.druid.io.druid.metadata.storage.mysql.MySQLMetadataStorageModule;
import org.apache.hive.druid.io.druid.metadata.storage.postgresql.PostgreSQLConnector;
import org.apache.hive.druid.io.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
import org.apache.hive.druid.io.druid.query.aggregation.AggregatorFactory;
import org.apache.hive.druid.io.druid.query.search.AutoStrategy;
import org.apache.hive.druid.io.druid.segment.IndexSpec;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.hive.druid.io.druid.segment.loading.SegmentLoadingException;
import org.apache.hive.druid.io.druid.storage.hdfs.HdfsDataSegmentPusher;
import org.apache.hive.druid.io.druid.storage.hdfs.HdfsDataSegmentPusherConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.netty.handler.codec.http.HttpHeaders;
import org.apache.hive.druid.org.jboss.netty.handler.codec.http.HttpMethod;
import org.apache.hive.druid.org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.skife.jdbi.v2.exceptions.CallbackFailedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/druid/DruidStorageHandler.class */
public class DruidStorageHandler extends DefaultHiveMetaHook implements HiveStorageHandler {
    public static final String SEGMENTS_DESCRIPTOR_DIR_NAME = "segmentsDescriptorDir";
    public static final String INTERMEDIATE_SEGMENT_DIR_NAME = "intermediateSegmentDir";
    private static final HttpClient HTTP_CLIENT;
    private SQLMetadataConnector connector;
    private MetadataStorageTablesConfig druidMetadataStorageTablesConfig;
    private String uniqueId;
    private String rootWorkingDir;
    private Configuration conf;
    protected static final Logger LOG = LoggerFactory.getLogger(DruidStorageHandler.class);
    protected static final SessionState.LogHelper console = new SessionState.LogHelper(LOG);
    private static List<String> allowedAlterTypes = ImmutableList.of("ADDPROPS", "DROPPROPS", "ADDCOLS");

    public DruidStorageHandler() {
        this.druidMetadataStorageTablesConfig = null;
        this.uniqueId = null;
        this.rootWorkingDir = null;
    }

    @VisibleForTesting
    public DruidStorageHandler(SQLMetadataConnector sQLMetadataConnector, MetadataStorageTablesConfig metadataStorageTablesConfig) {
        this.druidMetadataStorageTablesConfig = null;
        this.uniqueId = null;
        this.rootWorkingDir = null;
        this.connector = sQLMetadataConnector;
        this.druidMetadataStorageTablesConfig = metadataStorageTablesConfig;
    }

    public Class<? extends InputFormat> getInputFormatClass() {
        return DruidQueryBasedInputFormat.class;
    }

    public Class<? extends OutputFormat> getOutputFormatClass() {
        return DruidOutputFormat.class;
    }

    public Class<? extends AbstractSerDe> getSerDeClass() {
        return DruidSerDe.class;
    }

    public HiveMetaHook getMetaHook() {
        return this;
    }

    public HiveAuthorizationProvider getAuthorizationProvider() {
        return new DefaultHiveAuthorizationProvider();
    }

    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> map) {
    }

    public void configureInputJobCredentials(TableDesc tableDesc, Map<String, String> map) {
    }

    public void preCreateTable(Table table) throws MetaException {
        if (MetaStoreUtils.isExternalTable(table) && !StringUtils.isEmpty(table.getSd().getLocation())) {
            throw new MetaException("LOCATION may not be specified for Druid");
        }
        if (table.getPartitionKeysSize() != 0) {
            throw new MetaException("PARTITIONED BY may not be specified for Druid");
        }
        if (table.getSd().getBucketColsSize() != 0) {
            throw new MetaException("CLUSTERED BY may not be specified for Druid");
        }
        String str = (String) table.getParameters().get("druid.datasource");
        if (MetaStoreUtils.isExternalTable(table)) {
            if (str == null) {
                throw new MetaException(String.format("Datasource name should be specified using [%s] for external tables using Druid", "druid.datasource"));
            }
            return;
        }
        if (str != null) {
            throw new MetaException(String.format("Datasource name cannot be specified using [%s] for managed tables using Druid", "druid.datasource"));
        }
        String qualifiedName = Warehouse.getQualifiedName(table);
        try {
            getConnector().createSegmentTable();
            Collection<String> allDataSourceNames = DruidStorageHandlerUtils.getAllDataSourceNames(getConnector(), getDruidMetadataStorageTablesConfig());
            LOG.debug("pre-create data source with name {}", qualifiedName);
            if (allDataSourceNames.contains(qualifiedName)) {
                throw new MetaException(String.format("Data source [%s] already existing", qualifiedName));
            }
            table.getParameters().put("druid.datasource", qualifiedName);
        } catch (Exception e) {
            LOG.error("Exception while trying to create druid segments table", e);
            throw new MetaException(e.getMessage());
        }
    }

    public void rollbackCreateTable(Table table) {
        if (MetaStoreUtils.isExternalTable(table)) {
            return;
        }
        try {
            try {
                for (DataSegment dataSegment : DruidStorageHandlerUtils.getCreatedSegments(getSegmentDescriptorDir(), getConf())) {
                    try {
                        deleteSegment(dataSegment);
                    } catch (SegmentLoadingException e) {
                        LOG.error(String.format("Error while trying to clean the segment [%s]", dataSegment), e);
                    }
                }
            } catch (IOException e2) {
                LOG.error("Exception while rollback", e2);
                throw Throwables.propagate(e2);
            }
        } finally {
            cleanWorkingDir();
        }
    }

    public void commitCreateTable(Table table) throws MetaException {
        if (MetaStoreUtils.isExternalTable(table)) {
            return;
        }
        if (isKafkaStreamingTable(table)) {
            updateKafkaIngestion(table);
        }
        commitInsertTable(table, true);
    }

    private void updateKafkaIngestion(Table table) {
        String var = HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS);
        String str = (String) Preconditions.checkNotNull(getTableProperty(table, "druid.datasource"), "Druid datasource name is null");
        String str2 = (String) Preconditions.checkNotNull(getTableProperty(table, "kafka.topic"), "kafka topic is null");
        String str3 = (String) Preconditions.checkNotNull(getTableProperty(table, "kafka.bootstrap.servers"), "kafka connect string is null");
        Properties properties = new Properties();
        properties.putAll(table.getParameters());
        GranularitySpec granularitySpec = DruidStorageHandlerUtils.getGranularitySpec(getConf(), properties);
        List<FieldSchema> cols = table.getSd().getCols();
        ArrayList arrayList = new ArrayList(cols.size());
        ArrayList arrayList2 = new ArrayList(cols.size());
        for (FieldSchema fieldSchema : cols) {
            arrayList.add(fieldSchema.getName());
            arrayList2.add(TypeInfoUtils.getTypeInfoFromTypeString(fieldSchema.getType()));
        }
        Pair<List<DimensionSchema>, AggregatorFactory[]> dimensionsAndAggregates = DruidStorageHandlerUtils.getDimensionsAndAggregates(getConf(), arrayList, arrayList2);
        if (!arrayList.contains("__time")) {
            throw new IllegalStateException("Timestamp column (' __time') not specified in create table; list of columns is : " + arrayList);
        }
        KafkaSupervisorSpec createKafkaSupervisorSpec = createKafkaSupervisorSpec(table, str2, str3, new DataSchema(str, (Map) DruidStorageHandlerUtils.JSON_MAPPER.convertValue(new StringInputRowParser(new JSONParseSpec(new TimestampSpec("__time", AutoStrategy.NAME, null), new DimensionsSpec(dimensionsAndAggregates.lhs, null, null), null, null), "UTF-8"), Map.class), dimensionsAndAggregates.rhs, granularitySpec, null, DruidStorageHandlerUtils.JSON_MAPPER), DruidStorageHandlerUtils.getIndexSpec(getConf()));
        KafkaSupervisorSpec fetchKafkaIngestionSpec = fetchKafkaIngestionSpec(table);
        String tableProperty = getTableProperty(table, "druid.kafka.ingestion");
        if (tableProperty == null) {
            tableProperty = fetchKafkaIngestionSpec == null ? "STOP" : "START";
        }
        if (tableProperty.equalsIgnoreCase("STOP")) {
            if (fetchKafkaIngestionSpec != null) {
                stopKafkaIngestion(var, str);
            }
        } else if (!tableProperty.equalsIgnoreCase("START")) {
            if (!tableProperty.equalsIgnoreCase("RESET")) {
                throw new IllegalArgumentException(String.format("Invalid value for property [%s], Valid values are [START, STOP, RESET]", "druid.kafka.ingestion"));
            }
            if (fetchKafkaIngestionSpec != null && !fetchKafkaIngestionSpec.equals(createKafkaSupervisorSpec)) {
                updateKafkaIngestionSpec(var, createKafkaSupervisorSpec);
            }
            resetKafkaIngestion(var, str);
        } else if (fetchKafkaIngestionSpec == null || !fetchKafkaIngestionSpec.equals(createKafkaSupervisorSpec)) {
            updateKafkaIngestionSpec(var, createKafkaSupervisorSpec);
        }
        table.getParameters().remove("druid.kafka.ingestion");
    }

    private static KafkaSupervisorSpec createKafkaSupervisorSpec(Table table, String str, String str2, DataSchema dataSchema, IndexSpec indexSpec) {
        return new KafkaSupervisorSpec(dataSchema, new KafkaSupervisorTuningConfig(getIntegerProperty(table, "druid.kafka.ingestion.maxRowsInMemory"), getIntegerProperty(table, "druid.kafka.ingestion.maxRowsPerSegment"), getPeriodProperty(table, "druid.kafka.ingestion.intermediatePersistPeriod"), null, getIntegerProperty(table, "druid.kafka.ingestion.maxPendingPersists"), indexSpec, null, getBooleanProperty(table, "druid.kafka.ingestion.reportParseExceptions"), getLongProperty(table, "druid.kafka.ingestion.handoffConditionTimeout"), getBooleanProperty(table, "druid.kafka.ingestion.resetOffsetAutomatically"), getIntegerProperty(table, "druid.kafka.ingestion.workerThreads"), getIntegerProperty(table, "druid.kafka.ingestion.chatThreads"), getLongProperty(table, "druid.kafka.ingestion.chatRetries"), getPeriodProperty(table, "druid.kafka.ingestion.httpTimeout"), getPeriodProperty(table, "druid.kafka.ingestion.shutdownTimeout"), getPeriodProperty(table, "druid.kafka.ingestion.offsetFetchPeriod")), new KafkaSupervisorIOConfig(str, getIntegerProperty(table, "druid.kafka.ingestion.replicas"), getIntegerProperty(table, "druid.kafka.ingestion.taskCount"), getPeriodProperty(table, "druid.kafka.ingestion.taskDuration"), getKafkaConsumerProperties(table, str2), getPeriodProperty(table, "druid.kafka.ingestion.startDelay"), getPeriodProperty(table, "druid.kafka.ingestion.period"), getBooleanProperty(table, "druid.kafka.ingestion.useEarliestOffset"), getPeriodProperty(table, "druid.kafka.ingestion.completionTimeout"), getPeriodProperty(table, "druid.kafka.ingestion.lateMessageRejectionPeriod"), getPeriodProperty(table, "druid.kafka.ingestion.earlyMessageRejectionPeriod"), getBooleanProperty(table, "druid.kafka.ingestion.skipOffsetGaps")), new HashMap());
    }

    private static Map<String, String> getKafkaConsumerProperties(Table table, String str) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        builder.put(KafkaSupervisorIOConfig.BOOTSTRAP_SERVERS_KEY, str);
        for (Map.Entry entry : table.getParameters().entrySet()) {
            if (((String) entry.getKey()).startsWith("druid.kafka.ingestion.consumer.")) {
                builder.put(((String) entry.getKey()).substring("druid.kafka.ingestion.consumer.".length()), (String) entry.getValue());
            }
        }
        return builder.build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static void updateKafkaIngestionSpec(String str, KafkaSupervisorSpec kafkaSupervisorSpec) {
        try {
            String writeValueAsString = DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsString(kafkaSupervisorSpec);
            console.printInfo("submitting kafka Spec {}", writeValueAsString);
            LOG.info("submitting kafka Supervisor Spec {}", writeValueAsString);
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) getHttpClient().go(new Request(HttpMethod.POST, new URL(String.format("http://%s/druid/indexer/v1/supervisor", str))).setContent(HttpHeaders.Values.APPLICATION_JSON, DruidStorageHandlerUtils.JSON_MAPPER.writeValueAsBytes(kafkaSupervisorSpec)), new StatusResponseHandler(Charset.forName("UTF-8"))).get();
            if (!statusResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                throw new IOException(String.format("Unable to update Kafka Ingestion for Druid status [%d] full response [%s]", Integer.valueOf(statusResponseHolder.getStatus().getCode()), statusResponseHolder.getContent()));
            }
            String format = String.format("Kafka Supervisor for [%s] Submitted Successfully to druid.", kafkaSupervisorSpec.getDataSchema().getDataSource());
            LOG.info(format);
            console.printInfo(format);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void resetKafkaIngestion(String str, String str2) {
        try {
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) RetryUtils.retry(() -> {
                return (StatusResponseHolder) getHttpClient().go(new Request(HttpMethod.POST, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/reset", str, str2))), new StatusResponseHandler(Charset.forName("UTF-8"))).get();
            }, th -> {
                return th instanceof IOException;
            }, getMaxRetryCount());
            if (!statusResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                throw new IOException(String.format("Unable to reset Kafka Ingestion Druid status [%d] full response [%s]", Integer.valueOf(statusResponseHolder.getStatus().getCode()), statusResponseHolder.getContent()));
            }
            console.printInfo("Druid Kafka Ingestion Reset successful.");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void stopKafkaIngestion(String str, String str2) {
        try {
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) RetryUtils.retry(() -> {
                return (StatusResponseHolder) getHttpClient().go(new Request(HttpMethod.POST, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/shutdown", str, str2))), new StatusResponseHandler(Charset.forName("UTF-8"))).get();
            }, th -> {
                return th instanceof IOException;
            }, getMaxRetryCount());
            if (!statusResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                throw new IOException(String.format("Unable to stop Kafka Ingestion Druid status [%d] full response [%s]", Integer.valueOf(statusResponseHolder.getStatus().getCode()), statusResponseHolder.getContent()));
            }
            console.printInfo("Druid Kafka Ingestion shutdown successful.");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private KafkaSupervisorSpec fetchKafkaIngestionSpec(Table table) {
        String str = (String) Preconditions.checkNotNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), "Druid Overlord Address is null");
        String str2 = (String) Preconditions.checkNotNull(getTableProperty(table, "druid.datasource"), "Druid Datasource name is null");
        try {
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) RetryUtils.retry(() -> {
                return (StatusResponseHolder) getHttpClient().go(new Request(HttpMethod.GET, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s", str, str2))), new StatusResponseHandler(Charset.forName("UTF-8"))).get();
            }, th -> {
                return th instanceof IOException;
            }, getMaxRetryCount());
            if (statusResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                return (KafkaSupervisorSpec) DruidStorageHandlerUtils.JSON_MAPPER.readValue(statusResponseHolder.getContent(), KafkaSupervisorSpec.class);
            }
            if (!statusResponseHolder.getStatus().equals(HttpResponseStatus.NOT_FOUND) && !statusResponseHolder.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
                throw new IOException(String.format("Unable to fetch Kafka Ingestion Spec from Druid status [%d] full response [%s]", Integer.valueOf(statusResponseHolder.getStatus().getCode()), statusResponseHolder.getContent()));
            }
            LOG.debug("No Kafka Supervisor found for datasource[%s]", str2);
            return null;
        } catch (Exception e) {
            throw new RuntimeException("Exception while fetching kafka ingestion spec from druid", e);
        }
    }

    @Nullable
    private KafkaSupervisorReport fetchKafkaSupervisorReport(Table table) {
        String str = (String) Preconditions.checkNotNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), "Druid Overlord Address is null");
        String str2 = (String) Preconditions.checkNotNull(getTableProperty(table, "druid.datasource"), "Druid Datasource name is null");
        try {
            StatusResponseHolder statusResponseHolder = (StatusResponseHolder) RetryUtils.retry(() -> {
                return (StatusResponseHolder) getHttpClient().go(new Request(HttpMethod.GET, new URL(String.format("http://%s/druid/indexer/v1/supervisor/%s/status", str, str2))), new StatusResponseHandler(Charset.forName("UTF-8"))).get();
            }, th -> {
                return th instanceof IOException;
            }, getMaxRetryCount());
            if (statusResponseHolder.getStatus().equals(HttpResponseStatus.OK)) {
                return (KafkaSupervisorReport) DruidStorageHandlerUtils.JSON_MAPPER.readValue(statusResponseHolder.getContent(), KafkaSupervisorReport.class);
            }
            if (statusResponseHolder.getStatus().equals(HttpResponseStatus.NOT_FOUND) || statusResponseHolder.getStatus().equals(HttpResponseStatus.BAD_REQUEST)) {
                LOG.info("No Kafka Supervisor found for datasource[%s]", str2);
                return null;
            }
            LOG.error("Unable to fetch Kafka Supervisor status [%d] full response [%s]", Integer.valueOf(statusResponseHolder.getStatus().getCode()), statusResponseHolder.getContent());
            return null;
        } catch (Exception e) {
            LOG.error("Exception while fetching kafka ingestion spec from druid", e);
            return null;
        }
    }

    protected List<DataSegment> loadAndCommitDruidSegments(Table table, boolean z, List<DataSegment> list) throws IOException, CallbackFailedException {
        String str = (String) table.getParameters().get("druid.datasource");
        String var = table.getParameters().get("druid.storage.storageDirectory") != null ? (String) table.getParameters().get("druid.storage.storageDirectory") : HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_SEGMENT_DIRECTORY);
        HdfsDataSegmentPusherConfig hdfsDataSegmentPusherConfig = new HdfsDataSegmentPusherConfig();
        LOG.info(String.format("Moving [%s] Druid segments from staging directory [%s] to Deep storage [%s]", Integer.valueOf(list.size()), getStagingWorkingDir().toString(), var));
        hdfsDataSegmentPusherConfig.setStorageDirectory(var);
        return DruidStorageHandlerUtils.publishSegmentsAndCommit(getConnector(), getDruidMetadataStorageTablesConfig(), str, list, z, getConf(), new HdfsDataSegmentPusher(hdfsDataSegmentPusherConfig, getConf(), DruidStorageHandlerUtils.JSON_MAPPER));
    }

    private int checkLoadStatus(List<DataSegment> list) {
        String var = HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_COORDINATOR_DEFAULT_ADDRESS);
        int maxRetryCount = getMaxRetryCount();
        if (maxRetryCount == 0) {
            return list.size();
        }
        LOG.debug("checking load status from coordinator {}", var);
        try {
            if (Strings.isNullOrEmpty((String) RetryUtils.retry(() -> {
                return DruidStorageHandlerUtils.getURL(getHttpClient(), new URL(String.format("http://%s/status", var)));
            }, th -> {
                return th instanceof IOException;
            }, maxRetryCount))) {
                console.printInfo("Will skip waiting for data loading empty response from coordinator");
                return list.size();
            }
            console.printInfo(String.format("Waiting for the loading of [%s] segments", Integer.valueOf(list.size())));
            long longVar = HiveConf.getLongVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_PASSIVE_WAIT_TIME);
            Set set = (Set) list.stream().map(dataSegment -> {
                try {
                    return new URL(String.format("http://%s/druid/coordinator/v1/datasources/%s/segments/%s", var, dataSegment.getDataSource(), dataSegment.getIdentifier()));
                } catch (MalformedURLException e) {
                    Throwables.propagate(e);
                    return null;
                }
            }).collect(Collectors.toSet());
            int i = 0;
            while (true) {
                int i2 = i;
                i++;
                if (i2 >= maxRetryCount || set.isEmpty()) {
                    break;
                }
                set = ImmutableSet.copyOf((Collection) Sets.filter(set, url -> {
                    try {
                        String url = DruidStorageHandlerUtils.getURL(getHttpClient(), url);
                        LOG.debug("Checking segment [{}] response is [{}]", url, url);
                        return Strings.isNullOrEmpty(url);
                    } catch (IOException e) {
                        LOG.error(String.format("Error while checking URL [%s]", url), e);
                        return true;
                    }
                }));
                try {
                    if (!set.isEmpty()) {
                        Thread.sleep(longVar);
                    }
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    Throwables.propagate(e);
                }
            }
            if (!set.isEmpty()) {
                console.printError(String.format("Wait time exhausted and we have [%s] out of [%s] segments not loaded yet", Integer.valueOf(set.size()), Integer.valueOf(list.size())));
            }
            return set.size();
        } catch (Exception e2) {
            console.printInfo("Will skip waiting for data loading, coordinator unavailable");
            return list.size();
        }
    }

    @VisibleForTesting
    protected void deleteSegment(DataSegment dataSegment) throws SegmentLoadingException {
        Path path = DruidStorageHandlerUtils.getPath(dataSegment);
        LOG.info("removing segment {}, located at path {}", dataSegment.getIdentifier(), path);
        try {
            if (!path.getName().endsWith(CompressionUtils.ZIP_SUFFIX)) {
                throw new SegmentLoadingException("Unknown file type[%s]", path);
            }
            FileSystem fileSystem = path.getFileSystem(getConf());
            if (!fileSystem.exists(path)) {
                LOG.warn("Segment Path {} does not exist. It appears to have been deleted already.", path);
                return;
            }
            Path parent = path.getParent();
            if (!fileSystem.delete(parent, true)) {
                throw new SegmentLoadingException("Unable to kill segment, failed to delete dir [%s]", parent.toString());
            }
            Path parent2 = parent.getParent();
            if (safeNonRecursiveDelete(fileSystem, parent2)) {
                Path parent3 = parent2.getParent();
                if (safeNonRecursiveDelete(fileSystem, parent3)) {
                    safeNonRecursiveDelete(fileSystem, parent3.getParent());
                }
            }
        } catch (IOException e) {
            throw new SegmentLoadingException(e, "Unable to kill segment", new Object[0]);
        }
    }

    private static boolean safeNonRecursiveDelete(FileSystem fileSystem, Path path) {
        try {
            return fileSystem.delete(path, false);
        } catch (Exception e) {
            return false;
        }
    }

    public void preDropTable(Table table) {
    }

    public void rollbackDropTable(Table table) {
    }

    public void commitDropTable(Table table, boolean z) {
        if (MetaStoreUtils.isExternalTable(table)) {
            return;
        }
        if (isKafkaStreamingTable(table)) {
            stopKafkaIngestion((String) Preconditions.checkNotNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_OVERLORD_DEFAULT_ADDRESS), "Druid Overlord Address is null"), (String) Preconditions.checkNotNull(getTableProperty(table, "druid.datasource"), "Druid Datasource name is null"));
        }
        String str = (String) Preconditions.checkNotNull((String) table.getParameters().get("druid.datasource"), "DataSource name is null !");
        if (z) {
            LOG.info("Dropping with purge all the data for data source {}", str);
            List<DataSegment> dataSegmentList = DruidStorageHandlerUtils.getDataSegmentList(getConnector(), getDruidMetadataStorageTablesConfig(), str);
            if (dataSegmentList.isEmpty()) {
                LOG.info("Nothing to delete for data source {}", str);
                return;
            }
            for (DataSegment dataSegment : dataSegmentList) {
                try {
                    deleteSegment(dataSegment);
                } catch (SegmentLoadingException e) {
                    LOG.error(String.format("Error while deleting segment [%s]", dataSegment.getIdentifier()), e);
                }
            }
        }
        if (DruidStorageHandlerUtils.disableDataSource(getConnector(), getDruidMetadataStorageTablesConfig(), str)) {
            LOG.info("Successfully dropped druid data source {}", str);
        }
    }

    public void commitInsertTable(Table table, boolean z) throws MetaException {
        LOG.debug("commit insert into table {} overwrite {}", table.getTableName(), Boolean.valueOf(z));
        try {
            if (MetaStoreUtils.isExternalTable(table)) {
                throw new MetaException("Cannot insert data into external table backed by Druid");
            }
            try {
                List<DataSegment> fetchSegmentsMetadata = fetchSegmentsMetadata(getSegmentDescriptorDir());
                String str = (String) table.getParameters().get("druid.datasource");
                if (fetchSegmentsMetadata.isEmpty() && z) {
                    DruidStorageHandlerUtils.disableDataSource(getConnector(), getDruidMetadataStorageTablesConfig(), str);
                    cleanWorkingDir();
                } else {
                    if (!fetchSegmentsMetadata.isEmpty()) {
                        checkLoadStatus(loadAndCommitDruidSegments(table, z, fetchSegmentsMetadata));
                    }
                }
            } catch (IOException e) {
                throw new MetaException(e.getMessage());
            } catch (CallbackFailedException e2) {
                throw new MetaException(e2.getCause().getMessage());
            }
        } finally {
            cleanWorkingDir();
        }
    }

    private List<DataSegment> fetchSegmentsMetadata(Path path) throws IOException {
        if (path.getFileSystem(getConf()).exists(path)) {
            return DruidStorageHandlerUtils.getCreatedSegments(path, getConf());
        }
        LOG.info("Directory {} does not exist, ignore this if it is create statement or inserts of 0 rows, no Druid segments to move, cleaning working directory {}", path.toString(), getStagingWorkingDir().toString());
        return Collections.EMPTY_LIST;
    }

    public void preInsertTable(Table table, boolean z) {
    }

    public void rollbackInsertTable(Table table, boolean z) {
    }

    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> map) {
        map.put("druid.datasource", tableDesc.getTableName());
        map.put("druid.segment.version", new DateTime().toString());
        map.put("druid.job.workingDirectory", getStagingWorkingDir().toString());
        map.put("druid.storage.storageDirectory.intermediate", getIntermediateSegmentDir().toString());
    }

    public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> map) {
    }

    public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
        if (UserGroupInformation.isSecurityEnabled()) {
            LOG.debug("Setting {} to {} to enable split generation on HS2", HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), Boolean.FALSE.toString());
            jobConf.set(HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION.toString(), Boolean.FALSE.toString());
        }
        try {
            DruidStorageHandlerUtils.addDependencyJars(jobConf, DruidRecordWriter.class);
        } catch (IOException e) {
            Throwables.propagate(e);
        }
    }

    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public LockType getLockType(WriteEntity writeEntity) {
        return writeEntity.getWriteType().equals(WriteEntity.WriteType.INSERT) ? LockType.SHARED_READ : LockType.SHARED_WRITE;
    }

    public String toString() {
        return "org.apache.hadoop.hive.druid.DruidStorageHandler";
    }

    public String getUniqueId() {
        if (this.uniqueId == null) {
            this.uniqueId = (String) Preconditions.checkNotNull(Strings.emptyToNull(HiveConf.getVar(getConf(), HiveConf.ConfVars.HIVEQUERYID)), "Hive query id is null");
        }
        return this.uniqueId;
    }

    private Path getStagingWorkingDir() {
        return new Path(getRootWorkingDir(), makeStagingName());
    }

    private MetadataStorageTablesConfig getDruidMetadataStorageTablesConfig() {
        if (this.druidMetadataStorageTablesConfig != null) {
            return this.druidMetadataStorageTablesConfig;
        }
        this.druidMetadataStorageTablesConfig = MetadataStorageTablesConfig.fromBase(HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_BASE));
        return this.druidMetadataStorageTablesConfig;
    }

    private SQLMetadataConnector getConnector() {
        return (SQLMetadataConnector) Suppliers.memoize(this::buildConnector).get();
    }

    private SQLMetadataConnector buildConnector() {
        if (this.connector != null) {
            return this.connector;
        }
        String var = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_TYPE);
        final String var2 = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_USERNAME);
        final String var3 = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_PASSWORD);
        final String var4 = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_METADATA_DB_URI);
        LOG.debug("Supplying SQL Connector with DB type {}, URI {}, User {}", new Object[]{var, var4, var2});
        Supplier ofInstance = Suppliers.ofInstance(new MetadataStorageConnectorConfig() { // from class: org.apache.hadoop.hive.druid.DruidStorageHandler.1
            @Override // org.apache.hive.druid.io.druid.metadata.MetadataStorageConnectorConfig
            public String getConnectURI() {
                return var4;
            }

            @Override // org.apache.hive.druid.io.druid.metadata.MetadataStorageConnectorConfig
            public String getUser() {
                return Strings.emptyToNull(var2);
            }

            @Override // org.apache.hive.druid.io.druid.metadata.MetadataStorageConnectorConfig
            public String getPassword() {
                return Strings.emptyToNull(var3);
            }
        });
        if (var.equals(MySQLMetadataStorageModule.TYPE)) {
            this.connector = new MySQLConnector(ofInstance, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()), new MySQLConnectorConfig());
        } else if (var.equals(PostgreSQLMetadataStorageModule.TYPE)) {
            this.connector = new PostgreSQLConnector(ofInstance, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()));
        } else {
            if (!var.equals(DerbyMetadataStorageDruidModule.TYPE)) {
                throw new IllegalStateException(String.format("Unknown metadata storage type [%s]", var));
            }
            this.connector = new DerbyConnector(new DerbyMetadataStorage((MetadataStorageConnectorConfig) ofInstance.get()), ofInstance, Suppliers.ofInstance(getDruidMetadataStorageTablesConfig()));
        }
        return this.connector;
    }

    @VisibleForTesting
    protected String makeStagingName() {
        return ".staging-".concat(getUniqueId().replace(":", ""));
    }

    private Path getSegmentDescriptorDir() {
        return new Path(getStagingWorkingDir(), SEGMENTS_DESCRIPTOR_DIR_NAME);
    }

    private Path getIntermediateSegmentDir() {
        return new Path(getStagingWorkingDir(), INTERMEDIATE_SEGMENT_DIR_NAME);
    }

    private void cleanWorkingDir() {
        try {
            getStagingWorkingDir().getFileSystem(getConf()).delete(getStagingWorkingDir(), true);
        } catch (IOException e) {
            LOG.error("Got Exception while cleaning working directory", e);
        }
    }

    private String getRootWorkingDir() {
        if (Strings.isNullOrEmpty(this.rootWorkingDir)) {
            this.rootWorkingDir = HiveConf.getVar(getConf(), HiveConf.ConfVars.DRUID_WORKING_DIR);
        }
        return this.rootWorkingDir;
    }

    private static HttpClient makeHttpClient(Lifecycle lifecycle) {
        int intVar = HiveConf.getIntVar(SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_NUM_HTTP_CONNECTION);
        Period period = new Period(HiveConf.getVar(SessionState.getSessionConf(), HiveConf.ConfVars.HIVE_DRUID_HTTP_READ_TIMEOUT));
        LOG.info("Creating Druid HTTP client with {} max parallel connections and {}ms read timeout", Integer.valueOf(intVar), Long.valueOf(period.toStandardDuration().getMillis()));
        HttpClient createClient = HttpClientInit.createClient(HttpClientConfig.builder().withNumConnections(intVar).withReadTimeout(new Period(period).toStandardDuration()).build(), lifecycle);
        if (!UserGroupInformation.isSecurityEnabled()) {
            return createClient;
        }
        LOG.info("building Kerberos Http Client");
        return new KerberosHttpClient(createClient);
    }

    public static HttpClient getHttpClient() {
        return HTTP_CLIENT;
    }

    public void preAlterTable(Table table, EnvironmentContext environmentContext) throws MetaException {
        String str = environmentContext == null ? null : (String) environmentContext.getProperties().get("alterTableOpType");
        if (str != null && !allowedAlterTypes.contains(str)) {
            throw new MetaException("ALTER TABLE can not be used for " + str + " to a non-native table ");
        }
        if (isKafkaStreamingTable(table)) {
            updateKafkaIngestion(table);
        }
    }

    private static <T> Boolean getBooleanProperty(Table table, String str) {
        String tableProperty = getTableProperty(table, str);
        if (tableProperty == null) {
            return null;
        }
        return Boolean.valueOf(Boolean.parseBoolean(tableProperty));
    }

    private static <T> Integer getIntegerProperty(Table table, String str) {
        String tableProperty = getTableProperty(table, str);
        if (tableProperty == null) {
            return null;
        }
        try {
            return Integer.valueOf(Integer.parseInt(tableProperty));
        } catch (NumberFormatException e) {
            throw new NumberFormatException(String.format("Exception while parsing property[%s] with Value [%s] as Integer", str, tableProperty));
        }
    }

    private static <T> Long getLongProperty(Table table, String str) {
        String tableProperty = getTableProperty(table, str);
        if (tableProperty == null) {
            return null;
        }
        try {
            return Long.valueOf(Long.parseLong(tableProperty));
        } catch (NumberFormatException e) {
            throw new NumberFormatException(String.format("Exception while parsing property[%s] with Value [%s] as Long", str, tableProperty));
        }
    }

    private static <T> Period getPeriodProperty(Table table, String str) {
        String tableProperty = getTableProperty(table, str);
        if (tableProperty == null) {
            return null;
        }
        try {
            return Period.parse(tableProperty);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(String.format("Exception while parsing property[%s] with Value [%s] as Period", str, tableProperty));
        }
    }

    private static String getTableProperty(Table table, String str) {
        return (String) table.getParameters().get(str);
    }

    private static boolean isKafkaStreamingTable(Table table) {
        return getTableProperty(table, "kafka.topic") != null;
    }

    private int getMaxRetryCount() {
        return HiveConf.getIntVar(getConf(), HiveConf.ConfVars.HIVE_DRUID_MAX_TRIES);
    }

    public StorageHandlerInfo getStorageHandlerInfo(Table table) throws MetaException {
        if (!isKafkaStreamingTable(table)) {
            return null;
        }
        KafkaSupervisorReport fetchKafkaSupervisorReport = fetchKafkaSupervisorReport(table);
        return fetchKafkaSupervisorReport == null ? DruidStorageHandlerInfo.UNREACHABLE : new DruidStorageHandlerInfo(fetchKafkaSupervisorReport);
    }

    static {
        Lifecycle lifecycle = new Lifecycle();
        try {
            lifecycle.start();
        } catch (Exception e) {
            LOG.error("Issues with lifecycle start", e);
        }
        HTTP_CLIENT = makeHttpClient(lifecycle);
        ShutdownHookManager.addShutdownHook(() -> {
            lifecycle.stop();
        });
    }
}
