package org.kitesdk.data.spi.filesystem;

import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.util.Set;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetIOException;
import org.kitesdk.data.DatasetOperationException;
import org.kitesdk.data.DatasetRecordException;
import org.kitesdk.data.Format;
import org.kitesdk.data.Formats;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.UnknownFormatException;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.spi.AbstractDatasetWriter;
import org.kitesdk.data.spi.DescriptorUtil;
import org.kitesdk.data.spi.ReaderWriterState;
import org.kitesdk.shaded.com.google.common.annotations.VisibleForTesting;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.kitesdk.shaded.com.google.common.base.Throwables;
import org.kitesdk.shaded.com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:lib/kite-data-core-1.1.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter.class */
public class FileSystemWriter<E> extends AbstractDatasetWriter<E> {
    private static final Logger LOG = LoggerFactory.getLogger(FileSystemWriter.class);

    /**
     * Formats that are always writable. CSV is handled separately in
     * {@link #isSupportedFormat(DatasetDescriptor)} because it additionally
     * requires the "kite.allow.csv" descriptor property.
     */
    private static final Set<Format> SUPPORTED_FORMATS = ImmutableSet.<Format>of(Formats.AVRO, Formats.PARQUET);

    /** Destination directory; created by {@code initialize()} if missing. */
    private final Path directory;

    /** Descriptor supplying the format, schema, compression type and properties. */
    private final DatasetDescriptor descriptor;

    /** Hidden temporary file the appender writes to; assigned in {@code initialize()}. */
    private Path tempPath;

    /** Location the temporary file is renamed to when the writer commits on close. */
    private Path finalPath;

    /** Number of entities appended successfully since the writer was opened. */
    private int count;

    protected final FileSystem fs;

    /** Appender created by {@code newAppender}; {@code null} until {@code initialize()} runs. */
    protected FileAppender<E> appender;

    /** Set to {@code true} by {@link IncrementalWriter} after a successful flush or sync. */
    protected boolean flushed;

    /** Current lifecycle state (NEW, OPEN, ERROR or CLOSED). */
    @VisibleForTesting
    ReaderWriterState state;

    /** Copy of the file system configuration overlaid with the descriptor's properties. */
    @VisibleForTesting
    final Configuration conf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/kite-data-core-1.1.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter$FileAppender.class */
    /**
     * Low-level sink that a {@link FileSystemWriter} delegates record output to.
     *
     * <p>As used by the enclosing writer: the appender is opened once while the
     * writer initializes, receives one {@link #append(Object)} call per written
     * entity, and is cleaned up when the writer is closed.
     *
     * @param <E> the type of entity accepted by this appender
     */
    public interface FileAppender<E> extends Flushable, Closeable {
        /**
         * Prepares the appender for writing; called once before any append.
         *
         * @throws IOException if the appender cannot be opened
         */
        void open() throws IOException;

        /**
         * Appends a single entity.
         *
         * @param entity the entity to write
         * @throws IOException if the entity cannot be written
         */
        void append(E entity) throws IOException;

        /**
         * Invoked by {@code IncrementalWriter.sync()}.
         *
         * @throws IOException if the sync fails
         */
        void sync() throws IOException;

        /**
         * Invoked by the writer's {@code close()} after the temporary file has
         * been committed or discarded.
         *
         * @throws IOException if clean-up fails
         */
        void cleanup() throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/kite-data-core-1.1.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter$IncrementalWriter.class */
    /**
     * Writer variant that additionally exposes {@code flush()} and {@code sync()}.
     *
     * <p>A successful flush or sync sets {@code flushed}, which the enclosing
     * writer's {@code close()} consults when deciding whether to commit the
     * temporary file.
     *
     * @param <E> the type of entity written
     */
    public static class IncrementalWriter<E> extends FileSystemWriter<E> implements org.kitesdk.data.Flushable, Syncable {
        private IncrementalWriter(FileSystem fileSystem, Path path, DatasetDescriptor datasetDescriptor) {
            super(fileSystem, path, datasetDescriptor);
        }

        /**
         * Flushes the underlying appender.
         *
         * @throws IllegalStateException if the writer is not open
         * @throws DatasetIOException if the appender fails with an {@link IOException}
         * @throws DatasetOperationException if the appender fails with a runtime exception
         */
        @Override // org.kitesdk.data.Flushable, java.io.Flushable
        public void flush() {
            Preconditions.checkState(isOpen(), "Attempt to flush a writer in state:%s", this.state);
            try {
                this.appender.flush();
            } catch (RuntimeException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetOperationException(failure, "Failed to flush appender %s", this.appender);
            } catch (IOException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetIOException("Failed to flush appender " + this.appender, failure);
            }
            // Only reached when the appender flushed without throwing.
            this.flushed = true;
        }

