package org.apache.tez.runtime.library.common.writers;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.utils.DATA_RANGE_IN_MB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.class */
public class TestUnorderedPartitionedKVWriter {
    private static final String HOST_STRING = "localhost";
    private static final int SHUFFLE_PORT = 4000;
    private static FileSystem localFs;
    private boolean shouldCompress;
    private TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats;
    private Configuration defaultConf = new Configuration();
    private static final Logger LOG = LoggerFactory.getLogger(TestUnorderedPartitionedKVWriter.class);
    private static String testTmpDir = System.getProperty("test.build.data", "/tmp");
    private static final Path TEST_ROOT_DIR = new Path(testTmpDir, TestUnorderedPartitionedKVWriter.class.getSimpleName());

    /* loaded from: input_file:org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter$PartitionerForTest.class */
    public static class PartitionerForTest implements Partitioner {
        public int getPartition(Object obj, Object obj2, int i) {
            if (obj instanceof IntWritable) {
                return ((IntWritable) obj).get() % i;
            }
            throw new UnsupportedOperationException("Test partitioner expected to be called with IntWritable only");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter$UnorderedPartitionedKVWriterForTest.class */
    public static class UnorderedPartitionedKVWriterForTest extends UnorderedPartitionedKVWriter {
        public UnorderedPartitionedKVWriterForTest(OutputContext outputContext, Configuration configuration, int i, long j) throws IOException {
            super(outputContext, configuration, i, j);
        }

        String getHost() {
            return "localhost";
        }
    }

    public TestUnorderedPartitionedKVWriter(boolean z, TezRuntimeConfiguration.ReportPartitionStats reportPartitionStats) {
        this.shouldCompress = z;
        this.reportPartitionStats = reportPartitionStats;
    }

    @Parameterized.Parameters(name = "test[{0}, {1}]")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{false, TezRuntimeConfiguration.ReportPartitionStats.DISABLED}, new Object[]{false, TezRuntimeConfiguration.ReportPartitionStats.ENABLED}, new Object[]{false, TezRuntimeConfiguration.ReportPartitionStats.NONE}, new Object[]{false, TezRuntimeConfiguration.ReportPartitionStats.MEMORY_OPTIMIZED}, new Object[]{false, TezRuntimeConfiguration.ReportPartitionStats.PRECISE}, new Object[]{true, TezRuntimeConfiguration.ReportPartitionStats.DISABLED}, new Object[]{true, TezRuntimeConfiguration.ReportPartitionStats.ENABLED}, new Object[]{true, TezRuntimeConfiguration.ReportPartitionStats.NONE}, new Object[]{true, TezRuntimeConfiguration.ReportPartitionStats.MEMORY_OPTIMIZED}, new Object[]{true, TezRuntimeConfiguration.ReportPartitionStats.PRECISE});
    }

    @Before
    public void setup() throws IOException {
        LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
        localFs = FileSystem.getLocal(new Configuration());
        localFs.delete(TEST_ROOT_DIR, true);
        localFs.mkdirs(TEST_ROOT_DIR);
    }

    @After
    public void cleanup() throws IOException {
        LOG.info("CleanUp");
        localFs.delete(TEST_ROOT_DIR, true);
    }

