/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.taskcomm;

import com.google.protobuf.ByteString;
import com.google.protobuf.ServiceException;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.TezTestServiceCommunicator;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.test.service.rpc.TezTestServiceProtocolProtos;
import org.apache.tez.util.ProtoConverters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TezTestServiceTaskCommunicatorImpl
extends TezTaskCommunicatorImpl {
    private static final Logger LOG = LoggerFactory.getLogger(TezTestServiceTaskCommunicatorImpl.class);
    private final TezTestServiceCommunicator communicator = new TezTestServiceCommunicator(3);
    private final TezTestServiceProtocolProtos.SubmitWorkRequestProto BASE_SUBMIT_WORK_REQUEST;
    private final ConcurrentMap<String, ByteBuffer> credentialMap;

    public TezTestServiceTaskCommunicatorImpl(TaskCommunicatorContext taskCommunicatorContext) {
        super(taskCommunicatorContext);
        TezTestServiceProtocolProtos.SubmitWorkRequestProto.Builder baseBuilder = TezTestServiceProtocolProtos.SubmitWorkRequestProto.newBuilder();
        baseBuilder.setUser(System.getProperty("user.name"));
        baseBuilder.setApplicationIdString(taskCommunicatorContext.getApplicationAttemptId().getApplicationId().toString());
        baseBuilder.setAppAttemptNumber(taskCommunicatorContext.getApplicationAttemptId().getAttemptId());
        baseBuilder.setTokenIdentifier(this.getTokenIdentifier());
        this.BASE_SUBMIT_WORK_REQUEST = baseBuilder.build();
        this.credentialMap = new ConcurrentHashMap<String, ByteBuffer>();
    }

    public void initialize() throws Exception {
        super.initialize();
        this.communicator.init(this.getConf());
    }

    public void start() {
        super.start();
        this.communicator.start();
    }

    public void shutdown() {
        super.shutdown();
        this.communicator.stop();
    }

    public void registerRunningContainer(ContainerId containerId, String hostname, int port) {
        super.registerRunningContainer(containerId, hostname, port);
    }

    public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason, String diagnostics) {
        super.registerContainerEnd(containerId, endReason, diagnostics);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void registerRunningTaskAttempt(final ContainerId containerId, final TaskSpec taskSpec, Map<String, LocalResource> additionalResources, Credentials credentials, boolean credentialsChanged, int priority) {
        int port;
        String host;
        super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials, credentialsChanged, priority);
        TezTestServiceProtocolProtos.SubmitWorkRequestProto requestProto = null;
        try {
            requestProto = this.constructSubmitWorkRequest(containerId, taskSpec);
        }
        catch (IOException e) {
            throw new RuntimeException("Failed to construct request", e);
        }
        TezTaskCommunicatorImpl.ContainerInfo containerInfo = this.getContainerInfo(containerId);
        if (containerInfo != null) {
            TezTaskCommunicatorImpl.ContainerInfo containerInfo2 = containerInfo;
            synchronized (containerInfo2) {
                host = containerInfo.host;
                port = containerInfo.port;
            }
        } else {
            throw new RuntimeException("ContainerInfo not found for container: " + containerId + ", while trying to launch task: " + taskSpec.getTaskAttemptID());
        }
        this.getContext().taskSubmitted(taskSpec.getTaskAttemptID(), containerId);
        this.getContext().taskStartedRemotely(taskSpec.getTaskAttemptID());
        this.communicator.submitWork(requestProto, host, port, new TezTestServiceCommunicator.ExecuteRequestCallback<TezTestServiceProtocolProtos.SubmitWorkResponseProto>(){

            @Override
            public void setResponse(TezTestServiceProtocolProtos.SubmitWorkResponseProto response) {
                LOG.info("Successfully launched task: " + taskSpec.getTaskAttemptID());
            }

            @Override
            public void indicateError(Throwable t) {
                LOG.info("Failed to run task: " + taskSpec.getTaskAttemptID() + " on containerId: " + containerId, t);
                if (t instanceof ServiceException) {
                    ServiceException se = (ServiceException)t;
                    t = se.getCause();
                }
                if (t instanceof RemoteException) {
                    RemoteException re = (RemoteException)t;
                    String message = re.toString();
                    if (message.contains(RejectedExecutionException.class.getName())) {
                        TezTestServiceTaskCommunicatorImpl.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.EXECUTOR_BUSY, "Service Busy");
                    } else {
                        TezTestServiceTaskCommunicatorImpl.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.toString());
                    }
                } else if (t instanceof IOException) {
                    TezTestServiceTaskCommunicatorImpl.this.getContext().taskKilled(taskSpec.getTaskAttemptID(), TaskAttemptEndReason.COMMUNICATION_ERROR, "Communication Error");
                } else {
                    TezTestServiceTaskCommunicatorImpl.this.getContext().taskFailed(taskSpec.getTaskAttemptID(), TaskFailureType.NON_FATAL, TaskAttemptEndReason.OTHER, t.getMessage());
                }
            }
        });
    }

    public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason, String diagnostics) {
        super.unregisterRunningTaskAttempt(taskAttemptID, endReason, diagnostics);
    }

    private TezTestServiceProtocolProtos.SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerId, TaskSpec taskSpec) throws IOException {
        TezTestServiceProtocolProtos.SubmitWorkRequestProto.Builder builder = TezTestServiceProtocolProtos.SubmitWorkRequestProto.newBuilder((TezTestServiceProtocolProtos.SubmitWorkRequestProto)this.BASE_SUBMIT_WORK_REQUEST);
        builder.setContainerIdString(containerId.toString());
        builder.setAmHost(this.getAddress().getHostName());
        builder.setAmPort(this.getAddress().getPort());
        Credentials taskCredentials = new Credentials();
        taskCredentials.addAll(this.getContext().getAMCredentials());
        ByteBuffer credentialsBinary = (ByteBuffer)this.credentialMap.get(taskSpec.getDAGName());
        if (credentialsBinary == null) {
            credentialsBinary = this.serializeCredentials(this.getContext().getAMCredentials());
            this.credentialMap.putIfAbsent(taskSpec.getDAGName(), credentialsBinary.duplicate());
        } else {
            credentialsBinary = credentialsBinary.duplicate();
        }
        builder.setCredentialsBinary(ByteString.copyFrom((ByteBuffer)credentialsBinary));
        builder.setTaskSpec(ProtoConverters.convertTaskSpecToProto(taskSpec));
        return builder.build();
    }

    private ByteBuffer serializeCredentials(Credentials credentials) throws IOException {
        Credentials containerCredentials = new Credentials();
        containerCredentials.addAll(credentials);
        DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
        containerCredentials.writeTokenStorageToStream((DataOutputStream)containerTokens_dob);
        ByteBuffer containerCredentialsBuffer = ByteBuffer.wrap(containerTokens_dob.getData(), 0, containerTokens_dob.getLength());
        return containerCredentialsBuffer;
    }
}