        /**
         * Syncs the underlying appender.
         *
         * @throws IllegalStateException if the writer is not open
         * @throws DatasetIOException if the appender fails with an {@link IOException}
         * @throws DatasetOperationException if the appender fails with a runtime exception
         */
        @Override // org.kitesdk.data.Syncable
        public void sync() {
            Preconditions.checkState(isOpen(), "Attempt to sync a writer in state:%s", this.state);
            try {
                this.appender.sync();
            } catch (RuntimeException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetOperationException(failure, "Failed to sync appender %s", this.appender);
            } catch (IOException failure) {
                this.state = ReaderWriterState.ERROR;
                throw new DatasetIOException("Failed to sync appender " + this.appender, failure);
            }
            // Only reached when the appender synced without throwing.
            this.flushed = true;
        }
    }

    /* loaded from: input_file:lib/kite-data-core-1.1.0.jar:org/kitesdk/data/spi/filesystem/FileSystemWriter$KiteRecordWriter.class */
    private class KiteRecordWriter extends RecordWriter<E, Void> {
        private KiteRecordWriter() {
        }

        public void write(E e, Void r5) throws IOException, InterruptedException {
            FileSystemWriter.this.write(e);
        }

        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            FileSystemWriter.this.close();
        }

        public /* bridge */ /* synthetic */ void write(Object obj, Object obj2) throws IOException, InterruptedException {
            write((KiteRecordWriter) obj, (Void) obj2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether a writer can be created for the descriptor's format.
     *
     * @param datasetDescriptor descriptor whose format is inspected
     * @return {@code true} for formats in {@code SUPPORTED_FORMATS}, or for CSV
     *         when the "kite.allow.csv" property is enabled on the descriptor
     */
    public static boolean isSupportedFormat(DatasetDescriptor datasetDescriptor) {
        final Format requested = datasetDescriptor.getFormat();
        if (SUPPORTED_FORMATS.contains(requested)) {
            return true;
        }
        if (!Formats.CSV.equals(requested)) {
            return false;
        }
        // CSV is opt-in through a descriptor property.
        return DescriptorUtil.isEnabled("kite.allow.csv", datasetDescriptor);
    }

    private FileSystemWriter(FileSystem fileSystem, Path path, DatasetDescriptor datasetDescriptor) {
        this.count = 0;
        this.flushed = false;
        Preconditions.checkNotNull(fileSystem, "File system is not defined");
        Preconditions.checkNotNull(path, "Destination directory is not defined");
        Preconditions.checkNotNull(datasetDescriptor, "Descriptor is not defined");
        this.fs = fileSystem;
        this.directory = path;
        this.descriptor = datasetDescriptor;
        this.conf = new Configuration(fileSystem.getConf());
        this.state = ReaderWriterState.NEW;
        for (String str : datasetDescriptor.listProperties()) {
            this.conf.set(str, datasetDescriptor.getProperty(str));
        }
    }

    /**
     * Opens the writer: creates the destination directory, picks a unique final
     * file name plus its hidden temporary counterpart, and opens an appender on
     * the temporary file.
     *
     * <p>The three phases are handled sequentially so that each failure is
     * reported once, with the message of the phase that actually failed. When
     * the phases were nested, an exception raised while opening the appender
     * was caught again by the enclosing handlers and surfaced as
     * "Failed to create path".
     *
     * @throws IllegalStateException if the writer is not in the NEW state
     * @throws ValidationException if the descriptor's format is not supported
     * @throws DatasetIOException if creating the directory or opening the
     *         appender fails with an {@link IOException}
     * @throws DatasetOperationException if any phase fails with a runtime exception
     */
    @Override // org.kitesdk.data.spi.InitializeAccessor
    public final void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "Unable to open a writer from state:%s", this.state);
        ValidationException.check(isSupportedFormat(this.descriptor), "Not a supported format: %s", this.descriptor.getFormat());

        // Phase 1: make sure the destination directory exists.
        try {
            this.fs.mkdirs(this.directory);
        } catch (IOException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to create path " + this.directory, e);
        } catch (RuntimeException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(e, "Failed to create path %s", this.directory);
        }

        // Phase 2: choose the final file and the temporary file next to it.
        try {
            this.finalPath = new Path(this.directory, uniqueFilename(this.descriptor.getFormat()));
            this.tempPath = tempFilename(this.finalPath);
        } catch (RuntimeException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(e, "Failed to initialize file paths under %s", this.directory);
        }

        // Phase 3: create and open the appender on the temporary file.
        try {
            this.appender = newAppender(this.tempPath);
            this.appender.open();
        } catch (IOException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to open appender " + this.appender, e);
        } catch (RuntimeException e) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(e, "Failed to open appender %s", this.appender);
        }

