/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.src;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

class FileSourceHeavyThroughputTest {
    /** Testing file system created by the test method; {@code null} until that happens. */
    private TestingFileSystem testFs;

    /** No-argument constructor; performs no initialization of its own. */
    FileSourceHeavyThroughputTest() {
    }

    /**
     * Unregisters the testing file system after each test, if the test created one.
     *
     * @throws Exception if unregistering the testing file system fails
     */
    @AfterEach
    void unregisterTestFs() throws Exception {
        final TestingFileSystem registered = this.testFs;
        if (registered == null) {
            // The test never created its file system, so there is nothing to undo.
            return;
        }
        registered.unregister();
    }

    /**
     * Streams a synthetic 20 GiB file through a {@link FileSource} reader and polls it until the
     * reader reports {@link InputStatus#END_OF_INPUT}.
     *
     * <p>No file content is materialized: {@link GeneratingInputStream} only advances a position
     * counter and {@link NoOpReaderOutput} discards every emitted record. The test has no
     * explicit assertions; it passes when the whole split is consumed without an exception.
     *
     * @throws Exception if setting up the file system, polling, or closing the reader fails
     */
    @Test
    void testHeavyThroughput() throws Exception {
        final Path path = new Path("testfs:///testpath");
        final long fileSize = 20L << 30; // 20 GiB == 0x500000000 bytes
        final FileSourceSplit split =
                new FileSourceSplit("testsplitId", path, 0L, fileSize, 0L, fileSize);

        this.testFs =
                TestingFileSystem.createForFileStatus(
                        path.toUri().getScheme(),
                        TestingFileSystem.TestFileStatus.forFileWithStream(
                                path, fileSize, new GeneratingInputStream(fileSize)));
        this.testFs.register();

        final FileSource<byte[]> source =
                FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
        final ReaderOutput<byte[]> out = new NoOpReaderOutput<>();

        // try-with-resources closes the reader on success and when polling throws.
        try (SourceReader<byte[], FileSourceSplit> reader =
                source.createReader(new NoOpReaderContext())) {
            reader.addSplits(Collections.singletonList(split));
            reader.notifyNoMoreSplits();

            InputStatus status;
            while ((status = reader.pollNext(out)) != InputStatus.END_OF_INPUT) {
                if (status == InputStatus.NOTHING_AVAILABLE) {
                    // Nothing to emit right now; block until the reader signals availability.
                    reader.isAvailable().get();
                }
            }
        }
    }

    /**
     * Input stream that simulates a file of a fixed length without holding any data.
     *
     * <p>Reads only advance the position: bulk reads leave the caller's buffer untouched, and the
     * single-byte read always yields {@code 0} until the end is reached.
     */
    private static final class GeneratingInputStream extends FSDataInputStream {
        /** Number of bytes this stream pretends to contain. */
        private final long length;

        /** Current read offset; starts at zero. */
        private long pos;

        /**
         * @param length number of bytes the stream reports before signalling end of stream
         */
        GeneratingInputStream(long length) {
            this.length = length;
        }

        /**
         * Moves the read offset.
         *
         * @param desired new offset; must lie in {@code [0, length]}
         * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument}
         *     when {@code desired} is outside that range
         */
        public void seek(long desired) throws IOException {
            Preconditions.checkArgument(desired >= 0L && desired <= this.length);
            this.pos = desired;
        }

        /** @return the current read offset */
        public long getPos() throws IOException {
            return this.pos;
        }

        /**
         * Consumes one simulated byte.
         *
         * @return {@code 0} while bytes remain, {@code -1} once the end has been reached
         */
        public int read() throws IOException {
            if (this.pos >= this.length) {
                return -1;
            }
            ++this.pos;
            return 0;
        }

