/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import com.mapr.kafka.eventstreams.Admin;
import com.mapr.kafka.eventstreams.Streams;
import com.mapr.kafka.eventstreams.impl.admin.AssignInfo;
import com.mapr.kafka.eventstreams.impl.admin.MarlinAdminImpl;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.mapr.util.MapRTopicUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import scala.collection.JavaConverters;
import scala.collection.Set;

@InterfaceStability.Unstable
public class StreamsResetter {
    /** Exit status for a successful run; the methods in this file spell the value out as the literal 0. */
    private static final int EXIT_CODE_SUCCESS = 0;
    /** Exit status for a failed run; {@code run} assigns the literal 1 in its catch block. */
    private static final int EXIT_CODE_ERROR = 1;

    // Command-line option handles. Although static, every one of them is (re)assigned by
    // each call to parseArguments(), so they are only meaningful after that method has run.
    private static OptionSpec<String> applicationIdOption;
    private static OptionSpec<String> inputTopicsOption;
    private static OptionSpec<String> intermediateTopicsOption;
    // Mutually exclusive "reset scenario" options (enforced in parseArguments()).
    private static OptionSpec<Long> toOffsetOption;
    private static OptionSpec<String> toDatetimeOption;
    private static OptionSpec<String> byDurationOption;
    private static OptionSpecBuilder toEarliestOption;
    private static OptionSpecBuilder toLatestOption;
    private static OptionSpec<String> fromFileOption;
    private static OptionSpec<Long> shiftByOption;
    private static OptionSpecBuilder dryRunOption;
    private static OptionSpec helpOption;
    private static OptionSpec versionOption;
    private static OptionSpecBuilder executeOption;
    private static OptionSpec<String> commandConfigOption;
    // NOTE(review): registered in parseArguments() but never queried anywhere in this file.
    private static OptionSpec forceOption;
    private static OptionSpec<String> defaultStreamOption;
    // Help text printed by parseArguments(); assigned once in the static initializer.
    private static String usage;
    // Parsed command line; null until parseArguments() has been called.
    private OptionSet options = null;

    /**
     * Runs the reset tool without any caller-supplied base configuration.
     *
     * @param args command-line arguments
     * @return the exit code produced by {@link #run(String[], Properties)}
     */
    public int run(String[] args) {
        Properties emptyConfig = new Properties();
        return this.run(args, emptyConfig);
    }

