package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.class */
class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends AbstractProcessor<K1, V1> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKTableJoin.class);
    private final KTableValueGetter<K2, V2> valueGetter;
    private final KeyValueMapper<? super K1, ? super V1, ? extends K2> keyMapper;
    private final ValueJoiner<? super V1, ? super V2, ? extends R> joiner;
    private final boolean leftJoin;
    private StreamsMetricsImpl metrics;
    private Sensor droppedRecordsSensor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public KStreamKTableJoinProcessor(KTableValueGetter<K2, V2> kTableValueGetter, KeyValueMapper<? super K1, ? super V1, ? extends K2> keyValueMapper, ValueJoiner<? super V1, ? super V2, ? extends R> valueJoiner, boolean z) {
        this.valueGetter = kTableValueGetter;
        this.keyMapper = keyValueMapper;
        this.joiner = valueJoiner;
        this.leftJoin = z;
    }

    @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
    public void init(ProcessorContext processorContext) {
        super.init(processorContext);
        this.metrics = (StreamsMetricsImpl) processorContext.metrics();
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(Thread.currentThread().getName(), processorContext.taskId().toString(), this.metrics);
        this.valueGetter.init(processorContext);
    }

    @Override // org.apache.kafka.streams.processor.Processor
    public void process(K1 k1, V1 v1) {
        if (k1 == null || v1 == null) {
            LOG.warn("Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]", new Object[]{k1, v1, context().topic(), Integer.valueOf(context().partition()), Long.valueOf(context().offset())});
            this.droppedRecordsSensor.record();
            return;
        }
        K2 apply = this.keyMapper.apply(k1, v1);
        Object valueOrNull = apply == null ? null : ValueAndTimestamp.getValueOrNull(this.valueGetter.get(apply));
        if (this.leftJoin || valueOrNull != null) {
            context().forward(k1, this.joiner.apply(v1, valueOrNull));
        }
    }

    @Override // org.apache.kafka.streams.processor.AbstractProcessor, org.apache.kafka.streams.processor.Processor
    public void close() {
        this.valueGetter.close();
    }
}
