package org.apache.tez.dag.history.recovery;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
import org.apache.tez.dag.history.events.DAGFinishedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.DAGSubmittedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/tez/dag/history/recovery/TestRecoveryService.class */
public class TestRecoveryService {
    private static final String TEST_ROOT_DIR = "target/" + TestRecoveryService.class.getName() + "-tmpDir";
    private static final long startTime = System.currentTimeMillis();
    private static final ApplicationId appId = ApplicationId.newInstance(startTime, 1);
    private static final ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
    private static final TezDAGID dagId = TezDAGID.getInstance(appId, 1);
    private static final TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
    private static final TezTaskID tezTaskId = TezTaskID.getInstance(vertexId, 1);
    private Configuration conf;
    private AppContext appContext;
    private MockRecoveryService recoveryService;
    private Path dagRecoveryPath;
    private Path summaryPath;
    private FileSystem fs;
    private FSDataOutputStream dagFos;
    private FSDataOutputStream summaryFos;

    /* loaded from: input_file:org/apache/tez/dag/history/recovery/TestRecoveryService$MockFileSystem.class */
    public static class MockFileSystem extends FileSystem {
        static FileSystem delegate;
        static URI uri = URI.create("mockfs:///");

        public URI getUri() {
            return uri;
        }

        public FSDataInputStream open(Path path, int i) throws IOException {
            return delegate.open(path, i);
        }

        public FSDataOutputStream create(Path path, boolean z, int i) throws IOException {
            return delegate.create(path, z, i);
        }

        public FSDataOutputStream create(Path path, FsPermission fsPermission, boolean z, int i, short s, long j, Progressable progressable) throws IOException {
            return delegate.create(path, fsPermission, z, i, s, j, progressable);
        }

        public FSDataOutputStream append(Path path, int i, Progressable progressable) throws IOException {
            return delegate.append(path, i, progressable);
        }

        public boolean rename(Path path, Path path2) throws IOException {
            return delegate.rename(path, path2);
        }

        public boolean delete(Path path, boolean z) throws IOException {
            return delegate.delete(path, z);
        }

        public FileStatus[] listStatus(Path path) throws FileNotFoundException, IOException {
            return delegate.listStatus(path);
        }

        public void setWorkingDirectory(Path path) {
            delegate.setWorkingDirectory(path);
        }

        public Path getWorkingDirectory() {
            return delegate.getWorkingDirectory();
        }

        public boolean mkdirs(Path path, FsPermission fsPermission) throws IOException {
            return delegate.mkdirs(path, fsPermission);
        }

