package org.apache.kafka.streams.kstream.internals;

import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.class */
/**
 * Supplies the processor for one side of a windowed stream-stream join.
 *
 * <p>For every incoming record the processor looks up records with the same key in the
 * <em>other</em> side's window store, within {@code [timestamp - joinBeforeMs, timestamp + joinAfterMs]},
 * and forwards one joined record per match. When {@code outer} is set and nothing matched, the record
 * is either forwarded immediately with a {@code null} other-side value or parked in the outer-join
 * store (when one is configured) to be emitted later by {@code emitNonJoinedOuterRecords}.
 *
 * @param <K>    key type of both input streams and of the output
 * @param <V1>   value type of the stream this processor consumes
 * @param <V2>   value type held in the other side's window store
 * @param <VOut> value type produced by the joiner
 */
public class KStreamKStreamJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);

    private final String otherWindowName;
    private final long joinBeforeMs;
    private final long joinAfterMs;
    private final long joinGraceMs;
    private final boolean enableSpuriousResultFix;
    private final long joinSpuriousLookBackTimeMs;
    private final boolean outer;
    private final boolean isLeftSide;
    private final Optional<String> outerJoinWindowName;
    private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner;
    private final KStreamImplJoin.TimeTrackerSupplier sharedTimeTrackerSupplier;

    /**
     * Creates the supplier for one join side.
     *
     * @param isLeftSide                {@code true} if this supplier serves the left input; the right
     *                                  side swaps the window's before/after bounds
     * @param otherWindowName           name of the window store holding the other side's records
     * @param windows                   join window definition (before/after bounds, grace period,
     *                                  spurious-result-fix flag)
     * @param joiner                    function combining this side's value with the other side's value
     * @param outer                     whether records without a match must still produce output
     * @param outerJoinWindowName       name of the store for not-yet-joined outer records, if any
     * @param sharedTimeTrackerSupplier supplier of the per-task time tracker; NOTE(review): presumably
     *                                  shared with the opposite join side - confirm against the caller
     */
    public KStreamKStreamJoin(final boolean isLeftSide,
                              final String otherWindowName,
                              final JoinWindowsInternal windows,
                              final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joiner,
                              final boolean outer,
                              final Optional<String> outerJoinWindowName,
                              final KStreamImplJoin.TimeTrackerSupplier sharedTimeTrackerSupplier) {
        this.isLeftSide = isLeftSide;
        this.otherWindowName = otherWindowName;
        if (isLeftSide) {
            this.joinBeforeMs = windows.beforeMs;
            this.joinAfterMs = windows.afterMs;
            this.joinSpuriousLookBackTimeMs = windows.beforeMs;
        } else {
            // The right side sees the window mirrored: its "before" is the window's "after".
            this.joinBeforeMs = windows.afterMs;
            this.joinAfterMs = windows.beforeMs;
            this.joinSpuriousLookBackTimeMs = windows.afterMs;
        }
        this.joinGraceMs = windows.gracePeriodMs();
        this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
        this.joiner = joiner;
        this.outer = outer;
        this.outerJoinWindowName = outerJoinWindowName;
        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
    }

    /**
     * Returns a new processor instance bound to this supplier's configuration.
     */
    @Override
    public Processor<K, V1, K, VOut> get() {
        return new KStreamKStreamJoinProcessor();
    }

    /**
     * Processor performing the join for one input side. State stores, the dropped-records sensor and
     * the time tracker are resolved in {@link #init(ProcessorContext)}.
     */
    public class KStreamKStreamJoinProcessor extends ContextualProcessor<K, V1, K, VOut> {
        private WindowStore<K, V2> otherWindowStore;
        private Sensor droppedRecordsSensor;
        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>>> outerJoinStore;
        private InternalProcessorContext<K, VOut> internalProcessorContext;
        private KStreamImplJoin.TimeTracker sharedTimeTracker;

        private KStreamKStreamJoinProcessor() {
            // Stays empty unless the spurious-result fix is enabled and a store name was supplied.
            this.outerJoinStore = Optional.empty();
        }

        /**
         * Resolves the other side's window store, the dropped-records sensor and the task's time
         * tracker. When the spurious-result fix is enabled, also resolves the optional outer-join store
         * and configures the emit interval from the application config (falling back to 1000 ms).
         *
         * @param context processor context; must also be an {@link InternalProcessorContext}
         */
        @Override
        public void init(final ProcessorContext<K, VOut> context) {
            super.init(context);
            internalProcessorContext = (InternalProcessorContext<K, VOut>) context;

            final StreamsMetricsImpl metrics = (StreamsMetricsImpl) context.metrics();
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                context.taskId().toString(),
                metrics);
            otherWindowStore = context.getStateStore(otherWindowName);
            sharedTimeTracker = sharedTimeTrackerSupplier.get(context.taskId());

            if (enableSpuriousResultFix) {
                outerJoinStore = outerJoinWindowName.map(context::getStateStore);
                sharedTimeTracker.setEmitInterval(
                    StreamsConfig.InternalConfig.getLong(
                        context.appConfigs(),
                        StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_OUTER_JOIN_SPURIOUS_RESULTS_FIX,
                        1000L));
            }
        }

        /**
         * Joins {@code record} against matching records of the other side and forwards the results.
         *
         * <p>Records rejected by {@code StreamStreamJoinUtil.skipRecord} are ignored. Otherwise the
         * shared stream time is advanced, expired outer records are emitted if this record carries the
         * current stream time, and every match found in the other window store is joined and forwarded
         * with the larger of the two timestamps. An unmatched record of an outer join is either
         * forwarded right away with a {@code null} other value or stored for later emission.
         *
         * @param record the input record of this join side
         */
        @Override
        public void process(final Record<K, V1> record) {
            if (StreamStreamJoinUtil.skipRecord(record, LOG, droppedRecordsSensor, context())) {
                return;
            }

            boolean needOuterJoin = outer;
            final long inputRecordTimestamp = record.timestamp();
            // Window bounds are clamped so that a negative timestamp is never used for the lookup.
            final long timeFrom = Math.max(0L, inputRecordTimestamp - joinBeforeMs);
            final long timeTo = Math.max(0L, inputRecordTimestamp + joinAfterMs);

            sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);

            // Only a record sitting at the current stream time triggers the emission pass.
            if (inputRecordTimestamp == sharedTimeTracker.streamTime) {
                outerJoinStore.ifPresent(store -> emitNonJoinedOuterRecords(store, record));
            }

            // try-with-resources closes the iterator on every path, including failures in
            // hasNext(), the joiner, forward() or the outer-join store writes below.
            try (WindowStoreIterator<V2> iter = otherWindowStore.fetch(record.key(), timeFrom, timeTo)) {
                while (iter.hasNext()) {
                    needOuterJoin = false;
                    final KeyValue<Long, V2> otherRecord = iter.next();
                    final long otherRecordTimestamp = otherRecord.key;

                    // The matched other-side record no longer needs a null-joined result. Writing a
                    // null value removes it; NOTE(review): putIfAbsent presumably skips the write
                    // when no entry exists - confirm against the store implementation.
                    outerJoinStore.ifPresent(store ->
                        store.putIfAbsent(
                            TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp),
                            null));

                    context().forward(
                        record
                            .withValue(joiner.apply(record.key(), record.value(), otherRecord.value))
                            .withTimestamp(Math.max(inputRecordTimestamp, otherRecordTimestamp)));
                }

                if (needOuterJoin) {
                    // Without an outer store, or when this record's upper window bound is already
                    // behind the shared stream time, emit the null-joined result immediately.
                    if (!outerJoinStore.isPresent() || timeTo < sharedTimeTracker.streamTime) {
                        context().forward(
                            record.withValue(joiner.apply(record.key(), record.value(), null)));
                    } else {
                        sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
                        outerJoinStore.ifPresent(store ->
                            store.put(
                                TimestampedKeyAndJoinSide.make(isLeftSide, record.key(), inputRecordTimestamp),
                                LeftOrRightValue.make(isLeftSide, record.value())));
                    }
                }
            }
        }

        /**
         * Emits null-joined results for stored outer records whose window has closed, then removes
         * them from {@code store}.
         *
         * <p>The pass is skipped when the tracker's minimum stored timestamp is not older than
         * {@code streamTime - joinSpuriousLookBackTimeMs - joinGraceMs}, or when the wall-clock time has
         * not yet reached {@code nextTimeToEmit}. Iteration stops at the first entry with
         * {@code timestamp + joinAfterMs + joinGraceMs >= streamTime}; that entry's timestamp is left
         * in {@code minTime}.
         *
         * @param store  store holding the not-yet-joined outer records
         * @param record the record currently being processed; used as the template for forwarded records
         */
        @SuppressWarnings("unchecked") // the right side reads its own values through the swapped type slots
        private void emitNonJoinedOuterRecords(
                final KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> store,
                final Record<K, V1> record) {

            // Nothing stored can be old enough yet to have left its window.
            if (sharedTimeTracker.minTime >= sharedTimeTracker.streamTime - joinSpuriousLookBackTimeMs - joinGraceMs) {
                return;
            }
            // Throttle on wall-clock time.
            if (internalProcessorContext.currentSystemTimeMs() < sharedTimeTracker.nextTimeToEmit) {
                return;
            }

            // Re-base the next emit time on the current wall-clock time before advancing it.
            sharedTimeTracker.nextTimeToEmit = internalProcessorContext.currentSystemTimeMs();
            sharedTimeTracker.advanceNextTimeToEmit();

            // Reset in case the store turns out to be empty; overwritten per entry below.
            sharedTimeTracker.minTime = Long.MAX_VALUE;

            // try-with-resources closes the iterator on every path, including a failure of
            // hasNext() or of the final store write after the loop.
            try (KeyValueIterator<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> it = store.all()) {
                TimestampedKeyAndJoinSide<K> prevKey = null;

                while (it.hasNext()) {
                    final KeyValue<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<V1, V2>> next = it.next();
                    final TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide = next.key;
                    final LeftOrRightValue<V1, V2> value = next.value;
                    final K key = timestampedKeyAndJoinSide.getKey();
                    final long timestamp = timestampedKeyAndJoinSide.getTimestamp();
                    sharedTimeTracker.minTime = timestamp;

                    // Window of this entry has not closed yet: stop here.
                    if (timestamp + joinAfterMs + joinGraceMs >= sharedTimeTracker.streamTime) {
                        break;
                    }

                    final VOut nullJoinedValue;
                    if (isLeftSide) {
                        nullJoinedValue = joiner.apply(key, value.getLeftValue(), value.getRightValue());
                    } else {
                        // On the right side the roles are mirrored, so the value slots are swapped.
                        nullJoinedValue = joiner.apply(key, (V1) value.getRightValue(), (V2) value.getLeftValue());
                    }

                    context().forward(
                        record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp));

                    // Remove the previous key only once iteration has moved on to a different key,
                    // i.e. after all entries seen under the previous key have been forwarded.
                    if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
                        store.put(prevKey, null);
                    }
                    prevKey = timestampedKeyAndJoinSide;
                }

                // The last emitted key has not been removed by the loop yet.
                if (prevKey != null) {
                    store.put(prevKey, null);
                }
            }
        }

        /**
         * Releases this task's entry in the time-tracker supplier.
         */
        @Override
        public void close() {
            sharedTimeTrackerSupplier.remove(context().taskId());
        }
    }
}
