/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.ForeachProcessor;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.kstream.ValueMapperWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.BranchedKStreamImpl;
import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
import org.apache.kafka.streams.kstream.internals.GlobalKTableImpl;
import org.apache.kafka.streams.kstream.internals.GroupedInternal;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.JoinedInternal;
import org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl;
import org.apache.kafka.streams.kstream.internals.KStreamBranch;
import org.apache.kafka.streams.kstream.internals.KStreamFilter;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMap;
import org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransform;
import org.apache.kafka.streams.kstream.internals.KStreamFlatTransformValues;
import org.apache.kafka.streams.kstream.internals.KStreamGlobalKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.kstream.internals.KStreamKTableJoin;
import org.apache.kafka.streams.kstream.internals.KStreamMap;
import org.apache.kafka.streams.kstream.internals.KStreamMapValues;
import org.apache.kafka.streams.kstream.internals.KStreamPeek;
import org.apache.kafka.streams.kstream.internals.KStreamTransformValues;
import org.apache.kafka.streams.kstream.internals.KTableImpl;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.NamedInternal;
import org.apache.kafka.streams.kstream.internals.PassThrough;
import org.apache.kafka.streams.kstream.internals.PrintedInternal;
import org.apache.kafka.streams.kstream.internals.ProducedInternal;
import org.apache.kafka.streams.kstream.internals.RepartitionedInternal;
import org.apache.kafka.streams.kstream.internals.StreamJoinedInternal;
import org.apache.kafka.streams.kstream.internals.TransformerSupplierAdapter;
import org.apache.kafka.streams.kstream.internals.graph.BaseRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
import org.apache.kafka.streams.kstream.internals.graph.UnoptimizableRepartitionNode;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDBTimeOrderedKeyValueBuffer;

