package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RepartitionOptimizingTest.class */
public class RepartitionOptimizingTest {
    // Topic the test records are piped into.
    private static final String INPUT_TOPIC = "input";
    // Output topics of the count, aggregate, reduce and join branches of the topology built in runTest().
    private static final String COUNT_TOPIC = "outputTopic_0";
    private static final String AGGREGATION_TOPIC = "outputTopic_1";
    private static final String REDUCE_TOPIC = "outputTopic_2";
    private static final String JOINED_TOPIC = "joinedOutputTopic";
    // Expected repartition-topic counts: 1 is the value the optimized test hands to runTest(),
    // FOUR_REPARTITION_TOPICS is what the unoptimized test hands to runTest().
    private static final int ONE_REPARTITION_TOPIC = 1;
    private static final int FOUR_REPARTITION_TOPICS = 4;
    // Streams configuration; assigned in setUp() and given the optimization setting in runTest().
    private Properties streamsConfiguration;
    // Driver created in runTest() and closed in tearDown().
    private TopologyTestDriver topologyTestDriver;
    /**
     * Topology description that runTest() expects from {@code Topology.describe().toString()} when the
     * optimization setting is "all". It is compared verbatim, so whitespace matters. It contains a single
     * repartition sink ({@code count-groupByKey-repartition-sink}).
     */
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceStream (topics: [input])\n      --> source-map\n    Processor: source-map (stores: [])\n      --> process-filter, count-groupByKey-repartition-filter\n      <-- sourceStream\n    Processor: process-filter (stores: [])\n      --> process-mapValues\n      <-- source-map\n    Processor: count-groupByKey-repartition-filter (stores: [])\n      --> count-groupByKey-repartition-sink\n      <-- source-map\n    Processor: process-mapValues (stores: [])\n      --> process\n      <-- process-filter\n    Sink: count-groupByKey-repartition-sink (topic: count-groupByKey-repartition)\n      <-- count-groupByKey-repartition-filter\n    Processor: process (stores: [])\n      --> none\n      <-- process-mapValues\n\n  Sub-topology: 1\n    Source: count-groupByKey-repartition-source (topics: [count-groupByKey-repartition])\n      --> aggregate, count, join-filter, reduce-filter\n    Processor: count (stores: [count-store])\n      --> count-toStream\n      <-- count-groupByKey-repartition-source\n    Processor: count-toStream (stores: [])\n      --> join-other-windowed, count-to\n      <-- count\n    Processor: join-filter (stores: [])\n      --> join-this-windowed\n      <-- count-groupByKey-repartition-source\n    Processor: reduce-filter (stores: [])\n      --> reduce-peek\n      <-- count-groupByKey-repartition-source\n    Processor: join-other-windowed (stores: [other-join-store])\n      --> join-other-join\n      <-- count-toStream\n    Processor: join-this-windowed (stores: [join-store])\n      --> join-this-join\n      <-- join-filter\n    Processor: reduce-peek (stores: [])\n      --> reducer\n      <-- reduce-filter\n    Processor: aggregate (stores: [aggregate-store])\n      --> aggregate-toStream\n      <-- count-groupByKey-repartition-source\n    Processor: join-other-join (stores: [join-store])\n      --> join-merge\n      <-- join-other-windowed\n    
Processor: join-this-join (stores: [other-join-store])\n      --> join-merge\n      <-- join-this-windowed\n    Processor: reducer (stores: [reduce-store])\n      --> reduce-toStream\n      <-- reduce-peek\n    Processor: aggregate-toStream (stores: [])\n      --> reduce-to\n      <-- aggregate\n    Processor: join-merge (stores: [])\n      --> join-to\n      <-- join-this-join, join-other-join\n    Processor: reduce-toStream (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- reducer\n    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n      <-- reduce-toStream\n    Sink: count-to (topic: outputTopic_0)\n      <-- count-toStream\n    Sink: join-to (topic: joinedOutputTopic)\n      <-- join-merge\n    Sink: reduce-to (topic: outputTopic_1)\n      <-- aggregate-toStream\n\n";
    /**
     * Topology description that runTest() expects when the optimization setting is anything other than "all".
     * It is compared verbatim, so whitespace matters. It contains four repartition sinks (aggregate, count,
     * join-left and reduce).
     */
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: sourceStream (topics: [input])\n      --> source-map\n    Processor: source-map (stores: [])\n      --> reduce-filter, process-filter, aggregate-groupByKey-repartition-filter, count-groupByKey-repartition-filter, join-filter\n      <-- sourceStream\n    Processor: reduce-filter (stores: [])\n      --> reduce-peek\n      <-- source-map\n    Processor: join-filter (stores: [])\n      --> join-left-repartition-filter\n      <-- source-map\n    Processor: process-filter (stores: [])\n      --> process-mapValues\n      <-- source-map\n    Processor: reduce-peek (stores: [])\n      --> reduce-groupByKey-repartition-filter\n      <-- reduce-filter\n    Processor: aggregate-groupByKey-repartition-filter (stores: [])\n      --> aggregate-groupByKey-repartition-sink\n      <-- source-map\n    Processor: count-groupByKey-repartition-filter (stores: [])\n      --> count-groupByKey-repartition-sink\n      <-- source-map\n    Processor: join-left-repartition-filter (stores: [])\n      --> join-left-repartition-sink\n      <-- join-filter\n    Processor: process-mapValues (stores: [])\n      --> process\n      <-- process-filter\n    Processor: reduce-groupByKey-repartition-filter (stores: [])\n      --> reduce-groupByKey-repartition-sink\n      <-- reduce-peek\n    Sink: aggregate-groupByKey-repartition-sink (topic: aggregate-groupByKey-repartition)\n      <-- aggregate-groupByKey-repartition-filter\n    Sink: count-groupByKey-repartition-sink (topic: count-groupByKey-repartition)\n      <-- count-groupByKey-repartition-filter\n    Sink: join-left-repartition-sink (topic: join-left-repartition)\n      <-- join-left-repartition-filter\n    Processor: process (stores: [])\n      --> none\n      <-- process-mapValues\n    Sink: reduce-groupByKey-repartition-sink (topic: reduce-groupByKey-repartition)\n      <-- reduce-groupByKey-repartition-filter\n\n  Sub-topology: 1\n    
Source: count-groupByKey-repartition-source (topics: [count-groupByKey-repartition])\n      --> count\n    Processor: count (stores: [count-store])\n      --> count-toStream\n      <-- count-groupByKey-repartition-source\n    Processor: count-toStream (stores: [])\n      --> join-other-windowed, count-to\n      <-- count\n    Source: join-left-repartition-source (topics: [join-left-repartition])\n      --> join-this-windowed\n    Processor: join-other-windowed (stores: [other-join-store])\n      --> join-other-join\n      <-- count-toStream\n    Processor: join-this-windowed (stores: [join-store])\n      --> join-this-join\n      <-- join-left-repartition-source\n    Processor: join-other-join (stores: [join-store])\n      --> join-merge\n      <-- join-other-windowed\n    Processor: join-this-join (stores: [other-join-store])\n      --> join-merge\n      <-- join-this-windowed\n    Processor: join-merge (stores: [])\n      --> join-to\n      <-- join-this-join, join-other-join\n    Sink: count-to (topic: outputTopic_0)\n      <-- count-toStream\n    Sink: join-to (topic: joinedOutputTopic)\n      <-- join-merge\n\n  Sub-topology: 2\n    Source: aggregate-groupByKey-repartition-source (topics: [aggregate-groupByKey-repartition])\n      --> aggregate\n    Processor: aggregate (stores: [aggregate-store])\n      --> aggregate-toStream\n      <-- aggregate-groupByKey-repartition-source\n    Processor: aggregate-toStream (stores: [])\n      --> reduce-to\n      <-- aggregate\n    Sink: reduce-to (topic: outputTopic_1)\n      <-- aggregate-toStream\n\n  Sub-topology: 3\n    Source: reduce-groupByKey-repartition-source (topics: [reduce-groupByKey-repartition])\n      --> reducer\n    Processor: reducer (stores: [reduce-store])\n      --> reduce-toStream\n      <-- reduce-groupByKey-repartition-source\n    Processor: reduce-toStream (stores: [])\n      --> KSTREAM-SINK-0000000023\n      <-- reducer\n    Sink: KSTREAM-SINK-0000000023 (topic: outputTopic_2)\n      <-- 
reduce-toStream\n\n";
    private final Logger log = LoggerFactory.getLogger(RepartitionOptimizingTest.class);
    // Serializer/deserializer shared by the String-typed test topics.
    private final Serializer<String> stringSerializer = new StringSerializer();
    private final Deserializer<String> stringDeserializer = new StringDeserializer();
    // Matches one "Sink: ...-repartition" occurrence per repartition sink line of a topology description.
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
    // Aggregation starts at zero and adds the length of every value seen for a key.
    private final Initializer<Integer> initializer = () -> 0;
    private final Aggregator<String, String, Integer> aggregator = (key, value, aggregate) -> aggregate + value.length();
    // Reduction concatenates the values of a key, separated by ':'.
    private final Reducer<String> reducer = (left, right) -> left + ":" + right;
    // Receives every value seen by SimpleProcessor; cleared in setUp().
    private final List<String> processorValueCollector = new ArrayList<>();
    // Expected results; each list is converted with keyValueListToMap() before comparison, so for a
    // repeated key (see the join expectations) only the last entry takes part in the comparison.
    private final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
    private final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
    private final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
    private final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3"));
    private final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ");

    /**
     * Processor that records the value of every record it receives in the list handed to its
     * constructor and forwards nothing.
     */
    private static class SimpleProcessor implements Processor<String, String, Void, Void> {
        final List<String> valueList;

        SimpleProcessor(List<String> collector) {
            valueList = collector;
        }

        public void process(Record<String, String> record) {
            final String value = record.value();
            valueList.add(value);
        }
    }

    /**
     * Creates a fresh String/String streams configuration with the cache size and commit interval
     * used by these tests, and empties the processor value collector.
     */
    @Before
    public void setUp() {
        final Serde<?> keySerde = Serdes.String();
        final Serde<?> valueSerde = Serdes.String();
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig(keySerde, valueSerde);

        final String cacheMaxBytes = String.valueOf(10240);
        final String commitIntervalMs = String.valueOf(5000L);
        this.streamsConfiguration.setProperty("statestore.cache.max.bytes", cacheMaxBytes);
        this.streamsConfiguration.setProperty("commit.interval.ms", commitIntervalMs);

        this.processorValueCollector.clear();
    }

    /**
     * Closes the topology test driver created by the test, if one was created.
     */
    @After
    public void tearDown() {
        // The driver is only assigned part-way through runTest(); if the test failed before that point,
        // closing unconditionally would throw a NullPointerException on top of the real failure.
        if (this.topologyTestDriver != null) {
            this.topologyTestDriver.close();
        }
    }

    /**
     * Runs the shared scenario with the "topology.optimization" setting "all" and expects a single
     * repartition topic in the resulting topology.
     */
    @Test
    public void shouldSendCorrectRecords_OPTIMIZED() {
        // Use the named constant, mirroring the unoptimized test, instead of a bare literal.
        runTest("all", ONE_REPARTITION_TOPIC);
    }

    /**
     * Runs the shared scenario with the "topology.optimization" setting "none" and expects four
     * repartition topics in the resulting topology.
     */
    @Test
    public void shouldSendCorrectResults_NO_OPTIMIZATION() {
        final String optimizationSetting = "none";
        final int expectedRepartitionTopics = FOUR_REPARTITION_TOPICS;
        runTest(optimizationSetting, expectedRepartitionTopics);
    }

    /**
     * Builds the topology under test, runs the fixture records through a {@link TopologyTestDriver}
     * and verifies the topology description, the number of repartition topics, the values seen by
     * the processor branch and the contents of the four output topics.
     *
     * @param optimizationConfig value stored under the "topology.optimization" setting; "all" selects
     *                           the optimized expected description, any other value the unoptimized one
     * @param expectedRepartitionTopics number of repartition sinks expected in the topology description
     */
    private void runTest(String optimizationConfig, int expectedRepartitionTopics) {
        StreamsBuilder builder = new StreamsBuilder();

        // Every branch below hangs off this key-changing map. Locale.ROOT keeps the upper-casing
        // independent of the JVM's default locale.
        KStream<String, String> mappedStream = builder
            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()).withName("sourceStream"))
            .map((key, value) -> KeyValue.pair(key.toUpperCase(Locale.ROOT), value), Named.as("source-map"));

