package org.apache.drill.exec.store.kafka;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Group scan over one Kafka topic.
 *
 * <p>On construction the scan opens a short-lived {@link KafkaConsumer}, reads the first and
 * last offset of every partition of the topic named by the {@link KafkaScanSpec}, and records
 * one {@link PartitionScanWork} per partition. Those work units are later used to compute
 * endpoint affinities, to distribute work over minor fragments and to produce scan statistics.
 */
@JsonTypeName("kafka-scan")
public class KafkaGroupScan extends AbstractGroupScan {
    private static final Logger logger = LoggerFactory.getLogger(KafkaGroupScan.class);

    /** Per-message byte size used to turn an offset range into a byte estimate. */
    private static final long MSG_SIZE = 1024;

    private final KafkaStoragePlugin kafkaStoragePlugin;
    private final KafkaScanSpec kafkaScanSpec;
    private List<SchemaPath> columns;
    /** Work units keyed by the index passed to {@link #m1getSpecificScan(int)}; set by {@link #applyAssignments(List)}. */
    private ListMultimap<Integer, PartitionScanWork> assignments;
    /** Lazily computed by {@link #getOperatorAffinity()}. */
    private List<EndpointAffinity> affinities;
    /** One work unit per partition of the scanned topic. */
    private Map<TopicPartition, PartitionScanWork> partitionWorkMap;

    /**
     * Unit of work covering the offset range of a single topic partition, together with the
     * per-endpoint byte map used for affinity calculation.
     */
    public static class PartitionScanWork implements CompleteWork {
        private final EndpointByteMapImpl byteMap;
        private final KafkaPartitionScanSpec partitionScanSpec;

        /**
         * @param endpointByteMap byte map for this work unit; it is cast to
         *                        {@link EndpointByteMapImpl}, so any other implementation fails
         *                        with a {@link ClassCastException}
         * @param kafkaPartitionScanSpec partition and offset range this work unit covers
         */
        public PartitionScanWork(EndpointByteMap endpointByteMap, KafkaPartitionScanSpec kafkaPartitionScanSpec) {
            this.byteMap = (EndpointByteMapImpl) endpointByteMap;
            this.partitionScanSpec = kafkaPartitionScanSpec;
        }

        /** Orders work units by their estimated total bytes. */
        public int compareTo(CompleteWork completeWork) {
            return Long.compare(getTotalBytes(), completeWork.getTotalBytes());
        }

        /** @return number of offsets in the range multiplied by the per-message size estimate */
        public long getTotalBytes() {
            long offsetCount = this.partitionScanSpec.getEndOffset() - this.partitionScanSpec.getStartOffset();
            return offsetCount * KafkaGroupScan.MSG_SIZE;
        }

        public EndpointByteMap getByteMap() {
            return this.byteMap;
        }

        public KafkaPartitionScanSpec getPartitionScanSpec() {
            return this.partitionScanSpec;
        }
    }

    /**
     * Deserialization constructor: resolves the storage plugin from the injected registry and
     * delegates to {@link #KafkaGroupScan(String, KafkaStoragePlugin, List, KafkaScanSpec)}.
     *
     * @throws ExecutionSetupException declared for the plugin lookup
     */
    @JsonCreator
    public KafkaGroupScan(@JsonProperty("userName") String str, @JsonProperty("kafkaStoragePluginConfig") KafkaStoragePluginConfig kafkaStoragePluginConfig, @JsonProperty("columns") List<SchemaPath> list, @JsonProperty("kafkaScanSpec") KafkaScanSpec kafkaScanSpec, @JacksonInject StoragePluginRegistry storagePluginRegistry) throws ExecutionSetupException {
        // NOTE(review): the cast is explicit in case getPlugin is declared with a broader return
        // type than KafkaStoragePlugin; verify against StoragePluginRegistry.
        this(str, (KafkaStoragePlugin) storagePluginRegistry.getPlugin(kafkaStoragePluginConfig), list, kafkaScanSpec);
    }

    /** Creates a scan with an empty user name and immediately loads the partition offsets. */
    public KafkaGroupScan(KafkaStoragePlugin kafkaStoragePlugin, KafkaScanSpec kafkaScanSpec, List<SchemaPath> list) {
        super("");
        this.kafkaStoragePlugin = kafkaStoragePlugin;
        this.columns = list;
        this.kafkaScanSpec = kafkaScanSpec;
        init();
    }

    /** Creates a scan for the given user and immediately loads the partition offsets. */
    public KafkaGroupScan(String str, KafkaStoragePlugin kafkaStoragePlugin, List<SchemaPath> list, KafkaScanSpec kafkaScanSpec) {
        super(str);
        this.kafkaStoragePlugin = kafkaStoragePlugin;
        this.columns = list;
        this.kafkaScanSpec = kafkaScanSpec;
        init();
    }

