/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessorTopologyTest {
    // String (de)serializers shared by the tests: the serializer is used when creating test
    // input topics, the deserializer when registering source nodes on the topology.
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    // Store name and key prefix used by the testPrefixScan* tests.
    private static final String DEFAULT_STORE_NAME = "prefixScanStore";
    private static final String DEFAULT_PREFIX = "key";
    // Topic names used when wiring topologies and creating test input/output topics.
    private static final String INPUT_TOPIC_1 = "input-topic-1";
    private static final String INPUT_TOPIC_2 = "input-topic-2";
    private static final String OUTPUT_TOPIC_1 = "output-topic-1";
    private static final String OUTPUT_TOPIC_2 = "output-topic-2";
    private static final String THROUGH_TOPIC_1 = "through-topic-1";
    // A single record header and a Headers collection wrapping it (HEADERS must be declared after HEADER).
    // NOTE(review): getBytes() without an explicit charset uses the platform default encoding.
    // Presumably consumed by header-related tests further down the file — confirm.
    private static final Header HEADER = new RecordHeader("key", "value".getBytes());
    private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER});
    // Topology under construction; each test adds its own sources/processors/sinks to it.
    private final TopologyWrapper topology = new TopologyWrapper();
    // NOTE(review): not referenced by the tests visible in this part of the file — verify usage below.
    private final MockApiProcessorSupplier<?, ?, ?, ?> mockProcessorSupplier = new MockApiProcessorSupplier();
    // Test driver created by the individual tests and closed in cleanup(); null when a test does not use one.
    private TopologyTestDriver driver;
    // Driver configuration, populated in setup() and cleared in cleanup().
    private final Properties props = new Properties();

    /**
     * Fills the shared driver configuration before each test: a state directory obtained from
     * {@code TestUtils.tempDirectory()}, the String serde class as default key and value serde,
     * and {@code CustomTimestampExtractor} as the default timestamp extractor.
     */
    @Before
    public void setup() {
        final File stateDir = TestUtils.tempDirectory();
        // Key and value default to the same serde class, so resolve its name once.
        final String stringSerdeClassName = Serdes.String().getClass().getName();

        props.setProperty("state.dir", stateDir.getAbsolutePath());
        props.setProperty("default.key.serde", stringSerdeClassName);
        props.setProperty("default.value.serde", stringSerdeClassName);
        props.setProperty("default.timestamp.extractor", CustomTimestampExtractor.class.getName());
    }

    /**
     * Resets per-test state: clears the configuration and, if a test created a driver,
     * closes it and drops the reference.
     */
    @After
    public void cleanup() {
        props.clear();
        if (driver == null) {
            // Nothing to release: the test never created a driver.
            return;
        }
        driver.close();
        driver = null;
    }

    /**
     * Collects every entry returned by a prefix scan on the given store.
     *
     * @param store  the key-value store to scan
     * @param prefix the key prefix to scan for, serialized with the String serializer
     * @return the scanned entries, in the order the store's iterator yields them
     */
    private List<KeyValue<String, String>> prefixScanResults(KeyValueStore<String, String> store, String prefix) {
        final List<KeyValue<String, String>> results = new ArrayList<>();
        // The prefix must stay typed as String so it agrees with the String serializer in the
        // generic bounds of prefixScan; the iterator is closed by try-with-resources.
        try (KeyValueIterator<String, String> prefixScan = store.prefixScan(prefix, Serdes.String().serializer())) {
            while (prefixScan.hasNext()) {
                results.add(prefixScan.next());
            }
        }
        return results;
    }

    /**
     * Builds a topology with two sources, two processors and two sinks and checks the node,
     * source and source-topic counts, plus that every subscribed topic resolves to a source
     * node and that "topic-2" and "topic-3" resolve to the same one.
     */
    @Test
    public void testTopologyMetadata() {
        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2", "topic-3");
        topology.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
        topology.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1", "source-2");
        topology.addSink("sink-1", "topic-3", "processor-1");
        topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");

        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        Assert.assertEquals(6, built.processors().size());
        Assert.assertEquals(2, built.sources().size());
        Assert.assertEquals(3, built.sourceTopics().size());

        for (final String subscribedTopic : new String[]{"topic-1", "topic-2", "topic-3"}) {
            Assert.assertNotNull(built.source(subscribedTopic));
        }
        Assert.assertEquals(built.source("topic-2"), built.source("topic-3"));
    }

    /**
     * Checks that the terminal nodes of the built topology are exactly "processor-2"
     * (which has no downstream node) and "sink-1".
     */
    @Test
    public void shouldGetTerminalNodes() {
        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2", "topic-3");
        topology.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
        topology.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1", "source-2");
        topology.addSink("sink-1", "topic-3", "processor-1");

        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        MatcherAssert.assertThat(
            built.terminalNodes(),
            CoreMatchers.equalTo(Utils.mkSet("processor-2", "sink-1")));
    }

    /**
     * A topic added through {@code updateSourceTopics} must resolve to the source node it was
     * registered for; before the update it must not resolve at all.
     */
    @Test
    public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
        final String nodeName = "source-1";
        final String initialTopic = "topic-1";
        final String addedTopic = "topic-2";
        topology.addSource(nodeName, initialTopic);
        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        MatcherAssert.assertThat(built.source(addedTopic), CoreMatchers.is(CoreMatchers.nullValue()));

        built.updateSourceTopics(
            Collections.singletonMap(nodeName, Arrays.asList(initialTopic, addedTopic)));

        MatcherAssert.assertThat(built.source(addedTopic).name(), CoreMatchers.equalTo(nodeName));
    }

    /**
     * A topic that is left out of the update map for its source node must no longer resolve
     * to any source node afterwards.
     */
    @Test
    public void shouldUpdateSourceTopicsWithRemovedTopic() {
        final String nodeName = "source-1";
        final String keptTopic = "topic-1";
        final String droppedTopic = "topic-2";
        topology.addSource(nodeName, keptTopic, droppedTopic);
        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        MatcherAssert.assertThat(built.source(droppedTopic).name(), CoreMatchers.equalTo(nodeName));

        built.updateSourceTopics(
            Collections.singletonMap(nodeName, Collections.singletonList(keptTopic)));

        MatcherAssert.assertThat(built.source(droppedTopic), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * Updating a source node with an empty topic list must leave its former topic
     * unresolvable.
     */
    @Test
    public void shouldUpdateSourceTopicsWithAllTopicsRemoved() {
        final String nodeName = "source-1";
        final String onlyTopic = "topic-1";
        topology.addSource(nodeName, onlyTopic);
        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        MatcherAssert.assertThat(built.source(onlyTopic).name(), CoreMatchers.equalTo(nodeName));

        built.updateSourceTopics(Collections.singletonMap(nodeName, Collections.emptyList()));

        MatcherAssert.assertThat(built.source(onlyTopic), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * Entries in the update map for source nodes that are not part of this topology must be
     * ignored: their topics stay unresolvable and the number of sources does not change.
     */
    @Test
    public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
        final String nodeInside = "source-1";
        final String nodeOutside = "source-2";
        final String topicInside = "topic-1";
        final String topicOutside = "topic-2";
        // Only the "inside" node is actually registered on the topology.
        topology.addSource(nodeInside, topicInside);
        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        built.updateSourceTopics(Utils.mkMap(
            Utils.mkEntry(nodeInside, Collections.singletonList(topicInside)),
            Utils.mkEntry(nodeOutside, Collections.singletonList(topicOutside))));

        MatcherAssert.assertThat(built.source(topicOutside), CoreMatchers.is(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(built.sources().size(), CoreMatchers.equalTo(1));
    }

    /**
     * The topology contains only "source-2" while the update map mentions only "source-1";
     * {@code updateSourceTopics} must fail with an {@link IllegalStateException} whose message
     * names "source-2".
     */
    @Test
    public void shouldThrowIfSourceNodeToUpdateDoesNotExist() {
        final String nodeOnlyInUpdate = "source-1";
        final String nodeInTopology = "source-2";
        final String topicOfNodeOnlyInUpdate = "topic-1";
        final String topicOfNodeInTopology = "topic-2";
        topology.addSource(nodeInTopology, topicOfNodeInTopology);
        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        final Throwable thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> built.updateSourceTopics(Collections.singletonMap(
                nodeOnlyInUpdate, Collections.singletonList(topicOfNodeOnlyInUpdate))));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.is("Node source-2 not found in full topology"));
    }

    /**
     * Registering the same topic for two source nodes in one update must fail with an
     * {@link IllegalStateException} reporting that the topic was already registered.
     */
    @Test
    public void shouldThrowIfMultipleSourceNodeOfSameSubtopologySubscribedToSameTopic() {
        final String firstNode = "source-1";
        final String secondNode = "source-2";
        final String sharedTopic = "topic-1";
        final String otherTopic = "topic-2";
        topology.addSource(firstNode, sharedTopic);
        topology.addSource(secondNode, otherTopic);
        final ProcessorTopology built = topology.getInternalBuilder("X").buildTopology();

        // The second node's updated list repeats the topic the first node already owns.
        final Throwable thrown = Assert.assertThrows(
            IllegalStateException.class,
            () -> built.updateSourceTopics(Utils.mkMap(
                Utils.mkEntry(firstNode, Collections.singletonList(sharedTopic)),
                Utils.mkEntry(secondNode, Arrays.asList(otherTopic, sharedTopic)))));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.startsWith("Topic topic-1 was already registered to source node"));
    }

    /**
     * Drives the simple topology: each piped record must appear on the output topic with the
     * same key and value, in input order, whether records are piped one at a time or in a batch.
     */
    @Test
    public void testDrivingSimpleTopology() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
        final TestInputTopic<String, String> input = driver.createInputTopic(
            INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        final TestOutputTopic<String, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());

        // One record in, one record out.
        for (int i = 1; i <= 2; i++) {
            input.pipeInput("key" + i, "value" + i);
            assertNextOutputRecord(output.readRecord(), "key" + i, "value" + i);
            Assert.assertTrue(output.isEmpty());
        }

        // Three records piped back to back, then read back in the same order.
        for (int i = 3; i <= 5; i++) {
            input.pipeInput("key" + i, "value" + i);
        }
        for (int i = 3; i <= 5; i++) {
            assertNextOutputRecord(output.readRecord(), "key" + i, "value" + i);
        }
        Assert.assertTrue(output.isEmpty());
    }

    /**
     * With the topology built by {@code createSimpleTopologyWithDroppingPartitioner()}, a piped
     * record must not show up on the output topic.
     */
    @Test
    public void testDrivingSimpleTopologyWithDroppingPartitioner() {
        driver = new TopologyTestDriver(createSimpleTopologyWithDroppingPartitioner(), props);
        final TestInputTopic<String, String> input = driver.createInputTopic(
            INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        final TestOutputTopic<String, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");

        Assert.assertTrue(output.isEmpty());
    }

    /**
     * Drives the stateful topology with four records (key1 written twice): nothing may reach
     * the output topic, and the "entries" store must hold the latest value for each key and
     * nothing for a key that was never written.
     */
    @Test
    public void testDrivingStatefulTopology() {
        final String storeName = "entries";
        driver = new TopologyTestDriver(createStatefulTopology(storeName), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (final String[] entry : expected) {
            Assert.assertEquals(entry[1], store.get(entry[0]));
        }
        Assert.assertNull(store.get("key4"));
    }

    /**
     * Same scenario as the stateful-topology test, but against the topology built by
     * {@code createConnectedStateStoreTopology}: the "connectedStore" store must end up with
     * the latest value per key and the output topic must stay empty.
     */
    @Test
    public void testDrivingConnectedStateStoreTopology() {
        final String storeName = "connectedStore";
        driver = new TopologyTestDriver(createConnectedStateStoreTopology(storeName), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        // Overwrites the first value written for key1.
        input.pipeInput("key1", "value4");
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assert.assertEquals("value4", store.get("key1"));
        Assert.assertEquals("value2", store.get("key2"));
        Assert.assertEquals("value3", store.get("key3"));
        Assert.assertNull(store.get("key4"));
    }

    /**
     * Two processors created through the old-API helper share one in-memory store. Records
     * piped into the first input topic must land in that store (latest value per key) and
     * nothing may reach the output topic.
     */
    @Deprecated
    @Test
    public void testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
        final String storeName = "connectedStore";
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor(
                "processor1",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source1")
            .addProcessor(
                "processor2",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source2")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1", "processor2");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assert.assertEquals("value4", store.get("key1"));
        Assert.assertEquals("value2", store.get("key2"));
        Assert.assertEquals("value3", store.get("key3"));
        Assert.assertNull(store.get("key4"));
    }

    /**
     * Two {@code StatefulProcessor}s attached to different sources share one in-memory store.
     * Records piped into the first input topic must land in that store (latest value per key)
     * and nothing may reach the output topic.
     */
    @Test
    public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
        final String storeName = "connectedStore";
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(
            Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source1")
            .addProcessor(
                "processor2",
                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source2")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1", "processor2");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (final String[] entry : expected) {
            Assert.assertEquals(entry[1], store.get(entry[0]));
        }
        Assert.assertNull(store.get("key4"));
    }

    /**
     * Prefix scan on an in-memory store with caching and logging disabled. After piping four
     * records (key1 written twice) the scan for {@code DEFAULT_PREFIX} must yield
     * key1=value4, key2=value2, key3=value3 in that order, and the output topic must be empty.
     */
    @Test
    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);

        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], results.get(i).key);
            Assert.assertEquals(expected[i][1], results.get(i).value);
        }
    }

    /**
     * Prefix scan on an in-memory store with caching enabled and logging disabled. After piping
     * four records (key1 written twice) the scan for {@code DEFAULT_PREFIX} must yield
     * key1=value4, key2=value2, key3=value3 in that order, and the output topic must be empty.
     */
    @Test
    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);

        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], results.get(i).key);
            Assert.assertEquals(expected[i][1], results.get(i).value);
        }
    }

    /**
     * Prefix scan on an in-memory store with caching enabled and logging enabled (empty config).
     * After piping four records (key1 written twice) the scan for {@code DEFAULT_PREFIX} must
     * yield key1=value4, key2=value2, key3=value3 in that order, and the output topic must be empty.
     */
    @Test
    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);

        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], results.get(i).key);
            Assert.assertEquals(expected[i][1], results.get(i).value);
        }
    }

    /**
     * Prefix scan on a persistent store with caching and logging disabled. After piping four
     * records (key1 written twice) the scan for {@code DEFAULT_PREFIX} must yield
     * key1=value4, key2=value2, key3=value3 in that order, and the output topic must be empty.
     */
    @Test
    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);

        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], results.get(i).key);
            Assert.assertEquals(expected[i][1], results.get(i).value);
        }
    }

    /**
     * Prefix scan on a persistent store with caching enabled and logging disabled. After piping
     * four records (key1 written twice) the scan for {@code DEFAULT_PREFIX} must yield
     * key1=value4, key2=value2, key3=value3 in that order, and the output topic must be empty.
     */
    @Test
    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);

        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], results.get(i).key);
            Assert.assertEquals(expected[i][1], results.get(i).value);
        }
    }

    /**
     * Prefix scan on a persistent store with caching enabled and logging enabled (empty config).
     * After piping four records (key1 written twice) the scan for {@code DEFAULT_PREFIX} must
     * yield key1=value4, key2=value2, key3=value3 in that order, and the output topic must be empty.
     */
    @Test
    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        final String[][] records = {
            {"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}
        };
        for (final String[] record : records) {
            input.pipeInput(record[0], record[1]);
        }
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> results = prefixScanResults(store, DEFAULT_PREFIX);

        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], results.get(i).key);
            Assert.assertEquals(expected[i][1], results.get(i).value);
        }
    }

    /**
     * Builds a source -> processor -> sink topology whose processor is wired to a store created from
     * {@code Stores.persistentTimestampedKeyValueStore} (handed to the plain {@code keyValueStoreBuilder})
     * with caching and logging both disabled. Four records are piped in ({@code key1} twice); the test
     * then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Test
    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Builds a source -> processor -> sink topology whose processor is wired to a store created from
     * {@code Stores.persistentTimestampedKeyValueStore} (handed to the plain {@code keyValueStoreBuilder})
     * with caching enabled and logging disabled. Four records are piped in ({@code key1} twice); the
     * test then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Test
    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLogging() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Builds a source -> processor -> sink topology whose processor is wired to a store created from
     * {@code Stores.persistentTimestampedKeyValueStore} (handed to the plain {@code keyValueStoreBuilder})
     * with caching enabled and logging enabled (empty config map). Four records are piped in
     * ({@code key1} twice); the test then asserts that the output topic is empty and that
     * {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2
     * and key3/value3.
     */
    @Test
    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLogging() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Builds a source -> processor -> sink topology whose processor is wired to a store created from
     * {@code Stores.lruMap(DEFAULT_STORE_NAME, 100)} with caching and logging both disabled. Four
     * records are piped in ({@code key1} twice); the test then asserts that the output topic is empty
     * and that {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4,
     * key2/value2 and key3/value3.
     */
    @Test
    public void testPrefixScanLruMapNoCachingNoLogging() {
        KeyValueBytesStoreSupplier bytesStore = Stores.lruMap(DEFAULT_STORE_NAME, 100);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Builds a source -> processor -> sink topology whose processor is wired to a store created from
     * {@code Stores.lruMap(DEFAULT_STORE_NAME, 100)} with caching enabled and logging disabled. Four
     * records are piped in ({@code key1} twice); the test then asserts that the output topic is empty
     * and that {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4,
     * key2/value2 and key3/value3.
     */
    @Test
    public void testPrefixScanLruMapWithCachingNoLogging() {
        KeyValueBytesStoreSupplier bytesStore = Stores.lruMap(DEFAULT_STORE_NAME, 100);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Builds a source -> processor -> sink topology whose processor is wired to a store created from
     * {@code Stores.lruMap(DEFAULT_STORE_NAME, 100)} with caching enabled and logging enabled (empty
     * config map). Four records are piped in ({@code key1} twice); the test then asserts that the
     * output topic is empty and that {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in
     * order, key1/value4, key2/value2 and key3/value3.
     */
    @Test
    public void testPrefixScanLruMapWithCachingWithLogging() {
        KeyValueBytesStoreSupplier bytesStore = Stores.lruMap(DEFAULT_STORE_NAME, 100);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from {@code Stores.inMemoryKeyValueStore}
     * with caching and logging both disabled. Four records are piped in ({@code key1} twice); the test
     * then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanInMemoryStoreNoCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from {@code Stores.inMemoryKeyValueStore}
     * with caching enabled and logging disabled. Four records are piped in ({@code key1} twice); the
     * test then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanInMemoryStoreWithCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from {@code Stores.inMemoryKeyValueStore}
     * with caching enabled and logging enabled (empty config map). Four records are piped in
     * ({@code key1} twice); the test then asserts that the output topic is empty and that
     * {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2
     * and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanInMemoryStoreWithCachingWithLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from {@code Stores.persistentKeyValueStore}
     * with caching and logging both disabled. Four records are piped in ({@code key1} twice); the test
     * then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanPersistentStoreNoCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from {@code Stores.persistentKeyValueStore}
     * with caching enabled and logging disabled. Four records are piped in ({@code key1} twice); the
     * test then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanPersistentStoreWithCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from {@code Stores.persistentKeyValueStore}
     * with caching enabled and logging enabled (empty config map). Four records are piped in
     * ({@code key1} twice); the test then asserts that the output topic is empty and that
     * {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2
     * and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanPersistentStoreWithCachingWithLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from
     * {@code Stores.persistentTimestampedKeyValueStore} (handed to the plain {@code keyValueStoreBuilder})
     * with caching and logging both disabled. Four records are piped in ({@code key1} twice); the test
     * then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from
     * {@code Stores.persistentTimestampedKeyValueStore} (handed to the plain {@code keyValueStoreBuilder})
     * with caching enabled and logging disabled. Four records are piped in ({@code key1} twice); the
     * test then asserts that the output topic is empty and that {@code prefixScanResults} for
     * {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from
     * {@code Stores.persistentTimestampedKeyValueStore} (handed to the plain {@code keyValueStoreBuilder})
     * with caching enabled and logging enabled (empty config map). Four records are piped in
     * ({@code key1} twice); the test then asserts that the output topic is empty and that
     * {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4, key2/value2
     * and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Old-processor-API variant: the processor is supplied through {@code defineWithStoresOldAPI} and
     * {@code OldAPIStatefulProcessor}, wired to a store created from
     * {@code Stores.lruMap(DEFAULT_STORE_NAME, 100)} with caching and logging both disabled. Four
     * records are piped in ({@code key1} twice); the test then asserts that the output topic is empty
     * and that {@code prefixScanResults} for {@code DEFAULT_PREFIX} returns, in order, key1/value4,
     * key2/value2 and key3/value3.
     */
    @Deprecated
    @Test
    public void testPrefixScanLruMapNoCachingNoLoggingOldProcessor() {
        KeyValueBytesStoreSupplier bytesStore = Stores.lruMap(DEFAULT_STORE_NAME, 100);
        StoreBuilder builder = Stores.keyValueStoreBuilder(bytesStore, (Serde)Serdes.String(), (Serde)Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, new String[]{INPUT_TOPIC_1})
            .addProcessor("processor1", this.defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(builder)), new String[]{"source1"})
            .addSink("counts", OUTPUT_TOPIC_1, new String[]{"processor1"});
        this.driver = new TopologyTestDriver((Topology)this.topology, this.props);
        TestInputTopic source = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        TestOutputTopic sink = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value is the one expected back from the scan.
        String[][] piped = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (String[] entry : piped) {
            source.pipeInput(entry[0], entry[1]);
        }
        Assert.assertTrue(sink.isEmpty());

        KeyValueStore<String, String> kvStore = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        List<KeyValue<String, String>> scanned = this.prefixScanResults(kvStore, DEFAULT_PREFIX);
        String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assert.assertEquals(expected[i][0], scanned.get(i).key);
            Assert.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Prefix scan over an LRU-map store (caching enabled, logging disabled) that is filled by
     * an old-API processor connected through {@code defineWithStoresOldAPI}.
     */
    @Deprecated
    @Test
    public void testPrefixScanLruMapWithCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> lruStoreBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(lruStoreBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is piped twice; the scan below expects only its latest value.
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        // The processor only writes to its store, so the sink topic stays empty.
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> scannedStore = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> scanned = prefixScanResults(scannedStore, DEFAULT_PREFIX);

        final String[][] expectedEntries = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expectedEntries.length; i++) {
            Assert.assertEquals(expectedEntries[i][0], scanned.get(i).key);
            Assert.assertEquals(expectedEntries[i][1], scanned.get(i).value);
        }
    }

    /**
     * Prefix scan over an LRU-map store (caching enabled, logging enabled with an empty config)
     * that is filled by an old-API processor connected through {@code defineWithStoresOldAPI}.
     */
    @Deprecated
    @Test
    public void testPrefixScanLruMapWithCachingWithLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> lruStoreBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(lruStoreBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        driver = new TopologyTestDriver(topology, props);

        final TestInputTopic<String, String> input = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is piped twice; the scan below expects only its latest value.
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        // The processor only writes to its store, so the sink topic stays empty.
        Assert.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> scannedStore = driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> scanned = prefixScanResults(scannedStore, DEFAULT_PREFIX);

        final String[][] expectedEntries = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expectedEntries.length; i++) {
            Assert.assertEquals(expectedEntries[i][0], scanned.get(i).key);
            Assert.assertEquals(expectedEntries[i][1], scanned.get(i).value);
        }
    }

    /**
     * Registers an in-memory global store fed by an old-API processor and checks that records
     * piped into the global topic become readable from the store.
     */
    @Deprecated
    @Test
    public void shouldDriveGlobalStore() {
        final String storeName = "my-store";
        final String global = "global";
        final String topic = "topic";

        final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
            .withLoggingDisabled();
        topology.addGlobalStore(
            globalStoreBuilder,
            global,
            STRING_DESERIALIZER,
            STRING_DESERIALIZER,
            topic,
            "processor",
            define(new OldAPIStatefulProcessor(storeName)));

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input = driver.createInputTopic(topic, STRING_SERIALIZER, STRING_SERIALIZER);
        final KeyValueStore<String, String> globalStore = driver.getKeyValueStore(storeName);

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");

        Assert.assertEquals("value1", globalStore.get("key1"));
        Assert.assertEquals("value2", globalStore.get("key2"));
    }

    /**
     * Drives a topology with two independent source -> processor -> sink branches and checks
     * that a record piped into one input topic shows up on the matching output topic only.
     */
    @Test
    public void testDrivingSimpleMultiSourceTopology() {
        final int partition = 10;
        this.driver = new TopologyTestDriver(this.createSimpleMultiSourceTopology(partition), this.props);
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        final TestOutputTopic<String, String> outputTopic1 = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        final TestOutputTopic<String, String> outputTopic2 = this.driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer());

        // First branch: the record must reach output 1 and must not leak to output 2.
        inputTopic.pipeInput("key1", "value1");
        this.assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1");
        Assert.assertTrue(outputTopic2.isEmpty());

        // Second branch: the record must reach output 2 and must not leak to output 1.
        final TestInputTopic<String, String> inputTopic2 = this.driver.createInputTopic(INPUT_TOPIC_2, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        inputTopic2.pipeInput("key2", "value2");
        this.assertNextOutputRecord(outputTopic2.readRecord(), "key2", "value2");
        // Previously only output 2 was re-checked here, so a leak to output 1 went undetected.
        Assert.assertTrue(outputTopic1.isEmpty());
        Assert.assertTrue(outputTopic2.isEmpty());
    }

    /**
     * Pipes three records into a topology whose first sink topic is also the source topic of a
     * second branch and expects them, in order, on the second output topic.
     */
    @Test
    public void testDrivingForwardToSourceTopology() {
        driver = new TopologyTestDriver(createForwardToSourceTopology(), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");

        final TestOutputTopic<String, String> secondOutput =
            driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer());
        assertNextOutputRecord(secondOutput.readRecord(), "key1", "value1");
        assertNextOutputRecord(secondOutput.readRecord(), "key2", "value2");
        assertNextOutputRecord(secondOutput.readRecord(), "key3", "value3");
    }

    /**
     * Pipes three records through a topology that routes via an internal through-topic and
     * expects them, in order, on the output topic.
     */
    @Test
    public void testDrivingInternalRepartitioningTopology() {
        driver = new TopologyTestDriver(createInternalRepartitioningTopology(), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");

        final TestOutputTopic<String, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        assertNextOutputRecord(output.readRecord(), "key1", "value1");
        assertNextOutputRecord(output.readRecord(), "key2", "value2");
        assertNextOutputRecord(output.readRecord(), "key3", "value3");
    }

    /**
     * Pipes values of the form {@code <value>@<timestamp>} through the repartitioning topology
     * and expects output records carrying the stripped value and the embedded number as timestamp.
     */
    @Test
    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
        driver = new TopologyTestDriver(createInternalRepartitioningWithValueTimestampTopology(), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);

        input.pipeInput("key1", "value1@1000");
        input.pipeInput("key2", "value2@2000");
        input.pipeInput("key3", "value3@3000");

        final TestOutputTopic<String, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        MatcherAssert.assertThat(output.readRecord(), CoreMatchers.equalTo(new TestRecord<>("key1", "value1", null, 1000L)));
        MatcherAssert.assertThat(output.readRecord(), CoreMatchers.equalTo(new TestRecord<>("key2", "value2", null, 2000L)));
        MatcherAssert.assertThat(output.readRecord(), CoreMatchers.equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
    }

    /** The topology description must list a source together with its topics. */
    @Test
    public void shouldCreateStringWithSourceAndTopics() {
        topology.addSource("source", "topic1", "topic2");

        final String description = topology.getInternalBuilder().buildTopology().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
    }

    /** The topology description must list every source together with its own topics. */
    @Test
    public void shouldCreateStringWithMultipleSourcesAndTopics() {
        topology.addSource("source", "topic1", "topic2");
        topology.addSource("source2", "t", "t1", "t2");

        final String description = topology.getInternalBuilder().buildTopology().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n"));
    }

    /** The topology description must name both processors and list them as children of the source. */
    @Test
    public void shouldCreateStringWithProcessors() {
        topology
            .addSource("source", "t")
            .addProcessor("processor", mockProcessorSupplier, "source")
            .addProcessor("other", mockProcessorSupplier, "source");

        final String description = topology.getInternalBuilder().buildTopology().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("\t\tchildren:\t[processor, other]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("processor:\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("other:\n"));
    }

    /** The topology description must include the children of nested (second-level) processors. */
    @Test
    public void shouldRecursivelyPrintChildren() {
        topology
            .addSource("source", "t")
            .addProcessor("processor", mockProcessorSupplier, "source")
            .addProcessor("child-one", mockProcessorSupplier, "processor")
            .addProcessor("child-one-one", mockProcessorSupplier, "child-one")
            .addProcessor("child-two", mockProcessorSupplier, "processor")
            .addProcessor("child-two-one", mockProcessorSupplier, "child-two");

        final ProcessorTopology built = topology.getInternalBuilder().buildTopology();
        final String description = built.toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
    }

    /** Records piped with explicit timestamps must come out with the same timestamps. */
    @Test
    public void shouldConsiderTimeStamps() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);

        input.pipeInput("key1", "value1", 10L);
        input.pipeInput("key2", "value2", 20L);
        input.pipeInput("key3", "value3", 30L);

        final TestOutputTopic<String, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        assertNextOutputRecord(output.readRecord(), "key1", "value1", 10L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", 20L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", 30L);
    }

    /**
     * Records sent through the timestamp topology are expected on the output with a timestamp
     * 10 above the one they were piped with.
     */
    @Test
    public void shouldConsiderModifiedTimeStamps() {
        final int partition = 10;
        driver = new TopologyTestDriver(createTimestampTopology(partition), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);

        input.pipeInput("key1", "value1", 10L);
        input.pipeInput("key2", "value2", 20L);
        input.pipeInput("key3", "value3", 30L);

        final TestOutputTopic<String, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        assertNextOutputRecord(output.readRecord(), "key1", "value1", 20L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", 30L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", 40L);
    }

    /**
     * Drives the fan-out timestamp topology: each input record is expected three times on each
     * of the two output topics, with the timestamps asserted below.
     */
    @Test
    public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
        final int partition = 10;
        driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<String, String> firstOutput =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        final TestOutputTopic<String, String> secondOutput =
            driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer());

        // First record, piped with timestamp 10.
        input.pipeInput("key1", "value1", 10L);
        assertNextOutputRecord(firstOutput.readRecord(), "key1", "value1", 10L);
        assertNextOutputRecord(secondOutput.readRecord(), "key1", "value1", 20L);
        assertNextOutputRecord(firstOutput.readRecord(), "key1", "value1", 15L);
        assertNextOutputRecord(secondOutput.readRecord(), "key1", "value1", 20L);
        assertNextOutputRecord(firstOutput.readRecord(), "key1", "value1", 12L);
        assertNextOutputRecord(secondOutput.readRecord(), "key1", "value1", 22L);
        Assert.assertTrue(firstOutput.isEmpty());
        Assert.assertTrue(secondOutput.isEmpty());

        // Second record, piped with timestamp 20.
        input.pipeInput("key2", "value2", 20L);
        assertNextOutputRecord(firstOutput.readRecord(), "key2", "value2", 20L);
        assertNextOutputRecord(secondOutput.readRecord(), "key2", "value2", 30L);
        assertNextOutputRecord(firstOutput.readRecord(), "key2", "value2", 25L);
        assertNextOutputRecord(secondOutput.readRecord(), "key2", "value2", 30L);
        assertNextOutputRecord(firstOutput.readRecord(), "key2", "value2", 22L);
        assertNextOutputRecord(secondOutput.readRecord(), "key2", "value2", 32L);
        Assert.assertTrue(firstOutput.isEmpty());
        Assert.assertTrue(secondOutput.isEmpty());
    }

    /** Headers attached to input records must be present on the corresponding output records. */
    @Test
    public void shouldConsiderHeaders() {
        final int partition = 10;
        driver = new TopologyTestDriver(createSimpleTopology(partition), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);

        input.pipeInput(new TestRecord<>("key1", "value1", HEADERS, 10L));
        input.pipeInput(new TestRecord<>("key2", "value2", HEADERS, 20L));
        input.pipeInput(new TestRecord<>("key3", "value3", HEADERS, 30L));

        final TestOutputTopic<String, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        assertNextOutputRecord(output.readRecord(), "key1", "value1", HEADERS, 10L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", HEADERS, 20L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", HEADERS, 30L);
    }

    /**
     * Records piped without headers through the add-header topology are expected on the output
     * carrying {@code HEADERS}.
     */
    @Test
    public void shouldAddHeaders() {
        driver = new TopologyTestDriver(createAddHeaderTopology(), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);

        input.pipeInput("key1", "value1", 10L);
        input.pipeInput("key2", "value2", 20L);
        input.pipeInput("key3", "value3", 30L);

        final TestOutputTopic<String, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        assertNextOutputRecord(output.readRecord(), "key1", "value1", HEADERS, 10L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", HEADERS, 20L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", HEADERS, 30L);
    }

    /** An empty topology must report neither a persistent local nor a persistent global store. */
    @Test
    public void statelessTopologyShouldNotHavePersistentStore() {
        // Uses a fresh wrapper on purpose instead of the shared topology field.
        final TopologyWrapper emptyTopology = new TopologyWrapper();
        final ProcessorTopology built = emptyTopology.getInternalBuilder("anyAppId").buildTopology();

        Assert.assertFalse(built.hasPersistentLocalStore());
        Assert.assertFalse(built.hasPersistentGlobalStore());
    }

    /** A local in-memory key-value store must not be reported as persistent. */
    @Test
    public void inMemoryStoreShouldNotResultInPersistentLocalStore() {
        final KeyValueBytesStoreSupplier inMemorySupplier = Stores.inMemoryKeyValueStore("my-store");

        Assert.assertFalse(createLocalStoreTopology(inMemorySupplier).hasPersistentLocalStore());
    }

    /** A local persistent key-value store must be reported as persistent. */
    @Test
    public void persistentLocalStoreShouldBeDetected() {
        final KeyValueBytesStoreSupplier persistentSupplier = Stores.persistentKeyValueStore("my-store");

        Assert.assertTrue(createLocalStoreTopology(persistentSupplier).hasPersistentLocalStore());
    }

    /** A global in-memory key-value store must not be reported as persistent. */
    @Test
    public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
        final KeyValueBytesStoreSupplier inMemorySupplier = Stores.inMemoryKeyValueStore("my-store");

        Assert.assertFalse(createGlobalStoreTopology(inMemorySupplier).hasPersistentGlobalStore());
    }

    /** A global persistent key-value store must be reported as persistent. */
    @Test
    public void persistentGlobalStoreShouldBeDetected() {
        final KeyValueBytesStoreSupplier persistentSupplier = Stores.persistentKeyValueStore("my-store");

        Assert.assertTrue(createGlobalStoreTopology(persistentSupplier).hasPersistentGlobalStore());
    }

    /**
     * Builds a source -> processor topology on a fresh wrapper, with a String/String key-value
     * store from {@code storeSupplier} attached to the processor.
     *
     * @param storeSupplier supplier of the underlying bytes store; its name is the store name
     * @return the processor topology built for application id "anyAppId"
     */
    private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
        final TopologyWrapper localTopology = new TopologyWrapper();
        final String processor = "processor";
        final StoreBuilder<KeyValueStore<String, String>> localStoreBuilder =
            Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String());

        localTopology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic")
            .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source")
            .addStateStore(localStoreBuilder, processor);

        return localTopology.getInternalBuilder("anyAppId").buildTopology();
    }

    /**
     * Builds a topology on a fresh wrapper whose only element is a global String/String
     * key-value store (logging disabled) updated by an old-API processor.
     *
     * @param storeSupplier supplier of the underlying bytes store; its name is the store name
     * @return the processor topology built for application id "anyAppId"
     */
    @Deprecated
    private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) {
        final TopologyWrapper globalTopology = new TopologyWrapper();
        final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores
            .keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String())
            .withLoggingDisabled();

        globalTopology.addGlobalStore(
            globalStoreBuilder,
            "global",
            STRING_DESERIALIZER,
            STRING_DESERIALIZER,
            "topic",
            "processor",
            define(new OldAPIStatefulProcessor(storeSupplier.name())));

        return globalTopology.getInternalBuilder("anyAppId").buildTopology();
    }

    /** Asserts key and value of {@code record}, expecting timestamp 0 and empty headers. */
    private void assertNextOutputRecord(final TestRecord<String, String> record, final String key, final String value) {
        final Long expectedTimestamp = Long.valueOf(0L);
        assertNextOutputRecord(record, key, value, expectedTimestamp);
    }

    /** Asserts key, value and timestamp of {@code record}, expecting empty headers. */
    private void assertNextOutputRecord(final TestRecord<String, String> record, final String key, final String value, final Long timestamp) {
        final Headers noHeaders = new RecordHeaders();
        assertNextOutputRecord(record, key, value, noHeaders, timestamp);
    }

    /**
     * Asserts that {@code record} carries the given key, value, timestamp and headers,
     * checked in that order.
     */
    private void assertNextOutputRecord(final TestRecord<String, String> record,
                                        final String key,
                                        final String value,
                                        final Headers headers,
                                        final Long timestamp) {
        final String actualKey = record.key();
        Assert.assertEquals(key, actualKey);
        final String actualValue = record.value();
        Assert.assertEquals(value, actualValue);
        final Long actualTimestamp = record.timestamp();
        Assert.assertEquals(timestamp, actualTimestamp);
        final Headers actualHeaders = record.headers();
        Assert.assertEquals(headers, actualHeaders);
    }

    /** Returns a partitioner that ignores its inputs and always answers {@code partition}. */
    private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {
        return (ignoredTopic, ignoredKey, ignoredValue, ignoredPartitionCount) -> {
            return partition;
        };
    }

    /** Wires source -> ForwardingProcessor -> sink, with the sink pinned to {@code partition}. */
    private Topology createSimpleTopology(final int partition) {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", ForwardingProcessor::new, "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
    }

    /** Wires source -> TimestampProcessor -> sink, with the sink pinned to {@code partition}. */
    private Topology createTimestampTopology(final int partition) {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", TimestampProcessor::new, "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor");
    }

    /**
     * Wires a fan-out processor with two children: "child1" (ForwardingProcessor) feeding
     * output topic 1 and "child2" (TimestampProcessor) feeding output topic 2.
     */
    private Topology createMultiProcessorTimestampTopology(final int partition) {
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", () -> new FanOutTimestampProcessor("child1", "child2"), "source")
            .addProcessor("child1", ForwardingProcessor::new, "processor")
            .addProcessor("child2", TimestampProcessor::new, "processor")
            .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(partition), "child1")
            .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), "child2");
    }

    /** Wires source -> ForwardingProcessor -> sink, with the sink using a {@code DroppingPartitioner}. */
    private Topology createSimpleTopologyWithDroppingPartitioner() {
        final Topology withProcessor = topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", ForwardingProcessor::new, "source");
        return withProcessor.addSink("sink", OUTPUT_TOPIC_1, (StreamPartitioner) new DroppingPartitioner(), "processor");
    }

    /**
     * Wires source -> old-API stateful processor -> sink "counts", attaching an in-memory
     * String/String store named {@code storeName} to the processor via {@code addStateStore}.
     */
    @Deprecated
    private Topology createStatefulTopology(final String storeName) {
        final StoreBuilder<KeyValueStore<String, String>> inMemoryStoreBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new OldAPIStatefulProcessor(storeName)), "source")
            .addStateStore(inMemoryStoreBuilder, "processor")
            .addSink("counts", OUTPUT_TOPIC_1, "processor");
    }

    /**
     * Wires source -> old-API stateful processor -> sink "counts", where the in-memory store
     * named {@code storeName} is supplied through the processor supplier's {@code stores()}.
     */
    @Deprecated
    private Topology createConnectedStateStoreTopology(final String storeName) {
        final StoreBuilder<KeyValueStore<String, String>> connectedStoreBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
        return topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(connectedStoreBuilder)),
                "source")
            .addSink("counts", OUTPUT_TOPIC_1, "processor");
    }

    /**
     * Wires input -> through-topic -> output using two source/sink pairs and registers the
     * through-topic as an internal topic on the internal builder.
     */
    private Topology createInternalRepartitioningTopology() {
        topology
            .addSource("source", INPUT_TOPIC_1)
            .addSink("sink0", THROUGH_TOPIC_1, "source")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");

        TopologyWrapper.getInternalTopologyBuilder(topology)
            .addInternalTopic(THROUGH_TOPIC_1, InternalTopicProperties.empty());
        return topology;
    }

    /**
     * Same shape as the plain repartitioning topology, but with a {@code ValueTimestampProcessor}
     * between the first source and the through-topic sink.
     */
    private Topology createInternalRepartitioningWithValueTimestampTopology() {
        topology
            .addSource("source", INPUT_TOPIC_1)
            .addProcessor("processor", ValueTimestampProcessor::new, "source")
            .addSink("sink0", THROUGH_TOPIC_1, "processor")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");

        TopologyWrapper.getInternalTopologyBuilder(topology)
            .addInternalTopic(THROUGH_TOPIC_1, InternalTopicProperties.empty());
        return topology;
    }

    /** Wires input 1 -> output 1, and output 1 (re-read as a source) -> output 2. */
    private Topology createForwardToSourceTopology() {
        return topology
            .addSource("source-1", INPUT_TOPIC_1)
            .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
            .addSource("source-2", OUTPUT_TOPIC_1)
            .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
    }

    /**
     * Wires two independent source -> ForwardingProcessor -> sink branches (input 1 to output 1,
     * input 2 to output 2), both sinks pinned to {@code partition}.
     */
    private Topology createSimpleMultiSourceTopology(final int partition) {
        return topology
            // first branch
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", ForwardingProcessor::new, "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1")
            // second branch
            .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor("processor-2", ForwardingProcessor::new, "source-2")
            .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2");
    }

    /** Wires source -> AddHeaderProcessor -> sink on input topic 1 / output topic 1. */
    private Topology createAddHeaderTopology() {
        return topology
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", AddHeaderProcessor::new, "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
    }

    /**
     * Wraps an existing old-API processor in a supplier.
     * Note that every call to {@code get()} hands back the same {@code processor} instance.
     */
    private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) {
        return new ProcessorSupplier<K, V>() {
            @Override
            public Processor<K, V> get() {
                return processor;
            }
        };
    }

    /**
     * Builds an old-API processor supplier that delegates {@code get()} to {@code supplier} and
     * reports {@code stores} as its connected state stores.
     */
    private <K, V> ProcessorSupplier<K, V> defineWithStoresOldAPI(final Supplier<Processor<K, V>> supplier, final Set<StoreBuilder<?>> stores) {
        return new ProcessorSupplier<K, V>() {
            @Override
            public Set<StoreBuilder<?>> stores() {
                return stores;
            }

            @Override
            public Processor<K, V> get() {
                return supplier.get();
            }
        };
    }

    /**
     * Builds a new-API processor supplier that delegates {@code get()} to {@code supplier} and
     * reports {@code stores} as its connected state stores.
     */
    private <KIn, VIn, KOut, VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> defineWithStores(final Supplier<org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut>> supplier, final Set<StoreBuilder<?>> stores) {
        return new org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut>() {
            @Override
            public Set<StoreBuilder<?>> stores() {
                return stores;
            }

            @Override
            public org.apache.kafka.streams.processor.api.Processor<KIn, VIn, KOut, VOut> get() {
                return supplier.get();
            }
        };
    }

    public static class CustomTimestampExtractor
    implements TimestampExtractor {
        private static final long DEFAULT_TIMESTAMP = 1000L;

        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            if (record.value().toString().matches(".*@[0-9]+")) {
                return Long.parseLong(record.value().toString().split("@")[1]);
            }
            if (record.timestamp() >= 0L) {
                return record.timestamp();
            }
            return 1000L;
        }
    }

    /**
     * Terminal processor that writes every record's key and value into a named
     * {@code KeyValueStore<String, String>} and forwards nothing.
     */
    protected static class StatefulProcessor
    implements org.apache.kafka.streams.processor.api.Processor<String, String, Void, Void> {
        private KeyValueStore<String, String> store;
        private final String storeName;

        /**
         * @param storeName name under which the store is looked up in {@code init}
         */
        StatefulProcessor(String storeName) {
            this.storeName = storeName;
        }

        /**
         * Looks up the store named {@code storeName} on the given context.
         *
         * @param context the context supplying the state store
         */
        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Void, Void> context) {
            // getStateStore is generic in its return type, so the target type drives inference
            // and no raw cast is required.
            this.store = context.getStateStore(this.storeName);
        }

        /**
         * Stores {@code record.value()} under {@code record.key()}.
         *
         * @param record the record to persist
         */
        public void process(Record<String, String> record) {
            // Key and value are already Strings; widening them to Object would not match
            // put(String, String) on the typed store.
            this.store.put(record.key(), record.value());
        }
    }

    /**
     * {@link AbstractProcessor}-based counterpart of a store-writing processor: writes every
     * key/value pair it receives into a named {@code KeyValueStore<String, String>}.
     */
    protected static class OldAPIStatefulProcessor
    extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;
        private final String storeName;

        /**
         * @param storeName name under which the store is looked up in {@code init}
         */
        OldAPIStatefulProcessor(String storeName) {
            this.storeName = storeName;
        }

        /**
         * Delegates to {@code super.init(context)} and then looks up the store named
         * {@code storeName}.
         *
         * @param context the context supplying the state store
         */
        public void init(ProcessorContext context) {
            super.init(context);
            // NOTE(review): relies on ProcessorContext#getStateStore having a generic return
            // type (<S extends StateStore> S) so that no raw cast is needed - confirm against
            // the Kafka version on the classpath.
            this.store = context.getStateStore(this.storeName);
        }

        /**
         * Stores {@code value} under {@code key}.
         *
         * @param key   the record key
         * @param value the record value
         */
        public void process(String key, String value) {
            // Arguments are already Strings; widening them to Object would not match
            // put(String, String) on the typed store.
            this.store.put(key, value);
        }
    }

    /**
     * Processor that forwards each record with its value truncated at the first {@code '@'}:
     * only the text before that character is kept.
     */
    protected static class ValueTimestampProcessor
    implements org.apache.kafka.streams.processor.api.Processor<String, String, String, String> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context;

        protected ValueTimestampProcessor() {
        }

        /**
         * Remembers the context used for forwarding.
         *
         * @param context the context to forward through
         */
        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context) {
            this.context = context;
        }

        /**
         * Forwards a copy of {@code record} whose value is the portion of the original value
         * preceding the first {@code '@'} (the whole value if it contains none).
         *
         * @param record the incoming record; its value is dereferenced unconditionally
         */
        public void process(Record<String, String> record) {
            // A limit of 2 keeps the (possibly empty) leading segment, so a value made up only
            // of '@' characters yields "" instead of an empty array and an
            // ArrayIndexOutOfBoundsException.
            final String payload = record.value().split("@", 2)[0];
            // Passing a String keeps the result typed as Record<String, String>, which is what
            // the ProcessorContext<String, String> accepts.
            this.context.forward(record.withValue(payload));
        }
    }

    /**
     * Processor that adds the {@code HEADER} constant of the enclosing class to each record's
     * headers before forwarding it.
     */
    protected static class AddHeaderProcessor
    implements org.apache.kafka.streams.processor.api.Processor<String, String, String, String> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context;

        protected AddHeaderProcessor() {
        }

        /**
         * Remembers the context used for forwarding.
         *
         * @param context the context to forward through
         */
        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context) {
            this.context = context;
        }

        /**
         * Derives a record from {@code record} via {@code withHeaders}, adds {@code HEADER} to
         * the derived record's headers, and forwards the derived record.
         *
         * @param record the incoming record
         */
        public void process(Record<String, String> record) {
            // NOTE(review): withHeaders presumably gives the new record its own header
            // collection, leaving the input record's headers untouched - confirm against the
            // Record Javadoc.
            final Record<String, String> toForward = record.withHeaders(record.headers());
            toForward.headers().add(HEADER);
            this.context.forward(toForward);
        }
    }

    /**
     * Processor that emits four forwards per input record, in this order:
     * <ol>
     *   <li>the record unchanged, with no child name;</li>
     *   <li>the record with timestamp + 5, to {@code firstChild};</li>
     *   <li>the record unchanged, to {@code secondChild};</li>
     *   <li>the record with timestamp + 2, with no child name.</li>
     * </ol>
     */
    protected static class FanOutTimestampProcessor implements org.apache.kafka.streams.processor.api.Processor<String, String, String, String> {
        private final String firstChild;
        private final String secondChild;
        private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context;

        /**
         * @param firstChild  child name that receives the +5 record
         * @param secondChild child name that receives the unchanged record
         */
        FanOutTimestampProcessor(String firstChild, String secondChild) {
            this.firstChild = firstChild;
            this.secondChild = secondChild;
        }

        /**
         * Remembers the context used for forwarding.
         *
         * @param context the context to forward through
         */
        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context) {
            this.context = context;
        }

        /**
         * Performs the four forwards described on the class.
         *
         * @param record the incoming record
         */
        public void process(Record<String, String> record) {
            final long baseTimestamp = record.timestamp();

            this.context.forward(record);

            // Each shifted record is created right before it is forwarded, so the sequence of
            // calls is the same as forwarding the withTimestamp(...) expression directly.
            final Record<String, String> plusFive = record.withTimestamp(baseTimestamp + 5L);
            this.context.forward(plusFive, this.firstChild);

            this.context.forward(record, this.secondChild);

            final Record<String, String> plusTwo = record.withTimestamp(baseTimestamp + 2L);
            this.context.forward(plusTwo);
        }
    }

    /**
     * Processor that forwards each record with its timestamp increased by 10.
     */
    protected static class TimestampProcessor implements org.apache.kafka.streams.processor.api.Processor<String, String, String, String> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context;

        /**
         * Remembers the context used for forwarding.
         *
         * @param context the context to forward through
         */
        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context) {
            this.context = context;
        }

        /**
         * Forwards a copy of {@code record} whose timestamp is the original plus 10.
         *
         * @param record the incoming record
         */
        public void process(Record<String, String> record) {
            final long shiftedTimestamp = record.timestamp() + 10L;
            final Record<String, String> shifted = record.withTimestamp(shiftedTimestamp);
            this.context.forward(shifted);
        }
    }

    /**
     * Pass-through processor: hands every record it receives to
     * {@code ProcessorContext#forward(Record)} without modifying it.
     */
    protected static class ForwardingProcessor implements org.apache.kafka.streams.processor.api.Processor<String, String, String, String> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<String, String> downstream;

        /**
         * Remembers the context used for forwarding.
         *
         * @param context the context to forward through
         */
        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<String, String> context) {
            this.downstream = context;
        }

        /**
         * Forwards {@code record} as-is.
         *
         * @param record the incoming record
         */
        public void process(Record<String, String> record) {
            this.downstream.forward(record);
        }
    }

    /**
     * Partitioner whose single-partition method returns {@code null} and whose multi-partition
     * method selects every odd-numbered partition index below {@code numPartitions}.
     */
    static class DroppingPartitioner implements StreamPartitioner<String, String> {

        /**
         * @return always {@code null}
         */
        @Deprecated
        public Integer partition(String topic, String key, String value, int numPartitions) {
            return null;
        }

        /**
         * Collects the odd partition indices {@code 1, 3, 5, ...} that are strictly less than
         * {@code numPartitions}. The topic, key and value are not consulted.
         *
         * @param numPartitions exclusive upper bound for the partition indices
         * @return a non-empty {@link Optional} holding the (possibly empty) set of odd indices
         */
        public Optional<Set<Integer>> partitions(String topic, String key, String value, int numPartitions) {
            final Set<Integer> oddPartitions = new HashSet<>();
            int candidate = 1;
            while (candidate < numPartitions) {
                oddPartitions.add(candidate);
                candidate += 2;
            }
            return Optional.of(oddPartitions);
        }
    }
}