        // Branch 1: values of key "B", upper-cased, collected by SimpleProcessor.
        mappedStream
            .filter((key, value) -> key.equals("B"), Named.as("process-filter"))
            .mapValues(value -> value.toUpperCase(Locale.ROOT), Named.as("process-mapValues"))
            .process(() -> new SimpleProcessor(this.processorValueCollector), Named.as("process"));

        // Branch 2: count per key; the resulting stream is also the right-hand side of the join below.
        KStream<String, Long> countStream = mappedStream
            .groupByKey(Grouped.as("count-groupByKey"))
            .count(
                Named.as("count"),
                Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("count-store"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()))
            .toStream(Named.as("count-toStream"));
        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()).withName("count-to"));

        // Branch 3: aggregate per key.
        mappedStream
            .groupByKey(Grouped.as("aggregate-groupByKey"))
            .aggregate(
                this.initializer,
                this.aggregator,
                Named.as("aggregate"),
                Materialized.<String, Integer>as(Stores.inMemoryKeyValueStore("aggregate-store"))
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Integer()))
            .toStream(Named.as("aggregate-toStream"))
            .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()).withName("reduce-to"));

        // Branch 4: reduce per key, with extra operators between the map and the grouping.
        mappedStream
            .filter((key, value) -> true, Named.as("reduce-filter"))
            .peek((key, value) -> System.out.println(key + ":" + value), Named.as("reduce-peek"))
            .groupByKey(Grouped.as("reduce-groupByKey"))
            .reduce(this.reducer, Named.as("reducer"), Materialized.as(Stores.inMemoryKeyValueStore("reduce-store")))
            .toStream(Named.as("reduce-toStream"))
            .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        // Branch 5: records of key "A" joined with the count stream.
        mappedStream
            .filter((key, value) -> key.equals("A"), Named.as("join-filter"))
            .join(
                countStream,
                (value, count) -> value + ":" + count.toString(),
                JoinWindows.of(Duration.ofMillis(5000L)),
                StreamJoined.<String, String, Long>with(
                        Stores.inMemoryWindowStore("join-store", Duration.ofDays(1L), Duration.ofMillis(10000L), true),
                        Stores.inMemoryWindowStore("other-join-store", Duration.ofDays(1L), Duration.ofMillis(10000L), true))
                    .withName("join")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String())
                    .withOtherValueSerde(Serdes.Long()))
            .to(JOINED_TOPIC, Produced.as("join-to"));

        this.streamsConfiguration.setProperty("topology.optimization", optimizationConfig);
        Topology topology = builder.build(this.streamsConfiguration);
        this.topologyTestDriver = new TopologyTestDriver(topology, this.streamsConfiguration);

        TestInputTopic<String, String> inputTopic =
            this.topologyTestDriver.createInputTopic(INPUT_TOPIC, this.stringSerializer, this.stringSerializer);
        TestOutputTopic<String, Long> countOutputTopic =
            this.topologyTestDriver.createOutputTopic(COUNT_TOPIC, this.stringDeserializer, new LongDeserializer());
        TestOutputTopic<String, Integer> aggregationOutputTopic =
            this.topologyTestDriver.createOutputTopic(AGGREGATION_TOPIC, this.stringDeserializer, new IntegerDeserializer());
        TestOutputTopic<String, String> reduceOutputTopic =
            this.topologyTestDriver.createOutputTopic(REDUCE_TOPIC, this.stringDeserializer, this.stringDeserializer);
        TestOutputTopic<String, String> joinedOutputTopic =
            this.topologyTestDriver.createOutputTopic(JOINED_TOPIC, this.stringDeserializer, this.stringDeserializer);

        inputTopic.pipeKeyValueList(getKeyValues());

        // Verify the topology description and the number of repartition topics it contains.
        String topologyString = topology.describe().toString();
        String expectedTopology = optimizationConfig.equals("all")
            ? EXPECTED_OPTIMIZED_TOPOLOGY
            : EXPECTED_UNOPTIMIZED_TOPOLOGY;
        Assert.assertEquals(expectedTopology, topologyString);
        Assert.assertEquals(expectedRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));

        // Verify the values collected by the processor branch. assertThat takes the actual value first,
        // so a failure reports the real size as "was" and 3 as "Expected".
        MatcherAssert.assertThat(this.processorValueCollector.size(), CoreMatchers.equalTo(3));
        MatcherAssert.assertThat(this.processorValueCollector, CoreMatchers.equalTo(this.expectedCollectedProcessorValues));

        // Verify the latest value per key on every output topic.
        MatcherAssert.assertThat(countOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(keyValueListToMap(this.expectedCountKeyValues)));
        MatcherAssert.assertThat(aggregationOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(keyValueListToMap(this.expectedAggKeyValues)));
        MatcherAssert.assertThat(reduceOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(keyValueListToMap(this.expectedReduceKeyValues)));
        MatcherAssert.assertThat(joinedOutputTopic.readKeyValuesToMap(), CoreMatchers.equalTo(keyValueListToMap(this.expectedJoinKeyValues)));
    }

    /**
     * Converts a list of key-value pairs into a map.
     *
     * @param list pairs to convert, applied in list order
     * @param <K>  key type
     * @param <V>  value type
     * @return a new map holding one entry per distinct key; when a key occurs more than once the
     *         value of its last occurrence wins
     */
    private <K, V> Map<K, V> keyValueListToMap(List<KeyValue<K, V>> list) {
        Map<K, V> result = new HashMap<>();
        for (KeyValue<K, V> keyValue : list) {
            // put() rather than a collector: duplicate keys must overwrite, not fail.
            result.put(keyValue.key, keyValue.value);
        }
        return result;
    }

    /**
     * Counts the occurrences of the repartition-sink pattern in a topology description.
     *
     * @param str the topology description to scan
     * @return the number of non-overlapping matches of {@code repartitionTopicPattern} in {@code str}
     */
    private int getCountOfRepartitionTopicsFound(String str) {
        Matcher matcher = this.repartitionTopicPattern.matcher(str);
        // Only the number of matches is needed, so count them instead of collecting the matched text.
        int count = 0;
        while (matcher.find()) {
            count++;
        }
        return count;
    }

    /**
     * Creates the input records for the test: every key "a", "b", "c" paired with every value
     * "foo", "bar", "baz", grouped by key and in that value order.
     *
     * @return a new mutable list with the nine key-value pairs
     */
    private List<KeyValue<String, String>> getKeyValues() {
        String[] keys = {"a", "b", "c"};
        String[] values = {"foo", "bar", "baz"};
        List<KeyValue<String, String>> keyValues = new ArrayList<>();
        for (String key : keys) {
            for (String value : values) {
                keyValues.add(KeyValue.pair(key, value));
            }
        }
        return keyValues;
    }
}
