package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import com.google.common.base.Supplier;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.AbstractFileSystem;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.TestNonAggregatingLogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mortbay.util.MultiException;

/* loaded from: input_file:test-classes/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.class */
public class TestLogAggregationService extends BaseContainerManagerTest {
    private static RecordFactory recordFactory;
    DrainDispatcher dispatcher;
    EventHandler<ApplicationEvent> appEventHandler;
    private Map<ApplicationAccessType, String> acls = createAppAcls();
    private File remoteRootLogDir = new File("target", getClass().getName() + "-remoteLogDir");

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:test-classes/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService$LogFileStatusInLastCycle.class */
    public static class LogFileStatusInLastCycle {
        private String logFilePathInLastCycle;
        private List<String> logFileTypesInLastCycle;

        public LogFileStatusInLastCycle(String str, List<String> list) {
            this.logFilePathInLastCycle = str;
            this.logFileTypesInLastCycle = list;
        }

        public String getLogFilePathInLastCycle() {
            return this.logFilePathInLastCycle;
        }

        public List<String> getLogFileTypesInLastCycle() {
            return this.logFileTypesInLastCycle;
        }
    }

    public TestLogAggregationService() throws UnsupportedFileSystemException {
        this.remoteRootLogDir.mkdir();
    }

    @Override // org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest
    public void setup() throws IOException {
        super.setup();
        ((NodeManager.NMContext) this.context).setNodeId(NodeId.newInstance("0.0.0.0", 5555));
        this.dispatcher = createDispatcher();
        this.appEventHandler = (EventHandler) Mockito.mock(EventHandler.class);
        this.dispatcher.register(ApplicationEventType.class, this.appEventHandler);
        UserGroupInformation.setConfiguration(this.conf);
    }

    @Override // org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest
    public void tearDown() throws IOException, InterruptedException {
        super.tearDown();
        createContainerExecutor().deleteAsUser(this.user, new Path(this.remoteRootLogDir.getAbsolutePath()), new Path[0]);
        this.dispatcher.await();
        this.dispatcher.stop();
        this.dispatcher.close();
    }

