/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link RecoveryService} that calls a pluggable {@link RecoveryServiceHook} before and after
 * each recovery event and summary event it handles.
 *
 * <p>The hook class is named by the {@link #AM_RECOVERY_SERVICE_HOOK_CLASS} configuration key and
 * is instantiated reflectively in {@code serviceInit}. The hooks defined in this file call the
 * private {@code shutdown()} method, which marks the service as stopped and exits the JVM from a
 * separate thread.
 */
public class RecoveryServiceWithEventHandlingHook
extends RecoveryService {
    /** Configuration key whose value is the class name of the hook to instantiate. */
    public static final String AM_RECOVERY_SERVICE_HOOK_CLASS = "tez.test.am.recovery_service.hook";
    private static final Logger LOG = LoggerFactory.getLogger(RecoveryServiceWithEventHandlingHook.class);
    /** Hook invoked around event handling; assigned in serviceInit. */
    private RecoveryServiceHook hook;
    /** Set by shutdown(); once true, events are no longer forwarded to the superclass. */
    private boolean shutdownInvoked = false;

    /**
     * Creates the service for the given application context.
     *
     * @param appContext application context, passed straight through to {@link RecoveryService}
     */
    public RecoveryServiceWithEventHandlingHook(AppContext appContext) {
        super(appContext);
    }

    /**
     * Initializes the parent service, then instantiates the hook whose class name is stored under
     * {@link #AM_RECOVERY_SERVICE_HOOK_CLASS}.
     *
     * <p>The hook class is created reflectively through a constructor taking
     * {@code (RecoveryServiceWithEventHandlingHook, AppContext)}.
     *
     * @param conf configuration that must contain {@link #AM_RECOVERY_SERVICE_HOOK_CLASS}
     * @throws Exception if the parent initialization fails, the hook class is not configured
     *     (rejected by {@code Preconditions.checkArgument}), or the hook cannot be instantiated
     */
    @Override
    public void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        String hookClassName = conf.get(AM_RECOVERY_SERVICE_HOOK_CLASS);
        Preconditions.checkArgument(hookClassName != null, "RecoveryServiceHook class is not specified");
        this.hook = (RecoveryServiceHook) ReflectionUtils.createClazzInstance(
            hookClassName,
            new Class[]{RecoveryServiceWithEventHandlingHook.class, AppContext.class},
            new Object[]{this, this.appContext});
    }

    /**
     * Handles a recovery event, giving the hook a chance to act before and after the superclass
     * processes it.
     *
     * <p>If the pre-hook triggered a shutdown, the event is neither forwarded to the superclass
     * nor passed to the post-hook.
     *
     * @param event the DAG history event being recovered
     * @throws IOException if the hook or the superclass handler fails
     */
    @Override
    protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
        this.hook.preHandleRecoveryEvent(event);
        if (this.shutdownInvoked) {
            // The pre-hook asked for a shutdown: drop the event instead of recording it.
            return;
        }
        super.handleRecoveryEvent(event);
        this.hook.postHandleRecoveryEvent(event);
    }

    /**
     * Handles a summary event, giving the hook a chance to act before and after the superclass
     * processes it.
     *
     * <p>If the pre-hook triggered a shutdown, the event is neither forwarded to the superclass
     * nor passed to the post-hook.
     *
     * @param dagID id of the DAG the summary event belongs to
     * @param eventType type of the history event being summarized
     * @param summaryEvent the summary event itself
     * @throws IOException if the hook or the superclass handler fails
     */
    @Override
    protected void handleSummaryEvent(TezDAGID dagID, HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
        this.hook.preHandleSummaryEvent(eventType, summaryEvent);
        if (this.shutdownInvoked) {
            // The pre-hook asked for a shutdown: drop the event instead of recording it.
            return;
        }
        super.handleSummaryEvent(dagID, eventType, summaryEvent);
        this.hook.postHandleSummaryEvent(eventType, summaryEvent);
    }

    /**
     * Marks this service as stopped, records that a shutdown was requested, and starts a
     * thread named "AMShutdown Thread" that terminates the JVM with exit status 1.
     */
    private void shutdown() {
        super.setStopped(true);
        this.shutdownInvoked = true;

        Runnable killAm = new Runnable() {
            @Override
            public void run() {
                LOG.info("Try to kill AM");
                System.exit(1);
            }
        };
        new Thread(killAm, "AMShutdown Thread").start();
    }

    /**
     * An ordered list of {@link SimpleShutdownCondition}s that can be converted to and from a
     * single string, with the individual conditions separated by {@code ';'}.
     *
     * <p>{@link MultipleRoundRecoveryEventHook} looks conditions up by index using the
     * application attempt id minus one.
     */
    public static class MultipleRoundShutdownCondition {
        /** Separator placed between the serialized forms of consecutive conditions. */
        private static final String CONDITION_SEPARATOR = ";";

        private List<SimpleShutdownCondition> shutdownConditionList;

        /**
         * Creates a condition with no rounds. Starting from an empty list (rather than
         * {@code null}) keeps {@link #size()} and {@link #serialize()} usable before
         * {@link #deserialize(String)} has been called.
         */
        public MultipleRoundShutdownCondition() {
            this.shutdownConditionList = new ArrayList<SimpleShutdownCondition>();
        }

        /**
         * Creates a condition backed by the given list (the list is stored, not copied).
         *
         * @param shutdownConditionList conditions in round order
         */
        public MultipleRoundShutdownCondition(List<SimpleShutdownCondition> shutdownConditionList) {
            this.shutdownConditionList = shutdownConditionList;
        }

        /**
         * Serializes every condition and joins the results with {@code ';'}.
         *
         * @return the joined string; empty when there are no conditions
         * @throws IOException if a contained condition fails to serialize
         */
        public String serialize() throws IOException {
            StringBuilder builder = new StringBuilder();
            String separator = "";
            for (SimpleShutdownCondition condition : this.shutdownConditionList) {
                builder.append(separator).append(condition.serialize());
                separator = CONDITION_SEPARATOR;
            }
            return builder.toString();
        }

        /**
         * Replaces the current conditions with those parsed from {@code str}.
         *
         * <p>An empty string yields an empty list, mirroring what {@link #serialize()} produces
         * for an empty list. The current list is only replaced once every element has been
         * parsed successfully.
         *
         * @param str string previously produced by {@link #serialize()}
         * @return this instance
         * @throws IOException if any element cannot be parsed
         */
        public MultipleRoundShutdownCondition deserialize(String str) throws IOException {
            List<SimpleShutdownCondition> parsed = new ArrayList<SimpleShutdownCondition>();
            // "".split(";") returns a single empty token, which is not a valid condition.
            if (!str.isEmpty()) {
                for (String token : str.split(CONDITION_SEPARATOR)) {
                    parsed.add(new SimpleShutdownCondition().deserialize(token));
                }
            }
            this.shutdownConditionList = parsed;
            return this;
        }

        /**
         * @param index zero-based position of the condition
         * @return the condition stored at {@code index}
         */
        public SimpleShutdownCondition getSimpleShutdownCondition(int index) {
            return this.shutdownConditionList.get(index);
        }

        /** @return the number of conditions held */
        public int size() {
            return this.shutdownConditionList.size();
        }
    }

    /**
     * Hook that can shut the AM down in several consecutive application attempts: attempt
     * {@code n} is governed by the {@code n}-th entry (index {@code n - 1}) of the
     * {@link MultipleRoundShutdownCondition} read from the configuration. Attempts beyond the
     * number of configured conditions never trigger a shutdown.
     */
    public static class MultipleRoundRecoveryEventHook
    extends RecoveryServiceHook {
        /** Configuration key holding the serialized {@link MultipleRoundShutdownCondition}. */
        public static final String MULTIPLE_ROUND_SHUTDOWN_CONDITION = "tez.test.recovery.multiple_round_shutdown_condition";
        private MultipleRoundShutdownCondition shutdownCondition = new MultipleRoundShutdownCondition();
        private int attemptId;

        /**
         * Reads and parses the multi-round condition from the recovery service's configuration
         * and remembers the current application attempt id.
         *
         * @param recoveryService service this hook is attached to
         * @param appContext application context supplying the attempt id
         */
        public MultipleRoundRecoveryEventHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
            super(recoveryService, appContext);
            String serialized = recoveryService.getConfig().get(MULTIPLE_ROUND_SHUTDOWN_CONDITION);
            Preconditions.checkArgument(serialized != null,
                MULTIPLE_ROUND_SHUTDOWN_CONDITION + " is not set in TezConfiguration");
            try {
                this.shutdownCondition.deserialize(serialized);
            }
            catch (IOException e) {
                throw new TezUncheckedException("Can not initialize MultipleRoundShutdownCondition", e);
            }
            this.attemptId = appContext.getApplicationAttemptId().getAttemptId();
        }

        /**
         * Returns the condition configured for the current attempt, or {@code null} when the
         * attempt id exceeds the number of configured conditions.
         */
        private SimpleShutdownCondition conditionForCurrentAttempt() {
            if (this.attemptId > this.shutdownCondition.size()) {
                return null;
            }
            return this.shutdownCondition.getSimpleShutdownCondition(this.attemptId - 1);
        }

        /** Shuts down when the current attempt's condition has PRE timing and matches the event. */
        @Override
        public void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
            SimpleShutdownCondition current = conditionForCurrentAttempt();
            if (current == null) {
                return;
            }
            if (current.getTiming().equals(SimpleShutdownCondition.TIMING.PRE)
                    && current.match(event.getHistoryEvent())) {
                this.recoveryService.shutdown();
            }
        }

        /**
         * Logs every configured condition, then shuts down when the current attempt's condition
         * has POST timing and matches the event.
         */
        @Override
        public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
            int total = this.shutdownCondition.size();
            for (int round = 0; round < total; round++) {
                SimpleShutdownCondition configured = this.shutdownCondition.getSimpleShutdownCondition(round);
                LOG.info("condition:" + configured.getEventType() + ":" + configured.getHistoryEvent());
            }

            SimpleShutdownCondition current = conditionForCurrentAttempt();
            if (current == null) {
                return;
            }
            LOG.info("event:" + event.getHistoryEvent().getEventType());
            if (current.getTiming().equals(SimpleShutdownCondition.TIMING.POST)
                    && current.match(event.getHistoryEvent())) {
                this.recoveryService.shutdown();
            }
        }

        /** Summary events are ignored by this hook. */
        @Override
        public void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
        }

        /** Summary events are ignored by this hook. */
        @Override
        public void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
        }
    }

    /**
     * A single shutdown trigger: a reference {@link HistoryEvent} plus whether the shutdown
     * should happen before ({@link TIMING#PRE}) or after ({@link TIMING#POST}) a matching event
     * is handled.
     *
     * <p>The serialized form is {@code <timing>,<event class name>,<base64 of the event's proto
     * stream>}.
     */
    public static class SimpleShutdownCondition {
        /** Separator between the three fields of the serialized form. */
        private static final String FIELD_SEPARATOR = ",";
        /** Number of fields required in the serialized form: timing, event class, payload. */
        private static final int REQUIRED_FIELD_COUNT = 3;

        private TIMING timing;
        private HistoryEvent event;

        /**
         * @param timing whether to shut down before or after handling a matching event
         * @param event reference event that incoming events are compared against
         */
        public SimpleShutdownCondition(TIMING timing, HistoryEvent event) {
            this.timing = timing;
            this.event = event;
        }

        /** Creates an empty condition intended to be populated via {@link #deserialize(String)}. */
        public SimpleShutdownCondition() {
        }

        /** @return the reference event of this condition */
        public HistoryEvent getHistoryEvent() {
            return this.event;
        }

        /** Encodes an event as {@code <class name>,<base64 of its proto stream>}. */
        private String encodeHistoryEvent(HistoryEvent event) throws IOException {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            CodedOutputStream codedOutputStream = CodedOutputStream.newInstance((OutputStream)out);
            event.toProtoStream(codedOutputStream);
            // Flush so that everything written through the coded stream reaches the byte array.
            codedOutputStream.flush();
            return event.getClass().getName() + FIELD_SEPARATOR + Base64.encodeBase64String(out.toByteArray());
        }

        /**
         * Instantiates {@code eventClass} reflectively and populates it from the base64 payload.
         * Reflection failures are reported as {@link IOException}.
         */
        private HistoryEvent decodeHistoryEvent(String eventClass, String base64) throws IOException {
            CodedInputStream in = CodedInputStream.newInstance(Base64.decodeBase64(base64));
            try {
                HistoryEvent decoded = (HistoryEvent)ReflectionUtils.createClazzInstance(eventClass);
                decoded.fromProtoStream(in);
                return decoded;
            }
            catch (TezReflectionException e) {
                throw new IOException(e);
            }
        }

        /**
         * @return {@code <timing>,<event class name>,<base64 payload>}
         * @throws IOException if the event cannot be written to its proto stream
         */
        public String serialize() throws IOException {
            return this.timing.name() + FIELD_SEPARATOR + this.encodeHistoryEvent(this.event);
        }

        /**
         * Populates this condition from a string produced by {@link #serialize()}.
         *
         * <p>Malformed input (too few fields or an unknown timing name) is reported as an
         * {@link IOException} rather than an unchecked exception, and the fields of this object
         * are only updated once the whole string has been decoded.
         *
         * @param str serialized condition
         * @return this instance
         * @throws IOException if {@code str} is malformed or the event cannot be decoded
         */
        public SimpleShutdownCondition deserialize(String str) throws IOException {
            String[] tokens = str.split(FIELD_SEPARATOR);
            if (tokens.length < REQUIRED_FIELD_COUNT) {
                throw new IOException("Malformed shutdown condition, expected "
                    + "<timing>,<eventClass>,<base64Event> but got: " + str);
            }
            TIMING parsedTiming;
            try {
                parsedTiming = TIMING.valueOf(tokens[0]);
            }
            catch (IllegalArgumentException e) {
                throw new IOException("Unknown shutdown timing '" + tokens[0] + "' in: " + str, e);
            }
            HistoryEvent parsedEvent = this.decodeHistoryEvent(tokens[1], tokens[2]);
            this.timing = parsedTiming;
            this.event = parsedEvent;
            return this;
        }

        /** @return the reference event of this condition (same value as {@link #getHistoryEvent()}) */
        public HistoryEvent getEvent() {
            return this.event;
        }

        /** @return whether the shutdown fires before or after handling a matching event */
        public TIMING getTiming() {
            return this.timing;
        }

        /**
         * Tells whether {@code incomingEvent} matches this condition's reference event.
         *
         * <p>The event types must be equal. DAG-level events need nothing more; vertex events must
         * also have the same vertex id; task events the same vertex and task ids; task attempt
         * events the same vertex, task and attempt ids. A reference event of any other type never
         * matches and is only logged.
         *
         * @param incomingEvent event currently being handled
         * @return {@code true} if the event satisfies this condition
         */
        public boolean match(HistoryEvent incomingEvent) {
            HistoryEventType expectedType = this.event.getEventType();
            switch (expectedType) {
                case DAG_SUBMITTED:
                case DAG_INITIALIZED:
                case DAG_STARTED:
                case DAG_FINISHED: {
                    // DAG-level events are compared by type only.
                    return incomingEvent.getEventType() == expectedType;
                }
                case VERTEX_INITIALIZED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    VertexInitializedEvent incoming = (VertexInitializedEvent)incomingEvent;
                    VertexInitializedEvent expected = (VertexInitializedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId();
                }
                case VERTEX_STARTED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    VertexStartedEvent incoming = (VertexStartedEvent)incomingEvent;
                    VertexStartedEvent expected = (VertexStartedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId();
                }
                case VERTEX_FINISHED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    VertexFinishedEvent incoming = (VertexFinishedEvent)incomingEvent;
                    VertexFinishedEvent expected = (VertexFinishedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId();
                }
                case VERTEX_CONFIGURE_DONE: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    VertexConfigurationDoneEvent incoming = (VertexConfigurationDoneEvent)incomingEvent;
                    VertexConfigurationDoneEvent expected = (VertexConfigurationDoneEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId();
                }
                case TASK_STARTED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    TaskStartedEvent incoming = (TaskStartedEvent)incomingEvent;
                    TaskStartedEvent expected = (TaskStartedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId()
                        && incoming.getTaskID().getId() == expected.getTaskID().getId();
                }
                case TASK_FINISHED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    TaskFinishedEvent incoming = (TaskFinishedEvent)incomingEvent;
                    TaskFinishedEvent expected = (TaskFinishedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId()
                        && incoming.getTaskID().getId() == expected.getTaskID().getId();
                }
                case TASK_ATTEMPT_STARTED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    TaskAttemptStartedEvent incoming = (TaskAttemptStartedEvent)incomingEvent;
                    TaskAttemptStartedEvent expected = (TaskAttemptStartedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId()
                        && incoming.getTaskID().getId() == expected.getTaskID().getId()
                        && incoming.getTaskAttemptID().getId() == expected.getTaskAttemptID().getId();
                }
                case TASK_ATTEMPT_FINISHED: {
                    if (incomingEvent.getEventType() != expectedType) {
                        return false;
                    }
                    TaskAttemptFinishedEvent incoming = (TaskAttemptFinishedEvent)incomingEvent;
                    TaskAttemptFinishedEvent expected = (TaskAttemptFinishedEvent)this.event;
                    return incoming.getVertexID().getId() == expected.getVertexID().getId()
                        && incoming.getTaskID().getId() == expected.getTaskID().getId()
                        && incoming.getTaskAttemptID().getId() == expected.getTaskAttemptID().getId();
                }
                default: {
                    LOG.info("do nothing with event:" + expectedType);
                    return false;
                }
            }
        }

        /** @return the type of the reference event */
        public HistoryEventType getEventType() {
            return this.event.getEventType();
        }

        /** Whether the shutdown happens before or after a matching event is handled. */
        public static enum TIMING {
            PRE,
            POST;

        }
    }

    /**
     * Hook that shuts the AM down during the first application attempt, either before or after
     * handling a recovery event that matches the single {@link SimpleShutdownCondition} read
     * from the configuration.
     */
    public static class SimpleRecoveryEventHook
    extends RecoveryServiceHook {
        /** Configuration key holding the serialized {@link SimpleShutdownCondition}. */
        public static final String SIMPLE_SHUTDOWN_CONDITION = "tez.test.recovery.simple_shutdown_condition";
        private SimpleShutdownCondition shutdownCondition = new SimpleShutdownCondition();

        /**
         * Reads and parses the shutdown condition from the recovery service's configuration.
         *
         * @param recoveryService service this hook is attached to
         * @param appContext application context used to look up the attempt id
         */
        public SimpleRecoveryEventHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
            super(recoveryService, appContext);
            String serialized = recoveryService.getConfig().get(SIMPLE_SHUTDOWN_CONDITION);
            Preconditions.checkArgument(serialized != null,
                SIMPLE_SHUTDOWN_CONDITION + " is not set in TezConfiguration");
            try {
                this.shutdownCondition.deserialize(serialized);
            }
            catch (IOException e) {
                throw new TezUncheckedException("Can not initialize SimpleShutdownCondition", e);
            }
        }

        /**
         * Returns true when the configured condition has the given timing, this is application
         * attempt 1, and the event matches the condition (checked in that order).
         */
        private boolean shouldShutdown(SimpleShutdownCondition.TIMING phase, DAGHistoryEvent event) {
            if (!this.shutdownCondition.getTiming().equals(phase)) {
                return false;
            }
            if (this.appContext.getApplicationAttemptId().getAttemptId() != 1) {
                return false;
            }
            return this.shutdownCondition.match(event.getHistoryEvent());
        }

        /** Triggers a shutdown before the event is handled when a PRE condition matches. */
        @Override
        public void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
            if (shouldShutdown(SimpleShutdownCondition.TIMING.PRE, event)) {
                this.recoveryService.shutdown();
            }
        }

        /** Triggers a shutdown after the event is handled when a POST condition matches. */
        @Override
        public void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException {
            if (shouldShutdown(SimpleShutdownCondition.TIMING.POST, event)) {
                this.recoveryService.shutdown();
            }
        }

        /** Summary events are ignored by this hook. */
        @Override
        public void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
        }

        /** Summary events are ignored by this hook. */
        @Override
        public void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException {
        }
    }

    /**
     * Callback interface invoked by {@link RecoveryServiceWithEventHandlingHook} immediately
     * before and after it handles recovery events and summary events.
     */
    public abstract static class RecoveryServiceHook {
        /** Service this hook is attached to. */
        protected RecoveryServiceWithEventHandlingHook recoveryService;
        /** Application context supplied at construction time. */
        protected AppContext appContext;

        /**
         * @param recoveryService service this hook is attached to
         * @param appContext application context made available to subclasses
         */
        public RecoveryServiceHook(RecoveryServiceWithEventHandlingHook recoveryService, AppContext appContext) {
            this.appContext = appContext;
            this.recoveryService = recoveryService;
        }

        /** Called before the service handles {@code event}. */
        public abstract void preHandleRecoveryEvent(DAGHistoryEvent event) throws IOException;

        /** Called after the service has handled {@code event}. */
        public abstract void postHandleRecoveryEvent(DAGHistoryEvent event) throws IOException;

        /** Called before the service handles a summary event of the given type. */
        public abstract void preHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException;

        /** Called after the service has handled a summary event of the given type. */
        public abstract void postHandleSummaryEvent(HistoryEventType eventType, SummaryEvent summaryEvent) throws IOException;
    }
}

