/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.CommitType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class MockConsumer<K, V>
implements Consumer<K, V> {
    private final Map<String, List<PartitionInfo>> partitions;
    private final SubscriptionState subscriptions;
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
    private boolean closed;

    public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        this.partitions = new HashMap<String, List<PartitionInfo>>();
        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        this.closed = false;
    }

    @Override
    public synchronized Set<TopicPartition> subscriptions() {
        return this.subscriptions.assignedPartitions();
    }

    @Override
    public synchronized void subscribe(String ... topics) {
        this.ensureNotClosed();
        for (String topic : topics) {
            this.subscriptions.subscribe(topic);
        }
    }

    @Override
    public synchronized void subscribe(TopicPartition ... partitions) {
        this.ensureNotClosed();
        for (TopicPartition partition : partitions) {
            this.subscriptions.subscribe(partition);
        }
    }

    @Override
    public synchronized void unsubscribe(String ... topics) {
        this.ensureNotClosed();
        for (String topic : topics) {
            this.subscriptions.unsubscribe(topic);
        }
    }

    @Override
    public synchronized void unsubscribe(TopicPartition ... partitions) {
        this.ensureNotClosed();
        for (TopicPartition partition : partitions) {
            this.subscriptions.unsubscribe(partition);
        }
    }

    @Override
    public synchronized ConsumerRecords<K, V> poll(long timeout) {
        this.ensureNotClosed();
        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
            List<ConsumerRecord<K, V>> recs = entry.getValue();
            if (recs.isEmpty()) continue;
            this.subscriptions.consumed(entry.getKey(), recs.get(recs.size() - 1).offset());
        }
        ConsumerRecords<K, V> copy = new ConsumerRecords<K, V>(this.records);
        this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>();
        return copy;
    }

    public synchronized void addRecord(ConsumerRecord<K, V> record) {
        this.ensureNotClosed();
        TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        this.subscriptions.assignedPartitions().add(tp);
        List<ConsumerRecord<K, V>> recs = this.records.get(tp);
        if (recs == null) {
            recs = new ArrayList<ConsumerRecord<K, V>>();
            this.records.put(tp, recs);
        }
        recs.add(record);
    }

    @Override
    public synchronized void commit(Map<TopicPartition, Long> offsets, CommitType commitType) {
        this.ensureNotClosed();
        for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
            this.subscriptions.committed(entry.getKey(), entry.getValue());
        }
    }

    @Override
    public synchronized void commit(CommitType commitType) {
        this.ensureNotClosed();
        this.commit(this.subscriptions.allConsumed(), commitType);
    }

    @Override
    public synchronized void seek(TopicPartition partition, long offset) {
        this.ensureNotClosed();
        this.subscriptions.seek(partition, offset);
    }

    @Override
    public synchronized long committed(TopicPartition partition) {
        this.ensureNotClosed();
        return this.subscriptions.committed(partition);
    }

    @Override
    public synchronized long position(TopicPartition partition) {
        this.ensureNotClosed();
        return this.subscriptions.consumed(partition);
    }

    @Override
    public synchronized void seekToBeginning(TopicPartition ... partitions) {
        this.ensureNotClosed();
        throw new UnsupportedOperationException();
    }

    @Override
    public synchronized void seekToEnd(TopicPartition ... partitions) {
        this.ensureNotClosed();
        throw new UnsupportedOperationException();
    }

    @Override
    public Map<MetricName, ? extends Metric> metrics() {
        this.ensureNotClosed();
        return Collections.emptyMap();
    }

    @Override
    public synchronized List<PartitionInfo> partitionsFor(String topic) {
        this.ensureNotClosed();
        List<PartitionInfo> parts = this.partitions.get(topic);
        if (parts == null) {
            return Collections.emptyList();
        }
        return parts;
    }

    public synchronized void updatePartitions(String topic, List<PartitionInfo> partitions) {
        this.ensureNotClosed();
        this.partitions.put(topic, partitions);
    }

    @Override
    public synchronized void close() {
        this.ensureNotClosed();
        this.closed = true;
    }

    @Override
    public void wakeup() {
    }

    private void ensureNotClosed() {
        if (this.closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }
}

