package org.apache.kafka.streams.kstream.internals;

import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowStore;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.class */
/**
 * Implementation of {@link KGroupedStream} that turns a grouped stream into {@link KTable}s via
 * reduce / aggregate / count operations, optionally windowed.
 *
 * <p>Two construction paths are used in this class:
 * <ul>
 *   <li>{@link Materialized}-based overloads build the table through the shared
 *       {@link GroupedStreamAggregateBuilder} and take queryability from the
 *       {@link MaterializedInternal} instance.</li>
 *   <li>Name- and {@link StateStoreSupplier}-based overloads wire the processor and the store
 *       into the topology directly and take queryability from the {@code isQueryable} field.</li>
 * </ul>
 *
 * @param <K> key type of the grouped stream
 * @param <V> value type of the grouped stream
 */
public class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStream<K, V> {
    /** Name prefix used for reduce processors and for store names generated on the reduce path. */
    static final String REDUCE_NAME = "KSTREAM-REDUCE-";
    /** Name prefix used for aggregate/count processors and for store names generated on that path. */
    static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
    private final Serde<K> keySerde;
    private final Serde<V> valSerde;
    private final boolean repartitionRequired;
    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
    /**
     * Queryability flag handed to the {@code KTableImpl} created by the supplier-based
     * {@code doAggregate}. It is re-evaluated by every overload that takes a store name; overloads
     * that take a {@link StateStoreSupplier} directly read whatever value was set last.
     */
    private boolean isQueryable;

    /**
     * Creates the grouped stream.
     *
     * @param internalStreamsBuilder builder passed to the super class and to the aggregate builder
     * @param str                    name of this stream node
     * @param set                    names of the source nodes
     * @param serde                  key serde
     * @param serde2                 value serde
     * @param z                      whether a repartition step is required before aggregating
     */
    public KGroupedStreamImpl(InternalStreamsBuilder internalStreamsBuilder, String str, Set<String> set, Serde<K> serde, Serde<V> serde2, boolean z) {
        super(internalStreamsBuilder, str, set);
        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(internalStreamsBuilder, serde, serde2, z, set, str);
        this.keySerde = serde;
        this.valSerde = serde2;
        this.repartitionRequired = z;
        this.isQueryable = true;
    }

    /**
     * Records whether the store of the operation being built is queryable: it is queryable exactly
     * when the caller supplied a store name.
     *
     * <p>The flag is reassigned on every call (rather than only ever cleared) so that an earlier
     * unnamed operation on this instance does not make a later named operation non-queryable.
     */
    private void determineIsQueryable(String queryableStoreName) {
        this.isQueryable = queryableStoreName != null;
    }