    @Test(timeout = 10000)
    public void testBufferSizing() throws IOException {
        OutputContext createMockOutputContext = createMockOutputContext(new TezCounters(), ApplicationId.newInstance(10000000L, 1), UUID.randomUUID().toString(), this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        Configuration createConfiguration = createConfiguration(createMockOutputContext, IntWritable.class, LongWritable.class, false, 2047);
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 2048L);
        Assert.assertEquals(2L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).numBuffers);
        Assert.assertEquals(1024L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).sizePerBuffer);
        Assert.assertEquals(1024L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).numInitializedBuffers);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).spillLimit);
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest2 = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 6141L);
        Assert.assertEquals(3L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest2).numBuffers);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest2).sizePerBuffer);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest2).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest2).numInitializedBuffers);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest2).spillLimit);
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest3 = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 5117L);
        Assert.assertEquals(2L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest3).numBuffers);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest3).sizePerBuffer);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest3).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest3).numInitializedBuffers);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest3).spillLimit);
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest4 = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 5118L);
        Assert.assertEquals(3L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest4).numBuffers);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest4).sizePerBuffer);
        Assert.assertEquals(1024L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest4).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest4).numInitializedBuffers);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest4).spillLimit);
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest5 = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 8189L);
        Assert.assertEquals(4L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest5).numBuffers);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest5).sizePerBuffer);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest5).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest5).numInitializedBuffers);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest5).spillLimit);
        createConfiguration.setInt("tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent", 50);
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest6 = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 8189L);
        Assert.assertEquals(4L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest6).numBuffers);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest6).sizePerBuffer);
        Assert.assertEquals(1980L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest6).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest6).numInitializedBuffers);
        Assert.assertEquals(2L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest6).spillLimit);
        createConfiguration.unset("tez.runtime.unordered.output.max-per-buffer.size-bytes");
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest7 = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, 10, 2048L);
        Assert.assertEquals(2L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest7).numBuffers);
        Assert.assertEquals(1024L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest7).sizePerBuffer);
        Assert.assertEquals(1024L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest7).lastBufferSize);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest7).numInitializedBuffers);
        Assert.assertEquals(1L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest7).spillLimit);
    }

    @Test(timeout = 10000)
    public void testNoSpill() throws IOException, InterruptedException {
        baseTest(10, 10, null, this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testSingleSpill() throws IOException, InterruptedException {
        baseTest(50, 10, null, this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testMultipleSpills() throws IOException, InterruptedException {
        baseTest(200, 10, null, this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testMultipleSpillsWithSmallBuffer() throws IOException, InterruptedException {
        baseTest(200, 10, null, this.shouldCompress, 512, 0, 9600);
    }

    @Test(timeout = 10000)
    public void testMergeBuffersAndSpill() throws IOException, InterruptedException {
        baseTest(200, 10, null, this.shouldCompress, 2048, 10);
    }

    @Test(timeout = 10000)
    public void testNoRecords() throws IOException, InterruptedException {
        baseTest(0, 10, null, this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testNoRecords_SinglePartition() throws IOException, InterruptedException {
        baseTest(0, 1, null, this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testSkippedPartitions() throws IOException, InterruptedException {
        baseTest(200, 10, Sets.newHashSet(new Integer[]{2, 5}), this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testNoSpill_SinglePartition() throws IOException, InterruptedException {
        baseTest(10, 1, null, this.shouldCompress, -1, 0);
    }

    @Test(timeout = 10000)
    public void testRandomText() throws IOException, InterruptedException {
        textTest(100, 10, 2048L, 0, 0, 0, false, true);
    }

    @Test(timeout = 10000)
    public void testLargeKeys() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 10, 0, 0, false, true);
    }

    @Test(timeout = 10000)
    public void testLargevalues() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 0, 10, 0, false, true);
    }

    @Test(timeout = 10000)
    public void testLargeKvPairs() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 0, 0, 10, false, true);
    }

    @Test(timeout = 10000)
    public void testTextMixedRecords() throws IOException, InterruptedException {
        textTest(100, 10, 2048L, 10, 10, 10, false, true);
    }

    @Test(timeout = 10000000)
    public void testRandomTextWithoutFinalMerge() throws IOException, InterruptedException {
        textTest(100, 10, 2048L, 0, 0, 0, false, false);
    }

    @Test(timeout = 10000)
    public void testLargeKeysWithoutFinalMerge() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 10, 0, 0, false, false);
    }

    @Test(timeout = 10000)
    public void testLargevaluesWithoutFinalMerge() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 0, 10, 0, false, false);
    }

    @Test(timeout = 10000)
    public void testLargeKvPairsWithoutFinalMerge() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 0, 0, 10, false, false);
    }

    @Test(timeout = 10000)
    public void testTextMixedRecordsWithoutFinalMerge() throws IOException, InterruptedException {
        textTest(100, 10, 2048L, 10, 10, 10, false, false);
    }

    public void textTest(int i, int i2, long j, int i3, int i4, int i5, boolean z, boolean z2) throws IOException, InterruptedException {
        BitSet bitSet;
        HashPartitioner hashPartitioner = new HashPartitioner();
        ApplicationId newInstance = ApplicationId.newInstance(10000000L, 1);
        TezCounters tezCounters = new TezCounters();
        String uuid = UUID.randomUUID().toString();
        OutputContext createMockOutputContext = createMockOutputContext(tezCounters, newInstance, uuid, this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        Random random = new Random();
        Configuration createConfiguration = createConfiguration(createMockOutputContext, Text.class, Text.class, this.shouldCompress, -1, HashPartitioner.class);
        createConfiguration.setBoolean("tez.runtime.pipelined-shuffle.enabled", z);
        createConfiguration.setBoolean("tez.runtime.enable.final-merge.in.output", z2);
        Configurable configurable = null;
        if (this.shouldCompress) {
            configurable = new DefaultCodec();
            configurable.setConf(createConfiguration);
        }
        int i6 = 0;
        HashMap hashMap = new HashMap();
        for (int i7 = 0; i7 < i2; i7++) {
            hashMap.put(Integer.valueOf(i7), LinkedListMultimap.create());
        }
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, i2, j);
        int i8 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).sizePerBuffer;
        BitSet bitSet2 = new BitSet(i2);
        Text text = new Text();
        Text text2 = new Text();
        for (int i9 = 0; i9 < i; i9++) {
            String createRandomString = createRandomString(Math.abs(random.nextInt(10)));
            String createRandomString2 = createRandomString(Math.abs(random.nextInt(20)));
            text.set(createRandomString);
            text2.set(createRandomString2);
            int partition = hashPartitioner.getPartition(text, text2, i2);
            bitSet2.set(partition);
            ((Multimap) hashMap.get(Integer.valueOf(partition))).put(createRandomString, createRandomString2);
            unorderedPartitionedKVWriterForTest.write(text, text2);
            i6++;
        }
        for (int i10 = 0; i10 < i3; i10++) {
            String createRandomString3 = createRandomString(i8 + Math.abs(random.nextInt(100)));
            String createRandomString4 = createRandomString(Math.abs(random.nextInt(20)));
            text.set(createRandomString3);
            text2.set(createRandomString4);
            int partition2 = hashPartitioner.getPartition(text, text2, i2);
            bitSet2.set(partition2);
            ((Multimap) hashMap.get(Integer.valueOf(partition2))).put(createRandomString3, createRandomString4);
            unorderedPartitionedKVWriterForTest.write(text, text2);
            i6++;
        }
        if (z) {
            ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.times(i3))).sendEvents(Matchers.anyListOf(Event.class));
        }
        for (int i11 = 0; i11 < i4; i11++) {
            String createRandomString5 = createRandomString(Math.abs(random.nextInt(10)));
            String createRandomString6 = createRandomString(i8 + Math.abs(random.nextInt(100)));
            text.set(createRandomString5);
            text2.set(createRandomString6);
            int partition3 = hashPartitioner.getPartition(text, text2, i2);
            bitSet2.set(partition3);
            ((Multimap) hashMap.get(Integer.valueOf(partition3))).put(createRandomString5, createRandomString6);
            unorderedPartitionedKVWriterForTest.write(text, text2);
            i6++;
        }
        if (z) {
            ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.times(i4 + i3))).sendEvents(Matchers.anyListOf(Event.class));
        }
        for (int i12 = 0; i12 < i5; i12++) {
            String createRandomString7 = createRandomString((i8 / 2) + Math.abs(random.nextInt(100)));
            String createRandomString8 = createRandomString((i8 / 2) + Math.abs(random.nextInt(100)));
            text.set(createRandomString7);
            text2.set(createRandomString8);
            int partition4 = hashPartitioner.getPartition(text, text2, i2);
            bitSet2.set(partition4);
            ((Multimap) hashMap.get(Integer.valueOf(partition4))).put(createRandomString7, createRandomString8);
            unorderedPartitionedKVWriterForTest.write(text, text2);
            i6++;
        }
        if (z) {
            ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.times(i4 + i3 + i5))).sendEvents(Matchers.anyListOf(Event.class));
        }
        List<Event> close = unorderedPartitionedKVWriterForTest.close();
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.never())).reportFailure((TaskFailureType) Matchers.any(TaskFailureType.class), (Throwable) Matchers.any(Throwable.class), (String) Matchers.any(String.class));
        if (!z) {
            VertexManagerEvent vertexManagerEvent = null;
            for (Event event : close) {
                if (event instanceof VertexManagerEvent) {
                    Assert.assertNull(vertexManagerEvent);
                    vertexManagerEvent = (VertexManagerEvent) event;
                }
            }
            Assert.assertEquals(i6, ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vertexManagerEvent.getUserPayload().asReadOnlyBuffer())).getNumRecord());
        }
        Assert.assertEquals(i3 + i4 + i5, tezCounters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS).getValue());
        if (z || !z2) {
            for (int i13 = 0; i13 < ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).numSpills.get(); i13++) {
                Assert.assertTrue(localFs.exists(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).outputFileHandler.getSpillFileForWrite(i13, 0L)));
                Assert.assertTrue(localFs.exists(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).outputFileHandler.getSpillIndexFileForWrite(i13, 0L)));
            }
            return;
        }
        Assert.assertEquals(2L, close.size());
        Assert.assertTrue(close.get(0) instanceof VertexManagerEvent);
        verifyPartitionStats((VertexManagerEvent) close.get(0), bitSet2);
        Assert.assertTrue(close.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) close.get(1);
        Assert.assertEquals(0L, compositeDataMovementEvent.getSourceIndexStart());
        Assert.assertEquals(i2, compositeDataMovementEvent.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        if (bitSet2.cardinality() != i2) {
            Assert.assertTrue(parseFrom.hasEmptyPartitions());
            bitSet = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getEmptyPartitions()));
            Assert.assertEquals(i2 - bitSet2.cardinality(), bitSet.cardinality());
        } else {
            Assert.assertFalse(parseFrom.hasEmptyPartitions());
            bitSet = new BitSet(i2);
        }
        Assert.assertEquals("localhost", parseFrom.getHost());
        Assert.assertEquals(4000L, parseFrom.getPort());
        Assert.assertEquals(uuid, parseFrom.getPathComponent());
        new TezTaskOutputFiles(createConfiguration, uuid, 1);
        Path path = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).finalOutPath;
        Path path2 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).finalIndexPath;
        if (i6 > 0) {
            Assert.assertTrue(localFs.exists(path));
            Assert.assertTrue(localFs.exists(path2));
            TezSpillRecord tezSpillRecord = new TezSpillRecord(path2, createConfiguration);
            DataInputBuffer dataInputBuffer = new DataInputBuffer();
            DataInputBuffer dataInputBuffer2 = new DataInputBuffer();
            Text text3 = new Text();
            Text text4 = new Text();
            for (int i14 = 0; i14 < i2; i14++) {
                if (!bitSet.get(i14)) {
                    TezIndexRecord index = tezSpillRecord.getIndex(i14);
                    FSDataInputStream open = FileSystem.getLocal(createConfiguration).open(path);
                    open.seek(index.getStartOffset());
                    IFile.Reader reader = new IFile.Reader(open, index.getPartLength(), configurable, (TezCounter) null, (TezCounter) null, false, 0, -1);
                    while (reader.nextRawKey(dataInputBuffer)) {
                        reader.nextRawValue(dataInputBuffer2);
                        text3.readFields(dataInputBuffer);
                        text4.readFields(dataInputBuffer2);
                        Assert.assertTrue(((Multimap) hashMap.get(Integer.valueOf(hashPartitioner.getPartition(text3, text4, i2)))).remove(text3.toString(), text4.toString()));
                    }
                    open.close();
                }
            }
            for (int i15 = 0; i15 < i2; i15++) {
                Assert.assertEquals(0L, ((Multimap) hashMap.get(Integer.valueOf(i15))).size());
                hashMap.remove(Integer.valueOf(i15));
            }
            Assert.assertEquals(0L, hashMap.size());
        }
    }

    private int[] getPartitionStats(VertexManagerEvent vertexManagerEvent) throws IOException {
        RoaringBitmap roaringBitmap = new RoaringBitmap();
        ShuffleUserPayloads.VertexManagerEventPayloadProto parseFrom = ShuffleUserPayloads.VertexManagerEventPayloadProto.parseFrom(ByteString.copyFrom(vertexManagerEvent.getUserPayload()));
        if (!this.reportPartitionStats.isEnabled()) {
            Assert.assertFalse(parseFrom.hasPartitionStats());
            Assert.assertFalse(parseFrom.hasDetailedPartitionStats());
            return null;
        }
        if (this.reportPartitionStats.isPrecise()) {
            Assert.assertTrue(parseFrom.hasDetailedPartitionStats());
            List sizeInMbList = parseFrom.getDetailedPartitionStats().getSizeInMbList();
            int[] iArr = new int[sizeInMbList.size()];
            for (int i = 0; i < sizeInMbList.size(); i++) {
                int i2 = i;
                iArr[i2] = iArr[i2] + ((Integer) sizeInMbList.get(i)).intValue();
            }
            return iArr;
        }
        Assert.assertTrue(parseFrom.hasPartitionStats());
        roaringBitmap.deserialize(new DataInputStream(new ByteArrayInputStream(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getPartitionStats()))));
        int[] iArr2 = new int[roaringBitmap.getCardinality()];
        Iterator it = roaringBitmap.iterator();
        DATA_RANGE_IN_MB[] values = DATA_RANGE_IN_MB.values();
        int length = values.length;
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            int i3 = intValue / length;
            int i4 = intValue % length;
            if (values[i4].getSizeInMB() > 0) {
                iArr2[i3] = iArr2[i3] + values[i4].getSizeInMB();
            }
        }
        return iArr2;
    }

    private void verifyPartitionStats(VertexManagerEvent vertexManagerEvent, BitSet bitSet) throws IOException {
        int[] partitionStats = getPartitionStats(vertexManagerEvent);
        if (partitionStats == null) {
            return;
        }
        for (int i = 0; i < partitionStats.length; i++) {
            Assert.assertTrue(bitSet.get(i) == (partitionStats[i] > 0));
        }
    }

    @Test(timeout = 10000)
    public void testNoSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
        baseTestWithPipelinedTransfer(10, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testSingleSpill_WithPipelinedShuffle() throws IOException, InterruptedException {
        baseTestWithPipelinedTransfer(50, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testMultipleSpills_WithPipelinedShuffle() throws IOException, InterruptedException {
        baseTestWithPipelinedTransfer(200, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testNoRecords_WithPipelinedShuffle() throws IOException, InterruptedException {
        baseTestWithPipelinedTransfer(0, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testNoRecords_SinglePartition_WithPipelinedShuffle() throws IOException, InterruptedException {
        baseTestWithPipelinedTransfer(0, 1, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testSkippedPartitions_WithPipelinedShuffle() throws IOException, InterruptedException {
        baseTestWithPipelinedTransfer(200, 10, Sets.newHashSet(new Integer[]{2, 5}), this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testLargeKvPairs_WithPipelinedShuffle() throws IOException, InterruptedException {
        textTest(0, 10, 2048L, 10, 20, 50, true, false);
    }

    private void baseTestWithPipelinedTransfer(int i, int i2, Set<Integer> set, boolean z) throws IOException, InterruptedException {
        PartitionerForTest partitionerForTest = new PartitionerForTest();
        ApplicationId newInstance = ApplicationId.newInstance(10000000L, 1);
        TezCounters tezCounters = new TezCounters();
        String uuid = UUID.randomUUID().toString();
        OutputContext createMockOutputContext = createMockOutputContext(tezCounters, newInstance, uuid, this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        Configuration createConfiguration = createConfiguration(createMockOutputContext, IntWritable.class, LongWritable.class, z, -1);
        createConfiguration.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        createConfiguration.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        if (z) {
            new DefaultCodec().setConf(createConfiguration);
        }
        int i3 = 0;
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, i2, 2048L);
        int i4 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).sizePerBuffer;
        int i5 = 12 + 12;
        BitSet bitSet = new BitSet(i2);
        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        for (int i6 = 0; i6 < i; i6++) {
            intWritable.set(i6);
            longWritable.set(i6);
            int partition = partitionerForTest.getPartition(intWritable, longWritable, i2);
            if (set == null || !set.contains(Integer.valueOf(partition))) {
                bitSet.set(partition);
                unorderedPartitionedKVWriterForTest.write(intWritable, longWritable);
                i3++;
            }
        }
        int i7 = i4 / i5;
        int i8 = i3 / i7;
        ArgumentCaptor forClass = ArgumentCaptor.forClass(List.class);
        List close = unorderedPartitionedKVWriterForTest.close();
        if (i2 == 1) {
            Assert.assertEquals(false, Boolean.valueOf(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).skipBuffers));
        }
        Assert.assertTrue(close.size() == 0);
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.atLeast(i8))).sendEvents((List) forClass.capture());
        int size = forClass.getAllValues().size();
        List list = (List) forClass.getAllValues().get(size - 1);
        VertexManagerEvent vertexManagerEvent = (VertexManagerEvent) list.get(0);
        for (int i9 = 0; i9 < size; i9++) {
            List list2 = (List) forClass.getAllValues().get(i9);
            if (i9 < size - 1) {
                Assert.assertTrue(list2.size() == 1);
                Assert.assertTrue(list2.get(0) instanceof CompositeDataMovementEvent);
            } else {
                Assert.assertTrue(list2.size() == 2);
                Assert.assertTrue(list2.get(0) instanceof VertexManagerEvent);
                Assert.assertTrue(list2.get(1) instanceof CompositeDataMovementEvent);
            }
        }
        verifyPartitionStats(vertexManagerEvent, bitSet);
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.never())).reportFailure((TaskFailureType) Matchers.any(TaskFailureType.class), (Throwable) Matchers.any(Throwable.class), (String) Matchers.any(String.class));
        Assert.assertNull(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).currentBuffer);
        Assert.assertEquals(0L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).availableBuffers.size());
        TezCounter findCounter = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES);
        TezCounter findCounter2 = tezCounters.findCounter(TaskCounter.OUTPUT_RECORDS);
        TezCounter findCounter3 = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter findCounter4 = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter findCounter5 = tezCounters.findCounter(TaskCounter.SPILLED_RECORDS);
        TezCounter findCounter6 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter findCounter7 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter findCounter8 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals(i3 * 12, findCounter.getValue());
        Assert.assertEquals(i3, findCounter2.getValue());
        Assert.assertEquals(i3 * i5, findCounter3.getValue());
        long value = findCounter4.getValue();
        if (i3 > 0) {
            Assert.assertTrue(value > 0);
            if (!z) {
                Assert.assertTrue(value > findCounter.getValue());
            }
        } else {
            Assert.assertEquals(0L, value);
        }
        Assert.assertTrue(((long) (i7 * i8)) >= findCounter5.getValue());
        long value2 = findCounter6.getValue();
        long value3 = findCounter7.getValue();
        Assert.assertEquals(value2, 0L);
        Assert.assertTrue(value2 == value3);
        Assert.assertEquals(findCounter8.getValue(), 0L);
        Assert.assertTrue(list.size() > 0);
        int size2 = list.size() - 1;
        Assert.assertTrue(list.get(size2) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) list.get(size2);
        Assert.assertEquals(0L, compositeDataMovementEvent.getSourceIndexStart());
        Assert.assertEquals(i2, compositeDataMovementEvent.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        Assert.assertTrue(parseFrom.getLastEvent());
        verifyEmptyPartitions(parseFrom, i3, i2, set);
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.atLeast(1))).notifyProgress();
        TezTaskOutputFiles tezTaskOutputFiles = new TezTaskOutputFiles(createConfiguration, uuid, 1);
        if (i3 > 0) {
            int i10 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).numSpills.get();
            for (int i11 = 0; i11 < i10; i11++) {
                Assert.assertTrue(localFs.exists(tezTaskOutputFiles.getSpillFileForWrite(i11, 10L)));
                Assert.assertTrue(localFs.exists(tezTaskOutputFiles.getSpillIndexFileForWrite(i11, 10L)));
            }
        }
    }

    private void verifyEmptyPartitions(ShuffleUserPayloads.DataMovementEventPayloadProto dataMovementEventPayloadProto, int i, int i2, Set<Integer> set) throws IOException {
        if (!dataMovementEventPayloadProto.hasEmptyPartitions()) {
            Assert.assertEquals("localhost", dataMovementEventPayloadProto.getHost());
            Assert.assertEquals(4000L, dataMovementEventPayloadProto.getPort());
            Assert.assertTrue(dataMovementEventPayloadProto.hasPathComponent());
            return;
        }
        BitSet fromByteArray = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(dataMovementEventPayloadProto.getEmptyPartitions()));
        if (i == 0) {
            Assert.assertEquals(i2, fromByteArray.cardinality());
        } else if (set != null) {
            Iterator<Integer> it = set.iterator();
            while (it.hasNext()) {
                Assert.assertTrue(fromByteArray.get(it.next().intValue()));
            }
            Assert.assertEquals(set.size(), fromByteArray.cardinality());
        }
        if (fromByteArray.cardinality() != i2) {
            Assert.assertEquals("localhost", dataMovementEventPayloadProto.getHost());
            Assert.assertEquals(4000L, dataMovementEventPayloadProto.getPort());
            Assert.assertTrue(dataMovementEventPayloadProto.hasPathComponent());
        } else {
            Assert.assertFalse(dataMovementEventPayloadProto.hasHost());
            Assert.assertFalse(dataMovementEventPayloadProto.hasPort());
            Assert.assertFalse(dataMovementEventPayloadProto.hasPathComponent());
        }
    }

    @Test(timeout = 10000)
    public void testNoSpill_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(10, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testSingleSpill_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(50, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testSinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(0, 1, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testMultipleSpills_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(200, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testNoRecords_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(0, 10, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testNoRecords_SinglePartition_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(0, 1, null, this.shouldCompress);
    }

    @Test(timeout = 10000)
    public void testSkippedPartitions_WithFinalMergeDisabled() throws IOException, InterruptedException {
        baseTestWithFinalMergeDisabled(200, 10, Sets.newHashSet(new Integer[]{2, 5}), this.shouldCompress);
    }

    private void baseTestWithFinalMergeDisabled(int i, int i2, Set<Integer> set, boolean z) throws IOException, InterruptedException {
        PartitionerForTest partitionerForTest = new PartitionerForTest();
        ApplicationId newInstance = ApplicationId.newInstance(10000000L, 1);
        TezCounters tezCounters = new TezCounters();
        String uuid = UUID.randomUUID().toString();
        OutputContext createMockOutputContext = createMockOutputContext(tezCounters, newInstance, uuid, this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        Configuration createConfiguration = createConfiguration(createMockOutputContext, IntWritable.class, LongWritable.class, z, -1);
        createConfiguration.setBoolean("tez.runtime.enable.final-merge.in.output", false);
        createConfiguration.setBoolean("tez.runtime.pipelined-shuffle.enabled", false);
        if (z) {
            new DefaultCodec().setConf(createConfiguration);
        }
        int i3 = 0;
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, i2, 2048L);
        int i4 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).sizePerBuffer;
        int i5 = 12 + 12;
        BitSet bitSet = new BitSet(i2);
        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        for (int i6 = 0; i6 < i; i6++) {
            intWritable.set(i6);
            longWritable.set(i6);
            int partition = partitionerForTest.getPartition(intWritable, longWritable, i2);
            if (set == null || !set.contains(Integer.valueOf(partition))) {
                bitSet.set(partition);
                unorderedPartitionedKVWriterForTest.write(intWritable, longWritable);
                i3++;
            }
        }
        int i7 = i4 / i5;
        int i8 = i3 / i7;
        ArgumentCaptor forClass = ArgumentCaptor.forClass(List.class);
        List<CompositeDataMovementEvent> close = unorderedPartitionedKVWriterForTest.close();
        if (i2 == 1) {
            Assert.assertEquals(true, Boolean.valueOf(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).skipBuffers));
        }
        Assert.assertEquals(Math.max(1, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).numSpills.get()) + 1, close.size());
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.atMost(0))).sendEvents((List) forClass.capture());
        for (int i9 = 0; i9 < close.size(); i9++) {
            Event event = (Event) close.get(i9);
            if ((event instanceof VertexManagerEvent) && i3 > 0) {
                verifyPartitionStats((VertexManagerEvent) event, bitSet);
            }
        }
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.never())).reportFailure((TaskFailureType) Matchers.any(TaskFailureType.class), (Throwable) Matchers.any(Throwable.class), (String) Matchers.any(String.class));
        Assert.assertNull(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).currentBuffer);
        Assert.assertEquals(0L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).availableBuffers.size());
        TezCounter findCounter = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES);
        TezCounter findCounter2 = tezCounters.findCounter(TaskCounter.OUTPUT_RECORDS);
        TezCounter findCounter3 = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter findCounter4 = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter findCounter5 = tezCounters.findCounter(TaskCounter.SPILLED_RECORDS);
        TezCounter findCounter6 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter findCounter7 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter findCounter8 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals(i3 * 12, findCounter.getValue());
        Assert.assertEquals(i3, findCounter2.getValue());
        if (findCounter2.getValue() > 0) {
            Assert.assertEquals(i3 * i5, findCounter3.getValue());
        } else {
            Assert.assertEquals(0L, findCounter3.getValue());
        }
        long value = findCounter4.getValue();
        if (i3 > 0) {
            Assert.assertTrue(value > 0);
            if (!z) {
                Assert.assertTrue("fileOutputBytes=" + value + ", outputRecordBytes=" + findCounter.getValue(), value > findCounter.getValue());
            }
        } else {
            Assert.assertEquals(0L, value);
        }
        Assert.assertTrue(((long) (i7 * i8)) >= findCounter5.getValue());
        long value2 = findCounter6.getValue();
        long value3 = findCounter7.getValue();
        Assert.assertEquals(value2, 0L);
        Assert.assertTrue(value2 == value3);
        Assert.assertEquals(findCounter8.getValue(), 0L);
        Assert.assertTrue(close.size() > 0);
        int size = close.size() - 1;
        Assert.assertTrue(close.get(size) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) close.get(size);
        Assert.assertEquals(0L, compositeDataMovementEvent.getSourceIndexStart());
        Assert.assertEquals(i2, compositeDataMovementEvent.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        verifyEmptyPartitions(parseFrom, i3, i2, set);
        if (findCounter2.getValue() > 0) {
            Assert.assertTrue(parseFrom.getLastEvent());
        }
        Pattern compile = Pattern.compile("(.*)(_\\d+)");
        for (CompositeDataMovementEvent compositeDataMovementEvent2 : close) {
            if (compositeDataMovementEvent2 instanceof CompositeDataMovementEvent) {
                ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom2 = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent2.getUserPayload()));
                Assert.assertEquals(false, Boolean.valueOf(parseFrom2.getPipelined()));
                if (parseFrom2.hasPathComponent()) {
                    Matcher matcher = compile.matcher(parseFrom2.getPathComponent());
                    Assert.assertTrue("spill id should be present in path component " + parseFrom2.getPathComponent(), matcher.matches());
                    Assert.assertEquals(2L, matcher.groupCount());
                    Assert.assertEquals(uuid, matcher.group(1));
                    Assert.assertTrue("spill id should be present in path component", matcher.group(2) != null);
                } else {
                    Assert.assertEquals(0L, parseFrom2.getSpillId());
                    if (findCounter2.getValue() > 0) {
                        Assert.assertEquals(true, Boolean.valueOf(parseFrom2.getLastEvent()));
                    } else {
                        Assert.assertEquals(i2, TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(parseFrom2.getEmptyPartitions())).cardinality());
                    }
                }
            }
        }
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.atLeast(1))).notifyProgress();
        TezTaskOutputFiles tezTaskOutputFiles = new TezTaskOutputFiles(createConfiguration, uuid, 1);
        if (i3 > 0) {
            int i10 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).numSpills.get();
            for (int i11 = 0; i11 < i10; i11++) {
                Assert.assertTrue(localFs.exists(tezTaskOutputFiles.getSpillFileForWrite(i11, 10L)));
                Assert.assertTrue(localFs.exists(tezTaskOutputFiles.getSpillIndexFileForWrite(i11, 10L)));
            }
        }
    }

    private void baseTest(int i, int i2, Set<Integer> set, boolean z, int i3, int i4) throws IOException, InterruptedException {
        baseTest(i, i2, set, z, i3, i4, 2048);
    }

    private void baseTest(int i, int i2, Set<Integer> set, boolean z, int i3, int i4, int i5) throws IOException, InterruptedException {
        BitSet fromByteArray;
        PartitionerForTest partitionerForTest = new PartitionerForTest();
        ApplicationId newInstance = ApplicationId.newInstance(10000000L, 1);
        TezCounters tezCounters = new TezCounters();
        String uuid = UUID.randomUUID().toString();
        OutputContext createMockOutputContext = createMockOutputContext(tezCounters, newInstance, uuid, this.defaultConf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        Configuration createConfiguration = createConfiguration(createMockOutputContext, IntWritable.class, LongWritable.class, z, i3);
        createConfiguration.setInt("tez.runtime.unordered-partitioned-kvwriter.buffer-merge-percent", i4);
        Configurable configurable = null;
        if (z) {
            configurable = new DefaultCodec();
            configurable.setConf(createConfiguration);
        }
        int i6 = 0;
        HashMap hashMap = new HashMap();
        for (int i7 = 0; i7 < i2; i7++) {
            hashMap.put(Integer.valueOf(i7), LinkedListMultimap.create());
        }
        UnorderedPartitionedKVWriterForTest unorderedPartitionedKVWriterForTest = new UnorderedPartitionedKVWriterForTest(createMockOutputContext, createConfiguration, i2, i5);
        int i8 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).sizePerBuffer;
        int i9 = 12 + 12;
        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        BitSet bitSet = new BitSet(i2);
        for (int i10 = 0; i10 < i; i10++) {
            intWritable.set(i10);
            longWritable.set(i10);
            int partition = partitionerForTest.getPartition(intWritable, longWritable, i2);
            if (set == null || !set.contains(Integer.valueOf(partition))) {
                bitSet.set(partition);
                ((Multimap) hashMap.get(Integer.valueOf(partition))).put(Integer.valueOf(intWritable.get()), Long.valueOf(longWritable.get()));
                unorderedPartitionedKVWriterForTest.write(intWritable, longWritable);
                i6++;
            }
        }
        List close = unorderedPartitionedKVWriterForTest.close();
        if (i2 == 1) {
            Assert.assertEquals(true, Boolean.valueOf(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).skipBuffers));
        }
        int i11 = i8 / i9;
        int i12 = (i6 / i11) / ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).spillLimit;
        ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.never())).reportFailure((TaskFailureType) Matchers.any(TaskFailureType.class), (Throwable) Matchers.any(Throwable.class), (String) Matchers.any(String.class));
        Assert.assertNull(((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).currentBuffer);
        Assert.assertEquals(0L, ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).availableBuffers.size());
        TezCounter findCounter = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES);
        TezCounter findCounter2 = tezCounters.findCounter(TaskCounter.OUTPUT_RECORDS);
        TezCounter findCounter3 = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter findCounter4 = tezCounters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter findCounter5 = tezCounters.findCounter(TaskCounter.SPILLED_RECORDS);
        TezCounter findCounter6 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter findCounter7 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter findCounter8 = tezCounters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals(i6 * 12, findCounter.getValue());
        if (i2 > 1) {
            Assert.assertEquals(i6 * i9, findCounter3.getValue());
        }
        Assert.assertEquals(i6, findCounter2.getValue());
        long value = findCounter4.getValue();
        if (i6 > 0) {
            Assert.assertTrue(value > 0);
            if (!z) {
                Assert.assertTrue(value > findCounter.getValue());
            }
        } else {
            Assert.assertEquals(0L, value);
        }
        Assert.assertEquals(i11 * i12, findCounter5.getValue());
        long value2 = findCounter6.getValue();
        long value3 = findCounter7.getValue();
        if (i12 == 0) {
            Assert.assertEquals(0L, value2);
            Assert.assertEquals(0L, value3);
        } else {
            Assert.assertTrue(value2 > 0);
            Assert.assertTrue(value3 > 0);
            if (!z) {
                Assert.assertTrue(value2 > ((long) ((i11 * i12) * 12)));
                Assert.assertTrue(value3 > ((long) ((i11 * i12) * 12)));
            }
        }
        Assert.assertEquals(value2, value3);
        Assert.assertTrue(((long) i12) >= findCounter8.getValue());
        Assert.assertEquals(2L, close.size());
        Assert.assertTrue(close.get(0) instanceof VertexManagerEvent);
        verifyPartitionStats((VertexManagerEvent) close.get(0), bitSet);
        Assert.assertTrue(close.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) close.get(1);
        Assert.assertEquals(0L, compositeDataMovementEvent.getSourceIndexStart());
        Assert.assertEquals(i2, compositeDataMovementEvent.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        if (set != null || i6 <= 0) {
            Assert.assertTrue(parseFrom.hasEmptyPartitions());
            fromByteArray = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getEmptyPartitions()));
            if (i6 == 0) {
                Assert.assertEquals(i2, fromByteArray.cardinality());
            } else {
                Iterator<Integer> it = set.iterator();
                while (it.hasNext()) {
                    Assert.assertTrue(fromByteArray.get(it.next().intValue()));
                }
                Assert.assertEquals(set.size(), fromByteArray.cardinality());
            }
        } else {
            Assert.assertFalse(parseFrom.hasEmptyPartitions());
            fromByteArray = new BitSet(i2);
        }
        if (fromByteArray.cardinality() != i2) {
            Assert.assertEquals("localhost", parseFrom.getHost());
            Assert.assertEquals(4000L, parseFrom.getPort());
            Assert.assertEquals(uuid, parseFrom.getPathComponent());
        } else {
            Assert.assertFalse(parseFrom.hasHost());
            Assert.assertFalse(parseFrom.hasPort());
            Assert.assertFalse(parseFrom.hasPathComponent());
        }
        new TezTaskOutputFiles(createConfiguration, uuid, 1);
        Path path = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).finalOutPath;
        Path path2 = ((UnorderedPartitionedKVWriter) unorderedPartitionedKVWriterForTest).finalIndexPath;
        if (i6 > 0) {
            Assert.assertTrue(localFs.exists(path));
            Assert.assertTrue(localFs.exists(path2));
            TezSpillRecord tezSpillRecord = new TezSpillRecord(path2, createConfiguration);
            DataInputBuffer dataInputBuffer = new DataInputBuffer();
            DataInputBuffer dataInputBuffer2 = new DataInputBuffer();
            IntWritable intWritable2 = new IntWritable();
            LongWritable longWritable2 = new LongWritable();
            for (int i13 = 0; i13 < i2; i13++) {
                TezIndexRecord index = tezSpillRecord.getIndex(i13);
                if (set == null || !set.contains(Integer.valueOf(i13))) {
                    FSDataInputStream open = FileSystem.getLocal(createConfiguration).open(path);
                    open.seek(index.getStartOffset());
                    IFile.Reader reader = new IFile.Reader(open, index.getPartLength(), configurable, (TezCounter) null, (TezCounter) null, false, 0, -1);
                    while (reader.nextRawKey(dataInputBuffer)) {
                        reader.nextRawValue(dataInputBuffer2);
                        intWritable2.readFields(dataInputBuffer);
                        longWritable2.readFields(dataInputBuffer2);
                        Assert.assertTrue(((Multimap) hashMap.get(Integer.valueOf(partitionerForTest.getPartition(intWritable2, longWritable2, i2)))).remove(Integer.valueOf(intWritable2.get()), Long.valueOf(longWritable2.get())));
                    }
                    open.close();
                } else {
                    Assert.assertFalse("The Index Record for partition " + i13 + " should not have any data", index.hasData());
                }
            }
            for (int i14 = 0; i14 < i2; i14++) {
                Assert.assertEquals(0L, ((Multimap) hashMap.get(Integer.valueOf(i14))).size());
                hashMap.remove(Integer.valueOf(i14));
            }
            Assert.assertEquals(0L, hashMap.size());
            ((OutputContext) Mockito.verify(createMockOutputContext, Mockito.atLeast(1))).notifyProgress();
        }
    }

    private static String createRandomString(int i) {
        StringBuilder sb = new StringBuilder(i);
        Random random = new Random();
        for (int i2 = 0; i2 < i; i2++) {
            sb.append((char) (65 + (Math.abs(random.nextInt()) % 26)));
        }
        return sb.toString();
    }

    private OutputContext createMockOutputContext(TezCounters tezCounters, ApplicationId applicationId, String str, String str2) {
        OutputContext outputContext = (OutputContext) Mockito.mock(OutputContext.class);
        ((OutputContext) Mockito.doReturn(tezCounters).when(outputContext)).getCounters();
        ((OutputContext) Mockito.doReturn(applicationId).when(outputContext)).getApplicationId();
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getDAGAttemptNumber();
        ((OutputContext) Mockito.doReturn("dagName").when(outputContext)).getDAGName();
        ((OutputContext) Mockito.doReturn("destinationVertexName").when(outputContext)).getDestinationVertexName();
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getOutputIndex();
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getTaskAttemptNumber();
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getTaskIndex();
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getTaskVertexIndex();
        ((OutputContext) Mockito.doReturn("vertexName").when(outputContext)).getTaskVertexName();
        ((OutputContext) Mockito.doReturn(str).when(outputContext)).getUniqueIdentifier();
        ((OutputContext) Mockito.doAnswer(new Answer<ByteBuffer>() { // from class: org.apache.tez.runtime.library.common.writers.TestUnorderedPartitionedKVWriter.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public ByteBuffer m40answer(InvocationOnMock invocationOnMock) throws Throwable {
                ByteBuffer allocate = ByteBuffer.allocate(4);
                allocate.mark();
                allocate.putInt(TestUnorderedPartitionedKVWriter.SHUFFLE_PORT);
                allocate.reset();
                return allocate;
            }
        }).when(outputContext)).getServiceProviderMetaData((String) Matchers.eq(str2));
        ((OutputContext) Mockito.doReturn(new String[]{new Path(TEST_ROOT_DIR, "outDir_" + str).toString()}).when(outputContext)).getWorkDirs();
        return outputContext;
    }

    private Configuration createConfiguration(OutputContext outputContext, Class<? extends Writable> cls, Class<? extends Writable> cls2, boolean z, int i) {
        return createConfiguration(outputContext, cls, cls2, z, i, PartitionerForTest.class);
    }

    private Configuration createConfiguration(OutputContext outputContext, Class<? extends Writable> cls, Class<? extends Writable> cls2, boolean z, int i, Class<? extends Partitioner> cls3) {
        Configuration configuration = new Configuration(false);
        configuration.setStrings("tez.runtime.framework.local.dirs", outputContext.getWorkDirs());
        configuration.set("tez.runtime.key.class", cls.getName());
        configuration.set("tez.runtime.value.class", cls2.getName());
        configuration.set("tez.runtime.partitioner.class", cls3.getName());
        if (i >= 0) {
            configuration.setInt("tez.runtime.unordered.output.max-per-buffer.size-bytes", i);
        }
        configuration.setBoolean("tez.runtime.compress", z);
        if (z) {
            configuration.set("tez.runtime.compress.codec", DefaultCodec.class.getName());
        }
        configuration.set("tez.runtime.report.partition.stats", this.reportPartitionStats.getType());
        return configuration;
    }
}
