/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.OutputStreamWithPos;
import org.apache.flink.changelog.fs.StateChangeSet;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StateChangeFormat {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangeFormat.class);

    Map<StateChangeSet, Tuple2<Long, Long>> write(OutputStreamWithPos os, Collection<StateChangeSet> changeSets) throws IOException {
        ArrayList<StateChangeSet> sorted = new ArrayList<StateChangeSet>(changeSets);
        sorted.sort(Comparator.comparing(StateChangeSet::getLogId).thenComparing(StateChangeSet::getSequenceNumber));
        DataOutputViewStreamWrapper dataOutput = new DataOutputViewStreamWrapper((OutputStream)os);
        HashMap<StateChangeSet, Tuple2<Long, Long>> pendingResults = new HashMap<StateChangeSet, Tuple2<Long, Long>>();
        for (StateChangeSet changeSet : sorted) {
            long pos = os.getPos();
            pendingResults.put(changeSet, (Tuple2<Long, Long>)Tuple2.of((Object)pos, (Object)pos));
            this.writeChangeSet(dataOutput, changeSet.getChanges());
        }
        return pendingResults;
    }

    private void writeChangeSet(DataOutputViewStreamWrapper output, List<StateChange> changes) throws IOException {
        Map<Integer, List<StateChange>> byKeyGroup = changes.stream().collect(Collectors.groupingBy(StateChange::getKeyGroup));
        output.writeInt(byKeyGroup.size());
        List<StateChange> meta = byKeyGroup.remove(-1);
        if (meta != null) {
            this.writeChangeSetOfKG(output, -1, meta);
        }
        for (Map.Entry<Integer, List<StateChange>> entry : byKeyGroup.entrySet()) {
            this.writeChangeSetOfKG(output, entry.getKey(), entry.getValue());
        }
    }

    private void writeChangeSetOfKG(DataOutputViewStreamWrapper output, int keyGroup, List<StateChange> stateChanges) throws IOException {
        output.writeInt(stateChanges.size());
        output.writeInt(keyGroup);
        for (StateChange stateChange : stateChanges) {
            output.writeInt(stateChange.getChange().length);
            output.write(stateChange.getChange());
        }
    }

    CloseableIterator<StateChange> read(final DataInputStream input) throws IOException {
        return new CloseableIterator<StateChange>(){
            int numUnreadGroups;
            int numLeftInGroup;
            int keyGroup;
            {
                this.numUnreadGroups = input.readInt();
                this.numLeftInGroup = this.numUnreadGroups-- == 0 ? 0 : input.readInt();
                this.keyGroup = this.numLeftInGroup == 0 ? 0 : input.readInt();
            }

            public boolean hasNext() {
                this.advance();
                return this.numLeftInGroup > 0;
            }

            private void advance() {
                if (this.numLeftInGroup == 0 && this.numUnreadGroups > 0) {
                    --this.numUnreadGroups;
                    try {
                        this.numLeftInGroup = input.readInt();
                        this.keyGroup = input.readInt();
                    }
                    catch (IOException e) {
                        ExceptionUtils.rethrow((Throwable)e);
                    }
                }
            }

            public StateChange next() {
                this.advance();
                if (this.numLeftInGroup == 0) {
                    throw new NoSuchElementException();
                }
                --this.numLeftInGroup;
                try {
                    return this.readChange();
                }
                catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

            private StateChange readChange() throws IOException {
                int size = input.readInt();
                byte[] bytes = new byte[size];
                IOUtils.readFully((InputStream)input, (byte[])bytes, (int)0, (int)size);
                return this.keyGroup == -1 ? StateChange.ofMetadataChange((byte[])bytes) : StateChange.ofDataChange((int)this.keyGroup, (byte[])bytes);
            }

            public void close() throws Exception {
                LOG.trace("close {}", (Object)input);
                input.close();
            }
        };
    }
}