    /**
     * Parses the command line, resets/seeks the offsets of the requested topics and then
     * removes the application's directory and internal streams (or only prints what would
     * be done when {@code --dry-run} is given).
     *
     * <p>Any {@link Throwable} raised along the way is reported on {@code System.err} and
     * converted into exit code 1; the admin client is always closed (60 second timeout).
     *
     * @param args   command-line arguments
     * @param config base consumer configuration; entries loaded from {@code --config-file}
     *               are layered on top of it
     * @return 1 on failure, otherwise the result of the offset reset step (0, or 1 when a
     *         requested topic was not found)
     */
    public int run(String[] args, Properties config) {
        org.apache.kafka.clients.admin.Admin adminClient = null;
        int exitCode;
        try {
            this.parseArguments(args);
            boolean dryRun = this.options.has(dryRunOption);
            String groupId = this.options.valueOf(applicationIdOption);

            // Settings from --config-file feed both the admin client and the consumer.
            Properties fileProperties = new Properties();
            if (this.options.has(commandConfigOption)) {
                String configFile = this.options.valueOf(commandConfigOption);
                fileProperties.putAll((Map<?, ?>)Utils.loadProps(configFile));
            }
            adminClient = org.apache.kafka.clients.admin.Admin.create(fileProperties);

            // Per-application locations derived from the application id.
            String appDir = "/apps/kafka-streams/" + groupId;
            String internalStream = appDir + "/kafka-internal-stream";
            String internalStreamCompacted = appDir + "/kafka-internal-stream-compacted";

            if (dryRun) {
                System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
            }

            HashMap<Object, Object> consumerConfig = new HashMap<Object, Object>(config);
            consumerConfig.putAll(fileProperties);

            exitCode = this.maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, adminClient, dryRun, internalStreamCompacted);
            this.deleteAppDir(adminClient, dryRun, internalStream, internalStreamCompacted, appDir);
        }
        catch (Throwable failure) {
            exitCode = 1;
            System.err.println("ERROR: " + failure);
            failure.printStackTrace(System.err);
        }
        finally {
            if (adminClient != null) {
                adminClient.close(Duration.ofSeconds(60L));
            }
        }
        return exitCode;
    }

    /**
     * Verifies that the consumer group has no live listeners on the given topics.
     *
     * <p>Despite its name this method deletes nothing: it only inspects the assignments
     * reported by {@code MarlinAdminImpl.listAssigns} and aborts when any of them still
     * has listeners.
     * NOTE(review): the error text advertises {@code --force}, but that option is never
     * consulted in this file.
     *
     * @param streamName stream that owns {@code topics}
     * @param groupId    consumer group (the Streams application id)
     * @param topics     topic names within {@code streamName} to check
     * @throws IllegalStateException if an assignment with at least one listener is found
     */
    private void maybeDeleteActiveConsumers(String streamName, String groupId, Iterable<String> topics) {
        // The resource has to be created in the resource specification itself: a
        // try-with-resources variable is implicitly final and cannot be assigned later.
        try (MarlinAdminImpl adminClient = new MarlinAdminImpl(new Configuration())) {
            for (String topic : topics) {
                List<AssignInfo> assignments = adminClient.listAssigns(streamName, groupId, topic);
                for (AssignInfo assignment : assignments) {
                    if (assignment.numListeners() > 0) {
                        throw new IllegalStateException("Consumer group '" + groupId + "' is still active. Make sure to stop all running application instances before running the reset tool. You can use option '--force' to remove active members from the group.");
                    }
                }
            }
        }
    }

    /**
     * Declares every supported command-line option, parses {@code args} into
     * {@code this.options} and validates option combinations.
     *
     * <p>Prints usage and terminates (via {@code CommandLineUtils}) when no arguments or
     * {@code --help} are given, when parsing fails, when both {@code --dry-run} and
     * {@code --execute} are present, or when more than one reset scenario option is used.
     * {@code --version} prints the version and terminates.
     *
     * @param args raw command-line arguments
     */
    private void parseArguments(String[] args) {
        OptionParser optionParser = new OptionParser(false);

        applicationIdOption = optionParser
            .accepts("application-id", "The Kafka Streams application ID (application.id).")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("id")
            .required();
        inputTopicsOption = optionParser
            .accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        intermediateTopicsOption = optionParser
            .accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        toOffsetOption = optionParser
            .accepts("to-offset", "Reset offsets to a specific offset.")
            .withRequiredArg()
            .ofType(Long.class);
        toDatetimeOption = optionParser
            .accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'")
            .withRequiredArg()
            .ofType(String.class);
        byDurationOption = optionParser
            .accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'")
            .withRequiredArg()
            .ofType(String.class);
        toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset.");
        toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset.");
        fromFileOption = optionParser
            .accepts("from-file", "Reset offsets to values defined in CSV file.")
            .withRequiredArg()
            .ofType(String.class);
        shiftByOption = optionParser
            .accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative")
            .withRequiredArg()
            .describedAs("number-of-offsets")
            .ofType(Long.class);
        commandConfigOption = optionParser
            .accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("file name");
        forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");
        defaultStreamOption = optionParser
            .accepts("default-stream", "Default stream that is used if topic is specified without stream.")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("default-stream");
        executeOption = optionParser.accepts("execute", "Execute the command.");
        dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
        helpOption = optionParser.accepts("help", "Print usage information.").forHelp();
        versionOption = optionParser.accepts("version", "Print version information and exit.").forHelp();
        // Still accepted so that old invocations parse; its value is not read in this file.
        optionParser.accepts("zookeeper", "Zookeeper option is deprecated by bootstrap.servers, as the reset tool would no longer access Zookeeper directly.");

        try {
            this.options = optionParser.parse(args);
            if (args.length == 0 || this.options.has(helpOption)) {
                CommandLineUtils.printUsageAndDie(optionParser, usage);
            }
            if (this.options.has(versionOption)) {
                CommandLineUtils.printVersionAndDie();
            }
        }
        catch (OptionException parseFailure) {
            CommandLineUtils.printUsageAndDie(optionParser, parseFailure.getMessage());
        }

        if (this.options.has(executeOption) && this.options.has(dryRunOption)) {
            CommandLineUtils.printUsageAndDie(optionParser, "Only one of --dry-run and --execute can be specified");
        }

        // The reset scenarios exclude each other; check each one against all the others,
        // in the order in which they are listed here.
        List<OptionSpec<?>> scenarioOptions = new ArrayList<OptionSpec<?>>();
        scenarioOptions.add(toOffsetOption);
        scenarioOptions.add(toDatetimeOption);
        scenarioOptions.add(byDurationOption);
        scenarioOptions.add(toEarliestOption);
        scenarioOptions.add(toLatestOption);
        scenarioOptions.add(fromFileOption);
        scenarioOptions.add(shiftByOption);

        java.util.Set<OptionSpec<?>> allScenarioOptions = new HashSet<OptionSpec<?>>(scenarioOptions);
        for (OptionSpec<?> scenarioOption : scenarioOptions) {
            this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, scenarioOption);
        }
    }

    /**
     * Delegates to {@code CommandLineUtils.checkInvalidArgs}, passing every member of
     * {@code allOptions} except {@code option} itself as the set of options that must
     * not accompany {@code option}.
     *
     * @param optionParser parser handed through to {@code CommandLineUtils}
     * @param options      the parsed command line
     * @param allOptions   the full group of mutually exclusive options (not modified)
     * @param option       the option being checked
     */
    private <T> void checkInvalidArgs(OptionParser optionParser, OptionSet options, java.util.Set<OptionSpec<?>> allOptions, OptionSpec<T> option) {
        // Work on a copy so the caller's set stays intact across repeated calls.
        java.util.Set<OptionSpec<?>> conflicting = new HashSet<OptionSpec<?>>(allOptions);
        conflicting.remove(option);
        Set scalaConflicting = (Set)JavaConverters.asScalaSetConverter(conflicting).asScala();
        CommandLineUtils.checkInvalidArgs(optionParser, options, option, scalaConflicting);
    }

    /**
     * Partitions the requested topics into those that exist and those that do not.
     *
     * @param groupedTopicsByStreamName    requested short topic names, keyed by stream name
     * @param allGroupedTopicsByStreamName existing short topic names, keyed by stream name;
     *                                     a stream without an entry counts as having no topics
     * @return full topic names (built with {@code MapRTopicUtils.buildFullTopicName}) split
     *         into "to subscribe" and "not found" lists
     */
    private SplitTopicListResult splitTopicListOnSubcribeAndNotFoundedLists(Map<String, java.util.Set<String>> groupedTopicsByStreamName, Map<String, java.util.Set<String>> allGroupedTopicsByStreamName) {
        List<String> existing = new ArrayList<String>();
        List<String> missing = new ArrayList<String>();
        for (Map.Entry<String, java.util.Set<String>> requested : groupedTopicsByStreamName.entrySet()) {
            String streamName = requested.getKey();
            java.util.Set<String> knownTopics = allGroupedTopicsByStreamName.get(streamName);
            for (String shortName : requested.getValue()) {
                String fullName = MapRTopicUtils.buildFullTopicName(streamName, shortName);
                boolean found = knownTopics != null && knownTopics.contains(shortName);
                if (found) {
                    existing.add(fullName);
                } else {
                    missing.add(fullName);
                }
            }
        }
        return new SplitTopicListResult(existing, missing);
    }

    /**
     * Resets the offsets of the input topics and seeks the intermediate topics to their
     * end for the application's consumer group, committing the new positions unless this
     * is a dry run.
     *
     * <p>Topics given without a stream are first decorated with the default stream.
     * Requested topics that do not exist are reported and skipped. Before touching any
     * offsets the method checks that the group has no active listeners on the input topics.
     *
     * @param consumerConfig             base configuration for the embedded consumer
     * @param adminClient                not used by this method
     * @param dryRun                     when true, positions are computed and printed but
     *                                   not committed
     * @param streamForCliSideAssignment value for the
     *        {@code streams.clientside.partition.assignment.internal.stream} consumer setting
     * @return 0 when every requested topic was found (or none was requested), 1 when at
     *         least one requested topic does not exist
     * @throws IOException    if the stream admin or reading the reset plan fails
     * @throws ParseException if {@code --to-datetime} or the reset plan cannot be parsed
     */
    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(Map consumerConfig, org.apache.kafka.clients.admin.Admin adminClient, boolean dryRun, String streamForCliSideAssignment) throws IOException, ParseException {
        String defaultStream = this.options.has(defaultStreamOption) ? this.options.valueOf(defaultStreamOption) : "";
        List<String> inputTopics = MapRTopicUtils.decorateTopicsWithDefaultStreamIfNeeded(this.options.valuesOf(inputTopicsOption), defaultStream);
        List<String> intermediateTopics = MapRTopicUtils.decorateTopicsWithDefaultStreamIfNeeded(this.options.valuesOf(intermediateTopicsOption), defaultStream);
        int topicNotFound = 0;

        Map<String, java.util.Set<String>> groupedInputTopics = MapRTopicUtils.groupTopicsByStreamName(inputTopics);
        Map<String, java.util.Set<String>> groupedIntermediateTopics = MapRTopicUtils.groupTopicsByStreamName(intermediateTopics);

        // Copy the key set before merging: Map.keySet() is a view of the map, so adding
        // to it directly either throws UnsupportedOperationException or corrupts the map.
        java.util.Set<String> allStreamNames = new HashSet<String>(groupedInputTopics.keySet());
        allStreamNames.addAll(groupedIntermediateTopics.keySet());
        Map<String, java.util.Set<String>> allTopicsGroupedByStreamName = MapRTopicUtils.allTopicsForStreamSet(allStreamNames);

        String groupId = this.options.valueOf(applicationIdOption);

        if (inputTopics.isEmpty() && intermediateTopics.isEmpty()) {
            System.out.println("No input or intermediate topics specified. Skipping seek.");
            return 0;
        }
        if (!inputTopics.isEmpty()) {
            System.out.println("Reset-offsets for input topics " + inputTopics);
        }
        if (!intermediateTopics.isEmpty()) {
            System.out.println("Seeking for intermediate topics " + intermediateTopics);
        }

        java.util.Set<String> topicsToSubscribe = new HashSet<String>(inputTopics.size() + intermediateTopics.size());

        SplitTopicListResult inputTopicsSplitResult = this.splitTopicListOnSubcribeAndNotFoundedLists(groupedInputTopics, allTopicsGroupedByStreamName);
        topicsToSubscribe.addAll(inputTopicsSplitResult.topicsToSubscribe);
        List<String> notFoundInputTopics = new ArrayList<String>(inputTopicsSplitResult.notFoundedTopics);

        SplitTopicListResult intermediateTopicsSplitResult = this.splitTopicListOnSubcribeAndNotFoundedLists(groupedIntermediateTopics, allTopicsGroupedByStreamName);
        topicsToSubscribe.addAll(intermediateTopicsSplitResult.topicsToSubscribe);
        // Missing intermediate topics go to their own list (they used to be appended to the
        // input list, which left the intermediate report below unreachable).
        List<String> notFoundIntermediateTopics = new ArrayList<String>(intermediateTopicsSplitResult.notFoundedTopics);

        if (!notFoundInputTopics.isEmpty()) {
            System.out.println("Following input topics are not found, skipping them");
            for (String topic : notFoundInputTopics) {
                System.out.println("Topic: " + topic);
            }
            topicNotFound = 1;
        }
        if (!notFoundIntermediateTopics.isEmpty()) {
            System.out.println("Following intermediate topics are not found, skipping them");
            for (String topic : notFoundIntermediateTopics) {
                System.out.println("Topic:" + topic);
            }
            topicNotFound = 1;
        }
        if (topicsToSubscribe.isEmpty()) {
            return topicNotFound;
        }

        try (Admin admin = Streams.newAdmin(new Configuration())) {
            for (Map.Entry<String, java.util.Set<String>> streamWithTopics : groupedInputTopics.entrySet()) {
                String streamName = streamWithTopics.getKey();
                this.maybeDeleteActiveConsumers(streamName, groupId, streamWithTopics.getValue());
                for (String shortTopic : streamWithTopics.getValue()) {
                    // NOTE(review): the partition count used to be summed into a local that
                    // nothing read. The lookup itself is kept because it may fail for a topic
                    // that cannot be described -- confirm whether that fail-fast is wanted.
                    admin.getTopicDescriptor(streamName, shortTopic);
                }
            }
        }

        Properties config = new Properties();
        config.putAll((Map<?, ?>)consumerConfig);
        config.setProperty("group.id", groupId);
        config.setProperty("enable.auto.commit", "false");
        config.setProperty("streams.clientside.partition.assignment", "true");
        config.setProperty("streams.clientside.partition.assignment.internal.stream", streamForCliSideAssignment);
        config.setProperty("partition.assignment.strategy", RoundRobinAssignor.class.getName());

        try (KafkaConsumer<byte[], byte[]> client = new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // Manually assign every partition of every existing requested topic.
            List<TopicPartition> partitions = topicsToSubscribe.stream()
                .map(topic -> client.partitionsFor(topic))
                .flatMap(Collection::stream)
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
            client.assign(partitions);

            java.util.Set<TopicPartition> inputTopicPartitions = new HashSet<TopicPartition>();
            java.util.Set<TopicPartition> intermediateTopicPartitions = new HashSet<TopicPartition>();
            for (TopicPartition partition : partitions) {
                String topic = partition.topic();
                if (this.isInputTopic(topic)) {
                    inputTopicPartitions.add(partition);
                } else if (this.isIntermediateTopic(topic)) {
                    intermediateTopicPartitions.add(partition);
                } else {
                    System.err.println("Skipping invalid partition: " + partition);
                }
            }

            this.maybeReset(groupId, client, inputTopicPartitions);
            this.maybeSeekToEnd(groupId, client, intermediateTopicPartitions);

            if (!dryRun) {
                // Query each position before committing so the sought offsets are the
                // ones that get committed.
                for (TopicPartition partition : partitions) {
                    client.position(partition);
                }
                client.commitSync();
            }
        }
        catch (IOException | ParseException e) {
            System.err.println("ERROR: Resetting offsets failed.");
            throw e;
        }
        System.out.println("Done.");
        return topicNotFound;
    }

    /**
     * Seeks all given intermediate-topic partitions to their end and prints the affected
     * topic names. Does nothing when the set is empty.
     *
     * @param groupId                     consumer group name, used only in the printed message
     * @param client                      consumer that has the partitions assigned
     * @param intermediateTopicPartitions partitions to move to the end
     */
    public void maybeSeekToEnd(String groupId, Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> intermediateTopicPartitions) {
        if (intermediateTopicPartitions.isEmpty()) {
            return;
        }
        System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
        // One line per partition, so a topic name repeats for multi-partition topics.
        intermediateTopicPartitions.forEach(partition -> System.out.println("Topic: " + partition.topic()));
        client.seekToEnd(intermediateTopicPartitions);
    }

    /**
     * Moves the consumer's position on the input-topic partitions according to the reset
     * scenario chosen on the command line, then prints the resulting position of every
     * partition. Does nothing when the set is empty.
     *
     * <p>Scenarios are tested in this order: {@code --to-offset}, {@code --to-earliest},
     * {@code --to-latest}, {@code --shift-by}, {@code --to-datetime}, {@code --by-duration},
     * {@code --from-file}; without any of them the partitions are sought to the beginning.
     *
     * @param groupId              consumer group name, used only in the printed message
     * @param client               consumer that has the partitions assigned
     * @param inputTopicPartitions partitions to reset
     * @throws IOException    if the reset plan file cannot be read
     * @throws ParseException if the datetime or the reset plan cannot be parsed
     */
    private void maybeReset(String groupId, Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions) throws IOException, ParseException {
        if (inputTopicPartitions.isEmpty()) {
            return;
        }
        System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")");

        if (this.options.has(toOffsetOption)) {
            this.resetOffsetsTo(client, inputTopicPartitions, this.options.valueOf(toOffsetOption));
        } else if (this.options.has(toEarliestOption)) {
            client.seekToBeginning(inputTopicPartitions);
        } else if (this.options.has(toLatestOption)) {
            client.seekToEnd(inputTopicPartitions);
        } else if (this.options.has(shiftByOption)) {
            this.shiftOffsetsBy(client, inputTopicPartitions, this.options.valueOf(shiftByOption));
        } else if (this.options.has(toDatetimeOption)) {
            long targetMillis = this.getDateTime(this.options.valueOf(toDatetimeOption));
            this.resetToDatetime(client, inputTopicPartitions, targetMillis);
        } else if (this.options.has(byDurationOption)) {
            Duration lookBack = Duration.parse(this.options.valueOf(byDurationOption));
            this.resetByDuration(client, inputTopicPartitions, lookBack);
        } else if (this.options.has(fromFileOption)) {
            Map<TopicPartition, Long> plan = this.getTopicPartitionOffsetFromResetPlan(this.options.valueOf(fromFileOption));
            this.resetOffsetsFromResetPlan(client, inputTopicPartitions, plan);
        } else {
            // No scenario given: default to the earliest offset.
            client.seekToBeginning(inputTopicPartitions);
        }

        for (TopicPartition partition : inputTopicPartitions) {
            System.out.println("Topic: " + partition.topic() + " Partition: " + partition.partition() + " Offset: " + client.position(partition));
        }
    }

    /**
     * Seeks each input partition to the offset listed for it in a reset plan, after
     * clamping the offset to the partition's beginning/end offsets.
     *
     * <p>Plan entries for partitions that are not in {@code inputTopicPartitions} are
     * ignored: offset bounds are only fetched for the input partitions, so such entries
     * cannot be validated (they previously caused a {@code NullPointerException}).
     *
     * @param client                   consumer that has the partitions assigned
     * @param inputTopicPartitions     partitions to reset
     * @param topicPartitionsAndOffset requested offset per partition, as parsed from the plan
     * @throws IllegalArgumentException if the plan has no offset for one or more input
     *                                  partitions (previously a bare {@code NullPointerException})
     */
    public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
        Map<TopicPartition, Long> requested = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        List<TopicPartition> missingFromPlan = new ArrayList<TopicPartition>();
        for (TopicPartition topicPartition : inputTopicPartitions) {
            Long plannedOffset = topicPartitionsAndOffset.get(topicPartition);
            if (plannedOffset == null) {
                missingFromPlan.add(topicPartition);
            } else {
                requested.put(topicPartition, plannedOffset);
            }
        }
        if (!missingFromPlan.isEmpty()) {
            throw new IllegalArgumentException("Reset plan does not define an offset for partition(s): " + missingFromPlan);
        }

        Map<TopicPartition, Long> endOffsets = client.endOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> beginningOffsets = client.beginningOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> validated = this.checkOffsetRange(requested, beginningOffsets, endOffsets);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            client.seek(topicPartition, validated.get(topicPartition).longValue());
        }
    }

    /**
     * Reads the reset plan file and parses it into per-partition offsets.
     *
     * @param resetPlanPath path of the CSV reset plan
     * @return requested offset for each partition listed in the file
     * @throws IOException    if the file cannot be read
     * @throws ParseException if the content is not a valid reset plan
     */
    private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
        return this.parseResetPlan(Utils.readFileAsString(resetPlanPath));
    }

    /**
     * Resets the partitions to the offsets that correspond to "now minus {@code duration}".
     *
     * @param client               consumer that has the partitions assigned
     * @param inputTopicPartitions partitions to reset
     * @param duration             how far to go back from the current instant
     */
    private void resetByDuration(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Duration duration) {
        long targetMillis = Instant.now().minus(duration).toEpochMilli();
        this.resetToDatetime(client, inputTopicPartitions, targetMillis);
    }

    /**
     * Seeks every partition to the offset the consumer reports for {@code timestamp}.
     *
     * <p>When the lookup yields no entry for a partition, or an offset of -1, the
     * partition is sought to its end instead and a notice is printed.
     *
     * @param client               consumer that has the partitions assigned
     * @param inputTopicPartitions partitions to reset
     * @param timestamp            target time passed to {@code offsetsForTimes}
     */
    public void resetToDatetime(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Long timestamp) {
        Map<TopicPartition, Long> timesByPartition = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        inputTopicPartitions.forEach(partition -> timesByPartition.put(partition, timestamp));

        Map<TopicPartition, OffsetAndTimestamp> offsetsByPartition = client.offsetsForTimes(timesByPartition);
        for (TopicPartition partition : inputTopicPartitions) {
            OffsetAndTimestamp match = offsetsByPartition.get(partition);
            long offset = match == null ? -1L : match.offset();
            if (offset != -1L) {
                client.seek(partition, offset);
            } else {
                client.seekToEnd(Collections.singletonList(partition));
                System.out.println("Partition " + partition.partition() + " from topic " + partition.topic() + " is empty, without a committed record. Falling back to latest known offset.");
            }
        }
    }

    /**
     * Moves every partition by {@code shiftBy} relative to the consumer's current
     * position, clamping the result to the partition's beginning/end offsets.
     *
     * @param client               consumer that has the partitions assigned
     * @param inputTopicPartitions partitions to reset
     * @param shiftBy              number of offsets to move; may be negative
     */
    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, long shiftBy) {
        Map<TopicPartition, Long> latest = client.endOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> earliest = client.beginningOffsets(inputTopicPartitions);

        Map<TopicPartition, Long> shifted = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition partition : inputTopicPartitions) {
            shifted.put(partition, client.position(partition) + shiftBy);
        }

        Map<TopicPartition, Long> clamped = this.checkOffsetRange(shifted, earliest, latest);
        for (TopicPartition partition : inputTopicPartitions) {
            long target = clamped.get(partition);
            client.seek(partition, target);
        }
    }

    /**
     * Seeks every partition to the same requested offset, clamped to the partition's
     * beginning/end offsets.
     *
     * @param client               consumer that has the partitions assigned
     * @param inputTopicPartitions partitions to reset
     * @param offset               offset requested for all partitions
     */
    public void resetOffsetsTo(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Long offset) {
        Map<TopicPartition, Long> latest = client.endOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> earliest = client.beginningOffsets(inputTopicPartitions);

        Map<TopicPartition, Long> requested = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        inputTopicPartitions.forEach(partition -> requested.put(partition, offset));

        Map<TopicPartition, Long> clamped = this.checkOffsetRange(requested, earliest, latest);
        for (TopicPartition partition : inputTopicPartitions) {
            long target = clamped.get(partition);
            client.seek(partition, target);
        }
    }

    /**
     * Converts an ISO-8601 style timestamp ({@code yyyy-MM-dd'T'HH:mm:ss.SSS} followed by
     * an optional zone) into epoch milliseconds.
     *
     * <p>When the time part carries no zone designator ({@code +}, {@code -} or {@code Z}),
     * UTC is assumed. Zone offsets are accepted as {@code Z}, {@code +hh:mm}, {@code +hhmm}
     * or {@code +hh}.
     *
     * @param timestamp the text to parse
     * @return milliseconds since the epoch
     * @throws ParseException if there is no {@code 'T'} separator followed by a time part,
     *                        or if none of the supported patterns matches
     */
    public long getDateTime(String timestamp) throws ParseException {
        String[] timestampParts = timestamp.split("T");
        if (timestampParts.length < 2) {
            throw new ParseException("Error parsing timestamp. It does not contain a 'T' according to ISO8601 format", timestamp.length());
        }
        String timePart = timestampParts[1];
        if (timePart == null || timePart.isEmpty()) {
            throw new ParseException("Error parsing timestamp. Time part after 'T' is null or empty", timestamp.length());
        }
        boolean hasZone = timePart.contains("+") || timePart.contains("-") || timePart.contains("Z");
        String normalized = hasZone ? timestamp : timestamp + "Z";

        // Try the most specific zone format first. "XX" must come before "X": parse()
        // accepts a matching prefix, so "X" alone would read "+0530" as "+05" and
        // silently drop the minutes.
        String[] patterns = {
            "yyyy-MM-dd'T'HH:mm:ss.SSSXXX",
            "yyyy-MM-dd'T'HH:mm:ss.SSSXX",
            "yyyy-MM-dd'T'HH:mm:ss.SSSX"
        };
        ParseException lastFailure = null;
        for (String pattern : patterns) {
            try {
                // SimpleDateFormat is not thread-safe, hence a fresh instance per attempt.
                return new SimpleDateFormat(pattern).parse(normalized).getTime();
            }
            catch (ParseException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    /**
     * Parses the content of a reset plan, one {@code TOPIC,PARTITION,OFFSET} record per line.
     *
     * <p>Any line terminator (LF, CRLF, ...) is accepted, blank lines are skipped and
     * surrounding whitespace around each field is ignored. If a partition is listed more
     * than once, the last entry wins.
     *
     * @param resetPlanCsv the full text of the reset plan
     * @return requested offset per partition
     * @throws ParseException if the text is null, empty or contains no records, if a line
     *                        does not have exactly three fields, or if the partition or
     *                        offset is not a number
     */
    private Map<TopicPartition, Long> parseResetPlan(String resetPlanCsv) throws ParseException {
        if (resetPlanCsv == null || resetPlanCsv.isEmpty()) {
            throw new ParseException("Error parsing reset plan CSV file. It is empty,", 0);
        }
        Map<TopicPartition, Long> topicPartitionAndOffset = new HashMap<TopicPartition, Long>();
        // "\\R" matches any line break, so files written on Windows do not leave a stray
        // '\r' glued to the offset field.
        for (String rawLine : resetPlanCsv.split("\\R")) {
            String line = rawLine.trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] lineParts = line.split(",");
            if (lineParts.length != 3) {
                throw new ParseException("Reset plan CSV file is not following the format `TOPIC,PARTITION,OFFSET`.", 0);
            }
            String topic = lineParts[0].trim();
            try {
                int partition = Integer.parseInt(lineParts[1].trim());
                long offset = Long.parseLong(lineParts[2].trim());
                topicPartitionAndOffset.put(new TopicPartition(topic, partition), offset);
            }
            catch (NumberFormatException e) {
                // Surface bad numbers through the declared exception type, keeping the cause.
                ParseException failure = new ParseException("Reset plan CSV file has a non-numeric partition or offset in line `" + line + "`.", 0);
                failure.initCause(e);
                throw failure;
            }
        }
        if (topicPartitionAndOffset.isEmpty()) {
            // Only blank lines: treat like an empty file.
            throw new ParseException("Error parsing reset plan CSV file. It is empty,", 0);
        }
        return topicPartitionAndOffset;
    }

    /**
     * Clamps each requested offset into the {@code [beginning, end]} range of its partition.
     *
     * <p>A notice is printed only when an offset actually lies outside the range. An
     * offset exactly equal to the beginning or end offset is valid and is kept silently
     * (it used to trigger a misleading "lower than earliest" / "higher than latest" message).
     *
     * @param inputTopicPartitionsAndOffset requested offset per partition
     * @param beginningOffsets              earliest offset per partition; only consulted
     *                                      for offsets below the end offset
     * @param endOffsets                    latest offset per partition; must contain every
     *                                      requested partition
     * @return the offset to seek to for every requested partition
     */
    private Map<TopicPartition, Long> checkOffsetRange(Map<TopicPartition, Long> inputTopicPartitionsAndOffset, Map<TopicPartition, Long> beginningOffsets, Map<TopicPartition, Long> endOffsets) {
        Map<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<TopicPartition, Long>(inputTopicPartitionsAndOffset.size());
        for (Map.Entry<TopicPartition, Long> requested : inputTopicPartitionsAndOffset.entrySet()) {
            TopicPartition partition = requested.getKey();
            long endOffset = endOffsets.get(partition);
            long offset = requested.getValue();

            if (offset >= endOffset) {
                if (offset > endOffset) {
                    System.out.println("New offset (" + offset + ") is higher than latest offset. Value will be set to " + endOffset);
                }
                validatedTopicPartitionsOffsets.put(partition, endOffset);
                continue;
            }

            long beginningOffset = beginningOffsets.get(partition);
            if (offset <= beginningOffset) {
                if (offset < beginningOffset) {
                    System.out.println("New offset (" + offset + ") is lower than earliest offset. Value will be set to " + beginningOffset);
                }
                validatedTopicPartitionsOffsets.put(partition, beginningOffset);
                continue;
            }

            validatedTopicPartitionsOffsets.put(partition, offset);
        }
        return validatedTopicPartitionsOffsets;
    }

    /**
     * Tells whether {@code topic} is one of the {@code --input-topics}, after decorating
     * those with the default stream.
     *
     * <p>The default stream falls back to the empty string when {@code --default-stream}
     * is absent, exactly as
     * {@code maybeResetInputAndSeekToEndIntermediateTopicOffsets} does when it builds the
     * topic names to subscribe to. Previously {@code null} was passed here, so the two
     * places could decorate the same topic differently.
     *
     * @param topic full topic name as reported by the consumer
     * @return true if the decorated input topic list contains {@code topic}
     */
    private boolean isInputTopic(String topic) {
        String defaultStream = this.options.has(defaultStreamOption) ? this.options.valueOf(defaultStreamOption) : "";
        List<String> decoratedInputTopics = MapRTopicUtils.decorateTopicsWithDefaultStreamIfNeeded(this.options.valuesOf(inputTopicsOption), defaultStream);
        return decoratedInputTopics.contains(topic);
    }

    /**
     * Tells whether {@code topic} is one of the {@code --intermediate-topics}, after
     * decorating those with the default stream.
     *
     * <p>The default stream falls back to the empty string when {@code --default-stream}
     * is absent, exactly as
     * {@code maybeResetInputAndSeekToEndIntermediateTopicOffsets} does when it builds the
     * topic names to subscribe to. Previously {@code null} was passed here, so the two
     * places could decorate the same topic differently.
     *
     * @param topic full topic name as reported by the consumer
     * @return true if the decorated intermediate topic list contains {@code topic}
     */
    private boolean isIntermediateTopic(String topic) {
        String defaultStream = this.options.has(defaultStreamOption) ? this.options.valueOf(defaultStreamOption) : "";
        List<String> decoratedIntermediateTopics = MapRTopicUtils.decorateTopicsWithDefaultStreamIfNeeded(this.options.valuesOf(intermediateTopicsOption), defaultStream);
        return decoratedIntermediateTopics.contains(topic);
    }

    /**
     * Removes the application's internal streams and directory, or, in a dry run, only
     * prints which ones would be removed.
     *
     * @param adminClient             not used by this method
     * @param dryRun                  when true nothing is deleted
     * @param internalStream          path of the application's internal stream
     * @param internalStreamCompacted path of the application's compacted internal stream
     * @param appDir                  path of the application's directory
     */
    private void deleteAppDir(org.apache.kafka.clients.admin.Admin adminClient, boolean dryRun, String internalStream, String internalStreamCompacted, String appDir) {
        String applicationId = this.options.valueOf(applicationIdOption);
        System.out.println("Deleting KStreams Application dir and internal streams for application: " + applicationId);
        if (dryRun) {
            System.out.println("MapR-ES Stream: " + internalStream);
            System.out.println("MapR-ES Stream: " + internalStreamCompacted);
            System.out.println("MapR-FS Directory: " + appDir);
        } else {
            this.doDeleteForStreamsAndAppDir(internalStream, internalStreamCompacted, appDir);
        }
        System.out.println("Done.");
    }

    /**
     * Deletes the two internal streams (if they exist) and then recursively deletes the
     * application directory (if it exists).
     *
     * @param internalStream          path of the application's internal stream
     * @param internalStreamCompacted path of the application's compacted internal stream
     * @param appDir                  path of the application's directory
     * @throws KafkaException wrapping any {@link IOException} raised while deleting
     */
    public void doDeleteForStreamsAndAppDir(String internalStream, String internalStreamCompacted, String appDir) {
        Configuration conf = new Configuration();
        // NOTE(review): the FileSystem obtained here is closed when the block exits; if
        // FileSystem.get hands out a shared instance that affects other users -- confirm.
        try (FileSystem fs = FileSystem.get(conf);
             Admin admin = Streams.newAdmin(conf)) {
            String[] streamsToDelete = {internalStream, internalStreamCompacted};
            for (String stream : streamsToDelete) {
                if (admin.streamExists(stream)) {
                    admin.deleteStream(stream);
                }
            }
            Path appDirPath = new Path(appDir);
            if (fs.exists(appDirPath)) {
                fs.delete(appDirPath, true);
            }
        }
        catch (IOException ioFailure) {
            throw new KafkaException(ioFailure);
        }
    }

    /**
     * Polls the consumer until its assignment contains exactly
     * {@code expectedPartitionsCount} partitions, giving up after 10 polls of up to one
     * second each.
     *
     * @param client                  consumer whose assignment is awaited
     * @param expectedPartitionsCount number of partitions the assignment must reach
     * @throws KafkaException if the expected count is not reached within 10 polls
     */
    private void waitUntilClientObtainsAllPartitions(Consumer<byte[], byte[]> client, int expectedPartitionsCount) {
        final int maxAttempts = 10;
        final long pollTimeoutMs = 1000L;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            client.poll(pollTimeoutMs);
            if (client.assignment().size() == expectedPartitionsCount) {
                return;
            }
        }
        throw new KafkaException("Couldn't obtain all partitions after 10 attempts.");
    }

    /**
     * Tells whether a topic name has one of the shapes this tool treats as a Kafka
     * Streams internal topic: one of four fixed suffixes, or a foreign-key-join
     * subscription registration/response name with a numeric index.
     *
     * @param topicName topic name to test; must not be null
     * @return true if the name matches any internal topic format
     */
    public boolean matchesInternalTopicFormat(String topicName) {
        String[] internalSuffixes = {
            "-changelog",
            "-repartition",
            "-subscription-registration-topic",
            "-subscription-response-topic"
        };
        for (String suffix : internalSuffixes) {
            if (topicName.endsWith(suffix)) {
                return true;
            }
        }
        // Both patterns require a non-empty prefix before "-KTABLE-FK-JOIN-...".
        String[] internalPatterns = {
            ".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic",
            ".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic"
        };
        for (String pattern : internalPatterns) {
            if (topicName.matches(pattern)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Command-line entry point: runs the tool and exits the JVM with its exit code.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        StreamsResetter resetter = new StreamsResetter();
        int exitCode = resetter.run(args);
        Exit.exit(exitCode);
    }

    // Help text that parseArguments() hands to CommandLineUtils.printUsageAndDie when no
    // arguments or --help are given.
    // NOTE(review): the text speaks of deleting internal topics and of a "--force" option
    // that removes group members; the code in this file deletes the application's internal
    // streams and directory and never reads --force -- confirm the wording is still accurate.
    static {
        usage = "This tool helps to quickly reset an application in order to reprocess its data from scratch.\n* This tool resets offsets of input topics to the earliest available offset and it skips to the end of intermediate topics (topics that are input and output topics, e.g., used by deprecated through() method).\n* This tool deletes the internal topics that were created by Kafka Streams (topics starting with \"<application.id>-\").\nYou do not need to specify internal topics because the tool finds them automatically.\n* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).\n* This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results).\nYou need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n* When long session timeout has been configured, active members could take longer to get expired on the broker thus blocking the reset job to complete. Use the \"--force\" option could remove those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.\n\n*** Important! You will get wrong output if you don't clean up the local stores after running the reset tool!\n\n";
    }

    private class SplitTopicListResult {
        private final List<String> topicsToSubscribe;
        private final List<String> notFoundedTopics;

        public SplitTopicListResult(List<String> topicsToSubscribe, List<String> notFoundedTopics) {
            this.topicsToSubscribe = topicsToSubscribe;
            this.notFoundedTopics = notFoundedTopics;
        }

        public List<String> getTopicsToSubscribe() {
            return this.topicsToSubscribe;
        }

        public List<String> getNotFoundedTopics() {
            return this.notFoundedTopics;
        }
    }
}

