/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.DefaultEventHandler;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.EventHandler;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.OffsetFetchApplicationEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

/**
 * Prototype {@link Consumer} implementation that forwards work to an {@link EventHandler}
 * as application events and reads results back from it.
 *
 * <p>Only a subset of the {@link Consumer} API is implemented here: the commit methods,
 * {@link #committed(Set, Duration)}, {@link #assign(Collection)}, {@link #assignment()},
 * {@link #subscription()} and {@link #close(Duration)}. {@link #poll(Duration)} is wired up
 * but its fetch helpers are stubs that never produce records, and {@link #wakeup()} is a
 * no-op. Every other method throws a {@link KafkaException}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
public class PrototypeAsyncConsumer<K, V> implements Consumer<K, V> {
    /** Timeout, in milliseconds, applied by the no-argument {@link #close()}. */
    static final long DEFAULT_CLOSE_TIMEOUT_MS = 30000L;

    private final LogContext logContext;
    private final EventHandler eventHandler;
    private final Time time;
    private final Optional<String> groupId;
    private final Logger log;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final SubscriptionState subscriptions;
    private final Metrics metrics;
    private final long defaultApiTimeoutMs;

    /**
     * Creates a consumer from {@link Properties}; delegates to the map-based constructor.
     *
     * @param properties        consumer configuration
     * @param keyDeserializer   key deserializer instance, or {@code null} to take it from the configuration
     * @param valueDeserializer value deserializer instance, or {@code null} to take it from the configuration
     */
    public PrototypeAsyncConsumer(Properties properties, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(Utils.propsToMap(properties), keyDeserializer, valueDeserializer);
    }

    /**
     * Creates a consumer from a configuration map. The deserializer classes are merged into the
     * configuration via {@link #appendDeserializerToConfig(Map, Deserializer, Deserializer)}.
     *
     * @param configs  consumer configuration
     * @param keyDeser key deserializer instance, or {@code null} to take it from {@code configs}
     * @param valDeser value deserializer instance, or {@code null} to take it from {@code configs}
     * @throws ConfigException if a deserializer is neither passed nor configured
     */
    public PrototypeAsyncConsumer(Map<String, Object> configs, Deserializer<K> keyDeser, Deserializer<V> valDeser) {
        this(new ConsumerConfig(PrototypeAsyncConsumer.appendDeserializerToConfig(configs, keyDeser, valDeser)), keyDeser, valDeser);
    }

    /**
     * Creates a consumer from a parsed {@link ConsumerConfig}, building its collaborators
     * (log context, deserializers, subscription state, metrics) through {@link ConsumerUtils}
     * and wiring them into a new {@link DefaultEventHandler}.
     *
     * @param config            parsed consumer configuration
     * @param keyDeserializer   key deserializer handed to {@code ConsumerUtils.createKeyDeserializer}
     * @param valueDeserializer value deserializer handed to {@code ConsumerUtils.createValueDeserializer}
     */
    public PrototypeAsyncConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this.time = Time.SYSTEM;
        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER);
        this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
        this.defaultApiTimeoutMs = config.getInt("default.api.timeout.ms").intValue();
        this.logContext = ConsumerUtils.createLogContext(config, groupRebalanceConfig);
        this.log = this.logContext.logger(this.getClass());
        this.keyDeserializer = ConsumerUtils.createKeyDeserializer(config, keyDeserializer);
        this.valueDeserializer = ConsumerUtils.createValueDeserializer(config, valueDeserializer);
        this.subscriptions = ConsumerUtils.createSubscriptionState(config, this.logContext);
        this.metrics = ConsumerUtils.createMetrics(config, this.time);
        // Only passed on as a cluster-resource-listener candidate list, so the element type is irrelevant here.
        List<?> interceptorList = ConsumerUtils.createConsumerInterceptors(config);
        ClusterResourceListeners clusterResourceListeners = PrototypeAsyncConsumer.configureClusterResourceListeners(
                this.keyDeserializer, this.valueDeserializer, this.metrics.reporters(), interceptorList);
        this.eventHandler = new DefaultEventHandler(config, groupRebalanceConfig, this.logContext, this.subscriptions,
                new ApiVersions(), this.metrics, clusterResourceListeners, null);
    }

    /**
     * Package-private constructor that accepts pre-built collaborators instead of creating them.
     * The deserializers are still created from {@code config}.
     */
    PrototypeAsyncConsumer(Time time, LogContext logContext, ConsumerConfig config, SubscriptionState subscriptionState, EventHandler eventHandler, Metrics metrics, Optional<String> groupId, int defaultApiTimeoutMs) {
        this.time = time;
        this.logContext = logContext;
        this.log = logContext.logger(this.getClass());
        this.subscriptions = subscriptionState;
        this.metrics = metrics;
        this.groupId = groupId;
        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
        this.keyDeserializer = ConsumerUtils.createKeyDeserializer(config, null);
        this.valueDeserializer = ConsumerUtils.createValueDeserializer(config, null);
        this.eventHandler = eventHandler;
    }

    /**
     * Drains at most one background event per iteration and returns fetched records as soon as
     * any are available, giving up once {@code timeout} has elapsed.
     *
     * <p>The loop body always runs at least once, even for a zero timeout.
     *
     * @param timeout maximum time to keep looping
     * @return the processed fetch results, or an empty {@link ConsumerRecords} on timeout
     * @throws RuntimeException wrapping any exception raised while polling
     */
    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        try {
            // A single timer must span the whole call. Creating a new timer on every iteration
            // (as the loop condition previously did) restarts the countdown each time, so a
            // positive timeout would never expire.
            final org.apache.kafka.common.utils.Timer timer = this.time.timer(timeout);
            do {
                if (!this.eventHandler.isEmpty()) {
                    Optional<BackgroundEvent> backgroundEvent = this.eventHandler.poll();
                    backgroundEvent.ifPresent(event -> this.processEvent(event, timeout));
                }
                Fetch<K, V> fetch = this.collectFetches();
                if (!fetch.isEmpty()) {
                    return this.processFetchResults(fetch);
                }
                // Refresh the timer's view of the clock before testing for expiry.
                timer.update();
            } while (timer.notExpired());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return ConsumerRecords.empty();
    }

    /** Commits {@code subscriptions.allConsumed()} synchronously using the default API timeout. */
    @Override
    public void commitSync() {
        this.commitSync(Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /** Placeholder: background events are currently ignored. */
    private void processEvent(BackgroundEvent backgroundEvent, Duration timeout) {
    }

    /** Placeholder: always returns empty records regardless of {@code fetch}. */
    private ConsumerRecords<K, V> processFetchResults(Fetch<K, V> fetch) {
        return ConsumerRecords.empty();
    }

    /** Placeholder: always returns an empty fetch. */
    private Fetch<K, V> collectFetches() {
        return Fetch.empty();
    }

    /** Commits {@code subscriptions.allConsumed()} asynchronously with the default logging callback. */
    @Override
    public void commitAsync() {
        this.commitAsync(null);
    }

    /**
     * Commits {@code subscriptions.allConsumed()} asynchronously.
     *
     * @param callback invoked on completion; {@code null} selects the default logging callback
     */
    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        this.commitAsync(this.subscriptions.allConsumed(), callback);
    }

    /**
     * Enqueues a commit of {@code offsets} and invokes {@code callback} when it completes.
     * A failure is handed to the callback wrapped in a {@link KafkaException}.
     *
     * @param offsets  offsets to commit
     * @param callback invoked on completion; {@code null} selects the default logging callback
     * @throws InvalidGroupIdException if no non-empty group id is configured
     */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        CompletableFuture<Void> future = this.commit(offsets);
        final OffsetCommitCallback commitCallback = callback == null ? new DefaultOffsetCommitCallback() : callback;
        future.whenComplete((result, error) -> {
            if (error != null) {
                commitCallback.onComplete(offsets, new KafkaException(error));
            } else {
                commitCallback.onComplete(offsets, null);
            }
        }).exceptionally(error -> {
            // Reached when the commit failed or when the callback itself threw. Report it
            // through the logger rather than stdout; the rethrow only fails the derived
            // future, which nothing observes.
            this.log.error("Asynchronous offset commit for offsets {} completed exceptionally", offsets, error);
            throw new KafkaException(error);
        });
    }

    /**
     * Enqueues a {@link CommitApplicationEvent} for {@code offsets} on the event handler.
     *
     * @param offsets offsets to commit
     * @return the event's future
     * @throws InvalidGroupIdException if no non-empty group id is configured
     */
    CompletableFuture<Void> commit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.maybeThrowInvalidGroupIdException();
        CommitApplicationEvent commitEvent = new CommitApplicationEvent(offsets);
        this.eventHandler.add(commitEvent);
        return commitEvent.future();
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void seek(TopicPartition partition, long offset) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public long position(TopicPartition partition) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public long position(TopicPartition partition, Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition partition) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    @Deprecated
    public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Same as {@link #committed(Set, Duration)} with the default API timeout. */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        return this.committed(partitions, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /**
     * Enqueues an {@link OffsetFetchApplicationEvent} for {@code partitions} and blocks on its
     * future for at most {@code timeout}.
     *
     * @param partitions partitions to look up; an empty set short-circuits to an empty map
     * @param timeout    maximum time to wait for the result
     * @return the value the event's future completes with
     * @throws InvalidGroupIdException if no non-empty group id is configured
     * @throws InterruptException if the waiting thread is interrupted
     * @throws org.apache.kafka.common.errors.TimeoutException if the future does not complete in time
     * @throws KafkaException wrapping the {@link ExecutionException} if the future fails
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        this.maybeThrowInvalidGroupIdException();
        if (partitions.isEmpty()) {
            return new HashMap<>();
        }
        OffsetFetchApplicationEvent event = new OffsetFetchApplicationEvent(partitions);
        this.eventHandler.add(event);
        try {
            return event.future().get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        } catch (TimeoutException e) {
            throw new org.apache.kafka.common.errors.TimeoutException(e);
        } catch (ExecutionException e) {
            throw new KafkaException(e);
        }
    }

    /** Rejects the call when the group id is absent or empty. */
    private void maybeThrowInvalidGroupIdException() {
        if (!this.groupId.isPresent() || this.groupId.get().isEmpty()) {
            throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.");
        }
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Unsupported here; always throws {@link KafkaException}. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream) {
        throw new KafkaException("listTopics with a stream name is a MapR specific method which is not allowed in Apache mode.");
    }

    /** Unsupported here; always throws {@link KafkaException}. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream, Duration timeout) {
        throw new KafkaException("listTopics with a stream name is a MapR specific method which is not allowed in Apache mode.");
    }

    /** Unsupported here; always throws {@link KafkaException}. */
    // NOTE(review): the message mentions a stream name although this overload takes a Pattern - confirm wording.
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern) {
        throw new KafkaException("listTopics with a stream name is a MapR specific method which is not allowed in Apache mode.");
    }

    /** Unsupported here; always throws {@link KafkaException}. */
    // NOTE(review): the message mentions a stream name although this overload takes a Pattern - confirm wording.
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern, Duration timeout) {
        throw new KafkaException("listTopics with a stream name is a MapR specific method which is not allowed in Apache mode.");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Set<TopicPartition> paused() {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void resume(Collection<TopicPartition> partitions) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public ConsumerGroupMetadata groupMetadata() {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void enforceRebalance() {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void enforceRebalance(String reason) {
        throw new KafkaException("method not implemented");
    }

    /** Closes the consumer using {@link #DEFAULT_CLOSE_TIMEOUT_MS}. */
    @Override
    public void close() {
        this.close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
    }

    /**
     * Closes the event handler and rethrows the first failure recorded while doing so.
     *
     * @param timeout currently not consulted by this implementation
     * @throws InterruptException if that was the recorded failure
     * @throws KafkaException wrapping any other recorded failure
     */
    @Override
    public void close(Duration timeout) {
        AtomicReference<Throwable> firstException = new AtomicReference<>();
        Utils.closeQuietly(this.eventHandler, "event handler", firstException);
        this.log.debug("Kafka consumer has been closed");
        Throwable exception = firstException.get();
        if (exception == null) {
            return;
        }
        if (exception instanceof InterruptException) {
            throw (InterruptException) exception;
        }
        throw new KafkaException("Failed to close kafka consumer", exception);
    }

    /** No-op in this prototype. */
    @Override
    public void wakeup() {
    }

    /** Commits {@code subscriptions.allConsumed()} synchronously, waiting at most {@code timeout}. */
    @Override
    public void commitSync(Duration timeout) {
        this.commitSync(this.subscriptions.allConsumed(), timeout);
    }

    /** Commits {@code offsets} synchronously using the default API timeout. */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this.commitSync(offsets, Duration.ofMillis(this.defaultApiTimeoutMs));
    }

    /**
     * Enqueues a commit of {@code offsets} and blocks on its future for at most {@code timeout}.
     *
     * @param offsets offsets to commit
     * @param timeout maximum time to wait for the commit to complete
     * @throws InvalidGroupIdException if no non-empty group id is configured
     * @throws org.apache.kafka.common.errors.TimeoutException if the commit does not complete in time
     * @throws InterruptException if the waiting thread is interrupted
     * @throws KafkaException wrapping the {@link ExecutionException} if the commit fails
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        CompletableFuture<Void> commitFuture = this.commit(offsets);
        try {
            commitFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new org.apache.kafka.common.errors.TimeoutException(e);
        } catch (InterruptedException e) {
            throw new InterruptException(e);
        } catch (ExecutionException e) {
            throw new KafkaException(e);
        }
    }

    /** Returns an unmodifiable view of {@code subscriptions.assignedPartitions()}. */
    @Override
    public Set<TopicPartition> assignment() {
        return Collections.unmodifiableSet(this.subscriptions.assignedPartitions());
    }

    /** Returns an unmodifiable view of {@code subscriptions.subscription()}. */
    @Override
    public Set<String> subscription() {
        return Collections.unmodifiableSet(this.subscriptions.subscription());
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void subscribe(Collection<String> topics) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        throw new KafkaException("method not implemented");
    }

    /**
     * Manually assigns {@code partitions} to this consumer.
     *
     * <p>An {@link AssignmentChangeApplicationEvent} carrying the currently consumed offsets is
     * enqueued before the subscription state is updated. If {@code assignFromUser} returns
     * {@code true}, a {@link NewTopicsMetadataUpdateRequestEvent} is enqueued as well.
     *
     * @param partitions partitions to assign; an empty collection is currently ignored
     * @throws IllegalArgumentException if {@code partitions} is {@code null}, or contains a
     *         {@code null} entry or an entry with a blank topic
     */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partitions collection to assign to cannot be null");
        }
        if (partitions.isEmpty()) {
            return;
        }
        for (TopicPartition tp : partitions) {
            String topic = tp != null ? tp.topic() : null;
            if (Utils.isBlank(topic)) {
                throw new IllegalArgumentException("Topic partitions to assign to cannot have null or empty topic");
            }
        }
        this.eventHandler.add(new AssignmentChangeApplicationEvent(this.subscriptions.allConsumed(), this.time.milliseconds()));
        this.log.info("Assigned to partition(s): {}", Utils.join(partitions, ", "));
        if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) {
            this.eventHandler.add(new NewTopicsMetadataUpdateRequestEvent());
        }
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void subscribe(Pattern pattern) {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    public void unsubscribe() {
        throw new KafkaException("method not implemented");
    }

    /** Not implemented; always throws {@link KafkaException}. */
    @Override
    @Deprecated
    public ConsumerRecords<K, V> poll(long timeout) {
        throw new KafkaException("method not implemented");
    }

    /**
     * Builds a {@link ClusterResourceListeners} from the candidate lists and both deserializers.
     * Each candidate is passed to {@code maybeAdd}/{@code maybeAddAll}.
     */
    private static <K, V> ClusterResourceListeners configureClusterResourceListeners(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, List<?> ... candidateLists) {
        ClusterResourceListeners clusterResourceListeners = new ClusterResourceListeners();
        for (List<?> candidateList : candidateLists) {
            clusterResourceListeners.maybeAddAll(candidateList);
        }
        clusterResourceListeners.maybeAdd(keyDeserializer);
        clusterResourceListeners.maybeAdd(valueDeserializer);
        return clusterResourceListeners;
    }

    /**
     * Returns a copy of {@code configs} in which {@code key.deserializer} and
     * {@code value.deserializer} are set to the classes of the supplied instances.
     * When an instance is {@code null}, the existing entry is kept and must be non-null.
     *
     * @param configs           source configuration; not modified
     * @param keyDeserializer   key deserializer instance, or {@code null}
     * @param valueDeserializer value deserializer instance, or {@code null}
     * @return a new map containing the merged configuration
     * @throws ConfigException if a deserializer is neither passed nor present in {@code configs}
     */
    public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs, Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
        Map<String, Object> newConfigs = new HashMap<>(configs);
        if (keyDeserializer != null) {
            newConfigs.put("key.deserializer", keyDeserializer.getClass());
        } else if (newConfigs.get("key.deserializer") == null) {
            throw new ConfigException("key.deserializer", null, "must be non-null.");
        }
        if (valueDeserializer != null) {
            newConfigs.put("value.deserializer", valueDeserializer.getClass());
        } else if (newConfigs.get("value.deserializer") == null) {
            throw new ConfigException("value.deserializer", null, "must be non-null.");
        }
        return newConfigs;
    }

    /** Callback used when the caller supplies none: logs failed commits and ignores successes. */
    private class DefaultOffsetCommitCallback implements OffsetCommitCallback {
        private DefaultOffsetCommitCallback() {
        }

        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                PrototypeAsyncConsumer.this.log.error("Offset commit with offsets {} failed", offsets, exception);
            }
        }
    }
}

