package org.apache.flume.source;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.mapr.web.security.SslConfig;
import com.mapr.web.security.WebSecurityManager;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.security.KeyStore;
import java.security.PrivilegedAction;
import java.security.Security;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLServerSocket;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.FlumeAuthenticator;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.thrift.Status;
import org.apache.flume.thrift.ThriftFlumeEvent;
import org.apache.flume.thrift.ThriftSourceProtocol;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.AbstractNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TSSLTransportFactory;
import org.apache.thrift.transport.TSaslServerTransport;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/ThriftSource.class */
/**
 * Flume source that receives events over Thrift RPC and forwards them to the
 * configured channel processor.
 *
 * <p>Configuration keys read by {@link #configure(Context)}: {@code port} and {@code bind}
 * (both required), {@code threads}, {@code protocol} ({@code binary} or {@code compact}),
 * {@code ssl}, {@code keystore}, {@code keystore-password}, {@code keystore-type},
 * {@code exclude-protocols}, {@code kerberos}, {@code agent-principal} and
 * {@code agent-keytab}. When the {@code ssl} key is absent and the system property
 * {@code mapr_sec_enabled} is {@code true}, SSL is enabled using the keystore reported by
 * {@link WebSecurityManager}.
 *
 * <p>{@link #start()} prefers a {@code TThreadedSelectorServer} (looked up reflectively) when
 * neither SSL nor Kerberos is enabled, and otherwise uses a {@link TThreadPoolServer}.
 */
public class ThriftSource extends AbstractSource implements Configurable, EventDrivenSource {
    public static final Logger logger = LoggerFactory.getLogger(ThriftSource.class);

    /** Context key: maximum number of worker threads; values &lt;= 0 mean "no limit". */
    public static final String CONFIG_THREADS = "threads";
    /** Context key: address to bind the server socket to. */
    public static final String CONFIG_BIND = "bind";
    /** Context key: port to listen on. */
    public static final String CONFIG_PORT = "port";
    /** Context key: Thrift wire protocol, {@link #BINARY_PROTOCOL} or {@link #COMPACT_PROTOCOL}. */
    public static final String CONFIG_PROTOCOL = "protocol";
    public static final String BINARY_PROTOCOL = "binary";
    public static final String COMPACT_PROTOCOL = "compact";

    private static final String SSL_KEY = "ssl";
    private static final String KEYSTORE_KEY = "keystore";
    private static final String KEYSTORE_PASSWORD_KEY = "keystore-password";
    private static final String KEYSTORE_TYPE_KEY = "keystore-type";
    private static final String EXCLUDE_PROTOCOLS = "exclude-protocols";
    private static final String KERBEROS_KEY = "kerberos";
    private static final String AGENT_PRINCIPAL = "agent-principal";
    private static final String AGENT_KEYTAB = "agent-keytab";
    /** Name of the JVM system property that switches on the MapR SSL defaults. */
    private static final String MAPR_SECURITY_ENABLED = "mapr_sec_enabled";

    private Integer port;
    private String bindAddress;
    private SourceCounter sourceCounter;
    private TServer server;
    private ExecutorService servingExecutor;
    private String protocol;
    private String keystore;
    private String keystorePassword;
    private String keystoreType;
    private String principal;
    private FlumeAuthenticator flumeAuth;
    /** 0 until configured; {@code Integer.MAX_VALUE} is used as the "no limit" marker. */
    private int maxThreads = 0;
    private final List<String> excludeProtocols = new LinkedList<>();
    private boolean enableSsl = false;
    private boolean enableKerberos = false;

    /**
     * Thrift service implementation: converts incoming Thrift events to Flume events, hands
     * them to the channel processor and keeps the source counters up to date.
     */
    public class ThriftSourceHandler implements ThriftSourceProtocol.Iface {
        private ThriftSourceHandler() {
        }

