/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.listener;

import com.mapr.fs.proto.Dbserver;
import com.mapr.fs.proto.Marlinserver;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.MapRCDCDeserializer;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.TopicRefreshListListener;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.TopicRefreshRegexListener;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.MarlinClient;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.listener.ListenerRecord;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.listener.MarlinConsumerCoordinator;
import org.apache.flink.kafka.shaded.com.mapr.kafka.eventstreams.impl.listener.MarlinListenerImpl;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.Consumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.Metric;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.MetricName;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.PartitionInfo;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.record.TimestampType;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Consumer} implementation that adapts the Kafka consumer API onto a
 * {@link MarlinListenerImpl}.
 *
 * <p>Almost every call is forwarded to {@link #_listener}. Subscription management
 * ({@code subscribe}/{@code assign}/{@code unsubscribe}/{@code subscription}) is routed to
 * {@link #_coordinator} instead whenever {@link #_clientSidePartitioningEnabled} is {@code true}.
 * This class never assigns {@code _coordinator}, {@code _groupMetadata} or
 * {@code _clientSidePartitioningEnabled} itself; they are {@code protected} so that a subclass
 * can populate them.
 *
 * <p>Several {@code Duration}-taking overloads required by the {@code Consumer} interface ignore
 * the timeout and simply call their untimed counterpart; this is noted on each such method.
 *
 * @param <K> deserialized record key type
 * @param <V> deserialized record value type
 */
public class MarlinListener<K, V> extends MarlinClient implements Consumer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(MarlinListener.class);

    private final Deserializer<K> _keyDeserializer;
    private final Deserializer<V> _valueDeserializer;

    /** Underlying listener that performs the actual work. */
    protected final MarlinListenerImpl _listener;
    /** Used for subscription management when client-side partitioning is enabled; may be null. */
    protected MarlinConsumerCoordinator _coordinator;
    /** Returned by {@link #groupMetadata()}; {@code null} means "no group configured". */
    protected ConsumerGroupMetadata _groupMetadata;
    /** When true, subscription calls go to {@link #_coordinator} rather than {@link #_listener}. */
    protected boolean _clientSidePartitioningEnabled = false;

    /**
     * Creates a listener around an already constructed {@link MarlinListenerImpl}.
     *
     * @param config            consumer configuration; consulted for {@code key.deserializer} /
     *                          {@code value.deserializer} only when the matching argument is null
     * @param keyDeserializer   key deserializer, or null to instantiate the configured one
     * @param valueDeserializer value deserializer, or null to instantiate the configured one
     * @param listener          the backing listener implementation
     */
    // getConfiguredInstance(..., Deserializer.class) yields a raw Deserializer; the element type
    // is whatever the user configured, so the conversion cannot be checked at compile time.
    @SuppressWarnings("unchecked")
    protected MarlinListener(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, MarlinListenerImpl listener) {
        LOG.debug("Starting Streams Listener");
        if (keyDeserializer == null) {
            this._keyDeserializer = config.getConfiguredInstance("key.deserializer", Deserializer.class);
        } else {
            this._keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this._valueDeserializer = config.getConfiguredInstance("value.deserializer", Deserializer.class);
        } else {
            this._valueDeserializer = valueDeserializer;
        }
        this._listener = listener;
        LOG.debug("Streams listener created");
    }

    /**
     * Creates a listener together with a new {@link MarlinListenerImpl}.
     *
     * <p>NOTE(review): the CDC open-format type is derived from the {@code valueDeserializer}
     * argument as passed in, so a null argument yields {@code COFT_NONE} even if
     * {@code value.deserializer} is set in {@code config} - confirm this is intended.
     *
     * @param config            consumer configuration
     * @param keyDeserializer   key deserializer, or null to use the configured one
     * @param valueDeserializer value deserializer, or null to use the configured one
     */
    public MarlinListener(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        this(config, keyDeserializer, valueDeserializer, new MarlinListenerImpl(config, null, MarlinListener.DeserializerToCDCOpenFormatType(valueDeserializer)));
    }

    /**
     * Maps a value deserializer to the CDC open-format type passed to {@link MarlinListenerImpl}.
     *
     * <ul>
     *   <li>null deserializer: {@code COFT_NONE}</li>
     *   <li>class directly implements an interface named
     *       {@code com.mapr.streams.MapRCDCDeserializer}: {@code COFT_CDRECORD}</li>
     *   <li>instance of the shaded {@link MapRCDCDeserializer}: whatever
     *       {@code getOpenFormatType()} reports</li>
     *   <li>anything else: {@code COFT_JSON}</li>
     * </ul>
     *
     * @param valueDeserializer deserializer to inspect; may be null
     * @return the matching open-format type
     */
    public static Dbserver.CDCOpenFormatType DeserializerToCDCOpenFormatType(Deserializer<?> valueDeserializer) {
        if (valueDeserializer == null) {
            return Dbserver.CDCOpenFormatType.COFT_NONE;
        }
        // Matched by name because the unshaded interface is not on this class's compile path.
        // Only interfaces declared directly on the runtime class are examined.
        boolean implementsUnshadedCdc = Arrays.stream(valueDeserializer.getClass().getInterfaces())
                .anyMatch(c -> "com.mapr.streams.MapRCDCDeserializer".equals(c.getName()));
        if (implementsUnshadedCdc) {
            return Dbserver.CDCOpenFormatType.COFT_CDRECORD;
        }
        if (valueDeserializer instanceof MapRCDCDeserializer) {
            return ((MapRCDCDeserializer) valueDeserializer).getOpenFormatType();
        }
        return Dbserver.CDCOpenFormatType.COFT_JSON;
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Set<TopicPartition> assignment() {
        return this._listener.assignment();
    }

    /** {@inheritDoc} Answered by the coordinator when client-side partitioning is enabled. */
    @Override
    public Set<String> subscription() {
        if (this._clientSidePartitioningEnabled) {
            return this._coordinator.subscription();
        }
        return this._listener.subscription();
    }

    /** {@inheritDoc} Subscribes with a {@link NoOpConsumerRebalanceListener}. */
    @Override
    public void subscribe(Collection<String> topics) {
        if (this._clientSidePartitioningEnabled) {
            this._coordinator.subscribe(topics, (ConsumerRebalanceListener) new NoOpConsumerRebalanceListener());
        } else {
            this.subscribe(topics, (ConsumerRebalanceListener) new NoOpConsumerRebalanceListener());
        }
    }

    /**
     * Forwards to {@link MarlinListenerImpl#topicRefresherRegex}.
     *
     * @param pattern  topic name pattern
     * @param callback listener handed to the underlying implementation
     */
    public void topicRefresherRegex(Pattern pattern, TopicRefreshRegexListener callback) {
        this._listener.topicRefresherRegex(pattern, callback);
    }

    /**
     * Forwards to {@link MarlinListenerImpl#topicRefresherList}.
     *
     * @param topics   topic names
     * @param callback listener handed to the underlying implementation
     */
    public void topicRefresherList(Collection<String> topics, TopicRefreshListListener callback) {
        this._listener.topicRefresherList(topics, callback);
    }

    /** {@inheritDoc} Routed to the coordinator when client-side partitioning is enabled. */
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
        if (this._clientSidePartitioningEnabled) {
            this._coordinator.subscribe(topics, callback);
        } else {
            this._listener.subscribe(topics, callback);
        }
    }

    /** {@inheritDoc} Routed to the coordinator when client-side partitioning is enabled. */
    @Override
    public void assign(Collection<TopicPartition> partitions) {
        if (this._clientSidePartitioningEnabled) {
            this._coordinator.assign(partitions);
        } else {
            this._listener.assign(partitions);
        }
    }

    /** {@inheritDoc} Routed to the coordinator when client-side partitioning is enabled. */
    @Override
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
        if (this._clientSidePartitioningEnabled) {
            this._coordinator.subscribe(pattern, callback);
        } else {
            this._listener.subscribe(pattern, callback);
        }
    }

    /**
     * Not supported by this implementation.
     *
     * @throws KafkaException always
     */
    @Override
    public void subscribe(Pattern pattern) {
        throw new KafkaException("subscribe API not implemented");
    }

    /** {@inheritDoc} Routed to the coordinator when client-side partitioning is enabled. */
    @Override
    public void unsubscribe() {
        if (this._clientSidePartitioningEnabled) {
            this._coordinator.unsubscribe();
        } else {
            this._listener.unsubscribe();
        }
    }

    /**
     * Polls the listener and converts every returned {@link ListenerRecord} into a
     * {@link ConsumerRecord} using this consumer's key/value deserializers.
     *
     * <p>Partitions for which the listener returned an empty list are omitted from the result.
     *
     * @param timeoutMS timeout in milliseconds, passed unchanged to the listener
     * @return the converted records grouped by partition
     */
    @Override
    public ConsumerRecords<K, V> poll(long timeoutMS) {
        Map<TopicPartition, List<ListenerRecord>> fetched = this._listener.poll(timeoutMS);
        Map<TopicPartition, List<ConsumerRecord<K, V>>> converted = new HashMap<>();
        for (Map.Entry<TopicPartition, List<ListenerRecord>> entry : fetched.entrySet()) {
            List<ListenerRecord> feedRecords = entry.getValue();
            if (feedRecords.isEmpty()) {
                continue;
            }
            List<ConsumerRecord<K, V>> partitionRecords = new ArrayList<>(feedRecords.size());
            for (ListenerRecord feedRec : feedRecords) {
                partitionRecords.add(this.toKafkaConsumerRecord(feedRec, this._keyDeserializer, this._valueDeserializer));
            }
            converted.put(entry.getKey(), partitionRecords);
        }
        return new ConsumerRecords<>(converted);
    }

    /**
     * Equivalent to {@link #poll(long)} with the timeout expressed in milliseconds. Durations too
     * large for a {@code long} millisecond count saturate instead of throwing.
     */
    @Override
    public ConsumerRecords<K, V> poll(Duration timeout) {
        return this.poll(saturatedMillis(timeout));
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void commitSync() {
        this._listener.commitSync();
    }

    /** Same as {@link #commitSync()}; {@code timeout} is ignored. */
    @Override
    public void commitSync(Duration timeout) {
        this._listener.commitSync();
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        this._listener.commitSync(offsets);
    }

    /** Same as {@link #commitSync(Map)}; {@code timeout} is ignored. */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
        this._listener.commitSync(offsets);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void commitAsync() {
        this._listener.commitAsync();
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void commitAsync(OffsetCommitCallback callback) {
        this._listener.commitAsync(callback);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        this._listener.commitAsync(offsets, callback);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void seek(TopicPartition partition, long offset) {
        this._listener.seek(partition, offset);
    }

    /** Seeks to {@code offsetAndMetadata.offset()}; the metadata portion is not used. */
    @Override
    public void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        this._listener.seek(partition, offsetAndMetadata.offset());
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) {
        this._listener.seekToBeginning(partitions);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) {
        this._listener.seekToEnd(partitions);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public long position(TopicPartition partition) {
        return this._listener.position(partition);
    }

    /** Same as {@link #position(TopicPartition)}; {@code timeout} is ignored. */
    @Override
    public long position(TopicPartition partition, Duration timeout) {
        return this._listener.position(partition);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition) {
        return this._listener.committed(partition);
    }

    /** Same as {@link #committed(TopicPartition)}; {@code timeout} is ignored. */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition, Duration timeout) {
        return this._listener.committed(partition);
    }

    /**
     * Looks up the committed offset of each partition individually.
     *
     * @param partitions partitions to query; null elements are skipped
     * @return map from each non-null partition to the listener's answer for it
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(partitions.size());
        for (TopicPartition partition : partitions) {
            if (partition != null) {
                offsets.put(partition, this._listener.committed(partition));
            }
        }
        return offsets;
    }

    /** Same as {@link #committed(Set)}; {@code timeout} is ignored. */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions, Duration timeout) {
        return this.committed(partitions);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        return this._listener.metrics();
    }

    /** Returns the listener's {@code getTopicInfo(topic)}. */
    @Override
    public List<PartitionInfo> partitionsFor(String topic) {
        return this._listener.getTopicInfo(topic);
    }

    /** Same as {@link #partitionsFor(String)}; {@code timeout} is ignored. */
    @Override
    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
        return this._listener.getTopicInfo(topic);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics() {
        return this._listener.listTopics();
    }

    /** Same as {@link #listTopics()}; {@code timeout} is ignored. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
        return this._listener.listTopics();
    }

    /** Lists topics for the given stream by delegating to the listener. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream) {
        return this._listener.listTopics(stream);
    }

    /** Same as {@link #listTopics(String)}; {@code timeout} is ignored. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(String stream, Duration timeout) {
        return this._listener.listTopics(stream);
    }

    /** Lists topics matching the given pattern by delegating to the listener. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern) {
        return this._listener.listTopics(pattern);
    }

    /** Same as {@link #listTopics(Pattern)}; {@code timeout} is ignored. */
    @Override
    public Map<String, List<PartitionInfo>> listTopics(Pattern pattern, Duration timeout) {
        return this.listTopics(pattern);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void pause(Collection<TopicPartition> partitions) {
        this._listener.pause(partitions);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void resume(Collection<TopicPartition> partitions) {
        this._listener.resume(partitions);
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Set<TopicPartition> paused() {
        return this._listener.paused();
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        return this._listener.offsetsForTimes(timestampsToSearch);
    }

    /**
     * Delegates to the listener with the timeout converted to whole milliseconds (saturating at
     * the {@code int} range), so sub-second timeouts are honoured rather than truncated to zero.
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
        return this._listener.offsetsForTimes(timestampsToSearch, saturatedIntMillis(timeout));
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
        return this._listener.beginningOffsets(partitions);
    }

    /**
     * Delegates to the listener with the timeout converted to whole milliseconds (saturating at
     * the {@code int} range), so sub-second timeouts are honoured rather than truncated to zero.
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        return this._listener.beginningOffsets(partitions, saturatedIntMillis(timeout));
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
        return this._listener.endOffsets(partitions);
    }

    /**
     * Delegates to the listener with the timeout converted to whole milliseconds (saturating at
     * the {@code int} range), so sub-second timeouts are honoured rather than truncated to zero.
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
        return this._listener.endOffsets(partitions, saturatedIntMillis(timeout));
    }

    /**
     * Returns the group metadata set on this consumer.
     *
     * @throws InvalidGroupIdException if {@link #_groupMetadata} has not been set
     */
    @Override
    public ConsumerGroupMetadata groupMetadata() {
        if (this._groupMetadata == null) {
            throw new InvalidGroupIdException("To use the group management, you must provide a valid group.id in the consumer configuration.");
        }
        return this._groupMetadata;
    }

    /**
     * Asks the coordinator to rejoin the group. The {@code reason} is not forwarded anywhere.
     * Does nothing (apart from logging) when no coordinator is present.
     */
    @Override
    public void enforceRebalance(String reason) {
        LOG.warn("enforceRebalance(reason) method is not currently supported in MapR Kafka. Reason will be ignored");
        if (this._coordinator == null) {
            LOG.warn("Consumer does not have a group, ignoring enforceRebalance call.");
            return;
        }
        this._coordinator.requestRejoin();
    }

    /** Equivalent to {@code enforceRebalance(null)}. */
    @Override
    public void enforceRebalance() {
        this.enforceRebalance(null);
    }

    /**
     * Not supported by this implementation.
     *
     * @throws KafkaException always
     */
    @Override
    public OptionalLong currentLag(TopicPartition topicPartition) {
        throw new KafkaException("currentLag method is not currently supported in MapR Kafka");
    }

    /**
     * Closes the coordinator (when client-side partitioning is enabled) and then the listener,
     * giving the listener a timeout of {@code Long.MAX_VALUE} milliseconds. The listener is
     * closed even if closing the coordinator throws.
     */
    @Override
    public void close() {
        this.closeCoordinatorThenListener(Long.MAX_VALUE);
    }

    /**
     * Closes the coordinator (when client-side partitioning is enabled) and then the listener.
     * The timeout is handed to the listener in milliseconds so that sub-second values are not
     * rounded down to zero. The listener is closed even if closing the coordinator throws.
     *
     * @param timeout maximum time to allow the listener to close
     */
    @Override
    public void close(Duration timeout) {
        this.closeCoordinatorThenListener(saturatedMillis(timeout));
    }

    /** {@inheritDoc} Delegates to the listener. */
    @Override
    public void wakeup() {
        this._listener.wakeup();
    }

    /**
     * Forwards a join-group request to the listener.
     *
     * @param desc join-group description
     * @param cb   callback handed to the listener
     * @return the listener's response
     */
    public Marlinserver.JoinGroupResponse join(Marlinserver.JoinGroupDesc desc, MarlinJoinCallback cb) {
        return this._listener.join(desc, cb);
    }

    /**
     * Builds the {@link ConsumerRecord} for an already deserialized listener record.
     *
     * <p>Partition, offset, timestamp, headers and producer are taken from {@code rec}; the
     * timestamp type is always {@code NO_TIMESTAMP_TYPE} and the leader epoch is always empty.
     * The method-level type parameters intentionally shadow the class-level ones, as in the
     * original signature.
     *
     * @param topic               topic name to record
     * @param rec                 source listener record
     * @param kkey                deserialized key, may be null
     * @param kvalue              deserialized value, may be null
     * @param serializedKeySize   size of the serialized key in bytes, or -1 if there was no key
     * @param serializedValueSize size of the serialized value in bytes, or -1 if there was no value
     * @return the assembled consumer record
     */
    protected <K, V> ConsumerRecord<K, V> generateConsumerRecord(String topic, ListenerRecord rec, K kkey, V kvalue, int serializedKeySize, int serializedValueSize) {
        // Forward the caller-supplied sizes; previously they were accepted but replaced by -1.
        return new ConsumerRecord<K, V>(topic, rec.feedId(), rec.offset(), rec.timestamp(), TimestampType.NO_TIMESTAMP_TYPE, serializedKeySize, serializedValueSize, kkey, kvalue, rec.headers(), Optional.empty(), rec.producer());
    }

    /**
     * Deserializes the key and value of a listener record and wraps them in a
     * {@link ConsumerRecord}. A null key or value is left as null without invoking the
     * corresponding deserializer.
     *
     * @param rec               record obtained from the listener
     * @param keyDeserializer   deserializer applied to a non-null key
     * @param valueDeserializer deserializer applied to a non-null value
     * @return the equivalent consumer record
     */
    public <K, V> ConsumerRecord<K, V> toKafkaConsumerRecord(ListenerRecord rec, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        byte[] rawKey = rec.key();
        byte[] rawValue = rec.value();
        String topic = rec.topic();

        K kkey = rawKey == null ? null : keyDeserializer.deserialize(topic, rawKey);
        V kvalue = rawValue == null ? null : valueDeserializer.deserialize(topic, rawValue);

        int keySize = rawKey == null ? -1 : rawKey.length;
        int valueSize = rawValue == null ? -1 : rawValue.length;
        return this.generateConsumerRecord(topic, rec, kkey, kvalue, keySize, valueSize);
    }

    /** Shared body of both {@code close} overloads; {@code finally} guarantees the listener is closed. */
    private void closeCoordinatorThenListener(long timeoutMillis) {
        try {
            if (this._clientSidePartitioningEnabled) {
                this._coordinator.close();
            }
        } finally {
            this._listener.close(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Converts a duration to milliseconds, saturating at the {@code long} range instead of throwing. */
    private static long saturatedMillis(Duration timeout) {
        try {
            return timeout.toMillis();
        } catch (ArithmeticException overflow) {
            // Duration.toMillis() throws when the millisecond count does not fit in a long.
            return timeout.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
    }

    /** Converts a duration to milliseconds, saturating at the {@code int} range. */
    private static int saturatedIntMillis(Duration timeout) {
        long millis = saturatedMillis(timeout);
        if (millis > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        }
        if (millis < Integer.MIN_VALUE) {
            return Integer.MIN_VALUE;
        }
        return (int) millis;
    }

    /** Callback passed to {@link #join}. */
    public interface MarlinJoinCallback {
        /** Invoked with the group info on join. */
        void onJoin(Marlinserver.JoinGroupInfo info);

        /** Invoked with the group info on rejoin. */
        void onRejoin(Marlinserver.JoinGroupInfo info);
    }
}

