package org.apache.kafka.streams.tests;

import java.util.Properties;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

/* loaded from: input_file:org/apache/kafka/streams/tests/StreamsUpgradeTest.class */
public class StreamsUpgradeTest {
    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 1) {
            System.err.println("StreamsUpgradeTest requires one argument (properties-file) but provided none");
        }
        Properties loadProps = Utils.loadProps(strArr[0]);
        System.out.println("StreamsTest instance started (StreamsUpgradeTest v3.4)");
        System.out.println("props=" + loadProps);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.table("data", Consumed.with(SmokeTestUtil.stringSerde, SmokeTestUtil.intSerde)).toStream();
        stream.process(printProcessorSupplier("data"), new String[0]);
        stream.to("echo");
        if (Boolean.parseBoolean(loadProps.getProperty("test.run_fk_join", "false"))) {
            try {
                buildFKTable(stream, streamsBuilder.table("fk", Consumed.with(SmokeTestUtil.intSerde, SmokeTestUtil.stringSerde)));
            } catch (Exception e) {
                System.err.println("Caught " + e.getMessage());
            }
        }
        Properties properties = new Properties();
        properties.setProperty("application.id", "StreamsUpgradeTest");
        properties.put("commit.interval.ms", 1000);
        properties.putAll(loadProps);
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        kafkaStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            kafkaStreams.close();
            System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
            System.out.flush();
        }));
    }

    private static void buildFKTable(KStream<String, Integer> kStream, KTable<Integer, String> kTable) {
        KStream stream = kStream.toTable().join(kTable, num -> {
            return num;
        }, (num2, str) -> {
            return str;
        }).toStream();
        stream.process(printProcessorSupplier("fk"), new String[0]);
        stream.to("fk-result", Produced.with(SmokeTestUtil.stringSerde, SmokeTestUtil.stringSerde));
    }

    private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(String str) {
        return () -> {
            return new ContextualProcessor<KIn, VIn, KOut, VOut>() { // from class: org.apache.kafka.streams.tests.StreamsUpgradeTest.1
                private int numRecordsProcessed = 0;

                public void init(ProcessorContext<KOut, VOut> processorContext) {
                    System.out.println("[3.4] initializing processor: topic=" + str + "taskId=" + processorContext.taskId());
                    this.numRecordsProcessed = 0;
                }

                public void process(Record<KIn, VIn> record) {
                    this.numRecordsProcessed++;
                    if (this.numRecordsProcessed % 100 == 0) {
                        System.out.println("processed " + this.numRecordsProcessed + " records from topic=" + str);
                    }
                }

                public void close() {
                }
            };
        };
    }
}