        this.count = 0;
        LOG.info("Opened output appender {} for {}", this.appender, this.finalPath);
        this.state = ReaderWriterState.OPEN;
    }

    /**
     * Appends one entity and counts it on success.
     *
     * <p>A {@link DatasetRecordException} raised by the appender is rethrown
     * unchanged and leaves the writer state untouched; any other failure moves
     * the writer to the ERROR state.
     *
     * @param e the entity to write
     * @throws IllegalStateException if the writer is not open
     * @throws DatasetRecordException if the appender rejects the record
     * @throws DatasetIOException if the appender fails with an {@link IOException}
     * @throws DatasetOperationException if the appender fails with another runtime exception
     */
    @Override // org.kitesdk.data.DatasetWriter
    public final void write(E e) {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to write to a writer in state:%s", this.state);
        try {
            this.appender.append(e);
        } catch (RuntimeException failure) {
            // Record-level problems are passed through as-is.
            Throwables.propagateIfInstanceOf(failure, DatasetRecordException.class);
            this.state = ReaderWriterState.ERROR;
            throw new DatasetOperationException(failure, "Failed to append %s to %s", e, this.appender);
        } catch (IOException ioFailure) {
            this.state = ReaderWriterState.ERROR;
            throw new DatasetIOException("Failed to append " + e + " to " + this.appender, ioFailure);
        }
        // Only reached when the append completed without throwing.
        this.count++;
    }

    /**
     * Closes the writer and either commits or discards the temporary file.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Return immediately if the writer is NEW or already CLOSED.</li>
     *   <li>Close the appender unless the writer is in the ERROR state.</li>
     *   <li>If at least one entity was written and either data was flushed or
     *       the writer is still OPEN, rename the temporary file to the final
     *       path; otherwise delete the temporary file.</li>
     *   <li>Run the appender's clean-up exactly once.</li>
     * </ol>
     * The state is CLOSED when this method exits, whether or not it threw.
     *
     * <p>Commit and discard are mutually exclusive. When the discard step was
     * not an {@code else} of the commit step, a successful commit fell through
     * to deleting the temporary path that had just been renamed away and to a
     * second clean-up call.
     *
     * @throws DatasetIOException if closing, committing, discarding or clean-up
     *         fails with an {@link IOException}
     * @throws DatasetOperationException if any of those steps fails with a
     *         runtime exception, or if the rename/delete reports failure
     */
    @Override // org.kitesdk.data.DatasetWriter, java.io.Closeable, java.lang.AutoCloseable
    public final void close() {
        try {
            if (ReaderWriterState.NEW.equals(this.state) || ReaderWriterState.CLOSED.equals(this.state)) {
                return;
            }

            // An appender in the ERROR state is not closed.
            if (!ReaderWriterState.ERROR.equals(this.state)) {
                try {
                    this.appender.close();
                } catch (IOException e) {
                    throw new DatasetIOException("Failed to close appender " + this.appender, e);
                } catch (RuntimeException e) {
                    throw new DatasetOperationException(e, "Failed to close appender %s", this.appender);
                }
            }

            if (this.count > 0 && (this.flushed || ReaderWriterState.OPEN.equals(this.state))) {
                // Commit: make the written data visible under its final name.
                try {
                    if (!this.fs.rename(this.tempPath, this.finalPath)) {
                        throw new DatasetOperationException("Failed to move %s to %s", this.tempPath, this.finalPath);
                    }
                } catch (IOException e) {
                    throw new DatasetIOException("Failed to commit " + this.finalPath, e);
                } catch (RuntimeException e) {
                    throw new DatasetOperationException(e, "Failed to commit %s", this.finalPath);
                }
                LOG.debug("Committed {} for appender {} ({} entities)", new Object[]{this.finalPath, this.appender, Integer.valueOf(this.count)});
            } else if (this.tempPath != null) {
                // Discard: nothing worth keeping was written. tempPath is null
                // when initialize() failed before the paths were assigned, in
                // which case there is nothing to delete.
                try {
                    if (!this.fs.delete(this.tempPath, true)) {
                        throw new DatasetOperationException("Failed to delete %s", this.tempPath);
                    }
                } catch (IOException e) {
                    throw new DatasetIOException("Failed to remove temporary file " + this.tempPath, e);
                } catch (RuntimeException e) {
                    throw new DatasetOperationException(e, "Failed to remove temporary file %s", this.tempPath);
                }
                LOG.debug("Discarded {} ({} entities)", this.tempPath, Integer.valueOf(this.count));
            }

            // The appender is null when initialize() failed before creating it.
            if (this.appender != null) {
                try {
                    this.appender.cleanup();
                } catch (IOException e) {
                    throw new DatasetIOException("Failed to clean up " + this.appender, e);
                } catch (RuntimeException e) {
                    throw new DatasetOperationException(e, "Failed to clean up %s", this.appender);
                }
            }
        } finally {
            this.state = ReaderWriterState.CLOSED;
        }
    }

