package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Tests the {@code count}, {@code reduce} and {@code aggregate} operations of a
 * {@link TimeWindowedKStream} built over 500 ms windows without grace.
 *
 * <p>Every test runs once per row returned by {@link #data()}, i.e. for each combination of
 * emit strategy and store caching flag.
 */
@RunWith(Parameterized.class)
public class TimeWindowedKStreamImplTest {
    private static final String TOPIC = "input";

    // Internal config key set to "0" in before().
    // NOTE(review): presumably mirrors a constant in StreamsConfig.InternalConfig -- confirm and reference it.
    private static final String EMIT_INTERVAL_MS_CONFIG = "__emit.interval.ms.kstreams.windowed.aggregation__";

    private static final Windowed<String> KEY_1_WINDOW_0 = new Windowed<>("1", new TimeWindow(0, 500));
    private static final Windowed<String> KEY_1_WINDOW_1 = new Windowed<>("1", new TimeWindow(500, 1000));
    private static final Windowed<String> KEY_2_WINDOW_1 = new Windowed<>("2", new TimeWindow(500, 1000));
    private static final Windowed<String> KEY_2_WINDOW_2 = new Windowed<>("2", new TimeWindow(1000, 1500));

    private final StreamsBuilder builder = new StreamsBuilder();
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());
    private TimeWindowedKStream<String, String> windowedStream;

    // The two parameter fields must stay public: the Parameterized runner injects them.
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;

    @Parameterized.Parameter(1)
    public boolean withCache;

    private EmitStrategy emitStrategy;
    private boolean emitFinal;

    /**
     * Supplies the test matrix: each emit strategy type paired with caching enabled and disabled.
     *
     * @return one {@code {strategyType, withCache}} row per test run
     */
    @Parameterized.Parameters(name = "{0}_cache:{1}")
    public static Collection<Object[]> data() {
        return Arrays.asList(
            new Object[] {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true},
            new Object[] {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false},
            new Object[] {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true},
            new Object[] {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false}
        );
    }

    /**
     * Resolves the emit strategy for the current parameter row and builds the windowed stream under test.
     */
    @Before
    public void before() {
        emitFinal = type == EmitStrategy.StrategyType.ON_WINDOW_CLOSE;
        emitStrategy = EmitStrategy.StrategyType.forType(type);
        props.setProperty(EMIT_INTERVAL_MS_CONFIG, "0");
        windowedStream = builder
            .stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(500L)));
    }

    @Test
    public void shouldCountWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedStream
            .emitStrategy(emitStrategy)
            .count()
            .toStream()
            .process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        final List<KeyValueTimestamp<Windowed<String>, Long>> expected;
        if (emitFinal) {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L)
            );
        } else {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, 1L, 10L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, 2L, 15L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_1, 1L, 500L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, 1L, 550L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, 2L, 550L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_2, 1L, 1000L)
            );
        }
        final List<KeyValueTimestamp<Windowed<String>, Long>> processed = supplier.theCapturedProcessor().processed();
        Assert.assertEquals(expected, processed);
    }

    @Test
    public void shouldReduceWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedStream
            .emitStrategy(emitStrategy)
            .reduce(MockReducer.STRING_ADDER)
            .toStream()
            .process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        final List<KeyValueTimestamp<Windowed<String>, String>> expected;
        if (emitFinal) {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, "1+2", 15L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_1, "3", 500L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, "10+20", 550L)
            );
        } else {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, "1", 10L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, "1+2", 15L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_1, "3", 500L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, "10", 550L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, "10+20", 550L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_2, "30", 1000L)
            );
        }
        final List<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
        Assert.assertEquals(expected, processed);
    }

    @Test
    public void shouldAggregateWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        windowedStream
            .emitStrategy(emitStrategy)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                setMaterializedCache(Materialized.with(Serdes.String(), Serdes.String())))
            .toStream()
            .process(supplier);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
        }

        final List<KeyValueTimestamp<Windowed<String>, String>> expected;
        if (emitFinal) {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, "0+1+2", 15L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_1, "0+3", 500L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, "0+10+20", 550L)
            );
        } else {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, "0+1", 10L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_0, "0+1+2", 15L),
                new KeyValueTimestamp<>(KEY_1_WINDOW_1, "0+3", 500L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, "0+10", 550L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_1, "0+10+20", 550L),
                new KeyValueTimestamp<>(KEY_2_WINDOW_2, "0+30", 1000L)
            );
        }
        final List<KeyValueTimestamp<Windowed<String>, String>> processed = supplier.theCapturedProcessor().processed();
        Assert.assertEquals(expected, processed);
    }

    @Test
    public void shouldMaterializeCount() {
        windowedStream
            .emitStrategy(emitStrategy)
            .count(materializedAs("count-store", Serdes.String(), Serdes.Long()));

        final List<KeyValue<Windowed<String>, ValueAndTimestamp<Long>>> expected;
        if (withCache) {
            expected = Arrays.asList(
                KeyValue.pair(KEY_1_WINDOW_0, ValueAndTimestamp.make(2L, 15L)),
                KeyValue.pair(KEY_1_WINDOW_1, ValueAndTimestamp.make(1L, 500L)),
                KeyValue.pair(KEY_2_WINDOW_1, ValueAndTimestamp.make(2L, 550L)),
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make(1L, 1000L))
            );
        } else if (emitFinal) {
            expected = Arrays.asList(
                KeyValue.pair(KEY_1_WINDOW_1, ValueAndTimestamp.make(1L, 500L)),
                KeyValue.pair(KEY_2_WINDOW_1, ValueAndTimestamp.make(2L, 550L)),
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make(1L, 1000L))
            );
        } else {
            expected = Collections.singletonList(
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make(1L, 1000L)));
        }

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
            assertStoreContents(driver, "count-store", expected);
        }
    }

    @Test
    public void shouldMaterializeReduced() {
        windowedStream.reduce(
            MockReducer.STRING_ADDER,
            materializedAs("reduced", Serdes.String(), Serdes.String()));

        final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> expected;
        if (withCache) {
            expected = Arrays.asList(
                KeyValue.pair(KEY_1_WINDOW_0, ValueAndTimestamp.make("1+2", 15L)),
                KeyValue.pair(KEY_1_WINDOW_1, ValueAndTimestamp.make("3", 500L)),
                KeyValue.pair(KEY_2_WINDOW_1, ValueAndTimestamp.make("10+20", 550L)),
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make("30", 1000L))
            );
        } else {
            expected = Collections.singletonList(
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make("30", 1000L)));
        }

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
            assertStoreContents(driver, "reduced", expected);
        }
    }

    @Test
    public void shouldMaterializeAggregated() {
        windowedStream.aggregate(
            MockInitializer.STRING_INIT,
            MockAggregator.TOSTRING_ADDER,
            materializedAs("aggregated", Serdes.String(), Serdes.String()));

        final List<KeyValue<Windowed<String>, ValueAndTimestamp<String>>> expected;
        if (withCache) {
            expected = Arrays.asList(
                KeyValue.pair(KEY_1_WINDOW_0, ValueAndTimestamp.make("0+1+2", 15L)),
                KeyValue.pair(KEY_1_WINDOW_1, ValueAndTimestamp.make("0+3", 500L)),
                KeyValue.pair(KEY_2_WINDOW_1, ValueAndTimestamp.make("0+10+20", 550L)),
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make("0+30", 1000L))
            );
        } else {
            expected = Collections.singletonList(
                KeyValue.pair(KEY_2_WINDOW_2, ValueAndTimestamp.make("0+30", 1000L)));
        }

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);
            assertStoreContents(driver, "aggregated", expected);
        }
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate((Initializer<String>) null, MockAggregator.TOSTRING_ADDER));
    }

    @Test
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(MockInitializer.STRING_INIT, (Aggregator<String, String, String>) null));
    }

    @Test
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce((Reducer<String>) null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(
                (Initializer<String>) null,
                MockAggregator.TOSTRING_ADDER,
                setMaterializedCache(Materialized.as("store"))));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(
                MockInitializer.STRING_INIT,
                (Aggregator<String, String, String>) null,
                setMaterializedCache(Materialized.as("store"))));
    }

    // The raw (Materialized) cast only selects the Materialized overload over the Named one.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                (Materialized) null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce((Reducer<String>) null, setMaterializedCache(Materialized.as("store"))));
    }

    // The raw (Materialized) cast only selects the Materialized overload over the Named one.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null));
    }

    @Test
    public void shouldThrowNullPointerOnMaterializedReduceIfNamedIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.reduce(MockReducer.STRING_ADDER, (Named) null));
    }

    // The raw (Materialized) cast only selects the Materialized overload over the Named one.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Test
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        Assert.assertThrows(
            NullPointerException.class,
            () -> windowedStream.count((Materialized) null));
    }

    /**
     * Pipes the shared fixture into {@link #TOPIC}: three records for key "1" and three for key "2".
     *
     * <p>Note that the second record for key "2" carries an earlier timestamp (500) than the one
     * piped just before it (550), so the input is deliberately not in timestamp order.
     *
     * @param driver the driver running the topology under test
     */
    private void processData(final TopologyTestDriver driver) {
        final TestInputTopic<String, String> inputTopic =
            driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
        inputTopic.pipeInput("1", "1", 10L);
        inputTopic.pipeInput("1", "2", 15L);
        inputTopic.pipeInput("1", "3", 500L);
        inputTopic.pipeInput("2", "10", 550L);
        inputTopic.pipeInput("2", "20", 500L);
        inputTopic.pipeInput("2", "30", 1000L);
    }

    /**
     * Fetches keys "1" through "2" over the time range [0, 1000] from the named store and checks the result,
     * first through the plain window-store view and then through the timestamped view.
     *
     * @param driver    the driver that has already processed the fixture
     * @param storeName name of the materialized store to query
     * @param expected  expected entries including timestamps; the plain-view expectation is derived from it
     * @param <V>       aggregate value type held in the store
     */
    private static <V> void assertStoreContents(final TopologyTestDriver driver,
                                                final String storeName,
                                                final List<KeyValue<Windowed<String>, ValueAndTimestamp<V>>> expected) {
        final Instant from = Instant.ofEpochMilli(0L);
        final Instant to = Instant.ofEpochMilli(1000L);

        final List<KeyValue<Windowed<String>, V>> plain =
            StreamsTestUtils.toList(driver.<String, V>getWindowStore(storeName).fetch("1", "2", from, to));
        MatcherAssert.assertThat(plain, CoreMatchers.equalTo(withoutTimestamps(expected)));

        final List<KeyValue<Windowed<String>, ValueAndTimestamp<V>>> timestamped =
            StreamsTestUtils.toList(driver.<String, V>getTimestampedWindowStore(storeName).fetch("1", "2", from, to));
        MatcherAssert.assertThat(timestamped, CoreMatchers.equalTo(expected));
    }

    /**
     * Strips the timestamp from every entry, keeping key order.
     *
     * @param entries entries whose values carry a timestamp
     * @param <V>     plain value type
     * @return a new list with the same keys mapped to the bare values
     */
    private static <V> List<KeyValue<Windowed<String>, V>> withoutTimestamps(
            final List<KeyValue<Windowed<String>, ValueAndTimestamp<V>>> entries) {
        final List<KeyValue<Windowed<String>, V>> plain = new ArrayList<>(entries.size());
        for (final KeyValue<Windowed<String>, ValueAndTimestamp<V>> entry : entries) {
            plain.add(KeyValue.pair(entry.key, entry.value.value()));
        }
        return plain;
    }

    /**
     * Builds a named {@link Materialized} with the given serdes and applies the caching flag of the current run.
     *
     * <p>Being generic, the call is target-typed by the aggregation method it is passed to, so the
     * store type never has to be spelled out at the call site.
     *
     * @param storeName  name of the state store
     * @param keySerde   serde for the store keys
     * @param valueSerde serde for the store values
     * @return the configured materialization
     */
    private <K, V, S extends StateStore> Materialized<K, V, S> materializedAs(final String storeName,
                                                                              final Serde<K> keySerde,
                                                                              final Serde<V> valueSerde) {
        final Materialized<K, V, S> named = Materialized.as(storeName);
        return setMaterializedCache(named.withKeySerde(keySerde).withValueSerde(valueSerde));
    }

    /**
     * Enables or disables caching on the given materialization according to {@link #withCache}.
     *
     * @param materialized the materialization to configure
     * @return the result of {@code withCachingEnabled()} or {@code withCachingDisabled()}
     */
    private <K, V, S extends StateStore> Materialized<K, V, S> setMaterializedCache(final Materialized<K, V, S> materialized) {
        if (withCache) {
            return materialized.withCachingEnabled();
        }
        return materialized.withCachingDisabled();
    }
}
