/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.FixedOrderMap;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateManager;
import org.apache.kafka.streams.processor.internals.StateManagerUtil;
import org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.RecordConverter;
import org.slf4j.Logger;

public class ProcessorStateManager
implements StateManager {
    private static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
    private Logger log;
    private String logPrefix;
    private final TaskId taskId;
    private final boolean eosEnabled;
    private final ChangelogRegister changelogReader;
    private final Map<String, String> storeToChangelogTopic;
    private final Collection<TopicPartition> sourcePartitions;
    private final FixedOrderMap<String, StateStoreMetadata> stores = new FixedOrderMap();
    private final FixedOrderMap<String, StateStore> globalStores = new FixedOrderMap();
    private final File baseDir;
    private final OffsetCheckpoint checkpointFile;
    private Task.TaskType taskType;

    public static String storeChangelogTopic(String applicationId, String storeName, String internalStream) {
        String topicName = applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
        return internalStream.isEmpty() ? topicName : internalStream + ":" + topicName;
    }

    public ProcessorStateManager(TaskId taskId, Task.TaskType taskType, boolean eosEnabled, LogContext logContext, StateDirectory stateDirectory, ChangelogRegister changelogReader, Map<String, String> storeToChangelogTopic, Collection<TopicPartition> sourcePartitions) throws ProcessorStateException {
        this.log = logContext.logger(ProcessorStateManager.class);
        this.logPrefix = logContext.logPrefix();
        this.taskId = taskId;
        this.taskType = taskType;
        this.eosEnabled = eosEnabled;
        this.changelogReader = changelogReader;
        this.sourcePartitions = sourcePartitions;
        this.storeToChangelogTopic = storeToChangelogTopic;
        this.baseDir = stateDirectory.directoryForTask(taskId);
        this.checkpointFile = new OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
        this.log.debug("Created state store manager for task {}", (Object)taskId);
    }

    void registerStateStores(List<StateStore> allStores, InternalProcessorContext processorContext) {
        processorContext.uninitialize();
        for (StateStore store : allStores) {
            if (this.stores.containsKey((Object)store.name())) {
                this.maybeRegisterStoreWithChangelogReader(store.name());
            } else {
                store.init(processorContext, store);
            }
            this.log.trace("Registered state store {}", (Object)store.name());
        }
    }

    void registerGlobalStateStores(List<StateStore> stateStores) {
        this.log.debug("Register global stores {}", stateStores);
        for (StateStore stateStore : stateStores) {
            this.globalStores.put((Object)stateStore.name(), (Object)stateStore);
        }
    }

    @Override
    public StateStore getGlobalStore(String name) {
        return (StateStore)this.globalStores.get((Object)name);
    }

    void initializeStoreOffsetsFromCheckpoint(boolean storeDirIsEmpty) {
        try {
            Map<TopicPartition, Long> loadedCheckpoints = this.checkpointFile.read();
            this.log.trace("Loaded offsets from the checkpoint file: {}", loadedCheckpoints);
            for (StateStoreMetadata store : this.stores.values()) {
                if (store.corrupted) {
                    this.log.error("Tried to initialize store offsets for corrupted store {}", (Object)store);
                    throw new IllegalStateException("Should not initialize offsets for a corrupted task");
                }
                if (store.changelogPartition == null) {
                    this.log.info("State store {} is not logged and hence would not be restored", (Object)store.stateStore.name());
                    continue;
                }
                if (!store.stateStore.persistent()) {
                    this.log.info("Initializing to the starting offset for changelog {} of in-memory state store {}", (Object)store.changelogPartition, (Object)store.stateStore.name());
                    continue;
                }
                if (store.offset() == null) {
                    if (loadedCheckpoints.containsKey(store.changelogPartition)) {
                        Long offset = this.changelogOffsetFromCheckpointedOffset(loadedCheckpoints.remove(store.changelogPartition));
                        store.setOffset(offset);
                        this.log.debug("State store {} initialized from checkpoint with offset {} at changelog {}", new Object[]{store.stateStore.name(), store.offset, store.changelogPartition});
                        continue;
                    }
                    if (this.eosEnabled && !storeDirIsEmpty) {
                        this.log.warn("State store {} did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task {} before re-bootstrapping", (Object)store.stateStore.name(), (Object)this.taskId);
                        throw new TaskCorruptedException(Collections.singletonMap(this.taskId, this.changelogPartitions()));
                    }
                    this.log.info("State store {} did not find checkpoint offset, hence would default to the starting offset at changelog {}", (Object)store.stateStore.name(), (Object)store.changelogPartition);
                    continue;
                }
                loadedCheckpoints.remove(store.changelogPartition);
                this.log.debug("Skipping re-initialization of offset from checkpoint for recycled store {}", (Object)store.stateStore.name());
            }
            if (!loadedCheckpoints.isEmpty()) {
                this.log.warn("Some loaded checkpoint offsets cannot find their corresponding state stores: {}", loadedCheckpoints);
            }
            this.checkpointFile.delete();
        }
        catch (TaskCorruptedException e) {
            throw e;
        }
        catch (IOException | RuntimeException e) {
            throw new ProcessorStateException(String.format("%sError loading and deleting checkpoint file when creating the state manager", this.logPrefix), e);
        }
    }

    private void maybeRegisterStoreWithChangelogReader(String storeName) {
        if (this.isLoggingEnabled(storeName)) {
            this.changelogReader.register(this.getStorePartition(storeName), this);
        }
    }

    private List<TopicPartition> getAllChangelogTopicPartitions() {
        ArrayList<TopicPartition> allChangelogPartitions = new ArrayList<TopicPartition>();
        for (StateStoreMetadata storeMetadata : this.stores.values()) {
            if (storeMetadata.changelogPartition == null) continue;
            allChangelogPartitions.add(storeMetadata.changelogPartition);
        }
        return allChangelogPartitions;
    }

    @Override
    public File baseDir() {
        return this.baseDir;
    }

    @Override
    public void registerStore(StateStore store, StateRestoreCallback stateRestoreCallback) {
        String storeName = store.name();
        if (".checkpoint".equals(storeName)) {
            throw new IllegalArgumentException(String.format("%sIllegal store name: %s, which collides with the pre-defined checkpoint file name", this.logPrefix, storeName));
        }
        if (this.stores.containsKey((Object)storeName)) {
            throw new IllegalArgumentException(String.format("%sStore %s has already been registered.", this.logPrefix, storeName));
        }
        if (stateRestoreCallback instanceof StateRestoreListener) {
            this.log.warn("The registered state restore callback is also implementing the state restore listener interface, which is not expected and would be ignored");
        }
        StateStoreMetadata storeMetadata = this.isLoggingEnabled(storeName) ? new StateStoreMetadata(store, this.getStorePartition(storeName), stateRestoreCallback, StateManagerUtil.converterForStore(store)) : new StateStoreMetadata(store);
        this.stores.put((Object)storeName, (Object)storeMetadata);
        this.maybeRegisterStoreWithChangelogReader(storeName);
        this.log.debug("Registered state store {} to its state manager", (Object)storeName);
    }

    @Override
    public StateStore getStore(String name) {
        if (this.stores.containsKey((Object)name)) {
            return ((StateStoreMetadata)this.stores.get((Object)name)).stateStore;
        }
        return null;
    }

    Collection<TopicPartition> changelogPartitions() {
        return this.changelogOffsets().keySet();
    }

    void markChangelogAsCorrupted(Collection<TopicPartition> partitions) {
        for (StateStoreMetadata storeMetadata : this.stores.values()) {
            if (!partitions.contains(storeMetadata.changelogPartition)) continue;
            storeMetadata.corrupted = true;
            partitions.remove(storeMetadata.changelogPartition);
        }
        if (!partitions.isEmpty()) {
            throw new IllegalStateException("Some partitions " + partitions + " are not contained in the store list of task " + this.taskId + " marking as corrupted, this is not expected");
        }
    }

    @Override
    public Map<TopicPartition, Long> changelogOffsets() {
        HashMap<TopicPartition, Long> changelogOffsets = new HashMap<TopicPartition, Long>();
        for (StateStoreMetadata storeMetadata : this.stores.values()) {
            if (storeMetadata.changelogPartition == null) continue;
            changelogOffsets.put(storeMetadata.changelogPartition, storeMetadata.offset == null ? 0L : storeMetadata.offset + 1L);
        }
        return changelogOffsets;
    }

    TaskId taskId() {
        return this.taskId;
    }

    boolean changelogAsSource(TopicPartition partition) {
        return this.sourcePartitions.contains(partition);
    }

    @Override
    public Task.TaskType taskType() {
        return this.taskType;
    }

    StateStoreMetadata storeMetadata(TopicPartition partition) {
        for (StateStoreMetadata storeMetadata : this.stores.values()) {
            if (!partition.equals((Object)storeMetadata.changelogPartition)) continue;
            return storeMetadata;
        }
        return null;
    }

    void restore(StateStoreMetadata storeMetadata, List<ConsumerRecord<byte[], byte[]>> restoreRecords) {
        if (!this.stores.containsValue((Object)storeMetadata)) {
            throw new IllegalStateException("Restoring " + storeMetadata + " which is not registered in this state manager, this should not happen.");
        }
        if (!restoreRecords.isEmpty()) {
            Long batchEndOffset = restoreRecords.get(restoreRecords.size() - 1).offset();
            RecordBatchingStateRestoreCallback restoreCallback = StateRestoreCallbackAdapter.adapt(storeMetadata.restoreCallback);
            List<ConsumerRecord<byte[], byte[]>> convertedRecords = restoreRecords.stream().map(storeMetadata.recordConverter::convert).collect(Collectors.toList());
            try {
                restoreCallback.restoreBatch(convertedRecords);
            }
            catch (RuntimeException e) {
                throw new ProcessorStateException(String.format("%sException caught while trying to restore state from %s", this.logPrefix, storeMetadata.changelogPartition), e);
            }
            storeMetadata.setOffset(batchEndOffset);
        }
    }

    @Override
    public void flush() {
        Object firstException = null;
        if (!this.stores.isEmpty()) {
            this.log.debug("Flushing all stores registered in the state manager: {}", this.stores);
            for (StateStoreMetadata metadata : this.stores.values()) {
                StateStore store = metadata.stateStore;
                this.log.trace("Flushing store {}", (Object)store.name());
                try {
                    store.flush();
                }
                catch (RuntimeException exception) {
                    if (firstException == null) {
                        firstException = exception instanceof StreamsException ? exception : new ProcessorStateException(String.format("%sFailed to flush state store %s", this.logPrefix, store.name()), exception);
                    }
                    this.log.error("Failed to flush state store {}: ", (Object)store.name(), (Object)exception);
                }
            }
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    @Override
    public void close() throws ProcessorStateException {
        this.log.debug("Closing its state manager and all the registered state stores: {}", this.stores);
        this.changelogReader.unregister(this.getAllChangelogTopicPartitions());
        Object firstException = null;
        if (!this.stores.isEmpty()) {
            for (Map.Entry entry : this.stores.entrySet()) {
                StateStore store = ((StateStoreMetadata)entry.getValue()).stateStore;
                this.log.trace("Closing store {}", (Object)store.name());
                try {
                    store.close();
                }
                catch (RuntimeException exception) {
                    if (firstException == null) {
                        firstException = exception instanceof StreamsException ? exception : new ProcessorStateException(String.format("%sFailed to close state store %s", this.logPrefix, store.name()), exception);
                    }
                    this.log.error("Failed to close state store {}: ", (Object)store.name(), (Object)exception);
                }
            }
            this.stores.clear();
        }
        if (firstException != null) {
            throw firstException;
        }
    }

    void recycle() {
        this.log.debug("Recycling state for {} task {}.", (Object)this.taskType, (Object)this.taskId);
        List<TopicPartition> allChangelogs = this.getAllChangelogTopicPartitions();
        this.changelogReader.unregister(allChangelogs);
    }

    void transitionTaskType(Task.TaskType newType, LogContext logContext) {
        if (this.taskType.equals((Object)newType)) {
            throw new IllegalStateException("Tried to recycle state for task type conversion but new type was the same.");
        }
        Task.TaskType oldType = this.taskType;
        this.taskType = newType;
        this.log = logContext.logger(ProcessorStateManager.class);
        this.logPrefix = logContext.logPrefix();
        this.log.debug("Transitioning state manager for {} task {} to {}", new Object[]{oldType, this.taskId, newType});
    }

    @Override
    public void checkpoint(Map<TopicPartition, Long> writtenOffsets) {
        for (Map.Entry<TopicPartition, Long> entry : writtenOffsets.entrySet()) {
            StateStoreMetadata store = this.findStore(entry.getKey());
            if (store == null) continue;
            store.setOffset((Long)entry.getValue());
            this.log.debug("State store {} updated to written offset {} at changelog {}", new Object[]{store.stateStore.name(), store.offset, store.changelogPartition});
        }
        HashMap<TopicPartition, Long> checkpointingOffsets = new HashMap<TopicPartition, Long>();
        for (StateStoreMetadata storeMetadata : this.stores.values()) {
            if (storeMetadata.changelogPartition == null || !storeMetadata.stateStore.persistent() || storeMetadata.corrupted) continue;
            long checkpointableOffset = this.checkpointableOffsetFromChangelogOffset(storeMetadata.offset);
            checkpointingOffsets.put(storeMetadata.changelogPartition, checkpointableOffset);
        }
        this.log.debug("Writing checkpoint: {}", checkpointingOffsets);
        try {
            this.checkpointFile.write(checkpointingOffsets);
        }
        catch (IOException e) {
            this.log.warn("Failed to write offset checkpoint file to [{}]", (Object)this.checkpointFile, (Object)e);
        }
    }

    private TopicPartition getStorePartition(String storeName) {
        return new TopicPartition(this.storeToChangelogTopic.get(storeName), this.taskId.partition);
    }

    private boolean isLoggingEnabled(String storeName) {
        return this.storeToChangelogTopic.containsKey(storeName);
    }

    private StateStoreMetadata findStore(TopicPartition changelogPartition) {
        List found = this.stores.values().stream().filter(metadata -> changelogPartition.equals((Object)((StateStoreMetadata)metadata).changelogPartition)).collect(Collectors.toList());
        if (found.size() > 1) {
            throw new IllegalStateException("Multiple state stores are found for changelog partition " + changelogPartition + ", this should never happen: " + found);
        }
        return found.isEmpty() ? null : (StateStoreMetadata)found.get(0);
    }

    private long checkpointableOffsetFromChangelogOffset(Long offset) {
        return offset != null ? offset : -4L;
    }

    private Long changelogOffsetFromCheckpointedOffset(long offset) {
        return offset != -4L ? Long.valueOf(offset) : null;
    }

    public static class StateStoreMetadata {
        private final StateStore stateStore;
        private final TopicPartition changelogPartition;
        private final StateRestoreCallback restoreCallback;
        private final RecordConverter recordConverter;
        private Long offset;
        private boolean corrupted;

        private StateStoreMetadata(StateStore stateStore) {
            this.stateStore = stateStore;
            this.restoreCallback = null;
            this.recordConverter = null;
            this.changelogPartition = null;
            this.corrupted = false;
            this.offset = null;
        }

        private StateStoreMetadata(StateStore stateStore, TopicPartition changelogPartition, StateRestoreCallback restoreCallback, RecordConverter recordConverter) {
            if (restoreCallback == null) {
                throw new IllegalStateException("Log enabled store should always provide a restore callback upon registration");
            }
            this.stateStore = stateStore;
            this.changelogPartition = changelogPartition;
            this.restoreCallback = restoreCallback;
            this.recordConverter = recordConverter;
            this.offset = null;
        }

        private void setOffset(Long offset) {
            this.offset = offset;
        }

        Long offset() {
            return this.offset;
        }

        TopicPartition changelogPartition() {
            return this.changelogPartition;
        }

        StateStore store() {
            return this.stateStore;
        }

        public String toString() {
            return "StateStoreMetadata (" + this.stateStore.name() + " : " + this.changelogPartition + " @ " + this.offset;
        }
    }
}

