package org.kitesdk.data.spi.filesystem;

import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.spi.AbstractDatasetReader;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.JsonUtil;
import org.kitesdk.data.spi.ReaderWriterState;
import org.kitesdk.shaded.com.google.common.base.Function;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.kitesdk.shaded.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/kite-data-core-1.1.0.jar:org/kitesdk/data/spi/filesystem/JSONFileReader.class */
/**
 * Dataset reader that parses JSON content with {@link JsonUtil#parser} and
 * converts each parsed node to an entity with {@link JsonUtil#convertToAvro},
 * using the configured data model and record schema.
 *
 * <p>The input is either a {@link Path} on a {@link FileSystem}, opened lazily
 * by {@link #initialize()}, or an {@link InputStream} supplied by the caller.
 * The reader moves through the states NEW, OPEN and CLOSED; reading is only
 * allowed while it is OPEN.
 *
 * @param <E> the entity type produced by this reader
 */
public class JSONFileReader<E> extends AbstractDatasetReader<E> {
    private static final Logger LOG = LoggerFactory.getLogger(JSONFileReader.class);

    private final FileSystem fs;
    private final Path path;
    private final GenericData model;
    private final Schema schema;

    /** Length of the file in bytes; stays 0 when reading from a caller-supplied stream. */
    private long size = 0L;
    private InputStream incoming = null;
    private ReaderWriterState state = ReaderWriterState.NEW;
    private Iterator<E> iterator;

    /**
     * Hadoop {@link RecordReader} view over the enclosing reader. Entities are
     * exposed as keys; the value is always {@code null}.
     */
    public class JSONRecordReader extends RecordReader<E, Void> {
        private E current;

        public JSONRecordReader() {
        }

        /** No-op: this adapter ignores the split and the task context. */
        @Override
        public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        }

