/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for aggregating a {@link SessionWindowedCogroupedKStream}.
 *
 * <p>The first group of tests checks that every {@code aggregate(...)} overload rejects
 * {@code null} arguments with a {@link NullPointerException}. The remaining tests build a
 * topology, pipe records through a {@link TopologyTestDriver} and verify the emitted
 * key/value/timestamp triples in order.
 */
public class SessionWindowedCogroupedKStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String TOPIC2 = "topic2";
    private static final String OUTPUT = "output";

    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /** Joins the two session aggregates with a {@code "+"} separator; the key is ignored. */
    private final Merger<String, String> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + "+" + aggTwo;

    private KGroupedStream<String, String> groupedStream;
    private KGroupedStream<String, String> groupedStream2;
    private CogroupedKStream<String, String> cogroupedStream;
    private SessionWindowedCogroupedKStream<String, String> windowedCogroupedStream;

    /**
     * Groups {@code TOPIC} and {@code TOPIC2} by key, cogroups them (adder for the first stream,
     * remover for the second) and windows the result with a 100 ms session gap.
     */
    @Before
    public void setup() {
        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> stream2 = builder.stream(TOPIC2, Consumed.with(Serdes.String(), Serdes.String()));

        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        groupedStream2 = stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        cogroupedStream = groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
        windowedCogroupedStream = cogroupedStream.windowedBy(SessionWindows.with(Duration.ofMillis(100L)));
    }

    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null, sessionMerger));
    }

    @Test
    public void shouldNotHaveNullSessionMergerOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, null));
    }

    /**
     * Verifies the three-argument {@code Materialized} overload rejects a null {@code Materialized}.
     *
     * <p>The null is cast to {@link Materialized} so that overload resolution picks the
     * {@code Materialized} variant rather than the {@code Named} one, which is covered by
     * {@link #shouldNotHaveNullNamed2OnAggregate()}.
     */
    // The raw cast exists only to select the overload; the value is null, so no heap pollution is possible.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test
    public void shouldNotHaveNullMaterializedOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, (Materialized) null));
    }

    @Test
    public void shouldNotHaveNullSessionMerger2OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("test")));
    }

    @Test
    public void shouldNotHaveNullInitializer2OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null, sessionMerger, Materialized.as("test")));
    }

    @Test
    public void shouldNotHaveNullMaterialized2OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, Named.as("name"), null));
    }

    @Test
    public void shouldNotHaveNullSessionMerger3OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, null, Named.as("name"), Materialized.as("test")));
    }

    @Test
    public void shouldNotHaveNullNamedOnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, null, Materialized.as("test")));
    }

    @Test
    public void shouldNotHaveNullInitializer3OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null, sessionMerger, Named.as("name"), Materialized.as("test")));
    }

    @Test
    public void shouldNotHaveNullNamed2OnAggregate() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, sessionMerger, (Named) null));
    }

    /**
     * Checks that the {@link Named} passed to {@code aggregate} is used as the prefix of the
     * generated processor names in the topology description.
     */
    @Test
    public void namedParamShouldSetName() {
        // Use a dedicated builder and grouped stream so the described topology contains only
        // what this test adds, and the fixtures created in setup() are left untouched.
        final StreamsBuilder localBuilder = new StreamsBuilder();
        final KStream<String, String> stream = localBuilder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> localGrouped = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        localGrouped
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(SessionWindows.with(Duration.ofMillis(1L)))
            .aggregate(MockInitializer.STRING_INIT, sessionMerger, Named.as("foo"));

        final String expectedDescription =
            "Topologies:\n"
                + "   Sub-topology: 0\n"
                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
                + "      --> foo-cogroup-agg-0\n"
                + "    Processor: foo-cogroup-agg-0 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
                + "      --> foo-cogroup-merge\n"
                + "      <-- KSTREAM-SOURCE-0000000000\n"
                + "    Processor: foo-cogroup-merge (stores: [])\n"
                + "      --> none\n"
                + "      <-- foo-cogroup-agg-0\n\n";

        MatcherAssert.assertThat(localBuilder.build().describe().toString(), CoreMatchers.equalTo(expectedDescription));
    }

    @Test
    public void sessionWindowAggregateTest() {
        aggregateToOutput(groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(SessionWindows.with(Duration.ofMillis(500L))));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic = stringInputTopic(driver, TOPIC);
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = sessionOutputTopic(driver);

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 0L);
            testInputTopic.pipeInput("k1", "B", 599L);
            testInputTopic.pipeInput("k2", "B", 607L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 599L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 607L);
        }
    }

    @Test
    public void sessionWindowAggregate2Test() {
        aggregateToOutput(groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(SessionWindows.with(Duration.ofMillis(500L))));

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic = stringInputTopic(driver, TOPIC);
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = sessionOutputTopic(driver);

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k2", "B", 599L);
            testInputTopic.pipeInput("k1", "B", 607L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+A+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 599L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 607L);
        }
    }

    @Test
    public void sessionWindowAggregateTest2StreamsTest() {
        aggregateToOutput(windowedCogroupedStream);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic = stringInputTopic(driver, TOPIC);
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = sessionOutputTopic(driver);

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k1", "A", 84L);
            testInputTopic.pipeInput("k1", "A", 113L);
            testInputTopic.pipeInput("k1", "A", 199L);
            testInputTopic.pipeInput("k1", "B", 300L);
            testInputTopic.pipeInput("k2", "B", 301L);
            testInputTopic.pipeInput("k2", "B", 400L);
            testInputTopic.pipeInput("k1", "B", 400L);

            // A null value is expected before each merged aggregate.
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+A+A", 84L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 84L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+0+A+A+A", 113L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 113L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+0+0+A+A+A+A", 199L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 300L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 301L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", null, 301L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+0+B+B", 400L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 300L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+B+B", 400L);
        }
    }

    @Test
    public void sessionWindowMixAggregatorsTest() {
        aggregateToOutput(windowedCogroupedStream);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic = stringInputTopic(driver, TOPIC);
            final TestInputTopic<String, String> testInputTopic2 = stringInputTopic(driver, TOPIC2);
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = sessionOutputTopic(driver);

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 1L);
            testInputTopic.pipeInput("k1", "A", 2L);
            testInputTopic2.pipeInput("k1", "B", 3L);
            testInputTopic2.pipeInput("k2", "B", 3L);
            testInputTopic2.pipeInput("k2", "B", 444L);
            testInputTopic2.pipeInput("k1", "B", 444L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", null, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+0+A+A", 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+A+A", 2L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 2L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+0+A+A-B", 3L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", null, 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+0+0+A+A-B", 3L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-B", 444L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-B", 444L);
        }
    }

    @Test
    public void sessionWindowMixAggregatorsManyWindowsTest() {
        aggregateToOutput(windowedCogroupedStream);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic = stringInputTopic(driver, TOPIC);
            final TestInputTopic<String, String> testInputTopic2 = stringInputTopic(driver, TOPIC2);
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = sessionOutputTopic(driver);

            testInputTopic.pipeInput("k1", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 0L);
            testInputTopic.pipeInput("k2", "A", 1L);
            testInputTopic.pipeInput("k1", "A", 2L);
            testInputTopic2.pipeInput("k1", "B", 3L);
            testInputTopic2.pipeInput("k2", "B", 500L);
            testInputTopic2.pipeInput("k2", "B", 501L);
            testInputTopic2.pipeInput("k1", "B", 501L);

            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", null, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+0+A+A", 1L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 0L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+A+A", 2L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", null, 2L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+0+0+A+A-B", 3L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0-B", 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", null, 500L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+0-B-B", 501L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0-B", 501L);
        }
    }

    /**
     * Aggregates the given windowed cogrouped stream with the string initializer and
     * {@link #sessionMerger}, materialized with String serdes, and writes the result to {@code OUTPUT}.
     *
     * @param windowed the session-windowed cogrouped stream to aggregate
     */
    private void aggregateToOutput(final SessionWindowedCogroupedKStream<String, String> windowed) {
        final KTable<Windowed<String>, String> customers = windowed.aggregate(
            MockInitializer.STRING_INIT,
            sessionMerger,
            Materialized.with(Serdes.String(), Serdes.String()));
        customers.toStream().to(OUTPUT);
    }

    /**
     * Creates an input topic on the driver that serializes both keys and values as strings.
     *
     * @param driver the test driver to create the topic on
     * @param topic  the topic name
     * @return the input topic handle
     */
    private static TestInputTopic<String, String> stringInputTopic(final TopologyTestDriver driver, final String topic) {
        return driver.createInputTopic(topic, new StringSerializer(), new StringSerializer());
    }

    /**
     * Creates the {@code OUTPUT} topic handle, reading session-windowed string keys and string values.
     *
     * @param driver the test driver to create the topic on
     * @return the output topic handle
     */
    private static TestOutputTopic<Windowed<String>, String> sessionOutputTopic(final TopologyTestDriver driver) {
        return driver.createOutputTopic(
            OUTPUT,
            new SessionWindowedDeserializer<>(new StringDeserializer()),
            new StringDeserializer());
    }

    /**
     * Reads the next record from {@code outputTopic} and asserts its un-windowed key, value and
     * timestamp. The window bounds of the key and the record headers are not compared.
     *
     * @param outputTopic       the topic to read the next record from
     * @param expectedKey       the expected key inside the {@link Windowed} wrapper
     * @param expectedValue     the expected value; may be {@code null}
     * @param expectedTimestamp the expected record timestamp
     */
    private static void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String>, String> outputTopic,
                                                      final String expectedKey,
                                                      final String expectedValue,
                                                      final long expectedTimestamp) {
        final TestRecord<Windowed<String>, String> realRecord = outputTopic.readRecord();
        // Strip the window from the key so the record can be compared with a plain String-keyed one.
        final TestRecord<String, String> nonWindowedRecord =
            new TestRecord<>(realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
        final TestRecord<String, String> testRecord =
            new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
        MatcherAssert.assertThat(nonWindowedRecord, CoreMatchers.equalTo(testRecord));
    }
}

