/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.sink.hdfs;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.flume.Context;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.sink.hdfs.HDFSWriter;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for HDFS writers that keeps track of the currently open output
 * stream and exposes replication checks and a flush primitive on top of it.
 *
 * <p>Several Hadoop methods are looked up reflectively when a stream is
 * registered ({@code getNumCurrentReplicas}, {@code getDefaultReplication(Path)}
 * and {@code hflush}/{@code sync}) and the resulting {@link Method} handles are
 * cached until the stream is unregistered.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public abstract class AbstractHDFSWriter
implements HDFSWriter {
    private static final Logger logger = LoggerFactory.getLogger(AbstractHDFSWriter.class);

    /** Default used for {@code hdfs.callTimeout} when it is absent or unparseable. */
    private static final long DEFAULT_CALL_TIMEOUT_MS = 30000L;
    /** Lower bound applied to the computed time between close retries. */
    private static final long MIN_TIME_BETWEEN_CLOSE_RETRIES_MS = 1000L;

    private FSDataOutputStream outputStream;
    private FileSystem fs;
    private Path destPath;
    private Method refGetNumCurrentReplicas = null;
    private Method refGetDefaultReplication = null;
    private Method refHflushOrSync = null;
    private Integer configuredMinReplicas = null;
    private Integer numberOfCloseRetries = null;
    private long timeBetweenCloseRetries = Long.MAX_VALUE;
    static final Object[] NO_ARGS = new Object[0];

    /**
     * Reads {@code hdfs.minBlockReplicas}, {@code hdfs.closeTries} and
     * {@code hdfs.callTimeout} from the given context.
     *
     * @param context the configuration to read from
     * @throws IllegalArgumentException if {@code hdfs.minBlockReplicas} is negative
     */
    public void configure(Context context) {
        this.configuredMinReplicas = context.getInteger("hdfs.minBlockReplicas");
        if (this.configuredMinReplicas != null) {
            Preconditions.checkArgument(this.configuredMinReplicas >= 0, (Object)"hdfs.minBlockReplicas must be greater than or equal to 0");
        }
        // closeTries counts attempts; retries are the attempts after the first one.
        this.numberOfCloseRetries = context.getInteger("hdfs.closeTries", Integer.valueOf(1)) - 1;
        // NOTE(review): the interval is only computed for more than one retry; with exactly
        // one retry it stays at Long.MAX_VALUE. Confirm this threshold is intended.
        if (this.numberOfCloseRetries > 1) {
            try {
                this.timeBetweenCloseRetries = context.getLong("hdfs.callTimeout", Long.valueOf(DEFAULT_CALL_TIMEOUT_MS));
            }
            catch (NumberFormatException e) {
                // Log the raw string: re-parsing it with getLong() here would just throw
                // the same NumberFormatException out of the handler.
                logger.warn("hdfs.callTimeout can not be parsed to a long: " + context.getString("hdfs.callTimeout"));
                // Fall back to the default instead of dividing the Long.MAX_VALUE sentinel.
                this.timeBetweenCloseRetries = DEFAULT_CALL_TIMEOUT_MS;
            }
            this.timeBetweenCloseRetries = Math.max(this.timeBetweenCloseRetries / (long)this.numberOfCloseRetries.intValue(), MIN_TIME_BETWEEN_CLOSE_RETRIES_MS);
        }
    }

    /**
     * Compares the current replica count of the open stream with the desired
     * count (the configured {@code hdfs.minBlockReplicas} if set, otherwise the
     * value from {@link #getFsDesiredReplication()}).
     *
     * @return {@code true} if the current replica count is below the desired
     *         count; {@code false} otherwise, including when the current count
     *         is unavailable or a reflective call fails (the failure is logged)
     */
    @Override
    public boolean isUnderReplicated() {
        try {
            int currentReplicas = this.getNumCurrentReplicas();
            if (currentReplicas == -1) {
                // Replica count could not be determined.
                return false;
            }
            int desiredReplicas = this.configuredMinReplicas != null ? this.configuredMinReplicas.intValue() : this.getFsDesiredReplication();
            return currentReplicas < desiredReplicas;
        }
        catch (IllegalAccessException e) {
            logger.error("Unexpected error while checking replication factor", e);
        }
        catch (InvocationTargetException e) {
            logger.error("Unexpected error while checking replication factor", e);
        }
        catch (IllegalArgumentException e) {
            logger.error("Unexpected error while checking replication factor", e);
        }
        return false;
    }

    /**
     * Records the stream, file system and path currently being written and
     * resolves the reflective method handles used by the other methods.
     *
     * @param outputStream the open output stream; must not be null
     * @param fs the file system the stream belongs to; must not be null
     * @param destPath the path being written; must not be null
     * @throws NullPointerException if any argument is null
     * @throws FlumeException if neither {@code hflush} nor {@code sync} can be found on the stream
     */
    protected void registerCurrentStream(FSDataOutputStream outputStream, FileSystem fs, Path destPath) {
        Preconditions.checkNotNull(outputStream, (Object)"outputStream must not be null");
        Preconditions.checkNotNull(fs, (Object)"fs must not be null");
        Preconditions.checkNotNull(destPath, (Object)"destPath must not be null");
        this.outputStream = outputStream;
        this.fs = fs;
        this.destPath = destPath;
        this.refGetNumCurrentReplicas = this.reflectGetNumCurrentReplicas(outputStream);
        this.refGetDefaultReplication = this.reflectGetDefaultReplication(fs);
        this.refHflushOrSync = this.reflectHflushOrSync(outputStream);
    }

    /**
     * Forgets the registered stream, file system, path and every cached
     * reflective method handle.
     */
    protected void unregisterCurrentStream() {
        this.outputStream = null;
        this.fs = null;
        this.destPath = null;
        this.refGetNumCurrentReplicas = null;
        this.refGetDefaultReplication = null;
        // Also drop the flush handle so a later stream never reuses a Method that was
        // resolved against a previous stream's class; hflushOrSync() re-resolves on demand.
        this.refHflushOrSync = null;
    }

    /**
     * Returns the default replication reported by the registered file system,
     * using {@code getDefaultReplication(Path)} when that method was found and
     * the no-argument {@code getDefaultReplication()} otherwise.
     *
     * @return the default replication, or 0 if no file system/path is
     *         registered or the reflective call fails (the failure is logged)
     */
    public int getFsDesiredReplication() {
        int replication = 0;
        if (this.fs == null || this.destPath == null) {
            return replication;
        }
        if (this.refGetDefaultReplication == null) {
            return this.fs.getDefaultReplication();
        }
        try {
            replication = ((Short)this.refGetDefaultReplication.invoke(this.fs, this.destPath)).shortValue();
        }
        catch (IllegalAccessException e) {
            logger.warn("Unexpected error calling getDefaultReplication(Path)", e);
        }
        catch (InvocationTargetException e) {
            logger.warn("Unexpected error calling getDefaultReplication(Path)", e);
        }
        return replication;
    }

    /**
     * Invokes the reflectively resolved {@code getNumCurrentReplicas} method on
     * the stream wrapped by the registered output stream.
     *
     * @return the reported replica count, or -1 if the method or stream is not
     *         available or the result is not an {@link Integer}
     * @throws IllegalArgumentException if the reflective invocation rejects its target
     * @throws IllegalAccessException if the method is not accessible
     * @throws InvocationTargetException if the invoked method itself throws
     */
    public int getNumCurrentReplicas() throws IllegalArgumentException, IllegalAccessException, InvocationTargetException {
        if (this.refGetNumCurrentReplicas == null || this.outputStream == null) {
            return -1;
        }
        OutputStream dfsOutputStream = this.outputStream.getWrappedStream();
        if (dfsOutputStream == null) {
            return -1;
        }
        Object repl = this.refGetNumCurrentReplicas.invoke(dfsOutputStream, NO_ARGS);
        if (repl instanceof Integer) {
            return (Integer)repl;
        }
        return -1;
    }

    /**
     * Looks up the declared {@code getNumCurrentReplicas()} method on the class
     * of the stream wrapped by {@code os} and makes it accessible.
     *
     * @param os the output stream whose wrapped stream is inspected; may be null
     * @return the method, or null if the stream (or its wrapped stream) is null,
     *         the method does not exist, or access is denied
     */
    private Method reflectGetNumCurrentReplicas(FSDataOutputStream os) {
        Method m = null;
        // Guard the wrapped stream as well: getNumCurrentReplicas() already treats a
        // null wrapped stream as "not available", so the lookup should not NPE on it.
        OutputStream wrappedStream = os != null ? os.getWrappedStream() : null;
        if (wrappedStream != null) {
            Class<?> wrappedStreamClass = wrappedStream.getClass();
            try {
                m = wrappedStreamClass.getDeclaredMethod("getNumCurrentReplicas", new Class[0]);
                m.setAccessible(true);
            }
            catch (NoSuchMethodException e) {
                logger.debug("FileSystem's output stream doesn't support getNumCurrentReplicas; --HDFS-826 not available; fsOut=" + wrappedStreamClass.getName() + "; err=" + e);
            }
            catch (SecurityException e) {
                logger.debug("Doesn't have access to getNumCurrentReplicas on FileSystems's output stream --HDFS-826 not available; fsOut=" + wrappedStreamClass.getName(), e);
                // setAccessible may have failed after the lookup succeeded; discard the handle.
                m = null;
            }
        }
        if (m != null) {
            logger.debug("Using getNumCurrentReplicas--HDFS-826");
        }
        return m;
    }

    /**
     * Looks up the public {@code getDefaultReplication(Path)} method on the
     * class of the given file system.
     *
     * @param fileSystem the file system to inspect; may be null
     * @return the method, or null if the file system is null, the method does
     *         not exist, or access is denied
     */
    private Method reflectGetDefaultReplication(FileSystem fileSystem) {
        Method m = null;
        if (fileSystem != null) {
            Class<?> fsClass = fileSystem.getClass();
            try {
                m = fsClass.getMethod("getDefaultReplication", Path.class);
            }
            catch (NoSuchMethodException e) {
                logger.debug("FileSystem implementation doesn't support getDefaultReplication(Path); -- HADOOP-8014 not available; className = " + fsClass.getName() + "; err = " + e);
            }
            catch (SecurityException e) {
                logger.debug("No access to getDefaultReplication(Path) on FileSystem implementation -- HADOOP-8014 not available; className = " + fsClass.getName() + "; err = " + e);
            }
        }
        if (m != null) {
            logger.debug("Using FileSystem.getDefaultReplication(Path) from HADOOP-8014");
        }
        return m;
    }

    /**
     * Looks up the public {@code hflush()} method on the class of {@code os},
     * falling back to {@code sync()} when {@code hflush()} does not exist.
     *
     * @param os the output stream to inspect; may be null
     * @return the method, or null if {@code os} is null
     * @throws FlumeException if neither method can be found
     */
    private Method reflectHflushOrSync(FSDataOutputStream os) {
        Method m = null;
        if (os != null) {
            Class<?> fsDataOutputStreamClass = os.getClass();
            try {
                m = fsDataOutputStreamClass.getMethod("hflush", new Class[0]);
            }
            catch (NoSuchMethodException ex) {
                logger.debug("HFlush not found. Will use sync() instead");
                try {
                    m = fsDataOutputStreamClass.getMethod("sync", new Class[0]);
                }
                catch (Exception ex1) {
                    String msg = "Neither hflush not sync were found. That seems to be a problem!";
                    logger.error(msg);
                    throw new FlumeException(msg, (Throwable)ex1);
                }
            }
        }
        return m;
    }

    /**
     * Invokes {@code hflush()} (or {@code sync()} where {@code hflush()} is not
     * available) on the given stream. The method handle cached at registration
     * is used when present; otherwise it is resolved from {@code os} for this
     * call only.
     *
     * @param os the stream to flush
     * @throws IOException if the invoked flush method throws an {@link IOException}
     * @throws FlumeException if no flush method can be found or the invocation
     *         fails for any other reason
     */
    protected void hflushOrSync(FSDataOutputStream os) throws IOException {
        Method flushMethod = this.refHflushOrSync;
        if (flushMethod == null) {
            // No stream registered (or already unregistered): resolve against the
            // supplied stream rather than failing with a NullPointerException.
            flushMethod = this.reflectHflushOrSync(os);
        }
        try {
            flushMethod.invoke(os, NO_ARGS);
        }
        catch (InvocationTargetException e) {
            String msg = "Error while trying to hflushOrSync!";
            logger.error(msg);
            // Surface the underlying I/O failure unchanged so callers can handle it.
            Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException)cause;
            }
            throw new FlumeException(msg, (Throwable)e);
        }
        catch (Exception e) {
            String msg = "Error while trying to hflushOrSync!";
            logger.error(msg);
            throw new FlumeException(msg, (Throwable)e);
        }
    }
}