        /**
         * Advances to the next entity of the enclosing reader.
         *
         * @return {@code true} if an entity was read, {@code false} when the input is exhausted
         */
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            if (!JSONFileReader.this.hasNext()) {
                return false;
            }
            this.current = JSONFileReader.this.next();
            return true;
        }

        /** @return the entity read by the last successful {@link #nextKeyValue()} call */
        @Override
        public E getCurrentKey() throws IOException, InterruptedException {
            return this.current;
        }

        /** @return always {@code null}; records are carried in the key */
        @Override
        public Void getCurrentValue() throws IOException, InterruptedException {
            return null;
        }

        /**
         * @return the fraction of the file consumed so far, or {@code 0} when the
         *         file size is unknown or zero
         */
        @Override
        public float getProgress() throws IOException, InterruptedException {
            if (JSONFileReader.this.size == 0) {
                return 0.0f;
            }
            // size is only set when initialize() opened the stream through the
            // FileSystem, so the stream is an FSDataInputStream here.
            long position = ((FSDataInputStream) JSONFileReader.this.incoming).getPos();
            return ((float) position) / ((float) JSONFileReader.this.size);
        }

        /** Closes the enclosing reader. */
        @Override
        public void close() throws IOException {
            JSONFileReader.this.close();
        }
    }

    /**
     * Creates a reader for a file; the file is not opened until {@link #initialize()}.
     *
     * @param fileSystem     file system used to open {@code path}
     * @param path           location of the JSON file
     * @param entityAccessor supplies the read schema and the entity type
     */
    public JSONFileReader(FileSystem fileSystem, Path path, EntityAccessor<E> entityAccessor) {
        this.fs = fileSystem;
        this.path = path;
        this.schema = entityAccessor.getReadSchema();
        this.model = DataModelUtil.getDataModelForType(entityAccessor.getType());
    }

    /**
     * Creates a reader over an already opened stream.
     *
     * @param inputStream stream containing the JSON content
     * @param schema      schema of the records to produce
     * @param cls         entity class used to select the data model
     */
    public JSONFileReader(InputStream inputStream, Schema schema, Class<E> cls) {
        this.fs = null;
        this.path = null;
        this.incoming = inputStream;
        this.schema = schema;
        this.model = DataModelUtil.getDataModelForType(cls);
    }

    /**
     * Opens the input (if no stream was supplied) and prepares the record iterator.
     *
     * @throws IllegalStateException    if the reader is not in the NEW state
     * @throws IllegalArgumentException if the schema is not a record schema
     * @throws NullPointerException     if no stream was supplied and the file system or path is null
     * @throws DatasetIOException       if the file cannot be opened or its status cannot be read
     */
    @Override
    public void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "A reader may not be opened more than once - current state:%s", this.state);
        Preconditions.checkArgument(Schema.Type.RECORD.equals(this.schema.getType()), "Schemas for JSON files should be record");
        if (this.incoming == null) {
            Preconditions.checkNotNull(this.fs, "FileSystem cannot be null");
            Preconditions.checkNotNull(this.path, "Path cannot be null");
            // Open into a local so that a failure while reading the file status
            // neither leaks the stream nor leaves a half-initialised reader behind.
            FSDataInputStream opened = null;
            try {
                opened = this.fs.open(this.path);
                this.size = this.fs.getFileStatus(this.path).getLen();
                this.incoming = opened;
            } catch (IOException e) {
                if (opened != null) {
                    try {
                        opened.close();
                    } catch (IOException closeFailure) {
                        LOG.debug("Failed to close stream after open error on path:" + this.path, closeFailure);
                    }
                }
                throw new DatasetIOException("Cannot open path: " + this.path, e);
            }
        }
        this.iterator = Iterators.transform(JsonUtil.parser(this.incoming), new Function<JsonNode, E>() {
            @Override
            @SuppressWarnings("unchecked") // convertToAvro builds the entity for the configured model
            public E apply(@Nullable JsonNode jsonNode) {
                return (E) JsonUtil.convertToAvro(JSONFileReader.this.model, jsonNode, JSONFileReader.this.schema);
            }
        });
        this.state = ReaderWriterState.OPEN;
    }

    /**
     * @return {@code true} if another entity is available
     * @throws IllegalStateException if the reader is not OPEN
     */
    @Override
    public boolean hasNext() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to read from a file in state:%s", this.state);
        return this.iterator.hasNext();
    }

    /**
     * @return the next entity
     * @throws IllegalStateException if the reader is not OPEN
     */
    @Override
    public E next() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to read from a file in state:%s", this.state);
        return this.iterator.next();
    }

    /**
     * Closes the underlying stream. Does nothing unless the reader is OPEN.
     *
     * @throws DatasetIOException if closing the underlying stream fails; the
     *                            reader is still considered closed afterwards
     */
    @Override
    public void close() {
        if (!this.state.equals(ReaderWriterState.OPEN)) {
            return;
        }
        LOG.debug("Closing reader on path:{}", this.path);
        this.iterator = null;
        // Mark the reader closed before touching the stream: the iterator is
        // already gone, so it must not keep reporting itself as open if the
        // underlying close fails.
        this.state = ReaderWriterState.CLOSED;
        try {
            this.incoming.close();
        } catch (IOException e) {
            throw new DatasetIOException("Unable to close reader path:" + this.path, e);
        }
    }

    /** @return {@code true} while the reader is in the OPEN state */
    @Override
    public boolean isOpen() {
        return this.state == ReaderWriterState.OPEN;
    }

    /**
     * Exposes this reader as a Hadoop {@link RecordReader}.
     *
     * @return a record reader backed by this reader
     * @throws IllegalArgumentException if the current input stream is not an
     *                                  {@link FSDataInputStream} (including when no
     *                                  stream has been opened yet)
     */
    public RecordReader<E, Void> asRecordReader() {
        // Null-safe so an unopened reader fails the check with a readable
        // message instead of a NullPointerException.
        Object streamType = (this.incoming == null) ? null : this.incoming.getClass();
        Preconditions.checkArgument(this.incoming instanceof FSDataInputStream, "Cannot use %s in a record reader", streamType);
        return new JSONRecordReader();
    }
}
