/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.tests;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.tests.SmokeTestUtil;

public class SmokeTestDriver
extends SmokeTestUtil {
    /** Topics carrying numeric values; these are the topics the verifier subscribes to. */
    private static final String[] NUMERIC_VALUE_TOPICS = {
        "data", "echo", "max", "min", "min-suppressed", "min-raw", "dif",
        "sum", "sws-raw", "sws-suppressed", "cnt", "avg", "tagg"
    };

    /** Topics carrying string values (the generator writes value-to-key records to "fk"). */
    private static final String[] STRING_VALUE_TOPICS = {"fk"};

    /** Numeric topics followed by string topics; the slots are filled by the class's static initializer. */
    private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];

    /**
     * Maximum number of consecutive empty-poll retries tolerated during verification
     * (presumably the bound on the retry counter in {@code verify} - confirm).
     */
    private static final int MAX_RECORD_EMPTY_RETRIES = 30;

    /**
     * Returns the names of all topics used by the smoke test.
     *
     * @return a fresh copy of the topic array, so callers cannot alter the shared one
     */
    public static String[] topics() {
        return TOPICS.clone();
    }

    /**
     * Produces records forever (the loop has no exit condition; only an exception ends it).
     *
     * <p>Each iteration picks a random key, takes that key's next value, sends
     * {@code key -> value} to "data" and {@code value -> key} to "fk", then sleeps 2 ms.
     * Progress is logged every 100 iterations. Once a key's values are used up its
     * {@code ValueList} yields -1, and that value is sent like any other.
     *
     * @param kafka            bootstrap servers string handed to the producer configuration
     * @param numKeys          number of distinct keys; key {@code i} starts at value {@code i}
     * @param maxRecordsPerKey number of consecutive values assigned to each key
     */
    static void generatePerpetually(String kafka, int numKeys, int maxRecordsPerKey) {
        final Properties producerProps = SmokeTestDriver.generatorProperties(kafka);

        final ValueList[] valueLists = new ValueList[numKeys];
        for (int k = 0; k < numKeys; k++) {
            valueLists[k] = new ValueList(k, k + maxRecordsPerKey - 1);
        }

        final Random random = new Random();
        int numRecordsProduced = 0;

        // The producer is closed automatically if the loop is left by an exception.
        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            for (;;) {
                final ValueList picked = valueLists[random.nextInt(numKeys)];
                final String key = picked.key;
                final int value = picked.next();

                final byte[] keyBytes = stringSerde.serializer().serialize("", key);
                final byte[] valueBytes = intSerde.serializer().serialize("", value);

                producer.send(new ProducerRecord<>("data", keyBytes, valueBytes));
                // The "fk" topic carries the same pair with key and value swapped.
                producer.send(new ProducerRecord<>("fk", valueBytes, keyBytes));

                numRecordsProduced++;
                if (numRecordsProduced % 100 == 0) {
                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                }
                Utils.sleep(2L);
            }
        }
    }

    /**
     * Produces every value of every key once, in random key order, and reports what was sent.
     *
     * <p>For each value a {@code key -> value} record goes to "data" and a {@code value -> key}
     * record goes to "fk". Sends that fail with a timeout are collected by {@code TestCallback}
     * and re-sent after the main loop. Finally one extra record per partition is sent to both
     * topics through {@code flush}.
     *
     * @param kafka            bootstrap servers string handed to the producer configuration
     * @param numKeys          number of distinct keys; key {@code i} covers values
     *                         {@code i .. i + maxRecordsPerKey - 1}
     * @param maxRecordsPerKey number of values per key
     * @param timeToSpend      total duration used to derive the pause between records
     *                         ({@code millis / numKeys / maxRecordsPerKey}, never below 2 ms)
     * @return unmodifiable map from key to the set of values handed to the producer for it
     * @throws ArithmeticException if {@code numKeys} or {@code maxRecordsPerKey} is zero
     */
    public static Map<String, Set<Integer>> generate(String kafka, int numKeys, int maxRecordsPerKey, Duration timeToSpend) {
        final Properties producerProps = SmokeTestDriver.generatorProperties(kafka);

        final Map<String, Set<Integer>> allData = new HashMap<>();
        final ValueList[] data = new ValueList[numKeys];
        for (int k = 0; k < numKeys; k++) {
            final ValueList list = new ValueList(k, k + maxRecordsPerKey - 1);
            data[k] = list;
            allData.put(list.key, new HashSet<>());
        }

        final Random rand = new Random();
        final long recordPauseTime = timeToSpend.toMillis() / (long) numKeys / (long) maxRecordsPerKey;
        final long pauseMs = Math.max(recordPauseTime, 2L);

        final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
        final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();

        int numRecordsProduced = 0;
        // Entries data[0 .. live) still have values left; exhausted ones are swapped out.
        int live = data.length;

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
            while (live > 0) {
                final int slot = rand.nextInt(live);
                final ValueList picked = data[slot];
                final int value = picked.next();
                if (value < 0) {
                    // Exhausted: overwrite with the last live entry and shrink the live range.
                    live--;
                    data[slot] = data[live];
                    continue;
                }
                final String key = picked.key;

                final byte[] keyBytes = stringSerde.serializer().serialize("", key);
                final byte[] valueBytes = intSerde.serializer().serialize("", value);

                final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("data", keyBytes, valueBytes);
                producer.send(record, new TestCallback(record, dataNeedRetry));

                final ProducerRecord<byte[], byte[]> fkRecord = new ProducerRecord<>("fk", valueBytes, keyBytes);
                producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));

                allData.get(key).add(value);

                numRecordsProduced++;
                if (numRecordsProduced % 100 == 0) {
                    System.out.println(Instant.now() + " " + numRecordsProduced + " records produced");
                }
                Utils.sleep(pauseMs);
            }

            // Wait for outstanding sends so the retry lists are complete before re-sending.
            producer.flush();
            SmokeTestDriver.retry(producer, dataNeedRetry, stringSerde);
            SmokeTestDriver.retry(producer, fkNeedRetry, intSerde);

            SmokeTestDriver.flush(
                producer,
                "data",
                stringSerde.serializer().serialize("", "flush"),
                intSerde.serializer().serialize("", 0));
            SmokeTestDriver.flush(
                producer,
                "fk",
                intSerde.serializer().serialize("", 0),
                stringSerde.serializer().serialize("", "flush"));
        }
        return Collections.unmodifiableMap(allData);
    }

    /**
     * Re-sends records whose earlier send timed out, repeating until none are left.
     *
     * <p>Every round sends all pending records, flushes, and continues with whatever the
     * callbacks reported as failed again. When the fifth round still leaves records over,
     * an error is printed and {@code Exit.exit(1)} is invoked.
     *
     * @param producer  producer used for the re-sends
     * @param needRetry records to re-send; this list itself is not modified
     * @param keySerde  serde used only to render the record key in the log line
     */
    private static void retry(KafkaProducer<byte[], byte[]> producer, List<ProducerRecord<byte[], byte[]>> needRetry, Serde<?> keySerde) {
        int attemptsLeft = 5;
        List<ProducerRecord<byte[], byte[]>> pending = needRetry;
        while (!pending.isEmpty()) {
            final List<ProducerRecord<byte[], byte[]>> failedAgain = new ArrayList<>();
            for (final ProducerRecord<byte[], byte[]> record : pending) {
                System.out.println("retry producing " + keySerde.deserializer().deserialize("", record.key()));
                producer.send(record, new TestCallback(record, failedAgain));
            }
            // All callbacks of this round must have run before failedAgain is inspected.
            producer.flush();
            pending = failedAgain;

            attemptsLeft--;
            if (attemptsLeft == 0 && !pending.isEmpty()) {
                System.err.println("Failed to produce all records after multiple retries");
                Exit.exit(1);
            }
        }
    }

    /**
     * Sends one record to every partition of {@code topic}, stamped two days after the
     * current wall-clock time.
     *
     * <p>NOTE(review): presumably the far-future timestamp advances stream time so that
     * windowed / suppressed results get emitted downstream - confirm against the topology.
     *
     * @param producer producer used for the sends
     * @param topic    topic whose partitions are enumerated
     * @param keyBytes serialized key placed in every record
     * @param valBytes serialized value placed in every record
     */
    private static void flush(KafkaProducer<byte[], byte[]> producer, String topic, byte[] keyBytes, byte[] valBytes) {
        final long twoDaysMs = Duration.ofDays(2L).toMillis();
        final List<PartitionInfo> partitions = producer.partitionsFor(topic);
        for (final PartitionInfo partition : partitions) {
            final long futureTimestamp = System.currentTimeMillis() + twoDaysMs;
            producer.send(new ProducerRecord<>(partition.topic(), partition.partition(), futureTimestamp, keyBytes, valBytes));
        }
    }

    /**
     * Builds the producer configuration shared by the generator methods.
     *
     * @param kafka value stored under "bootstrap.servers"
     * @return properties with client id "SmokeTest", byte-array key/value serializers and acks "all"
     */
    private static Properties generatorProperties(String kafka) {
        final Properties config = new Properties();
        // Identity and connection.
        config.put("client.id", "SmokeTest");
        config.put("bootstrap.servers", kafka);
        // Callers serialize keys and values themselves, so the producer only moves bytes.
        config.put("key.serializer", ByteArraySerializer.class);
        config.put("value.serializer", ByteArraySerializer.class);
        config.put("acks", "all");
        return config;
    }

    /**
     * Shuffles {@code data} in place, swapping each element with one at most
     * {@code windowSize - 1} positions ahead of it.
     *
     * @param data       array to permute in place
     * @param windowSize exclusive upper bound on the forward distance of a swap partner;
     *                   a value of 1 leaves the array unchanged
     * @throws IllegalArgumentException if {@code windowSize} is not positive and {@code data} is non-empty
     */
    private static void shuffle(int[] data, int windowSize) {
        final Random random = new Random();
        final int length = data.length;
        int position = 0;
        while (position < length) {
            // Never reach past the end of the array.
            final int span = Math.min(length - position, windowSize);
            final int partner = position + random.nextInt(span);

            final int displaced = data[partner];
            data[partner] = data[position];
            data[position] = displaced;

            position++;
        }
    }

    /**
     * Consumes all numeric output topics from the beginning and checks them against the
     * generated input.
     *
     * <p>Polling runs for at most six minutes. Once an empty poll arrives and at least
     * {@code inputs.size() * maxRecordsPerKey} "echo" records have been seen, the full
     * verification is attempted; it is retried on further empty polls until it passes or
     * the retry budget is exhausted. Afterwards a summary is printed, including whether
     * every input value was echoed and how many were not.
     *
     * <p>NOTE(review): the returned result reflects only the aggregate checks; the
     * echo-delivery comparison influences the printed SUCCESS/FAILURE line but not the
     * return value.
     *
     * @param kafka            bootstrap servers string for the verifying consumer
     * @param inputs           expected values per key, as returned by {@code generate}
     * @param maxRecordsPerKey number of values per key, used to compute the expected record count
     * @return the outcome of the last aggregate verification pass
     */
    public static VerificationResult verify(String kafka, Map<String, Set<Integer>> inputs, int maxRecordsPerKey) {
        final Properties props = new Properties();
        props.put("client.id", "verifier");
        props.put("bootstrap.servers", kafka);
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", NumberDeserializer.class);
        props.put("isolation.level", "read_committed");

        final int recordsGenerated = inputs.size() * maxRecordsPerKey;
        int recordsProcessed = 0;
        final Map<String, AtomicInteger> processed = Stream.of(NUMERIC_VALUE_TOPICS).collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
        // topic -> key -> records in arrival order
        final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
        VerificationResult verificationResult = new VerificationResult(false, "no results yet");
        int retry = 0;
        final long start;

        // try-with-resources: the consumer is closed even when polling or verification throws.
        try (KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props)) {
            final List<TopicPartition> partitions = SmokeTestDriver.getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            start = System.currentTimeMillis();
            while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6L)) {
                final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5L));
                if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
                    verificationResult = SmokeTestDriver.verifyAll(inputs, events, false);
                    if (verificationResult.passed()) {
                        break;
                    }
                    if (retry++ > SmokeTestDriver.MAX_RECORD_EMPTY_RETRIES) {
                        System.out.println(Instant.now() + " Didn't get any more results, verification hasn't passed, and out of retries.");
                        break;
                    }
                    System.out.println(Instant.now() + " Didn't get any more results, but verification hasn't passed (yet). Retrying..." + retry);
                    continue;
                }

                System.out.println(Instant.now() + " Get some more results from " + records.partitions() + ", resetting retry.");
                retry = 0;
                for (final ConsumerRecord<String, Number> record : records) {
                    final String key = record.key();
                    final String topic = record.topic();
                    processed.get(topic).incrementAndGet();
                    // Only "echo" records count towards the processed total.
                    if (topic.equals("echo") && ++recordsProcessed % 100 == 0) {
                        System.out.println("Echo records processed = " + recordsProcessed);
                    }
                    events.computeIfAbsent(topic, t -> new HashMap<>())
                          .computeIfAbsent(key, k -> new LinkedList<>())
                          .add(record);
                }
                System.out.println(processed);
            }
        }

        final long finished = System.currentTimeMillis() - start;
        System.out.println("Verification time=" + finished);
        System.out.println("-------------------");
        System.out.println("Result Verification");
        System.out.println("-------------------");
        System.out.println("recordGenerated=" + recordsGenerated);
        System.out.println("recordProcessed=" + recordsProcessed);
        if (recordsProcessed > recordsGenerated) {
            System.out.println("PROCESSED-MORE-THAN-GENERATED");
        } else if (recordsProcessed < recordsGenerated) {
            System.out.println("PROCESSED-LESS-THAN-GENERATED");
        }

        // Distinct echoed values per key; an absent "echo" topic simply yields an empty map.
        final Map<String, Set<Number>> received = new HashMap<>();
        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> echoed
                : events.getOrDefault("echo", Collections.emptyMap()).entrySet()) {
            final Set<Number> values = new HashSet<>();
            for (final ConsumerRecord<String, Number> record : echoed.getValue()) {
                values.add(record.value());
            }
            received.put(echoed.getKey(), values);
        }

        boolean success = inputs.equals(received);
        if (success) {
            System.out.println("ALL-RECORDS-DELIVERED");
        } else {
            // Count input values that never showed up on "echo"; keys with no echo at all
            // contribute all of their values.
            int missedCount = 0;
            for (final Map.Entry<String, Set<Integer>> expected : inputs.entrySet()) {
                final Set<Number> delivered = received.getOrDefault(expected.getKey(), Collections.emptySet());
                for (final Integer expectedValue : expected.getValue()) {
                    if (!delivered.contains(expectedValue)) {
                        missedCount++;
                    }
                }
            }
            System.out.println("missedRecords=" + missedCount);
        }

        // Re-run with diagnostics enabled so the printed report explains the failure.
        if (!verificationResult.passed()) {
            verificationResult = SmokeTestDriver.verifyAll(inputs, events, true);
        }
        System.out.println(verificationResult.result());
        success &= verificationResult.passed();
        System.out.println(success ? "SUCCESS" : "FAILURE");
        return verificationResult;
    }

    /**
     * Runs every per-topic check and gathers their textual reports into one result.
     *
     * <p>All checks are always executed (non-short-circuit {@code &=}), so the report
     * covers every topic even after the first failure.
     *
     * @param inputs       expected values per key
     * @param events       consumed records, grouped by topic and then by key
     * @param printResults whether failing checks should add detailed event dumps to the report
     * @return overall pass flag together with the collected report text
     */
    private static VerificationResult verifyAll(Map<String, Set<Integer>> inputs, Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, boolean printResults) {
        final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        // Declared outside the try so it is still in scope when the result is built.
        boolean pass;
        try (PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
            pass = SmokeTestDriver.verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
            pass &= SmokeTestDriver.verifySuppressed(resultStream, "min-suppressed", events, printResults);
            pass &= SmokeTestDriver.verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
                // Drop the surrounding brackets and everything from '@' on to recover the plain key.
                final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
                return SmokeTestDriver.getMin(unwindowedKey);
            }, printResults);
            pass &= SmokeTestDriver.verifySuppressed(resultStream, "sws-suppressed", events, printResults);
            pass &= SmokeTestDriver.verify(resultStream, "min", inputs, events, SmokeTestDriver::getMin, printResults);
            pass &= SmokeTestDriver.verify(resultStream, "max", inputs, events, SmokeTestDriver::getMax, printResults);
            pass &= SmokeTestDriver.verify(resultStream, "dif", inputs, events, key -> SmokeTestDriver.getMax(key).intValue() - SmokeTestDriver.getMin(key).intValue(), printResults);
            pass &= SmokeTestDriver.verify(resultStream, "sum", inputs, events, SmokeTestDriver::getSum, printResults);
            pass &= SmokeTestDriver.verify(resultStream, "cnt", inputs, events, key1 -> (long) (SmokeTestDriver.getMax(key1).intValue() - SmokeTestDriver.getMin(key1).intValue()) + 1L, printResults);
            // The "avg" check must run while the stream is still open.
            pass &= SmokeTestDriver.verify(resultStream, "avg", inputs, events, SmokeTestDriver::getAvg, printResults);
        }
        // Closing the PrintStream above flushed everything into the byte buffer.
        return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
    }

    /**
     * Checks that, for one output topic, the last record of every key carries the expected value.
     *
     * <p>Fails when the topic has no records, when its key count differs from the number of
     * input keys, or at the first key whose latest value does not equal the expectation.
     *
     * @param resultStream     sink for the human-readable report
     * @param topic            output topic to check
     * @param inputData        expected values per key; only its size and key set are used here
     * @param events           consumed records, grouped by topic and then by key
     * @param keyToExpectation computes the expected final value from an output key
     * @param printResults     whether a failure should also dump the related events of other topics
     * @return {@code true} if every key's latest value matches its expectation
     */
    private static boolean verify(PrintStream resultStream, String topic, Map<String, Set<Integer>> inputData, Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, Function<String, Number> keyToExpectation, boolean printResults) {
        // Null-safe lookups: "data" may be absent, and windowed output keys never occur in it.
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.getOrDefault("data", Collections.emptyMap());
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, Collections.emptyMap());
        if (outputEvents.isEmpty()) {
            resultStream.println(topic + " is empty");
            return false;
        }

        resultStream.printf("verifying %s with %d keys%n", topic, outputEvents.size());
        if (outputEvents.size() != inputData.size()) {
            resultStream.printf("fail: resultCount=%d expectedCount=%s%n\tresult=%s%n\texpected=%s%n", outputEvents.size(), inputData.size(), outputEvents.keySet(), inputData.keySet());
            return false;
        }

        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
            final String key = entry.getKey();
            final Number expected = keyToExpectation.apply(key);
            // Only the most recent record per key is compared.
            final Number actual = entry.getValue().getLast().value();
            if (expected.equals(actual)) {
                continue;
            }

            resultStream.printf("%s fail: key=%s actual=%s expected=%s%n", topic, key, actual, expected);
            if (printResults) {
                resultStream.printf(
                    "\t inputEvents=%n%s%n\techoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
                    SmokeTestDriver.indent("\t\t", observedInputEvents.getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("echo", Collections.emptyMap()).getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("max", Collections.emptyMap()).getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("min", Collections.emptyMap()).getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("dif", Collections.emptyMap()).getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("cnt", Collections.emptyMap()).getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("tagg", Collections.emptyMap()).getOrDefault(key, new LinkedList<>())));
                // Topics not covered by the dump above get their own events appended.
                if (!Utils.mkSet("echo", "max", "min", "dif", "cnt", "tagg").contains(topic)) {
                    resultStream.printf("%sEvents=%n%s%n", topic, SmokeTestDriver.indent("\t\t", entry.getValue()));
                }
            }
            // Stop at the first mismatching key.
            return false;
        }
        return true;
    }

    /**
     * Checks that a suppressed topic holds exactly one record per key.
     *
     * <p>A topic with no records passes. On the first key with a different record count a
     * failure is reported; with {@code printResults} the report also lists that key's
     * records from the matching "-raw" topic and the "data" records of the unwindowed key.
     *
     * @param resultStream sink for the human-readable report
     * @param topic        suppressed topic name; "-suppressed" is replaced by "-raw" to find
     *                     the unsuppressed counterpart
     * @param events       consumed records, grouped by topic and then by key
     * @param printResults whether a failure should include the raw and input events
     * @return {@code true} if every key has exactly one record
     */
    private static boolean verifySuppressed(PrintStream resultStream, String topic, Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events, boolean printResults) {
        resultStream.println("verifying suppressed " + topic);
        final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, Collections.emptyMap());
        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
            if (entry.getValue().size() == 1) {
                continue;
            }

            final String unsuppressedTopic = topic.replace("-suppressed", "-raw");
            final String key = entry.getKey();
            // Drop the surrounding brackets and everything from '@' on to recover the plain key.
            final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
            resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n", key, SmokeTestDriver.indent("\t\t", entry.getValue()));

            if (printResults) {
                // Null-safe lookups: a missing topic or key must not turn the report into an NPE.
                resultStream.printf(
                    "\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
                    SmokeTestDriver.indent("\t\t", events.getOrDefault(unsuppressedTopic, Collections.emptyMap()).getOrDefault(key, new LinkedList<>())),
                    SmokeTestDriver.indent("\t\t", events.getOrDefault("data", Collections.emptyMap()).getOrDefault(unwindowedKey, new LinkedList<>())));
            }
            return false;
        }
        return true;
    }

    /**
     * Renders each record on its own line, preceded by {@code prefix}.
     *
     * @param prefix text placed in front of every record
     * @param list   records to render, in iteration order
     * @return the concatenated lines, each terminated by {@code '\n'}; empty for no records
     */
    private static String indent(String prefix, Iterable<ConsumerRecord<String, Number>> list) {
        final StringBuilder rendered = new StringBuilder();
        for (final ConsumerRecord<String, Number> item : list) {
            rendered.append(prefix);
            rendered.append(item);
            rendered.append('\n');
        }
        return rendered.toString();
    }

    /**
     * Computes the sum of all integers in the inclusive range encoded by a "min-max" key.
     *
     * @param key key of the form {@code "<min>-<max>"}
     * @return {@code (min + max) * (max - min + 1) / 2}, evaluated in {@code long} arithmetic
     */
    private static Long getSum(String key) {
        final int lo = SmokeTestDriver.getMin(key).intValue();
        final int hi = SmokeTestDriver.getMax(key).intValue();
        final long endpointsTotal = (long) lo + (long) hi;
        final long termCount = (long) (hi - lo) + 1L;
        return endpointsTotal * termCount / 2L;
    }

    /**
     * Computes the mean of the inclusive integer range encoded by a "min-max" key.
     *
     * @param key key of the form {@code "<min>-<max>"}
     * @return the midpoint {@code (min + max) / 2.0}
     */
    private static Double getAvg(String key) {
        final int lo = SmokeTestDriver.getMin(key).intValue();
        final int hi = SmokeTestDriver.getMax(key).intValue();
        // Sum in long first so the addition cannot overflow int.
        final long endpointsTotal = (long) lo + (long) hi;
        return endpointsTotal / 2.0;
    }

    /**
     * Checks the "tagg" topic, whose keys are record counts and whose values say how many
     * input keys have that count.
     *
     * <p>The expectation is derived from the input keys alone: each "min-max" key contributes
     * one to the bucket {@code max - min + 1}. A tagg key without an expected bucket is
     * expected to carry 0.
     *
     * <p>NOTE(review): buckets that are expected but never appear in {@code taggEvents} are
     * not reported as failures - confirm whether that is intended.
     *
     * @param resultStream sink for the human-readable report
     * @param allData      expected values per key; only the key set is used
     * @param taggEvents   consumed "tagg" records per key, or {@code null} if none were seen
     * @param printResults whether a failure should also dump the offending events
     * @return {@code true} if every tagg key's latest value matches its expected count
     */
    private static boolean verifyTAgg(PrintStream resultStream, Map<String, Set<Integer>> allData, Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents, boolean printResults) {
        if (taggEvents == null) {
            resultStream.println("tagg is missing");
            return false;
        }
        if (taggEvents.isEmpty()) {
            resultStream.println("tagg is empty");
            return false;
        }
        resultStream.println("verifying tagg");

        // Bucket the input keys by how many values each one spans.
        final Map<String, Long> expectedByCount = new HashMap<>();
        for (final String inputKey : allData.keySet()) {
            final int lo = SmokeTestDriver.getMin(inputKey).intValue();
            final int hi = SmokeTestDriver.getMax(inputKey).intValue();
            final String bucket = Long.toString((long) (hi - lo) + 1L);
            expectedByCount.merge(bucket, 1L, Long::sum);
        }

        for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
            final String key = entry.getKey();
            final Long bucketSize = expectedByCount.remove(key);
            final long expectedCount = bucketSize == null ? 0L : bucketSize;
            final long observed = entry.getValue().getLast().value().longValue();

            if (observed != expectedCount) {
                resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
                if (printResults) {
                    resultStream.println("\t taggEvents: " + entry.getValue());
                }
                return false;
            }
        }
        return true;
    }

    /**
     * Extracts the lower bound from a "min-max" key.
     *
     * @param key key whose first dash-separated part is a decimal integer
     * @return that first part as an {@code Integer}
     * @throws NumberFormatException if the first part is not a valid integer
     */
    private static Number getMin(String key) {
        final String[] bounds = key.split("-");
        return Integer.valueOf(bounds[0]);
    }

    /**
     * Extracts the upper bound from a "min-max" key.
     *
     * @param key key whose second dash-separated part is a decimal integer
     * @return that second part as an {@code Integer}
     * @throws ArrayIndexOutOfBoundsException if the key has no second part
     * @throws NumberFormatException          if the second part is not a valid integer
     */
    private static Number getMax(String key) {
        final String[] bounds = key.split("-");
        return Integer.valueOf(bounds[1]);
    }

    /**
     * Collects every partition of the given topics.
     *
     * @param consumer consumer used to look up each topic's partitions
     * @param topics   topics to enumerate, in the order their partitions should be listed
     * @return a new mutable list with one {@code TopicPartition} per partition found
     */
    private static List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String ... topics) {
        final List<TopicPartition> collected = new ArrayList<>();
        for (int t = 0; t < topics.length; t++) {
            final List<PartitionInfo> infos = consumer.partitionsFor(topics[t]);
            for (final PartitionInfo partitionInfo : infos) {
                final TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                collected.add(topicPartition);
            }
        }
        return collected;
    }

    static {
        System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
        System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
    }

    /** Immutable outcome of a verification run: a pass flag plus the textual report. */
    public static class VerificationResult {
        private final String result;
        private final boolean passed;

        /**
         * @param passed whether verification succeeded
         * @param result report text produced while verifying
         */
        VerificationResult(boolean passed, String result) {
            this.result = result;
            this.passed = passed;
        }

        /** @return {@code true} if verification succeeded */
        public boolean passed() {
            return passed;
        }

        /** @return the report text supplied at construction */
        public String result() {
            return result;
        }
    }

    /**
     * Deserializer that picks the numeric decoding by topic name: int for the raw,
     * min/max/dif and windowed topics, long for "sum", "cnt" and "tagg", double for "avg".
     */
    public static class NumberDeserializer
    implements Deserializer<Number> {
        /**
         * Decodes {@code data} with the serde that belongs to {@code topic}.
         *
         * @param topic topic the bytes were read from
         * @param data  serialized value
         * @return the decoded number
         * @throws RuntimeException if the topic is not one of the known numeric topics
         */
        public Number deserialize(String topic, byte[] data) {
            switch (topic) {
                case "data":
                case "echo":
                case "min":
                case "min-raw":
                case "min-suppressed":
                case "sws-raw":
                case "sws-suppressed":
                case "max":
                case "dif":
                    return SmokeTestUtil.intSerde.deserializer().deserialize(topic, data);
                case "sum":
                case "cnt":
                case "tagg":
                    return SmokeTestUtil.longSerde.deserializer().deserialize(topic, data);
                case "avg":
                    return SmokeTestUtil.doubleSerde.deserializer().deserialize(topic, data);
                default:
                    throw new RuntimeException("unknown topic: " + topic);
            }
        }
    }

    /**
     * Send callback that queues a record for re-sending when its send timed out and
     * terminates the process via {@code Exit.exit(1)} on any other send failure.
     */
    private static class TestCallback
    implements Callback {
        private final ProducerRecord<byte[], byte[]> originalRecord;
        private final List<ProducerRecord<byte[], byte[]>> needRetry;

        /**
         * @param originalRecord record this callback is attached to
         * @param needRetry      list that receives the record if its send times out
         */
        TestCallback(ProducerRecord<byte[], byte[]> originalRecord, List<ProducerRecord<byte[], byte[]>> needRetry) {
            this.originalRecord = originalRecord;
            this.needRetry = needRetry;
        }

        /**
         * @param metadata  metadata of the sent record; not used
         * @param exception failure cause, or {@code null} when the send succeeded
         */
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                // Successful send: nothing to do.
                return;
            }
            if (!(exception instanceof TimeoutException)) {
                // Anything other than a timeout is treated as fatal.
                exception.printStackTrace();
                Exit.exit(1);
                return;
            }
            needRetry.add(originalRecord);
        }
    }

    /**
     * The values {@code min..max} of one key, handed out one at a time in a locally
     * shuffled order (shuffle window of 10).
     */
    private static class ValueList {
        /** Key of the form {@code "<min>-<max>"}. */
        public final String key;
        private final int[] values;
        private int index;

        /**
         * @param min smallest value, inclusive
         * @param max largest value, inclusive
         */
        ValueList(int min, int max) {
            key = min + "-" + max;
            values = new int[max - min + 1];
            Arrays.setAll(values, offset -> min + offset);
            SmokeTestDriver.shuffle(values, 10);
            index = 0;
        }

        /**
         * @return the next value, or -1 once all values have been handed out
         */
        int next() {
            if (index >= values.length) {
                return -1;
            }
            final int value = values[index];
            index++;
            return value;
        }
    }
}

