package org.apache.kafka.streams.kstream.internals;

import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/CogroupedKStreamImplTest.class */
public class CogroupedKStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String OUTPUT = "output";
    private KGroupedStream<String, String> groupedStream;
    private CogroupedKStream<String, String> cogroupedStream;
    private static final Aggregator<String, String, String> STRING_AGGREGATOR = (str, str2, str3) -> {
        return str3 + str2;
    };
    private static final Initializer<String> STRING_INITIALIZER = () -> {
        return "";
    };
    private static final Aggregator<String, String, Integer> STRING_SUM_AGGREGATOR = (str, str2, num) -> {
        return Integer.valueOf(num.intValue() + Integer.parseInt(str2));
    };
    private static final Aggregator<? super String, ? super Integer, Integer> SUM_AGGREGATOR = (str, num, num2) -> {
        return Integer.valueOf(num2.intValue() + num.intValue());
    };
    private static final Initializer<Integer> SUM_INITIALIZER = () -> {
        return 0;
    };
    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());

    @Before
    public void setup() {
        this.groupedStream = new StreamsBuilder().stream("topic", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        this.cogroupedStream = this.groupedStream.cogroup(MockAggregator.TOSTRING_ADDER);
    }

    @Test
    public void shouldThrowNPEInCogroupIfKGroupedStreamIsNull() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.cogroup((KGroupedStream) null, MockAggregator.TOSTRING_ADDER);
        });
    }

    @Test
    public void shouldNotHaveNullAggregatorOnCogroup() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.cogroup(this.groupedStream, (Aggregator) null);
        });
    }

    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate((Initializer) null);
        });
    }

    @Test
    public void shouldNotHaveNullInitializerOnAggregateWitNamed() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate((Initializer) null, Named.as("name"));
        });
    }

    @Test
    public void shouldNotHaveNullInitializerOnAggregateWitMaterialized() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate((Initializer) null, Materialized.as("store"));
        });
    }

    @Test
    public void shouldNotHaveNullInitializerOnAggregateWitNamedAndMaterialized() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate((Initializer) null, Named.as("name"), Materialized.as("store"));
        });
    }

    @Test
    public void shouldNotHaveNullNamedOnAggregate() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null);
        });
    }

    @Test
    public void shouldNotHaveNullMaterializedOnAggregate() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate(STRING_INITIALIZER, (Materialized) null);
        });
    }

    @Test
    public void shouldNotHaveNullNamedOnAggregateWithMateriazlied() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate(STRING_INITIALIZER, (Named) null, Materialized.as("store"));
        });
    }

    @Test
    public void shouldNotHaveNullMaterializedOnAggregateWithNames() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.aggregate(STRING_INITIALIZER, Named.as("name"), (Materialized) null);
        });
    }

    @Test
    public void shouldNotHaveNullWindowOnWindowedByTime() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.windowedBy((Windows) null);
        });
    }

    @Test
    public void shouldNotHaveNullWindowOnWindowedBySession() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.windowedBy((SessionWindows) null);
        });
    }

    @Test
    public void shouldNotHaveNullWindowOnWindowedBySliding() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cogroupedStream.windowedBy((SlidingWindows) null);
        });
    }

    @Test
    public void shouldNameProcessorsAndStoreBasedOnNamedParameter() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store")).toStream().to(OUTPUT);
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> test-cogroup-agg-0\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> test-cogroup-agg-1\n    Processor: test-cogroup-agg-0 (stores: [store])\n      --> test-cogroup-merge\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: test-cogroup-agg-1 (stores: [store])\n      --> test-cogroup-merge\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: test-cogroup-merge (stores: [])\n      --> KTABLE-TOSTREAM-0000000005\n      <-- test-cogroup-agg-0, test-cogroup-agg-1\n    Processor: KTABLE-TOSTREAM-0000000005 (stores: [])\n      --> KSTREAM-SINK-0000000006\n      <-- test-cogroup-merge\n    Sink: KSTREAM-SINK-0000000006 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000005\n\n"));
    }

    @Test
    public void shouldNameRepartitionTopic() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey(Grouped.as("repartition-test"));
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> repartition-test-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: repartition-test-repartition-filter (stores: [])\n      --> repartition-test-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: repartition-test-repartition-sink (topic: repartition-test-repartition)\n      <-- repartition-test-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008\n    Source: repartition-test-repartition-source (topics: [repartition-test-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000007\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- repartition-test-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> KTABLE-TOSTREAM-0000000010\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n      --> KSTREAM-SINK-0000000011\n      <-- COGROUPKSTREAM-MERGE-0000000009\n    Sink: KSTREAM-SINK-0000000011 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000010\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModification() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER, Named.as("test"), Materialized.as("store")).toStream().to(OUTPUT);
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> store-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: store-repartition-filter (stores: [])\n      --> store-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: store-repartition-sink (topic: store-repartition)\n      <-- store-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> test-cogroup-agg-1\n    Source: store-repartition-source (topics: [store-repartition])\n      --> test-cogroup-agg-0\n    Processor: test-cogroup-agg-0 (stores: [store])\n      --> test-cogroup-merge\n      <-- store-repartition-source\n    Processor: test-cogroup-agg-1 (stores: [store])\n      --> test-cogroup-merge\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: test-cogroup-merge (stores: [])\n      --> KTABLE-TOSTREAM-0000000009\n      <-- test-cogroup-agg-0, test-cogroup-agg-1\n    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n      --> KSTREAM-SINK-0000000010\n      <-- test-cogroup-merge\n    Sink: KSTREAM-SINK-0000000010 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000009\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroups() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        KGroupedStream groupByKey2 = stream2.groupByKey();
        KTable aggregate = groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        aggregate.toStream().to("OUTPUT2");
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-filter, COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-filter\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000015\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000007\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000014\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000014 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> KTABLE-TOSTREAM-0000000019\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-MERGE-0000000016 (stores: [])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-0000000014, COGROUPKSTREAM-AGGREGATE-0000000015\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-SINK-0000000018\n      <-- COGROUPKSTREAM-MERGE-0000000016\n    Processor: KTABLE-TOSTREAM-0000000019 (stores: [])\n      --> KSTREAM-SINK-0000000020\n      <-- COGROUPKSTREAM-MERGE-0000000009\n    Sink: KSTREAM-SINK-0000000018 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000017\n    Sink: KSTREAM-SINK-0000000020 (topic: OUTPUT2)\n      <-- KTABLE-TOSTREAM-0000000019\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInSameCogroupsWithOptimization() {
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        KGroupedStream groupByKey2 = stream2.groupByKey();
        KTable aggregate = groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        aggregate.toStream().to("OUTPUT2");
        MatcherAssert.assertThat(streamsBuilder.build(properties).describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000014, COGROUPKSTREAM-AGGREGATE-0000000007\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000015, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000014 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> COGROUPKSTREAM-MERGE-0000000016\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> KTABLE-TOSTREAM-0000000019\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-MERGE-0000000016 (stores: [])\n      --> KTABLE-TOSTREAM-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-0000000014, COGROUPKSTREAM-AGGREGATE-0000000015\n    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n      --> KSTREAM-SINK-0000000018\n      <-- COGROUPKSTREAM-MERGE-0000000016\n    Processor: KTABLE-TOSTREAM-0000000019 (stores: [])\n      --> KSTREAM-SINK-0000000020\n      <-- COGROUPKSTREAM-MERGE-0000000009\n    Sink: KSTREAM-SINK-0000000018 (topic: output)\n      <-- KTABLE-TOSTREAM-0000000017\n    Sink: KSTREAM-SINK-0000000020 (topic: OUTPUT2)\n      <-- KTABLE-TOSTREAM-0000000019\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInDifferentCogroups() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KStream stream3 = streamsBuilder.stream("three", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        KGroupedStream groupByKey2 = stream2.groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream3.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter, COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000015\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000016\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000016 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000017 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000015, COGROUPKSTREAM-AGGREGATE-0000000016\n\n  Sub-topology: 2\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008\n    Source: KSTREAM-SOURCE-0000000002 (topics: [three])\n      --> COGROUPKSTREAM-AGGREGATE-0000000009\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000009 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: COGROUPKSTREAM-MERGE-0000000010 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000009\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedInDifferentCogroupsWithOptimization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KStream stream3 = streamsBuilder.stream("three", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        KGroupedStream groupByKey2 = stream2.groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream3.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        MatcherAssert.assertThat(streamsBuilder.build(properties).describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000003\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000015\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000016\n    Source: KSTREAM-SOURCE-0000000002 (topics: [three])\n      --> COGROUPKSTREAM-AGGREGATE-0000000009\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000009 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004])\n      --> COGROUPKSTREAM-MERGE-0000000010\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000015 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000004-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000016 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000011])\n      --> COGROUPKSTREAM-MERGE-0000000017\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000010 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000008, COGROUPKSTREAM-AGGREGATE-0000000009\n    Processor: COGROUPKSTREAM-MERGE-0000000017 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000015, COGROUPKSTREAM-AGGREGATE-0000000016\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReused() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey.aggregate(STRING_INITIALIZER, STRING_AGGREGATOR);
        MatcherAssert.assertThat(streamsBuilder.build().describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter, KSTREAM-FILTER-0000000013\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Processor: KSTREAM-FILTER-0000000013 (stores: [])\n      --> KSTREAM-SINK-0000000012\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n    Sink: KSTREAM-SINK-0000000012 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition)\n      <-- KSTREAM-FILTER-0000000013\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000007\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n\n  Sub-topology: 2\n    Source: KSTREAM-SOURCE-0000000014 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000010-repartition])\n      --> KSTREAM-AGGREGATE-0000000011\n    Processor: KSTREAM-AGGREGATE-0000000011 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> none\n      <-- KSTREAM-SOURCE-0000000014\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedReusedWithOptimization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey.aggregate(STRING_INITIALIZER, STRING_AGGREGATOR);
        MatcherAssert.assertThat(streamsBuilder.build(properties).describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000002\n    Processor: KSTREAM-MAP-0000000002 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink\n      <-- KSTREAM-MAP-0000000002\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000011\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000007 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000008 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003])\n      --> COGROUPKSTREAM-MERGE-0000000009\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000009 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000007, COGROUPKSTREAM-AGGREGATE-0000000008\n    Processor: KSTREAM-AGGREGATE-0000000011 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000010])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-source\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForUpstreamKeyModificationWithGroupedRemadeWithOptimization() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        properties.setProperty("topology.optimization", "all");
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KStream stream3 = streamsBuilder.stream("three", this.stringConsumed);
        KGroupedStream groupByKey = stream.map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey();
        KGroupedStream groupByKey2 = stream2.groupByKey();
        KGroupedStream groupByKey3 = stream3.groupByKey();
        KGroupedStream groupByKey4 = stream.map((str3, str4) -> {
            return new KeyValue(str4, str3);
        }).groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        groupByKey3.cogroup(STRING_AGGREGATOR).cogroup(groupByKey4, STRING_AGGREGATOR).aggregate(STRING_INITIALIZER);
        MatcherAssert.assertThat(streamsBuilder.build(properties).describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000003, KSTREAM-MAP-0000000004\n    Processor: KSTREAM-MAP-0000000003 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: KSTREAM-MAP-0000000004 (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-sink\n      <-- KSTREAM-MAP-0000000003\n    Processor: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-filter (stores: [])\n      --> COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-sink\n      <-- KSTREAM-MAP-0000000004\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-filter\n    Sink: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-sink (topic: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-filter\n\n  Sub-topology: 1\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000009\n    Source: KSTREAM-SOURCE-0000000001 (topics: [two])\n      --> COGROUPKSTREAM-AGGREGATE-0000000010\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000009 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> COGROUPKSTREAM-MERGE-0000000011\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000010 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000005])\n      --> COGROUPKSTREAM-MERGE-0000000011\n      <-- KSTREAM-SOURCE-0000000001\n    Processor: COGROUPKSTREAM-MERGE-0000000011 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000009, COGROUPKSTREAM-AGGREGATE-0000000010\n\n  Sub-topology: 2\n    Source: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-source (topics: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000017\n    Source: KSTREAM-SOURCE-0000000002 (topics: [three])\n      --> COGROUPKSTREAM-AGGREGATE-0000000016\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000016 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> COGROUPKSTREAM-MERGE-0000000018\n      <-- KSTREAM-SOURCE-0000000002\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000017 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012])\n      --> COGROUPKSTREAM-MERGE-0000000018\n      <-- COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition-source\n    Processor: COGROUPKSTREAM-MERGE-0000000018 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000016, COGROUPKSTREAM-AGGREGATE-0000000017\n\n"));
    }

    @Test
    public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Properties properties = new Properties();
        CogroupedKStream cogroup = streamsBuilder.stream("one", this.stringConsumed).map((str, str2) -> {
            return new KeyValue(str2, str);
        }).groupByKey(Grouped.as("foo")).cogroup(STRING_AGGREGATOR);
        cogroup.aggregate(STRING_INITIALIZER);
        cogroup.aggregate(STRING_INITIALIZER);
        MatcherAssert.assertThat(streamsBuilder.build(properties).describe().toString(), CoreMatchers.equalTo("Topologies:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [one])\n      --> KSTREAM-MAP-0000000001\n    Processor: KSTREAM-MAP-0000000001 (stores: [])\n      --> foo-repartition-filter\n      <-- KSTREAM-SOURCE-0000000000\n    Processor: foo-repartition-filter (stores: [])\n      --> foo-repartition-sink\n      <-- KSTREAM-MAP-0000000001\n    Sink: foo-repartition-sink (topic: foo-repartition)\n      <-- foo-repartition-filter\n\n  Sub-topology: 1\n    Source: foo-repartition-source (topics: [foo-repartition])\n      --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])\n      --> COGROUPKSTREAM-MERGE-0000000007\n      <-- foo-repartition-source\n    Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])\n      --> COGROUPKSTREAM-MERGE-0000000013\n      <-- foo-repartition-source\n    Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000006\n    Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])\n      --> none\n      <-- COGROUPKSTREAM-AGGREGATE-0000000012\n\n"));
    }

    @Test
    public void shouldCogroupAndAggregateSingleKStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("one", this.stringConsumed).groupByKey().cogroup(STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "A", 0L);
            createInputTopic.pipeInput("k2", "B", 0L);
            createInputTopic.pipeInput("k2", "B", 0L);
            createInputTopic.pipeInput("k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "B", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BB", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AA", 0L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCogroupHandleNullValues() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("one", this.stringConsumed).groupByKey().cogroup(STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "A", 0L);
            createInputTopic.pipeInput("k2", "B", 0L);
            createInputTopic.pipeInput("k2", (Object) null, 0L);
            createInputTopic.pipeInput("k2", "B", 0L);
            createInputTopic.pipeInput("k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "B", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BB", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AA", 0L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldCogroupAndAggregateTwoKStreamsWithDistinctKeys() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "A", 0L);
            createInputTopic.pipeInput("k1", "A", 1L);
            createInputTopic.pipeInput("k1", "A", 10L);
            createInputTopic.pipeInput("k1", "A", 100L);
            createInputTopic2.pipeInput("k2", "B", 100L);
            createInputTopic2.pipeInput("k2", "B", 200L);
            createInputTopic2.pipeInput("k2", "B", 1L);
            createInputTopic2.pipeInput("k2", "B", 500L);
            createInputTopic2.pipeInput("k2", "B", 500L);
            createInputTopic2.pipeInput("k2", "B", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AA", 1L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AAA", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AAAA", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "B", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BB", 200L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BBB", 200L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BBBB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BBBBB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "BBBBBB", 500L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldCogroupAndAggregateTwoKStreamsWithSharedKeys() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "A", 0L);
            createInputTopic.pipeInput("k2", "A", 1L);
            createInputTopic.pipeInput("k1", "A", 10L);
            createInputTopic.pipeInput("k2", "A", 100L);
            createInputTopic2.pipeInput("k2", "B", 100L);
            createInputTopic2.pipeInput("k2", "B", 200L);
            createInputTopic2.pipeInput("k1", "B", 1L);
            createInputTopic2.pipeInput("k2", "B", 500L);
            createInputTopic2.pipeInput("k1", "B", 500L);
            createInputTopic2.pipeInput("k2", "B", 500L);
            createInputTopic2.pipeInput("k3", "B", 500L);
            createInputTopic2.pipeInput("k2", "B", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "A", 1L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AA", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AA", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AAB", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AABB", 200L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AAB", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AABBB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AABB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AABBBB", 500L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldAllowDifferentOutputTypeInCoGroup() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.groupByKey();
        groupByKey.cogroup(STRING_SUM_AGGREGATOR).cogroup(stream2.groupByKey(), STRING_SUM_AGGREGATOR).aggregate(SUM_INITIALIZER, Materialized.as("store1").withValueSerde(Serdes.Integer())).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, Integer> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());
            createInputTopic.pipeInput("k1", "1", 0L);
            createInputTopic.pipeInput("k2", "1", 1L);
            createInputTopic.pipeInput("k1", "1", 10L);
            createInputTopic.pipeInput("k2", "1", 100L);
            createInputTopic2.pipeInput("k2", "2", 100L);
            createInputTopic2.pipeInput("k2", "2", 200L);
            createInputTopic2.pipeInput("k1", "2", 1L);
            createInputTopic2.pipeInput("k2", "2", 500L);
            createInputTopic2.pipeInput("k1", "2", 500L);
            createInputTopic2.pipeInput("k2", "3", 500L);
            createInputTopic2.pipeInput("k3", "2", 500L);
            createInputTopic2.pipeInput("k2", "2", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 1, 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 1, 1L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 2, 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 2, 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 4, 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 6, 200L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 4, 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 8, 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 6, 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 11, 500L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldCoGroupStreamsWithDifferentInputTypes() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        Consumed with = Consumed.with(Serdes.String(), Serdes.Integer());
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", with);
        KGroupedStream groupByKey = stream.groupByKey();
        groupByKey.cogroup(STRING_SUM_AGGREGATOR).cogroup(stream2.groupByKey(), SUM_AGGREGATOR).aggregate(SUM_INITIALIZER, Materialized.as("store1").withValueSerde(Serdes.Integer())).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new IntegerSerializer());
            TestOutputTopic<String, Integer> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new IntegerDeserializer());
            createInputTopic.pipeInput("k1", "1", 0L);
            createInputTopic.pipeInput("k2", "1", 1L);
            createInputTopic.pipeInput("k1", "1", 10L);
            createInputTopic.pipeInput("k2", "1", 100L);
            createInputTopic2.pipeInput("k2", 2, 100L);
            createInputTopic2.pipeInput("k2", 2, 200L);
            createInputTopic2.pipeInput("k1", 2, 1L);
            createInputTopic2.pipeInput("k2", 2, 500L);
            createInputTopic2.pipeInput("k1", 2, 500L);
            createInputTopic2.pipeInput("k2", 3, 500L);
            createInputTopic2.pipeInput("k3", 2, 500L);
            createInputTopic2.pipeInput("k2", 2, 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 1, 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 1, 1L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 2, 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 2, 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 4, 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 6, 200L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 4, 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 8, 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", (Integer) 6, 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", (Integer) 11, 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k3", (Integer) 2, 500L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCogroupKeyMixedAggregators() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KGroupedStream groupByKey = stream.groupByKey();
        groupByKey.cogroup(MockAggregator.TOSTRING_REMOVER).cogroup(stream2.groupByKey(), MockAggregator.TOSTRING_ADDER).aggregate(MockInitializer.STRING_INIT, Materialized.as("store1").withValueSerde(Serdes.String())).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "1", 0L);
            createInputTopic.pipeInput("k2", "1", 1L);
            createInputTopic.pipeInput("k1", "1", 10L);
            createInputTopic.pipeInput("k2", "1", 100L);
            createInputTopic2.pipeInput("k1", "2", 500L);
            createInputTopic2.pipeInput("k2", "2", 500L);
            createInputTopic2.pipeInput("k1", "2", 500L);
            createInputTopic2.pipeInput("k2", "2", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "0-1", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "0-1", 1L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "0-1-1", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "0-1-1", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "0-1-1+2", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "0-1-1+2", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "0-1-1+2+2", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "0-1-1+2+2", 500L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCogroupWithThreeGroupedStreams() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("one", this.stringConsumed);
        KStream stream2 = streamsBuilder.stream("two", this.stringConsumed);
        KStream stream3 = streamsBuilder.stream("three", this.stringConsumed);
        KGroupedStream groupByKey = stream.groupByKey();
        KGroupedStream groupByKey2 = stream2.groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(groupByKey2, STRING_AGGREGATOR).cogroup(stream3.groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic3 = topologyTestDriver.createInputTopic("three", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "A", 0L);
            createInputTopic.pipeInput("k2", "A", 1L);
            createInputTopic.pipeInput("k1", "A", 10L);
            createInputTopic.pipeInput("k2", "A", 100L);
            createInputTopic2.pipeInput("k2", "B", 100L);
            createInputTopic2.pipeInput("k2", "B", 200L);
            createInputTopic2.pipeInput("k1", "B", 1L);
            createInputTopic2.pipeInput("k2", "B", 500L);
            createInputTopic3.pipeInput("k1", "B", 500L);
            createInputTopic3.pipeInput("k2", "B", 500L);
            createInputTopic3.pipeInput("k3", "B", 500L);
            createInputTopic3.pipeInput("k2", "B", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "A", 0L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "A", 1L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AA", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AA", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AAB", 100L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AABB", 200L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AAB", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AABBB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "AABB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "AABBBB", 500L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k3", "B", 500L);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCogroupWithKTableKTableInnerJoin() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KGroupedStream groupByKey = streamsBuilder.stream("one", this.stringConsumed).groupByKey();
        groupByKey.cogroup(STRING_AGGREGATOR).cogroup(streamsBuilder.stream("two", this.stringConsumed).groupByKey(), STRING_AGGREGATOR).aggregate(STRING_INITIALIZER, Named.as("name"), Materialized.as("store")).join(streamsBuilder.table("three", this.stringConsumed), MockValueJoiner.TOSTRING_JOINER, Materialized.with(Serdes.String(), Serdes.String())).toStream().to(OUTPUT);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), this.props);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("one", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic("two", new StringSerializer(), new StringSerializer());
            TestInputTopic createInputTopic3 = topologyTestDriver.createInputTopic("three", new StringSerializer(), new StringSerializer());
            TestOutputTopic<String, String> createOutputTopic = topologyTestDriver.createOutputTopic(OUTPUT, new StringDeserializer(), new StringDeserializer());
            createInputTopic.pipeInput("k1", "A", 5L);
            createInputTopic2.pipeInput("k2", "B", 6L);
            Assert.assertTrue(createOutputTopic.isEmpty());
            createInputTopic3.pipeInput("k1", "C", 0L);
            createInputTopic3.pipeInput("k2", "D", 10L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k1", "A+C", 5L);
            assertOutputKeyValueTimestamp(createOutputTopic, "k2", "B+D", 10L);
            Assert.assertTrue(createOutputTopic.isEmpty());
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void assertOutputKeyValueTimestamp(TestOutputTopic<String, String> testOutputTopic, String str, String str2, long j) {
        MatcherAssert.assertThat(testOutputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord(str, str2, (Headers) null, Long.valueOf(j))));
    }

    private void assertOutputKeyValueTimestamp(TestOutputTopic<String, Integer> testOutputTopic, String str, Integer num, long j) {
        MatcherAssert.assertThat(testOutputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord(str, num, (Headers) null, Long.valueOf(j))));
    }
}
