package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Integration test for a windowed stream self-join ({@code stream.join(stream, ...)}) that is
 * stopped and then started again, optionally with a different {@code topology.optimization}
 * setting for the second run.
 *
 * <p>Each test produces records to a per-test input topic and asserts on the joined records
 * read from a per-test output topic. The records expected after the restart include joins
 * against records that were produced before the restart.
 */
@Category({IntegrationTest.class})
public class SelfJoinUpgradeIntegrationTest {
    public static final String INPUT_TOPIC = "selfjoin-input";
    public static final String OUTPUT_TOPIC = "selfjoin-output";
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    /** Key shared by every record produced and expected by these tests. */
    private static final String KEY = "1";

    @Rule
    public final TestName testName = new TestName();

    private String inputTopic;
    private String outputTopic;
    private KafkaStreams kafkaStreams;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /** Creates input and output topics whose names are suffixed with the unique test name. */
    @Before
    public void createTopics() throws Exception {
        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        inputTopic = INPUT_TOPIC + uniqueTestName;
        outputTopic = OUTPUT_TOPIC + uniqueTestName;
        CLUSTER.createTopic(inputTopic);
        CLUSTER.createTopic(outputTopic);
    }

    /**
     * Builds the Streams configuration used by every run of a test.
     *
     * <p>A fresh temporary state directory is chosen on each call, so a test that restarts the
     * application must reuse the returned instance to keep pointing at the same directory.
     *
     * @return a new, mutable configuration for the application under test
     */
    private Properties props() {
        final Properties config = new Properties();
        config.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("statestore.cache.max.bytes", 0);
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.String().getClass());
        config.put("commit.interval.ms", 1000L);
        config.put("auto.offset.reset", "earliest");
        return config;
    }

    /**
     * Closes the application left behind by a test (waiting up to 30 seconds) and then calls
     * {@link KafkaStreams#cleanUp()} on it. Does nothing when no instance is set.
     */
    @After
    public void shutdown() {
        if (kafkaStreams == null) {
            return;
        }
        kafkaStreams.close(Duration.ofSeconds(30L));
        kafkaStreams.cleanUp();
    }

    @Test
    public void shouldUpgradeWithTopologyOptimizationOff() throws Exception {
        verifySelfJoinAcrossRestart("none", "all");
    }

    @Test
    public void shouldRestartWithTopologyOptimizationOn() throws Exception {
        verifySelfJoinAcrossRestart("all", "all");
    }

    /**
     * Runs the self-join application, stops it, starts it again and verifies the join output
     * of both runs.
     *
     * @param initialOptimization value of {@code topology.optimization} for the first run
     * @param restartOptimization value of {@code topology.optimization} for the second run
     * @throws Exception if producing, consuming or an assertion fails
     */
    private void verifySelfJoinAcrossRestart(final String initialOptimization,
                                             final String restartOptimization) throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        final KStream<String, String> input =
            streamsBuilder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> joined = input.join(
            input,
            (left, right) -> left + right,
            JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(100L)));
        joined.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

        // The same Properties instance is used for both runs so that application.id and
        // state.dir are identical across the restart.
        final Properties props = props();

        // NOTE(review): the topology is obtained through build() without the properties.
        // Confirm whether "topology.optimization" takes effect without
        // StreamsBuilder#build(Properties).
        props.put("topology.optimization", initialOptimization);
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        final long firstRunTime = CLUSTER.time.milliseconds();
        processKeyValueAndVerifyCount(KEY, "A", firstRunTime + 42, Arrays.asList(
            joinResult("AA", firstRunTime + 42)));
        processKeyValueAndVerifyCount(KEY, "B", firstRunTime + 43, Arrays.asList(
            joinResult("BA", firstRunTime + 43),
            joinResult("AB", firstRunTime + 43),
            joinResult("BB", firstRunTime + 43)));

        kafkaStreams.close();
        // Cleared so that shutdown() does not touch the closed instance if re-creation fails.
        kafkaStreams = null;

        props.put("topology.optimization", restartOptimization);
        kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        kafkaStreams.start();

        // "C" is expected to join with "A" and "B", which were produced before the restart.
        final long secondRunTime = CLUSTER.time.milliseconds();
        processKeyValueAndVerifyCount(KEY, "C", secondRunTime + 44, Arrays.asList(
            joinResult("CA", secondRunTime + 44),
            joinResult("CB", secondRunTime + 44),
            joinResult("AC", secondRunTime + 44),
            joinResult("BC", secondRunTime + 44),
            joinResult("CC", secondRunTime + 44)));

        kafkaStreams.close();
    }

    /** Creates an expected output record for {@link #KEY} with the given value and timestamp. */
    private static KeyValueTimestamp<String, String> joinResult(final String value, final long timestamp) {
        return new KeyValueTimestamp<String, String>(KEY, value, timestamp);
    }

    /**
     * Produces a single record to the input topic and asserts that the records then read from
     * the output topic equal {@code list}.
     *
     * @param k    key of the record to produce
     * @param v    value of the record to produce
     * @param j    timestamp of the record to produce
     * @param list records expected on the output topic, in order; its size is also the minimum
     *             number of records waited for
     * @return {@code true} if the received records equal {@code list}
     * @throws Exception if producing or consuming fails
     */
    private <K, V> boolean processKeyValueAndVerifyCount(K k, V v, long j, List<KeyValueTimestamp<K, V>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            inputTopic,
            Collections.singletonList(KeyValue.pair(k, v)),
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
            j);

        final Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", "group-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("key.deserializer", StringDeserializer.class.getName());
        consumerConfig.setProperty("value.deserializer", StringDeserializer.class.getName());
        consumerConfig.put("window.size.ms", 500L);

        final List<KeyValueTimestamp<K, V>> received =
            IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
                consumerConfig,
                outputTopic,
                list.size(),
                IntegrationTestUtils.DEFAULT_TIMEOUT);

        MatcherAssert.assertThat(received, Is.is(list));
        return received.equals(list);
    }
}
