/*
 * Decompiled with CFR 0.152.
 */
package kafka.examples;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import kafka.examples.Utils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer
extends Thread
implements ConsumerRebalanceListener {
    private final String bootstrapServers;
    private final String topic;
    private final String groupId;
    private final Optional<String> instanceId;
    private final boolean readCommitted;
    private final int numRecords;
    private final CountDownLatch latch;
    private volatile boolean closed;
    private int remainingRecords;

    public Consumer(String threadName, String bootstrapServers, String topic, String groupId, Optional<String> instanceId, boolean readCommitted, int numRecords, CountDownLatch latch) {
        super(threadName);
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
        this.groupId = groupId;
        this.instanceId = instanceId;
        this.readCommitted = readCommitted;
        this.numRecords = numRecords;
        this.remainingRecords = numRecords;
        this.latch = latch;
    }

    @Override
    public void run() {
        try (KafkaConsumer<Integer, String> consumer = this.createKafkaConsumer();){
            consumer.subscribe(Collections.singleton(this.topic), (ConsumerRebalanceListener)this);
            Utils.printOut("Subscribed to %s", this.topic);
            while (!this.closed && this.remainingRecords > 0) {
                try {
                    ConsumerRecords records = consumer.poll(Duration.ofSeconds(1L));
                    for (ConsumerRecord record : records) {
                        Utils.maybePrintRecord(this.numRecords, (ConsumerRecord<Integer, String>)record);
                    }
                    this.remainingRecords -= records.count();
                }
                catch (AuthorizationException | RecordDeserializationException | UnsupportedVersionException e) {
                    Utils.printErr(e.getMessage(), new Object[0]);
                    this.shutdown();
                }
                catch (NoOffsetForPartitionException | OffsetOutOfRangeException e) {
                    Utils.printOut("Invalid or no offset found, using latest", new Object[0]);
                    consumer.seekToEnd((Collection)e.partitions());
                    consumer.commitSync();
                }
                catch (KafkaException e) {
                    Utils.printErr(e.getMessage(), new Object[0]);
                }
            }
        }
        catch (Throwable e) {
            Utils.printOut("Unhandled exception", new Object[0]);
            e.printStackTrace();
        }
        Utils.printOut("Fetched %d records", this.numRecords - this.remainingRecords);
        this.shutdown();
    }

    public void shutdown() {
        if (!this.closed) {
            this.closed = true;
            this.latch.countDown();
        }
    }

    public KafkaConsumer<Integer, String> createKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this.bootstrapServers);
        props.put("client.id", "client-" + UUID.randomUUID());
        props.put("group.id", this.groupId);
        this.instanceId.ifPresent(id -> props.put("group.instance.id", id));
        props.put("enable.auto.commit", this.readCommitted ? "false" : "true");
        props.put("key.deserializer", IntegerDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        if (this.readCommitted) {
            props.put("isolation.level", "read_committed");
        }
        props.put("auto.offset.reset", "earliest");
        return new KafkaConsumer(props);
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Utils.printOut("Revoked partitions: %s", partitions);
    }

    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Utils.printOut("Assigned partitions: %s", partitions);
    }

    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        Utils.printOut("Lost partitions: %s", partitions);
    }
}

