package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.nio.ByteBuffer;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.class */
/**
 * Supplies the processor that runs on the foreign-key table side of a table-table join.
 *
 * <p>For every change record of the foreign table, the processor looks up all subscriptions in
 * the subscription store whose combined key starts with the record's key, and forwards one
 * {@link SubscriptionResponseWrapper} per matching subscription, keyed by the subscriber's
 * primary key.
 *
 * @param <K>  primary key type of the subscribing table (key type of forwarded records)
 * @param <KO> key type of the incoming foreign-table records
 * @param <VO> value type of the incoming foreign-table records
 */
public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
    private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
    private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder;
    private final CombinedKeySchema<KO, K> keySchema;
    private boolean useVersionedSemantics = false;

    /**
     * Processor that fans a foreign-table change out to every subscription sharing its key prefix.
     */
    public final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
        private Sensor droppedRecordsSensor;
        private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore;

        private KTableKTableJoinProcessor() {
        }

        /**
         * Stores the context, creates the dropped-records sensor for the current thread and task,
         * and resolves the subscription store from the supplier's store builder.
         *
         * @param processorContext the processor context; must also be an {@link InternalProcessorContext}
         */
        @Override
        public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> processorContext) {
            super.init(processorContext);
            final InternalProcessorContext<?, ?> internalContext = (InternalProcessorContext<?, ?>) processorContext;
            droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
                Thread.currentThread().getName(),
                internalContext.taskId().toString(),
                internalContext.metrics()
            );
            // The store type is inferred from the builder, so no unchecked cast is needed.
            subscriptionStore = internalContext.getStateStore(storeBuilder);
        }

        /**
         * Forwards the record's new value to every subscription whose combined key is prefixed
         * by the record's key.
         *
         * <p>Records with a {@code null} key are logged, counted on the dropped-records sensor and
         * skipped. When versioned semantics are enabled, records whose change is not flagged as
         * latest are likewise counted and skipped.
         *
         * @param record the foreign-table change record
         */
        @Override
        public void process(final Record<KO, Change<VO>> record) {
            if (record.key() == null) {
                if (context().recordMetadata().isPresent()) {
                    final RecordMetadata recordMetadata = context().recordMetadata().get();
                    LOG.warn(
                        "Skipping record due to null key. topic=[{}] partition=[{}] offset=[{}]",
                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()
                    );
                } else {
                    LOG.warn("Skipping record due to null key. Topic, partition, and offset not known.");
                }
                droppedRecordsSensor.record();
                return;
            }

            if (useVersionedSemantics && !record.value().isLatest) {
                LOG.info("Skipping out-of-order record from versioned table while performing table-table join.");
                droppedRecordsSensor.record();
                return;
            }

            final Bytes prefixBytes = keySchema.prefixBytes(record.key());

            // try-with-resources guarantees the store iterator is closed even when hasNext(),
            // next() or forward() throws.
            try (KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> scan =
                     subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
                while (scan.hasNext()) {
                    final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> entry = scan.next();
                    // The scan may return the upper-bound key itself, so re-check the prefix.
                    if (!prefixEquals(entry.key.get(), prefixBytes.get())) {
                        continue;
                    }
                    final K primaryKey = keySchema.fromBytes(entry.key).getPrimaryKey();
                    final SubscriptionWrapper<K> subscription = entry.value.value();
                    final SubscriptionResponseWrapper<VO> response = new SubscriptionResponseWrapper<>(
                        subscription.getHash(),
                        record.value().newValue,
                        subscription.getPrimaryPartition()
                    );
                    context().forward(record.withKey(primaryKey).withValue(response));
                }
            }
        }

        /**
         * Compares the two arrays over the length of the shorter one.
         *
         * @return {@code true} if the first {@code min(bArr.length, bArr2.length)} bytes are equal
         */
        private boolean prefixEquals(final byte[] bArr, final byte[] bArr2) {
            final int min = Math.min(bArr.length, bArr2.length);
            return ByteBuffer.wrap(bArr, 0, min).equals(ByteBuffer.wrap(bArr2, 0, min));
        }
    }

    /**
     * @param storeBuilder      builder identifying the subscription store the processors read from
     * @param combinedKeySchema schema used to build key prefixes and decode combined keys
     */
    public ForeignTableJoinProcessorSupplier(final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder,
                                             final CombinedKeySchema<KO, K> combinedKeySchema) {
        this.storeBuilder = storeBuilder;
        this.keySchema = combinedKeySchema;
    }

    /**
     * @return a new processor instance bound to this supplier's store builder and key schema
     */
    @Override
    public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
        return new KTableKTableJoinProcessor();
    }

    /**
     * Enables or disables dropping of change records that are not flagged as latest.
     *
     * @param z {@code true} to skip records whose change has {@code isLatest == false}
     */
    public void setUseVersionedSemantics(final boolean z) {
        this.useVersionedSemantics = z;
    }

    /**
     * @return whether records not flagged as latest are currently being dropped
     */
    public boolean isUseVersionedSemantics() {
        return this.useVersionedSemantics;
    }
}
