package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/SessionWindowedCogroupedKStreamImplTest.class */
public class SessionWindowedCogroupedKStreamImplTest {
    // Topic names used by the topologies built in this class.
    private static final String TOPIC = "topic";
    private static final String TOPIC2 = "topic2";
    private static final String OUTPUT = "output";

    // Builder the fixture streams are registered on in setup().
    private final StreamsBuilder builder = new StreamsBuilder();

    // Merges two session aggregates by joining them with "+"; the key is not used.
    private final Merger<String, String> sessionMerger = (aggKey, leftAgg, rightAgg) -> leftAgg + "+" + rightAgg;

    // Test driver configuration obtained from StreamsTestUtils for String key/value serdes.
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    // Fixtures assigned in setup() before each test.
    private KGroupedStream<String, String> groupedStream;
    private KGroupedStream<String, String> groupedStream2;
    private CogroupedKStream<String, String> cogroupedStream;
    private SessionWindowedCogroupedKStream<String, String> windowedCogroupedStream;

    /**
     * Builds the shared fixtures: two String-keyed source streams grouped by key, cogrouped with
     * {@code MockAggregator.TOSTRING_ADDER} (first stream) and
     * {@code MockAggregator.TOSTRING_REMOVER} (second stream), and windowed with 100 ms
     * session windows.
     */
    @Before
    public void setup() {
        // Typed locals (instead of raw KStream) keep the assignments below free of unchecked conversions.
        final KStream<String, String> firstStream =
            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> secondStream =
            builder.stream(TOPIC2, Consumed.with(Serdes.String(), Serdes.String()));

        groupedStream = firstStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        groupedStream2 = secondStream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        cogroupedStream = groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
        windowedCogroupedStream = cogroupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(100L)));
    }

    /** Expects {@code aggregate(initializer, sessionMerger)} to throw an NPE for a {@code null} initializer. */
    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        final Initializer<String> missingInitializer = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(missingInitializer, sessionMerger));
    }

    /** Expects {@code aggregate(initializer, sessionMerger)} to throw an NPE for a {@code null} session merger. */
    @Test
    public void shouldNotHaveNullSessionMergerOnAggregate() {
        final Merger<String, String> missingMerger = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, missingMerger));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, materialized)} to throw a
     * {@link NullPointerException} when the {@code Materialized} argument is {@code null}.
     */
    @Test
    public void shouldNotHaveNullMaterializedOnAggregate() {
        // The null is typed as Materialized so overload resolution selects the
        // (Initializer, Merger, Materialized) variant. A Named-typed null would only repeat
        // shouldNotHaveNullNamed2OnAggregate and leave the Materialized check untested.
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, (Materialized) null));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, materialized)} to throw an NPE for a
     * {@code null} session merger.
     */
    @Test
    public void shouldNotHaveNullSessionMerger2OnAggregate() {
        final Merger<String, String> missingMerger = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, missingMerger, Materialized.as("test")));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, materialized)} to throw an NPE for a
     * {@code null} initializer.
     */
    @Test
    public void shouldNotHaveNullInitializer2OnAggregate() {
        final Initializer<String> missingInitializer = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(missingInitializer, sessionMerger, Materialized.as("test")));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, named, materialized)} to throw an NPE
     * for a {@code null} {@code Materialized}.
     */
    @Test
    public void shouldNotHaveNullMaterialized2OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(
                MockInitializer.STRING_INIT,
                sessionMerger,
                Named.as("name"),
                (Materialized) null));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, named, materialized)} to throw an NPE
     * for a {@code null} session merger.
     */
    @Test
    public void shouldNotHaveNullSessionMerger3OnAggregate() {
        final Merger<String, String> missingMerger = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(
                MockInitializer.STRING_INIT,
                missingMerger,
                Named.as("name"),
                Materialized.as("test")));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, named, materialized)} to throw an NPE
     * for a {@code null} {@code Named}.
     */
    @Test
    public void shouldNotHaveNullNamedOnAggregate() {
        final Named missingName = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(
                MockInitializer.STRING_INIT,
                sessionMerger,
                missingName,
                Materialized.as("test")));
    }

    /**
     * Expects {@code aggregate(initializer, sessionMerger, named, materialized)} to throw an NPE
     * for a {@code null} initializer.
     */
    @Test
    public void shouldNotHaveNullInitializer3OnAggregate() {
        final Initializer<String> missingInitializer = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(
                missingInitializer,
                sessionMerger,
                Named.as("name"),
                Materialized.as("test")));
    }

    /** Expects {@code aggregate(initializer, sessionMerger, named)} to throw an NPE for a {@code null} {@code Named}. */
    @Test
    public void shouldNotHaveNullNamed2OnAggregate() {
        final Named missingName = null;
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, missingName));
    }

    /**
     * Checks that the name passed via {@code Named.as("foo")} appears as the prefix of the cogroup
     * processor names in the description of the built topology.
     */
    @Test
    public void namedParamShouldSetName() {
        // A dedicated builder and a local grouped stream are used so the description contains only
        // this test's topology and the shared fixture fields assigned in setup() stay untouched.
        final StreamsBuilder localBuilder = new StreamsBuilder();
        final KGroupedStream<String, String> localGroupedStream = localBuilder
            .stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        localGroupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(SessionWindows.with(Duration.ofMillis(1L)))
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Named.as("foo"));

        final String expectedDescription =
            "Topologies:\n"
                + "   Sub-topology: 0\n"
                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
                + "      --> foo-cogroup-agg-0\n"
                + "    Processor: foo-cogroup-agg-0 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
                + "      --> foo-cogroup-merge\n"
                + "      <-- KSTREAM-SOURCE-0000000000\n"
                + "    Processor: foo-cogroup-merge (stores: [])\n"
                + "      --> none\n"
                + "      <-- foo-cogroup-agg-0\n\n";

        MatcherAssert.assertThat(localBuilder.build().describe().toString(), CoreMatchers.equalTo(expectedDescription));
    }

    /**
     * Aggregates the first grouped stream over 500 ms session windows and expects every input
     * record to yield its own aggregate, since the two records of each key are more than 500 ms
     * apart (0 vs. 599 and 0 vs. 607).
     */
    @Test
    public void sessionWindowAggregateTest() {
        groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(SessionWindows.with(Duration.ofMillis(500L)))
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(OUTPUT);

        // try-with-resources closes the driver on success and on failure alike.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
                OUTPUT, new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());

            input.pipeInput("k1", "A", 0L);
            input.pipeInput("k2", "A", 0L);
            input.pipeInput("k1", "B", 599L);
            input.pipeInput("k2", "B", 607L);

            assertOutputKeyValueTimestamp(output, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k1", "0+B", 599L);
            assertOutputKeyValueTimestamp(output, "k2", "0+B", 607L);
        }
    }

    /**
     * Aggregates the first grouped stream over 500 ms session windows where key {@code k1}
     * receives two records at timestamp 0; the second one is expected to produce the combined
     * aggregate {@code "0+0+A+A"}, while the later records (599, 607) produce separate aggregates.
     */
    @Test
    public void sessionWindowAggregate2Test() {
        groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(SessionWindows.with(Duration.ofMillis(500L)))
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(OUTPUT);

        // try-with-resources closes the driver on success and on failure alike.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
                OUTPUT, new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());

            input.pipeInput("k1", "A", 0L);
            input.pipeInput("k1", "A", 0L);
            input.pipeInput("k2", "B", 599L);
            input.pipeInput("k1", "B", 607L);

            assertOutputKeyValueTimestamp(output, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+A+A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "0+B", 599L);
            assertOutputKeyValueTimestamp(output, "k1", "0+B", 607L);
        }
    }

    /**
     * Aggregates the two-stream cogroup fixture (100 ms session windows) while piping records into
     * the first topic only, and checks the exact sequence of emitted results.
     *
     * <p>The {@code null} values in the expected output presumably are the retractions of a
     * previous session result that is superseded by a merged session - confirm against the
     * session window aggregate implementation.
     */
    @Test
    public void sessionWindowAggregateTest2StreamsTest() {
        windowedCogroupedStream
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(OUTPUT);

        // try-with-resources closes the driver on success and on failure alike.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
                OUTPUT, new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());

            input.pipeInput("k1", "A", 0L);
            input.pipeInput("k1", "A", 84L);
            input.pipeInput("k1", "A", 113L);
            input.pipeInput("k1", "A", 199L);
            input.pipeInput("k1", "B", 300L);
            input.pipeInput("k2", "B", 301L);
            input.pipeInput("k2", "B", 400L);
            input.pipeInput("k1", "B", 400L);

            assertOutputKeyValueTimestamp(output, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k1", null, 0L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+A+A", 84L);
            assertOutputKeyValueTimestamp(output, "k1", null, 84L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+0+A+A+A", 113L);
            assertOutputKeyValueTimestamp(output, "k1", null, 113L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+0+0+A+A+A+A", 199L);
            assertOutputKeyValueTimestamp(output, "k1", "0+B", 300L);
            assertOutputKeyValueTimestamp(output, "k2", "0+B", 301L);
            assertOutputKeyValueTimestamp(output, "k2", null, 301L);
            assertOutputKeyValueTimestamp(output, "k2", "0+0+B+B", 400L);
            assertOutputKeyValueTimestamp(output, "k1", null, 300L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+B+B", 400L);
        }
    }

    /**
     * Feeds both input topics of the cogroup fixture (100 ms session windows) and checks the exact
     * output sequence: records from the first topic are appended with {@code "+"}, records from
     * the second topic with {@code "-"}, as the expected aggregate strings show.
     *
     * <p>The {@code null} values in the expected output presumably are the retractions of a
     * previous session result that is superseded by a merged session - confirm against the
     * session window aggregate implementation.
     */
    @Test
    public void sessionWindowMixAggregatorsTest() {
        windowedCogroupedStream
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(OUTPUT);

        // try-with-resources closes the driver on success and on failure alike.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> firstInput =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> secondInput =
                driver.createInputTopic(TOPIC2, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
                OUTPUT, new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());

            firstInput.pipeInput("k1", "A", 0L);
            firstInput.pipeInput("k2", "A", 0L);
            firstInput.pipeInput("k2", "A", 1L);
            firstInput.pipeInput("k1", "A", 2L);
            secondInput.pipeInput("k1", "B", 3L);
            secondInput.pipeInput("k2", "B", 3L);
            secondInput.pipeInput("k2", "B", 444L);
            secondInput.pipeInput("k1", "B", 444L);

            assertOutputKeyValueTimestamp(output, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", null, 0L);
            assertOutputKeyValueTimestamp(output, "k2", "0+0+A+A", 1L);
            assertOutputKeyValueTimestamp(output, "k1", null, 0L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+A+A", 2L);
            assertOutputKeyValueTimestamp(output, "k1", null, 2L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+0+A+A-B", 3L);
            assertOutputKeyValueTimestamp(output, "k2", null, 1L);
            assertOutputKeyValueTimestamp(output, "k2", "0+0+0+A+A-B", 3L);
            assertOutputKeyValueTimestamp(output, "k2", "0-B", 444L);
            assertOutputKeyValueTimestamp(output, "k1", "0-B", 444L);
        }
    }

    /**
     * Feeds both input topics of the cogroup fixture (100 ms session windows) with timestamps that
     * spread over several sessions per key and checks the exact output sequence.
     *
     * <p>The {@code null} values in the expected output presumably are the retractions of a
     * previous session result that is superseded by a merged session - confirm against the
     * session window aggregate implementation.
     */
    @Test
    public void sessionWindowMixAggregatorsManyWindowsTest() {
        windowedCogroupedStream
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .to(OUTPUT);

        // try-with-resources closes the driver on success and on failure alike.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> firstInput =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestInputTopic<String, String> secondInput =
                driver.createInputTopic(TOPIC2, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> output = driver.createOutputTopic(
                OUTPUT, new SessionWindowedDeserializer<>(new StringDeserializer()), new StringDeserializer());

            firstInput.pipeInput("k1", "A", 0L);
            firstInput.pipeInput("k2", "A", 0L);
            firstInput.pipeInput("k2", "A", 1L);
            firstInput.pipeInput("k1", "A", 2L);
            secondInput.pipeInput("k1", "B", 3L);
            secondInput.pipeInput("k2", "B", 500L);
            secondInput.pipeInput("k2", "B", 501L);
            secondInput.pipeInput("k1", "B", 501L);

            assertOutputKeyValueTimestamp(output, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", "0+A", 0L);
            assertOutputKeyValueTimestamp(output, "k2", null, 0L);
            assertOutputKeyValueTimestamp(output, "k2", "0+0+A+A", 1L);
            assertOutputKeyValueTimestamp(output, "k1", null, 0L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+A+A", 2L);
            assertOutputKeyValueTimestamp(output, "k1", null, 2L);
            assertOutputKeyValueTimestamp(output, "k1", "0+0+0+A+A-B", 3L);
            assertOutputKeyValueTimestamp(output, "k2", "0-B", 500L);
            assertOutputKeyValueTimestamp(output, "k2", null, 500L);
            assertOutputKeyValueTimestamp(output, "k2", "0+0-B-B", 501L);
            assertOutputKeyValueTimestamp(output, "k1", "0-B", 501L);
        }
    }

    /**
     * Reads the next record from {@code testOutputTopic} and asserts that its un-windowed key, its
     * value and its timestamp equal the expected ones. Headers are not compared: both records are
     * built with {@code null} headers.
     *
     * @param testOutputTopic output topic to read exactly one record from
     * @param str             expected key, compared with the key wrapped by the {@link Windowed}
     * @param str2            expected value; callers pass {@code null} for records without a value
     * @param j               expected record timestamp
     */
    private void assertOutputKeyValueTimestamp(TestOutputTopic<Windowed<String>, String> testOutputTopic, String str, String str2, long j) {
        final Headers noHeaders = null;
        final TestRecord<Windowed<String>, String> actual = testOutputTopic.readRecord();

        // Strip the window from the key so the comparison only covers key, value and timestamp.
        final TestRecord<String, String> actualUnwindowed =
            new TestRecord<>(actual.getKey().key(), actual.getValue(), noHeaders, actual.timestamp());
        final TestRecord<String, String> expected =
            new TestRecord<>(str, str2, noHeaders, Long.valueOf(j));

        MatcherAssert.assertThat(actualUnwindowed, CoreMatchers.equalTo(expected));
    }
}