    /**
     * Copy constructor. The partition work map and the assignments are shared by reference with
     * the source scan (no Kafka round trip is made); the cached affinities are not copied.
     */
    public KafkaGroupScan(KafkaGroupScan kafkaGroupScan) {
        super(kafkaGroupScan);
        this.kafkaStoragePlugin = kafkaGroupScan.kafkaStoragePlugin;
        this.columns = kafkaGroupScan.columns;
        this.kafkaScanSpec = kafkaGroupScan.kafkaScanSpec;
        this.assignments = kafkaGroupScan.assignments;
        this.partitionWorkMap = kafkaGroupScan.partitionWorkMap;
    }

    /**
     * Reads the first and last offset of every partition of the topic and fills
     * {@link #partitionWorkMap}.
     *
     * <p>Any failure, including a missing topic, surfaces as a data-read {@link UserException}.
     */
    private void init() {
        this.partitionWorkMap = new HashMap<>();

        // Index the known drillbits by address so replica hosts can be matched against them.
        Map<String, CoordinationProtos.DrillbitEndpoint> endpointsByAddress = new HashMap<>();
        Collection<CoordinationProtos.DrillbitEndpoint> bits = this.kafkaStoragePlugin.getContext().getBits();
        for (CoordinationProtos.DrillbitEndpoint endpoint : bits) {
            endpointsByAddress.put(endpoint.getAddress(), endpoint);
        }

        Map<TopicPartition, Long> startOffsets = new HashMap<>();
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        String topicName = this.kafkaScanSpec.getTopicName();
        try {
            List<PartitionInfo> topicPartitions;
            // try-with-resources guarantees the consumer is closed on every path; previously it
            // was only closed when all calls succeeded, leaking it on any failure.
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    this.kafkaStoragePlugin.m9getConfig().getKafkaConsumerProps(),
                    new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                if (!consumer.listTopics().containsKey(topicName)) {
                    throw UserException.dataReadError().message("Table '%s' does not exist", topicName).build(logger);
                }
                consumer.subscribe(Arrays.asList(topicName));
                // Poll once before asking for the assignment.
                // NOTE(review): presumably needed so partitions get assigned - confirm.
                consumer.poll(0L);
                Set<TopicPartition> assignment = consumer.assignment();
                topicPartitions = consumer.partitionsFor(topicName);

                consumer.seekToBeginning(assignment);
                for (TopicPartition partition : assignment) {
                    startOffsets.put(partition, consumer.position(partition));
                }
                consumer.seekToEnd(assignment);
                for (TopicPartition partition : assignment) {
                    endOffsets.put(partition, consumer.position(partition));
                }
            }

            for (PartitionInfo info : topicPartitions) {
                TopicPartition partition = new TopicPartition(info.topic(), info.partition());
                // Unboxing fails with a NullPointerException when no offset was recorded for the
                // partition; the enclosing catch reports that as a data-read error.
                long startOffset = startOffsets.get(partition);
                long endOffset = endOffsets.get(partition);
                logger.debug("Latest offset of {} is {}", partition, endOffset);
                // The value logged here is the position obtained after seekToBeginning.
                logger.debug("Last committed offset of {} is {}", partition, startOffset);

                KafkaPartitionScanSpec partitionSpec =
                        new KafkaPartitionScanSpec(partition.topic(), partition.partition(), startOffset, endOffset);
                PartitionScanWork work = new PartitionScanWork(new EndpointByteMapImpl(), partitionSpec);
                // Credit the work's bytes to every drillbit whose address matches an in-sync replica host.
                for (Node replica : info.inSyncReplicas()) {
                    CoordinationProtos.DrillbitEndpoint endpoint = endpointsByAddress.get(replica.host());
                    if (endpoint != null) {
                        work.getByteMap().add(endpoint, work.getTotalBytes());
                    }
                }
                this.partitionWorkMap.put(partition, work);
            }
        } catch (Exception e) {
            throw UserException.dataReadError(e).message("Failed to fetch start/end offsets of the topic  %s", topicName).addContext(e.getMessage()).build(logger);
        }
    }

    /** Distributes all partition work units over the given endpoints. */
    public void applyAssignments(List<CoordinationProtos.DrillbitEndpoint> list) {
        this.assignments = AssignmentCreator.getMappings(list, Lists.newArrayList(this.partitionWorkMap.values()));
    }

    /**
     * Builds the sub scan holding the partition specs assigned to index {@code i}.
     * {@link #applyAssignments(List)} must have been called first.
     */
    /* renamed from: getSpecificScan, reason: merged with bridge method [inline-methods] */
    public KafkaSubScan m1getSpecificScan(int i) {
        List<PartitionScanWork> assignedWork = this.assignments.get(i);
        List<KafkaPartitionScanSpec> assignedSpecs = new ArrayList<>();
        for (PartitionScanWork work : assignedWork) {
            assignedSpecs.add(work.partitionScanSpec);
        }
        return new KafkaSubScan(getUserName(), this.kafkaStoragePlugin, this.columns, assignedSpecs);
    }

    /** @return the number of partition work units */
    public int getMaxParallelizationWidth() {
        return this.partitionWorkMap.size();
    }

    /**
     * @return statistics whose row count is the sum of all partition offset ranges and whose
     *         disk cost is that count multiplied by the per-message size estimate
     */
    public ScanStats getScanStats() {
        long messageCount = 0;
        for (PartitionScanWork work : this.partitionWorkMap.values()) {
            KafkaPartitionScanSpec spec = work.getPartitionScanSpec();
            messageCount += spec.getEndOffset() - spec.getStartOffset();
        }
        return new ScanStats(ScanStats.GroupScanProperty.EXACT_ROW_COUNT, messageCount, 1.0d, messageCount * MSG_SIZE);
    }

    public String getDigest() {
        return toString();
    }

    /** @return a copy of this scan; {@code list} must be empty */
    public PhysicalOperator getNewWithChildren(List<PhysicalOperator> list) throws ExecutionSetupException {
        Preconditions.checkArgument(list.isEmpty());
        return new KafkaGroupScan(this);
    }

    /** @return endpoint affinities derived from the partition work units, computed on first use */
    public List<EndpointAffinity> getOperatorAffinity() {
        if (this.affinities == null) {
            this.affinities = AffinityCreator.getAffinityMap(Lists.newArrayList(this.partitionWorkMap.values()));
        }
        return this.affinities;
    }

    @JsonIgnore
    public boolean canPushdownProjects(List<SchemaPath> list) {
        return true;
    }

    /** @return a copy of this scan that projects the given columns */
    public GroupScan clone(List<SchemaPath> list) {
        KafkaGroupScan copy = new KafkaGroupScan(this);
        copy.columns = list;
        return copy;
    }

    /**
     * Returns a copy of this scan restricted to the given partition specs. Each listed partition
     * keeps its existing byte map but takes the supplied spec; partitions that are not listed
     * are dropped from the copy. This scan itself is left untouched.
     *
     * @param list partition specs the new scan should cover
     * @throws NullPointerException if a spec names a partition this scan has no work unit for
     */
    public GroupScan cloneWithNewSpec(List<KafkaPartitionScanSpec> list) {
        KafkaGroupScan copy = new KafkaGroupScan(this);
        // The copy constructor shares the work map by reference, so edit a private copy instead
        // of rewriting the map that this scan still relies on.
        Map<TopicPartition, PartitionScanWork> narrowedWork = new HashMap<>(this.partitionWorkMap);
        Set<TopicPartition> requestedPartitions = new HashSet<>();
        for (KafkaPartitionScanSpec spec : list) {
            TopicPartition partition = new TopicPartition(spec.getTopicName(), spec.getPartitionId());
            PartitionScanWork existing = this.partitionWorkMap.get(partition);
            Preconditions.checkNotNull(existing, "No scan work is known for partition %s", partition);
            requestedPartitions.add(partition);
            narrowedWork.put(partition, new PartitionScanWork(existing.getByteMap(), spec));
        }
        narrowedWork.keySet().retainAll(requestedPartitions);
        copy.partitionWorkMap = narrowedWork;
        return copy;
    }

    @JsonProperty
    public KafkaStoragePluginConfig getKafkaStoragePluginConfig() {
        return this.kafkaStoragePlugin.m9getConfig();
    }

    @JsonProperty
    public List<SchemaPath> getColumns() {
        return this.columns;
    }

    @JsonProperty
    public KafkaScanSpec getKafkaScanSpec() {
        return this.kafkaScanSpec;
    }

    @JsonIgnore
    public KafkaStoragePlugin getStoragePlugin() {
        return this.kafkaStoragePlugin;
    }

    public String toString() {
        return String.format("KafkaGroupScan [KafkaScanSpec=%s, columns=%s]", this.kafkaScanSpec, this.columns);
    }

    /** @return clones of the partition scan specs of all work units, in map iteration order */
    @JsonIgnore
    public List<KafkaPartitionScanSpec> getPartitionScanSpecList() {
        List<KafkaPartitionScanSpec> specs = new ArrayList<>();
        for (PartitionScanWork work : this.partitionWorkMap.values()) {
            specs.add(work.partitionScanSpec.m3clone());
        }
        return specs;
    }
}
