/*
 * Decompiled with CFR 0.152.
 */
package kafka.examples;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import kafka.examples.Consumer;
import kafka.examples.Producer;
import kafka.examples.Utils;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.UnsupportedVersionException;

/**
 * Thread that copies records from an input topic to an output topic inside Kafka transactions.
 *
 * <p>Each non-empty batch returned by {@code poll} is processed in one transaction: every record is
 * re-published to the output topic with {@code "-ok"} appended to its value, the consumer's current
 * positions are attached to the transaction, and the transaction is committed.
 *
 * <p>The Kafka producer and consumer are created inside {@link #run()} and closed when it returns;
 * the constructor only records configuration.
 */
public class ExactlyOnceMessageProcessor extends Thread implements ConsumerRebalanceListener {
    private final String bootstrapServers;
    private final String inputTopic;
    private final String outputTopic;
    private final String groupInstanceId;
    private final CountDownLatch latch;
    private final String transactionalId;
    /** Set by {@link #shutdown()}; checked by the processing loop on every iteration. */
    private volatile boolean closed;

    /**
     * Stores the configuration used later by {@link #run()}. No Kafka client is created here, so
     * constructing an instance that is never started holds no client resources.
     *
     * @param threadName       name of this thread; also used to derive the transactional id
     *                         ({@code "tid-" + threadName}) and the group instance id
     *                         ({@code "giid-" + threadName})
     * @param bootstrapServers bootstrap servers passed to both the producer and the consumer
     * @param inputTopic       topic to subscribe to and read from
     * @param outputTopic      topic the transformed records are written to
     * @param latch            counted down once when this processor shuts down
     */
    public ExactlyOnceMessageProcessor(String threadName, String bootstrapServers, String inputTopic, String outputTopic, CountDownLatch latch) {
        super(threadName);
        this.bootstrapServers = bootstrapServers;
        this.inputTopic = inputTopic;
        this.outputTopic = outputTopic;
        this.transactionalId = "tid-" + threadName;
        this.groupInstanceId = "giid-" + threadName;
        this.latch = latch;
    }

    /**
     * Runs the consume-transform-produce loop until {@link #shutdown()} is called or the computed
     * number of remaining records drops to zero, then reports the processed count and shuts down.
     *
     * <p>Any throwable escaping the loop is reported and ends processing; the clients are closed by
     * the try-with-resources statement in every case.
     */
    @Override
    public void run() {
        int processedRecords = 0;
        // Long.MAX_VALUE means "not known yet", so the loop keeps going until a real count exists.
        long remainingRecords = Long.MAX_VALUE;
        int transactionTimeoutMs = 10000;
        boolean readCommitted = true;
        // The remaining Producer/Consumer constructor arguments are project-defined and their
        // meaning is not visible from this file; they are passed through exactly as before.
        try (KafkaProducer<Integer, String> producer = new Producer("processor-producer", this.bootstrapServers, this.outputTopic, true, this.transactionalId, true, -1, transactionTimeoutMs, null).createKafkaProducer();
             KafkaConsumer<Integer, String> consumer = new Consumer("processor-consumer", this.bootstrapServers, this.inputTopic, "processor-group", Optional.of(this.groupInstanceId), readCommitted, -1, null).createKafkaConsumer()) {
            producer.initTransactions();
            consumer.subscribe(Collections.singleton(this.inputTopic), this);
            Utils.printOut("Processing new records", new Object[0]);
            while (!this.closed && remainingRecords > 0L) {
                try {
                    ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200L));
                    if (!records.isEmpty()) {
                        producer.beginTransaction();
                        for (ConsumerRecord<Integer, String> consumed : records) {
                            ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(this.outputTopic, consumed.key(), consumed.value() + "-ok");
                            producer.send(newRecord);
                        }
                        // Offsets travel with the transaction, so output and progress commit together.
                        producer.sendOffsetsToTransaction(this.getOffsetsToCommit(consumer), consumer.groupMetadata());
                        producer.commitTransaction();
                        processedRecords += records.count();
                    }
                }
                catch (AuthorizationException | FencedInstanceIdException | OutOfOrderSequenceException | ProducerFencedException | SerializationException | UnsupportedVersionException e) {
                    // Treated as unrecoverable: report and request loop termination.
                    Utils.printErr(e.getMessage(), new Object[0]);
                    this.shutdown();
                }
                catch (NoOffsetForPartitionException | OffsetOutOfRangeException e) {
                    Utils.printOut("Invalid or no offset found, using latest", new Object[0]);
                    consumer.seekToEnd(Collections.emptyList());
                    consumer.commitSync();
                }
                catch (KafkaException e) {
                    // NOTE(review): this handler also covers a failure in poll() before
                    // beginTransaction(); confirm abortTransaction() is safe with no open transaction.
                    Utils.printOut("Aborting transaction: %s", new Object[]{e});
                    producer.abortTransaction();
                }
                remainingRecords = this.getRemainingRecords(consumer);
                if (remainingRecords != Long.MAX_VALUE) {
                    Utils.printOut("Remaining records: %d", remainingRecords);
                }
            }
        }
        catch (Throwable e) {
            Utils.printOut("Unhandled exception", new Object[0]);
            e.printStackTrace();
        }
        Utils.printOut("Processed %d records", processedRecords);
        this.shutdown();
    }

    /** Logs the partitions that were revoked from this consumer. */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        Utils.printOut("Revoked partitions: %s", partitions);
    }

    /** Logs the partitions that were assigned to this consumer. */
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        Utils.printOut("Assigned partitions: %s", partitions);
    }

    /** Logs the partitions that this consumer lost. */
    @Override
    public void onPartitionsLost(Collection<TopicPartition> partitions) {
        Utils.printOut("Lost partitions: %s", partitions);
    }

    /**
     * Marks this processor as closed and counts the latch down. Calls made after the processor is
     * already marked closed do nothing.
     */
    public void shutdown() {
        if (!this.closed) {
            this.closed = true;
            this.latch.countDown();
        }
    }

    /**
     * Builds the offsets to attach to the current transaction.
     *
     * @param consumer consumer whose assignment and positions are read
     * @return the consumer's current position for every assigned partition
     */
    private Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(KafkaConsumer<Integer, String> consumer) {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition topicPartition : consumer.assignment()) {
            offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
        }
        return offsets;
    }

    /**
     * Computes how many records are left between the consumer's positions and the end offsets of
     * its assigned partitions.
     *
     * @param consumer consumer whose assignment, positions and end offsets are queried
     * @return the summed distance to the end offsets, or {@code Long.MAX_VALUE} when no end offsets
     *         were returned (for example while nothing is assigned yet)
     */
    private long getRemainingRecords(KafkaConsumer<Integer, String> consumer) {
        Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
        if (fullEndOffsets.isEmpty()) {
            return Long.MAX_VALUE;
        }
        return consumer.assignment().stream().mapToLong(partition -> {
            long currentPosition = consumer.position(partition);
            // A partition with no known end offset contributes nothing to the total.
            if (fullEndOffsets.containsKey(partition)) {
                return fullEndOffsets.get(partition) - currentPosition;
            }
            return 0L;
        }).sum();
    }
}