    /**
     * Reports whether the writer currently accepts writes.
     *
     * @return {@code true} only while the writer is in the OPEN state
     */
    @Override // org.kitesdk.data.DatasetWriter
    public final boolean isOpen() {
        final ReaderWriterState current = this.state;
        return current.equals(ReaderWriterState.OPEN);
    }

    /**
     * Builds a file name of the form {@code <random-uuid>.<extension>}.
     *
     * @param format format whose extension is used as the file suffix
     * @return a freshly generated file name
     */
    private static String uniqueFilename(Format format) {
        final StringBuilder name = new StringBuilder();
        name.append(UUID.randomUUID());
        name.append(".");
        name.append(format.getExtension());
        return name.toString();
    }

    /**
     * Derives the temporary sibling of {@code path}: same parent directory,
     * with the file name prefixed by "." and suffixed by ".tmp".
     *
     * @param path the final file location
     * @return the temporary file location
     */
    private static Path tempFilename(Path path) {
        final Path parent = path.getParent();
        final String hiddenName = "." + path.getName() + ".tmp";
        return new Path(parent, hiddenName);
    }

    /**
     * Creates the appender matching the descriptor's format.
     *
     * <ul>
     *   <li>Parquet: a {@code DurableParquetAppender} when the non-durable
     *       Parquet property is disabled on the descriptor, otherwise a
     *       {@code ParquetAppender}.</li>
     *   <li>Avro: an {@code AvroAppender}.</li>
     *   <li>CSV: a {@code CSVAppender}, only when "kite.allow.csv" is enabled.</li>
     * </ul>
     * Any other case moves the writer to the ERROR state and throws.
     *
     * @param path file the appender should write to
     * @return a new, unopened appender
     * @throws UnknownFormatException if no appender exists for the format
     */
    @VisibleForTesting
    <E> FileAppender<E> newAppender(Path path) {
        final Format format = this.descriptor.getFormat();

        if (Formats.PARQUET.equals(format)) {
            if (DescriptorUtil.isDisabled(FileSystemProperties.NON_DURABLE_PARQUET_PROP, this.descriptor)) {
                return new DurableParquetAppender(this.fs, path, this.descriptor.getSchema(), this.conf, this.descriptor.getCompressionType());
            }
            return new ParquetAppender(this.fs, path, this.descriptor.getSchema(), this.conf, this.descriptor.getCompressionType());
        }

        if (Formats.AVRO.equals(format)) {
            return new AvroAppender(this.fs, path, this.descriptor.getSchema(), this.descriptor.getCompressionType());
        }

        final boolean csvAllowed = Formats.CSV.equals(format) && DescriptorUtil.isEnabled("kite.allow.csv", this.descriptor);
        if (csvAllowed) {
            return new CSVAppender(this.fs, path, this.descriptor);
        }

        this.state = ReaderWriterState.ERROR;
        throw new UnknownFormatException("Unknown format " + this.descriptor);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Factory for writers.
     *
     * <p>An {@link IncrementalWriter} is returned for Avro and CSV, and for
     * Parquet when the non-durable Parquet property is disabled on the
     * descriptor. Every other case gets a plain {@code FileSystemWriter}.
     *
     * @param fileSystem file system to write to
     * @param path destination directory
     * @param datasetDescriptor descriptor of the dataset being written
     * @param <E> the type of entity written
     * @return a new writer in the NEW state
     */
    public static <E> FileSystemWriter<E> newWriter(FileSystem fileSystem, Path path, DatasetDescriptor datasetDescriptor) {
        final Format format = datasetDescriptor.getFormat();

        final boolean incremental;
        if (Formats.PARQUET.equals(format)) {
            incremental = DescriptorUtil.isDisabled(FileSystemProperties.NON_DURABLE_PARQUET_PROP, datasetDescriptor);
        } else {
            incremental = Formats.AVRO.equals(format) || Formats.CSV.equals(format);
        }

        if (incremental) {
            return new IncrementalWriter<E>(fileSystem, path, datasetDescriptor);
        }
        return new FileSystemWriter<E>(fileSystem, path, datasetDescriptor);
    }

    /**
     * Exposes this writer through Hadoop's {@link RecordWriter} API.
     *
     * @return a new adapter whose write and close calls delegate to this writer
     */
    public RecordWriter<E, Void> asRecordWriter() {
        final RecordWriter<E, Void> adapter = new KiteRecordWriter();
        return adapter;
    }
}
