/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.source;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.source.split.ValuesSourcePartitionSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DynamicFilteringValuesSourceReader
implements SourceReader<RowData, ValuesSourcePartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicFilteringValuesSourceReader.class);
    private final SourceReaderContext context;
    private CompletableFuture<Void> availability;
    private final TypeSerializer<RowData> serializer;
    private final Map<Map<String, String>, byte[]> serializedElements;
    private final Map<Map<String, String>, Integer> counts;
    private final Queue<ValuesSourcePartitionSplit> remainingSplits;
    private transient ValuesSourcePartitionSplit currentSplit;
    private transient Iterator<RowData> iterator;
    private transient boolean noMoreSplits;
    private transient boolean reachedInfiniteEnd;

    public DynamicFilteringValuesSourceReader(Map<Map<String, String>, byte[]> serializedElements, Map<Map<String, String>, Integer> counts, TypeSerializer<RowData> serializer, SourceReaderContext context) {
        this.serializedElements = (Map)Preconditions.checkNotNull(serializedElements);
        this.counts = (Map)Preconditions.checkNotNull(counts);
        this.serializer = serializer;
        this.context = (SourceReaderContext)Preconditions.checkNotNull((Object)context);
        this.availability = new CompletableFuture();
        this.remainingSplits = new ArrayDeque<ValuesSourcePartitionSplit>();
    }

    public void start() {
        if (this.remainingSplits.isEmpty()) {
            this.context.sendSplitRequest();
        }
    }

    public InputStatus pollNext(ReaderOutput<RowData> output) {
        if (this.reachedInfiniteEnd) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (this.iterator != null) {
            if (this.iterator.hasNext()) {
                output.collect((Object)this.iterator.next());
                return InputStatus.MORE_AVAILABLE;
            }
            this.finishSplit();
        }
        return this.tryMoveToNextSplit();
    }

    private void finishSplit() {
        this.iterator = null;
        this.currentSplit = null;
        if (this.remainingSplits.isEmpty() && !this.noMoreSplits) {
            this.context.sendSplitRequest();
        }
    }

    private InputStatus tryMoveToNextSplit() {
        this.currentSplit = this.remainingSplits.poll();
        if (this.currentSplit != null) {
            if (this.currentSplit.isInfinite()) {
                this.reachedInfiniteEnd = true;
                this.resetAvailability();
                return InputStatus.NOTHING_AVAILABLE;
            }
            Map<String, String> partition = this.currentSplit.getPartition();
            List<RowData> list = this.deserialize(this.serializedElements.get(partition), this.counts.get(partition));
            this.iterator = list.iterator();
            return InputStatus.MORE_AVAILABLE;
        }
        if (this.noMoreSplits) {
            return InputStatus.END_OF_INPUT;
        }
        this.resetAvailability();
        return InputStatus.NOTHING_AVAILABLE;
    }

    private void resetAvailability() {
        if (this.availability.isDone()) {
            this.availability = new CompletableFuture();
        }
    }

    private List<RowData> deserialize(byte[] data, int count) {
        ArrayList<RowData> list = new ArrayList<RowData>();
        try (ByteArrayInputStream bais = new ByteArrayInputStream(data);){
            DataInputViewStreamWrapper input = new DataInputViewStreamWrapper((InputStream)bais);
            for (int i = 0; i < count; ++i) {
                RowData element = (RowData)this.serializer.deserialize((DataInputView)input);
                list.add(element);
            }
        }
        catch (Exception e) {
            throw new TableException("Failed to deserialize an element from the source. If you are using user-defined serialization (Value and Writable types), check the serialization functions.\nSerializer is " + this.serializer, (Throwable)e);
        }
        return list;
    }

    public List<ValuesSourcePartitionSplit> snapshotState(long checkpointId) {
        return Collections.emptyList();
    }

    public CompletableFuture<Void> isAvailable() {
        return this.availability;
    }

    public void addSplits(List<ValuesSourcePartitionSplit> splits) {
        this.remainingSplits.addAll(splits);
        this.availability.complete(null);
    }

    public void notifyNoMoreSplits() {
        this.noMoreSplits = true;
        this.availability.complete(null);
    }

    public void close() throws Exception {
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.info("checkpoint {} finished.", (Object)checkpointId);
    }
}

