/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.state.StateSerdes;

/**
 * Sends key/value changes of a state store to that store's changelog topic.
 *
 * <p>The topic name is obtained from
 * {@code ProcessorStateManager.storeChangelogTopic(...)} and the records are
 * handed to the {@link RecordCollector} supplied by the processor context.
 *
 * @param <K> key type of the logged records
 * @param <V> value type of the logged records
 */
class StoreChangeLogger<K, V> {
    /** Serdes that provide the key and value serializers used when sending. */
    protected final StateSerdes<K, V> serialization;
    /** Name of the topic that changes are sent to. */
    private final String topic;
    /** Partition number passed along with every record that is sent. */
    private final int partition;
    /** Processor context; queried for the timestamp of each logged change. */
    private final ProcessorContext context;
    /** Collector the records are handed to; logging is skipped when it is {@code null}. */
    private final RecordCollector collector;

    /**
     * Creates a logger that uses the partition of the context's task id.
     *
     * @param storeName     name of the store whose changes are logged
     * @param context       processor context; must also be a {@link RecordCollector.Supplier}
     * @param serialization serdes used to serialize keys and values
     */
    StoreChangeLogger(String storeName, ProcessorContext context, StateSerdes<K, V> serialization) {
        this(storeName, context, context.taskId().partition, serialization);
    }

    /**
     * Creates a logger that sends to an explicitly given partition.
     *
     * @param storeName     name of the store whose changes are logged
     * @param context       processor context; must also be a {@link RecordCollector.Supplier}
     * @param partition     partition number passed to the collector on every send
     * @param serialization serdes used to serialize keys and values
     * @throws ClassCastException if {@code context} is not a {@link RecordCollector.Supplier}
     */
    private StoreChangeLogger(String storeName, ProcessorContext context, int partition, StateSerdes<K, V> serialization) {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(
                context.applicationId(), storeName, context.applicationInternalStream());

        // The context is widened to Object first so the narrowing cast below is
        // accepted by the compiler regardless of how the two types are declared.
        final Object rawContext = context;
        final RecordCollector.Supplier collectorSupplier = (RecordCollector.Supplier) rawContext;

        this.serialization = serialization;
        this.partition = partition;
        this.context = context;
        this.topic = changelogTopic;
        this.collector = collectorSupplier.recordCollector();
    }

    /**
     * Sends one key/value change to the changelog topic, stamped with the
     * context's current timestamp. Does nothing when no collector is available.
     *
     * @param key   key of the changed entry
     * @param value new value of the entry
     */
    void logChange(K key, V value) {
        if (collector == null) {
            return;
        }

        // Serializers are fetched before the timestamp is read, then everything
        // is passed to the collector in a single send call.
        final Serializer<K> keySer = serialization.keySerializer();
        final Serializer<V> valueSer = serialization.valueSerializer();
        collector.send(topic, key, value, partition, context.timestamp(), keySer, valueSer);
    }
}

