package kafka.tools;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import kafka.admin.TopicCommand;
import kafka.utils.ZkUtils;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.yarn.client.cli.YarnCLI;
import org.apache.hadoop.yarn.security.client.TimelineAuthenticationConsts;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import scala.collection.JavaConversions;

@InterfaceStability.Unstable
/* loaded from: input_file:kafka/tools/StreamsResetter.class */
public class StreamsResetter {
    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_ERROR = 1;
    private static OptionSpec<String> bootstrapServerOption;
    private static OptionSpec<String> zookeeperOption;
    private static OptionSpec<String> applicationIdOption;
    private static OptionSpec<String> inputTopicsOption;
    private static OptionSpec<String> intermediateTopicsOption;
    private OptionSet options = null;
    private final Properties consumerConfig = new Properties();
    private final List<String> allTopics = new LinkedList();

    public int run(String[] strArr) {
        return run(strArr, new Properties());
    }

    public int run(String[] strArr, Properties properties) {
        this.consumerConfig.clear();
        this.consumerConfig.putAll(properties);
        int i = 0;
        ZkUtils zkUtils = null;
        try {
            try {
                parseArguments(strArr);
                zkUtils = ZkUtils.apply((String) this.options.valueOf(zookeeperOption), 30000, 30000, JaasUtils.isZkSecurityEnabled());
                this.allTopics.clear();
                this.allTopics.addAll(JavaConversions.seqAsJavaList(zkUtils.getAllTopics()));
                resetInputAndInternalTopicOffsets();
                seekToEndIntermediateTopics();
                deleteInternalTopics(zkUtils);
                if (zkUtils != null) {
                    zkUtils.close();
                }
            } catch (Exception e) {
                i = 1;
                System.err.println("ERROR: " + e.getMessage());
                if (zkUtils != null) {
                    zkUtils.close();
                }
            }
            return i;
        } catch (Throwable th) {
            if (zkUtils != null) {
                zkUtils.close();
            }
            throw th;
        }
    }

    private void parseArguments(String[] strArr) throws IOException {
        OptionParser optionParser = new OptionParser();
        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id)").withRequiredArg().ofType(String.class).describedAs("id").required();
        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2").withRequiredArg().ofType(String.class).defaultsTo("localhost:9092", new String[0]).describedAs("urls");
        zookeeperOption = optionParser.accepts("zookeeper", "Format: HOST:POST").withRequiredArg().ofType(String.class).defaultsTo("localhost:2181", new String[0]).describedAs(TimelineAuthenticationConsts.DELEGATION_TOKEN_URL);
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs(YarnCLI.LIST_CMD);
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs(YarnCLI.LIST_CMD);
        try {
            this.options = optionParser.parse(strArr);
        } catch (OptionException e) {
            optionParser.printHelpOn(System.err);
            throw e;
        }
    }

    private void resetInputAndInternalTopicOffsets() {
        List<String> valuesOf = this.options.valuesOf(inputTopicsOption);
        if (valuesOf.size() == 0) {
            System.out.println("No input topics specified.");
        } else {
            System.out.println("Resetting offsets to zero for input topics " + valuesOf + " and all internal topics.");
        }
        Properties properties = new Properties();
        properties.putAll(this.consumerConfig);
        properties.setProperty("bootstrap.servers", (String) this.options.valueOf(bootstrapServerOption));
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, (String) this.options.valueOf(applicationIdOption));
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        for (String str : valuesOf) {
            if (!this.allTopics.contains(str)) {
                System.out.println("Input topic " + str + " not found. Skipping.");
            }
        }
        for (String str2 : this.allTopics) {
            if (isInputTopic(str2) || isInternalTopic(str2)) {
                System.out.println("Topic: " + str2);
                try {
                    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
                    Throwable th = null;
                    try {
                        try {
                            kafkaConsumer.subscribe(Collections.singleton(str2));
                            kafkaConsumer.poll(1L);
                            Set<TopicPartition> assignment = kafkaConsumer.assignment();
                            kafkaConsumer.seekToBeginning(assignment);
                            Iterator<TopicPartition> it = assignment.iterator();
                            while (it.hasNext()) {
                                kafkaConsumer.position(it.next());
                            }
                            kafkaConsumer.commitSync();
                            if (kafkaConsumer != null) {
                                if (0 != 0) {
                                    try {
                                        kafkaConsumer.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    kafkaConsumer.close();
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (RuntimeException e) {
                    System.err.println("ERROR: Resetting offsets for topic " + str2 + " failed.");
                    throw e;
                }
            }
        }
        System.out.println("Done.");
    }

    private boolean isInputTopic(String str) {
        return this.options.valuesOf(inputTopicsOption).contains(str);
    }

    private void seekToEndIntermediateTopics() {
        List<String> valuesOf = this.options.valuesOf(intermediateTopicsOption);
        if (valuesOf.size() == 0) {
            System.out.println("No intermediate user topics specified, skipping seek-to-end for user topic offsets.");
            return;
        }
        System.out.println("Seek-to-end for intermediate user topics " + valuesOf);
        Properties properties = new Properties();
        properties.putAll(this.consumerConfig);
        properties.setProperty("bootstrap.servers", (String) this.options.valueOf(bootstrapServerOption));
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, (String) this.options.valueOf(applicationIdOption));
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        for (String str : valuesOf) {
            if (this.allTopics.contains(str)) {
                System.out.println("Topic: " + str);
                try {
                    KafkaConsumer kafkaConsumer = new KafkaConsumer(properties, (Deserializer) new ByteArrayDeserializer(), (Deserializer) new ByteArrayDeserializer());
                    Throwable th = null;
                    try {
                        try {
                            kafkaConsumer.subscribe(Collections.singleton(str));
                            kafkaConsumer.poll(1L);
                            Set<TopicPartition> assignment = kafkaConsumer.assignment();
                            kafkaConsumer.seekToEnd(assignment);
                            Iterator<TopicPartition> it = assignment.iterator();
                            while (it.hasNext()) {
                                kafkaConsumer.position(it.next());
                            }
                            kafkaConsumer.commitSync();
                            if (kafkaConsumer != null) {
                                if (0 != 0) {
                                    try {
                                        kafkaConsumer.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    kafkaConsumer.close();
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (RuntimeException e) {
                    System.err.println("ERROR: Seek-to-end for topic " + str + " failed.");
                    throw e;
                }
            } else {
                System.out.println("Topic " + str + " not found. Skipping.");
            }
        }
        System.out.println("Done.");
    }

    private void deleteInternalTopics(ZkUtils zkUtils) {
        System.out.println("Deleting all internal/auto-created topics for application " + ((String) this.options.valueOf(applicationIdOption)));
        for (String str : this.allTopics) {
            if (isInternalTopic(str)) {
                try {
                    TopicCommand.deleteTopic(zkUtils, new TopicCommand.TopicCommandOptions(new String[]{"--zookeeper", (String) this.options.valueOf(zookeeperOption), "--delete", "--topic", str}));
                } catch (RuntimeException e) {
                    System.err.println("ERROR: Deleting topic " + str + " failed.");
                    throw e;
                }
            }
        }
        System.out.println("Done.");
    }

    private boolean isInternalTopic(String str) {
        return str.startsWith(new StringBuilder().append((String) this.options.valueOf(applicationIdOption)).append(HelpFormatter.DEFAULT_OPT_PREFIX).toString()) && (str.endsWith("-changelog") || str.endsWith("-repartition"));
    }

    public static void main(String[] strArr) {
        System.exit(new StreamsResetter().run(strArr));
    }
}
