/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceCallback;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.MapRKafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.mapr.AbstractConsumer;
import org.apache.kafka.clients.mapr.GenericHFactory;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Facade that forwards every {@link Consumer} call to a delegate which is
 * created lazily, the first time a topic name is available.
 *
 * <p>The delegate is chosen from the topic name: a name that starts with
 * {@code "/"} or contains {@code ":"} selects the implementation loaded by
 * class name through {@link GenericHFactory}; any other name selects
 * {@link MapRKafkaConsumer}. Once created, the delegate is never replaced.
 *
 * <p>Delegate creation and {@link #close()} are serialized on this instance's
 * monitor. The {@code closed} flag and the delegate reference are
 * {@code volatile} because the public methods read them without taking that
 * monitor.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
public class KafkaConsumer<K, V>
implements Consumer<K, V> {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
    /** Configuration key holding the list of bootstrap servers. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    /** Implementation class instantiated by name for "/"- or ":"-style topic names. */
    private static final String MARLIN_LISTENER_CLASS = "com.mapr.fs.marlin.listener.MarlinListener";

    private final ConsumerConfig config;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final ConsumerRebalanceCallback callback;
    /** Set once by {@link #close()}; read without the monitor by other methods. */
    private volatile boolean closed;
    /** Lazily created delegate; {@code null} until the first successful initialization. */
    private volatile Consumer<K, V> consumerDriver = null;

    /**
     * Creates a consumer from a configuration map, with no rebalance callback
     * and no explicit deserializers.
     *
     * @param configs consumer configuration
     */
    public KafkaConsumer(Map<String, Object> configs) {
        this(configs, null, null, null);
    }

    /**
     * Creates a consumer from a configuration map.
     *
     * @param configs           consumer configuration
     * @param callback          rebalance callback handed to the delegate; may be {@code null}
     * @param keyDeserializer   key deserializer; may be {@code null}
     * @param valueDeserializer value deserializer; may be {@code null}
     */
    public KafkaConsumer(Map<String, Object> configs, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)), callback, keyDeserializer, valueDeserializer);
    }

    /**
     * Creates a consumer from {@link Properties}, with no rebalance callback
     * and no explicit deserializers.
     *
     * @param properties consumer configuration
     */
    public KafkaConsumer(Properties properties) {
        this(properties, (ConsumerRebalanceCallback)null, (Deserializer<K>)null, (Deserializer<V>)null);
    }

    /**
     * Creates a consumer from {@link Properties}.
     *
     * @param properties        consumer configuration
     * @param callback          rebalance callback handed to the delegate; may be {@code null}
     * @param keyDeserializer   key deserializer; may be {@code null}
     * @param valueDeserializer value deserializer; may be {@code null}
     */
    public KafkaConsumer(Properties properties, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)), callback, keyDeserializer, valueDeserializer);
    }

    /**
     * Stores the collaborators; the delegate itself is not created here.
     */
    private KafkaConsumer(ConsumerConfig config, ConsumerRebalanceCallback callback, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        log.debug("Starting the Kafka consumer");
        this.callback = callback;
        this.config = config;
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
        this.closed = false;
    }

    /**
     * Creates the delegate for {@code topic} unless this consumer is closed or
     * a delegate already exists. Runs under this instance's monitor so that at
     * most one delegate is ever created and creation cannot interleave with
     * {@link #close()}.
     *
     * @param topic topic name used to choose the delegate implementation
     * @throws KafkaException if the Kafka delegate is selected and no bootstrap
     *                        server address is configured
     */
    private void initializeConsumer(String topic) {
        synchronized (this) {
            if (this.closed) {
                log.error("cannot initialize consumer. already closed.");
                return;
            }
            if (this.consumerDriver != null) {
                log.debug("initialized consumer already.");
                return;
            }
            boolean pathStyleTopic = topic.startsWith("/") || topic.contains(":");
            this.consumerDriver = pathStyleTopic ? this.createListenerDriver() : this.createKafkaDriver();
        }
    }

    /**
     * Instantiates the delegate identified by {@link #MARLIN_LISTENER_CLASS}
     * through {@link GenericHFactory}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // factory and AbstractConsumer are used as raw types here
    private Consumer<K, V> createListenerDriver() {
        GenericHFactory consumerFactory = new GenericHFactory();
        AbstractConsumer ac = (AbstractConsumer)consumerFactory.getImplementorInstance(MARLIN_LISTENER_CLASS, new Object[]{this.config, this.callback, this.keyDeserializer, this.valueDeserializer}, ConsumerConfig.class, ConsumerRebalanceCallback.class, Deserializer.class, Deserializer.class);
        return ac;
    }

    /**
     * Creates the {@link MapRKafkaConsumer} delegate after checking that at
     * least one bootstrap server address is configured.
     *
     * @throws KafkaException if the configured address list is empty
     */
    private Consumer<K, V> createKafkaDriver() {
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(this.config.getList(BOOTSTRAP_SERVERS_KEY));
        // The list holds InetSocketAddress objects, so comparing an element with "" can
        // never match; an empty list is the only "not specified" case detectable here.
        if (addresses.isEmpty()) {
            throw new KafkaException("Bootstrap servers not specified in configuration");
        }
        return new MapRKafkaConsumer<K, V>(this.config, this.callback, this.keyDeserializer, this.valueDeserializer);
    }

    /**
     * Attempts initialization for {@code topic} and returns whatever delegate
     * is present afterwards.
     *
     * @return the delegate, or {@code null} if initialization was refused
     *         because this consumer is closed
     */
    private Consumer<K, V> initializedDriver(String topic) {
        this.initializeConsumer(topic);
        return this.consumerDriver;
    }

    /**
     * Returns the existing delegate, creating it from {@code topic} when none
     * exists yet.
     *
     * @return the delegate, or {@code null} if this consumer is closed
     */
    private Consumer<K, V> driverFor(String topic) {
        Consumer<K, V> driver = this.consumerDriver;
        return driver != null ? driver : this.initializedDriver(topic);
    }

    /**
     * @return the delegate's subscriptions, or a new empty set when no
     *         delegate has been created yet
     */
    @Override
    public Set<TopicPartition> subscriptions() {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            return new HashSet<TopicPartition>();
        }
        return driver.subscriptions();
    }

    /**
     * Subscribes to the given topics; the first topic selects the delegate if
     * none exists yet. Logs an error and returns if this consumer is closed.
     * With no delegate and no topics there is nothing to do.
     */
    @Override
    public void subscribe(String ... topics) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            if (topics.length == 0) {
                // No topic name to pick a delegate from, and nothing to subscribe to.
                log.debug("no topics given and consumer not initialized, nothing to subscribe");
                return;
            }
            driver = this.initializedDriver(topics[0]);
        }
        if (driver == null) {
            log.error("consumer closed, cannot subscribe");
            return;
        }
        driver.subscribe(topics);
    }

    /**
     * Subscribes to the given partitions; the first partition's topic selects
     * the delegate if none exists yet. Logs an error and returns if this
     * consumer is closed. With no delegate and no partitions there is nothing
     * to do.
     */
    @Override
    public void subscribe(TopicPartition ... partitions) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            if (partitions.length == 0) {
                log.debug("no partitions given and consumer not initialized, nothing to subscribe");
                return;
            }
            driver = this.initializedDriver(partitions[0].topic());
        }
        if (driver == null) {
            log.error("consumer closed, cannot subscribe");
            return;
        }
        driver.subscribe(partitions);
    }

    /**
     * @throws IllegalStateException if no delegate has been created yet
     */
    @Override
    public void unsubscribe(String ... topics) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            throw new IllegalStateException("Topics were never subscribed to.");
        }
        driver.unsubscribe(topics);
    }

    /**
     * @throws IllegalStateException if no delegate has been created yet
     */
    @Override
    public void unsubscribe(TopicPartition ... partitions) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            throw new IllegalStateException("Partitions  were never subscribed to.");
        }
        driver.unsubscribe(partitions);
    }

    /**
     * @throws IllegalStateException if no delegate has been created yet
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        return driver.poll(timeout);
    }

    /**
     * Commits the given offsets; the topic of the first key in {@code offsets}
     * selects the delegate if none exists yet. Logs an error and returns if
     * this consumer is closed. With no delegate and an empty map there is
     * nothing to do.
     */
    @Override
    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            if (offsets.isEmpty()) {
                // No partition to pick a delegate from, and no offsets to commit.
                log.debug("no offsets given and consumer not initialized, nothing to commit");
                return;
            }
            driver = this.initializedDriver(offsets.keySet().iterator().next().topic());
        }
        if (driver == null) {
            log.error("consumer closed, cannot commit");
            return;
        }
        driver.commit(offsets, commitType);
    }

    /**
     * @throws IllegalStateException if no delegate has been created yet
     */
    @Override
    public void commit(CommitType commitType) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            throw new IllegalStateException("No active subscriptions");
        }
        driver.commit(commitType);
    }

    /**
     * Seeks on the delegate, creating it from the partition's topic if needed.
     * Logs an error and returns if this consumer is closed.
     */
    @Override
    public void seek(TopicPartition partition, long offset) {
        Consumer<K, V> driver = this.driverFor(partition.topic());
        if (driver == null) {
            log.error("consumer closed, cannot seek");
            return;
        }
        driver.seek(partition, offset);
    }

    /**
     * Forwards to the delegate; the first partition's topic selects the
     * delegate if none exists yet. Logs an error and returns if this consumer
     * is closed. With no delegate and no partitions there is nothing to do.
     */
    @Override
    public void seekToBeginning(TopicPartition ... partitions) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            if (partitions.length == 0) {
                log.debug("no partitions given and consumer not initialized, nothing to seek");
                return;
            }
            driver = this.initializedDriver(partitions[0].topic());
        }
        if (driver == null) {
            log.error("consumer closed, cannot seek");
            return;
        }
        driver.seekToBeginning(partitions);
    }

    /**
     * Forwards to the delegate; the first partition's topic selects the
     * delegate if none exists yet. Logs an error and returns if this consumer
     * is closed. With no delegate and no partitions there is nothing to do.
     */
    @Override
    public void seekToEnd(TopicPartition ... partitions) {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            if (partitions.length == 0) {
                log.debug("no partitions given and consumer not initialized, nothing to seek");
                return;
            }
            driver = this.initializedDriver(partitions[0].topic());
        }
        if (driver == null) {
            log.error("consumer closed, cannot seek");
            return;
        }
        driver.seekToEnd(partitions);
    }

    /**
     * @throws NoOffsetForPartitionException if this consumer is closed and no
     *                                       delegate exists
     */
    @Override
    public long position(TopicPartition partition) {
        Consumer<K, V> driver = this.driverFor(partition.topic());
        if (driver == null) {
            log.error("consumer closed, cannot get position");
            throw new NoOffsetForPartitionException("consumer closed, cannot get position");
        }
        return driver.position(partition);
    }

    /**
     * @throws NoOffsetForPartitionException if this consumer is closed and no
     *                                       delegate exists
     */
    @Override
    public long committed(TopicPartition partition) {
        Consumer<K, V> driver = this.driverFor(partition.topic());
        if (driver == null) {
            log.error("consumer closed, cannot get committed");
            throw new NoOffsetForPartitionException("consumer closed, cannot get committed");
        }
        return driver.committed(partition);
    }

    /**
     * @return the delegate's metrics, or {@code null} (after logging an error)
     *         when no delegate has been created yet
     */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            log.error("consumer not initialized, cannot get metrics");
            return null;
        }
        return driver.metrics();
    }

    /**
     * @return the delegate's partition info for {@code topic}, or {@code null}
     *         (after logging an error) when this consumer is closed and no
     *         delegate exists
     */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        Consumer<K, V> driver = this.driverFor(topic);
        if (driver == null) {
            log.error("consumer closed, cannot get partitionsFor {}", topic);
            return null;
        }
        return driver.partitionsFor(topic);
    }

    /**
     * Marks this consumer closed and closes the delegate if one exists.
     * Calling it again is a no-op. The delegate's own {@code close()} runs
     * outside the monitor.
     */
    @Override
    public void close() {
        Consumer<K, V> driver;
        synchronized (this) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            // After closed is set, initializeConsumer refuses to create a delegate,
            // so this snapshot is final.
            driver = this.consumerDriver;
        }
        if (driver != null) {
            driver.close();
        }
    }

    /**
     * Forwards to the delegate's {@code wakeup()}. Logs an error and returns
     * if this consumer is closed; does nothing if no delegate exists yet.
     */
    @Override
    public void wakeup() {
        if (this.closed) {
            log.error("Consumer closed, cannot wake up.");
            return;
        }
        Consumer<K, V> driver = this.consumerDriver;
        if (driver == null) {
            return;
        }
        driver.wakeup();
    }
}

