package org.apache.flume.sink.hdfs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flume.Channel;
import org.apache.flume.Clock;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.SystemClock;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.formatter.output.BucketPath;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink.class */
public class HDFSEventSink extends AbstractSink implements Configurable {
    private static final long defaultRollInterval = 30;
    private static final long defaultRollSize = 1024;
    private static final long defaultRollCount = 10;
    private static final String defaultFileName = "FlumeData";
    private static final String defaultSuffix = "";
    private static final String defaultInUsePrefix = "";
    private static final String defaultInUseSuffix = ".tmp";
    private static final long defaultBatchSize = 100;
    private static final String defaultFileType = "SequenceFile";
    private static final int defaultMaxOpenFiles = 5000;
    private static final long defaultCallTimeout = 10000;
    private static final int defaultThreadPoolSize = 10;
    private static final int defaultRollTimerPoolSize = 1;
    private final HDFSWriterFactory writerFactory;
    private WriterLinkedHashMap sfWriters;
    private long rollInterval;
    private long rollSize;
    private long rollCount;
    private long batchSize;
    private int threadsPoolSize;
    private int rollTimerPoolSize;
    private CompressionCodec codeC;
    private SequenceFile.CompressionType compType;
    private String fileType;
    private String filePath;
    private String fileName;
    private String suffix;
    private String inUsePrefix;
    private String inUseSuffix;
    private TimeZone timeZone;
    private int maxOpenFiles;
    private ExecutorService callTimeoutPool;
    private ScheduledExecutorService timedRollerPool;
    private String kerbConfPrincipal;
    private String kerbKeytab;
    private String proxyUserName;
    private UserGroupInformation proxyTicket;
    private boolean needRounding;
    private int roundUnit;
    private int roundValue;
    private boolean useLocalTime;
    private long callTimeout;
    private Context context;
    private SinkCounter sinkCounter;
    private volatile int idleTimeout;
    private Clock clock;
    private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
    private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
    private static final AtomicReference<KerberosUser> staticLogin = new AtomicReference<>();

    /* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink$WriterCallback.class */
    public interface WriterCallback {
        void run(String str);
    }

    /* loaded from: input_file:org/apache/flume/sink/hdfs/HDFSEventSink$WriterLinkedHashMap.class */
    private static class WriterLinkedHashMap extends LinkedHashMap<String, BucketWriter> {
        private final int maxOpenFiles;

        public WriterLinkedHashMap(int i) {
            super(16, 0.75f, true);
            this.maxOpenFiles = i;
        }

        @Override // java.util.LinkedHashMap
        protected boolean removeEldestEntry(Map.Entry<String, BucketWriter> entry) {
            if (size() <= this.maxOpenFiles) {
                return false;
            }
            try {
                entry.getValue().close();
                return true;
            } catch (IOException e) {
                HDFSEventSink.LOG.warn(entry.getKey().toString(), e);
                return true;
            } catch (InterruptedException e2) {
                HDFSEventSink.LOG.warn(entry.getKey().toString(), e2);
                Thread.currentThread().interrupt();
                return true;
            }
        }
    }

    public HDFSEventSink() {
        this(new HDFSWriterFactory());
    }

    public HDFSEventSink(HDFSWriterFactory hDFSWriterFactory) {
        this.needRounding = false;
        this.roundUnit = 13;
        this.roundValue = defaultRollTimerPoolSize;
        this.useLocalTime = false;
        this.writerFactory = hDFSWriterFactory;
    }

