/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.rpc.user;

import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslException;
import oadd.com.google.protobuf.Internal;
import oadd.com.google.protobuf.MessageLite;
import oadd.io.netty.buffer.ByteBuf;
import oadd.io.netty.channel.Channel;
import oadd.io.netty.channel.ChannelHandler;
import oadd.io.netty.channel.ChannelHandlerContext;
import oadd.io.netty.channel.ChannelPipeline;
import oadd.io.netty.channel.EventLoopGroup;
import oadd.io.netty.channel.socket.SocketChannel;
import oadd.io.netty.handler.ssl.SslHandler;
import oadd.io.netty.util.concurrent.Future;
import oadd.io.netty.util.concurrent.GenericFutureListener;
import oadd.org.apache.drill.common.exceptions.DrillException;
import oadd.org.apache.drill.exec.exception.DrillbitStartupException;
import oadd.org.apache.drill.exec.memory.BufferAllocator;
import oadd.org.apache.drill.exec.physical.impl.materialize.QueryDataPackage;
import oadd.org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import oadd.org.apache.drill.exec.proto.GeneralRPCProtos;
import oadd.org.apache.drill.exec.proto.UserBitShared;
import oadd.org.apache.drill.exec.proto.UserProtos;
import oadd.org.apache.drill.exec.rpc.AbstractServerConnection;
import oadd.org.apache.drill.exec.rpc.BasicServer;
import oadd.org.apache.drill.exec.rpc.OutOfMemoryHandler;
import oadd.org.apache.drill.exec.rpc.OutboundRpcMessage;
import oadd.org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import oadd.org.apache.drill.exec.rpc.RpcException;
import oadd.org.apache.drill.exec.rpc.RpcOutcomeListener;
import oadd.org.apache.drill.exec.rpc.UserClientConnection;
import oadd.org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
import oadd.org.apache.drill.exec.rpc.security.plain.PlainFactory;
import oadd.org.apache.drill.exec.rpc.user.MultiUserServerRequestHandler;
import oadd.org.apache.drill.exec.rpc.user.UserConnectionConfig;
import oadd.org.apache.drill.exec.rpc.user.UserProtobufLengthDecoder;
import oadd.org.apache.drill.exec.rpc.user.UserRpcConfig;
import oadd.org.apache.drill.exec.rpc.user.UserRpcMetrics;
import oadd.org.apache.drill.exec.rpc.user.UserRpcUtils;
import oadd.org.apache.drill.exec.rpc.user.UserServerRequestHandler;
import oadd.org.apache.drill.exec.rpc.user.UserSession;
import oadd.org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
import oadd.org.apache.drill.exec.server.BootStrapContext;
import oadd.org.apache.drill.exec.ssl.SSLConfig;
import oadd.org.apache.drill.exec.ssl.SSLConfigBuilder;
import oadd.org.apache.drill.exec.work.user.UserWorker;
import oadd.org.joda.time.DateTime;
import org.apache.hadoop.security.HadoopKerberosName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

