/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.ext;

import hive.com.google.common.collect.Lists;
import hive.com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.AsyncPbRpcProxy;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
import org.apache.hadoop.hive.llap.security.LlapTokenIdentifier;
import org.apache.hadoop.hive.llap.tez.Converters;
import org.apache.hadoop.hive.llap.tez.LlapProtocolClientProxy;
import org.apache.hadoop.hive.llap.tezplugins.helpers.LlapTaskUmbilicalServer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LlapTaskUmbilicalExternalClient
implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(LlapTaskUmbilicalExternalClient.class);
    private static ScheduledThreadPoolExecutor retryExecutor = new ScheduledThreadPoolExecutor(1);
    private final Random rand = new Random();
    private final LlapProtocolClientProxy communicator;
    private volatile LlapTaskUmbilicalServer llapTaskUmbilicalServer;
    private final Configuration conf;
    protected final String tokenIdentifier;
    protected final Token<JobTokenIdentifier> sessionToken;
    private LlapTaskUmbilicalExternalResponder responder = null;
    private final long connectionTimeout;
    private long baseDelay;
    private int attemptNum = 0;
    private volatile boolean closed = false;
    private volatile boolean timeoutsDisabled = false;
    private RequestInfo requestInfo;
    List<TezEvent> tezEvents;

    public LlapTaskUmbilicalExternalClient(Configuration conf, String tokenIdentifier, Token<JobTokenIdentifier> sessionToken, LlapTaskUmbilicalExternalResponder responder, Token<LlapTokenIdentifier> llapToken) {
        this.conf = conf;
        this.tokenIdentifier = tokenIdentifier;
        this.sessionToken = sessionToken;
        this.responder = responder;
        this.connectionTimeout = 3L * HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        this.baseDelay = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MS, TimeUnit.MILLISECONDS);
        this.communicator = new LlapProtocolClientProxy(1, conf, llapToken);
        this.communicator.init(conf);
    }

    /**
     * Sends a terminate-fragment request to the daemon for the current request.
     *
     * <p>Does nothing except log a warning when the client is already closed or no request has
     * been submitted. The daemon's answer is only logged.
     */
    private void terminateRequest() {
        final RequestInfo current = requestInfo;
        if (closed || current == null) {
            LOG.warn("No current request to terminate");
            return;
        }

        final String taskAttemptId = current.taskAttemptId;
        LlapDaemonProtocolProtos.TerminateFragmentRequestProto.Builder terminateBuilder = LlapDaemonProtocolProtos.TerminateFragmentRequestProto.newBuilder();
        terminateBuilder.setQueryIdentifier(current.queryIdentifierProto);
        terminateBuilder.setFragmentIdentifierString(taskAttemptId);

        AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto> logOnly = new AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.TerminateFragmentResponseProto>(){

            @Override
            public void setResponse(LlapDaemonProtocolProtos.TerminateFragmentResponseProto response) {
                LOG.debug("Received terminate response for " + taskAttemptId);
            }

            @Override
            public void indicateError(Throwable t) {
                LOG.error("Failed to terminate " + taskAttemptId, t);
            }
        };
        communicator.sendTerminateFragment(terminateBuilder.build(), current.hostname, current.port, logOnly);
    }

    /**
     * Returns the address of the shared umbilical server, obtaining the shared instance
     * (which is created on first use) for this client's configuration.
     *
     * @return the address reported by the shared umbilical server
     */
    public InetSocketAddress getAddress() {
        SharedUmbilicalServer shared = SharedUmbilicalServer.getInstance(conf);
        return shared.llapTaskUmbilicalServer.getAddress();
    }

    /**
     * Registers this client for the fragment described by {@code request} and submits it to the
     * given LLAP daemon.
     *
     * <p>The vertex spec is taken from the request directly or parsed from its binary form; the
     * task attempt id derived from it becomes the key under which the client is registered.
     *
     * @param request  the work request to submit
     * @param llapHost host of the target daemon
     * @param llapPort port of the target daemon
     * @throws RuntimeException if the binary vertex spec cannot be parsed
     */
    public void submitWork(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, String llapHost, int llapPort) {
        LlapDaemonProtocolProtos.VertexOrBinary workSpec = request.getWorkSpec();
        // Exactly one of the two representations is expected to be present.
        assert (workSpec.hasVertexBinary() != workSpec.hasVertex());

        LlapDaemonProtocolProtos.SignableVertexSpec vertexSpec;
        try {
            if (workSpec.hasVertex()) {
                vertexSpec = workSpec.getVertex();
            } else {
                vertexSpec = LlapDaemonProtocolProtos.SignableVertexSpec.parseFrom(workSpec.getVertexBinary());
            }
        }
        catch (InvalidProtocolBufferException e) {
            throw new RuntimeException(e);
        }

        LlapDaemonProtocolProtos.QueryIdentifierProto queryId = vertexSpec.getQueryIdentifier();
        TezTaskAttemptID attemptId = Converters.createTaskAttemptId(queryId, vertexSpec.getVertexIndex(), request.getFragmentNumber(), request.getAttemptNumber());
        requestInfo = new RequestInfo(request, queryId, attemptId.toString(), llapHost, llapPort);
        tezEvents = Lists.newArrayList();

        // Register before submitting so heartbeats for the fragment can be matched to this client.
        registerClient();
        communicator.start();
        submitWork();
    }

    /**
     * Sends the stored request to the daemon unless the client has been closed. The outcome is
     * delivered to a new {@link SubmitWorkCallback} bound to this client.
     */
    private void submitWork() {
        if (closed) {
            return;
        }
        RequestInfo info = requestInfo;
        communicator.sendSubmitWork(info.request, info.hostname, info.port, new SubmitWorkCallback(this));
    }

    /**
     * Schedules another submission attempt after a randomised delay.
     *
     * <p>Heartbeat timeouts are disabled while the retry is pending and re-enabled (with a fresh
     * heartbeat timestamp) right before the request is resubmitted.
     */
    private void retrySubmission() {
        attemptNum++;
        long delayMs = determineRetryDelay();
        LOG.info("Queueing fragment for resubmission {}, attempt {}, delay {}", requestInfo.taskAttemptId, attemptNum, delayMs);
        disableTimeouts();

        Runnable resubmit = new Runnable(){

            @Override
            public void run() {
                enableTimeouts();
                submitWork();
            }
        };
        retryExecutor.schedule(resubmit, delayMs, TimeUnit.MILLISECONDS);
    }

    /**
     * Terminates the current request on the daemon and unregisters this client. Has no effect
     * once the client is marked closed.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        terminateRequest();
        unregisterClient();
    }

    /**
     * Registers this client with the shared umbilical server under its task attempt id and adds
     * the session token for the job. An existing registration for the same id is kept and only
     * reported with a warning.
     */
    private void registerClient() {
        SharedUmbilicalServer shared = SharedUmbilicalServer.getInstance(conf);
        String taskAttemptId = requestInfo.taskAttemptId;
        boolean alreadyRegistered = shared.umbilicalProtocol.registeredClients.putIfAbsent(taskAttemptId, this) != null;
        if (alreadyRegistered) {
            LOG.warn("Unexpected - fragment " + taskAttemptId + " is already registered!");
        }
        shared.llapTaskUmbilicalServer.addTokenForJob(tokenIdentifier, sessionToken);
    }

    /**
     * Stops the communicator, removes this client and its job token from the shared umbilical
     * server, and marks the client closed. Does nothing if the client is already closed or no
     * request was ever submitted.
     */
    private void unregisterClient() {
        if (closed || requestInfo == null) {
            return;
        }
        communicator.stop();
        SharedUmbilicalServer shared = SharedUmbilicalServer.getInstance(conf);
        shared.umbilicalProtocol.unregisterClient(requestInfo.taskAttemptId);
        shared.llapTaskUmbilicalServer.removeTokenForJob(tokenIdentifier);
        // Set last so the guard above only trips after the cleanup has run.
        closed = true;
    }

    /**
     * @return the timestamp (milliseconds) last recorded for the current request
     */
    long getLastHeartbeat() {
        AtomicLong heartbeat = requestInfo.lastHeartbeat;
        return heartbeat.get();
    }

    /**
     * Records the time of the most recent heartbeat for the current request.
     *
     * @param lastHeartbeat timestamp in milliseconds
     */
    void setLastHeartbeat(long lastHeartbeat) {
        AtomicLong heartbeat = requestInfo.lastHeartbeat;
        heartbeat.set(lastHeartbeat);
    }

    /**
     * Tells whether the time since the last heartbeat has reached the connection timeout.
     * Always {@code false} while timeouts are disabled.
     *
     * @param currentTime the current time in milliseconds
     * @return {@code true} if the request is considered timed out
     */
    private boolean isTimedOut(long currentTime) {
        return !timeoutsDisabled && currentTime - getLastHeartbeat() >= connectionTimeout;
    }

    /**
     * Re-enables heartbeat timeout checks. The heartbeat timestamp is reset to "now" first so
     * the time spent with timeouts disabled does not count against the request.
     */
    private void enableTimeouts() {
        requestInfo.lastHeartbeat.set(System.currentTimeMillis());
        timeoutsDisabled = false;
    }

    /**
     * Suspends heartbeat timeout checks for this client until {@link #enableTimeouts()} is called.
     */
    private void disableTimeouts() {
        timeoutsDisabled = true;
    }

    /**
     * Picks a random delay for the next resubmission attempt.
     *
     * <p>The upper bound doubles with every attempt, starting from the configured base delay,
     * and is capped at 60 seconds. The delay is drawn uniformly from {@code [0, bound)}.
     *
     * @return the delay in milliseconds; {@code 0} when the computed bound is not positive
     */
    private long determineRetryDelay() {
        final double maxBackoffMs = 60000.0;
        int maxDelay = (int)Math.min((double)this.baseDelay * Math.pow(2.0, this.attemptNum), maxBackoffMs);
        // Random.nextInt(bound) throws IllegalArgumentException for bound <= 0, which happens
        // when the base delay is configured as 0 (or negative); retry immediately instead.
        if (maxDelay <= 0) {
            return 0L;
        }
        return this.rand.nextInt(maxDelay);
    }

    /**
     * @return the class name, followed by the task attempt id in parentheses once a request
     *         has been submitted
     */
    @Override
    public String toString() {
        String name = "LlapTaskUmbilicalExternalClient";
        if (requestInfo == null) {
            return name;
        }
        return name + "(" + requestInfo.taskAttemptId + ")";
    }

    private static class LlapTaskUmbilicalExternalImpl
    implements LlapTaskUmbilicalProtocol {
        final ConcurrentMap<String, LlapTaskUmbilicalExternalClient> registeredClients = new ConcurrentHashMap<String, LlapTaskUmbilicalExternalClient>();
        private final ScheduledThreadPoolExecutor timer;

        public LlapTaskUmbilicalExternalImpl(Configuration conf) {
            long taskInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            this.timer = new ScheduledThreadPoolExecutor(1);
            this.timer.scheduleAtFixedRate(new HeartbeatCheckTask(this), taskInterval, taskInterval, TimeUnit.MILLISECONDS);
        }

        /**
         * Always allows the commit; no per-task check is performed by this implementation.
         *
         * @param taskid the task attempt asking to commit (unused)
         * @return always {@code true}
         */
        @Override
        public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
            return true;
        }

        /**
         * Handles a task heartbeat from a daemon.
         *
         * <p>Refreshes the heartbeat timestamp of the matching client, moves its request from
         * {@code PENDING} to {@code RUNNING} on the first heartbeat (returning the client's
         * queued events with that response only), unregisters the client when a completed or
         * failed event is reported, and finally forwards the request to the client's responder.
         *
         * @param request the heartbeat request
         * @return the response; marked "should die" when no client is registered for the attempt
         */
        @Override
        public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received heartbeat from container, request=" + request);
            }
            TezHeartbeatResponse response = new TezHeartbeatResponse();
            response.setLastRequestId(request.getRequestId());
            TezTaskAttemptID taskAttemptId = request.getCurrentTaskAttemptID();
            String taskAttemptIdString = taskAttemptId.toString();
            this.updateHeartbeatInfo(taskAttemptIdString);

            LlapTaskUmbilicalExternalClient client = this.registeredClients.get(taskAttemptIdString);
            if (client == null) {
                LOG.info("Unexpected heartbeat from " + taskAttemptIdString);
                response.setShouldDie();
                return response;
            }

            // Queued events are handed over only with the first heartbeat of the attempt.
            List<TezEvent> tezEvents;
            if (client.requestInfo.state == RequestState.PENDING) {
                client.requestInfo.state = RequestState.RUNNING;
                tezEvents = client.tezEvents;
            } else {
                tezEvents = Collections.emptyList();
            }
            response.setNextFromEventId(0);
            response.setNextPreRoutedEventId(0);
            response.setEvents(tezEvents);

            List<TezEvent> inEvents = request.getEvents();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Heartbeat from " + taskAttemptIdString + " events: " + (inEvents != null ? inEvents.size() : -1));
            }
            boolean shouldUnregisterClient = false;
            for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
                EventType eventType = tezEvent.getEventType();
                switch (eventType) {
                    case TASK_ATTEMPT_COMPLETED_EVENT: {
                        LOG.debug("Task completed event for " + taskAttemptIdString);
                        shouldUnregisterClient = true;
                        break;
                    }
                    case TASK_ATTEMPT_FAILED_EVENT: {
                        LOG.debug("Task failed event for " + taskAttemptIdString);
                        shouldUnregisterClient = true;
                        break;
                    }
                    case TASK_STATUS_UPDATE_EVENT: {
                        LOG.debug("Task update event for " + taskAttemptIdString);
                        break;
                    }
                    default: {
                        LOG.warn("Unhandled event type " + eventType);
                        break;
                    }
                }
            }
            if (shouldUnregisterClient) {
                client.unregisterClient();
            }

            // A misbehaving responder must not fail the RPC.
            try {
                if (client.responder != null) {
                    client.responder.heartbeat(request);
                }
            }
            catch (Exception err) {
                LOG.error("Error during responder execution", (Throwable)err);
            }
            return response;
        }

        /**
         * Handles a node-level heartbeat by refreshing the heartbeat timestamp of every
         * registered client on that node whose attempt is listed in {@code aw}.
         *
         * @param hostname   host the heartbeat came from
         * @param uniqueId   unique id of the node
         * @param port       port of the node
         * @param aw         task attempts the node reports
         * @param guaranteed not used by this implementation
         */
        @Override
        public void nodeHeartbeat(Text hostname, Text uniqueId, int port, LlapTaskUmbilicalProtocol.TezAttemptArray aw, LlapTaskUmbilicalProtocol.BooleanArray guaranteed) throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Node heartbeat from " + hostname + ":" + port + ", " + uniqueId);
            }
            String host = hostname.toString();
            String nodeId = uniqueId.toString();
            updateHeartbeatInfo(host, nodeId, port, aw);
        }

        /**
         * Handles a kill notification for a task attempt.
         *
         * <p>A request still in {@code PENDING} state is resubmitted; otherwise the client is
         * unregistered and its responder, if any, is notified. Notifications for attempts that
         * are not registered are only logged.
         *
         * @param taskAttemptId the attempt that was killed
         */
        @Override
        public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
            String taskAttemptIdString = taskAttemptId.toString();
            LlapTaskUmbilicalExternalClient client = registeredClients.get(taskAttemptIdString);
            if (client == null) {
                LOG.info("Received task killed notification for task which is not currently being tracked: " + taskAttemptId);
                return;
            }
            if (client.requestInfo.state == RequestState.PENDING) {
                LOG.info("Received task kill for {} which is still in pending state. Retry submission.", (Object)taskAttemptIdString);
                client.retrySubmission();
                return;
            }
            try {
                LOG.error("Task killed - " + taskAttemptIdString);
                client.unregisterClient();
                if (client.responder != null) {
                    client.responder.taskKilled(taskAttemptId);
                }
            }
            catch (Exception err) {
                LOG.error("Error during responder execution", (Throwable)err);
            }
        }

        /**
         * Reports the protocol version of this implementation.
         *
         * @param protocol      requested protocol name (ignored)
         * @param clientVersion version of the caller (ignored)
         * @return always {@code 0}
         */
        public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
            return 0L;
        }

        /**
         * Delegates to {@link ProtocolSignature#getProtocolSignature} with this instance as the
         * server.
         *
         * @param protocol          protocol name
         * @param clientVersion     version of the caller
         * @param clientMethodsHash hash of the caller's protocol methods
         * @return the signature computed by {@code ProtocolSignature}
         */
        public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash) throws IOException {
            VersionedProtocol server = this;
            return ProtocolSignature.getProtocolSignature(server, protocol, clientVersion, clientMethodsHash);
        }

        /**
         * Drops the registration for the given task attempt, if there is one.
         *
         * @param taskAttemptId key of the client to remove
         */
        private void unregisterClient(String taskAttemptId) {
            registeredClients.remove(taskAttemptId);
        }

        /**
         * Refreshes the heartbeat timestamp of the client registered for the given attempt, or
         * logs a warning when no such client exists.
         *
         * @param taskAttemptId task attempt the heartbeat belongs to
         */
        private void updateHeartbeatInfo(String taskAttemptId) {
            LlapTaskUmbilicalExternalClient registeredClient = registeredClients.get(taskAttemptId);
            if (registeredClient == null) {
                LOG.warn("No tasks found for heartbeat from taskAttemptId " + taskAttemptId);
                return;
            }
            registeredClient.setLastHeartbeat(System.currentTimeMillis());
        }

        /**
         * Applies a node heartbeat to all registered clients located on that node.
         *
         * <p>Clients whose attempt is among {@code tasks} get a fresh heartbeat timestamp;
         * clients on the node whose attempt is not listed are reported in an info message.
         *
         * @param hostname host of the node
         * @param uniqueId unique id of the node
         * @param port     port of the node
         * @param tasks    task attempts the node reported
         */
        private void updateHeartbeatInfo(String hostname, String uniqueId, int port, LlapTaskUmbilicalProtocol.TezAttemptArray tasks) {
            HashSet<TezTaskAttemptID> reportedAttempts = new HashSet<TezTaskAttemptID>();
            for (Writable attempt : tasks.get()) {
                reportedAttempts.add((TezTaskAttemptID)attempt);
            }

            int refreshed = 0;
            StringBuilder missing = new StringBuilder();
            for (LlapTaskUmbilicalExternalClient registeredClient : registeredClients.values()) {
                if (!doesClientMatchHeartbeat(registeredClient, hostname, uniqueId, port)) {
                    continue;
                }
                String attemptIdString = registeredClient.requestInfo.taskAttemptId;
                if (reportedAttempts.contains(TezTaskAttemptID.fromString(attemptIdString))) {
                    registeredClient.setLastHeartbeat(System.currentTimeMillis());
                    refreshed++;
                } else {
                    missing.append(attemptIdString).append(", ");
                }
            }

            if (missing.length() > 0) {
                LOG.info("The tasks we expected to be on the node are not there: " + missing);
            }
            if (refreshed == 0) {
                LOG.info("No tasks found for heartbeat from hostname " + hostname + ", port " + port);
            }
        }

        /**
         * Checks whether a client's request targets the node identified by the given host, port
         * and unique id.
         *
         * @return {@code true} only if all three values match the client's request
         */
        private static boolean doesClientMatchHeartbeat(LlapTaskUmbilicalExternalClient client, String hostname, String uniqueId, int port) {
            RequestInfo info = client.requestInfo;
            if (!hostname.equals(info.hostname)) {
                return false;
            }
            if (port != info.port) {
                return false;
            }
            return uniqueId.equals(info.uniqueNodeId);
        }
    }

    /**
     * Callbacks through which an external caller is told what happened to its submitted
     * fragment.
     */
    public interface LlapTaskUmbilicalExternalResponder {
        /**
         * Called when submitting the fragment to the daemon failed.
         *
         * @param fragmentId task attempt id of the fragment
         * @param throwable  the failure, wrapping the original cause
         */
        void submissionFailed(String fragmentId, Throwable throwable);

        /**
         * Called for every task heartbeat received for a registered fragment.
         *
         * @param request the heartbeat request as received
         */
        void heartbeat(TezHeartbeatRequest request);

        /**
         * Called when a kill notification arrives for a fragment that is no longer pending.
         *
         * @param taskAttemptId the attempt that was killed
         */
        void taskKilled(TezTaskAttemptID taskAttemptId);

        /**
         * Called when no heartbeat was seen for the fragment within the connection timeout.
         *
         * @param fragmentId task attempt id of the fragment
         */
        void heartbeatTimeout(String fragmentId);
    }

    private static class HeartbeatCheckTask
    implements Runnable {
        LlapTaskUmbilicalExternalImpl umbilicalImpl;

        public HeartbeatCheckTask(LlapTaskUmbilicalExternalImpl umbilicalImpl) {
            this.umbilicalImpl = umbilicalImpl;
        }

        @Override
        public void run() {
            long currentTime = System.currentTimeMillis();
            ArrayList<LlapTaskUmbilicalExternalClient> timedOutTasks = new ArrayList<LlapTaskUmbilicalExternalClient>();
            for (Map.Entry entry : this.umbilicalImpl.registeredClients.entrySet()) {
                LlapTaskUmbilicalExternalClient client = (LlapTaskUmbilicalExternalClient)entry.getValue();
                if (!client.isTimedOut(currentTime)) continue;
                timedOutTasks.add(client);
            }
            for (LlapTaskUmbilicalExternalClient timedOutTask : timedOutTasks) {
                String taskAttemptId = timedOutTask.requestInfo.taskAttemptId;
                LOG.info("Running taskAttemptId " + taskAttemptId + " timed out");
                timedOutTask.unregisterClient();
                timedOutTask.responder.heartbeatTimeout(taskAttemptId);
            }
        }
    }

    /**
     * Callback for the submit-work RPC of a single client.
     */
    static class SubmitWorkCallback
    implements AsyncPbRpcProxy.ExecuteRequestCallback<LlapDaemonProtocolProtos.SubmitWorkResponseProto> {
        private final LlapTaskUmbilicalExternalClient client;

        /**
         * @param client the client whose request was submitted
         */
        public SubmitWorkCallback(LlapTaskUmbilicalExternalClient client) {
            this.client = client;
        }

        /**
         * Logs a rejected submission, or stores the unique node id from an accepted one.
         *
         * @param response the daemon's answer to the submission
         */
        @Override
        public void setResponse(LlapDaemonProtocolProtos.SubmitWorkResponseProto response) {
            if (response.hasSubmissionState() && response.getSubmissionState().equals(LlapDaemonProtocolProtos.SubmissionStateProto.REJECTED)) {
                String fragmentId = this.client.requestInfo.taskAttemptId;
                String msg = "Fragment: " + fragmentId + " rejected. Server Busy.";
                LOG.info(msg);
                // NOTE(review): no retry is scheduled here; presumably the daemon also sends
                // taskKilled for a rejected fragment, which triggers the retry - confirm.
                return;
            }
            if (response.hasUniqueNodeId()) {
                this.client.requestInfo.uniqueNodeId = response.getUniqueNodeId();
            }
        }

        /**
         * Unregisters the client and reports the failed submission to its responder, if any.
         *
         * @param t the failure reported for the RPC
         */
        @Override
        public void indicateError(Throwable t) {
            String fragmentId = this.client.requestInfo.taskAttemptId;
            String msg = "Failed to submit: " + fragmentId;
            LOG.error(msg, t);
            RuntimeException err = new RuntimeException(msg, t);
            this.client.unregisterClient();
            // The responder is optional, as in the heartbeat and taskKilled paths.
            if (this.client.responder != null) {
                this.client.responder.submissionFailed(fragmentId, err);
            }
        }
    }

    /**
     * State kept for the request a client has submitted: the request itself, where it was
     * sent, its lifecycle state and the time of the last heartbeat.
     */
    private static class RequestInfo {
        final LlapDaemonProtocolProtos.SubmitWorkRequestProto request;
        final LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifierProto;
        final String taskAttemptId;
        final String hostname;
        final int port;
        final AtomicLong lastHeartbeat = new AtomicLong();
        // A request starts out pending; it is switched to RUNNING on the first task heartbeat.
        RequestState state = RequestState.PENDING;
        // Filled in from the submit-work response when the daemon provides it.
        String uniqueNodeId;

        /**
         * Creates the record in {@code PENDING} state with the heartbeat timestamp set to the
         * current time.
         */
        public RequestInfo(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, LlapDaemonProtocolProtos.QueryIdentifierProto queryIdentifierProto, String taskAttemptId, String hostname, int port) {
            this.request = request;
            this.queryIdentifierProto = queryIdentifierProto;
            this.taskAttemptId = taskAttemptId;
            this.hostname = hostname;
            this.port = port;
            lastHeartbeat.set(System.currentTimeMillis());
        }
    }

    private static enum RequestState {
        PENDING,
        RUNNING;

    }

    /**
     * Lazily created singleton holding the umbilical protocol implementation and the server
     * that exposes it; shared by all clients.
     */
    private static class SharedUmbilicalServer {
        private static final Object lock = new Object();
        private static volatile SharedUmbilicalServer instance;

        LlapTaskUmbilicalExternalImpl umbilicalProtocol;
        LlapTaskUmbilicalServer llapTaskUmbilicalServer;

        /**
         * Builds the protocol implementation and its server.
         *
         * @throws ExceptionInInitializerError wrapping any exception raised during set-up
         */
        private SharedUmbilicalServer(Configuration conf) {
            try {
                umbilicalProtocol = new LlapTaskUmbilicalExternalImpl(conf);
                llapTaskUmbilicalServer = new LlapTaskUmbilicalServer(conf, umbilicalProtocol, 1);
            }
            catch (Exception err) {
                throw new ExceptionInInitializerError(err);
            }
        }

        /**
         * Returns the shared instance, creating it on the first call. The configuration is
         * only used by the call that actually creates the instance.
         *
         * @param conf configuration for creating the server if it does not exist yet
         * @return the shared instance
         */
        static SharedUmbilicalServer getInstance(Configuration conf) {
            SharedUmbilicalServer existing = instance;
            if (existing != null) {
                return existing;
            }
            synchronized (lock) {
                // Re-check under the lock: another thread may have created it meanwhile.
                if (instance == null) {
                    instance = new SharedUmbilicalServer(conf);
                }
                return instance;
            }
        }
    }
}

