/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests the sliding-window aggregation of a cogrouped stream: argument validation of the
 * {@code aggregate} overloads, processor naming via {@link Named}, and the records emitted
 * when the topology is driven through a {@link TopologyTestDriver}.
 *
 * <p>The fixture built in {@link #setup()} cogroups two String/String streams read from
 * {@code "topic"} and {@code "topic2"} and windows the result with a sliding window of
 * {@link #WINDOW_SIZE_MS} ms and a grace period of {@link #GRACE_MS} ms.
 */
public class SlidingWindowedCogroupedKStreamImplTest {
    private static final String TOPIC = "topic";
    private static final String TOPIC2 = "topic2";
    private static final String OUTPUT = "output";
    /** Time difference of the sliding window; also the window size handed to the output deserializer. */
    private static final long WINDOW_SIZE_MS = 500L;
    /** Grace period used for every sliding window in this class. */
    private static final long GRACE_MS = 2000L;

    private final StreamsBuilder builder = new StreamsBuilder();
    private KGroupedStream<String, String> groupedStream;
    private TimeWindowedCogroupedKStream<String, String> windowedCogroupedStream;
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());

    /**
     * Registers both source topics on the shared builder, groups each by key, cogroups them
     * (adder for the first stream, remover for the second) and applies the sliding window.
     */
    @Before
    public void setup() {
        final KStream<String, String> stream =
            builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> stream2 =
            builder.stream(TOPIC2, Consumed.with(Serdes.String(), Serdes.String()));

        groupedStream = stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> groupedStream2 =
            stream2.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        final CogroupedKStream<String, String> cogroupedStream = groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .cogroup(groupedStream2, MockAggregator.TOSTRING_REMOVER);
        windowedCogroupedStream = cogroupedStream.windowedBy(slidingWindows());
    }

    /** {@code aggregate(initializer)} must reject a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null));
    }

    /** {@code aggregate(initializer, materialized)} must reject a null {@link Materialized}. */
    @Test
    public void shouldNotHaveNullMaterializedOnTwoOptionAggregate() {
        // The cast only selects the Materialized overload; a bare null would be ambiguous with Named.
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, (Materialized) null));
    }

    /** {@code aggregate(initializer, named)} must reject a null {@link Named}. */
    @Test
    public void shouldNotHaveNullNamedTwoOptionOnAggregate() {
        // The cast only selects the Named overload; a bare null would be ambiguous with Materialized.
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, (Named) null));
    }

    /** {@code aggregate(initializer, named)} must reject a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerTwoOptionNamedOnAggregate() {
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null, Named.as("test")));
    }

    /** {@code aggregate(initializer, materialized)} must reject a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerTwoOptionMaterializedOnAggregate() {
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null, Materialized.as("test")));
    }

    /** {@code aggregate(initializer, named, materialized)} must reject a null initializer. */
    @Test
    public void shouldNotHaveNullInitializerThreeOptionOnAggregate() {
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(null, Named.as("test"), Materialized.as("test")));
    }

    /** {@code aggregate(initializer, named, materialized)} must reject a null {@link Materialized}. */
    @Test
    public void shouldNotHaveNullMaterializedOnAggregate() {
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, Named.as("Test"), null));
    }

    /** {@code aggregate(initializer, named, materialized)} must reject a null {@link Named}. */
    @Test
    public void shouldNotHaveNullNamedOnAggregate() {
        Assert.assertThrows(NullPointerException.class,
            () -> windowedCogroupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("test")));
    }

    /**
     * A {@link Named} passed to {@code aggregate} must prefix the generated cogroup processor names
     * in the topology description.
     */
    @Test
    public void namedParamShouldSetName() {
        // A dedicated builder is used because the shared one already has both sources registered
        // by setup(), which would change the described topology. Locals are used so the shared
        // fixture fields are neither shadowed nor overwritten.
        final StreamsBuilder namedBuilder = new StreamsBuilder();
        final KStream<String, String> stream =
            namedBuilder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> grouped =
            stream.groupByKey(Grouped.with(Serdes.String(), Serdes.String()));

        grouped.cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(slidingWindows())
            .aggregate(MockInitializer.STRING_INIT, Named.as("foo"));

        final String expectedTopology =
            "Topologies:\n"
                + "   Sub-topology: 0\n"
                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
                + "      --> foo-cogroup-agg-0\n"
                + "    Processor: foo-cogroup-agg-0 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000001])\n"
                + "      --> foo-cogroup-merge\n"
                + "      <-- KSTREAM-SOURCE-0000000000\n"
                + "    Processor: foo-cogroup-merge (stores: [])\n"
                + "      --> none\n"
                + "      <-- foo-cogroup-agg-0\n\n";
        MatcherAssert.assertThat(namedBuilder.build().describe().toString(), CoreMatchers.equalTo(expectedTopology));
    }

    /**
     * Pipes eight records with closely spaced timestamps into {@code "topic"} and compares the
     * complete output (windowed key, aggregate value and record timestamp, in emission order)
     * against the expected list. Nothing is piped into {@code "topic2"} in this test.
     */
    @Test
    public void slidingWindowAggregateStreamsTest() {
        final KTable<Windowed<String>, String> customers = windowedCogroupedStream.aggregate(
            MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
                OUTPUT,
                new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS),
                new StringDeserializer());

            testInputTopic.pipeInput("k1", "A", 500L);
            testInputTopic.pipeInput("k2", "A", 500L);
            testInputTopic.pipeInput("k2", "A", 501L);
            testInputTopic.pipeInput("k1", "A", 502L);
            testInputTopic.pipeInput("k1", "B", 503L);
            testInputTopic.pipeInput("k2", "B", 503L);
            testInputTopic.pipeInput("k2", "B", 504L);
            testInputTopic.pipeInput("k1", "B", 504L);

            final List<TestRecord<Windowed<String>, String>> results = testOutputTopic.readRecordsToList();

            final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>();
            // k1 / A @ 500
            expected.add(windowedRecord("k1", 0L, 500L, "0+A", 500L));
            // k2 / A @ 500
            expected.add(windowedRecord("k2", 0L, 500L, "0+A", 500L));
            // k2 / A @ 501
            expected.add(windowedRecord("k2", 501L, 1001L, "0+A", 501L));
            expected.add(windowedRecord("k2", 1L, 501L, "0+A+A", 501L));
            // k1 / A @ 502
            expected.add(windowedRecord("k1", 501L, 1001L, "0+A", 502L));
            expected.add(windowedRecord("k1", 2L, 502L, "0+A+A", 502L));
            // k1 / B @ 503
            expected.add(windowedRecord("k1", 501L, 1001L, "0+A+B", 503L));
            expected.add(windowedRecord("k1", 503L, 1003L, "0+B", 503L));
            expected.add(windowedRecord("k1", 3L, 503L, "0+A+A+B", 503L));
            // k2 / B @ 503
            expected.add(windowedRecord("k2", 501L, 1001L, "0+A+B", 503L));
            expected.add(windowedRecord("k2", 502L, 1002L, "0+B", 503L));
            expected.add(windowedRecord("k2", 3L, 503L, "0+A+A+B", 503L));
            // k2 / B @ 504
            expected.add(windowedRecord("k2", 502L, 1002L, "0+B+B", 504L));
            expected.add(windowedRecord("k2", 501L, 1001L, "0+A+B+B", 504L));
            expected.add(windowedRecord("k2", 504L, 1004L, "0+B", 504L));
            expected.add(windowedRecord("k2", 4L, 504L, "0+A+A+B+B", 504L));
            // k1 / B @ 504
            expected.add(windowedRecord("k1", 503L, 1003L, "0+B+B", 504L));
            expected.add(windowedRecord("k1", 501L, 1001L, "0+A+B+B", 504L));
            expected.add(windowedRecord("k1", 504L, 1004L, "0+B", 504L));
            expected.add(windowedRecord("k1", 4L, 504L, "0+A+A+B+B", 504L));

            Assert.assertEquals(expected, results);
        }
    }

    /**
     * Aggregates only the first grouped stream with records spaced 250 ms apart and checks, record
     * by record, the key, aggregate value and timestamp of each output (window bounds are not
     * compared here).
     */
    @Test
    public void slidingWindowAggregateOverlappingWindowsTest() {
        final KTable<Windowed<String>, String> customers = groupedStream
            .cogroup(MockAggregator.TOSTRING_ADDER)
            .windowedBy(slidingWindows())
            .aggregate(MockInitializer.STRING_INIT, Materialized.with(Serdes.String(), Serdes.String()));
        customers.toStream().to(OUTPUT);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> testInputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            final TestOutputTopic<Windowed<String>, String> testOutputTopic = driver.createOutputTopic(
                OUTPUT,
                new TimeWindowedDeserializer<>(new StringDeserializer(), WINDOW_SIZE_MS),
                new StringDeserializer());

            testInputTopic.pipeInput("k1", "A", 500L);
            testInputTopic.pipeInput("k2", "A", 500L);
            testInputTopic.pipeInput("k1", "B", 750L);
            testInputTopic.pipeInput("k2", "B", 750L);
            testInputTopic.pipeInput("k2", "A", 1000L);
            testInputTopic.pipeInput("k1", "A", 1000L);

            // k1 / A @ 500
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 500L);
            // k2 / A @ 500
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 500L);
            // k1 / B @ 750
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B", 750L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B", 750L);
            // k2 / B @ 750
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B", 750L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B", 750L);
            // k2 / A @ 1000
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+B+A", 1000L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A", 1000L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k2", "0+A+B+A", 1000L);
            // k1 / A @ 1000
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+B+A", 1000L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A", 1000L);
            assertOutputKeyValueTimestamp(testOutputTopic, "k1", "0+A+B+A", 1000L);
        }
    }

    /**
     * Reads the next record from {@code outputTopic} and asserts its un-windowed key, value and
     * timestamp. The window itself is stripped before comparison; headers are compared as null.
     *
     * @param outputTopic       topic to read one record from
     * @param expectedKey       expected key inside the {@link Windowed} wrapper
     * @param expectedValue     expected aggregate value
     * @param expectedTimestamp expected record timestamp in milliseconds
     */
    private void assertOutputKeyValueTimestamp(final TestOutputTopic<Windowed<String>, String> outputTopic,
                                               final String expectedKey,
                                               final String expectedValue,
                                               final long expectedTimestamp) {
        final TestRecord<Windowed<String>, String> realRecord = outputTopic.readRecord();
        final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
            realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
        final TestRecord<String, String> testRecord =
            new TestRecord<>(expectedKey, expectedValue, null, expectedTimestamp);
        MatcherAssert.assertThat(nonWindowedRecord, CoreMatchers.equalTo(testRecord));
    }

    /** Returns the sliding-window definition ({@link #WINDOW_SIZE_MS} / {@link #GRACE_MS}) shared by all tests. */
    private static SlidingWindows slidingWindows() {
        return SlidingWindows.withTimeDifferenceAndGrace(
            Duration.ofMillis(WINDOW_SIZE_MS), Duration.ofMillis(GRACE_MS));
    }

    /** Builds an expected output record keyed by {@code key} in the window {@code [windowStart, windowEnd]}, with null headers. */
    private static TestRecord<Windowed<String>, String> windowedRecord(final String key,
                                                                       final long windowStart,
                                                                       final long windowEnd,
                                                                       final String value,
                                                                       final long timestamp) {
        return new TestRecord<>(new Windowed<>(key, new TimeWindow(windowStart, windowEnd)), value, null, timestamp);
    }
}