        /**
         * Processes a single event.
         *
         * @param thriftFlumeEvent the event received from the client
         * @return {@link Status#OK} when the channel processor accepted the event,
         *         {@link Status#FAILED} when it threw a {@link ChannelException}
         */
        public Status append(ThriftFlumeEvent thriftFlumeEvent) throws TException {
            Event event = EventBuilder.withBody(thriftFlumeEvent.getBody(), thriftFlumeEvent.getHeaders());
            ThriftSource.this.sourceCounter.incrementAppendReceivedCount();
            ThriftSource.this.sourceCounter.incrementEventReceivedCount();
            try {
                ThriftSource.this.getChannelProcessor().processEvent(event);
            } catch (ChannelException e) {
                ThriftSource.logger.warn("Thrift source {} could not append events to the channel.",
                        ThriftSource.this.getName(), e);
                return Status.FAILED;
            }
            ThriftSource.this.sourceCounter.incrementAppendAcceptedCount();
            ThriftSource.this.sourceCounter.incrementEventAcceptedCount();
            return Status.OK;
        }

        /**
         * Processes a batch of events as one channel-processor call.
         *
         * @param list the events received from the client
         * @return {@link Status#OK} when the channel processor accepted the batch,
         *         {@link Status#FAILED} when it threw a {@link ChannelException}
         */
        public Status appendBatch(List<ThriftFlumeEvent> list) throws TException {
            ThriftSource.this.sourceCounter.incrementAppendBatchReceivedCount();
            ThriftSource.this.sourceCounter.addToEventReceivedCount(list.size());
            List<Event> events = new ArrayList<>(list.size());
            for (ThriftFlumeEvent thriftFlumeEvent : list) {
                events.add(EventBuilder.withBody(thriftFlumeEvent.getBody(), thriftFlumeEvent.getHeaders()));
            }
            try {
                ThriftSource.this.getChannelProcessor().processEventBatch(events);
            } catch (ChannelException e) {
                // SLF4J substitutes "{}" (not "%s"); a trailing Throwable is logged with its stack trace.
                ThriftSource.logger.warn("Thrift source {} could not append events to the channel.",
                        ThriftSource.this.getName(), e);
                return Status.FAILED;
            }
            ThriftSource.this.sourceCounter.incrementAppendBatchAcceptedCount();
            ThriftSource.this.sourceCounter.addToEventAcceptedCount(list.size());
            return Status.OK;
        }
    }

    /**
     * Reads the source settings from {@code context}, validates them and prepares
     * authentication.
     *
     * @param context the source configuration
     * @throws NullPointerException if {@code port} or {@code bind} is missing (via
     *         {@link Preconditions#checkNotNull})
     * @throws IllegalArgumentException if {@code protocol} is neither binary nor compact
     * @throws FlumeException if the keystore cannot be loaded, or Kerberos is enabled and the
     *         authenticator reports that it is not authenticated
     */
    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        logger.info("Configuring thrift source.");
        this.port = context.getInteger(CONFIG_PORT);
        Preconditions.checkNotNull(this.port, "Port must be specified for Thrift Source.");
        this.bindAddress = context.getString(CONFIG_BIND);
        Preconditions.checkNotNull(this.bindAddress, "Bind address must be specified for Thrift Source.");

        try {
            int configuredThreads = context.getInteger(CONFIG_THREADS, 0).intValue();
            this.maxThreads = configuredThreads <= 0 ? Integer.MAX_VALUE : configuredThreads;
        } catch (NumberFormatException e) {
            logger.warn("Thrift source's \"threads\" property must specify an integer value: " + context.getString(CONFIG_THREADS));
            // Keep a previously configured valid value; otherwise fall back to "no limit"
            // so that a worker pool is never created with a maximum of 0 threads.
            if (this.maxThreads <= 0) {
                this.maxThreads = Integer.MAX_VALUE;
            }
        }

        if (this.sourceCounter == null) {
            this.sourceCounter = new SourceCounter(getName());
        }

        this.protocol = context.getString(CONFIG_PROTOCOL);
        if (this.protocol == null) {
            this.protocol = COMPACT_PROTOCOL;
        }
        Preconditions.checkArgument(this.protocol.equalsIgnoreCase(BINARY_PROTOCOL) || this.protocol.equalsIgnoreCase(COMPACT_PROTOCOL), "binary or compact are the only valid Thrift protocol types to choose from.");