    private void verifyLocalFileDeletion(LogAggregationService logAggregationService) throws Exception {
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        File file = new File(localLogDir, ConverterUtils.toString(newApplicationId));
        file.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ApplicationAttemptId newApplicationAttemptId = BuilderUtils.newApplicationAttemptId(newApplicationId, 1);
        ContainerId newContainerId = BuilderUtils.newContainerId(newApplicationAttemptId, 1L);
        writeContainerLogs(file, newContainerId, new String[]{"stdout", "stderr", "syslog"});
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId, 0));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
        ((LogAggregationService) Mockito.verify(logAggregationService)).closeFileSystems((UserGroupInformation) Matchers.any(UserGroupInformation.class));
        ((DeletionService) Mockito.verify(this.delSrvc)).delete((String) Matchers.eq(this.user), (Path) Matchers.eq((Path) null), (Path) Matchers.eq(new Path(file.getAbsolutePath())));
        this.delSrvc.stop();
        File file2 = new File(file, ConverterUtils.toString(newContainerId));
        for (String str : new String[]{"stdout", "stderr", "syslog"}) {
            File file3 = new File(file2, str);
            Assert.assertFalse("check " + file3, file3.exists());
        }
        Assert.assertFalse(file.exists());
        Path remoteNodeLogFileForApp = logAggregationService.getRemoteNodeLogFileForApp(newApplicationId, this.user);
        Assert.assertTrue("Log file [" + remoteNodeLogFileForApp + "] not found", new File(remoteNodeLogFileForApp.toUri().getPath()).exists());
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationAttemptId.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationAttemptId.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, true, "getType", "getApplicationID");
    }

    @Test
    public void testLocalFileDeletionAfterUpload() throws Exception {
        this.delSrvc = new DeletionService(createContainerExecutor());
        this.delSrvc = (DeletionService) Mockito.spy(this.delSrvc);
        this.delSrvc.init(this.conf);
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        verifyLocalFileDeletion((LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler)));
    }

    @Test
    public void testLocalFileDeletionOnDiskFull() throws Exception {
        this.delSrvc = new DeletionService(createContainerExecutor());
        this.delSrvc = (DeletionService) Mockito.spy(this.delSrvc);
        this.delSrvc.init(this.conf);
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        List<String> logDirs = this.dirsHandler.getLogDirs();
        LocalDirsHandlerService localDirsHandlerService = (LocalDirsHandlerService) Mockito.spy(this.dirsHandler);
        Mockito.when(localDirsHandlerService.getLogDirs()).thenReturn(new ArrayList());
        Mockito.when(localDirsHandlerService.getLogDirsForRead()).thenReturn(logDirs);
        verifyLocalFileDeletion((LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, this.delSrvc, localDirsHandlerService)));
    }

    @Test
    public void testNoContainerOnNode() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        new File(localLogDir, ConverterUtils.toString(newApplicationId)).mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
        Assert.assertFalse(new File(logAggregationService.getRemoteNodeLogFileForApp(newApplicationId, this.user).toUri().getPath()).exists());
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, true, "getType", "getApplicationID");
        logAggregationService.close();
    }

    @Test
    public void testMultipleAppsLogAggregation() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        String[] strArr = {"stdout", "stderr", "syslog"};
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        File file = new File(localLogDir, ConverterUtils.toString(newApplicationId));
        file.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ApplicationAttemptId newApplicationAttemptId = BuilderUtils.newApplicationAttemptId(newApplicationId, 1);
        ContainerId newContainerId = BuilderUtils.newContainerId(newApplicationAttemptId, 1L);
        writeContainerLogs(file, newContainerId, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId, 0));
        ApplicationId newApplicationId2 = BuilderUtils.newApplicationId(1234L, 2);
        ApplicationAttemptId newApplicationAttemptId2 = BuilderUtils.newApplicationAttemptId(newApplicationId2, 1);
        File file2 = new File(localLogDir, ConverterUtils.toString(newApplicationId2));
        file2.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId2, this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls));
        ContainerId newContainerId2 = BuilderUtils.newContainerId(newApplicationAttemptId2, 1L);
        writeContainerLogs(file2, newContainerId2, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId2, 0));
        ContainerId newContainerId3 = BuilderUtils.newContainerId(newApplicationAttemptId, 2L);
        writeContainerLogs(file, newContainerId3, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId3, 0));
        ApplicationId newApplicationId3 = BuilderUtils.newApplicationId(1234L, 3);
        ApplicationAttemptId newApplicationAttemptId3 = BuilderUtils.newApplicationAttemptId(newApplicationId3, 1);
        File file3 = new File(localLogDir, ConverterUtils.toString(newApplicationId3));
        file3.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId3, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId3, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)}, false, "getType", "getApplicationID");
        Mockito.reset(new EventHandler[]{this.appEventHandler});
        ContainerId newContainerId4 = BuilderUtils.newContainerId(newApplicationAttemptId3, 1L);
        writeContainerLogs(file3, newContainerId4, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId4, 0));
        ContainerId newContainerId5 = BuilderUtils.newContainerId(newApplicationAttemptId3, 2L);
        writeContainerLogs(file3, newContainerId5, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId5, 1));
        ContainerId newContainerId6 = BuilderUtils.newContainerId(newApplicationAttemptId2, 2L);
        writeContainerLogs(file2, newContainerId6, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId6, 0));
        ContainerId newContainerId7 = BuilderUtils.newContainerId(newApplicationAttemptId3, 3L);
        writeContainerLogs(file3, newContainerId7, strArr);
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId7, 0));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId2));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId3));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
        verifyContainerLogs(logAggregationService, newApplicationId, new ContainerId[]{newContainerId, newContainerId3}, strArr, 3, false);
        verifyContainerLogs(logAggregationService, newApplicationId2, new ContainerId[]{newContainerId2}, strArr, 3, false);
        verifyContainerLogs(logAggregationService, newApplicationId3, new ContainerId[]{newContainerId4, newContainerId5}, strArr, 3, false);
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId3, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, false, "getType", "getApplicationID");
    }

    @Test
    public void testVerifyAndCreateRemoteDirsFailure() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        ((LogAggregationService) Mockito.doThrow(new YarnRuntimeException("KABOOM!")).when(logAggregationService)).verifyAndCreateRemoteLogDir((Configuration) Matchers.any(Configuration.class));
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000.0d));
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)}, false, "getType", "getApplicationID", "getDiagnostic");
        Mockito.reset(new LogAggregationService[]{logAggregationService});
        ApplicationId newApplicationId2 = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000.0d));
        new File(localLogDir, ConverterUtils.toString(newApplicationId2)).mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId2, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)}, false, "getType", "getApplicationID", "getDiagnostic");
        logAggregationService.stop();
    }

    @Test
    public void testVerifyAndCreateRemoteDirNonExistence() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        File file = new File(String.valueOf("tmp" + System.currentTimeMillis()));
        this.conf.set("yarn.nodemanager.remote-app-log-dir", file.getAbsolutePath());
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        Assert.assertTrue("The new file already exists!", !file.exists());
        logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
        Assert.assertTrue("The new aggregate file is not successfully created", file.exists());
        file.delete();
    }

    @Test
    public void testAppLogDirCreation() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir-suffix", "logs");
        InlineDispatcher inlineDispatcher = new InlineDispatcher();
        inlineDispatcher.init(this.conf);
        inlineDispatcher.start();
        FileSystem fileSystem = FileSystem.get(this.conf);
        final FileSystem fileSystem2 = (FileSystem) Mockito.spy(FileSystem.get(this.conf));
        LogAggregationService logAggregationService = new LogAggregationService(inlineDispatcher, this.context, this.delSrvc, this.dirsHandler) { // from class: org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.TestLogAggregationService.1
            @Override // org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService
            protected FileSystem getFileSystem(Configuration configuration) {
                return fileSystem2;
            }
        };
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1L, 1);
        Path makeQualified = fileSystem.makeQualified(new Path(this.remoteRootLogDir.getAbsolutePath(), this.user));
        Path path = new Path(makeQualified, "logs");
        Path path2 = new Path(path, newApplicationId.toString());
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ((FileSystem) Mockito.verify(fileSystem2)).mkdirs((Path) Matchers.eq(makeQualified), (FsPermission) Matchers.isA(FsPermission.class));
        ((FileSystem) Mockito.verify(fileSystem2)).mkdirs((Path) Matchers.eq(path), (FsPermission) Matchers.isA(FsPermission.class));
        ((FileSystem) Mockito.verify(fileSystem2)).mkdirs((Path) Matchers.eq(path2), (FsPermission) Matchers.isA(FsPermission.class));
        ApplicationId newApplicationId2 = BuilderUtils.newApplicationId(1L, 2);
        Path path3 = new Path(path, newApplicationId2.toString());
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId2, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ((FileSystem) Mockito.verify(fileSystem2)).mkdirs((Path) Matchers.eq(path3), (FsPermission) Matchers.isA(FsPermission.class));
        ApplicationId newApplicationId3 = BuilderUtils.newApplicationId(1L, 3);
        Path path4 = new Path(path, newApplicationId3.toString());
        new File(path4.toUri().getPath()).mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId3, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        ((FileSystem) Mockito.verify(fileSystem2, Mockito.never())).mkdirs((Path) Matchers.eq(path4), (FsPermission) Matchers.isA(FsPermission.class));
        logAggregationService.stop();
        logAggregationService.close();
        inlineDispatcher.stop();
    }

    @Test
    public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000.0d));
        ((LogAggregationService) Mockito.doThrow(new YarnRuntimeException("KABOOM!")).when(logAggregationService)).initAppAggregator((ApplicationId) Matchers.eq(newApplicationId), (String) Matchers.eq(this.user), (Credentials) Matchers.any(Credentials.class), (ContainerLogsRetentionPolicy) Matchers.any(ContainerLogsRetentionPolicy.class), Matchers.anyMap(), (LogAggregationContext) Matchers.any(LogAggregationContext.class));
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)}, false, "getType", "getApplicationID", "getDiagnostic");
        ((LogAggregationService) Mockito.verify(logAggregationService, Mockito.never())).closeFileSystems((UserGroupInformation) Matchers.any(UserGroupInformation.class));
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(BuilderUtils.newContainerId(4, 1, 1L, 1L), 0));
        this.dispatcher.await();
        logAggregationService.handle(new LogHandlerAppFinishedEvent(BuilderUtils.newApplicationId(1L, 5)));
        this.dispatcher.await();
    }

    @Test
    public void testLogAggregationCreateDirsFailsWithoutKillingNM() throws Exception {
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(System.currentTimeMillis(), (int) (Math.random() * 1000.0d));
        ((LogAggregationService) Mockito.doThrow(new RuntimeException("KABOOM!")).when(logAggregationService)).createAppDir((String) Matchers.any(String.class), (ApplicationId) Matchers.any(ApplicationId.class), (UserGroupInformation) Matchers.any(UserGroupInformation.class));
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED)}, false, "getType", "getApplicationID", "getDiagnostic");
        ((LogAggregationService) Mockito.verify(logAggregationService)).closeFileSystems((UserGroupInformation) Matchers.any(UserGroupInformation.class));
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(BuilderUtils.newContainerId(4, 1, 1L, 1L), 0));
        this.dispatcher.await();
        logAggregationService.handle(new LogHandlerAppFinishedEvent(BuilderUtils.newApplicationId(1L, 5)));
        this.dispatcher.await();
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
    }

    private void writeContainerLogs(File file, ContainerId containerId, String[] strArr) throws IOException {
        String converterUtils = ConverterUtils.toString(containerId);
        File file2 = new File(file, converterUtils);
        file2.mkdir();
        for (String str : strArr) {
            FileWriter fileWriter = new FileWriter(new File(file2, str));
            fileWriter.write(converterUtils + " Hello " + str + "!");
            fileWriter.close();
        }
    }

    private LogFileStatusInLastCycle verifyContainerLogs(LogAggregationService logAggregationService, ApplicationId applicationId, ContainerId[] containerIdArr, String[] strArr, int i, boolean z) throws IOException {
        Path remoteAppLogDir = logAggregationService.getRemoteAppLogDir(applicationId, this.user);
        RemoteIterator remoteIterator = null;
        try {
            remoteIterator = FileContext.getFileContext(FileContext.getFileContext(this.conf).makeQualified(remoteAppLogDir).toUri(), this.conf).listStatus(remoteAppLogDir);
        } catch (FileNotFoundException e) {
            Assert.fail("Should have log files");
        }
        Assert.assertTrue(remoteIterator.hasNext());
        FileStatus fileStatus = null;
        if (z) {
            long j = 0;
            while (remoteIterator.hasNext()) {
                FileStatus fileStatus2 = (FileStatus) remoteIterator.next();
                if (!fileStatus2.getPath().getName().contains(".tmp")) {
                    long parseLong = Long.parseLong(fileStatus2.getPath().getName().split("_")[2]);
                    if (parseLong > j) {
                        fileStatus = fileStatus2;
                        j = parseLong;
                    }
                }
            }
            String[] split = fileStatus.getPath().getName().split("_");
            Assert.assertTrue(split.length == 3);
            Assert.assertEquals(split[0] + ":" + split[1], logAggregationService.getNodeId().toString());
        } else {
            fileStatus = (FileStatus) remoteIterator.next();
            Assert.assertTrue(fileStatus.getPath().getName().equals(LogAggregationUtils.getNodeString(logAggregationService.getNodeId())));
        }
        AggregatedLogFormat.LogReader logReader = new AggregatedLogFormat.LogReader(this.conf, fileStatus.getPath());
        Assert.assertEquals(this.user, logReader.getApplicationOwner());
        verifyAcls(logReader.getApplicationAcls());
        ArrayList arrayList = new ArrayList();
        try {
            HashMap hashMap = new HashMap();
            AggregatedLogFormat.LogKey logKey = new AggregatedLogFormat.LogKey();
            DataInputStream next = logReader.next(logKey);
            while (next != null) {
                LOG.info("Found container " + logKey.toString());
                HashMap hashMap2 = new HashMap();
                hashMap.put(logKey.toString(), hashMap2);
                while (true) {
                    try {
                        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                        AggregatedLogFormat.LogReader.readAContainerLogsForALogType(next, new PrintStream(byteArrayOutputStream));
                        String[] split2 = byteArrayOutputStream.toString().split(System.getProperty("line.separator"));
                        Assert.assertEquals("LogType:", split2[0].substring(0, 8));
                        String substring = split2[0].substring(8);
                        arrayList.add(substring);
                        Assert.assertEquals("LogLength:", split2[1].substring(0, 10));
                        long parseLong2 = Long.parseLong(split2[1].substring(10));
                        Assert.assertEquals("Log Contents:", split2[2].substring(0, 13));
                        hashMap2.put(substring, StringUtils.join(Arrays.copyOfRange(split2, 3, split2.length), "\n"));
                        LOG.info("LogType:" + substring);
                        LOG.info("LogLength:" + parseLong2);
                        LOG.info("Log Contents:\n" + ((String) hashMap2.get(substring)));
                    } catch (EOFException e2) {
                        logKey = new AggregatedLogFormat.LogKey();
                        next = logReader.next(logKey);
                    }
                }
            }
            Assert.assertEquals(containerIdArr.length, hashMap.size());
            for (ContainerId containerId : containerIdArr) {
                String converterUtils = ConverterUtils.toString(containerId);
                Map map = (Map) hashMap.remove(converterUtils);
                Assert.assertEquals(i, map.size());
                for (String str : strArr) {
                    String str2 = converterUtils + " Hello " + str + "!End of LogType:" + str;
                    LOG.info("Expected log-content : " + new String(str2));
                    String str3 = (String) map.remove(str);
                    Assert.assertNotNull(containerId + " " + str + " not present in aggregated log-file!", str3);
                    Assert.assertEquals(str2, str3);
                }
                Assert.assertEquals(0L, map.size());
            }
            Assert.assertEquals(0L, hashMap.size());
            LogFileStatusInLastCycle logFileStatusInLastCycle = new LogFileStatusInLastCycle(fileStatus.getPath().getName(), arrayList);
            logReader.close();
            return logFileStatusInLastCycle;
        } catch (Throwable th) {
            logReader.close();
            throw th;
        }
    }

    @Test
    public void testLogAggregationForRealContainerLaunch() throws IOException, InterruptedException, YarnException {
        this.containerManager.start();
        File file = new File(tmpDir, "scriptFile.sh");
        PrintWriter printWriter = new PrintWriter(file);
        printWriter.write("\necho Hello World! Stdout! > " + new File(localLogDir, "stdout"));
        printWriter.write("\necho Hello World! Stderr! > " + new File(localLogDir, "stderr"));
        printWriter.write("\necho Hello World! Syslog! > " + new File(localLogDir, "syslog"));
        printWriter.close();
        ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) recordFactory.newRecordInstance(ContainerLaunchContext.class);
        ApplicationId newInstance = ApplicationId.newInstance(0L, 0);
        ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils.newApplicationAttemptId(newInstance, 1), 0L);
        URL yarnUrlFromPath = ConverterUtils.getYarnUrlFromPath(localFS.makeQualified(new Path(file.getAbsolutePath())));
        LocalResource localResource = (LocalResource) recordFactory.newRecordInstance(LocalResource.class);
        localResource.setResource(yarnUrlFromPath);
        localResource.setSize(-1L);
        localResource.setVisibility(LocalResourceVisibility.APPLICATION);
        localResource.setType(LocalResourceType.FILE);
        localResource.setTimestamp(file.lastModified());
        HashMap hashMap = new HashMap();
        hashMap.put("dest_file", localResource);
        containerLaunchContext.setLocalResources(hashMap);
        ArrayList arrayList = new ArrayList();
        arrayList.add("/bin/bash");
        arrayList.add(file.getAbsolutePath());
        containerLaunchContext.setCommands(arrayList);
        StartContainerRequest newInstance2 = StartContainerRequest.newInstance(containerLaunchContext, TestContainerManager.createContainerToken(newContainerId, 1234L, this.context.getNodeId(), this.user, this.context.getContainerTokenSecretManager()));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(newInstance2);
        this.containerManager.startContainers(StartContainersRequest.newInstance(arrayList2));
        BaseContainerManagerTest.waitForContainerState(this.containerManager, newContainerId, ContainerState.COMPLETE);
        this.containerManager.handle(new CMgrCompletedAppsEvent(Arrays.asList(newInstance), CMgrCompletedAppsEvent.Reason.ON_SHUTDOWN));
        this.containerManager.stop();
    }

    private void verifyAcls(Map<ApplicationAccessType, String> map) {
        Assert.assertEquals(this.acls.size(), map.size());
        for (ApplicationAccessType applicationAccessType : this.acls.keySet()) {
            Assert.assertEquals(this.acls.get(applicationAccessType), map.get(applicationAccessType));
        }
    }

    private DrainDispatcher createDispatcher() {
        DrainDispatcher drainDispatcher = new DrainDispatcher();
        drainDispatcher.init(this.conf);
        drainDispatcher.start();
        return drainDispatcher;
    }

    private Map<ApplicationAccessType, String> createAppAcls() {
        HashMap hashMap = new HashMap();
        hashMap.put(ApplicationAccessType.MODIFY_APP, "user group");
        hashMap.put(ApplicationAccessType.VIEW_APP, "*");
        return hashMap;
    }

    @Test(timeout = 20000)
    public void testStopAfterError() throws Exception {
        DeletionService deletionService = (DeletionService) Mockito.mock(DeletionService.class);
        LocalDirsHandlerService localDirsHandlerService = (LocalDirsHandlerService) Mockito.mock(LocalDirsHandlerService.class);
        Mockito.when(localDirsHandlerService.getLogDirs()).thenThrow(new Throwable[]{new RuntimeException()});
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, this.context, deletionService, localDirsHandlerService);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        logAggregationService.handle(new LogHandlerAppStartedEvent(BuilderUtils.newApplicationId(1234L, 1), this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
        logAggregationService.close();
    }

    @Test
    public void testLogAggregatorCleanup() throws Exception {
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, this.context, (DeletionService) Mockito.mock(DeletionService.class), (LocalDirsHandlerService) Mockito.mock(LocalDirsHandlerService.class));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        this.dispatcher.await();
        for (int i = 20000; i > 0 && logAggregationService.getNumAggregators() > 0; i -= 100) {
            Thread.sleep(100L);
        }
        Assert.assertEquals("Log aggregator failed to cleanup!", 0L, logAggregationService.getNumAggregators());
        logAggregationService.stop();
        logAggregationService.close();
    }

    private static <T extends Event<?>> void checkEvents(EventHandler<T> eventHandler, T[] tArr, boolean z, String... strArr) throws Exception {
        ArgumentCaptor forClass = ArgumentCaptor.forClass(tArr.getClass().getComponentType());
        ((EventHandler) Mockito.verify(eventHandler, Mockito.atLeast(0))).handle((Event) forClass.capture());
        List allValues = forClass.getAllValues();
        MultiException multiException = new MultiException();
        try {
            Assert.assertEquals("expected events", tArr.length, allValues.size());
        } catch (Throwable th) {
            multiException.add(th);
        }
        if (z) {
            int max = Math.max(tArr.length, allValues.size());
            int i = 0;
            while (i < max) {
                try {
                    Assert.assertEquals("event#" + i, i < tArr.length ? eventToString(tArr[i], strArr) : null, i < allValues.size() ? eventToString((Event) allValues.get(i), strArr) : null);
                } catch (Throwable th2) {
                    multiException.add(th2);
                }
                i++;
            }
        } else {
            HashSet hashSet = new HashSet();
            for (T t : tArr) {
                hashSet.add(eventToString(t, strArr));
            }
            Iterator it = allValues.iterator();
            while (it.hasNext()) {
                try {
                    String eventToString = eventToString((Event) it.next(), strArr);
                    Assert.assertTrue("unexpected event: " + eventToString, hashSet.remove(eventToString));
                } catch (Throwable th3) {
                    multiException.add(th3);
                }
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                try {
                    Assert.fail("missing event: " + ((String) it2.next()));
                } catch (Throwable th4) {
                    multiException.add(th4);
                }
            }
        }
        multiException.ifExceptionThrow();
    }

    private static String eventToString(Event<?> event, String[] strArr) throws Exception {
        StringBuilder sb = new StringBuilder("[ ");
        for (String str : strArr) {
            try {
                Method method = event.getClass().getMethod(str, new Class[0]);
                sb.append(method.getName()).append("=").append(method.invoke(event, new Object[0]).toString()).append(" ");
            } catch (Exception e) {
            }
        }
        sb.append("]");
        return sb.toString();
    }

    @Test
    public void testFailedDirsLocalFileDeletionAfterUpload() throws Exception {
        DeletionService deletionService = (DeletionService) Mockito.mock(DeletionService.class);
        File[] localLogDirFiles = TestNonAggregatingLogHandler.getLocalLogDirFiles(getClass().getName(), 7);
        ArrayList arrayList = new ArrayList(localLogDirFiles.length);
        for (File file : localLogDirFiles) {
            arrayList.add(file.getAbsolutePath());
        }
        this.conf.set("yarn.nodemanager.log-dirs", StringUtils.join(arrayList, ","));
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        this.conf.setLong("yarn.nodemanager.disk-health-checker.interval-ms", 500L);
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        ApplicationAttemptId newApplicationAttemptId = BuilderUtils.newApplicationAttemptId(newApplicationId, 1);
        this.dirsHandler = new LocalDirsHandlerService();
        LocalDirsHandlerService localDirsHandlerService = (LocalDirsHandlerService) Mockito.mock(LocalDirsHandlerService.class);
        LogAggregationService logAggregationService = (LogAggregationService) Mockito.spy(new LogAggregationService(this.dispatcher, this.context, deletionService, localDirsHandlerService));
        AbstractFileSystem abstractFileSystem = (AbstractFileSystem) Mockito.spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
        FileContext fileContext = FileContext.getFileContext(abstractFileSystem, this.conf);
        ((LogAggregationService) Mockito.doReturn(fileContext).when(logAggregationService)).getLocalFileContext((Configuration) Matchers.isA(Configuration.class));
        logAggregationService.init(this.conf);
        logAggregationService.start();
        TestNonAggregatingLogHandler.runMockedFailedDirs(logAggregationService, newApplicationId, this.user, deletionService, localDirsHandlerService, this.conf, abstractFileSystem, fileContext, localLogDirFiles);
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
        ((LogAggregationService) Mockito.verify(logAggregationService)).closeFileSystems((UserGroupInformation) Matchers.any(UserGroupInformation.class));
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationAttemptId.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationAttemptId.getApplicationId(), ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, true, "getType", "getApplicationID");
    }

    @Test(timeout = 50000)
    public void testLogAggregationServiceWithPatterns() throws Exception {
        LogAggregationContext logAggregationContext = (LogAggregationContext) Records.newRecord(LogAggregationContext.class);
        logAggregationContext.setIncludePattern("stdout|syslog");
        LogAggregationContext logAggregationContext2 = (LogAggregationContext) Records.newRecord(LogAggregationContext.class);
        logAggregationContext2.setExcludePattern("stdout|syslog");
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        ApplicationId newApplicationId2 = BuilderUtils.newApplicationId(1234L, 2);
        ApplicationId newApplicationId3 = BuilderUtils.newApplicationId(1234L, 3);
        ApplicationId newApplicationId4 = BuilderUtils.newApplicationId(1234L, 4);
        Application application = (Application) Mockito.mock(Application.class);
        Mockito.when(application.getContainers()).thenReturn(new HashMap());
        this.context.getApplications().put(newApplicationId, application);
        this.context.getApplications().put(newApplicationId2, application);
        this.context.getApplications().put(newApplicationId3, application);
        this.context.getApplications().put(newApplicationId4, application);
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        File file = new File(localLogDir, ConverterUtils.toString(newApplicationId));
        file.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, logAggregationContext));
        ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils.newApplicationAttemptId(newApplicationId, 1), 1L);
        writeContainerLogs(file, newContainerId, new String[]{"stdout", "stderr", "syslog"});
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId, 0));
        ApplicationAttemptId newApplicationAttemptId = BuilderUtils.newApplicationAttemptId(newApplicationId2, 1);
        File file2 = new File(localLogDir, ConverterUtils.toString(newApplicationId2));
        file2.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId2, this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls, logAggregationContext2));
        ContainerId newContainerId2 = BuilderUtils.newContainerId(newApplicationAttemptId, 1L);
        writeContainerLogs(file2, newContainerId2, new String[]{"stdout", "stderr", "syslog"});
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId2, 0));
        LogAggregationContext logAggregationContext3 = (LogAggregationContext) Records.newRecord(LogAggregationContext.class);
        logAggregationContext3.setIncludePattern(".*.log");
        logAggregationContext3.setExcludePattern("sys.log|std.log");
        ApplicationAttemptId newApplicationAttemptId2 = BuilderUtils.newApplicationAttemptId(newApplicationId3, 1);
        File file3 = new File(localLogDir, ConverterUtils.toString(newApplicationId3));
        file3.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId3, this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls, logAggregationContext3));
        ContainerId newContainerId3 = BuilderUtils.newContainerId(newApplicationAttemptId2, 1L);
        writeContainerLogs(file3, newContainerId3, new String[]{"stdout", "sys.log", "std.log", "out.log", "err.log", "log"});
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId3, 0));
        LogAggregationContext logAggregationContext4 = (LogAggregationContext) Records.newRecord(LogAggregationContext.class);
        logAggregationContext4.setIncludePattern("sys.log|std.log");
        logAggregationContext4.setExcludePattern("std.log");
        ApplicationAttemptId newApplicationAttemptId3 = BuilderUtils.newApplicationAttemptId(newApplicationId4, 1);
        File file4 = new File(localLogDir, ConverterUtils.toString(newApplicationId4));
        file4.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId4, this.user, null, ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY, this.acls, logAggregationContext4));
        ContainerId newContainerId4 = BuilderUtils.newContainerId(newApplicationAttemptId3, 1L);
        writeContainerLogs(file4, newContainerId4, new String[]{"stdout", "sys.log", "std.log", "out.log", "err.log", "log"});
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId4, 0));
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId3, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED), new ApplicationEvent(newApplicationId4, ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)}, false, "getType", "getApplicationID");
        Mockito.reset(new EventHandler[]{this.appEventHandler});
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId2));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId3));
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId4));
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
        verifyContainerLogs(logAggregationService, newApplicationId, new ContainerId[]{newContainerId}, new String[]{"stdout", "syslog"}, 2, false);
        verifyContainerLogs(logAggregationService, newApplicationId2, new ContainerId[]{newContainerId2}, new String[]{"stderr"}, 1, false);
        verifyContainerLogs(logAggregationService, newApplicationId3, new ContainerId[]{newContainerId3}, new String[]{"out.log", "err.log"}, 2, false);
        verifyContainerLogs(logAggregationService, newApplicationId4, new ContainerId[]{newContainerId4}, new String[]{"sys.log"}, 1, false);
        this.dispatcher.await();
        checkEvents(this.appEventHandler, new ApplicationEvent[]{new ApplicationEvent(newApplicationId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId2, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId3, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED), new ApplicationEvent(newApplicationId4, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)}, false, "getType", "getApplicationID");
    }

    @Test(timeout = 50000)
    public void testLogAggregationServiceWithInterval() throws Exception {
        testLogAggregationService(false);
    }

    @Test(timeout = 50000)
    public void testLogAggregationServiceWithRetention() throws Exception {
        testLogAggregationService(true);
    }

    private void testLogAggregationService(boolean z) throws Exception {
        LogAggregationContext logAggregationContext = (LogAggregationContext) Records.newRecord(LogAggregationContext.class);
        logAggregationContext.setRolledLogsIncludePattern(".*");
        logAggregationContext.setRolledLogsExcludePattern("std_final");
        this.conf.set("yarn.nodemanager.log-dirs", localLogDir.getAbsolutePath());
        this.conf.set("yarn.nodemanager.remote-app-log-dir", this.remoteRootLogDir.getAbsolutePath());
        this.conf.setLong("yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds", 3600L);
        if (z) {
            this.conf.setInt("yarn.nodemanager.log-aggregation.num-log-files-per-app", 1);
        }
        this.conf.setLong("yarn.nodemanager.delete.debug-delay-sec", 3600L);
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(123456L, 1);
        ContainerId newContainerId = BuilderUtils.newContainerId(BuilderUtils.newApplicationAttemptId(newApplicationId, 1), 1L);
        Context context = (Context) Mockito.spy(this.context);
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Application application = (Application) Mockito.mock(Application.class);
        HashMap hashMap = new HashMap();
        hashMap.put(newContainerId, Mockito.mock(Container.class));
        concurrentHashMap.put(newApplicationId, application);
        Mockito.when(application.getContainers()).thenReturn(hashMap);
        Mockito.when(context.getApplications()).thenReturn(concurrentHashMap);
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        File file = new File(localLogDir, ConverterUtils.toString(newApplicationId));
        file.mkdir();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, logAggregationContext));
        String[] strArr = {"stdout", "stderr", "syslog"};
        writeContainerLogs(file, newContainerId, new String[]{"stdout", "stderr", "syslog", "std_final"});
        AppLogAggregatorImpl appLogAggregatorImpl = (AppLogAggregatorImpl) logAggregationService.getAppLogAggregators().get(newApplicationId);
        appLogAggregatorImpl.doLogAggregationOutOfBand();
        if (z) {
            Assert.assertTrue(waitAndCheckLogNum(logAggregationService, newApplicationId, 50, 1, true, null));
        } else {
            Assert.assertTrue(waitAndCheckLogNum(logAggregationService, newApplicationId, 50, 1, false, null));
        }
        LogFileStatusInLastCycle verifyContainerLogs = verifyContainerLogs(logAggregationService, newApplicationId, new ContainerId[]{newContainerId}, strArr, 3, true);
        for (String str : strArr) {
            Assert.assertTrue(verifyContainerLogs.getLogFileTypesInLastCycle().contains(str));
        }
        Assert.assertFalse(verifyContainerLogs.getLogFileTypesInLastCycle().contains("std_final"));
        Thread.sleep(2000L);
        appLogAggregatorImpl.doLogAggregationOutOfBand();
        Assert.assertEquals(numOfLogsAvailable(logAggregationService, newApplicationId, true, null), 1L);
        Thread.sleep(2000L);
        String[] strArr2 = {"stdout_1", "stderr_1", "syslog_1"};
        writeContainerLogs(file, newContainerId, strArr2);
        appLogAggregatorImpl.doLogAggregationOutOfBand();
        if (z) {
            Assert.assertTrue(waitAndCheckLogNum(logAggregationService, newApplicationId, 50, 1, true, verifyContainerLogs.getLogFilePathInLastCycle()));
        } else {
            Assert.assertTrue(waitAndCheckLogNum(logAggregationService, newApplicationId, 50, 2, false, null));
        }
        LogFileStatusInLastCycle verifyContainerLogs2 = verifyContainerLogs(logAggregationService, newApplicationId, new ContainerId[]{newContainerId}, strArr2, 3, true);
        for (String str2 : strArr2) {
            Assert.assertTrue(verifyContainerLogs2.getLogFileTypesInLastCycle().contains(str2));
        }
        Assert.assertFalse(verifyContainerLogs2.getLogFileTypesInLastCycle().contains("std_final"));
        Thread.sleep(2000L);
        writeContainerLogs(file, newContainerId, new String[]{"stdout_2", "stderr_2", "syslog_2"});
        logAggregationService.handle(new LogHandlerContainerFinishedEvent(newContainerId, 0));
        this.dispatcher.await();
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        if (z) {
            Assert.assertTrue(waitAndCheckLogNum(logAggregationService, newApplicationId, 50, 1, true, verifyContainerLogs2.getLogFilePathInLastCycle()));
        } else {
            Assert.assertTrue(waitAndCheckLogNum(logAggregationService, newApplicationId, 50, 3, false, null));
        }
        verifyContainerLogs(logAggregationService, newApplicationId, new ContainerId[]{newContainerId}, new String[]{"stdout_2", "stderr_2", "syslog_2", "std_final"}, 4, true);
        logAggregationService.stop();
        Assert.assertEquals(0L, logAggregationService.getNumAggregators());
    }

    @Test(timeout = 20000)
    public void testAddNewTokenSentFromRMForLogAggregation() throws Exception {
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(yarnConfiguration);
        ApplicationId newApplicationId = BuilderUtils.newApplicationId(1234L, 1);
        Application application = (Application) Mockito.mock(Application.class);
        Mockito.when(application.getContainers()).thenReturn(new HashMap());
        this.context.getApplications().put(newApplicationId, application);
        LogAggregationService logAggregationService = new LogAggregationService(this.dispatcher, this.context, this.delSrvc, this.dirsHandler);
        logAggregationService.init(this.conf);
        logAggregationService.start();
        logAggregationService.handle(new LogHandlerAppStartedEvent(newApplicationId, this.user, null, ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls, (LogAggregationContext) Records.newRecord(LogAggregationContext.class)));
        Text text = new Text("user1");
        RMDelegationTokenIdentifier rMDelegationTokenIdentifier = new RMDelegationTokenIdentifier(text, new Text("renewer1"), text);
        final Token token = new Token(rMDelegationTokenIdentifier.getBytes(), "password1".getBytes(), rMDelegationTokenIdentifier.getKind(), new Text("service1"));
        Credentials credentials = new Credentials();
        credentials.addToken(text, token);
        this.context.getSystemCredentialsForApps().put(newApplicationId, credentials);
        logAggregationService.handle(new LogHandlerAppFinishedEvent(newApplicationId));
        final UserGroupInformation ugi = ((AppLogAggregatorImpl) logAggregationService.getAppLogAggregators().get(newApplicationId)).getUgi();
        GenericTestUtils.waitFor(new Supplier<Boolean>() { // from class: org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.TestLogAggregationService.2
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public Boolean m372get() {
                boolean z = false;
                Iterator it = ugi.getCredentials().getAllTokens().iterator();
                while (it.hasNext()) {
                    if (((Token) it.next()).equals(token)) {
                        z = true;
                    }
                }
                return Boolean.valueOf(z);
            }
        }, 1000, 20000);
        logAggregationService.stop();
    }

    private int numOfLogsAvailable(LogAggregationService logAggregationService, ApplicationId applicationId, boolean z, String str) throws IOException {
        Path remoteAppLogDir = logAggregationService.getRemoteAppLogDir(applicationId, this.user);
        try {
            RemoteIterator listStatus = FileContext.getFileContext(FileContext.getFileContext(this.conf).makeQualified(remoteAppLogDir).toUri(), this.conf).listStatus(remoteAppLogDir);
            int i = 0;
            while (listStatus.hasNext()) {
                String name = ((FileStatus) listStatus.next()).getPath().getName();
                if (name.contains(".tmp")) {
                    return -1;
                }
                if (str != null && name.contains(str) && z) {
                    return -1;
                }
                if (name.contains(LogAggregationUtils.getNodeString(logAggregationService.getNodeId()))) {
                    i++;
                }
            }
            return i;
        } catch (FileNotFoundException e) {
            return -1;
        }
    }

    private boolean waitAndCheckLogNum(LogAggregationService logAggregationService, ApplicationId applicationId, int i, int i2, boolean z, String str) throws IOException, InterruptedException {
        for (int i3 = 0; numOfLogsAvailable(logAggregationService, applicationId, z, str) != i2 && i3 <= i; i3++) {
            Thread.sleep(500L);
        }
        return numOfLogsAvailable(logAggregationService, applicationId, z, str) == i2;
    }

    static {
        LOG = LogFactory.getLog(TestLogAggregationService.class);
        recordFactory = RecordFactoryProvider.getRecordFactory((Configuration) null);
    }
}
