/*
 * Decompiled with CFR 0.152.
 */
package oadd.org.apache.drill.exec.rpc.user;

import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import oadd.com.google.protobuf.InvalidProtocolBufferException;
import oadd.io.netty.buffer.ByteBuf;
import oadd.io.netty.buffer.ByteBufInputStream;
import oadd.io.netty.util.concurrent.Future;
import oadd.org.apache.drill.common.config.DrillProperties;
import oadd.org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import oadd.org.apache.drill.exec.proto.GeneralRPCProtos;
import oadd.org.apache.drill.exec.proto.UserBitShared;
import oadd.org.apache.drill.exec.proto.UserProtos;
import oadd.org.apache.drill.exec.rpc.Acks;
import oadd.org.apache.drill.exec.rpc.RequestHandler;
import oadd.org.apache.drill.exec.rpc.Response;
import oadd.org.apache.drill.exec.rpc.ResponseSender;
import oadd.org.apache.drill.exec.rpc.RpcException;
import oadd.org.apache.drill.exec.rpc.RpcOutcomeListener;
import oadd.org.apache.drill.exec.rpc.UserClientConnection;
import oadd.org.apache.drill.exec.rpc.user.UserConnectionConfig;
import oadd.org.apache.drill.exec.rpc.user.UserServer;
import oadd.org.apache.drill.exec.rpc.user.UserSession;
import oadd.org.apache.drill.exec.server.options.OptionValue;
import oadd.org.apache.drill.exec.work.user.UserWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Request handler that lets a single user connection carry several logical
 * sessions. Each session is identified by a {@link UserProtos.SessionHandle}
 * and is represented by a {@link UserClientConnection} wrapper that delegates
 * I/O to the underlying connection while exposing its own {@link UserSession}.
 *
 * <p>NOTE(review): {@code sessions} is a plain {@link HashMap}. If one handler
 * instance is invoked from more than one thread this map needs external
 * synchronization or a concurrent implementation — confirm against the caller.
 *
 * <p>NOTE(review): entries are only removed by an explicit close-session
 * request; nothing in this class removes them when the underlying connection
 * goes away — confirm whether that is handled elsewhere.
 */
