/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.test;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.examples.HashJoinExample;
import org.apache.tez.examples.JoinDataGen;
import org.apache.tez.examples.JoinValidate;
import org.apache.tez.examples.OrderedWordCount;
import org.apache.tez.examples.SimpleSessionExample;
import org.apache.tez.examples.SortMergeJoinExample;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.apache.tez.runtime.library.processor.SleepProcessor;
import org.apache.tez.test.MiniTezCluster;
import org.apache.tez.test.SimpleTestDAG;
import org.apache.tez.test.dag.MultiAttemptDAG;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestTezJobs {
    private static final Log LOG = LogFactory.getLog(TestTezJobs.class);
    protected static MiniTezCluster mrrTezCluster;
    protected static MiniDFSCluster dfsCluster;
    private static Configuration conf;
    private static FileSystem remoteFs;
    private static String TEST_ROOT_DIR;
    private static final String VERTEX_WITH_INITIALIZER_NAME = "VertexWithInitializer";
    private static final String EVENT_GENERATING_VERTEX_NAME = "EventGeneratingVertex";
    private static final String INPUT1_NAME = "Input1";

    @BeforeClass
    public static void setup() throws IOException {
        try {
            conf.set("hdfs.minidfs.basedir", TEST_ROOT_DIR);
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException io) {
            throw new RuntimeException("problem starting mini dfs cluster", io);
        }
        if (mrrTezCluster == null) {
            mrrTezCluster = new MiniTezCluster(TestTezJobs.class.getName(), 1, 1, 1);
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", remoteFs.getUri().toString());
            mrrTezCluster.init(conf);
            mrrTezCluster.start();
        }
    }

    @AfterClass
    public static void tearDown() {
        if (mrrTezCluster != null) {
            mrrTezCluster.stop();
            mrrTezCluster = null;
        }
        if (dfsCluster != null) {
            dfsCluster.shutdown();
            dfsCluster = null;
        }
    }

    @Test(timeout=60000L)
    public void testHashJoinExample() throws Exception {
        String line;
        HashJoinExample hashJoinExample = new HashJoinExample();
        hashJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        Path inPath1 = new Path("/tmp/hashJoin/inPath1");
        Path inPath2 = new Path("/tmp/hashJoin/inPath2");
        Path outPath = new Path("/tmp/hashJoin/outPath");
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        remoteFs.mkdirs(stagingDirPath);
        HashSet<String> expectedResult = new HashSet<String>();
        FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
        FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
        BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter((OutputStream)out1));
        BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter((OutputStream)out2));
        for (int i = 0; i < 20; ++i) {
            String term = "term" + i;
            writer1.write(term);
            writer1.newLine();
            if (i % 2 != 0) continue;
            writer2.write(term);
            writer2.newLine();
            expectedResult.add(term);
        }
        writer1.close();
        writer2.close();
        out1.close();
        out2.close();
        String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
        Assert.assertEquals((long)0L, (long)hashJoinExample.run(args));
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter(){

            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals((long)1L, (long)statuses.length);
        FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream)inStream));
        while ((line = reader.readLine()) != null) {
            Assert.assertTrue((boolean)expectedResult.remove(line));
        }
        reader.close();
        inStream.close();
        Assert.assertEquals((long)0L, (long)expectedResult.size());
    }

    @Test(timeout=60000L)
    public void testSortMergeJoinExample() throws Exception {
        String line;
        SortMergeJoinExample sortMergeJoinExample = new SortMergeJoinExample();
        sortMergeJoinExample.setConf(new Configuration(mrrTezCluster.getConfig()));
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        Path inPath1 = new Path("/tmp/sortMerge/inPath1");
        Path inPath2 = new Path("/tmp/sortMerge/inPath2");
        Path outPath = new Path("/tmp/sortMerge/outPath");
        remoteFs.mkdirs(inPath1);
        remoteFs.mkdirs(inPath2);
        remoteFs.mkdirs(stagingDirPath);
        HashSet<String> expectedResult = new HashSet<String>();
        FSDataOutputStream out1 = remoteFs.create(new Path(inPath1, "file"));
        FSDataOutputStream out2 = remoteFs.create(new Path(inPath2, "file"));
        BufferedWriter writer1 = new BufferedWriter(new OutputStreamWriter((OutputStream)out1));
        BufferedWriter writer2 = new BufferedWriter(new OutputStreamWriter((OutputStream)out2));
        for (int i = 0; i < 20; ++i) {
            String term = "term" + i;
            writer1.write(term);
            writer1.newLine();
            if (i % 2 != 0) continue;
            writer2.write(term);
            writer2.newLine();
            expectedResult.add(term);
        }
        writer1.close();
        writer2.close();
        out1.close();
        out2.close();
        String[] args = new String[]{"-Dtez.staging-dir=" + stagingDirPath.toString(), inPath1.toString(), inPath2.toString(), "1", outPath.toString()};
        Assert.assertEquals((long)0L, (long)sortMergeJoinExample.run(args));
        FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter(){

            public boolean accept(Path p) {
                String name = p.getName();
                return !name.startsWith("_") && !name.startsWith(".");
            }
        });
        Assert.assertEquals((long)1L, (long)statuses.length);
        FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
        BufferedReader reader = new BufferedReader(new InputStreamReader((InputStream)inStream));
        while ((line = reader.readLine()) != null) {
            Assert.assertTrue((boolean)expectedResult.remove(line));
        }
        reader.close();
        inStream.close();
        Assert.assertEquals((long)0L, (long)expectedResult.size());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=120000L)
    public void testHashJoinExamplePipeline() throws Exception {
        Path testDir = new Path("/tmp/testHashJoinExample");
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        remoteFs.mkdirs(testDir);
        Path dataPath1 = new Path(testDir, "inPath1");
        Path dataPath2 = new Path(testDir, "inPath2");
        Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
        Path outPath = new Path(testDir, "outPath");
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        TezClient tezSession = null;
        try {
            tezSession = TezClient.create((String)"HashJoinExampleSession", (TezConfiguration)tezConf, (boolean)true);
            tezSession.start();
            JoinDataGen dataGen = new JoinDataGen();
            String[] dataGenArgs = new String[]{dataPath1.toString(), "1048576", dataPath2.toString(), "524288", expectedOutputPath.toString(), "2"};
            Assert.assertEquals((long)0L, (long)dataGen.run((Configuration)tezConf, dataGenArgs, tezSession));
            HashJoinExample joinExample = new HashJoinExample();
            String[] args = new String[]{dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
            Assert.assertEquals((long)0L, (long)joinExample.run((Configuration)tezConf, args, tezSession));
            JoinValidate joinValidate = new JoinValidate();
            String[] validateArgs = new String[]{expectedOutputPath.toString(), outPath.toString(), "3"};
            Assert.assertEquals((long)0L, (long)joinValidate.run((Configuration)tezConf, validateArgs, tezSession));
        }
        finally {
            if (tezSession != null) {
                tezSession.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=120000L)
    public void testSortMergeJoinExamplePipeline() throws Exception {
        Path testDir = new Path("/tmp/testSortMergeExample");
        Path stagingDirPath = new Path("/tmp/tez-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        remoteFs.mkdirs(testDir);
        Path dataPath1 = new Path(testDir, "inPath1");
        Path dataPath2 = new Path(testDir, "inPath2");
        Path expectedOutputPath = new Path(testDir, "expectedOutputPath");
        Path outPath = new Path(testDir, "outPath");
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        TezClient tezSession = null;
        try {
            tezSession = TezClient.create((String)"SortMergeExampleSession", (TezConfiguration)tezConf, (boolean)true);
            tezSession.start();
            JoinDataGen dataGen = new JoinDataGen();
            String[] dataGenArgs = new String[]{dataPath1.toString(), "1048576", dataPath2.toString(), "524288", expectedOutputPath.toString(), "2"};
            Assert.assertEquals((long)0L, (long)dataGen.run((Configuration)tezConf, dataGenArgs, tezSession));
            SortMergeJoinExample joinExample = new SortMergeJoinExample();
            String[] args = new String[]{dataPath1.toString(), dataPath2.toString(), "2", outPath.toString()};
            Assert.assertEquals((long)0L, (long)joinExample.run((Configuration)tezConf, args, tezSession));
            JoinValidate joinValidate = new JoinValidate();
            String[] validateArgs = new String[]{expectedOutputPath.toString(), outPath.toString(), "3"};
            Assert.assertEquals((long)0L, (long)joinValidate.run((Configuration)tezConf, validateArgs, tezSession));
        }
        finally {
            if (tezSession != null) {
                tezSession.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void generateOrderedWordCountInput(Path inputDir) throws IOException {
        Path dataPath1 = new Path(inputDir, "inPath1");
        Path dataPath2 = new Path(inputDir, "inPath2");
        FSDataOutputStream f1 = null;
        FSDataOutputStream f2 = null;
        try {
            f1 = remoteFs.create(dataPath1);
            f2 = remoteFs.create(dataPath2);
            String prefix = "a";
            for (int i = 1; i <= 10; ++i) {
                String word = "a_" + i;
                for (int j = 10; j >= i; --j) {
                    LOG.info((Object)("Writing " + word + " to input files"));
                    f1.write(word.getBytes());
                    f1.writeChars("\t");
                    f2.write(word.getBytes());
                    f2.writeChars("\t");
                }
            }
            f1.hsync();
            f2.hsync();
        }
        finally {
            if (f1 != null) {
                f1.close();
            }
            if (f2 != null) {
                f2.close();
            }
        }
    }

    private void verifyOrderedWordCountOutput(Path resultFile) throws IOException {
        String line;
        FSDataInputStream inputStream = remoteFs.open(resultFile);
        String prefix = "a";
        int currentCounter = 10;
        byte[] buffer = new byte[4096];
        int bytesRead = inputStream.read(buffer, 0, 4096);
        BufferedReader reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer, 0, bytesRead)));
        while ((line = reader.readLine()) != null) {
            LOG.info((Object)("Line: " + line + ", counter=" + currentCounter));
            int pos = line.indexOf("\t");
            String word = line.substring(0, pos - 1);
            Assert.assertEquals((Object)("a_" + currentCounter), (Object)word);
            String val = line.substring(pos + 1, line.length());
            Assert.assertEquals((long)((long)(11 - currentCounter) * 2L), (long)Long.valueOf(val));
            --currentCounter;
        }
        Assert.assertEquals((long)0L, (long)currentCounter);
    }

    private void verifyOutput(Path outputDir) throws IOException {
        FileStatus[] fileStatuses = remoteFs.listStatus(outputDir);
        Path resultFile = null;
        boolean foundResult = false;
        boolean foundSuccessFile = false;
        for (FileStatus fileStatus : fileStatuses) {
            if (!fileStatus.isFile()) continue;
            if (fileStatus.getPath().getName().equals("_SUCCESS")) {
                foundSuccessFile = true;
                continue;
            }
            if (!fileStatus.getPath().getName().startsWith("part-")) continue;
            if (foundResult) {
                Assert.fail((String)("Found 2 part files instead of 1, paths=" + resultFile + "," + fileStatus.getPath()));
            }
            foundResult = true;
            resultFile = fileStatus.getPath();
            LOG.info((Object)("Found output at " + resultFile));
        }
        Assert.assertTrue((boolean)foundResult);
        Assert.assertTrue((resultFile != null ? 1 : 0) != 0);
        Assert.assertTrue((boolean)foundSuccessFile);
        this.verifyOrderedWordCountOutput(resultFile);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testOrderedWordCount() throws Exception {
        String inputDirStr = "/tmp/owc-input/";
        Path inputDir = new Path(inputDirStr);
        Path stagingDirPath = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(inputDir);
        remoteFs.mkdirs(stagingDirPath);
        this.generateOrderedWordCountInput(inputDir);
        String outputDirStr = "/tmp/owc-output/";
        Path outputDir = new Path(outputDirStr);
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        Object tezSession = null;
        try {
            OrderedWordCount job = new OrderedWordCount();
            Assert.assertTrue((String)"OrderedWordCount failed", (boolean)job.run(inputDirStr, outputDirStr, (Configuration)tezConf, 2));
            this.verifyOutput(outputDir);
        }
        finally {
            remoteFs.delete(stagingDirPath, true);
            if (tezSession != null) {
                tezSession.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testSimpleSessionExample() throws Exception {
        Path stagingDirPath = new Path("/tmp/owc-staging-dir");
        remoteFs.mkdirs(stagingDirPath);
        int numIterations = 2;
        String[] inputPaths = new String[numIterations];
        String[] outputPaths = new String[numIterations];
        Path[] outputDirs = new Path[numIterations];
        for (int i = 0; i < numIterations; ++i) {
            Path outputDir;
            String outputDirStr;
            String inputDirStr;
            inputPaths[i] = inputDirStr = "/tmp/owc-input-" + i + "/";
            Path inputDir = new Path(inputDirStr);
            remoteFs.mkdirs(inputDir);
            this.generateOrderedWordCountInput(inputDir);
            outputPaths[i] = outputDirStr = "/tmp/owc-output-" + i + "/";
            outputDirs[i] = outputDir = new Path(outputDirStr);
        }
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        tezConf.set("tez.staging-dir", stagingDirPath.toString());
        YarnClient yarnClient = YarnClient.createYarnClient();
        try {
            yarnClient.init(mrrTezCluster.getConfig());
            yarnClient.start();
            List apps = yarnClient.getApplications();
            int appsBeforeCount = apps != null ? apps.size() : 0;
            SimpleSessionExample job = new SimpleSessionExample();
            Assert.assertTrue((String)"SimpleSessionExample failed", (boolean)job.run(inputPaths, outputPaths, (Configuration)tezConf, 2));
            for (int i = 0; i < numIterations; ++i) {
                this.verifyOutput(outputDirs[i]);
            }
            apps = yarnClient.getApplications();
            int appsAfterCount = apps != null ? apps.size() : 0;
            Assert.assertEquals((long)(appsBeforeCount + 1), (long)appsAfterCount);
        }
        finally {
            remoteFs.delete(stagingDirPath, true);
            if (yarnClient != null) {
                yarnClient.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testVertexOrder() throws Exception {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        TezClient tezClient = TezClient.create((String)"TestVertexOrder", (TezConfiguration)tezConf);
        tezClient.start();
        try {
            DAG dag = SimpleTestDAG.createDAGForVertexOrder("dag1", conf);
            DAGClient dagClient = tezClient.submitDAG(dag);
            DAGStatus dagStatus = dagClient.getDAGStatus(null);
            while (!dagStatus.isCompleted()) {
                LOG.info((Object)("Waiting for dag to complete. Sleeping for 500ms. DAG name: " + dag.getName() + " DAG context: " + dagClient.getExecutionContext() + " Current state: " + dagStatus.getState()));
                Thread.sleep(100L);
                dagStatus = dagClient.getDAGStatus(null);
            }
            Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagStatus.getState());
            Set resultVertices = dagStatus.getVertexProgress().keySet();
            Assert.assertEquals((long)6L, (long)resultVertices.size());
            int i = 0;
            for (String vertexName : resultVertices) {
                if (i <= 1) {
                    Assert.assertTrue((vertexName.equals("v1") || vertexName.equals("v2") ? 1 : 0) != 0);
                } else if (i == 2) {
                    Assert.assertTrue((boolean)vertexName.equals("v3"));
                } else if (i <= 4) {
                    Assert.assertTrue((vertexName.equals("v4") || vertexName.equals("v5") ? 1 : 0) != 0);
                } else {
                    Assert.assertTrue((boolean)vertexName.equals("v6"));
                }
                ++i;
            }
        }
        finally {
            if (tezClient != null) {
                tezClient.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=60000L)
    public void testInputInitializerEvents() throws TezException, InterruptedException, IOException {
        TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
        TezClient tezClient = TezClient.create((String)"TestInputInitializerEvents", (TezConfiguration)tezConf);
        tezClient.start();
        try {
            DAG dag = DAG.create((String)"TestInputInitializerEvents");
            Vertex vertex1 = Vertex.create((String)VERTEX_WITH_INITIALIZER_NAME, (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)SleepProcessor.class.getName()).setUserPayload(new SleepProcessor.SleepProcessorConfig(1).toUserPayload())), (int)1).addDataSource(INPUT1_NAME, DataSourceDescriptor.create((InputDescriptor)InputDescriptor.create((String)MultiAttemptDAG.NoOpInput.class.getName()), (InputInitializerDescriptor)InputInitializerDescriptor.create((String)InputInitializerForTest.class.getName()), null));
            Vertex vertex2 = Vertex.create((String)EVENT_GENERATING_VERTEX_NAME, (ProcessorDescriptor)ProcessorDescriptor.create((String)InputInitializerEventGeneratingProcessor.class.getName()), (int)5);
            dag.addVertex(vertex1).addVertex(vertex2);
            DAGClient dagClient = tezClient.submitDAG(dag);
            dagClient.waitForCompletion();
            Assert.assertEquals((Object)DAGStatus.State.SUCCEEDED, (Object)dagClient.getDAGStatus(null).getState());
        }
        finally {
            tezClient.stop();
        }
    }

    static {
        conf = new Configuration();
        TEST_ROOT_DIR = "target/" + TestTezJobs.class.getName() + "-tmpDir";
    }

    public static class InputInitializerForTest
    extends InputInitializer {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = this.lock.newCondition();
        private final BitSet eventsSeen = new BitSet();

        public InputInitializerForTest(InputInitializerContext initializerContext) {
            super(initializerContext);
            this.getContext().registerForVertexStateUpdates(TestTezJobs.EVENT_GENERATING_VERTEX_NAME, EnumSet.of(VertexState.SUCCEEDED));
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<Event> initialize() throws Exception {
            this.lock.lock();
            try {
                this.condition.await();
            }
            finally {
                this.lock.unlock();
            }
            return null;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
            this.lock.lock();
            try {
                for (InputInitializerEvent event : events) {
                    Preconditions.checkArgument((boolean)event.getSourceVertexName().equals(TestTezJobs.EVENT_GENERATING_VERTEX_NAME));
                    int index = event.getUserPayload().getInt(0);
                    Preconditions.checkState((!this.eventsSeen.get(index) ? 1 : 0) != 0);
                    this.eventsSeen.set(index);
                }
            }
            finally {
                this.lock.unlock();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
            block4: {
                this.lock.lock();
                try {
                    Preconditions.checkArgument((stateUpdate.getVertexState() == VertexState.SUCCEEDED ? 1 : 0) != 0);
                    if (this.eventsSeen.cardinality() == this.getContext().getVertexNumTasks(TestTezJobs.EVENT_GENERATING_VERTEX_NAME)) {
                        this.condition.signal();
                        break block4;
                    }
                    throw new IllegalStateException("Received VertexState SUCCEEDED before receiving all InputInitializerEvents");
                }
                finally {
                    this.lock.unlock();
                }
            }
        }
    }

    public static class InputInitializerEventGeneratingProcessor
    extends SimpleProcessor {
        public InputInitializerEventGeneratingProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            if (this.getContext().getTaskIndex() == 1 && this.getContext().getTaskAttemptNumber() == 0) {
                throw new IOException("Failing task 2, attempt 0");
            }
            InputInitializerEvent initializerEvent = InputInitializerEvent.create((String)TestTezJobs.VERTEX_WITH_INITIALIZER_NAME, (String)TestTezJobs.INPUT1_NAME, (ByteBuffer)ByteBuffer.allocate(4).putInt(0, this.getContext().getTaskIndex()));
            ArrayList events = Lists.newArrayList();
            events.add(initializerEvent);
            this.getContext().sendEvents((List)events);
        }
    }
}