public class UserServer
extends BasicServer<UserProtos.RpcType, BitToUserConnection> {
    /** Logger shared by this server and its nested connection classes. */
    private static final Logger logger = LoggerFactory.getLogger(UserServer.class);
    /** Name handed to {@code UserRpcUtils.getRpcEndpointInfos} when the handshake response is built. */
    private static final String SERVER_NAME = "Apache Drill Server";
    /** Per-server connection configuration, created in the constructor. */
    private final UserConnectionConfig config;
    /** SSL settings, built in the constructor from the bootstrap configuration. */
    private final SSLConfig sslConfig;
    /** Channel recorded through {@code setSslChannel}; {@code null} until then. Closed by {@code closeSSL}. */
    private Channel sslChannel;
    /** Worker that supplies system options for new sessions and is passed to the request handlers. */
    private final UserWorker userWorker;
    /**
     * Registry of connections: entries are added in {@code registerAndGetConnection} and removed in
     * {@code BitToUserConnection.decConnectionCounter}. The map is static, so it is shared by every
     * {@code UserServer} instance loaded by the same class loader.
     */
    private static final ConcurrentHashMap<BitToUserConnection, BitToUserConnectionConfig> userConnectionMap = new ConcurrentHashMap();

    /**
     * Renders a handshake request as multi-line text suitable for logging, leaving out
     * every property whose key equals "password" (compared case-insensitively) so the
     * secret never reaches the log.
     *
     * @param inbound the handshake message received from the client
     * @return the textual form of the handshake without any password property
     */
    private String serializeUserToBitHandshakeWithoutPassword(UserProtos.UserToBitHandshake inbound) {
        StringBuilder sb = new StringBuilder();
        sb.append("rpc_version: ").append(inbound.getRpcVersion());
        sb.append("\ncredentials:\n\t").append(inbound.getCredentials());
        // No leading line break here: presumably the credentials text already ends
        // with one (protobuf text format) -- TODO confirm.
        sb.append("properties:");
        List<UserProtos.Property> props = inbound.getProperties().getPropertiesList();
        for (UserProtos.Property p : props) {
            if (p.getKey().equalsIgnoreCase("password")) {
                // Never write the password value into the log text.
                continue;
            }
            sb.append("\n\tproperty:\n\t\t");
            sb.append("key: \"").append(p.getKey());
            sb.append("\"\n\t\tvalue: \"").append(p.getValue());
            sb.append("\"");
        }
        sb.append("\nsupport_complex_types: ").append(inbound.getSupportComplexTypes());
        sb.append("\nsupport_timeout: ").append(inbound.getSupportTimeout());
        // Fix: this label used to be appended without a line break, which glued it
        // onto the support_timeout value ("support_timeout: truesasl_support: ...").
        sb.append("\nsasl_support: ").append(inbound.getSaslSupport());
        sb.append("\nclient_infos:\n\t");
        // Indent every line of the nested message so it lines up under its label.
        sb.append(inbound.getClientInfos().toString().replace("\n", "\n\t"));
        return sb.toString();
    }

    /**
     * Creates the user-facing RPC server: builds the connection configuration, builds and
     * validates the server-side SSL configuration, and initializes the user RPC metrics.
     *
     * @param context        bootstrap context providing the Drill configuration and executor
     * @param allocator      allocator used for the connection configuration and the metrics
     * @param eventLoopGroup event loop group passed to the base server
     * @param worker         worker wrapped by the request handler and used for session options
     * @throws DrillbitStartupException if the SSL configuration cannot be built
     */
    public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup, UserWorker worker) throws DrillbitStartupException {
        super(UserRpcConfig.getMapping(context.getConfig(), context.getExecutor()), allocator.getAsByteBufAllocator(), eventLoopGroup);
        this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
        this.sslChannel = null;
        try {
            this.sslConfig = new SSLConfigBuilder().config(context.getConfig()).mode(SSLConfig.Mode.SERVER).initializeSSLContext(true).validateKeyStore(true).build();
            logger.info("Rpc server configured to use TLS protocol '{}'", (Object)this.sslConfig.getProtocol());
        }
        catch (DrillException e) {
            // Chain the caught exception itself. Wrapping only e.getCause() discarded
            // this exception's stack trace and left a null cause whenever the
            // DrillException had no underlying cause of its own.
            throw new DrillbitStartupException(e.getMessage(), e);
        }
        this.userWorker = worker;
        ((UserRpcMetrics)UserRpcMetrics.getInstance()).initialize(this.config.isEncryptionEnabled(), allocator);
    }

    /**
     * Puts an {@link SslHandler} named "ssl-handler" at the head of the given pipeline,
     * backed by an engine created from this server's SSL configuration, and logs the
     * SSL configuration at debug level.
     *
     * @param pipe the channel pipeline to secure
     */
    @Override
    protected void setupSSL(ChannelPipeline pipe) {
        SSLEngine engine = this.sslConfig.createSSLEngine(this.config.getAllocator(), null, 0);
        ChannelHandler tlsHandler = new SslHandler(engine);
        pipe.addFirst("ssl-handler", tlsHandler);
        logger.debug("SSL communication between client and server is enabled.");
        logger.debug(this.sslConfig.toString());
    }

    /**
     * Reports whether SSL is switched on for user connections, as stated by the SSL configuration.
     *
     * @return the value of {@code sslConfig.isUserSslEnabled()}
     */
    @Override
    protected boolean isSslEnabled() {
        boolean userSslEnabled = this.sslConfig.isUserSslEnabled();
        return userSslEnabled;
    }

    /**
     * Remembers the given channel so that {@code closeSSL} can close it later.
     *
     * @param c the channel to remember; replaces any previously stored channel
     */
    @Override
    public void setSslChannel(Channel c) {
        sslChannel = c;
    }

    /**
     * Closes the stored SSL channel. Does nothing when SSL is not enabled or when no
     * channel has been stored.
     */
    @Override
    protected void closeSSL() {
        if (!this.isSslEnabled() || this.sslChannel == null) {
            return;
        }
        this.sslChannel.close();
    }

    /**
     * Returns the prototype message used to decode a response of the given RPC type.
     * Only type {@code 1} is supported, and it maps to {@link GeneralRPCProtos.Ack}.
     *
     * @param rpcType numeric RPC type of the response
     * @return the default {@code Ack} instance when {@code rpcType} is 1
     * @throws UnsupportedOperationException for every other RPC type
     */
    @Override
    protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
        switch (rpcType) {
            // NOTE(review): 1 is presumably the wire number of UserProtos.RpcType.ACK -- confirm.
            case 1:
                return GeneralRPCProtos.Ack.getDefaultInstance();
            default:
                // Name the offending type so the failure can be diagnosed from the log.
                throw new UnsupportedOperationException("No response message is defined for RPC type " + rpcType);
        }
    }

    /**
     * Exposes the entries of the connection registry.
     *
     * @return the entry-set view of the registry map itself, not a copy
     */
    public static Set<Map.Entry<BitToUserConnection, BitToUserConnectionConfig>> getUserConnections() {
        Set<Map.Entry<BitToUserConnection, BitToUserConnectionConfig>> registryEntries = userConnectionMap.entrySet();
        return registryEntries;
    }

    /**
     * Runs the base-class connection initialization (its return value is discarded) and
     * then creates and registers the connection object for the channel.
     *
     * @param channel the newly accepted client channel
     * @return the connection created and registered for {@code channel}
     */
    @Override
    protected BitToUserConnection initRemoteConnection(SocketChannel channel) {
        super.initRemoteConnection(channel);
        BitToUserConnection registered = this.registerAndGetConnection(channel);
        return registered;
    }

    /**
     * Creates the connection object for a channel and records it in the connection
     * registry together with a snapshot of the current security settings.
     *
     * @param channel the client channel the connection wraps
     * @return the newly created, registered connection
     */
    private BitToUserConnection registerAndGetConnection(SocketChannel channel) {
        BitToUserConnection bit2userConn = new BitToUserConnection(channel);
        // A reference produced by "new" is never null, so the former null check
        // around this registration was dead code and has been removed.
        userConnectionMap.put(bit2userConn, new BitToUserConnectionConfig());
        return bit2userConn;
    }

    /**
     * Builds the handler that processes the first (handshake) message of a connection.
     *
     * <p>The handler answers with a {@code BitToUserHandshake} whose status is:
     * <ul>
     *   <li>{@code RPC_VERSION_MISMATCH} when the client's RPC version differs from the server's;</li>
     *   <li>{@code SUCCESS} when authentication is disabled, or when a client without SASL
     *       support passes username/password (PLAIN) authentication;</li>
     *   <li>{@code AUTH_FAILED} when that username/password authentication fails;</li>
     *   <li>{@code AUTH_REQUIRED} when the client declares SASL support and must authenticate next;</li>
     *   <li>{@code UNKNOWN_FAILURE} for any other exception, including the rejection of a
     *       client that cannot encrypt while the server requires encryption.</li>
     * </ul>
     * After the response is written, any status other than {@code SUCCESS} or
     * {@code AUTH_REQUIRED} makes the handler throw an {@link RpcException}.
     *
     * @param connection the connection the handshake belongs to
     * @return the handshake handler for {@code connection}
     */
    @Override
    protected BasicServer.ServerHandshakeHandler<UserProtos.UserToBitHandshake> getHandshakeHandler(final BitToUserConnection connection) {
        return new BasicServer.ServerHandshakeHandler<UserProtos.UserToBitHandshake>((Internal.EnumLite)UserProtos.RpcType.HANDSHAKE, UserProtos.UserToBitHandshake.PARSER){

            /** Sends the handshake response, then fails the handshake unless it succeeded or still awaits authentication. */
            @Override
            protected void consumeHandshake(ChannelHandlerContext ctx, UserProtos.UserToBitHandshake inbound) throws Exception {
                UserProtos.BitToUserHandshake handshakeResp = this.getHandshakeResponse(inbound);
                OutboundRpcMessage msg = new OutboundRpcMessage(GeneralRPCProtos.RpcMode.RESPONSE, this.handshakeType, this.coordinationId, (MessageLite)handshakeResp, new ByteBuf[0]);
                // The response is written first so the client learns why the handshake failed.
                ctx.writeAndFlush(msg);
                if (handshakeResp.getStatus() != UserProtos.HandshakeStatus.SUCCESS && handshakeResp.getStatus() != UserProtos.HandshakeStatus.AUTH_REQUIRED) {
                    throw new RpcException("Handshake request failed: " + handshakeResp.getErrorMessage());
                }
            }

            /** Computes the handshake response for {@code inbound}; failures are reported through the response status. */
            public UserProtos.BitToUserHandshake getHandshakeResponse(UserProtos.UserToBitHandshake inbound) throws Exception {
                // RPC version this server announces and requires of the client.
                final int serverRpcVersion = 5;
                if (logger.isTraceEnabled()) {
                    logger.trace("Handling handshake from user to bit. {}", (Object)UserServer.this.serializeUserToBitHandshakeWithoutPassword(inbound));
                }
                if (!inbound.hasSupportTimeout() || !inbound.getSupportTimeout()) {
                    connection.disableReadTimeout();
                    logger.warn("Timeout Disabled as client {} doesn't support it.", (Object)connection.getName());
                }
                UserProtos.BitToUserHandshake.Builder respBuilder = UserProtos.BitToUserHandshake.newBuilder().setRpcVersion(serverRpcVersion).setServerInfos(UserRpcUtils.getRpcEndpointInfos(UserServer.SERVER_NAME)).addAllSupportedMethods(UserRpcConfig.SUPPORTED_SERVER_METHODS);
                try {
                    if (inbound.getRpcVersion() != serverRpcVersion) {
                        String errMsg = String.format("Invalid rpc version. Expected %d, actual %d.", serverRpcVersion, inbound.getRpcVersion());
                        return UserServer.handleFailure(respBuilder, UserProtos.HandshakeStatus.RPC_VERSION_MISMATCH, errMsg, null);
                    }
                    connection.setHandshake(inbound);
                    if (!UserServer.this.config.isAuthEnabled()) {
                        // No authentication configured: the session starts with the name the client supplied.
                        connection.finalizeSession(inbound.getCredentials().getUserName());
                        respBuilder.setStatus(UserProtos.HandshakeStatus.SUCCESS);
                        return respBuilder.build();
                    }
                    boolean clientSupportsSasl = inbound.hasSaslSupport();
                    // Enum constants are compared by identity rather than by ordinal(); the result is
                    // the same, and it no longer depends on declaration order.
                    boolean clientCannotEncrypt = !clientSupportsSasl || inbound.getSaslSupport() == UserProtos.SaslSupport.SASL_AUTH;
                    if (clientCannotEncrypt && UserServer.this.config.isEncryptionEnabled()) {
                        throw new UserAuthenticationException("The server doesn't allow client without encryption support. Please upgrade your client or talk to your system administrator.");
                    }
                    if (!clientSupportsSasl) {
                        String userName = inbound.getCredentials().getUserName();
                        if (logger.isTraceEnabled()) {
                            logger.trace("User {} on connection {} is likely using an older client.", (Object)userName, (Object)connection.getRemoteAddress());
                        }
                        try {
                            String password = this.findPassword(inbound.getProperties());
                            PlainFactory plainFactory;
                            try {
                                plainFactory = (PlainFactory)UserServer.this.config.getAuthProvider().getAuthenticatorFactory("PLAIN");
                            }
                            catch (SaslException e) {
                                throw new UserAuthenticationException("The server no longer supports username/password based authentication. Please talk to your system administrator.");
                            }
                            plainFactory.getAuthenticator().authenticate(userName, password);
                            // Authentication is complete, so switch the connection to the regular message handler.
                            connection.changeHandlerTo(UserServer.this.config.getMessageHandler());
                            connection.finalizeSession(userName);
                            respBuilder.setStatus(UserProtos.HandshakeStatus.SUCCESS);
                            logger.info("Authenticated {} from {} successfully using PLAIN", (Object)userName, (Object)connection.getRemoteAddress());
                            return respBuilder.build();
                        }
                        catch (UserAuthenticationException ex) {
                            return UserServer.handleFailure(respBuilder, UserProtos.HandshakeStatus.AUTH_FAILED, ex.getMessage(), ex);
                        }
                    }
                    // SASL-capable client: advertise the mechanisms and let it authenticate in follow-up messages.
                    respBuilder.addAllAuthenticationMechanisms(UserServer.this.config.getAuthProvider().getAllFactoryNames());
                    respBuilder.setEncrypted(connection.isEncryptionEnabled());
                    respBuilder.setMaxWrappedSize(connection.getMaxWrappedSize());
                    respBuilder.setStatus(UserProtos.HandshakeStatus.AUTH_REQUIRED);
                    return respBuilder.build();
                }
                catch (Exception e) {
                    return UserServer.handleFailure(respBuilder, UserProtos.HandshakeStatus.UNKNOWN_FAILURE, e.getMessage(), e);
                }
            }

            /** Returns the value of the first property keyed "password" (case-insensitive), or "" when there is none. */
            private String findPassword(UserProtos.UserProperties props) {
                for (int i = 0; i < props.getPropertiesCount(); ++i) {
                    UserProtos.Property prop = props.getProperties(i);
                    if ("password".equalsIgnoreCase(prop.getKey())) {
                        return prop.getValue();
                    }
                }
                return "";
            }
        };
    }

    /**
     * Logs a handshake failure under a freshly generated error id and builds the failure
     * response carrying that id, the status and the error message.
     *
     * @param respBuilder response builder to complete
     * @param status      failure status to report
     * @param errMsg      error text; may be {@code null}, in which case a fallback is used
     * @param exception   the cause to log with its stack trace, or {@code null} if there is none
     * @return the built failure response
     */
    private static UserProtos.BitToUserHandshake handleFailure(UserProtos.BitToUserHandshake.Builder respBuilder, UserProtos.HandshakeStatus status, String errMsg, Exception exception) {
        String errorId = UUID.randomUUID().toString();
        // Exceptions such as NullPointerException often carry no message. Generated
        // protobuf string setters reject null, so without a fallback the failure
        // response itself could not be built and the client would get no answer.
        String message = errMsg;
        if (message == null) {
            message = exception != null ? exception.getClass().getName() : String.valueOf(status);
        }
        if (exception != null) {
            logger.error("Error {} in Handling handshake request: {}, {}", errorId, status, message, exception);
        } else {
            logger.error("Error {} in Handling handshake request: {}, {}", errorId, status, message);
        }
        return respBuilder.setStatus(status).setErrorId(errorId).setErrorMessage(message).build();
    }

    /**
     * Supplies the length decoder used for user connections.
     *
     * @param allocator          allocator passed to the decoder
     * @param outOfMemoryHandler handler passed to the decoder
     * @return a new {@link UserProtobufLengthDecoder}
     */
    @Override
    protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
        ProtobufLengthDecoder decoder = new UserProtobufLengthDecoder(allocator, outOfMemoryHandler);
        return decoder;
    }

    /**
     * Snapshot of the server's security settings taken when the object is created,
     * together with the creation time.
     */
    public class BitToUserConnectionConfig {
        private final DateTime established;
        private final boolean isAuthEnabled;
        private final boolean isEncryptionEnabled;
        private final boolean isSSLEnabled;

        /** Captures the current time and the enclosing server's auth, encryption and SSL flags. */
        public BitToUserConnectionConfig() {
            established = new DateTime();
            UserConnectionConfig serverConfig = UserServer.this.config;
            isAuthEnabled = serverConfig.isAuthEnabled();
            isEncryptionEnabled = serverConfig.isEncryptionEnabled();
            isSSLEnabled = serverConfig.isSSLEnabled();
        }

        /** @return whether authentication was enabled when this snapshot was taken */
        public boolean isAuthEnabled() {
            return isAuthEnabled;
        }

        /** @return whether encryption was enabled when this snapshot was taken */
        public boolean isEncryptionEnabled() {
            return isEncryptionEnabled;
        }

        /** @return whether SSL was enabled when this snapshot was taken */
        public boolean isSSLEnabled() {
            return isSSLEnabled;
        }

        /** @return the time at which this snapshot was created */
        public DateTime getEstablished() {
            return established;
        }
    }

    /**
     * Server-side representation of one client connection. It keeps the handshake the
     * client sent, the session created from it, and the name of the authenticated user,
     * and it sends query results and data back to the client through the enclosing server.
     */
    public class BitToUserConnection
    extends AbstractServerConnection<BitToUserConnection>
    implements UserClientConnection {
        private UserSession session;
        private UserProtos.UserToBitHandshake inbound;
        private String authenticatedUser;

        /**
         * Wraps the channel. With authentication enabled the initial handler is a
         * {@link ServerAuthenticationHandler} around the configured message handler;
         * otherwise the message handler is used directly. The connection counter is
         * incremented as part of construction.
         */
        BitToUserConnection(SocketChannel channel) {
            super(channel, UserServer.this.config, UserServer.this.config.isAuthEnabled() ? new ServerAuthenticationHandler<BitToUserConnection, UserProtos.RpcType>(UserServer.this.config.getMessageHandler(), 24, UserProtos.RpcType.SASL_MESSAGE) : UserServer.this.config.getMessageHandler());
            incConnectionCounter();
        }

        /** Removes the handler named "timeout-handler" from this connection's channel pipeline. */
        void disableReadTimeout() {
            ChannelPipeline pipeline = getChannel().pipeline();
            pipeline.remove("timeout-handler");
        }

        /** Stores the client's handshake; {@code finalizeSession} reads it later. */
        void setHandshake(UserProtos.UserToBitHandshake inbound) {
            this.inbound = inbound;
        }

        /**
         * Finishes a SASL login: derives the short user name from the SASL authorization id,
         * creates the session for it and logs the login.
         *
         * @throws IOException declared by the overridden method; presumably raised while
         *         resolving the short name -- TODO confirm
         */
        @Override
        public void finalizeSaslSession() throws IOException {
            String authzId = getSaslServer().getAuthorizationID();
            HadoopKerberosName kerberosName = new HadoopKerberosName(authzId);
            finalizeSession(kerberosName.getShortName());
            logger.info("User {} logged in from {}", authenticatedUser, getRemoteAddress());
        }

        /**
         * Creates the session for {@code userName} from the stored handshake, publishes the
         * name under the MDC key "drill.userName", lets the impersonation manager (if any)
         * replace the session user when a target user name is set, and switches to the
         * multi-user request handler when the client asked for multiplexing.
         */
        void finalizeSession(String userName) {
            MDC.put("drill.userName", userName);
            session = UserSession.Builder.newBuilder()
                .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName(userName).build())
                .withOptionManager(UserServer.this.userWorker.getSystemOptions())
                .withUserProperties(this.inbound.getProperties())
                .setSupportComplexTypes(this.inbound.getSupportComplexTypes())
                .build();
            authenticatedUser = userName;
            String impersonationTarget = session.getTargetUserName();
            if (UserServer.this.config.getImpersonationManager() != null) {
                if (impersonationTarget != null) {
                    UserServer.this.config.getImpersonationManager().replaceUserOnSession(impersonationTarget, session);
                }
            }
            boolean multiplexRequested = this.inbound.hasEnableMultiplex() && this.inbound.getEnableMultiplex();
            if (multiplexRequested) {
                changeHandlerTo(new MultiUserServerRequestHandler(UserServer.this.userWorker, UserServer.this.config));
                session.unsetTargetUserName();
            }
        }

        /** @return the session created by {@code finalizeSession}, or {@code null} before that */
        @Override
        public UserSession getSession() {
            return session;
        }

        /** @return the enclosing server's logger */
        @Override
        protected Logger getLogger() {
            return logger;
        }

        /** Sends a query result to the client as a {@code QUERY_RESULT} message without data buffers. */
        @Override
        public void sendResult(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, UserBitShared.QueryResult result) {
            logger.trace("Sending result to client with {}", result);
            ByteBuf[] noBuffers = new ByteBuf[0];
            UserServer.this.send(listener, this, UserProtos.RpcType.QUERY_RESULT, result, GeneralRPCProtos.Ack.class, true, noBuffers);
        }

        /** Converts the package to a writable batch and sends its header and buffers as a {@code QUERY_DATA} message. */
        @Override
        public void sendData(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, QueryDataPackage data) {
            QueryWritableBatch batch = data.toWritableBatch();
            logger.trace("Sending data to client with {}", batch);
            UserServer.this.send(listener, this, UserProtos.RpcType.QUERY_DATA, batch.getHeader(), GeneralRPCProtos.Ack.class, false, batch.getBuffers());
        }

        /**
         * Returns the channel's close future after attaching a listener that runs
         * {@code cleanup()}. Every call attaches one more listener.
         */
        @Override
        public Future<Void> getClosureFuture() {
            return this.getChannel().closeFuture().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)future -> this.cleanup()));
        }

        /** @return the remote address of the underlying channel */
        @Override
        public SocketAddress getRemoteAddress() {
            return getChannel().remoteAddress();
        }

        /** Logs the logout when authentication is enabled, then delegates to the superclass. */
        @Override
        public void channelClosed(RpcException ex) {
            boolean authEnabled = UserServer.this.config.isAuthEnabled();
            if (authEnabled) {
                logger.info("User {} logged out from {}", authenticatedUser, getRemoteAddress());
            }
            super.channelClosed(ex);
        }

        /** Clears the logging MDC and closes the session if one was created. */
        private void cleanup() {
            MDC.clear();
            if (session == null) {
                return;
            }
            session.close();
        }

        /** Runs {@code cleanup()} and then closes the connection through the superclass. */
        @Override
        public void close() {
            cleanup();
            super.close();
        }

        /** Adds one to the user RPC connection count. */
        @Override
        public void incConnectionCounter() {
            UserRpcMetrics.getInstance().addConnectionCount();
        }

        /** Subtracts one from the user RPC connection count and removes this connection from the registry. */
        @Override
        public void decConnectionCounter() {
            UserRpcMetrics.getInstance().decConnectionCount();
            userConnectionMap.remove(this);
        }
    }
}