        boolean maprSecurityEnabled = Boolean.parseBoolean(System.getProperty(MAPR_SECURITY_ENABLED, "false"));
        if (context.getBoolean(SSL_KEY) == null && maprSecurityEnabled) {
            // No explicit "ssl" setting: take the keystore details from the MapR security manager.
            this.enableSsl = true;
            SslConfig sslConfig = WebSecurityManager.getSslConfig(SslConfig.SslConfigScope.SCOPE_CLIENT_ONLY);
            this.keystore = sslConfig.getClientKeystoreLocation();
            this.keystorePassword = new String(sslConfig.getClientKeystorePassword());
            this.keystoreType = sslConfig.getClientKeystoreType().toUpperCase();
            excludeProtocolsAndLoadKeyStore(context);
        } else {
            this.enableSsl = context.getBoolean(SSL_KEY, false).booleanValue();
            if (this.enableSsl) {
                this.keystore = context.getString(KEYSTORE_KEY);
                this.keystorePassword = context.getString(KEYSTORE_PASSWORD_KEY);
                this.keystoreType = context.getString(KEYSTORE_TYPE_KEY, "JKS");
                excludeProtocolsAndLoadKeyStore(context);
            }
        }

        this.principal = context.getString(AGENT_PRINCIPAL);
        String keytab = context.getString(AGENT_KEYTAB);
        this.enableKerberos = context.getBoolean(KERBEROS_KEY, false).booleanValue();
        this.flumeAuth = FlumeAuthenticationUtil.getAuthenticator(this.principal, keytab);
        if (this.enableKerberos) {
            if (!this.flumeAuth.isAuthenticated()) {
                throw new FlumeException("Authentication failed in Kerberos mode for principal " + this.principal + " keytab " + keytab);
            }
            this.flumeAuth.startCredentialRefresher();
        }
    }

    /**
     * Rebuilds the list of SSL protocols to disable and verifies that the configured keystore
     * can be loaded with the configured password and type.
     *
     * <p>The list is taken from the space-separated {@code exclude-protocols} setting;
     * {@code SSLv3} is always included.
     *
     * @param context the source configuration
     * @throws NullPointerException if the keystore path or password has not been set
     * @throws FlumeException if the keystore cannot be loaded
     */
    public void excludeProtocolsAndLoadKeyStore(Context context) {
        // Start from a clean list so repeated configure() calls do not accumulate entries.
        this.excludeProtocols.clear();
        String configuredExclusions = context.getString(EXCLUDE_PROTOCOLS);
        if (configuredExclusions != null) {
            this.excludeProtocols.addAll(Arrays.asList(configuredExclusions.split(" ")));
        }
        if (!this.excludeProtocols.contains("SSLv3")) {
            this.excludeProtocols.add("SSLv3");
        }

        Preconditions.checkNotNull(this.keystore, "keystore must be specified when SSL is enabled");
        Preconditions.checkNotNull(this.keystorePassword, "keystore-password must be specified when SSL is enabled");
        try {
            KeyStore probe = KeyStore.getInstance(this.keystoreType);
            // The keystore is only loaded as a validity check; close the file afterwards.
            try (FileInputStream keystoreStream = new FileInputStream(this.keystore)) {
                probe.load(keystoreStream, this.keystorePassword.toCharArray());
            }
        } catch (Exception e) {
            throw new FlumeException("Thrift source configured with invalid keystore: " + this.keystore, e);
        }
    }

    /**
     * Creates the Thrift server, runs {@code serve()} on a dedicated single-thread executor
     * through the configured authenticator, and waits until the server reports that it is
     * serving.
     *
     * @throws FlumeException if the server is not serving once
     *         {@code ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE} milliseconds
     *         have elapsed, or if the waiting thread is interrupted
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Starting thrift source");
        this.server = getTThreadedSelectorServer();
        if (this.server == null) {
            this.server = getTThreadPoolServer();
        }
        this.servingExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("Flume Thrift Source I/O Boss").build());
        this.servingExecutor.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                ThriftSource.this.flumeAuth.execute(new PrivilegedAction<Object>() {
                    @Override // java.security.PrivilegedAction
                    public Object run() {
                        ThriftSource.this.server.serve();
                        return null;
                    }
                });
            }
        });

        long waitStartedAt = System.currentTimeMillis();
        while (!this.server.isServing()) {
            try {
                if (System.currentTimeMillis() - waitStartedAt >= ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE) {
                    throw new FlumeException("Thrift server failed to start!");
                }
                TimeUnit.MILLISECONDS.sleep(1000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new FlumeException("Interrupted while waiting for Thrift server to start.", e);
            }
        }
        this.sourceCounter.start();
        logger.info("Started Thrift source.");
        super.start();
    }

    /**
     * Returns the {@code ssl.KeyManagerFactory.algorithm} security property, or the
     * {@link KeyManagerFactory} default algorithm when that property is not set.
     */
    private String getkeyManagerAlgorithm() {
        String configured = Security.getProperty("ssl.KeyManagerFactory.algorithm");
        return configured != null ? configured : KeyManagerFactory.getDefaultAlgorithm();
    }

    /**
     * Opens an SSL server socket on the configured address and port, and removes every
     * protocol listed in {@code excludeProtocols} from the socket's enabled protocols.
     *
     * @throws FlumeException wrapping any failure
     */
    private TServerTransport getSSLServerTransport() {
        try {
            TSSLTransportFactory.TSSLTransportParameters sslParameters = new TSSLTransportFactory.TSSLTransportParameters();
            sslParameters.setKeyStore(this.keystore, this.keystorePassword, getkeyManagerAlgorithm(), this.keystoreType);
            TServerSocket transport = TSSLTransportFactory.getServerSocket(this.port.intValue(), 120000, InetAddress.getByName(this.bindAddress), sslParameters);
            ServerSocket rawSocket = transport.getServerSocket();
            if (rawSocket instanceof SSLServerSocket) {
                SSLServerSocket sslSocket = (SSLServerSocket) rawSocket;
                List<String> allowedProtocols = new ArrayList<>();
                for (String enabledProtocol : sslSocket.getEnabledProtocols()) {
                    if (!this.excludeProtocols.contains(enabledProtocol)) {
                        allowedProtocols.add(enabledProtocol);
                    }
                }
                sslSocket.setEnabledProtocols(allowedProtocols.toArray(new String[0]));
            }
            return transport;
        } catch (Throwable th) {
            throw new FlumeException("Cannot start Thrift source.", th);
        }
    }

    /**
     * Opens a plain (non-SSL) blocking server socket on the configured address and port.
     *
     * @throws FlumeException wrapping any failure
     */
    private TServerTransport getTServerTransport() {
        try {
            return new TServerSocket(new InetSocketAddress(this.bindAddress, this.port.intValue()));
        } catch (Throwable th) {
            throw new FlumeException("Cannot start Thrift source.", th);
        }
    }

    /**
     * Returns the protocol factory matching the configured protocol name. The comparison is
     * case-insensitive, consistent with the validation performed in {@link #configure(Context)}.
     */
    private TProtocolFactory getProtocolFactory() {
        if (this.protocol.equalsIgnoreCase(BINARY_PROTOCOL)) {
            logger.info("Using TBinaryProtocol");
            return new TBinaryProtocol.Factory();
        }
        logger.info("Using TCompactProtocol");
        return new TCompactProtocol.Factory();
    }

    /**
     * Builds a {@code TThreadedSelectorServer} through reflection.
     *
     * @return the server, or {@code null} when SSL or Kerberos is enabled or when the
     *         {@code TThreadedSelectorServer} classes are not on the classpath
     * @throws FlumeException wrapping any other failure
     */
    private TServer getTThreadedSelectorServer() {
        if (this.enableSsl || this.enableKerberos) {
            return null;
        }
        TNonblockingServerSocket serverSocket = null;
        try {
            Class<?> serverClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer");
            Class<?> argsClass = Class.forName("org.apache.thrift.server.TThreadedSelectorServer$Args");
            serverSocket = new TNonblockingServerSocket(new InetSocketAddress(this.bindAddress, this.port.intValue()));
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Flume Thrift IPC Thread %d").build();

            // configure() stores "no limit" as Integer.MAX_VALUE. A fixed pool of that size
            // would start a new, never-expiring thread for every task (a ThreadPoolExecutor
            // adds threads while below its core size), so use a cached pool instead.
            boolean unbounded = this.maxThreads <= 0 || this.maxThreads == Integer.MAX_VALUE;
            ExecutorService workerPool = unbounded
                    ? Executors.newCachedThreadPool(threadFactory)
                    : Executors.newFixedThreadPool(this.maxThreads, threadFactory);

            AbstractNonblockingServer.AbstractNonblockingServerArgs serverArgs = (AbstractNonblockingServer.AbstractNonblockingServerArgs) argsClass.getConstructor(TNonblockingServerTransport.class).newInstance(serverSocket);
            argsClass.getDeclaredMethod("executorService", ExecutorService.class).invoke(serverArgs, workerPool);
            populateServerParams(serverArgs);
            this.server = (TServer) serverClass.getConstructor(argsClass).newInstance(serverArgs);
            return this.server;
        } catch (ClassNotFoundException e) {
            // Thrown by Class.forName above, i.e. before any socket has been opened.
            return null;
        } catch (Throwable th) {
            if (serverSocket != null) {
                // Do not leave the port bound when server construction failed.
                serverSocket.close();
            }
            throw new FlumeException("Cannot start Thrift Source.", th);
        }
    }

    /**
     * Builds a {@link TThreadPoolServer} over an SSL or plain transport, depending on whether
     * SSL is enabled, with {@code maxThreads} as the maximum number of worker threads.
     */
    private TServer getTThreadPoolServer() {
        TServerTransport transport = this.enableSsl ? getSSLServerTransport() : getTServerTransport();
        TThreadPoolServer.Args args = new TThreadPoolServer.Args(transport);
        args.maxWorkerThreads(this.maxThreads);
        populateServerParams(args);
        return new TThreadPoolServer(args);
    }

    /**
     * Applies the settings shared by both server flavours: protocol factory, transport factory
     * (SASL when Kerberos is enabled, fast-framed otherwise) and the request processor.
     */
    private void populateServerParams(TServer.AbstractServerArgs abstractServerArgs) {
        abstractServerArgs.protocolFactory(getProtocolFactory());
        if (this.enableKerberos) {
            abstractServerArgs.transportFactory(getSASLTransportFactory());
        } else {
            abstractServerArgs.transportFactory(new TFastFramedTransport.Factory());
        }
        abstractServerArgs.processor(new ThriftSourceProtocol.Processor(new ThriftSourceHandler()));
    }

    /**
     * Creates a SASL transport factory with a {@code GSSAPI} server definition built from the
     * first two components of the configured principal, using quality of protection
     * {@code auth}.
     *
     * @throws FlumeException if the principal name cannot be split
     */
    private TTransportFactory getSASLTransportFactory() {
        try {
            String[] principalParts = FlumeAuthenticationUtil.splitKerberosName(this.principal);
            HashMap<String, String> saslProperties = new HashMap<>();
            saslProperties.put("javax.security.sasl.qop", "auth");
            TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
            saslFactory.addServerDefinition("GSSAPI", principalParts[0], principalParts[1], saslProperties, FlumeAuthenticationUtil.getSaslGssCallbackHandler());
            return saslFactory;
        } catch (IOException e) {
            throw new FlumeException("Error while trying to resolve Principal name - " + this.principal, e);
        }
    }

    /**
     * Stops the Thrift server if it is serving, shuts down the serving executor (forcing
     * shutdown when it has not terminated within five seconds), then stops the source counter
     * and the superclass lifecycle.
     *
     * @throws FlumeException if interrupted while waiting for the executor to terminate; the
     *         thread's interrupt flag is restored before the exception is thrown
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        if (this.server != null && this.server.isServing()) {
            this.server.stop();
        }
        if (this.servingExecutor != null) {
            this.servingExecutor.shutdown();
            try {
                if (!this.servingExecutor.awaitTermination(5L, TimeUnit.SECONDS)) {
                    this.servingExecutor.shutdownNow();
                }
            } catch (InterruptedException e) {
                // Give up waiting, but still ask the serving thread to stop and keep the
                // interrupt visible to callers, mirroring the handling in start().
                this.servingExecutor.shutdownNow();
                Thread.currentThread().interrupt();
                throw new FlumeException("Interrupted while waiting for server to be shutdown.", e);
            }
        }
        this.sourceCounter.stop();
        super.stop();
    }
}
