package org.apache.storm.hdfs.trident;

import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
import org.apache.storm.hdfs.trident.format.FileNameFormat;
import org.apache.storm.hdfs.trident.format.RecordFormat;
import org.apache.storm.hdfs.trident.format.SequenceFormat;
import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;

/* loaded from: input_file:org/apache/storm/hdfs/trident/HdfsState.class */
public class HdfsState implements State {
    public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
    private Options options;

    /* loaded from: input_file:org/apache/storm/hdfs/trident/HdfsState$HdfsFileOptions.class */
    /**
     * {@link Options} variant that writes each tuple as raw bytes to a plain HDFS file.
     *
     * <p>The bytes for a tuple are produced by the configured {@link RecordFormat}. Instances are
     * configured through the fluent {@code with*}/{@code add*} methods, each of which returns
     * {@code this} so calls can be chained.
     */
    public static class HdfsFileOptions extends Options {
        /** Stream of the file currently being written; replaced by {@link #createOutputFile()}. */
        private transient FSDataOutputStream out;
        /** Converts a tuple into the bytes appended to the output file. */
        protected RecordFormat format;
        /** Running count of bytes written by {@link #execute(List)}; reset when execute rotates the file. */
        private long offset = 0;

        /**
         * Sets the file system URL handed to {@code FileSystem.get} during preparation.
         *
         * @param str the file system URL
         * @return this instance, for chaining
         */
        public HdfsFileOptions withFsUrl(String str) {
            this.fsUrl = str;
            return this;
        }

        /**
         * Sets the key under which extra HDFS configuration entries are looked up in the topology
         * configuration map.
         *
         * @param str the configuration key
         * @return this instance, for chaining
         */
        public HdfsFileOptions withConfigKey(String str) {
            this.configKey = str;
            return this;
        }

        /**
         * Sets the format that supplies the directory and name of every output file.
         *
         * @param fileNameFormat the file name format
         * @return this instance, for chaining
         */
        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
            this.fileNameFormat = fileNameFormat;
            return this;
        }

        /**
         * Sets the format used to turn a tuple into bytes.
         *
         * @param recordFormat the record format
         * @return this instance, for chaining
         */
        public HdfsFileOptions withRecordFormat(RecordFormat recordFormat) {
            this.format = recordFormat;
            return this;
        }

        /**
         * Sets the policy that decides when the output file is rotated.
         *
         * @param fileRotationPolicy the rotation policy
         * @return this instance, for chaining
         */
        public HdfsFileOptions withRotationPolicy(FileRotationPolicy fileRotationPolicy) {
            this.rotationPolicy = fileRotationPolicy;
            return this;
        }

        /**
         * Registers an action to run against the finished file after each rotation.
         *
         * @param rotationAction the action to add
         * @return this instance, for chaining
         */
        public HdfsFileOptions addRotationAction(RotationAction rotationAction) {
            this.rotationActions.add(rotationAction);
            return this;
        }

        /**
         * Validates the record format and opens the target file system.
         *
         * @throws IllegalStateException if no {@link RecordFormat} has been configured
         * @throws IOException if the file system cannot be obtained
         */
        @Override
        void doPrepare(Map map, int i, int i2) throws IOException {
            HdfsState.LOG.info("Preparing HDFS Bolt...");
            // Fail fast here instead of with a NullPointerException on the first batch,
            // mirroring the SequenceFormat check in SequenceFileOptions.
            if (this.format == null) {
                throw new IllegalStateException("RecordFormat must be specified.");
            }
            this.fs = FileSystem.get(URI.create(this.fsUrl), this.hdfsConfig);
        }

        /**
         * Closes the stream of the file currently being written.
         *
         * @throws IOException if closing the stream fails
         */
        @Override
        void closeOutputFile() throws IOException {
            this.out.close();
        }

        /**
         * Creates the next output file and makes its stream the current one.
         *
         * @return the path of the newly created file
         * @throws IOException if the file cannot be created
         */
        @Override
        Path createOutputFile() throws IOException {
            Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
            this.out = this.fs.create(path);
            return path;
        }

