package org.apache.kafka.streams.kstream;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/RepartitionTopicNamingTest.class */
public class RepartitionTopicNamingTest {
    // Source topic consumed by the topology assembled in buildTopology(String).
    private static final String INPUT_TOPIC = "input";
    // Sink topic that receives the count() result stream in buildTopology(String).
    private static final String COUNT_TOPIC = "outputTopic_0";
    // Sink topic that receives the aggregate() result stream in buildTopology(String).
    private static final String AGGREGATION_TOPIC = "outputTopic_1";
    // Sink topic that receives the reduce() result stream in buildTopology(String).
    private static final String REDUCE_TOPIC = "outputTopic_2";
    // Sink topic that receives the stream-stream join result in buildTopology(String).
    private static final String JOINED_TOPIC = "outputTopicForJoin";
    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000002, count-stream-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: count-stream-repartition-filter (stores: [])\n      --> count-stream-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: count-stream-repartition-sink (topic: count-stream-repartition)\n      <-- count-stream-repartition-filter\n\n  Sub-topology: 1\n    Source: count-stream-repartition-source (topics: [count-stream-repartition])\n      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- count-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> joined-stream-other-windowed, KSTREAM-SINK-0000000012\n      <-- KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- count-stream-repartition-source\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> joined-stream-this-windowed\n      <-- count-stream-repartition-source\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> KSTREAM-REDUCE-0000000023\n      <-- KSTREAM-FILTER-0000000020\n    Processor: joined-stream-other-windowed (stores: 
[joined-stream-other-join-store])\n      --> joined-stream-other-join\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n      --> joined-stream-this-join\n      <-- KSTREAM-FILTER-0000000029\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- count-stream-repartition-source\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- KSTREAM-PEEK-0000000021\n    Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-other-windowed\n    Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-this-windowed\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Processor: joined-stream-merge (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- joined-stream-this-join, joined-stream-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n      <-- joined-stream-merge\n\n";
    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> KSTREAM-FILTER-0000000029, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000020, aggregate-stream-repartition-filter, count-stream-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n      --> KSTREAM-PEEK-0000000021\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-MAPVALUES-0000000003\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n      --> joined-stream-left-repartition-filter\n      <-- KSTREAM-MAP-0000000001\n    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n      --> reduced-stream-repartition-filter\n      <-- KSTREAM-FILTER-0000000020\n    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n      --> KSTREAM-PROCESSOR-0000000004\n      <-- KSTREAM-FILTER-0000000002\n    Processor: aggregate-stream-repartition-filter (stores: [])\n      --> aggregate-stream-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Processor: count-stream-repartition-filter (stores: [])\n      --> count-stream-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Processor: joined-stream-left-repartition-filter (stores: [])\n      --> joined-stream-left-repartition-sink\n      <-- KSTREAM-FILTER-0000000029\n    Processor: reduced-stream-repartition-filter (stores: [])\n      --> reduced-stream-repartition-sink\n      <-- KSTREAM-PEEK-0000000021\n    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n      --> none\n      <-- KSTREAM-MAPVALUES-0000000003\n    Sink: aggregate-stream-repartition-sink (topic: aggregate-stream-repartition)\n      <-- aggregate-stream-repartition-filter\n    Sink: count-stream-repartition-sink (topic: count-stream-repartition)\n      
<-- count-stream-repartition-filter\n    Sink: joined-stream-left-repartition-sink (topic: joined-stream-left-repartition)\n      <-- joined-stream-left-repartition-filter\n    Sink: reduced-stream-repartition-sink (topic: reduced-stream-repartition)\n      <-- reduced-stream-repartition-filter\n\n  Sub-topology: 1\n    Source: count-stream-repartition-source (topics: [count-stream-repartition])\n      --> KSTREAM-AGGREGATE-0000000007\n    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n      --> KTABLE-TOSTREAM-0000000011\n      <-- count-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n      --> KSTREAM-SINK-0000000012, joined-stream-other-windowed\n      <-- KSTREAM-AGGREGATE-0000000007\n    Source: joined-stream-left-repartition-source (topics: [joined-stream-left-repartition])\n      --> joined-stream-this-windowed\n    Processor: joined-stream-other-windowed (stores: [joined-stream-other-join-store])\n      --> joined-stream-other-join\n      <-- KTABLE-TOSTREAM-0000000011\n    Processor: joined-stream-this-windowed (stores: [joined-stream-this-join-store])\n      --> joined-stream-this-join\n      <-- joined-stream-left-repartition-source\n    Processor: joined-stream-other-join (stores: [joined-stream-this-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-other-windowed\n    Processor: joined-stream-this-join (stores: [joined-stream-other-join-store])\n      --> joined-stream-merge\n      <-- joined-stream-this-windowed\n    Processor: joined-stream-merge (stores: [])\n      --> KSTREAM-SINK-0000000038\n      <-- joined-stream-this-join, joined-stream-other-join\n    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n      <-- KTABLE-TOSTREAM-0000000011\n    Sink: KSTREAM-SINK-0000000038 (topic: outputTopicForJoin)\n      <-- joined-stream-merge\n\n  Sub-topology: 2\n    Source: aggregate-stream-repartition-source (topics: [aggregate-stream-repartition])\n  
    --> KSTREAM-AGGREGATE-0000000014\n    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n      --> KTABLE-TOSTREAM-0000000018\n      <-- aggregate-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n      --> KSTREAM-SINK-0000000019\n      <-- KSTREAM-AGGREGATE-0000000014\n    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n      <-- KTABLE-TOSTREAM-0000000018\n\n  Sub-topology: 3\n    Source: reduced-stream-repartition-source (topics: [reduced-stream-repartition])\n      --> KSTREAM-REDUCE-0000000023\n    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n      --> KTABLE-TOSTREAM-0000000027\n      <-- reduced-stream-repartition-source\n    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n      --> KSTREAM-SINK-0000000028\n      <-- KSTREAM-REDUCE-0000000023\n    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n      <-- KTABLE-TOSTREAM-0000000027\n\n";
    /** Key selector used by the groupBy variants: the new key is the old key concatenated with the value. */
    private final KeyValueMapper<String, String, String> kvMapper = (key, value) -> key + value;

    // Names handed to Grouped/StreamJoined in the big topology. The code visible in this class
    // passes equal string literals inline instead of reading these fields.
    private final String firstRepartitionTopicName = "count-stream";
    private final String secondRepartitionTopicName = "aggregate-stream";
    private final String thirdRepartitionTopicName = "reduced-stream";
    private final String fourthRepartitionTopicName = "joined-stream";

    /** Matches a "Sink: ...-repartition" fragment inside a topology description string. */
    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/RepartitionTopicNamingTest$SimpleProcessor.class */
    public static class SimpleProcessor implements Processor<String, String, Void, Void> {
        final List<String> valueList;

        SimpleProcessor(List<String> list) {
            this.valueList = list;
        }

        public void process(Record<String, String> record) {
            this.valueList.add((String) record.value());
        }
    }

    /**
     * Builds the same topology once with optimization "all" and once with "none", then checks
     * the full description text and the number of repartition sinks for each variant:
     * one ("count-stream-repartition") when optimized, four when not.
     */
    @Test
    public void shouldReuseFirstRepartitionTopicNameWhenOptimizing() {
        final String optimizedTopology = buildTopology("all").describe().toString();
        final String unoptimizedTopology = buildTopology("none").describe().toString();

        MatcherAssert.assertThat(optimizedTopology, CoreMatchers.is(EXPECTED_OPTIMIZED_TOPOLOGY));
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(optimizedTopology, this.repartitionTopicPattern),
            CoreMatchers.is(1));
        Assert.assertTrue(optimizedTopology.contains("count-stream-repartition"));

        MatcherAssert.assertThat(unoptimizedTopology, CoreMatchers.is(EXPECTED_UNOPTIMIZED_TOPOLOGY));
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(unoptimizedTopology, this.repartitionTopicPattern),
            CoreMatchers.is(4));
        Assert.assertTrue(unoptimizedTopology.contains("count-stream-repartition"));
        Assert.assertTrue(unoptimizedTopology.contains("aggregate-stream-repartition"));
        Assert.assertTrue(unoptimizedTopology.contains("reduced-stream-repartition"));
        Assert.assertTrue(unoptimizedTopology.contains("joined-stream-left-repartition"));
    }

    /**
     * Two independent grouped streams that both use the name "grouping" must be rejected
     * with a {@link TopologyException}.
     */
    @Test
    public void shouldFailWithSameRepartitionTopicName() {
        // The whole construction sits inside the runnable, so the exception is accepted from
        // whichever step raises it, exactly as the former try/catch did.
        Assert.assertThrows(
            "Should not build re-using repartition topic name",
            TopologyException.class,
            () -> {
                final StreamsBuilder streamsBuilder = new StreamsBuilder();
                streamsBuilder.<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
                    .selectKey((key, value) -> key)
                    .groupByKey(Grouped.as("grouping"))
                    .count()
                    .toStream();
                streamsBuilder.<String, String>stream("topicII")
                    .selectKey((key, value) -> key)
                    .groupByKey(Grouped.as("grouping"))
                    .count()
                    .toStream();
                streamsBuilder.build();
            });
    }

    /**
     * Two windowed counts derived from one named {@link KGroupedStream} must build and must
     * yield exactly one repartition sink, named after the grouping.
     */
    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStream() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedStream<String, String> groupedStream = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key)
            .groupByKey(Grouped.as("grouping"));

        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count().toStream().to("output-one");
        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");