    /**
     * Reduces the stream into a table backed by a key-value store.
     *
     * @param reducer combines two values into one
     * @param str     store name; {@code null} marks the result as non-queryable and the name is
     *                resolved through {@code getOrCreateName} with the {@link #REDUCE_NAME} prefix
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer, String str) {
        determineIsQueryable(str);
        return reduce(reducer, keyValueStore(this.keySerde, this.valSerde, getOrCreateName(str, REDUCE_NAME)));
    }

    /** Reduces the stream into a non-queryable table; delegates to the named overload with a {@code null} name. */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer) {
        // The named overload already derives the queryable flag from the (null) name.
        return reduce(reducer, (String) null);
    }

    /**
     * Reduces the stream into a table backed by the supplied key-value store.
     *
     * @throws NullPointerException if {@code reducer} or {@code stateStoreSupplier} is {@code null}
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer, StateStoreSupplier<KeyValueStore> stateStoreSupplier) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        return (KTable<K, V>) doAggregate(new KStreamReduce(stateStoreSupplier.name(), reducer), REDUCE_NAME, stateStoreSupplier);
    }

    /**
     * Reduces the stream into a table whose store is described by {@code materialized}.
     *
     * @throws NullPointerException if {@code reducer} or {@code materialized} is {@code null}
     */
    @Override
    public KTable<K, V> reduce(Reducer<V> reducer, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized, this.builder, REDUCE_NAME);
        return (KTable<K, V>) doAggregate(new KStreamReduce(materializedInternal.storeName(), reducer), REDUCE_NAME, materializedInternal);
    }

    /**
     * Windowed reduce backed by a window store.
     *
     * @param str store name; {@code null} marks the result as non-queryable and the name is
     *            resolved through {@code getOrCreateName} with the {@link #REDUCE_NAME} prefix
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer, Windows<W> windows, String str) {
        determineIsQueryable(str);
        return reduce(reducer, windows, windowedStore(this.keySerde, this.valSerde, windows, getOrCreateName(str, REDUCE_NAME)));
    }

    /** Windowed reduce; delegates to {@link #windowedBy(Windows)}. */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer, Windows<W> windows) {
        return windowedBy(windows).reduce(reducer);
    }

    /**
     * Windowed reduce backed by the supplied window store.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, V> reduce(Reducer<V> reducer, Windows<W> windows, StateStoreSupplier<WindowStore> stateStoreSupplier) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        return (KTable<Windowed<K>, V>) doAggregate(new KStreamWindowReduce(windows, stateStoreSupplier.name(), reducer), REDUCE_NAME, stateStoreSupplier);
    }

    /**
     * Aggregates the stream into a table backed by a key-value store.
     *
     * @param serde serde of the aggregate value
     * @param str   store name; {@code null} marks the result as non-queryable and the name is
     *              resolved through {@code getOrCreateName} with the {@link #AGGREGATE_NAME} prefix
     */
    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Serde<T> serde, String str) {
        determineIsQueryable(str);
        return aggregate(initializer, aggregator, keyValueStore(this.keySerde, serde, getOrCreateName(str, AGGREGATE_NAME)));
    }

    /**
     * Aggregates the stream into a table whose store is described by {@code materialized}.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        return aggregateMaterialized(initializer, aggregator, materialized);
    }

    /** Shared tail of the {@link Materialized}-based aggregate overloads; performs no null checks itself. */
    private <VR> KTable<K, VR> aggregateMaterialized(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized, this.builder, AGGREGATE_NAME);
        return (KTable<K, VR>) doAggregate(new KStreamAggregate(materializedInternal.storeName(), initializer, aggregator), AGGREGATE_NAME, materializedInternal);
    }

    /**
     * Aggregates the stream using this stream's key serde and no explicit value serde.
     *
     * @throws NullPointerException if {@code initializer} or {@code aggregator} is {@code null}
     */
    @Override
    public <VR> KTable<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        // Explicit type arguments: the value serde is null, so nothing else pins VR and the store type.
        return aggregateMaterialized(initializer, aggregator, Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>with(this.keySerde, null));
    }

    /** Aggregates into a non-queryable table; delegates to the named overload with a {@code null} name. */
    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Serde<T> serde) {
        return aggregate(initializer, aggregator, serde, (String) null);
    }

    /**
     * Aggregates the stream into a table backed by the supplied key-value store.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, StateStoreSupplier<KeyValueStore> stateStoreSupplier) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        return doAggregate(new KStreamAggregate(stateStoreSupplier.name(), initializer, aggregator), AGGREGATE_NAME, stateStoreSupplier);
    }

    /**
     * Windowed aggregate backed by a window store.
     *
     * @param serde serde of the aggregate value
     * @param str   store name; {@code null} marks the result as non-queryable and the name is
     *              resolved through {@code getOrCreateName} with the {@link #AGGREGATE_NAME} prefix
     */
    @Override
    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Windows<W> windows, Serde<T> serde, String str) {
        determineIsQueryable(str);
        return aggregate(initializer, aggregator, windows, windowedStore(this.keySerde, serde, windows, getOrCreateName(str, AGGREGATE_NAME)));
    }

    /**
     * Windowed aggregate; delegates to {@link #windowedBy(Windows)} with a store name obtained
     * from the builder using the {@link #AGGREGATE_NAME} prefix.
     */
    @Override
    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Windows<W> windows, Serde<T> serde) {
        // Explicit type arguments are required: a chained generic call has no target type to infer from.
        Materialized<K, T, WindowStore<Bytes, byte[]>> materialized = Materialized.<K, T, WindowStore<Bytes, byte[]>>as(this.builder.newStoreName(AGGREGATE_NAME))
                .withKeySerde(this.keySerde)
                .withValueSerde(serde);
        return windowedBy(windows).aggregate(initializer, aggregator, materialized);
    }

    /**
     * Windowed aggregate backed by the supplied window store.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <W extends Window, T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Windows<W> windows, StateStoreSupplier<WindowStore> stateStoreSupplier) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(windows, "windows can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        return doAggregate(new KStreamWindowAggregate(windows, stateStoreSupplier.name(), initializer, aggregator), AGGREGATE_NAME, stateStoreSupplier);
    }

    /**
     * Counts records per key into a key-value store with {@code Long} values.
     *
     * @param str store name; {@code null} marks the result as non-queryable and the name is
     *            resolved through {@code getOrCreateName} with the {@link #AGGREGATE_NAME} prefix
     */
    @Override
    public KTable<K, Long> count(String str) {
        determineIsQueryable(str);
        return count(keyValueStore(this.keySerde, Serdes.Long(), getOrCreateName(str, AGGREGATE_NAME)));
    }

    /** Counts records per key into a non-queryable table; delegates to the named overload with a {@code null} name. */
    @Override
    public KTable<K, Long> count() {
        return count((String) null);
    }

    /** Counts records per key into the supplied store, using the aggregate builder's count initializer and aggregator. */
    @Override
    public KTable<K, Long> count(StateStoreSupplier<KeyValueStore> stateStoreSupplier) {
        return (KTable<K, Long>) aggregate(this.aggregateBuilder.countInitializer, this.aggregateBuilder.countAggregator, stateStoreSupplier);
    }

    /**
     * Counts records per key into the store described by {@code materialized}. If no value serde
     * has been configured, {@code Serdes.Long()} is set on {@code materialized} before aggregating.
     *
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public KTable<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        // The internal view is created only to read the configured value serde.
        MaterializedInternal<K, Long, KeyValueStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(materialized, this.builder, AGGREGATE_NAME);
        if (materializedInternal.valueSerde() == null) {
            materialized.withValueSerde(Serdes.Long());
        }
        return (KTable<K, Long>) aggregate(this.aggregateBuilder.countInitializer, this.aggregateBuilder.countAggregator, materialized);
    }

    /**
     * Windowed count backed by a window store with {@code Long} values.
     *
     * @param str store name; {@code null} marks the result as non-queryable and the name is
     *            resolved through {@code getOrCreateName} with the {@link #AGGREGATE_NAME} prefix
     */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, String str) {
        determineIsQueryable(str);
        return count(windows, windowedStore(this.keySerde, Serdes.Long(), windows, getOrCreateName(str, AGGREGATE_NAME)));
    }

    /** Windowed count; delegates to {@link #windowedBy(Windows)}. */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows) {
        return windowedBy(windows).count();
    }

    /** Windowed count into the supplied store, using the aggregate builder's count initializer and aggregator. */
    @Override
    public <W extends Window> KTable<Windowed<K>, Long> count(Windows<W> windows, StateStoreSupplier<WindowStore> stateStoreSupplier) {
        return (KTable<Windowed<K>, Long>) aggregate(this.aggregateBuilder.countInitializer, this.aggregateBuilder.countAggregator, windows, stateStoreSupplier);
    }

    /**
     * Session-windowed aggregate backed by a session store built from {@code storeFactory} with the
     * session windows' {@code maintainMs()}.
     *
     * @param serde serde of the aggregate value
     * @param str   store name; {@code null} marks the result as non-queryable and the name is
     *              resolved through {@code getOrCreateName} with the {@link #AGGREGATE_NAME} prefix
     */
    @Override
    public <T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Merger<? super K, T> merger, SessionWindows sessionWindows, Serde<T> serde, String str) {
        determineIsQueryable(str);
        return aggregate(initializer, aggregator, merger, sessionWindows, serde, storeFactory(this.keySerde, serde, getOrCreateName(str, AGGREGATE_NAME)).sessionWindowed(sessionWindows.maintainMs()).build());
    }

    /**
     * Session-windowed aggregate; delegates to {@link #windowedBy(SessionWindows)} with a store
     * name obtained from the builder using the {@link #AGGREGATE_NAME} prefix.
     */
    @Override
    public <T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Merger<? super K, T> merger, SessionWindows sessionWindows, Serde<T> serde) {
        // Explicit type arguments are required: a chained generic call has no target type to infer from.
        Materialized<K, T, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, T, SessionStore<Bytes, byte[]>>as(this.builder.newStoreName(AGGREGATE_NAME))
                .withKeySerde(this.keySerde)
                .withValueSerde(serde);
        return windowedBy(sessionWindows).aggregate(initializer, aggregator, merger, materialized);
    }

    /**
     * Session-windowed aggregate backed by the supplied session store.
     *
     * @param serde not read by this overload
     * @throws NullPointerException if {@code initializer}, {@code aggregator}, {@code sessionWindows},
     *                              {@code merger} or {@code stateStoreSupplier} is {@code null}
     */
    @Override
    public <T> KTable<Windowed<K>, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? super V, T> aggregator, Merger<? super K, T> merger, SessionWindows sessionWindows, Serde<T> serde, StateStoreSupplier<SessionStore> stateStoreSupplier) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
        Objects.requireNonNull(merger, "sessionMerger can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        return doAggregate(new KStreamSessionWindowAggregate(sessionWindows, stateStoreSupplier.name(), initializer, aggregator, merger), AGGREGATE_NAME, stateStoreSupplier);
    }

    /** Creates a time-windowed view of this grouped stream that shares its builder, source nodes and serdes. */
    @Override
    public <W extends Window> TimeWindowedKStream<K, V> windowedBy(Windows<W> windows) {
        return new TimeWindowedKStreamImpl(windows, this.builder, this.sourceNodes, this.name, this.keySerde, this.valSerde, this.repartitionRequired);
    }

    /** Creates a session-windowed view of this grouped stream that shares its builder, source nodes, serdes and aggregate builder. */
    @Override
    public SessionWindowedKStream<K, V> windowedBy(SessionWindows sessionWindows) {
        return new SessionWindowedKStreamImpl(sessionWindows, this.builder, this.sourceNodes, this.name, this.keySerde, this.valSerde, this.aggregateBuilder);
    }

    /**
     * Session-windowed count; delegates to {@link #windowedBy(SessionWindows)} with a store named
     * via {@code getOrCreateName} and the {@link #AGGREGATE_NAME} prefix.
     */
    @Override
    public KTable<Windowed<K>, Long> count(SessionWindows sessionWindows, String str) {
        // Explicit type arguments are required: a chained generic call has no target type to infer from.
        Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized = Materialized.<K, Long, SessionStore<Bytes, byte[]>>as(getOrCreateName(str, AGGREGATE_NAME))
                .withKeySerde(this.keySerde)
                .withValueSerde(Serdes.Long());
        return windowedBy(sessionWindows).count(materialized);
    }

    /** Session-windowed count; delegates to {@link #windowedBy(SessionWindows)}. */
    @Override
    public KTable<Windowed<K>, Long> count(SessionWindows sessionWindows) {
        return windowedBy(sessionWindows).count();
    }

    /**
     * Session-windowed count into the supplied session store. Counts of merged sessions are added.
     *
     * @throws NullPointerException if {@code sessionWindows} or {@code stateStoreSupplier} is {@code null}
     */
    @Override
    public KTable<Windowed<K>, Long> count(SessionWindows sessionWindows, StateStoreSupplier<SessionStore> stateStoreSupplier) {
        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        Merger<K, Long> sessionMerger = new Merger<K, Long>() {
            @Override
            public Long apply(K aggKey, Long aggOne, Long aggTwo) {
                return aggOne + aggTwo;
            }
        };
        return (KTable<Windowed<K>, Long>) aggregate(this.aggregateBuilder.countInitializer, this.aggregateBuilder.countAggregator, sessionMerger, sessionWindows, Serdes.Long(), stateStoreSupplier);
    }

    /**
     * Session-windowed reduce backed by a session store built from {@code storeFactory} with the
     * session windows' {@code maintainMs()}.
     *
     * @param str store name; {@code null} marks the result as non-queryable and the name is
     *            resolved through {@code getOrCreateName} with the {@link #AGGREGATE_NAME} prefix
     */
    @Override
    public KTable<Windowed<K>, V> reduce(Reducer<V> reducer, SessionWindows sessionWindows, String str) {
        determineIsQueryable(str);
        return reduce(reducer, sessionWindows, storeFactory(this.keySerde, this.valSerde, getOrCreateName(str, AGGREGATE_NAME)).sessionWindowed(sessionWindows.maintainMs()).build());
    }

    /** Session-windowed reduce; delegates to {@link #windowedBy(SessionWindows)}. */
    @Override
    public KTable<Windowed<K>, V> reduce(Reducer<V> reducer, SessionWindows sessionWindows) {
        return windowedBy(sessionWindows).reduce(reducer);
    }

    /**
     * Session-windowed reduce backed by the supplied session store, expressed as an aggregation:
     * the aggregate starts as {@code null}, the first value becomes the aggregate, and every later
     * value (or merged session aggregate) is folded in with {@code reducer}.
     *
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer, SessionWindows sessionWindows, StateStoreSupplier<SessionStore> stateStoreSupplier) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
        Objects.requireNonNull(stateStoreSupplier, "storeSupplier can't be null");
        Initializer<V> initializer = new Initializer<V>() {
            @Override
            public V apply() {
                return null;
            }
        };
        final Aggregator<K, V, V> aggregator = new Aggregator<K, V, V>() {
            @Override
            public V apply(K aggKey, V value, V aggregate) {
                // A null aggregate means no value has been seen yet for this session.
                return aggregate == null ? value : reducer.apply(aggregate, value);
            }
        };
        Merger<K, V> sessionMerger = new Merger<K, V>() {
            @Override
            public V apply(K aggKey, V aggOne, V aggTwo) {
                // Reuse the aggregator with aggOne as the running aggregate and aggTwo as the new value.
                return aggregator.apply(aggKey, aggTwo, aggOne);
            }
        };
        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, this.valSerde, stateStoreSupplier);
    }

    /**
     * Builds the table through the shared aggregate builder, materializing a key-value store from
     * {@code materialized} and taking queryability from it.
     */
    private <T> KTable<K, T> doAggregate(KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, String functionName, MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
        return this.aggregateBuilder.build(aggregateSupplier, functionName, new KeyValueStoreMaterializer(materialized).materialize(), materialized.isQueryable());
    }

    /**
     * Wires the aggregation into the topology directly: adds the processor (after an optional
     * repartition source), attaches the supplied store to it, and wraps the result in a
     * {@code KTableImpl} that carries the current {@code isQueryable} flag.
     */
    private <T> KTable<K, T> doAggregate(KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier, String functionName, StateStoreSupplier storeSupplier) {
        String aggFunctionName = this.builder.newProcessorName(functionName);
        String sourceName = repartitionIfRequired(storeSupplier.name());
        this.builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
        this.builder.internalTopologyBuilder.addStateStore(storeSupplier, aggFunctionName);
        // Without repartitioning the table keeps this stream's source nodes; otherwise its only source is the repartition node.
        return new KTableImpl(this.builder, aggFunctionName, aggregateSupplier, sourceName.equals(this.name) ? this.sourceNodes : Collections.singleton(sourceName), storeSupplier.name(), this.isQueryable);
    }

    /**
     * Returns the name of the node the aggregation processor should read from: this stream's own
     * name when no repartitioning is required, otherwise the name returned by
     * {@code KStreamImpl.createReparitionedSource} for the given store name.
     */
    private String repartitionIfRequired(String queryableStoreName) {
        if (!this.repartitionRequired) {
            return this.name;
        }
        return KStreamImpl.createReparitionedSource(this.builder, this.keySerde, this.valSerde, queryableStoreName, this.name);
    }
}
