/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.hdfs.trident;

import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.storm.hdfs.common.rotation.RotationAction;
import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
import org.apache.storm.hdfs.trident.format.FileNameFormat;
import org.apache.storm.hdfs.trident.format.RecordFormat;
import org.apache.storm.hdfs.trident.format.SequenceFormat;
import org.apache.storm.hdfs.trident.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.trident.rotation.TimedRotationPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.TridentCollector;
import storm.trident.state.State;
import storm.trident.tuple.TridentTuple;

public class HdfsState
implements State {
    public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
    private Options options;

    /**
     * Creates a state that writes through the given options object.
     *
     * @param options the writer configuration and implementation that {@code prepare}
     *                and {@code updateState} delegate to; stored as-is, without validation
     */
    HdfsState(Options options) {
        this.options = options;
    }

    /**
     * Prepares the underlying {@link Options} by forwarding the topology configuration
     * and partition information to it.
     *
     * @param conf           configuration map, passed through to {@code Options.prepare}
     * @param metrics        not used by this method
     * @param partitionIndex index of this partition, passed through
     * @param numPartitions  total number of partitions, passed through
     */
    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
        this.options.prepare(conf, partitionIndex, numPartitions);
    }

    /**
     * No-op in this implementation: nothing is done when a commit begins.
     *
     * @param txId transaction id; ignored
     */
    public void beginCommit(Long txId) {
    }

    /**
     * No-op in this implementation: nothing is done when a commit completes.
     *
     * @param txId transaction id; ignored
     */
    public void commit(Long txId) {
    }

    /**
     * Writes one batch of tuples through the configured {@link Options}.
     *
     * <p>An {@link IOException} raised by the write is logged at WARN level and rethrown
     * as a {@link FailedException} that carries the original exception as its cause.
     *
     * @param tuples           the tuples of the batch to write
     * @param tridentCollector not used by this method
     * @throws FailedException if the underlying write raises an {@link IOException}
     */
    public void updateState(List<TridentTuple> tuples, TridentCollector tridentCollector) {
        try {
            this.options.execute(tuples);
        } catch (IOException ioe) {
            LOG.warn("Failing batch due to IOException.", ioe);
            throw new FailedException(ioe);
        }
    }

    public static class SequenceFileOptions
    extends Options {
        private SequenceFormat format;
        private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
        private transient SequenceFile.Writer writer;
        private String compressionCodec = "default";
        private transient CompressionCodecFactory codecFactory;

        public SequenceFileOptions withCompressionCodec(String codec) {
            this.compressionCodec = codec;
            return this;
        }

        public SequenceFileOptions withFsUrl(String fsUrl) {
            this.fsUrl = fsUrl;
            return this;
        }

        public SequenceFileOptions withConfigKey(String configKey) {
            this.configKey = configKey;
            return this;
        }

        public SequenceFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
            this.fileNameFormat = fileNameFormat;
            return this;
        }

        public SequenceFileOptions withSequenceFormat(SequenceFormat format2) {
            this.format = format2;
            return this;
        }

        public SequenceFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy) {
            this.rotationPolicy = rotationPolicy;
            return this;
        }

        public SequenceFileOptions withCompressionType(SequenceFile.CompressionType compressionType) {
            this.compressionType = compressionType;
            return this;
        }

        public SequenceFileOptions addRotationAction(RotationAction action) {
            this.rotationActions.add(action);
            return this;
        }

        @Override
        void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
            LOG.info("Preparing Sequence File State...");
            if (this.format == null) {
                throw new IllegalStateException("SequenceFormat must be specified.");
            }
            this.fs = FileSystem.get(URI.create(this.fsUrl), this.hdfsConfig);
            this.codecFactory = new CompressionCodecFactory(this.hdfsConfig);
        }

        @Override
        Path createOutputFile() throws IOException {
            Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
            this.writer = SequenceFile.createWriter(this.hdfsConfig, SequenceFile.Writer.file(p), SequenceFile.Writer.keyClass(this.format.keyClass()), SequenceFile.Writer.valueClass(this.format.valueClass()), SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)));
            return p;
        }

        @Override
        void closeOutputFile() throws IOException {
            this.writer.close();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void execute(List<TridentTuple> tuples) throws IOException {
            for (TridentTuple tuple : tuples) {
                long offset;
                Object object = this.writeLock;
                synchronized (object) {
                    this.writer.append(this.format.key(tuple), this.format.value(tuple));
                    offset = this.writer.getLength();
                }
                if (!this.rotationPolicy.mark(tuple, offset)) continue;
                this.rotateOutputFile();
                this.rotationPolicy.reset();
            }
        }
    }

    /**
     * {@link Options} implementation that writes formatted tuples as raw bytes to a
     * file opened through {@link FileSystem#create(Path)}.
     *
     * <p>Configuration is fluent: each {@code with...}/{@code add...} method stores its
     * argument and returns this instance. A {@link RecordFormat} must be supplied before
     * {@code doPrepare} runs, otherwise preparation fails.
     */
    public static class HdfsFileOptions
    extends Options {
        /** Stream for the currently open file; replaced by every createOutputFile() call. */
        private transient FSDataOutputStream out;
        /** Converts each tuple into the bytes that are written. */
        protected RecordFormat format;
        /** Bytes written since the last rotation triggered from execute(). */
        private long offset = 0L;

        /** Sets the file system URL used to obtain the {@link FileSystem}. */
        public HdfsFileOptions withFsUrl(String fsUrl) {
            this.fsUrl = fsUrl;
            return this;
        }

        /** Sets the key under which extra HDFS settings are looked up in the topology conf. */
        public HdfsFileOptions withConfigKey(String configKey) {
            this.configKey = configKey;
            return this;
        }

        /** Sets the format that provides the output directory and file names. */
        public HdfsFileOptions withFileNameFormat(FileNameFormat fileNameFormat) {
            this.fileNameFormat = fileNameFormat;
            return this;
        }

        /** Sets the format that serializes tuples to bytes. */
        public HdfsFileOptions withRecordFormat(RecordFormat format2) {
            this.format = format2;
            return this;
        }

        /** Sets the policy that decides when the output file is rotated. */
        public HdfsFileOptions withRotationPolicy(FileRotationPolicy rotationPolicy) {
            this.rotationPolicy = rotationPolicy;
            return this;
        }

        /** Appends an action to the list executed during file rotation. */
        public HdfsFileOptions addRotationAction(RotationAction action) {
            this.rotationActions.add(action);
            return this;
        }

        /**
         * Checks that a {@link RecordFormat} was configured, then obtains the file
         * system for {@code fsUrl}.
         *
         * @throws IllegalStateException if no record format was set
         * @throws IOException if the file system cannot be obtained
         */
        @Override
        void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException {
            LOG.info("Preparing HDFS Bolt...");
            // Fail at preparation time with a clear message instead of a
            // NullPointerException on the first batch.
            if (this.format == null) {
                throw new IllegalStateException("RecordFormat must be specified.");
            }
            this.fs = FileSystem.get(URI.create(this.fsUrl), this.hdfsConfig);
        }

        /** Closes the current output stream. */
        @Override
        void closeOutputFile() throws IOException {
            this.out.close();
        }

        /**
         * Creates a new output file and stores its stream as the current stream.
         *
         * <p>The directory comes from the file name format; the file name is derived
         * from the current rotation counter and the current time.
         *
         * @return the path of the newly created file
         */
        @Override
        Path createOutputFile() throws IOException {
            Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
            this.out = this.fs.create(path);
            return path;
        }

        /**
         * Writes every tuple of the batch and syncs the open file at the end.
         *
         * <p>The whole batch is processed while holding {@code writeLock}. After each
         * write the rotation policy is consulted; when it requests a rotation the file
         * is rotated and the byte offset and the policy are reset. Once the batch has
         * been written, the current file is synced unless the last action of the batch
         * was a rotation. A batch with no tuples still syncs the current file.
         *
         * @param tuples tuples to write
         * @throws IOException if formatting output, writing, rotating or syncing fails
         */
        @Override
        public void execute(List<TridentTuple> tuples) throws IOException {
            synchronized (this.writeLock) {
                // True whenever the currently open file may hold bytes that were not
                // synced. A rotation clears it; any later write sets it again, so data
                // written after a mid-batch rotation is synced as well.
                boolean needsSync = true;
                for (TridentTuple tuple : tuples) {
                    byte[] bytes = this.format.format(tuple);
                    this.out.write(bytes);
                    this.offset += (long)bytes.length;
                    needsSync = true;
                    if (this.rotationPolicy.mark(tuple, this.offset)) {
                        this.rotateOutputFile();
                        this.offset = 0L;
                        this.rotationPolicy.reset();
                        needsSync = false;
                    }
                }
                if (needsSync) {
                    this.syncOutput();
                }
            }
        }

        /** Syncs the current stream, requesting a length update when the stream type supports it. */
        private void syncOutput() throws IOException {
            if (this.out instanceof HdfsDataOutputStream) {
                ((HdfsDataOutputStream)this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
            } else {
                this.out.hsync();
            }
        }
    }

    public static abstract class Options
    implements Serializable {
        protected String fsUrl;
        protected String configKey;
        protected transient FileSystem fs;
        private Path currentFile;
        protected FileRotationPolicy rotationPolicy;
        protected FileNameFormat fileNameFormat;
        protected int rotation = 0;
        protected transient Configuration hdfsConfig;
        protected ArrayList<RotationAction> rotationActions = new ArrayList();
        protected transient Object writeLock;
        protected transient Timer rotationTimer;

        abstract void closeOutputFile() throws IOException;

        abstract Path createOutputFile() throws IOException;

        abstract void execute(List<TridentTuple> var1) throws IOException;

        abstract void doPrepare(Map var1, int var2, int var3) throws IOException;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void rotateOutputFile() throws IOException {
            LOG.info("Rotating output file...");
            long start = System.currentTimeMillis();
            Object object = this.writeLock;
            synchronized (object) {
                this.closeOutputFile();
                ++this.rotation;
                Path newFile = this.createOutputFile();
                LOG.info("Performing {} file rotation actions.", (Object)this.rotationActions.size());
                for (RotationAction action : this.rotationActions) {
                    action.execute(this.fs, this.currentFile);
                }
                this.currentFile = newFile;
            }
            long time = System.currentTimeMillis() - start;
            LOG.info("File rotation took {} ms.", (Object)time);
        }

        void prepare(Map conf, int partitionIndex, int numPartitions) {
            this.writeLock = new Object();
            if (this.rotationPolicy == null) {
                throw new IllegalStateException("RotationPolicy must be specified.");
            }
            if (this.fsUrl == null) {
                throw new IllegalStateException("File system URL must be specified.");
            }
            this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
            this.hdfsConfig = new Configuration();
            Map map2 = (Map)conf.get(this.configKey);
            if (map2 != null) {
                for (String key : map2.keySet()) {
                    this.hdfsConfig.set(key, String.valueOf(map2.get(key)));
                }
            }
            try {
                HdfsSecurityUtil.login(conf, this.hdfsConfig);
                this.doPrepare(conf, partitionIndex, numPartitions);
                this.currentFile = this.createOutputFile();
            }
            catch (Exception e) {
                throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
            }
            if (this.rotationPolicy instanceof TimedRotationPolicy) {
                long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
                this.rotationTimer = new Timer(true);
                TimerTask task = new TimerTask(){

                    @Override
                    public void run() {
                        try {
                            Options.this.rotateOutputFile();
                        }
                        catch (IOException e) {
                            LOG.warn("IOException during scheduled file rotation.", e);
                        }
                    }
                };
                this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
            }
        }
    }
}