    public void configure(Context context) {
        this.context = context;
        this.filePath = (String) Preconditions.checkNotNull(context.getString("hdfs.path"), "hdfs.path is required");
        this.fileName = context.getString("hdfs.filePrefix", defaultFileName);
        this.suffix = context.getString("hdfs.fileSuffix", "");
        this.inUsePrefix = context.getString("hdfs.inUsePrefix", "");
        this.inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
        String string = context.getString("hdfs.timeZone");
        this.timeZone = string == null ? null : TimeZone.getTimeZone(string);
        this.rollInterval = context.getLong("hdfs.rollInterval", Long.valueOf(defaultRollInterval)).longValue();
        this.rollSize = context.getLong("hdfs.rollSize", Long.valueOf(defaultRollSize)).longValue();
        this.rollCount = context.getLong("hdfs.rollCount", Long.valueOf(defaultRollCount)).longValue();
        this.batchSize = context.getLong("hdfs.batchSize", Long.valueOf(defaultBatchSize)).longValue();
        this.idleTimeout = context.getInteger("hdfs.idleTimeout", 0).intValue();
        String string2 = context.getString("hdfs.codeC");
        this.fileType = context.getString("hdfs.fileType", defaultFileType);
        this.maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", Integer.valueOf(defaultMaxOpenFiles)).intValue();
        this.callTimeout = context.getLong("hdfs.callTimeout", Long.valueOf(defaultCallTimeout)).longValue();
        this.threadsPoolSize = context.getInteger("hdfs.threadsPoolSize", Integer.valueOf(defaultThreadPoolSize)).intValue();
        this.rollTimerPoolSize = context.getInteger("hdfs.rollTimerPoolSize", Integer.valueOf(defaultRollTimerPoolSize)).intValue();
        this.kerbConfPrincipal = context.getString("hdfs.kerberosPrincipal", "");
        this.kerbKeytab = context.getString("hdfs.kerberosKeytab", "");
        this.proxyUserName = context.getString("hdfs.proxyUser", "");
        Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
        if (string2 == null) {
            this.codeC = null;
            this.compType = SequenceFile.CompressionType.NONE;
        } else {
            this.codeC = getCodec(string2);
            this.compType = SequenceFile.CompressionType.BLOCK;
        }
        if (this.fileType.equalsIgnoreCase("DataStream") && string2 != null) {
            throw new IllegalArgumentException("fileType: " + this.fileType + " which does NOT support compressed output. Please don't set codeC or change the fileType if compressed output is desired.");
        }
        if (this.fileType.equalsIgnoreCase("CompressedStream")) {
            Preconditions.checkNotNull(this.codeC, "It's essential to set compress codec when fileType is: " + this.fileType);
        }
        if (!authenticate()) {
            LOG.error("Failed to authenticate!");
        }
        this.needRounding = context.getBoolean("hdfs.round", false).booleanValue();
        if (this.needRounding) {
            String string3 = context.getString("hdfs.roundUnit", "second");
            if (string3.equalsIgnoreCase("hour")) {
                this.roundUnit = 11;
            } else if (string3.equalsIgnoreCase("minute")) {
                this.roundUnit = 12;
            } else if (string3.equalsIgnoreCase("second")) {
                this.roundUnit = 13;
            } else {
                LOG.warn("Rounding unit is not valid, please set one ofminute, hour, or second. Rounding will be disabled");
                this.needRounding = false;
            }
            this.roundValue = context.getInteger("hdfs.roundValue", Integer.valueOf(defaultRollTimerPoolSize)).intValue();
            if (this.roundUnit == 13 || this.roundUnit == 12) {
                Preconditions.checkArgument(this.roundValue > 0 && this.roundValue <= 60, "Round valuemust be > 0 and <= 60");
            } else if (this.roundUnit == 11) {
                Preconditions.checkArgument(this.roundValue > 0 && this.roundValue <= 24, "Round valuemust be > 0 and <= 24");
            }
        }
        this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false).booleanValue();
        if (this.useLocalTime) {
            this.clock = new SystemClock();
        }
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
    }

    private static boolean codecMatches(Class<? extends CompressionCodec> cls, String str) {
        String simpleName = cls.getSimpleName();
        if (cls.getName().equals(str) || simpleName.equalsIgnoreCase(str)) {
            return true;
        }
        return simpleName.endsWith("Codec") && simpleName.substring(0, simpleName.length() - "Codec".length()).equalsIgnoreCase(str);
    }

    @VisibleForTesting
    static CompressionCodec getCodec(String str) {
        Configuration configuration = new Configuration();
        List<Class> codecClasses = CompressionCodecFactory.getCodecClasses(configuration);
        CompressionCodec compressionCodec = null;
        ArrayList arrayList = new ArrayList();
        arrayList.add("None");
        for (Class cls : codecClasses) {
            arrayList.add(cls.getSimpleName());
            if (codecMatches(cls, str)) {
                try {
                    compressionCodec = (CompressionCodec) cls.newInstance();
                } catch (IllegalAccessException e) {
                    LOG.error("Unable to access " + cls + " class");
                } catch (InstantiationException e2) {
                    LOG.error("Unable to instantiate " + cls + " class");
                }
            }
        }
        if (compressionCodec == null) {
            if (!str.equalsIgnoreCase("None")) {
                throw new IllegalArgumentException("Unsupported compression codec " + str + ".  Please choose from: " + arrayList);
            }
        } else if (compressionCodec instanceof org.apache.hadoop.conf.Configurable) {
            ((org.apache.hadoop.conf.Configurable) compressionCodec).setConf(configuration);
        }
        return compressionCodec;
    }

    public Sink.Status process() throws EventDeliveryException {
        Event take;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        ArrayList newArrayList = Lists.newArrayList();
        transaction.begin();
        try {
            int i = 0;
            while (i < this.batchSize && (take = channel.take()) != null) {
                try {
                    String escapeString = BucketPath.escapeString(this.filePath, take.getHeaders(), this.timeZone, this.needRounding, this.roundUnit, this.roundValue, this.useLocalTime);
                    String escapeString2 = BucketPath.escapeString(this.fileName, take.getHeaders(), this.timeZone, this.needRounding, this.roundUnit, this.roundValue, this.useLocalTime);
                    String str = escapeString + DIRECTORY_DELIMITER + escapeString2;
                    BucketWriter bucketWriter = this.sfWriters.get(str);
                    if (bucketWriter == null) {
                        HDFSWriter writer = this.writerFactory.getWriter(this.fileType);
                        WriterCallback writerCallback = null;
                        if (this.idleTimeout != 0) {
                            writerCallback = new WriterCallback() { // from class: org.apache.flume.sink.hdfs.HDFSEventSink.1
                                @Override // org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback
                                public void run(String str2) {
                                    HDFSEventSink.this.sfWriters.remove(str2);
                                }
                            };
                        }
                        bucketWriter = new BucketWriter(this.rollInterval, this.rollSize, this.rollCount, this.batchSize, this.context, escapeString, escapeString2, this.inUsePrefix, this.inUseSuffix, this.suffix, this.codeC, this.compType, writer, this.timedRollerPool, this.proxyTicket, this.sinkCounter, this.idleTimeout, writerCallback, str, this.callTimeout, this.callTimeoutPool);
                        this.sfWriters.put(str, bucketWriter);
                    }
                    if (!newArrayList.contains(bucketWriter)) {
                        newArrayList.add(bucketWriter);
                    }
                    try {
                        bucketWriter.append(take);
                    } catch (IOException e) {
                        this.sfWriters.remove(str);
                        newArrayList.remove(bucketWriter);
                        LOG.warn("Unable to append to " + str + ". Path might have been deleted or file ownership/permissions changed. Removing file handles from sfWriters and writers so append can be retried in another transaction event with new file handle.", e);
                        try {
                            bucketWriter.close();
                        } catch (IOException e2) {
                            LOG.warn("Caught IOException while closing bucketWriter. ", e2);
                        }
                    }
                    i += defaultRollTimerPoolSize;
                } catch (IOException e3) {
                    transaction.rollback();
                    LOG.warn("HDFS IO error", e3);
                    Sink.Status status = Sink.Status.BACKOFF;
                    transaction.close();
                    return status;
                } catch (Throwable th) {
                    transaction.rollback();
                    LOG.error("process failed", th);
                    if (th instanceof Error) {
                        throw ((Error) th);
                    }
                    throw new EventDeliveryException(th);
                }
            }
            if (i == 0) {
                this.sinkCounter.incrementBatchEmptyCount();
            } else if (i == this.batchSize) {
                this.sinkCounter.incrementBatchCompleteCount();
            } else {
                this.sinkCounter.incrementBatchUnderflowCount();
            }
            Iterator it = newArrayList.iterator();
            while (it.hasNext()) {
                ((BucketWriter) it.next()).flush();
            }
            transaction.commit();
            if (i < defaultRollTimerPoolSize) {
                Sink.Status status2 = Sink.Status.BACKOFF;
                transaction.close();
                return status2;
            }
            this.sinkCounter.addToEventDrainSuccessCount(i);
            Sink.Status status3 = Sink.Status.READY;
            transaction.close();
            return status3;
        } catch (Throwable th2) {
            transaction.close();
            throw th2;
        }
    }

    public void stop() {
        for (Map.Entry<String, BucketWriter> entry : this.sfWriters.entrySet()) {
            LOG.info("Closing {}", entry.getKey());
            try {
                entry.getValue().close();
            } catch (Exception e) {
                LOG.warn("Exception while closing " + entry.getKey() + ". Exception follows.", e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        ExecutorService[] executorServiceArr = {this.callTimeoutPool, this.timedRollerPool};
        int length = executorServiceArr.length;
        for (int i = 0; i < length; i += defaultRollTimerPoolSize) {
            ExecutorService executorService = executorServiceArr[i];
            executorService.shutdown();
            while (!executorService.isTerminated()) {
                try {
                    executorService.awaitTermination(Math.max(defaultCallTimeout, this.callTimeout), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e2) {
                    LOG.warn("shutdown interrupted on " + executorService, e2);
                }
            }
        }
        this.callTimeoutPool = null;
        this.timedRollerPool = null;
        this.sfWriters.clear();
        this.sfWriters = null;
        this.sinkCounter.stop();
        super.stop();
    }

    public void start() {
        this.callTimeoutPool = Executors.newFixedThreadPool(this.threadsPoolSize, new ThreadFactoryBuilder().setNameFormat("hdfs-" + getName() + "-call-runner-%d").build());
        this.timedRollerPool = Executors.newScheduledThreadPool(this.rollTimerPoolSize, new ThreadFactoryBuilder().setNameFormat("hdfs-" + getName() + "-roll-timer-%d").build());
        this.sfWriters = new WriterLinkedHashMap(this.maxOpenFiles);
        this.sinkCounter.start();
        super.start();
    }

    private boolean authenticate() {
        boolean isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
        LOG.info("Hadoop Security enabled: " + isSecurityEnabled);
        if (isSecurityEnabled) {
            if (this.kerbConfPrincipal.isEmpty()) {
                LOG.error("Hadoop running in secure mode, but Flume config doesn't specify a principal to use for Kerberos auth.");
                return false;
            }
            if (this.kerbKeytab.isEmpty()) {
                LOG.error("Hadoop running in secure mode, but Flume config doesn't specify a keytab to use for Kerberos auth.");
                return false;
            }
            File file = new File(this.kerbKeytab);
            if (!file.isFile() || !file.canRead()) {
                throw new IllegalArgumentException("The keyTab file: " + this.kerbKeytab + " is nonexistent or can't read. Please specify a readable keytab file for Kerberos auth.");
            }
            try {
                String serverPrincipal = SecurityUtil.getServerPrincipal(this.kerbConfPrincipal, "");
                Preconditions.checkNotNull(serverPrincipal, "Principal must not be null");
                KerberosUser kerberosUser = staticLogin.get();
                KerberosUser kerberosUser2 = new KerberosUser(serverPrincipal, this.kerbKeytab);
                Preconditions.checkState(kerberosUser == null || kerberosUser.equals(kerberosUser2), "Cannot use multiple kerberos principals in the same agent.  Must restart agent to use new principal or keytab. Previous = %s, New = %s", new Object[]{kerberosUser, kerberosUser2});
                UserGroupInformation userGroupInformation = null;
                if (kerberosUser != null && kerberosUser.equals(kerberosUser2)) {
                    try {
                        userGroupInformation = UserGroupInformation.getLoginUser();
                    } catch (IOException e) {
                        LOG.warn("User unexpectedly had no active login. Continuing with authentication", e);
                    }
                }
                if (userGroupInformation == null || !userGroupInformation.getUserName().equals(serverPrincipal)) {
                    try {
                        kerberosLogin(this, serverPrincipal, this.kerbKeytab);
                    } catch (IOException e2) {
                        LOG.error("Authentication or file read error while attempting to login as kerberos principal (" + serverPrincipal + ") using keytab (" + this.kerbKeytab + "). Exception follows.", e2);
                        return false;
                    }
                } else {
                    LOG.debug("{}: Using existing principal login: {}", this, userGroupInformation);
                }
                staticLogin.set(kerberosUser2);
            } catch (IOException e3) {
                LOG.error("Host lookup error resolving kerberos principal (" + this.kerbConfPrincipal + "). Exception follows.", e3);
                return false;
            }
        }
        this.proxyTicket = null;
        if (!this.proxyUserName.isEmpty()) {
            try {
                this.proxyTicket = UserGroupInformation.createProxyUser(this.proxyUserName, UserGroupInformation.getLoginUser());
            } catch (IOException e4) {
                LOG.error("Unable to login as proxy user. Exception follows.", e4);
                return false;
            }
        }
        UserGroupInformation userGroupInformation2 = null;
        if (this.proxyTicket != null) {
            userGroupInformation2 = this.proxyTicket;
        } else if (isSecurityEnabled) {
            try {
                userGroupInformation2 = UserGroupInformation.getLoginUser();
            } catch (IOException e5) {
                LOG.error("Unexpected error: Unable to get authenticated user after apparent successful login! Exception follows.", e5);
                return false;
            }
        }
        if (userGroupInformation2 == null) {
            return true;
        }
        UserGroupInformation.AuthenticationMethod authenticationMethod = userGroupInformation2.getAuthenticationMethod();
        LOG.info("Auth method: {}", authenticationMethod);
        LOG.info(" User name: {}", userGroupInformation2.getUserName());
        LOG.info(" Using keytab: {}", Boolean.valueOf(userGroupInformation2.isFromKeytab()));
        if (authenticationMethod == UserGroupInformation.AuthenticationMethod.PROXY) {
            try {
                UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
                LOG.info(" Superuser auth: {}", loginUser.getAuthenticationMethod());
                LOG.info(" Superuser name: {}", loginUser.getUserName());
                LOG.info(" Superuser using keytab: {}", Boolean.valueOf(loginUser.isFromKeytab()));
            } catch (IOException e6) {
                LOG.error("Unexpected error: unknown superuser impersonating proxy.", e6);
                return false;
            }
        }
        LOG.info("Logged in as user {}", userGroupInformation2.getUserName());
        return true;
    }

    private static synchronized UserGroupInformation kerberosLogin(HDFSEventSink hDFSEventSink, String str, String str2) throws IOException {
        UserGroupInformation userGroupInformation = null;
        try {
            userGroupInformation = UserGroupInformation.getLoginUser();
        } catch (IOException e) {
            LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
        }
        if (userGroupInformation == null || !userGroupInformation.getUserName().equals(str)) {
            LOG.info("{}: Attempting kerberos login as principal ({}) from keytab file ({})", new Object[]{hDFSEventSink, str, str2});
            UserGroupInformation.loginUserFromKeytab(str, str2);
            userGroupInformation = UserGroupInformation.getLoginUser();
        } else {
            LOG.debug("{}: Using existing principal ({}): {}", new Object[]{hDFSEventSink, str, userGroupInformation});
        }
        return userGroupInformation;
    }

    public String toString() {
        return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() + " }";
    }

    @VisibleForTesting
    void setBucketClock(Clock clock) {
        BucketPath.setClock(clock);
    }
}
