/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.formats.avro;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.avro.shaded.org.apache.avro.Schema;
import org.apache.flink.avro.shaded.org.apache.avro.file.DataFileReader;
import org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.IteratorResultIterator;
import org.apache.flink.connector.file.src.util.Pool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.utils.FSDataInputStreamWrapper;

@Internal
/**
 * Base {@link BulkFormat} that reads Avro container files one block at a time.
 *
 * <p>Records are deserialized by a {@link DataFileReader} into a reused object of type {@code A}
 * (supplied by {@link #createReusedAvroRecord()}) and then mapped to the emitted type {@code T}
 * by the function supplied by {@link #createConverter()}.
 *
 * @param <A> type of the object handed to the Avro reader for reuse
 * @param <T> type of the records produced by this format
 * @param <SplitT> type of the file split this format reads
 */
public abstract class AbstractAvroBulkFormat<A, T, SplitT extends FileSourceSplit>
        implements BulkFormat<T, SplitT> {
    private static final long serialVersionUID = 1L;

    /** Schema passed to the Avro datum reader as the reader (expected) schema. */
    protected final Schema readerSchema;

    /**
     * Creates the format.
     *
     * @param readerSchema schema handed to the Avro datum reader as the reader schema
     */
    protected AbstractAvroBulkFormat(Schema readerSchema) {
        this.readerSchema = readerSchema;
    }

    /**
     * Creates a reader for the given split. If the split carries a reader position, reading
     * resumes from that position; otherwise it starts at the split offset.
     *
     * @param config unused by this implementation
     * @param split the split to read
     * @return a reader positioned for the split
     * @throws IOException if the file cannot be opened or positioned
     */
    public AvroReader createReader(Configuration config, SplitT split) throws IOException {
        return this.createReader(split, this.createReusedAvroRecord(), this.createConverter());
    }

    /**
     * Restores a reader for the given split. Behaves exactly like
     * {@link #createReader(Configuration, FileSourceSplit)}: the position to resume from is taken
     * from the split itself.
     *
     * @param config unused by this implementation
     * @param split the split to read, possibly carrying a checkpointed reader position
     * @return a reader positioned for the split
     * @throws IOException if the file cannot be opened or positioned
     */
    public AvroReader restoreReader(Configuration config, SplitT split) throws IOException {
        return this.createReader(split, this.createReusedAvroRecord(), this.createConverter());
    }

    /** @return always {@code true} */
    public boolean isSplittable() {
        return true;
    }

    /**
     * Builds the reader for a split, translating an optional checkpointed position into the
     * block start and the number of records to skip after that block start.
     */
    private AvroReader createReader(SplitT split, A reuse, Function<A, T> converter) throws IOException {
        long end = split.offset() + split.length();
        if (split.getReaderPosition().isPresent()) {
            CheckpointedPosition position = split.getReaderPosition().get();
            return new AvroReader(
                    split.path(),
                    split.offset(),
                    end,
                    position.getOffset(),
                    position.getRecordsAfterOffset(),
                    reuse,
                    converter);
        }
        // No checkpointed position: a negative block start tells the reader to sync to the
        // split offset instead of seeking to a known block.
        return new AvroReader(split.path(), split.offset(), end, -1L, 0L, reuse, converter);
    }

    /**
     * Closes {@code resource} after {@code failure} occurred; a failure of the close itself is
     * attached to {@code failure} as a suppressed exception instead of replacing it.
     */
    private static void closeAfterFailure(AutoCloseable resource, Throwable failure) {
        try {
            resource.close();
        } catch (Exception closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /** Creates the object that the Avro reader deserializes each record into. */
    protected abstract A createReusedAvroRecord();

    /** Creates the function that maps a deserialized Avro object to an emitted record. */
    protected abstract Function<A, T> createConverter();

    /** Reader that returns one batch per Avro block of the underlying file. */
    private class AvroReader implements BulkFormat.Reader<T> {
        private final DataFileReader<A> reader;
        private final Function<A, T> converter;
        /** End position of the split (split offset plus split length). */
        private final long end;
        /** Holds the single reuse object; it is handed back when a batch is released. */
        private final Pool<A> pool;
        private long currentBlockStart;
        /** Records of the block served by the next batch that were already consumed. */
        private long currentRecordsToSkip;

        /**
         * Opens the file and positions the reader.
         *
         * @param path file to read
         * @param offset split offset, used to sync when {@code blockStart} is negative
         * @param end end position of the split
         * @param blockStart position to seek to, or a negative value to sync to {@code offset}
         * @param recordsToSkip number of records to skip after positioning
         * @param reuse object the Avro reader deserializes into
         * @param converter maps deserialized objects to emitted records
         * @throws IOException if opening, positioning or skipping fails
         */
        private AvroReader(
                Path path,
                long offset,
                long end,
                long blockStart,
                long recordsToSkip,
                A reuse,
                Function<A, T> converter)
                throws IOException {
            this.converter = converter;
            this.end = end;
            this.pool = new Pool<>(1);
            this.pool.add(reuse);
            this.reader = this.createReaderFromPath(path);
            try {
                if (blockStart >= 0L) {
                    this.reader.seek(blockStart);
                } else {
                    this.reader.sync(offset);
                }
                this.currentRecordsToSkip = this.skipRecords(recordsToSkip, reuse);
                this.currentBlockStart = this.reader.previousSync();
            } catch (IOException | RuntimeException e) {
                // The reader is already open; do not leak it when positioning fails.
                closeAfterFailure(this.reader, e);
                throw e;
            }
        }

        /**
         * Reads and discards {@code recordsToSkip} records.
         *
         * <p>A restored position may point just behind the last record of a block (the number
         * of records to skip equals the block's record count). Skipping then consumes the whole
         * block and the next batch is served from the following block, so nothing may be
         * subtracted from that block's record count. The counter is therefore reset whenever the
         * number of records skipped reaches the record count of the block they came from.
         *
         * @return how many of the skipped records belong to the block that the next batch will
         *     continue reading; {@code 0} if the skipped records ended exactly at a block end
         */
        private long skipRecords(long recordsToSkip, A reuse) throws IOException {
            long skippedInCurrentBlock = 0L;
            for (long skipped = 0L; skipped < recordsToSkip; skipped++) {
                this.reader.next(reuse);
                skippedInCurrentBlock++;
                if (skippedInCurrentBlock >= this.reader.getBlockCount()) {
                    skippedInCurrentBlock = 0L;
                }
            }
            return skippedInCurrentBlock;
        }

        /**
         * Opens an Avro file reader for {@code path} using the enclosing format's reader schema.
         * The input stream is closed again if the Avro reader cannot be created.
         */
        private DataFileReader<A> createReaderFromPath(Path path) throws IOException {
            FileSystem fileSystem = path.getFileSystem();
            GenericDatumReader<A> datumReader =
                    new GenericDatumReader<>(null, AbstractAvroBulkFormat.this.readerSchema);
            // Look up the length before opening so that a failing lookup cannot leak a stream.
            long fileLength = fileSystem.getFileStatus(path).getLen();
            FSDataInputStreamWrapper in =
                    new FSDataInputStreamWrapper(fileSystem.open(path), fileLength);
            try {
                return (DataFileReader<A>) DataFileReader.openReader(in, datumReader);
            } catch (IOException | RuntimeException e) {
                closeAfterFailure(in, e);
                throw e;
            }
        }

        /**
         * Returns an iterator over the not yet consumed records of the next block, or
         * {@code null} when there is no further block to read for this split.
         *
         * <p>The reuse object is taken from the pool first and is handed back either
         * immediately (no more data) or by the recycler of the returned iterator.
         *
         * @throws IOException if reading the next block fails
         */
        @Nullable
        public BulkFormat.RecordIterator<T> readBatch() throws IOException {
            A reuse;
            try {
                reuse = this.pool.pollEntry();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting for the previous batch to be consumed", e);
            }
            if (!this.readNextBlock()) {
                this.pool.recycler().recycle(reuse);
                return null;
            }
            this.currentBlockStart = this.reader.previousSync();
            Iterator<T> iterator =
                    new AvroBlockIterator(
                            this.reader.getBlockCount() - this.currentRecordsToSkip,
                            this.reader,
                            reuse,
                            this.converter);
            // Only the first batch after positioning starts in the middle of a block.
            long recordsToSkip = this.currentRecordsToSkip;
            this.currentRecordsToSkip = 0L;
            return new IteratorResultIterator<>(
                    iterator,
                    this.currentBlockStart,
                    recordsToSkip,
                    () -> this.pool.recycler().recycle(reuse));
        }

        /**
         * @return {@code true} if the reader has more records and does not report being past
         *     the sync point for the end of the split
         */
        private boolean readNextBlock() throws IOException {
            return this.reader.hasNext() && !this.reader.pastSync(this.end);
        }

        /** Closes the underlying Avro file reader. */
        public void close() throws IOException {
            this.reader.close();
        }
    }

    /** Iterator over a fixed number of records read from the shared Avro reader. */
    private class AvroBlockIterator implements Iterator<T> {
        private long numRecordsRemaining;
        private final DataFileReader<A> reader;
        private final A reuse;
        private final Function<A, T> converter;

        /**
         * @param numRecordsRemaining number of records this iterator may return
         * @param reader reader the records are pulled from
         * @param reuse object each record is deserialized into
         * @param converter maps the deserialized object to the returned record
         */
        private AvroBlockIterator(long numRecordsRemaining, DataFileReader<A> reader, A reuse, Function<A, T> converter) {
            this.numRecordsRemaining = numRecordsRemaining;
            this.reader = reader;
            this.reuse = reuse;
            this.converter = converter;
        }

        @Override
        public boolean hasNext() {
            return this.numRecordsRemaining > 0L;
        }

        /**
         * Reads the next record into the reuse object and converts it.
         *
         * @throws NoSuchElementException if all records of this iterator were already returned
         * @throws RuntimeException wrapping the {@link IOException} if reading fails
         */
        @Override
        public T next() {
            // Never read beyond the records that belong to this iterator.
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            try {
                --this.numRecordsRemaining;
                return this.converter.apply(this.reader.next(this.reuse));
            } catch (IOException e) {
                throw new RuntimeException("Encountered exception when reading from avro format file", e);
            }
        }
    }
}