        public FileStatus getFileStatus(Path path) throws IOException {
            return delegate.getFileStatus(path);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/dag/history/recovery/TestRecoveryService$MockRecoveryService.class */
    public static class MockRecoveryService extends RecoveryService {
        public AtomicInteger processedRecoveryEventCounter;

        public MockRecoveryService(AppContext appContext) {
            super(appContext);
            this.processedRecoveryEventCounter = new AtomicInteger(0);
        }

        protected void handleRecoveryEvent(DAGHistoryEvent dAGHistoryEvent) throws IOException {
            super.handleRecoveryEvent(dAGHistoryEvent);
            this.processedRecoveryEventCounter.addAndGet(1);
        }
    }

    private void setup(boolean z, String[][] strArr) throws Exception {
        this.conf = new Configuration();
        this.conf.set("fs.defaultFS", "file:///");
        if (strArr != null) {
            for (String[] strArr2 : strArr) {
                this.conf.set(strArr2[0], strArr2[1]);
            }
        }
        this.appContext = (AppContext) Mockito.mock(AppContext.class);
        Mockito.when(this.appContext.getClock()).thenReturn(new SystemClock());
        Mockito.when(this.appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
        Mockito.when(this.appContext.getApplicationID()).thenReturn(appId);
        if (z) {
            this.fs = (FileSystem) Mockito.mock(FileSystem.class);
            Mockito.when(this.appContext.getCurrentRecoveryDir()).thenReturn(new Path("mockfs:///"));
            this.conf.set("fs.mockfs.impl", MockFileSystem.class.getName());
            MockFileSystem.delegate = this.fs;
            this.dagFos = (FSDataOutputStream) Mockito.spy(new FSDataOutputStream(new OutputStream() { // from class: org.apache.tez.dag.history.recovery.TestRecoveryService.1
                @Override // java.io.OutputStream
                public void write(int i) throws IOException {
                }
            }, (FileSystem.Statistics) null));
            this.summaryFos = (FSDataOutputStream) Mockito.spy(new FSDataOutputStream(new OutputStream() { // from class: org.apache.tez.dag.history.recovery.TestRecoveryService.2
                @Override // java.io.OutputStream
                public void write(int i) throws IOException {
                }
            }, (FileSystem.Statistics) null));
        } else {
            Mockito.when(this.appContext.getCurrentRecoveryDir()).thenReturn(new Path(TEST_ROOT_DIR));
            this.fs = FileSystem.getLocal(this.conf);
            this.fs.delete(new Path(TEST_ROOT_DIR), true);
        }
        this.recoveryService = new MockRecoveryService(this.appContext);
        this.conf.setBoolean("tez.test.recovery.drain_event", true);
        this.recoveryService.init(this.conf);
        this.summaryPath = TezCommonUtils.getSummaryRecoveryPath(this.recoveryService.recoveryPath);
        this.dagRecoveryPath = TezCommonUtils.getDAGRecoveryPath(this.recoveryService.recoveryPath, dagId.toString());
        if (z) {
            Mockito.when(this.fs.create((Path) Mockito.eq(this.dagRecoveryPath), Mockito.eq(false), Mockito.anyInt())).thenReturn(this.dagFos);
            Mockito.when(this.fs.create((Path) Mockito.eq(this.summaryPath), Mockito.eq(false), Mockito.anyInt())).thenReturn(this.summaryFos);
        }
    }

    @Test(timeout = 5000)
    public void testDrainEvents() throws Exception {
        setup(false, null);
        this.recoveryService.start();
        int nextInt = new Random().nextInt(100) + 100;
        for (int i = 0; i < nextInt; i++) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        }
        this.recoveryService.stop();
        Assert.assertEquals(nextInt, this.recoveryService.processedRecoveryEventCounter.get());
    }

    @Test(timeout = 5000)
    public void testMultipleDAGFinishedEvent() throws Exception {
        setup(false, null);
        this.recoveryService.start();
        int nextInt = new Random().nextInt(100) + 100;
        for (int i = 0; i < nextInt; i++) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        }
        this.recoveryService.await();
        Assert.assertTrue(this.recoveryService.outputStreamMap.containsKey(dagId));
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, 1L, 2L, DAGState.FAILED, "diag", (TezCounters) null, "user", "dag1", (Map) null, appAttemptId, (DAGProtos.DAGPlan) null)));
        Assert.assertFalse(this.recoveryService.outputStreamMap.containsKey(dagId));
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", (TezCounters) null, "user", "dag1", (Map) null, appAttemptId, (DAGProtos.DAGPlan) null)));
        Assert.assertEquals(this.recoveryService.outputStreamMap.size(), 0L);
        Assert.assertFalse(this.recoveryService.outputStreamMap.containsKey(dagId));
        this.recoveryService.stop();
    }

    @Test(timeout = 5000)
    public void testSummaryPathExisted() throws Exception {
        setup(false, null);
        this.recoveryService.start();
        touchFile(this.summaryPath);
        Assert.assertFalse(this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", (TezCounters) null, "user", "dag1", (Map) null, appAttemptId, (DAGProtos.DAGPlan) null)));
        Assert.assertTrue(this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGFinishedEvent(dagId, 1L, 2L, DAGState.ERROR, "diag", (TezCounters) null, "user", "dag1", (Map) null, appAttemptId, (DAGProtos.DAGPlan) null)));
        this.recoveryService.stop();
    }

    @Test(timeout = 5000)
    public void testRecoveryPathExisted() throws Exception {
        setup(false, null);
        this.recoveryService.start();
        touchFile(this.dagRecoveryPath);
        Assert.assertFalse(this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        this.recoveryService.await();
        Assert.assertTrue(this.recoveryService.hasRecoveryFailed());
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new TaskStartedEvent(tezTaskId, "v1", 0L, 0L)));
        this.recoveryService.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.String[], java.lang.String[][]] */
    @Test(timeout = 5000)
    public void testRecoveryFlushOnMaxEvents() throws Exception {
        setup(true, new String[]{new String[]{"tez.dag.recovery.max.unflushed.events", "10"}, new String[]{"tez.dag.recovery.flush.interval.secs", "-1"}});
        this.recoveryService.start();
        for (int i = 0; i < 9; i++) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(0))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
        this.recoveryService.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.String[], java.lang.String[][]] */
    @Test(timeout = 10000)
    public void testRecoveryFlushOnTimeoutEvents() throws Exception {
        setup(true, new String[]{new String[]{"tez.dag.recovery.max.unflushed.events", "-1"}, new String[]{"tez.dag.recovery.flush.interval.secs", "5"}});
        this.recoveryService.start();
        for (int i = 0; i < 100; i++) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        Thread.sleep(5000L);
        Assert.assertTrue(this.recoveryService.eventQueue.isEmpty());
        ((FileSystem) Mockito.verify(this.fs, Mockito.times(1))).create((Path) Mockito.eq(this.dagRecoveryPath), Mockito.eq(false), Mockito.anyInt());
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(0))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        waitForDrain(1000);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
        this.recoveryService.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.String[], java.lang.String[][]] */
    @Test(timeout = 10000)
    public void testRecoveryFlush() throws Exception {
        setup(true, new String[]{new String[]{"tez.dag.recovery.max.unflushed.events", "10"}, new String[]{"tez.dag.recovery.flush.interval.secs", "5"}});
        this.recoveryService.start();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        Thread.sleep(5000L);
        Assert.assertTrue(this.recoveryService.eventQueue.isEmpty());
        ((FileSystem) Mockito.verify(this.fs, Mockito.times(1))).create((Path) Mockito.eq(this.dagRecoveryPath), Mockito.eq(false), Mockito.anyInt());
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(0))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        waitForDrain(1000);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
        for (int i = 0; i < 9; i++) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(2))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        this.recoveryService.stop();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.String[], java.lang.String[][]] */
    @Test(timeout = 50000)
    public void testRecoveryFlushOnStop() throws Exception {
        setup(true, new String[]{new String[]{"tez.dag.recovery.max.unflushed.events", "-1"}, new String[]{"tez.dag.recovery.flush.interval.secs", "-1"}});
        this.recoveryService.start();
        for (int i = 0; i < 100; i++) {
            this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        }
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(0))).hflush();
        Thread.sleep(30000L);
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGStartedEvent(dagId, startTime, "nobody", "test-dag")));
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(0))).hflush();
        this.recoveryService.stop();
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r2v1, types: [java.lang.String[], java.lang.String[][]] */
    @Test(timeout = 5000)
    public void testRecoveryFlushOnSummaryEvent() throws Exception {
        setup(true, new String[]{new String[]{"tez.dag.recovery.max.unflushed.events", "-1"}, new String[]{"tez.dag.recovery.flush.interval.secs", "-1"}});
        this.recoveryService.start();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGSubmittedEvent(dagId, startTime, DAGProtos.DAGPlan.newBuilder().setName("test_dag").build(), appAttemptId, (Map) null, "nobody", this.conf, (String) null, "default")));
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.summaryFos, Mockito.times(1))).hflush();
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
        this.recoveryService.handle(new DAGHistoryEvent(dagId, new DAGCommitStartedEvent(dagId, startTime)));
        waitForDrain(-1);
        ((FSDataOutputStream) Mockito.verify(this.summaryFos, Mockito.times(2))).hflush();
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(1))).hflush();
        this.recoveryService.stop();
        ((FSDataOutputStream) Mockito.verify(this.dagFos, Mockito.times(2))).hflush();
    }

    private void waitForDrain(int i) throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + i;
        while (!this.recoveryService.eventQueue.isEmpty()) {
            Thread.sleep(10L);
            if (i != -1 && System.currentTimeMillis() > currentTimeMillis) {
                return;
            }
        }
    }

    private void touchFile(Path path) throws IOException {
        this.fs.create(path).close();
    }
}