        /**
         * Writes every tuple of the batch to the current file, rotating whenever the rotation
         * policy asks for it, and finally syncs whatever has been written since the last rotation.
         *
         * @param list the tuples of the batch
         * @throws IOException if writing, rotating or syncing fails
         */
        @Override
        public void execute(List<TridentTuple> list) throws IOException {
            // True while the current file may hold data that has not been synced. Starts out true
            // so a batch without any rotation always ends with a sync.
            boolean needsSync = true;
            synchronized (this.writeLock) {
                for (TridentTuple tridentTuple : list) {
                    byte[] bytes = this.format.format(tridentTuple);
                    this.out.write(bytes);
                    this.offset += bytes.length;
                    needsSync = true;
                    if (this.rotationPolicy.mark(tridentTuple, this.offset)) {
                        rotateOutputFile();
                        this.offset = 0L;
                        this.rotationPolicy.reset();
                        // Rotation closed the previous file and the new one is still empty.
                        needsSync = false;
                    }
                }
                if (needsSync) {
                    if (this.out instanceof HdfsDataOutputStream) {
                        // Also ask HDFS to update the recorded file length as part of the sync.
                        ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
                    } else {
                        this.out.hsync();
                    }
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/storm/hdfs/trident/HdfsState$Options.class */
    public static abstract class Options implements Serializable {
        protected String fsUrl;
        protected String configKey;
        protected transient FileSystem fs;
        private Path currentFile;
        protected FileRotationPolicy rotationPolicy;
        protected FileNameFormat fileNameFormat;
        protected transient Configuration hdfsConfig;
        protected transient Object writeLock;
        protected transient Timer rotationTimer;
        protected int rotation = 0;
        protected ArrayList<RotationAction> rotationActions = new ArrayList<>();

        abstract void closeOutputFile() throws IOException;

        abstract Path createOutputFile() throws IOException;

        abstract void execute(List<TridentTuple> list) throws IOException;

        abstract void doPrepare(Map map, int i, int i2) throws IOException;

        protected void rotateOutputFile() throws IOException {
            HdfsState.LOG.info("Rotating output file...");
            long currentTimeMillis = System.currentTimeMillis();
            synchronized (this.writeLock) {
                closeOutputFile();
                this.rotation++;
                Path createOutputFile = createOutputFile();
                HdfsState.LOG.info("Performing {} file rotation actions.", Integer.valueOf(this.rotationActions.size()));
                Iterator<RotationAction> it = this.rotationActions.iterator();
                while (it.hasNext()) {
                    it.next().execute(this.fs, this.currentFile);
                }
                this.currentFile = createOutputFile;
            }
            HdfsState.LOG.info("File rotation took {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
        }

        void prepare(Map map, int i, int i2) {
            this.writeLock = new Object();
            if (this.rotationPolicy == null) {
                throw new IllegalStateException("RotationPolicy must be specified.");
            }
            if (this.fsUrl == null) {
                throw new IllegalStateException("File system URL must be specified.");
            }
            this.fileNameFormat.prepare(map, i, i2);
            this.hdfsConfig = new Configuration();
            Map map2 = (Map) map.get(this.configKey);
            if (map2 != null) {
                for (String str : map2.keySet()) {
                    this.hdfsConfig.set(str, String.valueOf(map2.get(str)));
                }
            }
            try {
                HdfsSecurityUtil.login(map, this.hdfsConfig);
                doPrepare(map, i, i2);
                this.currentFile = createOutputFile();
                if (this.rotationPolicy instanceof TimedRotationPolicy) {
                    long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
                    this.rotationTimer = new Timer(true);
                    this.rotationTimer.scheduleAtFixedRate(new TimerTask() { // from class: org.apache.storm.hdfs.trident.HdfsState.Options.1
                        @Override // java.util.TimerTask, java.lang.Runnable
                        public void run() {
                            try {
                                Options.this.rotateOutputFile();
                            } catch (IOException e) {
                                HdfsState.LOG.warn("IOException during scheduled file rotation.", (Throwable) e);
                            }
                        }
                    }, interval, interval);
                }
            } catch (Exception e) {
                throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
            }
        }
    }

    /* loaded from: input_file:org/apache/storm/hdfs/trident/HdfsState$SequenceFileOptions.class */
    public static class SequenceFileOptions extends Options {
        private SequenceFormat format;
        private transient SequenceFile.Writer writer;
        private transient CompressionCodecFactory codecFactory;
        private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
        private String compressionCodec = "default";

        public SequenceFileOptions withCompressionCodec(String str) {
            this.compressionCodec = str;
            return this;
        }

        public SequenceFileOptions withFsUrl(String str) {
            this.fsUrl = str;
            return this;
        }

        public SequenceFileOptions withConfigKey(String str) {
            this.configKey = str;
            return this;
        }

        public SequenceFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
            this.fileNameFormat = fileNameFormat;
            return this;
        }

        public SequenceFileOptions withSequenceFormat(SequenceFormat sequenceFormat) {
            this.format = sequenceFormat;
            return this;
        }

        public SequenceFileOptions withRotationPolicy(FileRotationPolicy fileRotationPolicy) {
            this.rotationPolicy = fileRotationPolicy;
            return this;
        }

        public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType) {
            this.compressionType = compressionType;
            return this;
        }

        public SequenceFileOptions addRotationAction(RotationAction rotationAction) {
            this.rotationActions.add(rotationAction);
            return this;
        }

        @Override // org.apache.storm.hdfs.trident.HdfsState.Options
        void doPrepare(Map map, int i, int i2) throws IOException {
            HdfsState.LOG.info("Preparing Sequence File State...");
            if (this.format == null) {
                throw new IllegalStateException("SequenceFormat must be specified.");
            }
            this.fs = FileSystem.get(URI.create(this.fsUrl), this.hdfsConfig);
            this.codecFactory = new CompressionCodecFactory(this.hdfsConfig);
        }

        @Override // org.apache.storm.hdfs.trident.HdfsState.Options
        Path createOutputFile() throws IOException {
            Path path = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
            this.writer = SequenceFile.createWriter(this.hdfsConfig, SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(this.format.keyClass()), SequenceFile.Writer.valueClass(this.format.valueClass()), SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)));
            return path;
        }

        @Override // org.apache.storm.hdfs.trident.HdfsState.Options
        void closeOutputFile() throws IOException {
            this.writer.close();
        }

        @Override // org.apache.storm.hdfs.trident.HdfsState.Options
        public void execute(List<TridentTuple> list) throws IOException {
            long length;
            for (TridentTuple tridentTuple : list) {
                synchronized (this.writeLock) {
                    this.writer.append(this.format.key(tridentTuple), this.format.value(tridentTuple));
                    length = this.writer.getLength();
                }
                if (this.rotationPolicy.mark(tridentTuple, length)) {
                    rotateOutputFile();
                    this.rotationPolicy.reset();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a state that delegates preparation and batch writing to the given options.
     *
     * @param options the configured options object this state forwards to
     */
    public HdfsState(Options options) {
        this.options = options;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Prepares the underlying options object; the metrics context is not used.
     *
     * @param map             configuration map forwarded to the options
     * @param iMetricsContext ignored
     * @param i               forwarded to the options (presumably the partition index; confirm)
     * @param i2              forwarded to the options (presumably the partition count; confirm)
     */
    public void prepare(Map map, IMetricsContext iMetricsContext, int i, int i2) {
        options.prepare(map, i, i2);
    }

    /**
     * Intentionally does nothing.
     *
     * @param l unused (presumably the transaction id; confirm against the State contract)
     */
    public void beginCommit(Long l) {
        // No work is done at the start of a commit.
    }

    /**
     * Intentionally does nothing.
     *
     * @param l unused (presumably the transaction id; confirm against the State contract)
     */
    public void commit(Long l) {
        // No work is done at commit time.
    }

    /**
     * Hands the batch to the options object for writing.
     *
     * <p>An {@link IOException} from the write is logged and rethrown wrapped in a
     * {@link FailedException}.
     *
     * @param list             the tuples of the batch
     * @param tridentCollector not used
     * @throws FailedException if writing the batch raises an {@link IOException}
     */
    public void updateState(List<TridentTuple> list, TridentCollector tridentCollector) {
        try {
            options.execute(list);
        } catch (IOException ioFailure) {
            LOG.warn("Failing batch due to IOException.", ioFailure);
            throw new FailedException(ioFailure);
        }
    }
}
