package org.apache.hadoop.mapreduce.v2.app.commit;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobCommitFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

/* JADX WARN: Classes with same name are omitted:
  input_file:hadoop-mapreduce-client-app-2.7.0-mapr-1607-tests.jar:org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.class
 */
/* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler.class */
public class TestCommitterEventHandler {
    static String stagingDir = "target/test-staging/";

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-app-2.7.0-mapr-1607-tests.jar:org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler$TestingJobEventHandler.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler$TestingJobEventHandler.class */
    private static class TestingJobEventHandler implements EventHandler<JobEvent> {
        int numCommitCompletedEvents;

        private TestingJobEventHandler() {
            this.numCommitCompletedEvents = 0;
        }

        public void handle(JobEvent jobEvent) {
            if (jobEvent.getType() == JobEventType.JOB_COMMIT_COMPLETED) {
                this.numCommitCompletedEvents++;
            }
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-app-2.7.0-mapr-1607-tests.jar:org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler$TestingRMHeartbeatHandler.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler$TestingRMHeartbeatHandler.class */
    private static class TestingRMHeartbeatHandler implements RMHeartbeatHandler {
        private long lastHeartbeatTime;
        private ConcurrentLinkedQueue<Runnable> callbacks;

        private TestingRMHeartbeatHandler() {
            this.lastHeartbeatTime = 0L;
            this.callbacks = new ConcurrentLinkedQueue<>();
        }

        @Override // org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler
        public long getLastHeartbeatTime() {
            return this.lastHeartbeatTime;
        }

        @Override // org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler
        public void runOnNextHeartbeat(Runnable runnable) {
            this.callbacks.add(runnable);
        }

        public void setLastHeartbeatTime(long j) {
            this.lastHeartbeatTime = j;
            while (true) {
                Runnable poll = this.callbacks.poll();
                if (poll == null) {
                    return;
                } else {
                    poll.run();
                }
            }
        }

        public int getNumCallbacks() {
            return this.callbacks.size();
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:hadoop-mapreduce-client-app-2.7.0-mapr-1607-tests.jar:org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler$WaitForItHandler.class
     */
    /* loaded from: input_file:test-classes/org/apache/hadoop/mapreduce/v2/app/commit/TestCommitterEventHandler$WaitForItHandler.class */
    public static class WaitForItHandler implements EventHandler {
        private Event event = null;

        public synchronized void handle(Event event) {
            this.event = event;
            notifyAll();
        }

        public synchronized Event getAndClearEvent() throws InterruptedException {
            if (this.event == null) {
                long monotonicNow = Time.monotonicNow();
                while (this.event == null && Time.monotonicNow() - monotonicNow < 5000) {
                    wait(5000L);
                }
            }
            Event event = this.event;
            this.event = null;
            return event;
        }
    }

    @BeforeClass
    public static void setup() {
        stagingDir = new File(stagingDir).getAbsolutePath();
    }

    @Before
    public void cleanup() throws IOException {
        File file = new File(stagingDir);
        if (file.exists()) {
            FileUtils.deleteDirectory(file);
        }
        file.mkdirs();
    }

    @Test
    public void testCommitWindow() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("yarn.app.mapreduce.am.staging-dir", stagingDir);
        AsyncDispatcher asyncDispatcher = new AsyncDispatcher();
        asyncDispatcher.init(configuration);
        asyncDispatcher.start();
        TestingJobEventHandler testingJobEventHandler = new TestingJobEventHandler();
        asyncDispatcher.register(JobEventType.class, testingJobEventHandler);
        SystemClock systemClock = new SystemClock();
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class);
        ApplicationAttemptId applicationAttemptId = ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
        Mockito.when(appContext.getApplicationID()).thenReturn(applicationAttemptId.getApplicationId());
        Mockito.when(appContext.getApplicationAttemptId()).thenReturn(applicationAttemptId);
        Mockito.when(appContext.getEventHandler()).thenReturn(asyncDispatcher.getEventHandler());
        Mockito.when(appContext.getClock()).thenReturn(systemClock);
        OutputCommitter outputCommitter = (OutputCommitter) Mockito.mock(OutputCommitter.class);
        TestingRMHeartbeatHandler testingRMHeartbeatHandler = new TestingRMHeartbeatHandler();
        CommitterEventHandler committerEventHandler = new CommitterEventHandler(appContext, outputCommitter, testingRMHeartbeatHandler);
        committerEventHandler.init(configuration);
        committerEventHandler.start();
        committerEventHandler.handle(new CommitterJobCommitEvent(null, null));
        long j = 5000;
        while (true) {
            long j2 = j;
            if (testingRMHeartbeatHandler.getNumCallbacks() == 1 || j2 <= 0) {
                break;
            }
            Thread.sleep(10L);
            j = j2 - 10;
        }
        Assert.assertEquals("committer did not register a heartbeat callback", 1L, testingRMHeartbeatHandler.getNumCallbacks());
        ((OutputCommitter) Mockito.verify(outputCommitter, Mockito.never())).commitJob((JobContext) Mockito.any(JobContext.class));
        Assert.assertEquals("committer should not have committed", 0L, testingJobEventHandler.numCommitCompletedEvents);
        testingRMHeartbeatHandler.setLastHeartbeatTime(systemClock.getTime());
        long j3 = 5000;
        while (true) {
            long j4 = j3;
            if (testingJobEventHandler.numCommitCompletedEvents == 1 || j4 <= 0) {
                break;
            }
            Thread.sleep(10L);
            j3 = j4 - 10;
        }
        Assert.assertEquals("committer did not complete commit after RM hearbeat", 1L, testingJobEventHandler.numCommitCompletedEvents);
        ((OutputCommitter) Mockito.verify(outputCommitter, Mockito.times(1))).commitJob((JobContext) Mockito.any(JobContext.class));
        cleanup();
        committerEventHandler.handle(new CommitterJobCommitEvent(null, null));
        long j5 = 5000;
        while (true) {
            long j6 = j5;
            if (testingJobEventHandler.numCommitCompletedEvents == 2 || j6 <= 0) {
                break;
            }
            Thread.sleep(10L);
            j5 = j6 - 10;
        }
        Assert.assertEquals("committer did not commit", 2L, testingJobEventHandler.numCommitCompletedEvents);
        ((OutputCommitter) Mockito.verify(outputCommitter, Mockito.times(2))).commitJob((JobContext) Mockito.any(JobContext.class));
        committerEventHandler.stop();
        asyncDispatcher.stop();
    }

    @Test
    public void testBasic() throws Exception {
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class);
        OutputCommitter outputCommitter = (OutputCommitter) Mockito.mock(OutputCommitter.class);
        Clock clock = (Clock) Mockito.mock(Clock.class);
        CommitterEventHandler committerEventHandler = new CommitterEventHandler(appContext, outputCommitter, new TestingRMHeartbeatHandler());
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.set("yarn.app.mapreduce.am.staging-dir", stagingDir);
        JobContext jobContext = (JobContext) Mockito.mock(JobContext.class);
        ApplicationAttemptId applicationAttemptId = ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
        JobId yarn = TypeConverter.toYarn(TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
        WaitForItHandler waitForItHandler = new WaitForItHandler();
        Mockito.when(appContext.getApplicationID()).thenReturn(applicationAttemptId.getApplicationId());
        Mockito.when(appContext.getApplicationAttemptId()).thenReturn(applicationAttemptId);
        Mockito.when(appContext.getEventHandler()).thenReturn(waitForItHandler);
        Mockito.when(appContext.getClock()).thenReturn(clock);
        committerEventHandler.init(yarnConfiguration);
        committerEventHandler.start();
        try {
            committerEventHandler.handle(new CommitterJobCommitEvent(yarn, jobContext));
            String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
            Path startJobCommitFile = MRApps.getStartJobCommitFile(yarnConfiguration, shortUserName, yarn);
            Path endJobCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(yarnConfiguration, shortUserName, yarn);
            Path endJobCommitFailureFile = MRApps.getEndJobCommitFailureFile(yarnConfiguration, shortUserName, yarn);
            Event andClearEvent = waitForItHandler.getAndClearEvent();
            Assert.assertNotNull(andClearEvent);
            Assert.assertTrue(andClearEvent instanceof JobCommitCompletedEvent);
            FileSystem fileSystem = FileSystem.get(yarnConfiguration);
            Assert.assertTrue(startJobCommitFile.toString(), fileSystem.exists(startJobCommitFile));
            Assert.assertTrue(endJobCommitSuccessFile.toString(), fileSystem.exists(endJobCommitSuccessFile));
            Assert.assertFalse(endJobCommitFailureFile.toString(), fileSystem.exists(endJobCommitFailureFile));
            ((OutputCommitter) Mockito.verify(outputCommitter)).commitJob((JobContext) Mockito.any(JobContext.class));
            committerEventHandler.stop();
        } catch (Throwable th) {
            committerEventHandler.stop();
            throw th;
        }
    }

    @Test
    public void testFailure() throws Exception {
        AppContext appContext = (AppContext) Mockito.mock(AppContext.class);
        OutputCommitter outputCommitter = (OutputCommitter) Mockito.mock(OutputCommitter.class);
        Clock clock = (Clock) Mockito.mock(Clock.class);
        CommitterEventHandler committerEventHandler = new CommitterEventHandler(appContext, outputCommitter, new TestingRMHeartbeatHandler());
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.set("yarn.app.mapreduce.am.staging-dir", stagingDir);
        JobContext jobContext = (JobContext) Mockito.mock(JobContext.class);
        ApplicationAttemptId applicationAttemptId = ConverterUtils.toApplicationAttemptId("appattempt_1234567890000_0001_0");
        JobId yarn = TypeConverter.toYarn(TypeConverter.fromYarn(applicationAttemptId.getApplicationId()));
        WaitForItHandler waitForItHandler = new WaitForItHandler();
        Mockito.when(appContext.getApplicationID()).thenReturn(applicationAttemptId.getApplicationId());
        Mockito.when(appContext.getApplicationAttemptId()).thenReturn(applicationAttemptId);
        Mockito.when(appContext.getEventHandler()).thenReturn(waitForItHandler);
        Mockito.when(appContext.getClock()).thenReturn(clock);
        ((OutputCommitter) Mockito.doThrow(new YarnRuntimeException("Intentional Failure")).when(outputCommitter)).commitJob((JobContext) Mockito.any(JobContext.class));
        committerEventHandler.init(yarnConfiguration);
        committerEventHandler.start();
        try {
            committerEventHandler.handle(new CommitterJobCommitEvent(yarn, jobContext));
            String shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
            Path startJobCommitFile = MRApps.getStartJobCommitFile(yarnConfiguration, shortUserName, yarn);
            Path endJobCommitSuccessFile = MRApps.getEndJobCommitSuccessFile(yarnConfiguration, shortUserName, yarn);
            Path endJobCommitFailureFile = MRApps.getEndJobCommitFailureFile(yarnConfiguration, shortUserName, yarn);
            Event andClearEvent = waitForItHandler.getAndClearEvent();
            Assert.assertNotNull(andClearEvent);
            Assert.assertTrue(andClearEvent instanceof JobCommitFailedEvent);
            FileSystem fileSystem = FileSystem.get(yarnConfiguration);
            Assert.assertTrue(fileSystem.exists(startJobCommitFile));
            Assert.assertFalse(fileSystem.exists(endJobCommitSuccessFile));
            Assert.assertTrue(fileSystem.exists(endJobCommitFailureFile));
            ((OutputCommitter) Mockito.verify(outputCommitter)).commitJob((JobContext) Mockito.any(JobContext.class));
            committerEventHandler.stop();
        } catch (Throwable th) {
            committerEventHandler.stop();
            throw th;
        }
    }
}