public class KStreamImpl<K, V>
extends AbstractStream<K, V>
implements KStream<K, V> {
    // Package-private name prefixes/suffixes. Only SINK_NAME is referenced in the visible part of
    // this class; the others are presumably used by sibling classes in this package — TODO confirm.
    static final String JOINTHIS_NAME = "KSTREAM-JOINTHIS-";
    static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-";
    static final String JOIN_NAME = "KSTREAM-JOIN-";
    static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-";
    static final String MERGE_NAME = "KSTREAM-MERGE-";
    static final String OUTERTHIS_NAME = "KSTREAM-OUTERTHIS-";
    static final String OUTEROTHER_NAME = "KSTREAM-OUTEROTHER-";
    static final String WINDOWED_NAME = "KSTREAM-WINDOWED-";
    static final String OUTERSHARED_NAME = "KSTREAM-OUTERSHARED-";
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
    static final String SINK_NAME = "KSTREAM-SINK-";
    static final String REPARTITION_TOPIC_SUFFIX = "-repartition";
    // Prefixes handed to NamedInternal#orElseGenerateWithPrefix (or builder.newProcessorName) by the
    // operators below when the caller supplies no explicit name. Some of them are only used
    // outside the visible part of this file.
    private static final String BRANCH_NAME = "KSTREAM-BRANCH-";
    private static final String BRANCHCHILD_NAME = "KSTREAM-BRANCHCHILD-";
    private static final String FILTER_NAME = "KSTREAM-FILTER-";
    private static final String PEEK_NAME = "KSTREAM-PEEK-";
    private static final String FLATMAP_NAME = "KSTREAM-FLATMAP-";
    private static final String FLATMAPVALUES_NAME = "KSTREAM-FLATMAPVALUES-";
    private static final String MAP_NAME = "KSTREAM-MAP-";
    private static final String MAPVALUES_NAME = "KSTREAM-MAPVALUES-";
    private static final String PROCESSOR_NAME = "KSTREAM-PROCESSOR-";
    private static final String PROCESSVALUES_NAME = "KSTREAM-PROCESSVALUES-";
    private static final String PRINTING_NAME = "KSTREAM-PRINTER-";
    private static final String KEY_SELECT_NAME = "KSTREAM-KEY-SELECT-";
    private static final String TRANSFORM_NAME = "KSTREAM-TRANSFORM-";
    private static final String TRANSFORMVALUES_NAME = "KSTREAM-TRANSFORMVALUES-";
    private static final String FOREACH_NAME = "KSTREAM-FOREACH-";
    private static final String TO_KTABLE_NAME = "KSTREAM-TOTABLE-";
    private static final String REPARTITION_NAME = "KSTREAM-REPARTITION-";
    // Set once in the constructor. Key-changing operators (selectKey, map, flatMap, groupBy) pass
    // true to their result; toTable() inserts a repartition node when it is set.
    private final boolean repartitionRequired;
    // NOTE(review): never assigned in the visible part of the class; presumably caches a
    // repartition node for reuse by later operations — confirm against the rest of the file.
    private OptimizableRepartitionNode<K, V> repartitionNode;

    /**
     * Creates a stream handle attached to {@code graphNode}.
     *
     * <p>All arguments except {@code repartitionRequired} are forwarded unchanged to the
     * {@code AbstractStream} constructor.
     *
     * @param repartitionRequired stored as-is; later consulted by operations such as
     *                            {@code toTable} to decide whether a repartition node is needed
     */
    KStreamImpl(final String name,
                final Serde<K> keySerde,
                final Serde<V> valueSerde,
                final Set<String> subTopologySourceNodes,
                final boolean repartitionRequired,
                final GraphNode graphNode,
                final InternalStreamsBuilder builder) {
        super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
        this.repartitionRequired = repartitionRequired;
    }

    /**
     * Convenience overload: delegates to {@link #filter(Predicate, Named)} with an empty name.
     */
    @Override
    public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate) {
        return filter(predicate, NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamFilter} processor (constructed with flag {@code false}) to the graph.
     *
     * <p>The returned stream reuses this stream's serdes, source nodes and repartition flag.
     *
     * @throws NullPointerException if {@code predicate} or {@code named} is null
     */
    @Override
    public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate,
                                final Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
        final ProcessorGraphNode filterNode = new ProcessorGraphNode(
            processorName,
            new ProcessorParameters(new KStreamFilter<K, V>(predicate, false), processorName));
        builder.addGraphNode(graphNode, filterNode);

        return new KStreamImpl<K, V>(
            processorName,
            keySerde,
            valueSerde,
            subTopologySourceNodes,
            repartitionRequired,
            filterNode,
            builder);
    }

    /**
     * Convenience overload: delegates to {@link #filterNot(Predicate, Named)} with an empty name.
     */
    @Override
    public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate) {
        return filterNot(predicate, NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamFilter} processor (constructed with flag {@code true}) to the graph.
     *
     * <p>The generated name uses the same {@code FILTER_NAME} prefix as {@code filter}. The
     * returned stream reuses this stream's serdes, source nodes and repartition flag.
     *
     * @throws NullPointerException if {@code predicate} or {@code named} is null
     */
    @Override
    public KStream<K, V> filterNot(final Predicate<? super K, ? super V> predicate,
                                   final Named named) {
        Objects.requireNonNull(predicate, "predicate can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
        final ProcessorGraphNode filterNotNode = new ProcessorGraphNode(
            processorName,
            new ProcessorParameters(new KStreamFilter<K, V>(predicate, true), processorName));
        builder.addGraphNode(graphNode, filterNotNode);

        return new KStreamImpl<K, V>(
            processorName,
            keySerde,
            valueSerde,
            subTopologySourceNodes,
            repartitionRequired,
            filterNotNode,
            builder);
    }

    /**
     * Convenience overload: delegates to {@link #selectKey(KeyValueMapper, Named)} with an empty name.
     */
    @Override
    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper) {
        return selectKey(mapper, NamedInternal.empty());
    }

    /**
     * Appends a key-selecting processor and marks it as a key-changing operation.
     *
     * <p>The returned stream carries no key serde ({@code null}), keeps the value serde, and is
     * created with {@code repartitionRequired = true}.
     *
     * @throws NullPointerException if {@code mapper} or {@code named} is null
     */
    @Override
    public <KR> KStream<KR, V> selectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
                                         final Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final ProcessorGraphNode<K, V> keySelectNode = internalSelectKey(mapper, new NamedInternal(named));
        keySelectNode.keyChangingOperation(true);
        builder.addGraphNode(graphNode, keySelectNode);

        return new KStreamImpl<K, V>(
            keySelectNode.nodeName(),
            null,
            valueSerde,
            subTopologySourceNodes,
            true,
            keySelectNode,
            builder);
    }

    /**
     * Builds (but does not register) a graph node whose processor replaces each record's key with
     * {@code mapper.apply(key, value)} while leaving the value untouched.
     *
     * @param named supplies the node name, or a generated one with the {@code KEY_SELECT_NAME} prefix
     */
    private <KR> ProcessorGraphNode<K, V> internalSelectKey(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper,
                                                            final NamedInternal named) {
        final String nodeName = named.orElseGenerateWithPrefix(builder, KEY_SELECT_NAME);
        final KStreamMap keyRewriter = new KStreamMap((key, value) -> new KeyValue(mapper.apply(key, value), value));
        return new ProcessorGraphNode<Object, Object>(nodeName, new ProcessorParameters(keyRewriter, nodeName));
    }

    /**
     * Convenience overload: delegates to {@link #map(KeyValueMapper, Named)} with an empty name.
     */
    @Override
    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper) {
        return map(mapper, NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamMap} processor and marks it as a key-changing operation.
     *
     * <p>The returned stream carries neither key nor value serde (both {@code null}) and is created
     * with {@code repartitionRequired = true}.
     *
     * @throws NullPointerException if {@code mapper} or {@code named} is null
     */
    @Override
    public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ? extends KeyValue<? extends KR, ? extends VR>> mapper,
                                        final Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
        final ProcessorGraphNode<K, V> mapNode = new ProcessorGraphNode<K, V>(
            processorName,
            new ProcessorParameters(new KStreamMap(mapper), processorName));
        mapNode.keyChangingOperation(true);
        builder.addGraphNode(graphNode, mapNode);

        return new KStreamImpl<K, V>(processorName, null, null, subTopologySourceNodes, true, mapNode, builder);
    }

    /**
     * Wraps {@code valueMapper} with {@code withKey} and delegates to the
     * {@code ValueMapperWithKey} overload.
     */
    @Override
    public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> valueMapper) {
        return mapValues(withKey(valueMapper));
    }

    /**
     * Wraps {@code mapper} with {@code withKey} and delegates to the named
     * {@code ValueMapperWithKey} overload.
     */
    @Override
    public <VR> KStream<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper,
                                         final Named named) {
        return mapValues(withKey(mapper), named);
    }

    /**
     * Convenience overload: delegates to {@link #mapValues(ValueMapperWithKey, Named)} with an
     * empty name.
     */
    @Override
    public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey) {
        return mapValues(valueMapperWithKey, (Named) NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamMapValues} processor and marks it as a value-changing operation.
     *
     * <p>The returned stream keeps the key serde and repartition flag but carries no value serde
     * ({@code null}).
     *
     * @throws NullPointerException if {@code valueMapperWithKey} or {@code named} is null
     */
    @Override
    public <VR> KStream<K, VR> mapValues(final ValueMapperWithKey<? super K, ? super V, ? extends VR> valueMapperWithKey,
                                         final Named named) {
        Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME);
        final ProcessorGraphNode<K, V> mapValuesNode = new ProcessorGraphNode<K, V>(
            processorName,
            new ProcessorParameters(new KStreamMapValues<K, V, VR>(valueMapperWithKey), processorName));
        mapValuesNode.setValueChangingOperation(true);
        builder.addGraphNode(graphNode, mapValuesNode);

        return new KStreamImpl<K, V>(
            processorName,
            keySerde,
            null,
            subTopologySourceNodes,
            repartitionRequired,
            mapValuesNode,
            builder);
    }

    /**
     * Convenience overload: delegates to {@link #flatMap(KeyValueMapper, Named)} with an empty name.
     */
    @Override
    public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper) {
        return flatMap(mapper, NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamFlatMap} processor and marks it as a key-changing operation.
     *
     * <p>The returned stream carries neither key nor value serde (both {@code null}) and is created
     * with {@code repartitionRequired = true}.
     *
     * @throws NullPointerException if {@code mapper} or {@code named} is null
     */
    @Override
    public <KR, VR> KStream<KR, VR> flatMap(final KeyValueMapper<? super K, ? super V, ? extends Iterable<? extends KeyValue<? extends KR, ? extends VR>>> mapper,
                                            final Named named) {
        Objects.requireNonNull(mapper, "mapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAP_NAME);
        final ProcessorGraphNode<K, V> flatMapGraphNode = new ProcessorGraphNode<K, V>(
            processorName,
            new ProcessorParameters(new KStreamFlatMap(mapper), processorName));
        flatMapGraphNode.keyChangingOperation(true);
        builder.addGraphNode(graphNode, flatMapGraphNode);

        return new KStreamImpl<K, V>(processorName, null, null, subTopologySourceNodes, true, flatMapGraphNode, builder);
    }

    /**
     * Wraps {@code mapper} with {@code withKey} and delegates to the
     * {@code ValueMapperWithKey} overload.
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper) {
        return flatMapValues(withKey(mapper));
    }

    /**
     * Wraps {@code mapper} with {@code withKey} and delegates to the named
     * {@code ValueMapperWithKey} overload.
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(final ValueMapper<? super V, ? extends Iterable<? extends VR>> mapper,
                                             final Named named) {
        return flatMapValues(withKey(mapper), named);
    }

    /**
     * Convenience overload: delegates to {@link #flatMapValues(ValueMapperWithKey, Named)} with an
     * empty name.
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> mapper) {
        return flatMapValues(mapper, (Named) NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamFlatMapValues} processor and marks it as a value-changing operation.
     *
     * <p>The returned stream keeps the key serde and repartition flag but carries no value serde
     * ({@code null}).
     *
     * @throws NullPointerException if {@code valueMapper} or {@code named} is null
     */
    @Override
    public <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? super V, ? extends Iterable<? extends VR>> valueMapper,
                                             final Named named) {
        Objects.requireNonNull(valueMapper, "valueMapper can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME);
        final ProcessorGraphNode<K, V> flatMapValuesGraphNode = new ProcessorGraphNode<K, V>(
            processorName,
            new ProcessorParameters(new KStreamFlatMapValues(valueMapper), processorName));
        flatMapValuesGraphNode.setValueChangingOperation(true);
        builder.addGraphNode(graphNode, flatMapValuesGraphNode);

        return new KStreamImpl<K, V>(
            processorName,
            keySerde,
            null,
            subTopologySourceNodes,
            repartitionRequired,
            flatMapValuesGraphNode,
            builder);
    }

    /**
     * Appends a terminal processor built from {@code printed}.
     *
     * <p>The processor supplier is obtained via {@code PrintedInternal#build}, which receives this
     * stream's own name; the graph node itself is named from {@code printed} or generated with
     * the {@code PRINTING_NAME} prefix.
     *
     * @throws NullPointerException if {@code printed} is null
     */
    @Override
    public void print(final Printed<K, V> printed) {
        Objects.requireNonNull(printed, "printed can't be null");

        final PrintedInternal<K, V> internal = new PrintedInternal<K, V>(printed);
        final String processorName = new NamedInternal(internal.name()).orElseGenerateWithPrefix(builder, PRINTING_NAME);
        final ProcessorParameters<K, V, Void, Void> printParameters =
            new ProcessorParameters<K, V, Void, Void>(internal.build(this.name), processorName);

        builder.addGraphNode(graphNode, new ProcessorGraphNode<K, V>(processorName, printParameters));
    }

    /**
     * Convenience overload: delegates to {@link #foreach(ForeachAction, Named)} with an empty name.
     */
    @Override
    public void foreach(final ForeachAction<? super K, ? super V> action) {
        foreach(action, NamedInternal.empty());
    }

    /**
     * Appends a terminal node whose supplier creates a new {@code ForeachProcessor} around
     * {@code action} on each invocation.
     *
     * @throws NullPointerException if {@code action} or {@code named} is null
     */
    @Override
    public void foreach(final ForeachAction<? super K, ? super V> action,
                        final Named named) {
        Objects.requireNonNull(action, "action can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME);
        final ProcessorParameters foreachParameters =
            new ProcessorParameters(() -> new ForeachProcessor(action), processorName);

        builder.addGraphNode(graphNode, new ProcessorGraphNode(processorName, foreachParameters));
    }

    /**
     * Convenience overload: delegates to {@link #peek(ForeachAction, Named)} with an empty name.
     */
    @Override
    public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action) {
        return peek(action, NamedInternal.empty());
    }

    /**
     * Appends a {@code KStreamPeek} processor around {@code action}.
     *
     * <p>The returned stream reuses this stream's serdes, source nodes and repartition flag.
     *
     * @throws NullPointerException if {@code action} or {@code named} is null
     */
    @Override
    public KStream<K, V> peek(final ForeachAction<? super K, ? super V> action,
                              final Named named) {
        Objects.requireNonNull(action, "action can't be null");
        Objects.requireNonNull(named, "named can't be null");

        final String processorName = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME);
        final ProcessorGraphNode peekGraphNode = new ProcessorGraphNode(
            processorName,
            new ProcessorParameters(new KStreamPeek<K, V>(action), processorName));
        builder.addGraphNode(graphNode, peekGraphNode);

        return new KStreamImpl<K, V>(
            processorName,
            keySerde,
            valueSerde,
            subTopologySourceNodes,
            repartitionRequired,
            peekGraphNode,
            builder);
    }

    /**
     * Deprecated branching entry point; forwards to {@code doBranch} with an empty name.
     */
    @Override
    @Deprecated
    public KStream<K, V>[] branch(final Predicate<? super K, ? super V>... predicates) {
        return doBranch(NamedInternal.empty(), predicates);
    }

    /**
     * Deprecated branching entry point with an explicit name; forwards to {@code doBranch}.
     *
     * @throws NullPointerException if {@code named} is null
     */
    @Override
    @Deprecated
    public KStream<K, V>[] branch(final Named named,
                                  final Predicate<? super K, ? super V>... predicates) {
        Objects.requireNonNull(named, "named can't be null");
        return doBranch(new NamedInternal(named), predicates);
    }

    /**
     * Builds one {@code KStreamBranch} node plus one {@code PassThrough} child per predicate and
     * returns a stream handle for each child, in predicate order.
     *
     * @throws NullPointerException     if the array or any of its elements is null
     * @throws IllegalArgumentException if no predicate is given
     */
    private KStream<K, V>[] doBranch(final NamedInternal named,
                                     final Predicate<? super K, ? super V>... predicates) {
        Objects.requireNonNull(predicates, "predicates can't be a null array");
        final int branchCount = predicates.length;
        if (branchCount == 0) {
            throw new IllegalArgumentException("branch() requires at least one predicate");
        }
        for (final Predicate<? super K, ? super V> candidate : predicates) {
            Objects.requireNonNull(candidate, "predicates can't be null");
        }

        // Names are resolved up front (parent first, then children) before any node is registered.
        final String parentName = named.orElseGenerateWithPrefix(builder, BRANCH_NAME);
        final String[] childNames = new String[branchCount];
        for (int idx = 0; idx < branchCount; ++idx) {
            childNames[idx] = named.suffixWithOrElseGet("-predicate-" + idx, builder, BRANCHCHILD_NAME);
        }

        // The predicate array is cloned before being wrapped in a list for the processor.
        final KStreamBranch branchProcessor =
            new KStreamBranch(Arrays.asList((Predicate[]) predicates.clone()), Arrays.asList(childNames));
        final ProcessorGraphNode parentNode =
            new ProcessorGraphNode(parentName, new ProcessorParameters(branchProcessor, parentName));
        builder.addGraphNode(graphNode, parentNode);

        final KStream[] children = (KStream[]) Array.newInstance(KStream.class, branchCount);
        for (int idx = 0; idx < branchCount; ++idx) {
            final String childName = childNames[idx];
            final ProcessorGraphNode childNode =
                new ProcessorGraphNode(childName, new ProcessorParameters(new PassThrough(), childName));
            builder.addGraphNode(parentNode, childNode);
            children[idx] = new KStreamImpl<K, V>(
                childName,
                keySerde,
                valueSerde,
                subTopologySourceNodes,
                repartitionRequired,
                childNode,
                builder);
        }
        return children;
    }

    /**
     * Starts a branching definition with an empty name, passing along this stream and its
     * repartition flag.
     */
    @Override
    public BranchedKStream<K, V> split() {
        return new BranchedKStreamImpl(this, repartitionRequired, NamedInternal.empty());
    }

    /**
     * Starts a branching definition using {@code named}, passing along this stream and its
     * repartition flag.
     *
     * @throws NullPointerException if {@code named} is null
     */
    @Override
    public BranchedKStream<K, V> split(final Named named) {
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal branchNaming = new NamedInternal(named);
        return new BranchedKStreamImpl(this, repartitionRequired, branchNaming);
    }

    /**
     * Convenience overload: delegates to {@link #merge(KStream, Named)} with an empty name.
     */
    @Override
    public KStream<K, V> merge(final KStream<K, V> stream) {
        return merge(stream, NamedInternal.empty());
    }

    /**
     * Validates the arguments and hands off to the private merge implementation using this
     * stream's builder.
     *
     * @throws NullPointerException if {@code stream} or {@code named} is null
     */
    @Override
    public KStream<K, V> merge(final KStream<K, V> stream,
                               final Named named) {
        Objects.requireNonNull(stream, "stream can't be null");
        Objects.requireNonNull(named, "named can't be null");
        final NamedInternal mergeNaming = new NamedInternal(named);
        return merge(builder, stream, mergeNaming);
    }

    /**
     * Joins this stream and {@code stream} under a single {@code PassThrough} node flagged as a
     * merge node.
     *
     * <p>The result has the union of both streams' source nodes, requires repartitioning if either
     * input does, and carries no serdes (both {@code null}).
     *
     * @param stream cast to {@code KStreamImpl}; any other implementation fails with a
     *               {@code ClassCastException}
     */
    private KStream<K, V> merge(final InternalStreamsBuilder builder,
                                final KStream<K, V> stream,
                                final NamedInternal named) {
        final KStreamImpl other = (KStreamImpl) stream;
        final boolean requireRepartitioning = other.repartitionRequired || repartitionRequired;
        final String mergeName = named.orElseGenerateWithPrefix(builder, MERGE_NAME);

        final HashSet<String> combinedSourceNodes = new HashSet<String>(subTopologySourceNodes);
        combinedSourceNodes.addAll(other.subTopologySourceNodes);

        final ProcessorGraphNode mergeGraphNode =
            new ProcessorGraphNode(mergeName, new ProcessorParameters(new PassThrough(), mergeName));
        mergeGraphNode.setMergeNode(true);
        builder.addGraphNode(Arrays.asList(graphNode, other.graphNode), mergeGraphNode);

        return new KStreamImpl<K, V>(
            mergeName,
            null,
            null,
            combinedSourceNodes,
            requireRepartitioning,
            mergeGraphNode,
            builder);
    }

    /**
     * Deprecated. Delegates to {@link #through(String, Produced)} with a {@code Produced} built
     * from this stream's serdes and a {@code null} third argument.
     */
    @Override
    @Deprecated
    public KStream<K, V> through(final String topic) {
        return through(topic, Produced.with(keySerde, valueSerde, null));
    }

    /**
     * Deprecated. Writes this stream to {@code topic} and returns a new stream that reads the
     * same topic back through the builder.
     *
     * <p>Serdes missing from {@code produced} fall back to this stream's serdes; the resulting
     * serdes are used for both the write and the read side. The read side is configured with a
     * {@code FailOnInvalidTimestamp} extractor.
     *
     * @throws NullPointerException if {@code topic} or {@code produced} is null
     */
    @Override
    @Deprecated
    public KStream<K, V> through(final String topic,
                                 final Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "produced can't be null");

        final ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valueSerde);
        }
        to(topic, sinkConfig);

        final ConsumedInternal<K, V> sourceConfig = new ConsumedInternal<K, V>(
            sinkConfig.keySerde(),
            sinkConfig.valueSerde(),
            new FailOnInvalidTimestamp(),
            null);
        return builder.stream(Collections.singleton(topic), sourceConfig);
    }

    /**
     * Repartitions with default settings by forwarding {@code Repartitioned.as(null)} to
     * {@code doRepartition}.
     */
    @Override
    public KStream<K, V> repartition() {
        return doRepartition(Repartitioned.as(null));
    }

    /**
     * Repartitions using the caller-supplied settings; validation happens in {@code doRepartition}.
     */
    @Override
    public KStream<K, V> repartition(final Repartitioned<K, V> repartitioned) {
        return doRepartition(repartitioned);
    }

    /**
     * Inserts an {@code UnoptimizableRepartitionNode} after this stream and returns a stream that
     * starts at the repartition source.
     *
     * <p>The returned stream uses the serdes from {@code repartitioned}, falling back to this
     * stream's serdes where absent; it has the repartition node's name as its only source node
     * and {@code repartitionRequired = false}.
     *
     * @throws NullPointerException if {@code repartitioned} is null
     */
    private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {
        Objects.requireNonNull(repartitioned, "repartitioned can't be null");

        final RepartitionedInternal<K, V> settings = new RepartitionedInternal<K, V>(repartitioned);

        final String nodeName;
        if (settings.name() != null) {
            nodeName = settings.name();
        } else {
            nodeName = builder.newProcessorName(REPARTITION_NAME);
        }

        final Serde<V> effectiveValueSerde = settings.valueSerde() == null ? this.valueSerde : settings.valueSerde();
        final Serde<K> effectiveKeySerde = settings.keySerde() == null ? this.keySerde : settings.keySerde();

        final UnoptimizableRepartitionNode.UnoptimizableRepartitionNodeBuilder nodeBuilder =
            UnoptimizableRepartitionNode.unoptimizableRepartitionNodeBuilder();
        final InternalTopicProperties topicProperties = settings.toInternalTopicProperties();

        // NOTE(review): the key serde given to the repartition source is the one from
        // `repartitioned` without the fallback to this stream's serde, unlike the value serde —
        // confirm this asymmetry is intended.
        final String repartitionSourceName = KStreamImpl.createRepartitionedSource(
            builder,
            settings.keySerde(),
            effectiveValueSerde,
            nodeName,
            settings.streamPartitioner(),
            nodeBuilder.withInternalTopicProperties(topicProperties));

        final BaseRepartitionNode repartitionGraphNode = nodeBuilder.build();
        builder.addGraphNode(graphNode, (GraphNode) repartitionGraphNode);

        final HashSet<String> repartitionSources = new HashSet<String>();
        repartitionSources.add(repartitionGraphNode.nodeName());

        return new KStreamImpl<K, V>(
            repartitionSourceName,
            effectiveKeySerde,
            effectiveValueSerde,
            Collections.unmodifiableSet(repartitionSources),
            false,
            repartitionGraphNode,
            builder);
    }

    /**
     * Delegates to {@link #to(String, Produced)} with a {@code Produced} built from this stream's
     * serdes and a {@code null} third argument.
     */
    @Override
    public void to(final String topic) {
        to(topic, Produced.with(keySerde, valueSerde, null));
    }

    /**
     * Writes this stream to a fixed topic by wrapping {@code topic} in a
     * {@code StaticTopicNameExtractor}.
     *
     * <p>Serdes missing from {@code produced} fall back to this stream's serdes.
     *
     * @throws NullPointerException if {@code topic} or {@code produced} is null
     */
    @Override
    public void to(final String topic,
                   final Produced<K, V> produced) {
        Objects.requireNonNull(topic, "topic can't be null");
        Objects.requireNonNull(produced, "produced can't be null");

        final ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valueSerde);
        }

        final TopicNameExtractor<K, V> fixedTopic = (TopicNameExtractor<K, V>) new StaticTopicNameExtractor(topic);
        to(fixedTopic, sinkConfig);
    }

    /**
     * Delegates to {@link #to(TopicNameExtractor, Produced)} with a {@code Produced} built from
     * this stream's serdes and a {@code null} third argument.
     */
    @Override
    public void to(final TopicNameExtractor<K, V> topicExtractor) {
        to(topicExtractor, Produced.with(keySerde, valueSerde, null));
    }

    /**
     * Writes this stream to the topic chosen by {@code topicExtractor}.
     *
     * <p>Serdes missing from {@code produced} fall back to this stream's serdes.
     *
     * @throws NullPointerException if {@code topicExtractor} or {@code produced} is null
     */
    @Override
    public void to(final TopicNameExtractor<K, V> topicExtractor,
                   final Produced<K, V> produced) {
        Objects.requireNonNull(topicExtractor, "topicExtractor can't be null");
        Objects.requireNonNull(produced, "produced can't be null");

        final ProducedInternal<K, V> sinkConfig = new ProducedInternal<K, V>(produced);
        if (sinkConfig.keySerde() == null) {
            sinkConfig.withKeySerde(keySerde);
        }
        if (sinkConfig.valueSerde() == null) {
            sinkConfig.withValueSerde(valueSerde);
        }

        to(topicExtractor, sinkConfig);
    }

    /**
     * Adds a {@code StreamSinkNode} below this stream's graph node.
     *
     * <p>Private helper behind the public {@code to(...)} overloads. Being private it overrides
     * nothing, so it must not carry {@code @Override} (the compiler rejects the annotation on a
     * method that does not override or implement a supertype method).
     *
     * @param topicExtractor chooses the destination topic for the sink
     * @param produced       sink settings; its name is used for the node, otherwise a name with the
     *                       {@code SINK_NAME} prefix is generated
     */
    private void to(final TopicNameExtractor<K, V> topicExtractor,
                    final ProducedInternal<K, V> produced) {
        final String name = new NamedInternal(produced.name()).orElseGenerateWithPrefix(this.builder, SINK_NAME);
        final StreamSinkNode<K, V> sinkNode = new StreamSinkNode<K, V>(name, topicExtractor, produced);
        this.builder.addGraphNode(this.graphNode, sinkNode);
    }

    /**
     * Delegates to {@link #toTable(Named, Materialized)} with an empty name and a
     * {@code Materialized} built from this stream's serdes.
     */
    @Override
    public KTable<K, V> toTable() {
        return toTable(NamedInternal.empty(), Materialized.with(keySerde, valueSerde));
    }

    /**
     * Delegates to {@link #toTable(Named, Materialized)} with a {@code Materialized} built from
     * this stream's serdes.
     */
    @Override
    public KTable<K, V> toTable(final Named named) {
        return toTable(named, Materialized.with(keySerde, valueSerde));
    }

    /**
     * Delegates to {@link #toTable(Named, Materialized)} with an empty name.
     */
    @Override
    public KTable<K, V> toTable(final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        return toTable(NamedInternal.empty(), materialized);
    }

    /**
     * Converts this stream into a table backed by a {@code KTableSource} processor.
     *
     * <p>Serdes come from {@code materialized}, falling back to this stream's serdes. When this
     * stream is flagged as requiring repartitioning, an optimizable repartition node is inserted
     * first and the table's only source node becomes the repartition source; otherwise the table
     * node hangs directly below this stream's graph node.
     *
     * @throws NullPointerException if {@code named} or {@code materialized} is null
     */
    @Override
    public KTable<K, V> toTable(final Named named,
                                final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");

        final String tableName = new NamedInternal(named).orElseGenerateWithPrefix(builder, TO_KTABLE_NAME);
        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> store =
            new MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>>(materialized, builder, TO_KTABLE_NAME);

        final Serde<K> keySerdeOverride = store.keySerde() == null ? keySerde : store.keySerde();
        final Serde<V> valueSerdeOverride = store.valueSerde() == null ? valueSerde : store.valueSerde();

        final GraphNode parentNode;
        final Set<String> tableSourceNodes;
        if (repartitionRequired) {
            final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionBuilder =
                OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
            final String repartitionSource = KStreamImpl.createRepartitionedSource(
                builder,
                keySerdeOverride,
                valueSerdeOverride,
                tableName,
                null,
                repartitionBuilder);
            parentNode = repartitionBuilder.build();
            builder.addGraphNode(graphNode, parentNode);
            tableSourceNodes = Collections.singleton(repartitionSource);
        } else {
            parentNode = graphNode;
            tableSourceNodes = subTopologySourceNodes;
        }

        final KTableSource source = new KTableSource(store.storeName(), store.queryableStoreName());
        final StreamToTableNode tableGraphNode =
            new StreamToTableNode(tableName, new ProcessorParameters(source, tableName), store);
        tableGraphNode.setOutputVersioned(store.storeSupplier() instanceof VersionedBytesStoreSupplier);
        builder.addGraphNode(parentNode, tableGraphNode);

        return new KTableImpl(
            tableName,
            keySerdeOverride,
            valueSerdeOverride,
            tableSourceNodes,
            store.queryableStoreName(),
            source,
            tableGraphNode,
            builder);
    }

    /**
     * Delegates to {@link #groupBy(KeyValueMapper, Grouped)} with a {@code Grouped} that has no key
     * serde ({@code null}) and this stream's value serde.
     */
    @Override
    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector) {
        return groupBy(keySelector, Grouped.with(null, valueSerde));
    }

    /**
     * Re-keys the stream via a key-select node (named from {@code grouped}) and returns a grouped
     * stream on top of it, created with {@code repartitionRequired = true}.
     *
     * @throws NullPointerException if {@code keySelector} or {@code grouped} is null
     */
    @Override
    public <KR> KGroupedStream<KR, V> groupBy(final KeyValueMapper<? super K, ? super V, KR> keySelector,
                                              final Grouped<KR, V> grouped) {
        Objects.requireNonNull(keySelector, "keySelector can't be null");
        Objects.requireNonNull(grouped, "grouped can't be null");

        final GroupedInternal<KR, V> grouping = new GroupedInternal<KR, V>(grouped);
        final ProcessorGraphNode<K, V> rekeyNode = internalSelectKey(keySelector, new NamedInternal(grouping.name()));
        rekeyNode.keyChangingOperation(true);
        builder.addGraphNode(graphNode, rekeyNode);

        return new KGroupedStreamImpl<KR, V>(
            rekeyNode.nodeName(),
            subTopologySourceNodes,
            grouping,
            true,
            rekeyNode,
            builder);
    }

    /**
     * Delegates to {@link #groupByKey(Grouped)} with a {@code Grouped} built from this stream's
     * serdes.
     */
    @Override
    public KGroupedStream<K, V> groupByKey() {
        return groupByKey(Grouped.with(keySerde, valueSerde));
    }

    /**
     * Wraps this stream's current graph node in a grouped stream without adding any node; the
     * name, source nodes and repartition flag are carried over unchanged.
     *
     * @throws NullPointerException if {@code grouped} is null
     */
    @Override
    public KGroupedStream<K, V> groupByKey(final Grouped<K, V> grouped) {
        Objects.requireNonNull(grouped, "grouped can't be null");

        final GroupedInternal<K, V> grouping = new GroupedInternal<K, V>(grouped);
        return new KGroupedStreamImpl<K, V>(
            name,
            subTopologySourceNodes,
            grouping,
            repartitionRequired,
            graphNode,
            builder);
    }

    /**
     * Windowed stream-stream join taking a key-agnostic {@link ValueJoiner}.
     * The joiner is adapted with {@code toValueJoinerWithKey} and the call is
     * forwarded to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        return join(
            otherStream,
            toValueJoinerWithKey(joiner),
            windows);
    }

    /**
     * Windowed stream-stream join with a key-aware joiner and a default
     * {@link StreamJoined} whose three serdes are all {@code null}.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        StreamJoined<K, V, VO> defaultStreamJoined = StreamJoined.with(null, null, null);
        return join(otherStream, joiner, windows, defaultStreamJoined);
    }

    /**
     * Windowed stream-stream join taking a key-agnostic {@link ValueJoiner} and an
     * explicit {@link StreamJoined}; adapts the joiner and forwards to the
     * {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        return join(
            otherStream,
            toValueJoinerWithKey(joiner),
            windows,
            streamJoined);
    }

    /**
     * Windowed stream-stream join; hands off to {@code doJoin} with a
     * {@code KStreamImplJoin} constructed with both boolean flags set to {@code false}.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        KStreamImplJoin joinImpl = new KStreamImplJoin(builder, false, false);
        return doJoin(otherStream, joiner, windows, streamJoined, joinImpl);
    }

    /**
     * Windowed stream-stream left join taking a key-agnostic {@link ValueJoiner};
     * adapts the joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        return leftJoin(
            otherStream,
            toValueJoinerWithKey(joiner),
            windows);
    }

    /**
     * Windowed stream-stream left join with a key-aware joiner and a default
     * {@link StreamJoined} whose three serdes are all {@code null}.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        StreamJoined<K, V, VO> defaultStreamJoined = StreamJoined.with(null, null, null);
        return leftJoin(otherStream, joiner, windows, defaultStreamJoined);
    }

    /**
     * Windowed stream-stream left join taking a key-agnostic {@link ValueJoiner}.
     * The adapted joiner is passed straight to {@code doJoin} together with a
     * {@code KStreamImplJoin} constructed with flags {@code (true, false)}.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        return doJoin(
            otherStream,
            toValueJoinerWithKey(joiner),
            windows,
            streamJoined,
            new KStreamImplJoin(builder, true, false));
    }

    /**
     * Windowed stream-stream left join; hands off to {@code doJoin} with a
     * {@code KStreamImplJoin} constructed with flags {@code (true, false)}.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        KStreamImplJoin joinImpl = new KStreamImplJoin(builder, true, false);
        return doJoin(otherStream, joiner, windows, streamJoined, joinImpl);
    }

    /**
     * Windowed stream-stream outer join taking a key-agnostic {@link ValueJoiner};
     * adapts the joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        return outerJoin(
            otherStream,
            toValueJoinerWithKey(joiner),
            windows);
    }

    /**
     * Windowed stream-stream outer join with a key-aware joiner and a default
     * {@link StreamJoined} whose three serdes are all {@code null}.
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows) {
        StreamJoined<K, V, VO> defaultStreamJoined = StreamJoined.with(null, null, null);
        return outerJoin(otherStream, joiner, windows, defaultStreamJoined);
    }

    /**
     * Windowed stream-stream outer join taking a key-agnostic {@link ValueJoiner}
     * and an explicit {@link StreamJoined}; adapts the joiner and forwards to the
     * {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> otherStream, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        return outerJoin(
            otherStream,
            toValueJoinerWithKey(joiner),
            windows,
            streamJoined);
    }

    /**
     * Windowed stream-stream outer join; hands off to {@code doJoin} with a
     * {@code KStreamImplJoin} constructed with both boolean flags set to {@code true}.
     */
    @Override
    public <VO, VR> KStream<K, VR> outerJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined) {
        KStreamImplJoin joinImpl = new KStreamImplJoin(builder, true, true);
        return doJoin(otherStream, joiner, windows, streamJoined, joinImpl);
    }

    /**
     * Shared implementation of the windowed stream-stream joins.
     * <p>
     * Validates the arguments, repartitions either side whose
     * {@code repartitionRequired} flag is set (using "-left" / "-right" suffixed
     * names derived from {@code streamJoined}'s name, or the side's own name as the
     * fallback), registers the two sides for co-partitioning, and finally delegates
     * to {@code join}.
     *
     * @param otherStream  right-hand stream; cast to {@code KStreamImpl}
     * @param joiner       key-aware value joiner
     * @param windows      join windows
     * @param streamJoined serdes/naming configuration for the join
     * @param join         strategy object that builds the actual join topology
     * @return the joined stream produced by {@code join}
     * @throws NullPointerException if {@code otherStream}, {@code joiner},
     *                              {@code windows} or {@code streamJoined} is {@code null}
     */
    private <VO, VR> KStream<K, VR> doJoin(KStream<K, VO> otherStream, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, JoinWindows windows, StreamJoined<K, V, VO> streamJoined, KStreamImplJoin join) {
        Objects.requireNonNull(otherStream, "otherStream can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(streamJoined, "streamJoined can't be null");

        KStreamImpl<K, V> left = this;
        KStreamImpl<K, VO> right = (KStreamImpl<K, VO>) otherStream;

        StreamJoinedInternal<K, V, VO> internalStreamJoined = new StreamJoinedInternal<K, V, VO>(streamJoined);
        NamedInternal joinName = new NamedInternal(internalStreamJoined.name());

        if (left.repartitionRequired) {
            String leftTopicName = joinName.suffixWithOrElseGet("-left", left.name);
            left = left.repartitionForJoin(
                leftTopicName,
                internalStreamJoined.keySerde(),
                internalStreamJoined.valueSerde());
        }

        if (right.repartitionRequired) {
            String rightTopicName = joinName.suffixWithOrElseGet("-right", right.name);
            right = right.repartitionForJoin(
                rightTopicName,
                internalStreamJoined.keySerde(),
                internalStreamJoined.otherValueSerde());
        }

        left.ensureCopartitionWith(Collections.singleton(right));

        // The public StreamJoined (not the internal wrapper) is what gets passed on.
        return join.join(left, right, joiner, windows, streamJoined);
    }

    /**
     * Creates a repartitioned copy of this stream for use as a join input.
     * <p>
     * Each serde override wins over this stream's own serde when it is non-null.
     * The cached {@code repartitionNode} is (re)built and attached to the graph only
     * when none exists yet or when {@code repartitionName} differs from this
     * stream's name; otherwise the existing node is reused.
     *
     * @param repartitionName    name prefix for the repartition topic
     * @param keySerdeOverride   key serde to use, or {@code null} to keep this stream's
     * @param valueSerdeOverride value serde to use, or {@code null} to keep this stream's
     * @return a new stream sourced from the repartition topic, with
     *         {@code repartitionRequired} set to {@code false}
     */
    private KStreamImpl<K, V> repartitionForJoin(String repartitionName, Serde<K> keySerdeOverride, Serde<V> valueSerdeOverride) {
        Serde<K> effectiveKeySerde = this.keySerde;
        if (keySerdeOverride != null) {
            effectiveKeySerde = keySerdeOverride;
        }
        Serde<V> effectiveValueSerde = this.valueSerde;
        if (valueSerdeOverride != null) {
            effectiveValueSerde = valueSerdeOverride;
        }

        OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder nodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
        String sourceName = KStreamImpl.createRepartitionedSource(
            this.builder,
            effectiveKeySerde,
            effectiveValueSerde,
            repartitionName,
            null,
            nodeBuilder);

        boolean reuseExistingNode = this.repartitionNode != null && this.name.equals(repartitionName);
        if (!reuseExistingNode) {
            this.repartitionNode = nodeBuilder.build();
            this.builder.addGraphNode(this.graphNode, this.repartitionNode);
        }

        return new KStreamImpl<K, V>(
            sourceName,
            effectiveKeySerde,
            effectiveValueSerde,
            Collections.singleton(sourceName),
            false,
            this.repartitionNode,
            this.builder);
    }

    /**
     * Configures {@code baseRepartitionNodeBuilder} for a repartition topic and
     * returns the name of its source node.
     * <p>
     * The topic name is {@code repartitionTopicNamePrefix} with
     * {@code REPARTITION_TOPIC_SUFFIX} appended unless it already ends with it.
     * When the prefix matches {@code "KSTREAM.*-[0-9]{10}"} the builder-generated
     * sink/filter/source names are used; otherwise names are derived from the topic
     * name with "-sink", "-source" and "-filter" suffixes. A filter that drops
     * records with a {@code null} key is installed as the node's processor.
     *
     * @param builder                    supplies generated processor names
     * @param keySerde                   key serde for the repartition topic
     * @param valueSerde                 value serde for the repartition topic
     * @param repartitionTopicNamePrefix topic name, with or without the suffix
     * @param streamPartitioner          partitioner handed to the node builder (may be {@code null})
     * @param baseRepartitionNodeBuilder node builder to populate; it is not built here
     * @return the source node name, which is also used as the graph node name
     */
    static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(InternalStreamsBuilder builder, Serde<K1> keySerde, Serde<V1> valueSerde, String repartitionTopicNamePrefix, StreamPartitioner<K1, V1> streamPartitioner, BaseRepartitionNode.BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
        String repartitionTopicName = repartitionTopicNamePrefix;
        if (!repartitionTopicNamePrefix.endsWith(REPARTITION_TOPIC_SUFFIX)) {
            repartitionTopicName = repartitionTopicNamePrefix + REPARTITION_TOPIC_SUFFIX;
        }

        // All three names are requested from the builder unconditionally, in this
        // order, even when they end up unused.
        // NOTE(review): presumably this keeps the builder's name counter advancing
        // the same way on both paths - confirm before reordering or removing.
        String generatedSinkName = builder.newProcessorName(SINK_NAME);
        String generatedFilterName = builder.newProcessorName(FILTER_NAME);
        String generatedSourceName = builder.newProcessorName(SOURCE_NAME);

        boolean prefixLooksGenerated = repartitionTopicNamePrefix.matches("KSTREAM.*-[0-9]{10}");
        String sinkName = prefixLooksGenerated ? generatedSinkName : repartitionTopicName + "-sink";
        String sourceName = prefixLooksGenerated ? generatedSourceName : repartitionTopicName + "-source";
        String nullKeyFilterProcessorName = prefixLooksGenerated ? generatedFilterName : repartitionTopicName + "-filter";

        Predicate<Object, Object> keyIsPresent = (key, value) -> key != null;
        ProcessorParameters filterParameters = new ProcessorParameters(new KStreamFilter<Object, Object>(keyIsPresent, false), nullKeyFilterProcessorName);

        baseRepartitionNodeBuilder
            .withKeySerde(keySerde)
            .withValueSerde(valueSerde)
            .withSourceName(sourceName)
            .withRepartitionTopic(repartitionTopicName)
            .withSinkName(sinkName)
            .withProcessorParameters(filterParameters)
            .withStreamPartitioner(streamPartitioner)
            .withNodeName(sourceName);

        return sourceName;
    }

    /**
     * Stream-table join taking a key-agnostic {@link ValueJoiner}; adapts the
     * joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> table, ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
        return join(
            table,
            toValueJoinerWithKey(joiner));
    }

    /**
     * Stream-table join with a key-aware joiner and a default {@link Joined}
     * whose three serdes are all {@code null}.
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> table, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner) {
        Joined<K, V, VO> defaultJoined = Joined.with(null, null, null);
        return join(table, joiner, defaultJoined);
    }

    /**
     * Stream-table join taking a key-agnostic {@link ValueJoiner} and an explicit
     * {@link Joined}. Arguments are null-checked here before the joiner is adapted
     * and the call is forwarded to the {@link ValueJoinerWithKey} overload.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> table, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(table, "table can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");

        ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> keyedJoiner = toValueJoinerWithKey(joiner);
        return join(table, keyedJoiner, joined);
    }

    /**
     * Stream-table join with a key-aware joiner and an explicit {@link Joined}.
     * <p>
     * When this stream is flagged as requiring repartitioning, a repartitioned
     * copy is created first (named after {@code joined}'s name, falling back to
     * this stream's name) and the join is performed on that copy; otherwise the
     * join is performed directly on this stream.
     *
     * @param table  table to join against; must not be {@code null}
     * @param joiner key-aware value joiner; must not be {@code null}
     * @param joined serdes/naming configuration; must not be {@code null}
     * @return the joined stream
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VO, VR> KStream<K, VR> join(KTable<K, VO> table, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(table, "table can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<K, V, VO>(joined);
        String name = joinedInternal.name();
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(name != null ? name : this.name, joined.keySerde(), joined.valueSerde());
            // The join must run on the repartitioned stream; invoking it on this
            // instance would leave the repartition node dangling and join the
            // un-repartitioned data.
            return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joined, false);
        }
        return this.doStreamTableJoin(table, joiner, joined, false);
    }

    /**
     * Stream-table left join taking a key-agnostic {@link ValueJoiner}; adapts the
     * joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> table, ValueJoiner<? super V, ? super VO, ? extends VR> joiner) {
        return leftJoin(
            table,
            toValueJoinerWithKey(joiner));
    }

    /**
     * Stream-table left join with a key-aware joiner and a default {@link Joined}
     * whose three serdes are all {@code null}.
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> table, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner) {
        Joined<K, V, VO> defaultJoined = Joined.with(null, null, null);
        return leftJoin(table, joiner, defaultJoined);
    }

    /**
     * Stream-table left join taking a key-agnostic {@link ValueJoiner} and an
     * explicit {@link Joined}. Arguments are null-checked here before the joiner is
     * adapted and the call is forwarded to the {@link ValueJoinerWithKey} overload.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> table, ValueJoiner<? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(table, "table can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");

        ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> keyedJoiner = toValueJoinerWithKey(joiner);
        return leftJoin(table, keyedJoiner, joined);
    }

    /**
     * Stream-table left join with a key-aware joiner and an explicit {@link Joined}.
     * <p>
     * When this stream is flagged as requiring repartitioning, a repartitioned
     * copy is created first (named after {@code joined}'s name, falling back to
     * this stream's name) and the join is performed on that copy; otherwise the
     * join is performed directly on this stream.
     *
     * @param table  table to join against; must not be {@code null}
     * @param joiner key-aware value joiner; must not be {@code null}
     * @param joined serdes/naming configuration; must not be {@code null}
     * @return the joined stream
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VO, VR> KStream<K, VR> leftJoin(KTable<K, VO> table, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined) {
        Objects.requireNonNull(table, "table can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(joined, "joined can't be null");
        JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<K, V, VO>(joined);
        String name = joinedInternal.name();
        if (this.repartitionRequired) {
            KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(name != null ? name : this.name, joined.keySerde(), joined.valueSerde());
            // The join must run on the repartitioned stream; invoking it on this
            // instance would leave the repartition node dangling and join the
            // un-repartitioned data.
            return thisStreamRepartitioned.doStreamTableJoin(table, joiner, joined, true);
        }
        return this.doStreamTableJoin(table, joiner, joined, true);
    }

    /**
     * Stream-global-table join taking a key-agnostic {@link ValueJoiner}; adapts
     * the joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
        return join(
            globalTable,
            keySelector,
            toValueJoinerWithKey(joiner));
    }

    /**
     * Stream-global-table join with a key-aware joiner and no explicit name;
     * calls {@code globalTableJoin} with {@code leftJoin = false} and an empty
     * {@code NamedInternal}.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner) {
        return globalTableJoin(
            globalTable,
            keySelector,
            joiner,
            false,
            NamedInternal.empty());
    }

    /**
     * Named stream-global-table join taking a key-agnostic {@link ValueJoiner};
     * adapts the joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoiner<? super V, ? super VG, ? extends VR> joiner, Named named) {
        return join(
            globalTable,
            keySelector,
            toValueJoinerWithKey(joiner),
            named);
    }

    /**
     * Named stream-global-table join with a key-aware joiner; calls
     * {@code globalTableJoin} with {@code leftJoin = false}.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> join(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner, Named named) {
        return globalTableJoin(
            globalTable,
            keySelector,
            joiner,
            false,
            named);
    }

    /**
     * Stream-global-table left join taking a key-agnostic {@link ValueJoiner};
     * adapts the joiner and forwards to the {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoiner<? super V, ? super VG, ? extends VR> joiner) {
        return leftJoin(
            globalTable,
            keySelector,
            toValueJoinerWithKey(joiner));
    }

    /**
     * Stream-global-table left join with a key-aware joiner and no explicit name;
     * calls {@code globalTableJoin} with {@code leftJoin = true} and an empty
     * {@code NamedInternal}.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner) {
        return globalTableJoin(
            globalTable,
            keySelector,
            joiner,
            true,
            NamedInternal.empty());
    }

    /**
     * Named stream-global-table left join taking a key-agnostic
     * {@link ValueJoiner}; adapts the joiner and forwards to the
     * {@link ValueJoinerWithKey} overload.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoiner<? super V, ? super VG, ? extends VR> joiner, Named named) {
        return leftJoin(
            globalTable,
            keySelector,
            toValueJoinerWithKey(joiner),
            named);
    }

    /**
     * Named stream-global-table left join with a key-aware joiner; calls
     * {@code globalTableJoin} with {@code leftJoin = true}.
     */
    @Override
    public <KG, VG, VR> KStream<K, VR> leftJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner, Named named) {
        return globalTableJoin(
            globalTable,
            keySelector,
            joiner,
            true,
            named);
    }

    /**
     * Shared implementation of the stream-global-table joins.
     * <p>
     * Builds a {@code KStreamGlobalKTableJoin} processor from the global table's
     * value getter, wraps it in a {@code StreamTableJoinNode} with no store names,
     * attaches that node under this stream's graph node, and returns a stream that
     * keeps this stream's key serde, source nodes and repartition-required flag
     * (the value serde is passed as {@code null}).
     *
     * @param globalTable global table to join against; cast to {@code GlobalKTableImpl}
     * @param keySelector maps each stream record to the global table key
     * @param joiner      key-aware value joiner
     * @param leftJoin    flag forwarded to the join processor
     * @param named       optional node name; the generated fallback uses the
     *                    {@code LEFTJOIN_NAME} prefix regardless of {@code leftJoin}
     * @return the joined stream
     * @throws NullPointerException if any reference argument is {@code null}
     */
    private <KG, VG, VR> KStream<K, VR> globalTableJoin(GlobalKTable<KG, VG> globalTable, KeyValueMapper<? super K, ? super V, ? extends KG> keySelector, ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner, boolean leftJoin, Named named) {
        Objects.requireNonNull(globalTable, "globalTable can't be null");
        Objects.requireNonNull(keySelector, "keySelector can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        Objects.requireNonNull(named, "named can't be null");

        KTableValueGetterSupplier globalValueGetter = ((GlobalKTableImpl)globalTable).valueGetterSupplier();
        String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, LEFTJOIN_NAME);

        KStreamGlobalKTableJoin<? super K, ? super V, ? extends KG, ? super VG, ? extends VR> joinProcessor = new KStreamGlobalKTableJoin<K, V, KG, VG, VR>(globalValueGetter, joiner, keySelector, leftJoin);
        ProcessorParameters joinParameters = new ProcessorParameters(joinProcessor, name);
        StreamTableJoinNode joinNode = new StreamTableJoinNode(name, joinParameters, new String[0], null, null, Optional.empty());
        builder.addGraphNode(graphNode, joinNode);

        return new KStreamImpl<K, V>(
            name,
            keySerde,
            null,
            subTopologySourceNodes,
            repartitionRequired,
            joinNode,
            builder);
    }

    /**
     * Shared implementation of the stream-table joins, executed on the stream
     * instance it is invoked on.
     * <p>
     * Registers co-partitioning with {@code table}, resolves the node name (from
     * {@code joined} or generated with the {@code LEFTJOIN_NAME} / {@code JOIN_NAME}
     * prefix), optionally registers a time-ordered buffer store when a grace period
     * is configured, and attaches a {@code StreamTableJoinNode} under this stream's
     * graph node.
     *
     * @param table    table to join against; cast to {@code KTableImpl}
     * @param joiner   key-aware value joiner
     * @param joined   serdes/naming/grace-period configuration
     * @param leftJoin selects the name prefix and is forwarded to the join processor
     * @return the joined stream, with {@code repartitionRequired} set to {@code false}
     * @throws NullPointerException     if {@code table} or {@code joiner} is {@code null}
     * @throws IllegalArgumentException if a grace period is set and the table's graph
     *                                  node explicitly reports a non-versioned output
     */
    private <VO, VR> KStream<K, VR> doStreamTableJoin(KTable<K, VO> table, ValueJoinerWithKey<? super K, ? super V, ? super VO, ? extends VR> joiner, Joined<K, V, VO> joined, boolean leftJoin) {
        Objects.requireNonNull(table, "table can't be null");
        Objects.requireNonNull(joiner, "joiner can't be null");
        // Source nodes of both sides combined; used for the resulting stream.
        Set<String> allSourceNodes = this.ensureCopartitionWith(Collections.singleton((AbstractStream)((Object)table)));
        JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<K, V, VO>(joined);
        NamedInternal renamed = new NamedInternal(joinedInternal.name());
        String name = renamed.orElseGenerateWithPrefix(this.builder, leftJoin ? LEFTJOIN_NAME : JOIN_NAME);
        Optional<String> bufferStoreName = Optional.empty();
        if (joined.gracePeriod() != null) {
            // An absent versioned flag is treated as versioned (orElse(true)); only an
            // explicit "false" is rejected.
            if (!((KTableImpl)table).graphNode.isOutputVersioned().orElse(true).booleanValue()) {
                throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
            }
            // The buffer store is named after the join node and registered with the builder.
            bufferStoreName = Optional.of(name + "-Buffer");
            this.builder.addStateStore(new RocksDBTimeOrderedKeyValueBuffer.Builder(bufferStoreName.get(), joined.gracePeriod(), name));
        }
        KStreamKTableJoin<? super K, ? super V, ? super VO, ? extends VR> processorSupplier = new KStreamKTableJoin<K, V, VO, VR>(((KTableImpl)table).valueGetterSupplier(), joiner, leftJoin, Optional.ofNullable(joined.gracePeriod()), bufferStoreName);
        ProcessorParameters<? super K, ? super V, ? super VO, ? extends VR> processorParameters = new ProcessorParameters<K, V, VO, VR>(processorSupplier, name);
        // this.name (the stream's own name) is passed alongside the table's store names.
        StreamTableJoinNode<? super K, ? super V> streamTableJoinNode = new StreamTableJoinNode<K, V>(name, processorParameters, ((KTableImpl)table).valueGetterSupplier().storeNames(), this.name, joined.gracePeriod(), bufferStoreName);
        this.builder.addGraphNode(this.graphNode, streamTableJoinNode);
        // Key serde: the one from `joined` if set, else this stream's; the value serde is passed as null.
        return new KStreamImpl<K, V>(name, joined.keySerde() != null ? joined.keySerde() : this.keySerde, null, allSourceNodes, false, streamTableJoinNode, this.builder);
    }

    /**
     * Deprecated transform entry point without an explicit name. A processor name
     * is generated with the {@code TRANSFORM_NAME} prefix and the supplier is
     * wrapped in a {@code TransformerSupplierAdapter} before delegating to
     * {@code flatTransform}.
     *
     * @throws NullPointerException if {@code transformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        String generatedName = builder.newProcessorName(TRANSFORM_NAME);
        return flatTransform(
            new TransformerSupplierAdapter<K, V, KR, VR>(transformerSupplier),
            Named.as(generatedName),
            stateStoreNames);
    }

    /**
     * Deprecated named transform entry point. The supplier is wrapped in a
     * {@code TransformerSupplierAdapter} and the call is delegated to
     * {@code flatTransform}.
     *
     * @throws NullPointerException if {@code transformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <KR, VR> KStream<KR, VR> transform(TransformerSupplier<? super K, ? super V, KeyValue<KR, VR>> transformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        return flatTransform(
            new TransformerSupplierAdapter<K, V, KR, VR>(transformerSupplier),
            named,
            stateStoreNames);
    }

    /**
     * Deprecated flat-transform entry point without an explicit name; generates a
     * processor name with the {@code TRANSFORM_NAME} prefix and delegates to the
     * named overload.
     *
     * @throws NullPointerException if {@code transformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        String generatedName = builder.newProcessorName(TRANSFORM_NAME);
        return flatTransform(transformerSupplier, Named.as(generatedName), stateStoreNames);
    }

    /**
     * Deprecated named flat-transform.
     * <p>
     * After validating the arguments, a {@code StatefulProcessorNode} wrapping a
     * {@code KStreamFlatTransform} is created, flagged as key-changing, and
     * attached under this stream's graph node. The returned stream carries no
     * serdes (both {@code null}) and is marked as requiring repartitioning.
     *
     * @param transformerSupplier supplier of the transformer; must not be {@code null}
     * @param named               node name; must not be {@code null}
     * @param stateStoreNames     state stores to connect; neither the array nor any
     *                            element may be {@code null}
     * @return the transformed stream
     * @throws NullPointerException if any of the above is {@code null}
     */
    @Override
    @Deprecated
    public <K1, V1> KStream<K1, V1> flatTransform(TransformerSupplier<? super K, ? super V, Iterable<KeyValue<K1, V1>>> transformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(transformerSupplier, "transformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
        ApiUtils.checkSupplier(transformerSupplier);
        for (String storeName : stateStoreNames) {
            Objects.requireNonNull(storeName, "stateStoreNames can't contain `null` as store name");
        }

        String name = new NamedInternal(named).name();
        StatefulProcessorNode<K, V> flatTransformNode = new StatefulProcessorNode<K, V>(
            name,
            new ProcessorParameters<K, V, K1, V1>(new KStreamFlatTransform<K, V, K1, V1>(transformerSupplier), name),
            stateStoreNames);
        flatTransformNode.keyChangingOperation(true);
        builder.addGraphNode(graphNode, flatTransformNode);

        return new KStreamImpl<K, V>(
            name,
            null,
            null,
            subTopologySourceNodes,
            true,
            flatTransformNode,
            builder);
    }

    /**
     * Deprecated value transform taking a key-agnostic supplier and no explicit
     * name; adapts the supplier and delegates to {@code doTransformValues} with an
     * empty {@code NamedInternal}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doTransformValues(
            toValueTransformerWithKeySupplier(valueTransformerSupplier),
            NamedInternal.empty(),
            stateStoreNames);
    }

    /**
     * Deprecated named value transform taking a key-agnostic supplier; adapts the
     * supplier and delegates to {@code doTransformValues}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} or
     *                              {@code named} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> transformValues(ValueTransformerSupplier<? super V, ? extends VR> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        return doTransformValues(
            toValueTransformerWithKeySupplier(valueTransformerSupplier),
            new NamedInternal(named),
            stateStoreNames);
    }

    /**
     * Deprecated value transform taking a key-aware supplier and no explicit name;
     * delegates to {@code doTransformValues} with an empty {@code NamedInternal}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doTransformValues(
            valueTransformerSupplier,
            NamedInternal.empty(),
            stateStoreNames);
    }

    /**
     * Deprecated named value transform taking a key-aware supplier; delegates to
     * {@code doTransformValues}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} or
     *                              {@code named} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> transformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        return doTransformValues(
            valueTransformerSupplier,
            new NamedInternal(named),
            stateStoreNames);
    }

    /**
     * Shared implementation of the deprecated {@code transformValues} overloads.
     * <p>
     * Validates the store names and the supplier, resolves the node name (generated
     * with the {@code TRANSFORMVALUES_NAME} prefix when {@code named} is empty),
     * creates a value-changing {@code StatefulProcessorNode} around a
     * {@code KStreamTransformValues}, and attaches it under this stream's graph
     * node. The result keeps this stream's key serde, source nodes and
     * repartition-required flag; the value serde is passed as {@code null}.
     *
     * @throws NullPointerException if {@code stateStoreNames} or any element is {@code null}
     */
    private <VR> KStream<K, VR> doTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> valueTransformerWithKeySupplier, NamedInternal named, String ... stateStoreNames) {
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
        for (String storeName : stateStoreNames) {
            Objects.requireNonNull(storeName, "stateStoreNames can't contain `null` as store name");
        }
        ApiUtils.checkSupplier(valueTransformerWithKeySupplier);

        String name = named.orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
        StatefulProcessorNode valueTransformNode = new StatefulProcessorNode(
            name,
            new ProcessorParameters(new KStreamTransformValues<K, V, VR>(valueTransformerWithKeySupplier), name),
            stateStoreNames);
        valueTransformNode.setValueChangingOperation(true);
        builder.addGraphNode(graphNode, valueTransformNode);

        return new KStreamImpl<K, V>(
            name,
            keySerde,
            null,
            subTopologySourceNodes,
            repartitionRequired,
            valueTransformNode,
            builder);
    }

    /**
     * Deprecated flat value transform taking a key-agnostic supplier and no
     * explicit name; adapts the supplier and delegates to
     * {@code doFlatTransformValues} with an empty {@code NamedInternal}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doFlatTransformValues(
            toValueTransformerWithKeySupplier(valueTransformerSupplier),
            NamedInternal.empty(),
            stateStoreNames);
    }

    /**
     * Deprecated named flat value transform taking a key-agnostic supplier; adapts
     * the supplier and delegates to {@code doFlatTransformValues}. {@code named}
     * is passed through without a null check in this method.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerSupplier<? super V, Iterable<VR>> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doFlatTransformValues(
            toValueTransformerWithKeySupplier(valueTransformerSupplier),
            named,
            stateStoreNames);
    }

    /**
     * Deprecated flat value transform taking a key-aware supplier and no explicit
     * name; delegates to {@code doFlatTransformValues} with an empty
     * {@code NamedInternal}.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doFlatTransformValues(
            valueTransformerSupplier,
            NamedInternal.empty(),
            stateStoreNames);
    }

    /**
     * Deprecated named flat value transform taking a key-aware supplier; delegates
     * to {@code doFlatTransformValues}. {@code named} is passed through without a
     * null check in this method.
     *
     * @throws NullPointerException if {@code valueTransformerSupplier} is {@code null}
     */
    @Override
    @Deprecated
    public <VR> KStream<K, VR> flatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null");
        return doFlatTransformValues(
            valueTransformerSupplier,
            named,
            stateStoreNames);
    }

    /**
     * Shared implementation of the deprecated {@code flatTransformValues} overloads.
     * <p>
     * Validates the store names and the supplier, resolves the node name (generated
     * with the {@code TRANSFORMVALUES_NAME} prefix when {@code named} carries no
     * name), creates a value-changing {@code StatefulProcessorNode} around a
     * {@code KStreamFlatTransformValues}, and attaches it under this stream's graph
     * node. The result keeps this stream's key serde, source nodes and
     * repartition-required flag; the value serde is passed as {@code null}.
     *
     * @throws NullPointerException if {@code stateStoreNames} or any element is {@code null}
     */
    private <VR> KStream<K, VR> doFlatTransformValues(ValueTransformerWithKeySupplier<? super K, ? super V, Iterable<VR>> valueTransformerWithKeySupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
        for (String storeName : stateStoreNames) {
            Objects.requireNonNull(storeName, "stateStoreNames can't contain `null` as store name");
        }
        ApiUtils.checkSupplier(valueTransformerWithKeySupplier);

        String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, TRANSFORMVALUES_NAME);
        StatefulProcessorNode flatValueTransformNode = new StatefulProcessorNode(
            name,
            new ProcessorParameters(new KStreamFlatTransformValues<K, V, VR>(valueTransformerWithKeySupplier), name),
            stateStoreNames);
        flatValueTransformNode.setValueChangingOperation(true);
        builder.addGraphNode(graphNode, flatValueTransformNode);

        return new KStreamImpl<K, V>(
            name,
            keySerde,
            null,
            subTopologySourceNodes,
            repartitionRequired,
            flatValueTransformNode,
            builder);
    }

    /**
     * Deprecated terminal {@code process} without an explicit name; generates a
     * processor name with the {@code PROCESSOR_NAME} prefix and delegates to the
     * named overload.
     */
    @Override
    @Deprecated
    public void process(org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier, String ... stateStoreNames) {
        String generatedName = builder.newProcessorName(PROCESSOR_NAME);
        process(processorSupplier, Named.as(generatedName), stateStoreNames);
    }

    /**
     * Deprecated terminal, named {@code process} using the old processor API.
     * <p>
     * After validating the arguments, a {@code StatefulProcessorNode} for the
     * supplier is attached under this stream's graph node. Nothing is returned.
     *
     * @param processorSupplier supplier of the processor; must not be {@code null}
     * @param named             node name; must not be {@code null}
     * @param stateStoreNames   state stores to connect; neither the array nor any
     *                          element may be {@code null}
     * @throws NullPointerException if any of the above is {@code null}
     */
    @Override
    @Deprecated
    public void process(org.apache.kafka.streams.processor.ProcessorSupplier<? super K, ? super V> processorSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
        ApiUtils.checkSupplier(processorSupplier);
        for (String storeName : stateStoreNames) {
            Objects.requireNonNull(storeName, "stateStoreNames can't be null");
        }

        String name = new NamedInternal(named).name();
        StatefulProcessorNode<? super K, ? super V> terminalNode = new StatefulProcessorNode<K, V>(
            name,
            new ProcessorParameters(processorSupplier, name),
            stateStoreNames);
        builder.addGraphNode(graphNode, terminalNode);
    }

    /**
     * {@code process} without an explicit name; generates a processor name with
     * the {@code PROCESSOR_NAME} prefix and delegates to the named overload.
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> process(ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier, String ... stateStoreNames) {
        String generatedName = builder.newProcessorName(PROCESSOR_NAME);
        return process(processorSupplier, Named.as(generatedName), stateStoreNames);
    }

    /**
     * Named {@code process} that yields a downstream stream.
     * <p>
     * After validating the arguments, a {@code StatefulProcessorNode} for the
     * supplier is attached under this stream's graph node. The returned stream
     * carries no serdes (both {@code null}) and is marked as requiring
     * repartitioning.
     *
     * @param processorSupplier supplier of the processor; must not be {@code null}
     * @param named             node name; must not be {@code null}
     * @param stateStoreNames   state stores to connect; neither the array nor any
     *                          element may be {@code null}
     * @return the stream of records forwarded by the processor
     * @throws NullPointerException if any of the above is {@code null}
     */
    @Override
    public <KOut, VOut> KStream<KOut, VOut> process(ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
        ApiUtils.checkSupplier(processorSupplier);
        for (String storeName : stateStoreNames) {
            Objects.requireNonNull(storeName, "stateStoreNames can't be null");
        }

        String name = new NamedInternal(named).name();
        StatefulProcessorNode<? super K, ? super V> processorNode = new StatefulProcessorNode<K, V>(
            name,
            new ProcessorParameters<K, V, KOut, VOut>(processorSupplier, name),
            stateStoreNames);
        builder.addGraphNode(graphNode, processorNode);

        return new KStreamImpl<K, V>(
            name,
            null,
            null,
            subTopologySourceNodes,
            true,
            processorNode,
            builder);
    }

    /**
     * {@code processValues} without an explicit name; generates a processor name
     * with the {@code PROCESSVALUES_NAME} prefix and delegates to the named overload.
     */
    @Override
    public <VOut> KStream<K, VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier, String ... stateStoreNames) {
        String generatedName = builder.newProcessorName(PROCESSVALUES_NAME);
        return processValues(processorSupplier, Named.as(generatedName), stateStoreNames);
    }

    /**
     * Named {@code processValues} using a fixed-key processor.
     * <p>
     * After validating the arguments, a {@code StatefulProcessorNode} for the
     * supplier is attached under this stream's graph node. The returned stream
     * keeps this stream's key serde, source nodes and repartition-required flag;
     * the value serde is passed as {@code null}.
     *
     * @param processorSupplier supplier of the fixed-key processor; must not be {@code null}
     * @param named             node name; must not be {@code null}
     * @param stateStoreNames   state stores to connect; neither the array nor any
     *                          element may be {@code null}
     * @return the stream of records forwarded by the processor
     * @throws NullPointerException if any of the above is {@code null}
     */
    @Override
    public <VOut> KStream<K, VOut> processValues(FixedKeyProcessorSupplier<? super K, ? super V, VOut> processorSupplier, Named named, String ... stateStoreNames) {
        Objects.requireNonNull(processorSupplier, "processorSupplier can't be null");
        Objects.requireNonNull(named, "named can't be null");
        Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be a null array");
        ApiUtils.checkSupplier(processorSupplier);
        for (String storeName : stateStoreNames) {
            Objects.requireNonNull(storeName, "stateStoreNames can't be null");
        }

        String name = new NamedInternal(named).name();
        StatefulProcessorNode<? super K, ? super V> processorNode = new StatefulProcessorNode<K, V>(
            name,
            new ProcessorParameters(processorSupplier, name),
            stateStoreNames);
        builder.addGraphNode(graphNode, processorNode);

        return new KStreamImpl<K, V>(
            name,
            keySerde,
            null,
            subTopologySourceNodes,
            repartitionRequired,
            processorNode,
            builder);
    }
}

