/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.metadata;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryChangelogStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle;
import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.BiFunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class MetadataV2V3SerializerBase {
    private static final Logger LOG = LoggerFactory.getLogger(MetadataV2V3SerializerBase.class);
    private static final int MASTER_STATE_MAGIC_NUMBER = -915728746;
    private static final byte NULL_HANDLE = 0;
    private static final byte BYTE_STREAM_STATE_HANDLE = 1;
    private static final byte FILE_STREAM_STATE_HANDLE = 2;
    private static final byte KEY_GROUPS_HANDLE = 3;
    private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
    private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
    private static final byte RELATIVE_STREAM_STATE_HANDLE = 6;
    private static final byte SAVEPOINT_KEY_GROUPS_HANDLE = 7;
    private static final byte CHANGELOG_HANDLE = 8;
    private static final byte CHANGELOG_BYTE_INCREMENT_HANDLE = 9;
    private static final byte CHANGELOG_FILE_INCREMENT_HANDLE = 10;
    private static final byte INCREMENTAL_KEY_GROUPS_HANDLE_V2 = 11;
    private static final byte KEY_GROUPS_HANDLE_V2 = 12;
    private static final byte CHANGELOG_FILE_INCREMENT_HANDLE_V2 = 13;
    private static final byte CHANGELOG_HANDLE_V2 = 14;
    private static final byte SEGMENT_FILE_HANDLE = 15;
    private static final byte EMPTY_SEGMENT_FILE_HANDLE = 16;
    private static final byte SEGMENT_PARTITIONABLE_OPERATOR_STATE_HANDLE = 17;

    protected void serializeMetadata(CheckpointMetadata checkpointMetadata, DataOutputStream dos) throws IOException {
        dos.writeLong(checkpointMetadata.getCheckpointId());
        Collection<MasterState> masterStates = checkpointMetadata.getMasterStates();
        dos.writeInt(masterStates.size());
        for (MasterState ms : masterStates) {
            this.serializeMasterState(ms, dos);
        }
        Collection<OperatorState> operatorStates = checkpointMetadata.getOperatorStates();
        dos.writeInt(operatorStates.size());
        for (OperatorState operatorState : operatorStates) {
            this.serializeOperatorState(operatorState, dos);
        }
    }

    protected CheckpointMetadata deserializeMetadata(DataInputStream dis, @Nullable String externalPointer) throws IOException {
        List<MasterState> masterStates;
        DeserializationContext context = externalPointer == null ? null : new DeserializationContext(externalPointer);
        long checkpointId = dis.readLong();
        if (checkpointId < 0L) {
            throw new IOException("invalid checkpoint ID: " + checkpointId);
        }
        int numMasterStates = dis.readInt();
        if (numMasterStates == 0) {
            masterStates = Collections.emptyList();
        } else if (numMasterStates > 0) {
            masterStates = new ArrayList(numMasterStates);
            for (int i = 0; i < numMasterStates; ++i) {
                masterStates.add(this.deserializeMasterState(dis));
            }
        } else {
            throw new IOException("invalid number of master states: " + numMasterStates);
        }
        int numTaskStates = dis.readInt();
        ArrayList<OperatorState> operatorStates = new ArrayList<OperatorState>(numTaskStates);
        for (int i = 0; i < numTaskStates; ++i) {
            operatorStates.add(this.deserializeOperatorState(dis, context));
        }
        return new CheckpointMetadata(checkpointId, operatorStates, masterStates);
    }

    protected void serializeMasterState(MasterState state, DataOutputStream dos) throws IOException {
        dos.writeInt(-915728746);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(state.version());
        out.writeUTF(state.name());
        byte[] bytes = state.bytes();
        out.writeInt(bytes.length);
        out.write(bytes, 0, bytes.length);
        out.close();
        byte[] data = baos.toByteArray();
        dos.writeInt(data.length);
        dos.write(data, 0, data.length);
    }

    protected MasterState deserializeMasterState(DataInputStream dis) throws IOException {
        int magicNumber = dis.readInt();
        if (magicNumber != -915728746) {
            throw new IOException("incorrect magic number in master styte byte sequence");
        }
        int numBytes = dis.readInt();
        if (numBytes <= 0) {
            throw new IOException("found zero or negative length for master state bytes");
        }
        byte[] data = new byte[numBytes];
        dis.readFully(data);
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int version = in.readInt();
        String name = in.readUTF();
        byte[] bytes = new byte[in.readInt()];
        in.readFully(bytes);
        if (in.read() != -1) {
            throw new IOException("found trailing bytes in master state");
        }
        return new MasterState(name, bytes, version);
    }

    protected abstract void serializeOperatorState(OperatorState var1, DataOutputStream var2) throws IOException;

    protected abstract OperatorState deserializeOperatorState(DataInputStream var1, @Nullable DeserializationContext var2) throws IOException;

    protected void serializeSubtaskState(OperatorSubtaskState subtaskState, DataOutputStream dos) throws IOException {
        MetadataV2V3SerializerBase.serializeSingleton(subtaskState.getManagedOperatorState(), dos, this::serializeOperatorStateHandle);
        MetadataV2V3SerializerBase.serializeSingleton(subtaskState.getRawOperatorState(), dos, this::serializeOperatorStateHandle);
        this.serializeKeyedStateCol(subtaskState.getManagedKeyedState(), dos);
        this.serializeKeyedStateCol(subtaskState.getRawKeyedState(), dos);
    }

    private void serializeKeyedStateCol(StateObjectCollection<KeyedStateHandle> managedKeyedState, DataOutputStream dos) throws IOException {
        MetadataV2V3SerializerBase.serializeKeyedStateHandle(MetadataV2V3SerializerBase.extractSingleton(managedKeyedState), dos);
    }

    protected OperatorSubtaskState deserializeSubtaskState(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        KeyedStateHandle rawKeyedState;
        KeyedStateHandle managedKeyedState;
        boolean hasRawOperatorState;
        boolean hasManagedOperatorState;
        OperatorSubtaskState.Builder state = OperatorSubtaskState.builder();
        boolean bl = hasManagedOperatorState = dis.readInt() != 0;
        if (hasManagedOperatorState) {
            state.setManagedOperatorState(this.deserializeOperatorStateHandle(dis, context));
        }
        boolean bl2 = hasRawOperatorState = dis.readInt() != 0;
        if (hasRawOperatorState) {
            state.setRawOperatorState(this.deserializeOperatorStateHandle(dis, context));
        }
        if ((managedKeyedState = MetadataV2V3SerializerBase.deserializeKeyedStateHandle(dis, context)) != null) {
            state.setManagedKeyedState(managedKeyedState);
        }
        if ((rawKeyedState = MetadataV2V3SerializerBase.deserializeKeyedStateHandle(dis, context)) != null) {
            state.setRawKeyedState(rawKeyedState);
        }
        state.setInputChannelState(this.deserializeInputChannelStateHandle(dis, context));
        state.setResultSubpartitionState(this.deserializeResultSubpartitionStateHandle(dis, context));
        return state.build();
    }

    @VisibleForTesting
    static void serializeKeyedStateHandle(KeyedStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof KeyGroupsStateHandle) {
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)stateHandle;
            if (stateHandle instanceof KeyGroupsSavepointStateHandle) {
                dos.writeByte(7);
            } else {
                dos.writeByte(12);
            }
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
            for (int n : keyGroupsStateHandle.getKeyGroupRange()) {
                dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(n));
            }
            MetadataV2V3SerializerBase.serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
            if (!(stateHandle instanceof KeyGroupsSavepointStateHandle)) {
                MetadataV2V3SerializerBase.writeStateHandleId(stateHandle, dos);
            }
        } else if (stateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = (IncrementalRemoteKeyedStateHandle)stateHandle;
            dos.writeByte(11);
            dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
            dos.writeUTF(String.valueOf(incrementalKeyedStateHandle.getBackendIdentifier()));
            dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
            dos.writeLong(incrementalKeyedStateHandle.getCheckpointedSize());
            MetadataV2V3SerializerBase.serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaDataStateHandle(), dos);
            MetadataV2V3SerializerBase.serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), dos);
            MetadataV2V3SerializerBase.serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), dos);
            MetadataV2V3SerializerBase.writeStateHandleId(incrementalKeyedStateHandle, dos);
        } else if (stateHandle instanceof ChangelogStateBackendHandle) {
            ChangelogStateBackendHandle handle = (ChangelogStateBackendHandle)stateHandle;
            dos.writeByte(14);
            dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
            dos.writeLong(handle.getCheckpointedSize());
            dos.writeInt(handle.getMaterializedStateHandles().size());
            for (KeyedStateHandle keyedStateHandle : handle.getMaterializedStateHandles()) {
                MetadataV2V3SerializerBase.serializeKeyedStateHandle(keyedStateHandle, dos);
            }
            dos.writeInt(handle.getNonMaterializedStateHandles().size());
            for (KeyedStateHandle keyedStateHandle : handle.getNonMaterializedStateHandles()) {
                MetadataV2V3SerializerBase.serializeKeyedStateHandle(keyedStateHandle, dos);
            }
            dos.writeLong(handle.getMaterializationID());
            dos.writeLong(handle.getCheckpointId());
            MetadataV2V3SerializerBase.writeStateHandleId(handle, dos);
        } else if (stateHandle instanceof InMemoryChangelogStateHandle) {
            InMemoryChangelogStateHandle handle = (InMemoryChangelogStateHandle)stateHandle;
            dos.writeByte(9);
            dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
            dos.writeLong(handle.getFrom());
            dos.writeLong(handle.getTo());
            dos.writeInt(handle.getChanges().size());
            for (StateChange stateChange : handle.getChanges()) {
                dos.writeInt(stateChange.getKeyGroup());
                dos.writeInt(stateChange.getChange().length);
                dos.write(stateChange.getChange());
            }
            MetadataV2V3SerializerBase.writeStateHandleId(handle, dos);
        } else if (stateHandle instanceof ChangelogStateHandleStreamImpl) {
            ChangelogStateHandleStreamImpl handle = (ChangelogStateHandleStreamImpl)stateHandle;
            dos.writeByte(13);
            dos.writeInt(handle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(handle.getKeyGroupRange().getNumberOfKeyGroups());
            dos.writeInt(handle.getHandlesAndOffsets().size());
            for (Tuple2<StreamStateHandle, Long> tuple2 : handle.getHandlesAndOffsets()) {
                dos.writeLong((Long)tuple2.f1);
                MetadataV2V3SerializerBase.serializeStreamStateHandle((StreamStateHandle)tuple2.f0, dos);
            }
            dos.writeLong(handle.getStateSize());
            dos.writeLong(handle.getCheckpointedSize());
            MetadataV2V3SerializerBase.writeStateHandleId(handle, dos);
            dos.writeUTF(handle.getStorageIdentifier());
        } else {
            throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
        }
    }

    private static void writeStateHandleId(KeyedStateHandle keyedStateHandle, DataOutputStream dos) throws IOException {
        dos.writeUTF(keyedStateHandle.getStateHandleId().getKeyString());
    }

    @Nullable
    @VisibleForTesting
    static KeyedStateHandle deserializeKeyedStateHandle(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (3 == type || 12 == type || 7 == type) {
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            long[] offsets = new long[numKeyGroups];
            for (int i = 0; i < numKeyGroups; ++i) {
                offsets[i] = dis.readLong();
            }
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
            StreamStateHandle stateHandle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
            if (7 == type) {
                return new KeyGroupsSavepointStateHandle(keyGroupRangeOffsets, stateHandle);
            }
            StateHandleID stateHandleID = 12 == type ? new StateHandleID(dis.readUTF()) : StateHandleID.randomStateHandleId();
            return KeyGroupsStateHandle.restore(keyGroupRangeOffsets, stateHandle, stateHandleID);
        }
        if (5 == type || 11 == type) {
            return MetadataV2V3SerializerBase.deserializeIncrementalStateHandle(dis, context, type);
        }
        if (8 == type || 14 == type) {
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            long checkpointedSize = dis.readLong();
            int baseSize = dis.readInt();
            ArrayList<KeyedStateHandle> base = new ArrayList<KeyedStateHandle>(baseSize);
            for (int i = 0; i < baseSize; ++i) {
                KeyedStateHandle handle = MetadataV2V3SerializerBase.deserializeKeyedStateHandle(dis, context);
                if (handle != null) {
                    base.add(handle);
                    continue;
                }
                LOG.warn("Unexpected null keyed state handle of materialized part when deserializing changelog state-backend handle");
            }
            int deltaSize = dis.readInt();
            ArrayList<ChangelogStateHandle> delta = new ArrayList<ChangelogStateHandle>(deltaSize);
            for (int i = 0; i < deltaSize; ++i) {
                delta.add((ChangelogStateHandle)MetadataV2V3SerializerBase.deserializeKeyedStateHandle(dis, context));
            }
            long materializationID = dis.readLong();
            long checkpointId = 14 == type ? dis.readLong() : materializationID;
            StateHandleID stateHandleId = new StateHandleID(dis.readUTF());
            return ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl.restore(base, delta, keyGroupRange, checkpointId, materializationID, checkpointedSize, stateHandleId);
        }
        if (9 == type) {
            int start = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(start, start + numKeyGroups - 1);
            long from = dis.readLong();
            long to = dis.readLong();
            int size = dis.readInt();
            ArrayList<StateChange> changes = new ArrayList<StateChange>(size);
            for (int i = 0; i < size; ++i) {
                int keyGroup = dis.readInt();
                int bytesSize = dis.readInt();
                byte[] bytes = new byte[bytesSize];
                IOUtils.readFully((InputStream)dis, (byte[])bytes, (int)0, (int)bytesSize);
                StateChange stateChange = keyGroup == -1 ? StateChange.ofMetadataChange(bytes) : StateChange.ofDataChange(keyGroup, bytes);
                changes.add(stateChange);
            }
            StateHandleID stateHandleId = new StateHandleID(dis.readUTF());
            return InMemoryChangelogStateHandle.restore(changes, SequenceNumber.of(from), SequenceNumber.of(to), keyGroupRange, stateHandleId);
        }
        if (10 == type || 13 == type) {
            int start = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(start, start + numKeyGroups - 1);
            int numHandles = dis.readInt();
            ArrayList<Tuple2<StreamStateHandle, Long>> streamHandleAndOffset = new ArrayList<Tuple2<StreamStateHandle, Long>>(numHandles);
            for (int i = 0; i < numHandles; ++i) {
                long o = dis.readLong();
                StreamStateHandle h = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
                streamHandleAndOffset.add(Tuple2.of((Object)h, (Object)o));
            }
            long size = dis.readLong();
            long checkpointedSize = dis.readLong();
            StateHandleID stateHandleId = new StateHandleID(dis.readUTF());
            String storageIdentifier = 13 == type ? dis.readUTF() : "filesystem";
            return ChangelogStateHandleStreamImpl.restore(streamHandleAndOffset, keyGroupRange, size, checkpointedSize, storageIdentifier, stateHandleId);
        }
        throw new IllegalStateException("Reading invalid KeyedStateHandle, type: " + type);
    }

    private static IncrementalRemoteKeyedStateHandle deserializeIncrementalStateHandle(DataInputStream dis, @Nullable DeserializationContext context, int stateHandleType) throws IOException {
        UUID uuid;
        boolean isV2Format = 11 == stateHandleType;
        long checkpointId = dis.readLong();
        String backendId = dis.readUTF();
        int startKeyGroup = dis.readInt();
        int numKeyGroups = dis.readInt();
        long checkpointedSize = isV2Format ? dis.readLong() : -1L;
        KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
        StreamStateHandle metaDataStateHandle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
        List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedStates = MetadataV2V3SerializerBase.deserializeHandleAndLocalPathList(dis, context);
        List<IncrementalKeyedStateHandle.HandleAndLocalPath> privateStates = MetadataV2V3SerializerBase.deserializeHandleAndLocalPathList(dis, context);
        try {
            uuid = UUID.fromString(backendId);
        }
        catch (Exception ex) {
            uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8));
        }
        StateHandleID stateHandleId = isV2Format ? new StateHandleID(dis.readUTF()) : StateHandleID.randomStateHandleId();
        return IncrementalRemoteKeyedStateHandle.restore(uuid, keyGroupRange, checkpointId, sharedStates, privateStates, metaDataStateHandle, checkpointedSize, stateHandleId);
    }

    void serializeOperatorStateHandle(OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle != null) {
            dos.writeByte(stateHandle instanceof FileMergingOperatorStreamStateHandle ? 17 : 4);
            Map<String, OperatorStateHandle.StateMetaInfo> partitionOffsetsMap = stateHandle.getStateNameToPartitionOffsets();
            dos.writeInt(partitionOffsetsMap.size());
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : partitionOffsetsMap.entrySet()) {
                dos.writeUTF(entry.getKey());
                OperatorStateHandle.StateMetaInfo stateMetaInfo = entry.getValue();
                int mode = stateMetaInfo.getDistributionMode().ordinal();
                dos.writeByte(mode);
                long[] offsets = stateMetaInfo.getOffsets();
                dos.writeInt(offsets.length);
                for (long offset : offsets) {
                    dos.writeLong(offset);
                }
            }
            if (stateHandle instanceof FileMergingOperatorStreamStateHandle) {
                dos.writeUTF(((FileMergingOperatorStreamStateHandle)stateHandle).getTaskOwnedDirHandle().getDirectory().toString());
                dos.writeUTF(((FileMergingOperatorStreamStateHandle)stateHandle).getSharedDirHandle().getDirectory().toString());
                dos.writeBoolean(stateHandle instanceof EmptyFileMergingOperatorStreamStateHandle);
            }
            MetadataV2V3SerializerBase.serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
        } else {
            dos.writeByte(0);
        }
    }

    OperatorStateHandle deserializeOperatorStateHandle(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        byte type = dis.readByte();
        if (0 == type) {
            return null;
        }
        if (4 == type || 17 == type) {
            int mapSize = dis.readInt();
            HashMap offsetsMap = CollectionUtil.newHashMapWithExpectedSize((int)mapSize);
            for (int i = 0; i < mapSize; ++i) {
                String key = dis.readUTF();
                byte modeOrdinal = dis.readByte();
                OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.values()[modeOrdinal];
                long[] offsets = new long[dis.readInt()];
                for (int j = 0; j < offsets.length; ++j) {
                    offsets[j] = dis.readLong();
                }
                OperatorStateHandle.StateMetaInfo metaInfo = new OperatorStateHandle.StateMetaInfo(offsets, mode);
                offsetsMap.put(key, metaInfo);
            }
            if (17 == type) {
                String taskOwnedDirPathStr = dis.readUTF();
                String sharedDirPathStr = dis.readUTF();
                boolean isEmpty = dis.readBoolean();
                StreamStateHandle stateHandle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
                Preconditions.checkArgument((boolean)(stateHandle instanceof SegmentFileStateHandle));
                return isEmpty ? new EmptyFileMergingOperatorStreamStateHandle(DirectoryStreamStateHandle.of(new Path(taskOwnedDirPathStr)), DirectoryStreamStateHandle.of(new Path(sharedDirPathStr)), offsetsMap, stateHandle) : new FileMergingOperatorStreamStateHandle(DirectoryStreamStateHandle.of(new Path(taskOwnedDirPathStr)), DirectoryStreamStateHandle.of(new Path(sharedDirPathStr)), offsetsMap, stateHandle);
            }
            StreamStateHandle stateHandle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
            return new OperatorStreamStateHandle(offsetsMap, stateHandle);
        }
        throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + type);
    }

    protected StateObjectCollection<ResultSubpartitionStateHandle> deserializeResultSubpartitionStateHandle(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        return StateObjectCollection.empty();
    }

    protected StateObjectCollection<InputChannelStateHandle> deserializeInputChannelStateHandle(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        return StateObjectCollection.empty();
    }

    protected void serializeResultSubpartitionStateHandle(ResultSubpartitionStateHandle resultSubpartitionStateHandle, DataOutputStream dos) throws IOException {
    }

    protected void serializeInputChannelStateHandle(InputChannelStateHandle inputChannelStateHandle, DataOutputStream dos) throws IOException {
    }

    static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
        if (stateHandle == null) {
            dos.writeByte(0);
        } else if (stateHandle instanceof RelativeFileStateHandle) {
            dos.writeByte(6);
            RelativeFileStateHandle relativeFileStateHandle = (RelativeFileStateHandle)stateHandle;
            dos.writeUTF(relativeFileStateHandle.getRelativePath());
            dos.writeLong(relativeFileStateHandle.getStateSize());
        } else if (stateHandle instanceof SegmentFileStateHandle) {
            if (stateHandle instanceof EmptySegmentFileStateHandle) {
                dos.writeByte(16);
            } else {
                dos.writeByte(15);
                SegmentFileStateHandle segmentFileStateHandle = (SegmentFileStateHandle)stateHandle;
                dos.writeLong(segmentFileStateHandle.getStartPos());
                dos.writeLong(segmentFileStateHandle.getStateSize());
                dos.writeInt(segmentFileStateHandle.getScope().ordinal());
                dos.writeUTF(segmentFileStateHandle.getFilePath().toString());
                dos.writeUTF(segmentFileStateHandle.getLogicalFileId().getKeyString());
            }
        } else if (stateHandle instanceof FileStateHandle) {
            dos.writeByte(2);
            FileStateHandle fileStateHandle = (FileStateHandle)stateHandle;
            dos.writeLong(stateHandle.getStateSize());
            dos.writeUTF(fileStateHandle.getFilePath().toString());
        } else if (stateHandle instanceof ByteStreamStateHandle) {
            dos.writeByte(1);
            ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle)stateHandle;
            dos.writeUTF(byteStreamStateHandle.getHandleName());
            byte[] internalData = byteStreamStateHandle.getData();
            dos.writeInt(internalData.length);
            dos.write(byteStreamStateHandle.getData());
        } else if (stateHandle instanceof KeyGroupsStateHandle) {
            KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle)stateHandle;
            dos.writeByte(3);
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getStartKeyGroup());
            dos.writeInt(keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
            for (int keyGroup : keyGroupsStateHandle.getKeyGroupRange()) {
                dos.writeLong(keyGroupsStateHandle.getOffsetForKeyGroup(keyGroup));
            }
            MetadataV2V3SerializerBase.serializeStreamStateHandle(keyGroupsStateHandle.getDelegateStateHandle(), dos);
        } else {
            throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
        }
    }

    @Nullable
    static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        int type = dis.read();
        if (0 == type) {
            return null;
        }
        if (2 == type) {
            long size = dis.readLong();
            String pathString = dis.readUTF();
            return new FileStateHandle(new Path(pathString), size);
        }
        if (1 == type) {
            String handleName = dis.readUTF();
            int numBytes = dis.readInt();
            byte[] data = new byte[numBytes];
            dis.readFully(data);
            return new ByteStreamStateHandle(handleName, data);
        }
        if (6 == type) {
            if (context == null) {
                throw new IOException("Cannot deserialize a RelativeFileStateHandle without a context to make it relative to.");
            }
            String relativePath = dis.readUTF();
            long size = dis.readLong();
            Path statePath = new Path(context.getExclusiveDirPath(), relativePath);
            return new RelativeFileStateHandle(statePath, relativePath, size);
        }
        if (3 == type) {
            int startKeyGroup = dis.readInt();
            int numKeyGroups = dis.readInt();
            KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
            long[] offsets = new long[numKeyGroups];
            for (int i = 0; i < numKeyGroups; ++i) {
                offsets[i] = dis.readLong();
            }
            KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
            StreamStateHandle stateHandle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
            return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
        }
        if (15 == type) {
            long startPos = dis.readLong();
            long stateSize = dis.readLong();
            CheckpointedStateScope scope = CheckpointedStateScope.values()[dis.readInt()];
            Path physicalFilePath = new Path(dis.readUTF());
            LogicalFile.LogicalFileId logicalFileId = new LogicalFile.LogicalFileId(dis.readUTF());
            return new SegmentFileStateHandle(physicalFilePath, startPos, stateSize, scope, logicalFileId);
        }
        if (16 == type) {
            return EmptySegmentFileStateHandle.INSTANCE;
        }
        throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
    }

    @Nullable
    static ByteStreamStateHandle deserializeAndCheckByteStreamStateHandle(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        StreamStateHandle handle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
        if (handle == null || handle instanceof ByteStreamStateHandle) {
            return (ByteStreamStateHandle)handle;
        }
        throw new IOException("Expected a ByteStreamStateHandle but found a " + handle.getClass().getName());
    }

    @Nullable
    private static <T> T extractSingleton(Collection<T> collection) {
        if (collection == null || collection.isEmpty()) {
            return null;
        }
        if (collection.size() == 1) {
            return collection.iterator().next();
        }
        throw new IllegalStateException("Expected singleton collection, but found size: " + collection.size());
    }

    private static <T extends StateObject> void serializeSingleton(StateObjectCollection<T> stateObjectCollection, DataOutputStream dos, BiConsumerWithException<T, DataOutputStream, IOException> cons) throws IOException {
        StateObject state = (StateObject)MetadataV2V3SerializerBase.extractSingleton(stateObjectCollection);
        if (state != null) {
            dos.writeInt(1);
            cons.accept((Object)state, (Object)dos);
        } else {
            dos.writeInt(0);
        }
    }

    static <T extends StateObject> StateObjectCollection<T> deserializeCollection(DataInputStream dis, DeserializationContext context, BiFunctionWithException<DataInputStream, DeserializationContext, T, IOException> s) throws IOException {
        int size = dis.readInt();
        ArrayList<StateObject> result = new ArrayList<StateObject>();
        for (int i = 0; i < size; ++i) {
            result.add((StateObject)s.apply((Object)dis, (Object)context));
        }
        return new StateObjectCollection(result);
    }

    private static void serializeHandleAndLocalPathList(List<IncrementalKeyedStateHandle.HandleAndLocalPath> list, DataOutputStream dos) throws IOException {
        dos.writeInt(list.size());
        for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : list) {
            dos.writeUTF(handleAndLocalPath.getLocalPath());
            MetadataV2V3SerializerBase.serializeStreamStateHandle(handleAndLocalPath.getHandle(), dos);
        }
    }

    private static List<IncrementalKeyedStateHandle.HandleAndLocalPath> deserializeHandleAndLocalPathList(DataInputStream dis, @Nullable DeserializationContext context) throws IOException {
        int size = dis.readInt();
        ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> result = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>(size);
        for (int i = 0; i < size; ++i) {
            String localPath = dis.readUTF();
            StreamStateHandle stateHandle = MetadataV2V3SerializerBase.deserializeStreamStateHandle(dis, context);
            result.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of(stateHandle, localPath));
        }
        return result;
    }

    protected static final class DeserializationContext {
        private final String externalPointer;
        private Path cachedExclusiveDirPath;

        DeserializationContext(String externalPointer) {
            this.externalPointer = externalPointer;
        }

        Path getExclusiveDirPath() throws IOException {
            if (this.cachedExclusiveDirPath == null) {
                this.cachedExclusiveDirPath = DeserializationContext.createExclusiveDirPath(this.externalPointer);
            }
            return this.cachedExclusiveDirPath;
        }

        private static Path createExclusiveDirPath(String externalPointer) throws IOException {
            try {
                return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(externalPointer).getExclusiveCheckpointDir();
            }
            catch (IOException e) {
                throw new IOException("Could not parse external pointer as state base path", e);
            }
        }
    }
}

