package org.apache.tez.test;

import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.RecoveryParser;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.test.RecoveryServiceWithEventHandlingHook;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/test/TestRecovery.class */
public class TestRecovery {
    private static final Logger LOG = LoggerFactory.getLogger(TestRecovery.class);
    private static Configuration conf = new Configuration(false);
    private static MiniTezCluster miniTezCluster = null;
    private static String TEST_ROOT_DIR = "target/" + TestRecovery.class.getName() + "-tmpDir";
    private static MiniDFSCluster dfsCluster = null;
    private static FileSystem remoteFs = null;

    @BeforeClass
    public static void beforeClass() throws Exception {
        LOG.info("Starting mini clusters");
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            conf.set("fs.AbstractFileSystem.hdfs.impl", "org.apache.hadoop.fs.Hdfs");
            conf.set("fs.AbstractFileSystem.file.impl", "org.apache.hadoop.fs.local.LocalFs");
            conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).racks((String[]) null).build();
            remoteFs = dfsCluster.getFileSystem();
            if (miniTezCluster == null) {
                miniTezCluster = new MiniTezCluster(TestRecovery.class.getName(), 1, 1, 1);
                Configuration configuration = new Configuration(conf);
                configuration.setInt("yarn.resourcemanager.am.max-attempts", 4);
                configuration.set("fs.defaultFS", remoteFs.getUri().toString());
                configuration.setLong("tez.am.sleep.time.before.exit.millis", 500L);
                miniTezCluster.init(configuration);
                miniTezCluster.start();
            }
        } catch (IOException e) {
            throw new RuntimeException("problem starting mini dfs cluster", e);
        }
    }

    @AfterClass
    public static void afterClass() throws InterruptedException {
        if (miniTezCluster != null) {
            try {
                LOG.info("Stopping MiniTezCluster");
                miniTezCluster.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        if (dfsCluster != null) {
            try {
                LOG.info("Stopping DFSCluster");
                dfsCluster.shutdown();
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }

    @Test(timeout = 1800000)
    public void testRecovery_OrderedWordCount() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        TezDAGID tezDAGID = TezDAGID.getInstance(newInstance, 1);
        TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, 0);
        TezVertexID tezVertexID2 = TezVertexID.getInstance(tezDAGID, 1);
        TezVertexID tezVertexID3 = TezVertexID.getInstance(tezDAGID, 2);
        ContainerId newInstance2 = ContainerId.newInstance(ApplicationAttemptId.newInstance(newInstance, 1), 1);
        NodeId newInstance3 = NodeId.newInstance("localhost", 10);
        ArrayList newArrayList = Lists.newArrayList(new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition[]{new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGInitializedEvent(tezDAGID, 0L, "username", "dagName", (Map) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGStartedEvent(tezDAGID, 0L, "username", "dagName")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGFinishedEvent(tezDAGID, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), "username", "dagName", new HashMap(), ApplicationAttemptId.newInstance(newInstance, 1), (DAGProtos.DAGPlan) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID, "Tokenizer", 0L, 0L, 0, "", (Map) null, Lists.newArrayList(new TezEvent[]{new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), (EventMetaData) null)}), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID2, "Summation", 0L, 0L, 0, "", (Map) null, (List) null, (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID3, "Sorter", 0L, 0L, 0, "", (Map) null, (List) null, (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID2, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID3, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID2, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID3, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID3, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID2, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID3, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID2, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID3, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0), "vertexName", 0L, newInstance2, newInstance3, "", "", "")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID2, 0), 0), "vertexName", 0L, newInstance2, newInstance3, "", "", "")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID3, 0), 0), "vertexName", 0L, newInstance2, newInstance3, "", "", ""))});
        Random random = new Random();
        for (int i = 0; i < newArrayList.size(); i++) {
            if (random.nextDouble() < 0.5d) {
                testOrderedWordCount((RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(i), true, ((RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(i)).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
            }
        }
    }

    private void testOrderedWordCount(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition simpleShutdownCondition, boolean z, boolean z2) throws Exception {
        LOG.info("shutdownCondition:" + simpleShutdownCondition.getEventType() + ", event=" + simpleShutdownCondition.getEvent());
        Path path = new Path("/tmp/owc-input/");
        Path path2 = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(path);
        remoteFs.mkdirs(path2);
        TestTezJobs.generateOrderedWordCountInput(path, remoteFs);
        Path path3 = new Path("/tmp/owc-output/");
        TezConfiguration tezConfiguration = new TezConfiguration(miniTezCluster.getConfig());
        tezConfiguration.setInt("tez.am.max.app.attempts", 4);
        tezConfiguration.set("tez.test.recovery-service-class", RecoveryServiceWithEventHandlingHook.class.getName());
        tezConfiguration.set(RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS, RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook.class.getName());
        tezConfiguration.set(RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook.SIMPLE_SHUTDOWN_CONDITION, simpleShutdownCondition.serialize());
        tezConfiguration.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", z);
        tezConfiguration.setBoolean("tez.test.recovery.drain_event", false);
        tezConfiguration.set("tez.staging-dir", path2.toString());
        tezConfiguration.setBoolean("tez.am.staging.scratch-data.auto-delete", false);
        tezConfiguration.set("tez.am.log.level", "INFO;org.apache.tez=DEBUG");
        OrderedWordCount orderedWordCount = new OrderedWordCount();
        if (z2) {
            Assert.assertTrue("OrderedWordCount failed", orderedWordCount.run(tezConfiguration, new String[]{"-generateSplitInClient", "/tmp/owc-input/", "/tmp/owc-output/", "5"}, (TezClient) null) == 0);
        } else {
            Assert.assertTrue("OrderedWordCount failed", orderedWordCount.run(tezConfiguration, new String[]{"/tmp/owc-input/", "/tmp/owc-output/", "5"}, (TezClient) null) == 0);
        }
        TestTezJobs.verifyOutput(path3, remoteFs);
        List readRecoveryEvents = RecoveryParser.readRecoveryEvents(tezConfiguration, orderedWordCount.getAppId(), 1);
        HistoryEvent historyEvent = (HistoryEvent) readRecoveryEvents.get(readRecoveryEvents.size() - 1);
        Assert.assertEquals(simpleShutdownCondition.getEvent().getEventType(), historyEvent.getEventType());
        Assert.assertTrue(simpleShutdownCondition.match(historyEvent));
    }

    private void testOrderedWordCountMultipleRoundRecoverying(RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition multipleRoundShutdownCondition, boolean z, boolean z2) throws Exception {
        for (int i = 0; i < multipleRoundShutdownCondition.size(); i++) {
            RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition simpleShutdownCondition = multipleRoundShutdownCondition.getSimpleShutdownCondition(i);
            LOG.info("ShutdownCondition:" + simpleShutdownCondition.getEventType() + ", event=" + simpleShutdownCondition.getEvent());
        }
        Path path = new Path("/tmp/owc-input/");
        Path path2 = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(path);
        remoteFs.mkdirs(path2);
        TestTezJobs.generateOrderedWordCountInput(path, remoteFs);
        Path path3 = new Path("/tmp/owc-output/");
        TezConfiguration tezConfiguration = new TezConfiguration(miniTezCluster.getConfig());
        tezConfiguration.setInt("tez.am.max.app.attempts", 4);
        tezConfiguration.set("tez.test.recovery-service-class", RecoveryServiceWithEventHandlingHook.class.getName());
        tezConfiguration.set(RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS, RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.class.getName());
        tezConfiguration.set(RecoveryServiceWithEventHandlingHook.MultipleRoundRecoveryEventHook.MULTIPLE_ROUND_SHUTDOWN_CONDITION, multipleRoundShutdownCondition.serialize());
        tezConfiguration.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", z);
        tezConfiguration.setBoolean("tez.test.recovery.drain_event", false);
        tezConfiguration.set("tez.staging-dir", path2.toString());
        tezConfiguration.setBoolean("tez.am.staging.scratch-data.auto-delete", false);
        OrderedWordCount orderedWordCount = new OrderedWordCount();
        if (z2) {
            Assert.assertTrue("OrderedWordCount failed", orderedWordCount.run(tezConfiguration, new String[]{"-generateSplitInClient", "/tmp/owc-input/", "/tmp/owc-output/", "5"}, (TezClient) null) == 0);
        } else {
            Assert.assertTrue("OrderedWordCount failed", orderedWordCount.run(tezConfiguration, new String[]{"/tmp/owc-input/", "/tmp/owc-output/", "5"}, (TezClient) null) == 0);
        }
        TestTezJobs.verifyOutput(path3, remoteFs);
    }

    @Test(timeout = 1800000)
    public void testRecovery_HashJoin() throws Exception {
        ApplicationId newInstance = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        TezDAGID tezDAGID = TezDAGID.getInstance(newInstance, 1);
        TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, 0);
        TezVertexID tezVertexID2 = TezVertexID.getInstance(tezDAGID, 1);
        TezVertexID tezVertexID3 = TezVertexID.getInstance(tezDAGID, 2);
        ContainerId newInstance2 = ContainerId.newInstance(ApplicationAttemptId.newInstance(newInstance, 1), 1);
        NodeId newInstance3 = NodeId.newInstance("localhost", 10);
        ArrayList newArrayList = Lists.newArrayList(new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition[]{new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGInitializedEvent(tezDAGID, 0L, "username", "dagName", (Map) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGStartedEvent(tezDAGID, 0L, "username", "dagName")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGFinishedEvent(tezDAGID, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), "username", "dagName", new HashMap(), ApplicationAttemptId.newInstance(newInstance, 1), (DAGProtos.DAGPlan) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID, "hashSide", 0L, 0L, 0, "", (Map) null, Lists.newArrayList(new TezEvent[]{new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), (EventMetaData) null)}), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID2, "streamingSide", 0L, 0L, 0, "", (Map) null, (List) null, (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID3, "joiner", 0L, 0L, 0, "", (Map) null, (List) null, (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID2, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID3, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID2, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID3, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID3, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID2, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID3, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID2, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID3, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0), "vertexName", 0L, newInstance2, newInstance3, "", "", "")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID2, 0), 0), "vertexName", 0L, newInstance2, newInstance3, "", "", "")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID3, 0), 0), "vertexName", 0L, newInstance2, newInstance3, "", "", ""))});
        Random random = new Random();
        for (int i = 0; i < newArrayList.size(); i++) {
            if (random.nextDouble() < 0.5d) {
                testHashJoinExample((RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(i), true, ((RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(i)).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
            }
        }
    }

    private void testHashJoinExample(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition simpleShutdownCondition, boolean z, boolean z2) throws Exception {
        HashJoinExample hashJoinExample = new HashJoinExample();
        TezConfiguration tezConfiguration = new TezConfiguration(miniTezCluster.getConfig());
        tezConfiguration.setInt("tez.am.max.app.attempts", 4);
        tezConfiguration.set("tez.test.recovery-service-class", RecoveryServiceWithEventHandlingHook.class.getName());
        tezConfiguration.set(RecoveryServiceWithEventHandlingHook.AM_RECOVERY_SERVICE_HOOK_CLASS, RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook.class.getName());
        tezConfiguration.set(RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook.SIMPLE_SHUTDOWN_CONDITION, simpleShutdownCondition.serialize());
        tezConfiguration.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", z);
        tezConfiguration.setBoolean("tez.test.recovery.drain_event", false);
        tezConfiguration.setBoolean("tez.am.staging.scratch-data.auto-delete", false);
        tezConfiguration.setInt("ipc.client.connect.max.retries", 0);
        tezConfiguration.setInt("ipc.client.connect.max.retries.on.timeouts", 0);
        tezConfiguration.setInt("ipc.client.connect.timeout", 1000);
        tezConfiguration.set("tez.am.log.level", "INFO;org.apache.tez=DEBUG");
        hashJoinExample.setConf(tezConfiguration);
        Path path = new Path("/tmp/tez-staging-dir");
        Path path2 = new Path("/tmp/hashJoin/inPath1");
        Path path3 = new Path("/tmp/hashJoin/inPath2");
        Path path4 = new Path("/tmp/hashJoin/outPath");
        remoteFs.delete(path4, true);
        remoteFs.mkdirs(path2);
        remoteFs.mkdirs(path3);
        remoteFs.mkdirs(path);
        HashSet hashSet = new HashSet();
        FSDataOutputStream create = remoteFs.create(new Path(path2, "file"));
        FSDataOutputStream create2 = remoteFs.create(new Path(path3, "file"));
        BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(create));
        BufferedWriter bufferedWriter2 = new BufferedWriter(new OutputStreamWriter(create2));
        for (int i = 0; i < 20; i++) {
            String str = "term" + i;
            bufferedWriter.write(str);
            bufferedWriter.newLine();
            if (i % 2 == 0) {
                bufferedWriter2.write(str);
                bufferedWriter2.newLine();
                hashSet.add(str);
            }
        }
        bufferedWriter.close();
        bufferedWriter2.close();
        create.close();
        create2.close();
        Assert.assertEquals(0L, hashJoinExample.run(z2 ? new String[]{"-Dtez.staging-dir=" + path.toString(), "-generateSplitInClient", path2.toString(), path3.toString(), "1", path4.toString()} : new String[]{"-Dtez.staging-dir=" + path.toString(), path2.toString(), path3.toString(), "1", path4.toString()}));
        FileStatus[] listStatus = remoteFs.listStatus(path4, new PathFilter() { // from class: org.apache.tez.test.TestRecovery.1
            public boolean accept(Path path5) {
                String name = path5.getName();
                return (name.startsWith("_") || name.startsWith(".")) ? false : true;
            }
        });
        Assert.assertEquals(1L, listStatus.length);
        FSDataInputStream open = remoteFs.open(listStatus[0].getPath());
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open));
        while (true) {
            String readLine = bufferedReader.readLine();
            if (readLine == null) {
                bufferedReader.close();
                open.close();
                Assert.assertEquals(0L, hashSet.size());
                List readRecoveryEvents = RecoveryParser.readRecoveryEvents(tezConfiguration, hashJoinExample.getAppId(), 1);
                HistoryEvent historyEvent = (HistoryEvent) readRecoveryEvents.get(readRecoveryEvents.size() - 1);
                Assert.assertEquals(simpleShutdownCondition.getEvent().getEventType(), historyEvent.getEventType());
                Assert.assertTrue(simpleShutdownCondition.match(historyEvent));
                return;
            }
            Assert.assertTrue(hashSet.remove(readLine));
        }
    }

    @Test(timeout = 1800000)
    public void testTwoRoundsRecoverying() throws Exception {
        int nextInt;
        ApplicationId newInstance = ApplicationId.newInstance(System.currentTimeMillis(), 1);
        TezDAGID tezDAGID = TezDAGID.getInstance(newInstance, 1);
        TezVertexID tezVertexID = TezVertexID.getInstance(tezDAGID, 0);
        ArrayList newArrayList = Lists.newArrayList(new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition[]{new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGInitializedEvent(tezDAGID, 0L, "username", "dagName", (Map) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGStartedEvent(tezDAGID, 0L, "username", "dagName")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexInitializedEvent(tezVertexID, "Tokenizer", 0L, 0L, 0, "", (Map) null, Lists.newArrayList(new TezEvent[]{new TezEvent(InputDataInformationEvent.createWithObjectPayload(0, new Object()), (EventMetaData) null)}), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexStartedEvent(tezVertexID, 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexConfigurationDoneEvent(tezVertexID, 0L, 2, (VertexLocationHint) null, (Map) null, (Map) null, true)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskStartedEvent(TezTaskID.getInstance(tezVertexID, 0), "vertexName", 0L, 0L)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskAttemptStartedEvent(TezTaskAttemptID.getInstance(TezTaskID.getInstance(tezVertexID, 0), 0), "vertexName", 0L, ContainerId.newInstance(ApplicationAttemptId.newInstance(newInstance, 1), 1), NodeId.newInstance("localhost", 10), "", "", "")), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new TaskFinishedEvent(TezTaskID.getInstance(tezVertexID, 0), "vertexName", 0L, 0L, (TezTaskAttemptID) null, TaskState.SUCCEEDED, "", new TezCounters(), 0)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(tezVertexID, "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(TezVertexID.getInstance(tezDAGID, 1), "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new VertexFinishedEvent(TezVertexID.getInstance(tezDAGID, 2), "vertexName", 1, 0L, 0L, 0L, 0L, 0L, VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), new HashMap(), (ServicePluginInfo) null)), new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition.TIMING.POST, new DAGFinishedEvent(tezDAGID, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(), "username", "dagName", new HashMap(), ApplicationAttemptId.newInstance(newInstance, 1), (DAGProtos.DAGPlan) null))});
        Random random = new Random();
        for (int i = 0; i < newArrayList.size() - 1; i++) {
            if (random.nextDouble() < 0.5d && (nextInt = i + 1 + random.nextInt((newArrayList.size() - i) - 1)) == newArrayList.size() - 1) {
                testOrderedWordCountMultipleRoundRecoverying(new RecoveryServiceWithEventHandlingHook.MultipleRoundShutdownCondition(Lists.newArrayList(new RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition[]{(RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(i), (RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(nextInt)})), true, ((RecoveryServiceWithEventHandlingHook.SimpleShutdownCondition) newArrayList.get(i)).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED);
            }
        }
    }
}