        /**
         * Consumes up to {@code len} simulated bytes without writing anything into {@code b}.
         *
         * @param b destination buffer; only its bounds are checked, its contents are not modified
         * @param off start offset within {@code b}
         * @param len maximum number of bytes to consume
         * @return number of bytes consumed, {@code 0} when {@code len} is zero, or {@code -1} at
         *     the end of the stream
         * @throws NullPointerException if {@code b} is {@code null}
         * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
         *     {@code len} exceeds {@code b.length - off}
         */
        public int read(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException("b");
            }
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException(
                        "off=" + off + ", len=" + len + ", b.length=" + b.length);
            }
            if (len == 0) {
                // The InputStream contract requires 0 here, even at the end of the stream.
                return 0;
            }
            if (this.pos >= this.length) {
                return -1;
            }
            // Bounded by len, so the narrowing cast cannot overflow.
            final int count = (int) Math.min((long) len, this.length - this.pos);
            this.pos += count;
            return count;
        }
    }

    /** Stream format that produces {@code byte[]} records by way of {@link ArrayReader}. */
    private static final class ArrayReaderFormat extends SimpleStreamFormat<byte[]> {
        private static final long serialVersionUID = 1L;

        private ArrayReaderFormat() {}

        /**
         * Creates a reader over the given stream.
         *
         * @param config configuration passed in by the caller; not consulted here
         * @param stream stream the new reader pulls its bytes from
         * @return a new {@link ArrayReader} wrapping {@code stream}
         */
        public StreamFormat.Reader<byte[]> createReader(Configuration config, FSDataInputStream stream) throws IOException {
            final ArrayReader reader = new ArrayReader(stream);
            return reader;
        }

        /** @return the type information for primitive {@code byte[]} records */
        public TypeInformation<byte[]> getProducedType() {
            final TypeInformation<byte[]> producedType =
                    PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
            return producedType;
        }
    }

    /**
     * Reader context stub: every getter returns a fixed value and every outgoing request or
     * event is dropped.
     */
    private static final class NoOpReaderContext implements SourceReaderContext {
        private NoOpReaderContext() {}

        /** @return an unregistered source reader metric group */
        public SourceReaderMetricGroup metricGroup() {
            return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
        }

        /** @return a new, empty configuration on every call */
        public Configuration getConfiguration() {
            final Configuration empty = new Configuration();
            return empty;
        }

        /** @return the constant {@code "localhost"} */
        public String getLocalHostName() {
            return "localhost";
        }

        /** @return always {@code 0} */
        public int getIndexOfSubtask() {
            return 0;
        }

        /** Does nothing; split requests are dropped. */
        public void sendSplitRequest() {}

        /** Does nothing; the event is dropped. */
        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {}

        /** @return a user-code class loader backed by the loader of this class */
        public UserCodeClassLoader getUserCodeClassLoader() {
            final ClassLoader loader = getClass().getClassLoader();
            return SimpleUserCodeClassLoader.create(loader);
        }

        /** @return always {@code 1} */
        public int currentParallelism() {
            return 1;
        }
    }

    /**
     * Reader output that discards everything it is handed: records, watermarks and status
     * changes.
     *
     * @param <E> type of the discarded records
     */
    private static final class NoOpReaderOutput<E> implements ReaderOutput<E> {
        private NoOpReaderOutput() {}

        /** Discards the record. */
        public void collect(E record) {}

        /** Discards the record and its timestamp. */
        public void collect(E record, long timestamp) {}

        /** Ignores the watermark. */
        public void emitWatermark(Watermark watermark) {}

        /** Does nothing. */
        public void markIdle() {}

        /** Does nothing. */
        public void markActive() {}

        /**
         * @param splitId ignored
         * @return this same instance, which serves as the output for every split
         */
        public SourceOutput<E> createOutputForSplit(String splitId) {
            return this;
        }

        /** Does nothing; there is no per-split state to release. */
        public void releaseOutputForSplit(String splitId) {}
    }

    /** Reader that chops the underlying stream into {@code byte[]} records of up to 1 MiB. */
    private static final class ArrayReader implements StreamFormat.Reader<byte[]> {
        /** Size of the buffer offered to the stream for each record: 1 MiB. */
        private static final int ARRAY_SIZE = 1 << 20;

        /** Stream the records are read from; closed by {@link #close()}. */
        private final FSDataInputStream in;

        /**
         * @param in stream to read records from
         */
        ArrayReader(FSDataInputStream in) {
            this.in = in;
        }

        /**
         * Reads the next record into a newly allocated buffer.
         *
         * @return the full buffer when the stream filled it, a copy trimmed to the bytes actually
         *     read when the stream delivered fewer, or {@code null} at the end of the stream
         * @throws IOException if reading from the underlying stream fails
         */
        @Nullable
        public byte[] read() throws IOException {
            final byte[] array = new byte[ARRAY_SIZE];
            final int read = this.in.read(array);
            if (read == -1) {
                return null;
            }
            // A short read yields a record trimmed to the bytes the stream delivered.
            return read == array.length ? array : Arrays.copyOf(array, read);
        }

        /**
         * Closes the underlying stream.
         *
         * @throws IOException if closing the stream fails
         */
        public void close() throws IOException {
            this.in.close();
        }
    }
}