        final String topologyDescription = streamsBuilder.build().describe().toString();
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(1));
        Assert.assertTrue(topologyDescription.contains("grouping-repartition"));
    }

    /**
     * A count and a reduce on one {@link TimeWindowedKStream}, plus a second windowed count on
     * the same named {@link KGroupedStream}, must build and must yield exactly one
     * repartition sink.
     */
    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameTimeWindowStream() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedStream<String, String> groupedStream = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key)
            .groupByKey(Grouped.as("grouping"));
        final TimeWindowedKStream<String, String> windowedStream =
            groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)));

        windowedStream.count().toStream().to("output-one");
        // Typed as String so the concatenating reducer compiles (raw types made both operands Object).
        windowedStream.reduce((left, right) -> left + right).toStream().to("output-two");
        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");

        final String topologyDescription = streamsBuilder.build().describe().toString();
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(1));
        Assert.assertTrue(topologyDescription.contains("grouping-repartition"));
    }

    /**
     * A count and a reduce on one {@link SessionWindowedKStream}, plus a time-windowed count on
     * the same named {@link KGroupedStream}, must build and must yield exactly one
     * repartition sink.
     */
    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameSessionWindowStream() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedStream<String, String> groupedStream = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key)
            .groupByKey(Grouped.as("grouping"));
        final SessionWindowedKStream<String, String> sessionStream =
            groupedStream.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)));

        sessionStream.count().toStream().to("output-one");
        // Typed as String so the concatenating reducer compiles (raw types made both operands Object).
        sessionStream.reduce((left, right) -> left + right).toStream().to("output-two");
        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");

        final String topologyDescription = streamsBuilder.build().describe().toString();
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(1));
        Assert.assertTrue(topologyDescription.contains("grouping-repartition"));
    }

    /**
     * A count and a reduce on one named {@link KGroupedTable} must build and must yield
     * exactly one repartition sink, named after the grouping.
     */
    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedTable() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedTable<String, String> groupedTable = streamsBuilder
            .<String, String>table(AssignmentTestUtils.TOPIC_PREFIX)
            .groupBy(KeyValue::pair, Grouped.as("grouping"));

        groupedTable.count().toStream().to("output-count");
        // Adder and subtractor both simply keep their second argument.
        groupedTable.reduce((aggregate, value) -> value, (aggregate, value) -> value).toStream().to("output-reduce");

        final String topologyDescription = streamsBuilder.build().describe().toString();
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(1));
        Assert.assertTrue(topologyDescription.contains("grouping-repartition"));
    }

    /**
     * Without a {@code Grouped} name, two windowed counts on the same {@link KGroupedStream}
     * are expected to produce two separate repartition sinks.
     */
    @Test
    public void shouldNotReuseRepartitionNodeWithUnnamedRepartitionTopics() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedStream<String, String> groupedStream = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key)
            .groupByKey();

        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count().toStream().to("output-one");
        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count().toStream().to("output-two");

        final String topologyDescription = streamsBuilder.build().describe().toString();
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(2));
    }

    /**
     * Without a {@code Grouped} name, a count and a reduce on the same {@link KGroupedTable}
     * are expected to produce two separate repartition sinks.
     */
    @Test
    public void shouldNotReuseRepartitionNodeWithUnnamedRepartitionTopicsKGroupedTable() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedTable<String, String> groupedTable = streamsBuilder
            .<String, String>table(AssignmentTestUtils.TOPIC_PREFIX)
            .groupBy(KeyValue::pair);

        groupedTable.count().toStream().to("output-count");
        // Adder and subtractor both simply keep their second argument.
        groupedTable.reduce((aggregate, value) -> value, (aggregate, value) -> value).toStream().to("output-reduce");

        final String topologyDescription = streamsBuilder.build().describe().toString();
        // Actual value first, matcher second, so a failure message reports the right expectation.
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(2));
    }

    /**
     * With "topology.optimization" set to "all", two windowed counts on one named
     * {@link KGroupedStream} must build and must yield exactly one repartition sink.
     */
    @Test
    public void shouldNotFailWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimizationsOn() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KGroupedStream<String, String> groupedStream = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key)
            .groupByKey(Grouped.as("grouping"));

        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();
        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count();

        final Properties properties = new Properties();
        properties.put("topology.optimization", "all");

        final String topologyDescription = streamsBuilder.build(properties).describe().toString();
        MatcherAssert.assertThat(
            getCountOfRepartitionTopicsFound(topologyDescription, this.repartitionTopicPattern),
            CoreMatchers.is(1));
    }

    /**
     * Two chained stream-stream joins that reuse the same {@code StreamJoined} names
     * ("join-store" / "join-repartition") must be rejected with a {@link TopologyException}.
     */
    @Test
    public void shouldFailWithSameRepartitionTopicNameInJoin() {
        // The whole construction sits inside the runnable, so the exception is accepted from
        // whichever step raises it, exactly as the former try/catch did.
        Assert.assertThrows(
            "Should not build re-using repartition topic name",
            TopologyException.class,
            () -> {
                final StreamsBuilder streamsBuilder = new StreamsBuilder();
                final KStream<String, String> firstStream = streamsBuilder
                    .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
                    .selectKey((key, value) -> key);
                final KStream<String, String> secondStream = streamsBuilder
                    .<String, String>stream(AssignmentTestUtils.TP_2_NAME)
                    .selectKey((key, value) -> key);

                final KStream<String, String> joined = firstStream.join(
                    secondStream,
                    (left, right) -> left + right,
                    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(30L)),
                    StreamJoined.<String, String, String>as("join-store").withName("join-repartition"));

                // Created after the first join, matching the original evaluation order.
                final KStream<String, String> thirdStream = streamsBuilder
                    .<String, String>stream(AssignmentTestUtils.TP_3_NAME)
                    .selectKey((key, value) -> key);

                joined.join(
                    thirdStream,
                    (left, right) -> left + right,
                    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(30L)),
                    StreamJoined.<String, String, String>as("join-store").withName("join-repartition"));

                streamsBuilder.build();
            });
    }

    /**
     * With "topology.optimization" set to "all", two windowed counts on one named
     * {@link KGroupedStream} must build. The test has no assertions: it passes as long as
     * nothing throws.
     */
    @Test
    public void shouldPassWithSameRepartitionTopicNameUsingSameKGroupedStreamOptimized() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final Properties properties = new Properties();
        properties.put("topology.optimization", "all");

        final KGroupedStream<String, String> groupedStream = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key)
            .groupByKey(Grouped.as("grouping"));

        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();
        groupedStream.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(30L))).count();

        streamsBuilder.build(properties);
    }

    /**
     * The join topology must expose both "my-join" repartition topics, with and without the
     * extra operations that buildStreamJoin(boolean) can insert.
     */
    @Test
    public void shouldKeepRepartitionTopicNameForJoins() {
        final String leftTopic = "(topic: my-join-left-repartition)";
        final String rightTopic = "(topic: my-join-right-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamJoin(withExtraOperations);
            Assert.assertTrue(topologyDescription.contains(leftTopic));
            Assert.assertTrue(topologyDescription.contains(rightTopic));
        }
    }

    /** groupByKey variant: the named repartition topic must appear with and without extra operations. */
    @Test
    public void shouldKeepRepartitionTopicNameForGroupByKeyTimeWindows() {
        final String expectedTopic = "(topic: time-window-grouping-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamGroupByKeyTimeWindows(withExtraOperations, true);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /** groupBy variant: the named repartition topic must appear with and without extra operations. */
    @Test
    public void shouldKeepRepartitionTopicNameForGroupByTimeWindows() {
        final String expectedTopic = "(topic: time-window-grouping-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamGroupByKeyTimeWindows(withExtraOperations, false);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /** groupByKey variant without windowing: the named repartition topic must appear in both builds. */
    @Test
    public void shouldKeepRepartitionTopicNameForGroupByKeyNoWindows() {
        final String expectedTopic = "(topic: kstream-grouping-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamGroupByKeyNoWindows(withExtraOperations, true);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /** groupBy variant without windowing: the named repartition topic must appear in both builds. */
    @Test
    public void shouldKeepRepartitionTopicNameForGroupByNoWindows() {
        final String expectedTopic = "(topic: kstream-grouping-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamGroupByKeyNoWindows(withExtraOperations, false);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /** groupByKey variant with session windows: the named repartition topic must appear in both builds. */
    @Test
    public void shouldKeepRepartitionTopicNameForGroupByKeySessionWindows() {
        final String expectedTopic = "(topic: session-window-grouping-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamGroupByKeySessionWindows(withExtraOperations, true);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /** groupBy variant with session windows: the named repartition topic must appear in both builds. */
    @Test
    public void shouldKeepRepartitionTopicNameForGroupBySessionWindows() {
        final String expectedTopic = "(topic: session-window-grouping-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildStreamGroupByKeySessionWindows(withExtraOperations, false);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /** KTable groupBy: the named repartition topic must appear with and without the extra filter. */
    @Test
    public void shouldKeepRepartitionNameForGroupByKTable() {
        final String expectedTopic = "(topic: ktable-group-by-repartition)";

        for (final boolean withExtraOperations : new boolean[] {false, true}) {
            final String topologyDescription = buildKTableGroupBy(withExtraOperations);
            Assert.assertTrue(topologyDescription.contains(expectedTopic));
        }
    }

    /**
     * Builds a table -> groupBy("ktable-group-by") -> count topology and returns its description.
     *
     * @param z when {@code true}, an always-true filter is inserted before the groupBy
     * @return the textual description of the built topology
     */
    private String buildKTableGroupBy(boolean z) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KTable<String, String> sourceTable = streamsBuilder.table(AssignmentTestUtils.TOPIC_PREFIX);

        // Optionally put another operation between the source and the grouping.
        final KTable<String, String> upstream = z ? sourceTable.filter((key, value) -> true) : sourceTable;
        upstream.groupBy(KeyValue::pair, Grouped.as("ktable-group-by")).count();

        return streamsBuilder.build().describe().toString();
    }

    /**
     * Builds a selectKey -> group("time-window-grouping") -> 10 ms time window -> count
     * topology and returns its description.
     *
     * <p>The time window is applied on both the groupByKey and the groupBy path; previously the
     * groupBy path counted without windowing, unlike the session-window sibling.
     *
     * @param z  when {@code true}, an always-true filter and an identity mapValues are inserted
     *           before the grouping
     * @param z2 when {@code true}, groups with groupByKey; otherwise with groupBy using kvMapper
     * @return the textual description of the built topology
     */
    private String buildStreamGroupByKeyTimeWindows(boolean z, boolean z2) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> rekeyed = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key + value);

        // Optionally put other operations between the key change and the grouping.
        KStream<String, String> upstream = rekeyed;
        if (z) {
            upstream = rekeyed.filter((key, value) -> true).mapValues(value -> value);
        }

        final KGroupedStream<String, String> grouped;
        if (z2) {
            grouped = upstream.groupByKey(Grouped.as("time-window-grouping"));
        } else {
            grouped = upstream.groupBy(this.kvMapper, Grouped.as("time-window-grouping"));
        }
        grouped.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L))).count();

        return streamsBuilder.build().describe().toString();
    }

    /**
     * Builds a selectKey -> group("session-window-grouping") -> 10 ms session window -> count
     * topology and returns its description.
     *
     * @param z  when {@code true}, an always-true filter and an identity mapValues are inserted
     *           before the grouping
     * @param z2 when {@code true}, groups with groupByKey; otherwise with groupBy using kvMapper
     * @return the textual description of the built topology
     */
    private String buildStreamGroupByKeySessionWindows(boolean z, boolean z2) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> rekeyed = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key + value);

        // Optionally put other operations between the key change and the grouping.
        KStream<String, String> upstream = rekeyed;
        if (z) {
            upstream = rekeyed.filter((key, value) -> true).mapValues(value -> value);
        }

        final KGroupedStream<String, String> grouped;
        if (z2) {
            grouped = upstream.groupByKey(Grouped.as("session-window-grouping"));
        } else {
            grouped = upstream.groupBy(this.kvMapper, Grouped.as("session-window-grouping"));
        }
        grouped.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L))).count();

        return streamsBuilder.build().describe().toString();
    }

    /**
     * Builds a selectKey -> group("kstream-grouping") -> count topology (no windowing) and
     * returns its description.
     *
     * @param z  when {@code true}, an always-true filter and an identity mapValues are inserted
     *           before the grouping
     * @param z2 when {@code true}, groups with groupByKey; otherwise with groupBy using kvMapper
     * @return the textual description of the built topology
     */
    private String buildStreamGroupByKeyNoWindows(boolean z, boolean z2) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> rekeyed = streamsBuilder
            .<String, String>stream(AssignmentTestUtils.TOPIC_PREFIX)
            .selectKey((key, value) -> key + value);

        // Optionally put other operations between the key change and the grouping.
        KStream<String, String> upstream = rekeyed;
        if (z) {
            upstream = rekeyed.filter((key, value) -> true).mapValues(value -> value);
        }

        final KGroupedStream<String, String> grouped;
        if (z2) {
            grouped = upstream.groupByKey(Grouped.as("kstream-grouping"));
        } else {
            grouped = upstream.groupBy(this.kvMapper, Grouped.as("kstream-grouping"));
        }
        grouped.count();

        return streamsBuilder.build().describe().toString();
    }

    /**
     * Builds a stream-stream join of "topic-one" and "topic-two", both re-keyed with selectKey
     * and joined under the name "my-join", and returns the topology description.
     *
     * @param z when {@code true}, an always-true filter and a printing peek are appended to each
     *          side after its selectKey
     * @return the textual description of the built topology
     */
    private String buildStreamJoin(boolean z) {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> streamOne = streamsBuilder.stream("topic-one");
        final KStream<String, String> streamTwo = streamsBuilder.stream("topic-two");

        final KStream<String, String> leftSide;
        final KStream<String, String> rightSide;
        if (z) {
            leftSide = streamOne
                .selectKey((key, value) -> key + value)
                .filter((key, value) -> true)
                .peek((key, value) -> System.out.println(key + value));
            rightSide = streamTwo
                .selectKey((key, value) -> key + value)
                .filter((key, value) -> true)
                .peek((key, value) -> System.out.println(key + value));
        } else {
            leftSide = streamOne.selectKey((key, value) -> key + value);
            rightSide = streamTwo.selectKey((key, value) -> key + value);
        }

        leftSide.join(
            rightSide,
            (leftValue, rightValue) -> leftValue + rightValue,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(1000L)),
            StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()).withName("my-join"));

        return streamsBuilder.build().describe().toString();
    }

    /**
     * Counts how many times the pattern is found in the given text, using successive
     * {@link Matcher#find()} calls.
     *
     * @param str     text to scan (here: a topology description)
     * @param pattern pattern to look for
     * @return number of matches found; 0 when there are none
     */
    private int getCountOfRepartitionTopicsFound(String str, Pattern pattern) {
        final Matcher matcher = pattern.matcher(str);
        int matchCount = 0;
        // Only the number of matches matters, so the matched text is not collected.
        while (matcher.find()) {
            matchCount++;
        }
        return matchCount;
    }

    /**
     * Builds the topology whose description is compared against the expected-topology constants.
     *
     * <p>One mapped source stream feeds five branches: a filter/mapValues/process chain, a count
     * ("count-stream"), an aggregate ("aggregate-stream"), a filter/peek/reduce
     * ("reduced-stream"), and a join of a filtered branch with the count result
     * ("joined-stream"). The statement order is kept unchanged because the expected descriptions
     * contain generated, numbered node names.
     *
     * @param str value stored under the "topology.optimization" property passed to build()
     * @return the built topology
     */
    private Topology buildTopology(String str) {
        final Initializer<Integer> initializer = () -> 0;
        final Aggregator<String, String, Integer> aggregator =
            (key, value, aggregate) -> aggregate + value.length();
        final Reducer<String> reducer = (left, right) -> left + ":" + right;
        final List<String> processorValueCollector = new ArrayList<>();

        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> mappedStream = streamsBuilder
            .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .map((key, value) -> KeyValue.pair(key.toUpperCase(Locale.getDefault()), value));

        // Branch 1: terminal processor, no repartitioning involved.
        mappedStream
            .filter((key, value) -> key.equals("B"))
            .mapValues(value -> value.toUpperCase(Locale.getDefault()))
            .process(() -> new SimpleProcessor(processorValueCollector));

        // Branch 2: count; its result stream is also the right-hand side of the join below.
        final KStream<String, Long> countStream = mappedStream
            .groupByKey(Grouped.as("count-stream"))
            .count(Materialized.with(Serdes.String(), Serdes.Long()))
            .toStream();
        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

        // Branch 3: aggregate.
        mappedStream
            .groupByKey(Grouped.as("aggregate-stream"))
            .aggregate(initializer, aggregator, Materialized.with(Serdes.String(), Serdes.Integer()))
            .toStream()
            .to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));

        // Branch 4: reduce, preceded by other operations.
        mappedStream
            .filter((key, value) -> true)
            .peek((key, value) -> System.out.println(key + ":" + value))
            .groupByKey(Grouped.as("reduced-stream"))
            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));

        // Branch 5: join the filtered stream with the count results.
        mappedStream
            .filter((key, value) -> key.equals("A"))
            .join(
                countStream,
                (value, count) -> value + ":" + count.toString(),
                JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(5000L)),
                StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.Long())
                    .withStoreName("joined-stream")
                    .withName("joined-stream"))
            .to(JOINED_TOPIC);

        final Properties properties = new Properties();
        properties.put("topology.optimization", str);
        return streamsBuilder.build(properties);
    }
}