public class MultiUserServerRequestHandler
implements RequestHandler<UserServer.BitToUserConnection> {
    private static final Logger logger = LoggerFactory.getLogger(MultiUserServerRequestHandler.class);

    // RPC type numbers dispatched by handle(). The names describe what this
    // class does for each value; presumably they mirror UserProtos.RpcType
    // wire numbers — TODO confirm against the protobuf definition.
    private static final int NEW_SESSION_RPC = 1025;
    private static final int RUN_QUERY_RPC = 1027;
    private static final int CLOSE_SESSION_RPC = 1028;
    private static final int CANCEL_QUERY_RPC = 1029;

    private final UserWorker worker;
    private final UserConnectionConfig config;
    /** Sessions created through this handler and not yet explicitly closed, keyed by handle. */
    private final Map<UserProtos.SessionHandle, UserClientConnection> sessions = new HashMap<>();

    /**
     * @param worker used to read system options, submit queries and cancel queries
     * @param config supplies the (possibly absent) impersonation manager
     */
    public MultiUserServerRequestHandler(UserWorker worker, UserConnectionConfig config) {
        this.worker = worker;
        this.config = config;
    }

    /**
     * Dispatches an incoming RPC to the matching session operation.
     *
     * @param connection the connection the request arrived on
     * @param rpcType    numeric RPC type; one of the four values handled by this class
     * @param pBody      protobuf-encoded request body
     * @param dBody      data body; not used by any request handled here
     * @param sender     used to send the response
     * @throws RpcException if the body cannot be decoded or the request refers to an unknown session
     * @throws UnsupportedOperationException if {@code rpcType} is not one of the handled types
     */
    @Override
    public void handle(UserServer.BitToUserConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException {
        switch (rpcType) {
            case NEW_SESSION_RPC: {
                this.handleNewSession(connection, pBody, sender);
                break;
            }
            case RUN_QUERY_RPC: {
                this.handleRunQuery(pBody, sender);
                break;
            }
            case CLOSE_SESSION_RPC: {
                this.handleCloseSession(pBody, sender);
                break;
            }
            case CANCEL_QUERY_RPC: {
                this.handleCancelQuery(pBody, sender);
                break;
            }
            default: {
                throw new UnsupportedOperationException(String.format("MultiUserServerRequestHandler received rpc of unknown type. Type was %d.", rpcType));
            }
        }
    }

    /** Creates a new session on top of {@code connection}, registers it and replies with its handle. */
    private void handleNewSession(UserServer.BitToUserConnection connection, ByteBuf pBody, ResponseSender sender) throws RpcException {
        UserProtos.NewSessionRequest request;
        try {
            request = UserProtos.NewSessionRequest.PARSER.parseFrom(new ByteBufInputStream(pBody));
        }
        catch (InvalidProtocolBufferException e) {
            throw new RpcException("Failure while decoding NewSessionRequest body.", e);
        }

        // Connection-level properties are merged first, then the requested ones.
        DrillProperties properties = DrillProperties.createEmpty();
        DrillProperties requestedProperties = DrillProperties.createFromProperties(request.getProperties(), false);
        UserSession connectionSession = connection.getSession();
        properties.merge(connectionSession.getProperties());
        properties.merge(requestedProperties);

        // Fall back to the connection's user when the merged properties name none.
        String sessionUser = properties.getProperty("user", connectionSession.getCredentials().getUserName());
        UserSession userSession = UserSession.Builder.newBuilder()
            .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName(sessionUser).build())
            .withOptionManager(this.worker.getSystemOptions())
            .withUserProperties(properties)
            .setSupportComplexTypes(connectionSession.isSupportComplexTypes())
            .build();

        UserProtos.SessionHandle sessionHandle;
        boolean registered = false;
        try {
            // Requested properties that are not in ACCEPTED_BY_SERVER and that
            // name an existing system option are applied as session options.
            for (Map.Entry<Object, Object> entry : requestedProperties.entrySet()) {
                String name = entry.getKey().toString();
                if (DrillProperties.ACCEPTED_BY_SERVER.contains(name)) {
                    continue;
                }
                OptionValue optionValue = this.worker.getSystemOptions().getOption(name);
                if (optionValue == null) {
                    continue;
                }
                userSession.setSessionOption(optionValue.kind, name, entry.getValue().toString());
            }
            if (this.config.getImpersonationManager() != null && userSession.getTargetUserName() != null) {
                this.config.getImpersonationManager().replaceUserOnSession(userSession.getTargetUserName(), userSession);
            }
            sessionHandle = UserProtos.SessionHandle.newBuilder().setSessionId(userSession.getSessionId()).build();
            this.sessions.put(sessionHandle, MultiUserServerRequestHandler.newSession(connection, userSession));
            registered = true;
        }
        finally {
            // If setup failed before the session was registered nothing else
            // holds a reference to it, so close it here instead of leaking it.
            if (!registered) {
                try {
                    userSession.close();
                }
                catch (RuntimeException closeFailure) {
                    // Keep the original failure as the one that propagates.
                    logger.warn("Failure while closing session after failed session setup.", closeFailure);
                }
            }
        }

        logger.debug("New session created: {}", sessionHandle.getSessionId());
        sender.send(new Response(UserProtos.RpcType.SESSION_HANDLE, sessionHandle, new ByteBuf[0]));
    }

    /** Submits a query on an existing session and replies with the query id. */
    private void handleRunQuery(ByteBuf pBody, ResponseSender sender) throws RpcException {
        UserProtos.RunQueryWithSessionHandle query;
        logger.debug("Received query to run. Returning query handle.");
        try {
            query = UserProtos.RunQueryWithSessionHandle.PARSER.parseFrom(new ByteBufInputStream(pBody));
        }
        catch (InvalidProtocolBufferException e) {
            throw new RpcException("Failure while decoding RunQueryWithSessionHandle body.", e);
        }
        UserClientConnection currentConnection = this.sessions.get(query.getSessionHandle());
        if (currentConnection == null) {
            throw new RpcException("Unexpected message. Received a query on non-existent session.");
        }
        logger.debug("Received query on session: {}", query.getSessionHandle());
        UserBitShared.QueryId queryId = this.worker.submitWork(currentConnection, query.getRunQuery());
        sender.send(new Response(UserProtos.RpcType.QUERY_HANDLE, queryId, new ByteBuf[0]));
    }

    /** Removes and closes a session; replies OK if it existed and FAIL otherwise. */
    private void handleCloseSession(ByteBuf pBody, ResponseSender sender) throws RpcException {
        UserProtos.SessionHandle handle;
        try {
            handle = UserProtos.SessionHandle.PARSER.parseFrom(new ByteBufInputStream(pBody));
        }
        catch (InvalidProtocolBufferException e) {
            throw new RpcException("Failure while decoding SessionHandle body.", e);
        }
        UserClientConnection removedConnection = this.sessions.remove(handle);
        if (removedConnection == null) {
            sender.send(new Response(UserProtos.RpcType.ACK, Acks.FAIL, new ByteBuf[0]));
            return;
        }
        removedConnection.close();
        sender.send(new Response(UserProtos.RpcType.ACK, Acks.OK, new ByteBuf[0]));
    }

    /**
     * Cancels a query after checking that the named session exists; replies with the worker's ack.
     *
     * <p>NOTE(review): only the existence of the session is checked here; nothing
     * in this method ties the query id to that session.
     */
    private void handleCancelQuery(ByteBuf pBody, ResponseSender sender) throws RpcException {
        UserProtos.CancelQueryWithSessionHandle request;
        try {
            request = UserProtos.CancelQueryWithSessionHandle.PARSER.parseFrom(new ByteBufInputStream(pBody));
        }
        catch (InvalidProtocolBufferException e) {
            throw new RpcException("Failure while decoding CancelQueryWithSessionHandle body.", e);
        }
        if (this.sessions.get(request.getSessionHandle()) == null) {
            throw new RpcException("Unexpected message. Received a cancellation on query in non-existent session.");
        }
        sender.send(new Response(UserProtos.RpcType.ACK, this.worker.cancelQuery(request.getQueryId()), new ByteBuf[0]));
    }

    /**
     * Wraps {@code underlyingConnection} so that results and data are sent over
     * it while {@link UserClientConnection#getSession()} returns {@code userSession}.
     *
     * @param underlyingConnection connection that actually carries the traffic
     * @param userSession          session exposed by, and closed with, the wrapper
     * @return a connection view bound to {@code userSession}
     */
    private static UserClientConnection newSession(final UserServer.BitToUserConnection underlyingConnection, final UserSession userSession) {
        return new UserClientConnection(){
            /** Closure future with the session-closing listener attached; created on first request. */
            private Future<Void> closureFuture;

            @Override
            public UserSession getSession() {
                return userSession;
            }

            @Override
            public void sendResult(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, UserBitShared.QueryResult result) {
                underlyingConnection.sendResult(listener, result);
            }

            @Override
            public void sendData(RpcOutcomeListener<GeneralRPCProtos.Ack> listener, QueryWritableBatch result) {
                underlyingConnection.sendData(listener, result);
            }

            /**
             * Returns the underlying connection's closure future with a listener
             * that closes this session. The listener is attached only on the
             * first call; later calls return the cached future so listeners do
             * not pile up on the connection and the session is not closed once
             * per call. Assumes the underlying connection hands out the same
             * closure future for its whole lifetime — TODO confirm.
             */
            @Override
            public synchronized Future<Void> getClosureFuture() {
                if (this.closureFuture == null) {
                    this.closureFuture = underlyingConnection.getClosureFuture().addListener(future -> userSession.close());
                }
                return this.closureFuture;
            }

            @Override
            public SocketAddress getRemoteAddress() {
                return underlyingConnection.getRemoteAddress();
            }

            /** Closes only the session; the underlying connection is left open. */
            @Override
            public void close() {
                userSession.close();
            }
        };
    }
}

