/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.history.recovery;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestRecoveryService {
    private static final String TEST_ROOT_DIR = "target/" + TestRecoveryService.class.getName() + "-tmpDir";
    private static final long startTime = System.currentTimeMillis();
    private static final ApplicationId appId = ApplicationId.newInstance((long)startTime, (int)1);
    private static final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)appId, (int)1);
    private static final TezDAGID dagId = TezDAGID.getInstance((ApplicationId)appId, (int)1);
    private static final TezVertexID vertexId = TezVertexID.getInstance((TezDAGID)dagId, (int)1);
    private static final TezTaskID tezTaskId = TezTaskID.getInstance((TezVertexID)vertexId, (int)1);
    private Configuration conf;
    private AppContext appContext;
    private MockRecoveryService recoveryService;
    private Path dagRecoveryPath;
    private Path summaryPath;
    private FileSystem fs;
    private FSDataOutputStream dagFos;
    private FSDataOutputStream summaryFos;

    private void setup(boolean useMockFs, String[][] configs) throws Exception {
        this.conf = new Configuration();
        this.conf.set("fs.defaultFS", "file:///");
        if (configs != null) {
            for (String[] config : configs) {
                this.conf.set(config[0], config[1]);
            }
        }
        this.appContext = (AppContext)Mockito.mock(AppContext.class);
        Mockito.when((Object)this.appContext.getClock()).thenReturn((Object)new SystemClock());
        Mockito.when((Object)this.appContext.getHadoopShim()).thenReturn((Object)new DefaultHadoopShim());
        Mockito.when((Object)this.appContext.getApplicationID()).thenReturn((Object)appId);
        if (useMockFs) {
            this.fs = (FileSystem)Mockito.mock(FileSystem.class);
            Mockito.when((Object)this.appContext.getCurrentRecoveryDir()).thenReturn((Object)new Path("mockfs:///"));
            this.conf.set("fs.mockfs.impl", MockFileSystem.class.getName());
            MockFileSystem.delegate = this.fs;
            this.dagFos = (FSDataOutputStream)Mockito.spy((Object)new FSDataOutputStream(new OutputStream(){

                @Override
                public void write(int b) throws IOException {
                }
            }, null));
            this.summaryFos = (FSDataOutputStream)Mockito.spy((Object)new FSDataOutputStream(new OutputStream(){

                @Override
                public void write(int b) throws IOException {
                }
            }, null));
        } else {
            Mockito.when((Object)this.appContext.getCurrentRecoveryDir()).thenReturn((Object)new Path(TEST_ROOT_DIR));
            this.fs = FileSystem.getLocal((Configuration)this.conf);
            this.fs.delete(new Path(TEST_ROOT_DIR), true);
        }
        this.recoveryService = new MockRecoveryService(this.appContext);
        this.conf.setBoolean("tez.test.recovery.drain_event", true);
        this.recoveryService.init(this.conf);
        this.summaryPath = TezCommonUtils.getSummaryRecoveryPath((Path)this.recoveryService.recoveryPath);
        this.dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath((Path)this.recoveryService.recoveryPath, (String)dagId.toString());
        if (useMockFs) {
            Mockito.when((Object)this.fs.create((Path)Mockito.eq((Object)this.dagRecoveryPath), Mockito.eq((boolean)false), Mockito.anyInt())).thenReturn((Object)this.dagFos);
            Mockito.when((Object)this.fs.create((Path)Mockito.eq((Object)this.summaryPath), Mockito.eq((boolean)false), Mockito.anyInt())).thenReturn((Object)this.summaryFos);
        }
    }

    @Test(timeout=5000L)
    public void testDrainEvents() throws Exception {
        this.setup(false, null);
        this.recoveryService.start();
        int randEventCount = new Random().nextInt(100) + 100;
        for (int i = 0; i < randEventCount; ++i) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        }
        this.recoveryService.stop();
        Assert.assertEquals((long)randEventCount, (long)this.recoveryService.processedRecoveryEventCounter.get());
    }

    @Test(timeout=5000L)
    public void testMultipleDAGFinishedEvent() throws Exception {
        this.setup(false, null);
        this.recoveryService.start();
        int randEventCount = new Random().nextInt(100) + 100;
        for (int i = 0; i < randEventCount; ++i) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        }
        this.recoveryService.await();
        Assert.assertTrue((boolean)this.recoveryService.outputStreamMap.containsKey(dagId));
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGFinishedEvent(dagId, 1L, 2L, DAGState.FAILED, "diag", null, "user", "dag1", null, appAttemptId, null)));
        Assert.assertFalse((boolean)this.recoveryService.outputStreamMap.containsKey(dagId));
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, appAttemptId, null)));
        Assert.assertEquals((long)this.recoveryService.outputStreamMap.size(), (long)0L);
        Assert.assertFalse((boolean)this.recoveryService.outputStreamMap.containsKey(dagId));
        this.recoveryService.stop();
    }

    @Test(timeout=5000L)
    public void testSummaryPathExisted() throws Exception {
        this.setup(false, null);
        this.recoveryService.start();
        this.touchFile(this.summaryPath);
        Assert.assertFalse((boolean)this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, appAttemptId, null)));
        Assert.assertTrue((boolean)this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", null, "user", "dag1", null, appAttemptId, null)));
        this.recoveryService.stop();
    }

    @Test(timeout=5000L)
    public void testRecoveryPathExisted() throws Exception {
        this.setup(false, null);
        this.recoveryService.start();
        this.touchFile(this.dagRecoveryPath);
        Assert.assertFalse((boolean)this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        this.recoveryService.await();
        Assert.assertTrue((boolean)this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        this.recoveryService.stop();
    }

    @Test(timeout=5000L)
    public void testRecoveryFlushOnMaxEvents() throws Exception {
        this.setup(true, new String[][]{{"tez.dag.recovery.max.unflushed.events", "10"}, {"tez.dag.recovery.flush.interval.secs", "-1"}});
        this.recoveryService.start();
        for (int i = 0; i < 9; ++i) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)0))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
        this.recoveryService.stop();
    }

    @Test(timeout=10000L)
    public void testRecoveryFlushOnTimeoutEvents() throws Exception {
        this.setup(true, new String[][]{{"tez.dag.recovery.max.unflushed.events", "-1"}, {"tez.dag.recovery.flush.interval.secs", "5"}});
        this.recoveryService.start();
        for (int i = 0; i < 100; ++i) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        Thread.sleep(5000L);
        Assert.assertTrue((boolean)this.recoveryService.eventQueue.isEmpty());
        ((FileSystem)Mockito.verify((Object)this.fs, (VerificationMode)Mockito.times((int)1))).create((Path)Mockito.eq((Object)this.dagRecoveryPath), Mockito.eq((boolean)false), Mockito.anyInt());
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)0))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.waitForDrain(1000);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
        this.recoveryService.stop();
    }

    @Test(timeout=10000L)
    public void testRecoveryFlush() throws Exception {
        this.setup(true, new String[][]{{"tez.dag.recovery.max.unflushed.events", "10"}, {"tez.dag.recovery.flush.interval.secs", "5"}});
        this.recoveryService.start();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        Thread.sleep(5000L);
        Assert.assertTrue((boolean)this.recoveryService.eventQueue.isEmpty());
        ((FileSystem)Mockito.verify((Object)this.fs, (VerificationMode)Mockito.times((int)1))).create((Path)Mockito.eq((Object)this.dagRecoveryPath), Mockito.eq((boolean)false), Mockito.anyInt());
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)0))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.waitForDrain(1000);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
        for (int i = 0; i < 9; ++i) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)2))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.recoveryService.stop();
    }

    @Test(timeout=50000L)
    public void testRecoveryFlushOnStop() throws Exception {
        this.setup(true, new String[][]{{"tez.dag.recovery.max.unflushed.events", "-1"}, {"tez.dag.recovery.flush.interval.secs", "-1"}});
        this.recoveryService.start();
        for (int i = 0; i < 100; ++i) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)0))).hflush();
        Thread.sleep(30000L);
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)0))).hflush();
        this.recoveryService.stop();
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
    }

    @Test(timeout=5000L)
    public void testRecoveryFlushOnSummaryEvent() throws Exception {
        this.setup(true, new String[][]{{"tez.dag.recovery.max.unflushed.events", "-1"}, {"tez.dag.recovery.flush.interval.secs", "-1"}});
        this.recoveryService.start();
        DAGProtos.DAGPlan dagPlan = DAGProtos.DAGPlan.newBuilder().setName("test_dag").build();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGSubmittedEvent(dagId, startTime, dagPlan, appAttemptId, null, "nobody", this.conf, null, "default")));
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.summaryFos, (VerificationMode)Mockito.times((int)1))).hflush();
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, (HistoryEvent)new DAGCommitStartedEvent(dagId, startTime)));
        this.waitForDrain(-1);
        ((FSDataOutputStream)Mockito.verify((Object)this.summaryFos, (VerificationMode)Mockito.times((int)2))).hflush();
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)1))).hflush();
        this.recoveryService.stop();
        ((FSDataOutputStream)Mockito.verify((Object)this.dagFos, (VerificationMode)Mockito.times((int)2))).hflush();
    }

    private void waitForDrain(int limit) throws Exception {
        long maxTime = System.currentTimeMillis() + (long)limit;
        while (!this.recoveryService.eventQueue.isEmpty()) {
            Thread.sleep(10L);
            if (limit == -1 || System.currentTimeMillis() <= maxTime) continue;
            break;
        }
    }

    private void touchFile(Path path) throws IOException {
        this.fs.create(path).close();
    }

    public static class MockFileSystem
    extends FileSystem {
        static FileSystem delegate;
        static URI uri;

        public URI getUri() {
            return uri;
        }

        public FSDataInputStream open(Path f, int bufferSize) throws IOException {
            return delegate.open(f, bufferSize);
        }

        public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize) throws IOException {
            return delegate.create(f, overwrite, bufferSize);
        }

        public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
            return delegate.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
        }

        public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
            return delegate.append(f, bufferSize, progress);
        }

        public boolean rename(Path src, Path dst) throws IOException {
            return delegate.rename(src, dst);
        }

        public boolean delete(Path f, boolean recursive) throws IOException {
            return delegate.delete(f, recursive);
        }

        public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
            return delegate.listStatus(f);
        }

        public void setWorkingDirectory(Path new_dir) {
            delegate.setWorkingDirectory(new_dir);
        }

        public Path getWorkingDirectory() {
            return delegate.getWorkingDirectory();
        }

        public boolean mkdirs(Path f, FsPermission permission) throws IOException {
            return delegate.mkdirs(f, permission);
        }

        public FileStatus getFileStatus(Path f) throws IOException {
            return delegate.getFileStatus(f);
        }

        static {
            uri = URI.create("mockfs:///");
        }
    }

    private static class MockRecoveryService
    extends RecoveryService {
        public AtomicInteger processedRecoveryEventCounter = new AtomicInteger(0);

        public MockRecoveryService(AppContext appContext) {
            super(appContext);
        }

        protected void handleRecoveryEvent(DAGHistoryEvent event) throws IOException {
            super.handleRecoveryEvent(event);
            this.processedRecoveryEventCounter.addAndGet(1);
        }
    }
}

