/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.source;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.source.split.ValuesSourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SourceReader} that emits {@link RowData} records taken from a fixed list of
 * pre-serialized elements.
 *
 * <p>The serialized elements are deserialized once in {@link #start()}. Each non-infinite
 * {@link ValuesSourceSplit} handed to the reader selects, via its index, exactly one of those
 * elements to emit. A split that reports itself as infinite is never consumed: it is put back
 * into the queue on every poll, so the reader keeps reporting
 * {@link InputStatus#NOTHING_AVAILABLE} instead of finishing.
 *
 * <p>NOTE(review): the mutable fields are accessed without synchronization; presumably all
 * methods are invoked from a single thread by the runtime -- confirm against the caller.
 */
public class ValuesSourceReader
implements SourceReader<RowData, ValuesSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(ValuesSourceReader.class);

    /** Context used to request further splits. */
    private final SourceReaderContext context;

    /** Future returned by {@link #isAvailable()}; replaced once completed and nothing is left to read. */
    private CompletableFuture<Void> availability;

    /** The elements in serialized form, as passed to the constructor. */
    private final List<byte[]> serializedElements;

    /** Serializer used to turn {@link #serializedElements} back into {@link RowData}. */
    private final TypeSerializer<RowData> serializer;

    /** Deserialized elements; populated by {@link #start()}. */
    private List<RowData> elements;

    /** Splits that were assigned but not yet processed. */
    private final Queue<ValuesSourceSplit> remainingSplits;

    /** Set once {@link #notifyNoMoreSplits()} has been called. */
    private boolean noMoreSplits = false;

    /**
     * Creates a reader over the given serialized elements.
     *
     * @param serializedElements the elements in serialized form, one byte array per element
     * @param serializer the serializer used to deserialize each element
     * @param context the reader context used to send split requests
     */
    public ValuesSourceReader(List<byte[]> serializedElements, TypeSerializer<RowData> serializer, SourceReaderContext context) {
        this.serializedElements = serializedElements;
        this.serializer = serializer;
        this.context = context;
        this.availability = new CompletableFuture<>();
        this.remainingSplits = new ArrayDeque<>();
    }

    /**
     * Deserializes all elements and, if no split is queued yet, requests the first split.
     *
     * @throws TableException if any element cannot be deserialized
     */
    public void start() {
        elements = new ArrayList<>(serializedElements.size());
        for (byte[] bytes : serializedElements) {
            try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
                DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(bais);
                elements.add(serializer.deserialize(input));
            } catch (Exception e) {
                throw new TableException(
                        "Failed to deserialize an element from the source. "
                                + "If you are using user-defined serialization (Value and Writable types), "
                                + "check the serialization functions.\nSerializer is "
                                + serializer,
                        e);
            }
        }
        if (remainingSplits.isEmpty()) {
            context.sendSplitRequest();
        }
    }

    /**
     * Processes at most one queued split.
     *
     * @param output the output that receives the emitted record
     * @return {@link InputStatus#MORE_AVAILABLE} after emitting the element of a finite split,
     *     {@link InputStatus#END_OF_INPUT} when the queue is empty and no more splits will
     *     arrive, and {@link InputStatus#NOTHING_AVAILABLE} otherwise
     */
    public InputStatus pollNext(ReaderOutput<RowData> output) throws Exception {
        ValuesSourceSplit currentSplit = remainingSplits.poll();
        if (currentSplit == null) {
            if (noMoreSplits) {
                return InputStatus.END_OF_INPUT;
            }
            resetAvailability();
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (currentSplit.isInfinite()) {
            // An infinite split is never consumed: re-queue it so the queue does not drain
            // and END_OF_INPUT is not reported while it is present.
            remainingSplits.add(currentSplit);
            resetAvailability();
            return InputStatus.NOTHING_AVAILABLE;
        }
        // A finite split identifies a single element by its index.
        output.collect(elements.get(currentSplit.getIndex()));
        // Ask for another split after each emitted element.
        context.sendSplitRequest();
        return InputStatus.MORE_AVAILABLE;
    }

    /**
     * Replaces the availability future with a fresh, incomplete one if the current one has
     * already completed, so that callers of {@link #isAvailable()} wait for the next
     * {@link #addSplits(List)} or {@link #notifyNoMoreSplits()} call.
     */
    private void resetAvailability() {
        if (availability.isDone()) {
            availability = new CompletableFuture<>();
        }
    }

    /**
     * Returns the state to checkpoint; this reader always reports an empty list.
     *
     * @param checkpointId the id of the checkpoint (unused)
     * @return an empty list
     */
    public List<ValuesSourceSplit> snapshotState(long checkpointId) {
        return Collections.emptyList();
    }

    /**
     * Returns the current availability future, which is completed by {@link #addSplits(List)}
     * and {@link #notifyNoMoreSplits()}.
     */
    public CompletableFuture<Void> isAvailable() {
        return availability;
    }

    /**
     * Queues the given splits and completes the availability future.
     *
     * @param splits the splits to append to the queue
     */
    public void addSplits(List<ValuesSourceSplit> splits) {
        remainingSplits.addAll(splits);
        availability.complete(null);
    }

    /**
     * Records that no further splits will be assigned and completes the availability future
     * so that the next {@link #pollNext(ReaderOutput)} can report the end of input.
     */
    public void notifyNoMoreSplits() {
        noMoreSplits = true;
        availability.complete(null);
    }

    /** Does nothing; the reader holds no resources that need closing. */
    public void close() throws Exception {
    }

    /**
     * Logs the completion of a checkpoint.
     *
     * @param checkpointId the id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.info("checkpoint {} finished.", checkpointId);
    }
}

