package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.class */
public class TestMergeManager {
    private static final Log LOG = LogFactory.getLog(TestMergeManager.class);
    private static Configuration defaultConf = new TezConfiguration();
    private static FileSystem localFs;
    private static Path workDir;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager$SrcFileInfo.class */
    public class SrcFileInfo {
        private Path path;
        private TezIndexRecord[] indexedRecords;

        private SrcFileInfo() {
        }
    }

    @Before
    @After
    public void cleanup() throws IOException {
        localFs.delete(workDir, true);
    }

    @Test(timeout = 10000)
    public void testLocalDiskMergeMultipleTasks() throws IOException {
        TezConfiguration tezConfiguration = new TezConfiguration(defaultConf);
        tezConfiguration.setBoolean("tez.runtime.compress", false);
        tezConfiguration.set("tez.runtime.key.class", IntWritable.class.getName());
        tezConfiguration.set("tez.runtime.value.class", IntWritable.class.getName());
        Path path = new Path(workDir, "local");
        Path path2 = new Path(workDir, "srcData");
        localFs.mkdirs(path);
        localFs.mkdirs(path2);
        tezConfiguration.setStrings("tez.runtime.framework.local.dirs", new String[]{path.toString()});
        LocalFileSystem local = FileSystem.getLocal(tezConfiguration);
        LocalDirAllocator localDirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext createMockInputContext = createMockInputContext(UUID.randomUUID().toString());
        InputContext createMockInputContext2 = createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter exceptionReporter = (ExceptionReporter) Mockito.mock(ExceptionReporter.class);
        ExceptionReporter exceptionReporter2 = (ExceptionReporter) Mockito.mock(ExceptionReporter.class);
        MergeManager mergeManager = (MergeManager) Mockito.spy(new MergeManager(tezConfiguration, local, localDirAllocator, createMockInputContext, (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null, exceptionReporter, 2000000L, (CompressionCodec) null, false, -1));
        MergeManager mergeManager2 = (MergeManager) Mockito.spy(new MergeManager(tezConfiguration, local, localDirAllocator, createMockInputContext2, (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null, exceptionReporter2, 2000000L, (CompressionCodec) null, false, -1));
        SrcFileInfo createFile = createFile(tezConfiguration, local, new Path(path2, "attemptsrc1.out"), 2, 3, 0);
        SrcFileInfo createFile2 = createFile(tezConfiguration, local, new Path(path2, "attemptsrc2.out"), 2, 3, 6);
        InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(0, 0, createFile.path.getName());
        InputAttemptIdentifier inputAttemptIdentifier2 = new InputAttemptIdentifier(1, 0, createFile2.path.getName());
        InputAttemptIdentifier inputAttemptIdentifier3 = new InputAttemptIdentifier(0, 0, createFile.path.getName());
        InputAttemptIdentifier inputAttemptIdentifier4 = new InputAttemptIdentifier(1, 0, createFile2.path.getName());
        MapOutput mapOutputForDirectDiskFetch = getMapOutputForDirectDiskFetch(inputAttemptIdentifier, createFile.path, createFile.indexedRecords[0], mergeManager);
        MapOutput mapOutputForDirectDiskFetch2 = getMapOutputForDirectDiskFetch(inputAttemptIdentifier2, createFile2.path, createFile2.indexedRecords[0], mergeManager);
        MapOutput mapOutputForDirectDiskFetch3 = getMapOutputForDirectDiskFetch(inputAttemptIdentifier3, createFile.path, createFile.indexedRecords[1], mergeManager2);
        MapOutput mapOutputForDirectDiskFetch4 = getMapOutputForDirectDiskFetch(inputAttemptIdentifier4, createFile2.path, createFile2.indexedRecords[1], mergeManager2);
        mapOutputForDirectDiskFetch.commit();
        mapOutputForDirectDiskFetch2.commit();
        ((MergeManager) Mockito.verify(mergeManager)).closeOnDiskFile(mapOutputForDirectDiskFetch.getOutputPath());
        ((MergeManager) Mockito.verify(mergeManager)).closeOnDiskFile(mapOutputForDirectDiskFetch2.getOutputPath());
        LinkedList linkedList = new LinkedList();
        linkedList.addAll(mergeManager.onDiskMapOutputs);
        mergeManager.onDiskMapOutputs.clear();
        mergeManager.onDiskMerger.merge(linkedList);
        Assert.assertEquals(1L, mergeManager.onDiskMapOutputs.size());
        mapOutputForDirectDiskFetch3.commit();
        mapOutputForDirectDiskFetch4.commit();
        ((MergeManager) Mockito.verify(mergeManager2)).closeOnDiskFile(mapOutputForDirectDiskFetch3.getOutputPath());
        ((MergeManager) Mockito.verify(mergeManager2)).closeOnDiskFile(mapOutputForDirectDiskFetch4.getOutputPath());
        LinkedList linkedList2 = new LinkedList();
        linkedList2.addAll(mergeManager2.onDiskMapOutputs);
        mergeManager2.onDiskMapOutputs.clear();
        mergeManager2.onDiskMerger.merge(linkedList2);
        Assert.assertEquals(1L, mergeManager2.onDiskMapOutputs.size());
        Assert.assertNotEquals(((FileChunk) mergeManager.onDiskMapOutputs.iterator().next()).getPath(), ((FileChunk) mergeManager2.onDiskMapOutputs.iterator().next()).getPath());
        Assert.assertTrue(((FileChunk) mergeManager.onDiskMapOutputs.iterator().next()).getPath().toString().contains(createMockInputContext.getUniqueIdentifier()));
        Assert.assertTrue(((FileChunk) mergeManager2.onDiskMapOutputs.iterator().next()).getPath().toString().contains(createMockInputContext2.getUniqueIdentifier()));
    }

    private InputContext createMockInputContext(String str) {
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ((InputContext) Mockito.doReturn(new TezCounters()).when(inputContext)).getCounters();
        ((InputContext) Mockito.doReturn(209715200L).when(inputContext)).getTotalMemoryAvailableToTask();
        ((InputContext) Mockito.doReturn("srcVertexName").when(inputContext)).getSourceVertexName();
        ((InputContext) Mockito.doReturn(str).when(inputContext)).getUniqueIdentifier();
        return inputContext;
    }

    private SrcFileInfo createFile(Configuration configuration, FileSystem fileSystem, Path path, int i, int i2, int i3) throws IOException {
        FSDataOutputStream create = fileSystem.create(path);
        int i4 = i3;
        SrcFileInfo srcFileInfo = new SrcFileInfo();
        srcFileInfo.indexedRecords = new TezIndexRecord[i];
        srcFileInfo.path = path;
        for (int i5 = 0; i5 < i; i5++) {
            long pos = create.getPos();
            IFile.Writer writer = new IFile.Writer(configuration, create, IntWritable.class, IntWritable.class, (CompressionCodec) null, (TezCounter) null, (TezCounter) null);
            for (int i6 = 0; i6 < i2; i6++) {
                writer.append(new IntWritable(i4), new IntWritable(i4));
                i4++;
            }
            writer.close();
            srcFileInfo.indexedRecords[i5] = new TezIndexRecord(pos, writer.getRawLength(), writer.getCompressedLength());
        }
        create.close();
        return srcFileInfo;
    }

    private static MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier inputAttemptIdentifier, Path path, TezIndexRecord tezIndexRecord, MergeManager mergeManager) throws IOException {
        return MapOutput.createLocalDiskMapOutput(inputAttemptIdentifier, mergeManager, path, tezIndexRecord.getStartOffset(), tezIndexRecord.getPartLength(), true);
    }

    static {
        localFs = null;
        workDir = null;
        try {
            defaultConf.set("fs.defaultFS", "file:///");
            localFs = FileSystem.getLocal(defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestMergeManager.class.getSimpleName());
            workDir = localFs.makeQualified(workDir);
            localFs.mkdirs(workDir);
            LOG.info("Using workDir: " + workDir);
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}
