/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.writers;

import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import java.io.DataInput;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutputFiles;
import org.apache.tez.runtime.library.common.writers.UnorderedPartitionedKVWriter;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class TestUnorderedPartitionedKVWriter {
    /** Logger used by the setup and cleanup hooks. */
    private static final Log LOG = LogFactory.getLog(TestUnorderedPartitionedKVWriter.class);
    /** Host name returned by the test writer's {@code getHost()} and expected in the emitted event payload. */
    private static final String HOST_STRING = "localhost";
    /** Shuffle port value (4000) that the mock output context advertises and the tests expect in the event payload. */
    private static final int SHUFFLE_PORT = 4000;
    /** Base directory for test data; taken from the {@code test.build.data} system property, defaulting to {@code /tmp}. */
    private static final String testTmpDir = System.getProperty("test.build.data", "/tmp");
    /** Per-class scratch directory that is recreated before and deleted after every test. */
    private static final Path TEST_ROOT_DIR = new Path(testTmpDir, TestUnorderedPartitionedKVWriter.class.getSimpleName());
    /** Local file system handle; (re)assigned in {@code setup()} before every test. */
    private static FileSystem localFs;
    /** Parameter of the current run: whether the tests configure the writer with compression enabled. */
    private final boolean shouldCompress;

    /**
     * Creates the test instance for one parameter set supplied by {@code data()}.
     *
     * @param shouldCompress stored in the {@code shouldCompress} field, which the test
     *        methods pass on to {@code baseTest} and {@code textTest} read directly
     */
    public TestUnorderedPartitionedKVWriter(boolean shouldCompress) {
        this.shouldCompress = shouldCompress;
    }

    /**
     * Supplies the constructor arguments for the parameterized runs.
     *
     * @return two single-element parameter sets: {@code false} and {@code true}
     *         for the {@code shouldCompress} constructor argument
     */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Object[] withoutCompression = new Object[]{false};
        Object[] withCompression = new Object[]{true};
        return Arrays.asList(withoutCompression, withCompression);
    }

    /**
     * Obtains the local file system and recreates an empty {@code TEST_ROOT_DIR}
     * before every test.
     *
     * @throws IOException if the local file system cannot be obtained or modified
     */
    @Before
    public void setup() throws IOException {
        LOG.info("Setup. Using test dir: " + TEST_ROOT_DIR);
        Configuration localConf = new Configuration();
        localFs = FileSystem.getLocal(localConf);
        // Start from a clean slate: remove leftovers of earlier runs, then recreate the directory.
        localFs.delete(TEST_ROOT_DIR, true);
        localFs.mkdirs(TEST_ROOT_DIR);
    }

    /**
     * Recursively deletes {@code TEST_ROOT_DIR} after every test.
     *
     * @throws IOException if the delete call fails
     */
    @After
    public void cleanup() throws IOException {
        LOG.info("CleanUp");
        final boolean recursive = true;
        localFs.delete(TEST_ROOT_DIR, recursive);
    }

    /**
     * Verifies the buffer count, per-buffer size and initial buffer allocation that the
     * writer reports for several memory budgets, with the per-buffer cap configured to
     * 2047 bytes.
     *
     * @throws IOException if a writer cannot be constructed
     */
    @Test(timeout=10000L)
    public void testBufferSizing() throws IOException {
        final int maxSingleBufferSizeBytes = 2047;
        final int numOutputs = 10;
        ApplicationId appId = ApplicationId.newInstance(10000L, 1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
        Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, false, maxSingleBufferSizeBytes);

        // 2048 bytes available: expects 2 buffers of 1024 bytes each.
        UnorderedPartitionedKVWriterForTest evenSplitWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 2048L);
        Assert.assertEquals(2L, evenSplitWriter.numBuffers);
        Assert.assertEquals(1024L, evenSplitWriter.sizePerBuffer);
        Assert.assertEquals(1L, evenSplitWriter.numInitializedBuffers);

        // Exactly three times the cap: expects 3 buffers, each the cap rounded down to a multiple of 4.
        UnorderedPartitionedKVWriterForTest exactMultipleWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, maxSingleBufferSizeBytes * 3);
        Assert.assertEquals(3L, exactMultipleWriter.numBuffers);
        Assert.assertEquals(maxSingleBufferSizeBytes - maxSingleBufferSizeBytes % 4, exactMultipleWriter.sizePerBuffer);
        Assert.assertEquals(1L, exactMultipleWriter.numInitializedBuffers);

        // One byte more than twice the cap: expects 3 buffers of 1364 bytes.
        UnorderedPartitionedKVWriterForTest justOverTwoWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, maxSingleBufferSizeBytes * 2 + 1);
        Assert.assertEquals(3L, justOverTwoWriter.numBuffers);
        Assert.assertEquals(1364L, justOverTwoWriter.sizePerBuffer);
        Assert.assertEquals(1L, justOverTwoWriter.numInitializedBuffers);

        // 10240 bytes available: expects 6 buffers of 1704 bytes.
        UnorderedPartitionedKVWriterForTest largeBudgetWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, 10240L);
        Assert.assertEquals(6L, largeBudgetWriter.numBuffers);
        Assert.assertEquals(1704L, largeBudgetWriter.sizePerBuffer);
        Assert.assertEquals(1L, largeBudgetWriter.numInitializedBuffers);
    }

    /** Runs {@code baseTest} with 10 records over 10 partitions and no skipped partitions. */
    @Test(timeout=10000L)
    public void testNoSpill() throws IOException, InterruptedException {
        final int numRecords = 10;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, shouldCompress);
    }

    /** Runs {@code baseTest} with 50 records over 10 partitions and no skipped partitions. */
    @Test(timeout=10000L)
    public void testSingleSpill() throws IOException, InterruptedException {
        final int numRecords = 50;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, shouldCompress);
    }

    /** Runs {@code baseTest} with 200 records over 10 partitions and no skipped partitions. */
    @Test(timeout=10000L)
    public void testMultipleSpills() throws IOException, InterruptedException {
        final int numRecords = 200;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, shouldCompress);
    }

    /** Runs {@code baseTest} with zero records over 10 partitions. */
    @Test(timeout=10000L)
    public void testNoRecords() throws IOException, InterruptedException {
        final int numRecords = 0;
        final int numPartitions = 10;
        baseTest(numRecords, numPartitions, null, shouldCompress);
    }

    /**
     * Runs {@code baseTest} with 200 candidate records over 10 partitions while
     * skipping every record that maps to partition 2 or 5.
     */
    @Test(timeout=10000L)
    public void testSkippedPartitions() throws IOException, InterruptedException {
        // Build a Set<Integer> directly; an Object[] argument would not produce
        // a set assignable to the Set<Integer> parameter of baseTest.
        baseTest(200, 10, Sets.newHashSet(2, 5), shouldCompress);
    }

    /** Runs {@code textTest} with 100 regular records, 10 partitions, 2048 bytes of memory and no large records. */
    @Test(timeout=10000L)
    public void testRandomText() throws IOException, InterruptedException {
        final int numRegularRecords = 100;
        textTest(numRegularRecords, 10, 2048L, 0, 0, 0);
    }

    /** Runs {@code textTest} with only 10 large-key records, 10 partitions and 2048 bytes of memory. */
    @Test(timeout=10000L)
    public void testLargeKeys() throws IOException, InterruptedException {
        final int numLargeKeys = 10;
        textTest(0, 10, 2048L, numLargeKeys, 0, 0);
    }

    /** Runs {@code textTest} with only 10 large-value records, 10 partitions and 2048 bytes of memory. */
    @Test(timeout=10000L)
    public void testLargevalues() throws IOException, InterruptedException {
        final int numLargeValues = 10;
        textTest(0, 10, 2048L, 0, numLargeValues, 0);
    }

    /** Runs {@code textTest} with only 10 large key/value pairs, 10 partitions and 2048 bytes of memory. */
    @Test(timeout=10000L)
    public void testLargeKvPairs() throws IOException, InterruptedException {
        final int numLargeKvPairs = 10;
        textTest(0, 10, 2048L, 0, 0, numLargeKvPairs);
    }

    /**
     * Runs {@code textTest} with 100 regular records plus 10 each of large-key,
     * large-value and large key/value records, over 10 partitions with 2048 bytes of memory.
     */
    @Test(timeout=10000L)
    public void testTextMixedRecords() throws IOException, InterruptedException {
        final int numRegularRecords = 100;
        final int numOfEachLargeKind = 10;
        textTest(numRegularRecords, 10, 2048L, numOfEachLargeKind, numOfEachLargeKind, numOfEachLargeKind);
    }

    /**
     * Writes randomly generated {@link Text} records through the writer, then checks the
     * large-record counter, the single emitted {@link CompositeDataMovementEvent} and the
     * records read back from the final output file.
     *
     * <p>"Large" records are sized relative to the writer's {@code sizePerBuffer}: large keys
     * and large values are at least one buffer long, large pairs have key and value of at
     * least half a buffer each.
     *
     * @param numRegularRecords number of records with keys shorter than 10 and values shorter than 20 characters
     * @param numPartitions number of output partitions
     * @param availableMemory memory budget in bytes handed to the writer
     * @param numLargeKeys number of records whose key is at least {@code sizePerBuffer} characters
     * @param numLargevalues number of records whose value is at least {@code sizePerBuffer} characters
     * @param numLargeKvPairs number of records whose key and value are each at least {@code sizePerBuffer / 2} characters
     * @throws IOException if writing or reading back the output fails
     * @throws InterruptedException if closing the writer is interrupted
     */
    public void textTest(int numRegularRecords, int numPartitions, long availableMemory, int numLargeKeys, int numLargevalues, int numLargeKvPairs) throws IOException, InterruptedException {
        HashPartitioner partitioner = new HashPartitioner();
        ApplicationId appId = ApplicationId.newInstance(10000L, 1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
        Random random = new Random();
        Configuration conf = createConfiguration(outputContext, Text.class, Text.class, shouldCompress, -1, HashPartitioner.class);
        DefaultCodec codec = null;
        if (shouldCompress) {
            codec = new DefaultCodec();
            codec.setConf(conf);
        }

        // One multimap of expected key/value pairs per partition; entries are removed as they are read back.
        HashMap<Integer, Multimap<String, String>> expectedValues = new HashMap<Integer, Multimap<String, String>>();
        for (int p = 0; p < numPartitions; ++p) {
            expectedValues.put(p, LinkedListMultimap.<String, String>create());
        }

        UnorderedPartitionedKVWriterForTest kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numPartitions, availableMemory);
        int sizePerBuffer = kvWriter.sizePerBuffer;
        BitSet partitionsWithData = new BitSet(numPartitions);

        int numRecordsWritten = 0;
        // Regular records: short key, short value.
        numRecordsWritten += writeRandomTextRecords(kvWriter, partitioner, numPartitions, random, numRegularRecords, 0, 10, 0, 20, partitionsWithData, expectedValues);
        // Large keys: the key alone is at least one buffer long.
        numRecordsWritten += writeRandomTextRecords(kvWriter, partitioner, numPartitions, random, numLargeKeys, sizePerBuffer, 100, 0, 20, partitionsWithData, expectedValues);
        // Large values: the value alone is at least one buffer long.
        numRecordsWritten += writeRandomTextRecords(kvWriter, partitioner, numPartitions, random, numLargevalues, 0, 10, sizePerBuffer, 100, partitionsWithData, expectedValues);
        // Large pairs: key and value are each at least half a buffer long.
        numRecordsWritten += writeRandomTextRecords(kvWriter, partitioner, numPartitions, random, numLargeKvPairs, sizePerBuffer / 2, 100, sizePerBuffer / 2, 100, partitionsWithData, expectedValues);

        List<?> events = kvWriter.close();
        Mockito.verify(outputContext, Mockito.never()).fatalError(Matchers.any(Throwable.class), Matchers.any(String.class));

        TezCounter outputLargeRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_LARGE_RECORDS);
        Assert.assertEquals(numLargeKeys + numLargevalues + numLargeKvPairs, outputLargeRecordsCounter.getValue());

        // Exactly one composite event covering all partitions is expected.
        Assert.assertEquals(1, events.size());
        Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(0);
        Assert.assertEquals(0, cdme.getSourceIndexStart());
        Assert.assertEquals(numPartitions, cdme.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
        Assert.assertFalse(eventProto.hasData());

        BitSet emptyPartitionBits;
        if (partitionsWithData.cardinality() != numPartitions) {
            // Some partition received nothing: the payload must list exactly those partitions as empty.
            Assert.assertTrue(eventProto.hasEmptyPartitions());
            byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto.getEmptyPartitions());
            emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
            Assert.assertEquals(numPartitions - partitionsWithData.cardinality(), emptyPartitionBits.cardinality());
        } else {
            Assert.assertFalse(eventProto.hasEmptyPartitions());
            emptyPartitionBits = new BitSet(numPartitions);
        }
        Assert.assertEquals(HOST_STRING, eventProto.getHost());
        Assert.assertEquals(SHUFFLE_PORT, eventProto.getPort());
        Assert.assertEquals(uniqueId, eventProto.getPathComponent());

        // Locate the final output and its index; a missing file is only acceptable when nothing was written.
        TezTaskOutputFiles taskOutput = new TezTaskOutputFiles(conf, uniqueId);
        Path outputFilePath;
        Path spillFilePath;
        try {
            outputFilePath = taskOutput.getOutputFile();
            spillFilePath = taskOutput.getOutputIndexFile();
        }
        catch (DiskChecker.DiskErrorException e) {
            if (numRecordsWritten > 0) {
                Assert.fail();
            }
            return;
        }

        TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
        DataInputBuffer keyBuffer = new DataInputBuffer();
        DataInputBuffer valBuffer = new DataInputBuffer();
        Text keyDeser = new Text();
        Text valDeser = new Text();
        for (int i = 0; i < numPartitions; ++i) {
            if (emptyPartitionBits.get(i)) {
                continue;
            }
            TezIndexRecord indexRecord = spillRecord.getIndex(i);
            FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
            try {
                inStream.seek(indexRecord.getStartOffset());
                IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null, null, false, 0, -1);
                while (reader.nextRawKey(keyBuffer)) {
                    reader.nextRawValue(valBuffer);
                    keyDeser.readFields(keyBuffer);
                    valDeser.readFields(valBuffer);
                    int partition = partitioner.getPartition(keyDeser, valDeser, numPartitions);
                    // Every record read back must match one still-outstanding expected record.
                    Assert.assertTrue(expectedValues.get(partition).remove(keyDeser.toString(), valDeser.toString()));
                }
            }
            finally {
                // Close the stream even when an assertion or read fails, so the file handle is not leaked.
                inStream.close();
            }
        }

        // Nothing that was written may be left unread.
        for (int i = 0; i < numPartitions; ++i) {
            Assert.assertEquals(0, expectedValues.get(i).size());
            expectedValues.remove(i);
        }
        Assert.assertEquals(0, expectedValues.size());
    }

    /**
     * Writes {@code count} random Text records and records each one in the expectation structures.
     * Key length is {@code keyBaseLen + random.nextInt(keyExtraBound)}; value length is computed
     * the same way from {@code valBaseLen} and {@code valExtraBound}.
     *
     * @return the number of records written
     */
    private static int writeRandomTextRecords(UnorderedPartitionedKVWriterForTest kvWriter, HashPartitioner partitioner, int numPartitions, Random random, int count, int keyBaseLen, int keyExtraBound, int valBaseLen, int valExtraBound, BitSet partitionsWithData, HashMap<Integer, Multimap<String, String>> expectedValues) throws IOException, InterruptedException {
        Text keyText = new Text();
        Text valText = new Text();
        int written = 0;
        for (int i = 0; i < count; ++i) {
            // Random.nextInt(bound) is already non-negative, so no Math.abs is needed.
            String key = createRandomString(keyBaseLen + random.nextInt(keyExtraBound));
            String val = createRandomString(valBaseLen + random.nextInt(valExtraBound));
            keyText.set(key);
            valText.set(val);
            int partition = partitioner.getPartition(keyText, valText, numPartitions);
            partitionsWithData.set(partition);
            expectedValues.get(partition).put(key, val);
            kvWriter.write(keyText, valText);
            ++written;
        }
        return written;
    }

    /**
     * Writes {@code numRecords} IntWritable/LongWritable records (key and value both equal to
     * the record index) through a writer with 2048 bytes of memory, then checks the counters,
     * the single emitted {@link CompositeDataMovementEvent} and the records read back from the
     * final output file.
     *
     * @param numRecords number of candidate records to generate
     * @param numPartitions number of output partitions
     * @param skippedPartitions partitions whose records are not written, or {@code null} to write all
     * @param shouldCompress whether the writer is configured with compression
     * @throws IOException if writing or reading back the output fails
     * @throws InterruptedException if closing the writer is interrupted
     */
    private void baseTest(int numRecords, int numPartitions, Set<Integer> skippedPartitions, boolean shouldCompress) throws IOException, InterruptedException {
        PartitionerForTest partitioner = new PartitionerForTest();
        ApplicationId appId = ApplicationId.newInstance(10000L, 1);
        TezCounters counters = new TezCounters();
        String uniqueId = UUID.randomUUID().toString();
        OutputContext outputContext = createMockOutputContext(counters, appId, uniqueId);
        Configuration conf = createConfiguration(outputContext, IntWritable.class, LongWritable.class, shouldCompress, -1);
        DefaultCodec codec = null;
        if (shouldCompress) {
            codec = new DefaultCodec();
            codec.setConf(conf);
        }
        int numOutputs = numPartitions;
        long availableMemory = 2048L;
        int numRecordsWritten = 0;

        // One multimap of expected key/value pairs per partition; entries are removed as they are read back.
        HashMap<Integer, Multimap<Integer, Long>> expectedValues = new HashMap<Integer, Multimap<Integer, Long>>();
        for (int p = 0; p < numOutputs; ++p) {
            expectedValues.put(p, LinkedListMultimap.<Integer, Long>create());
        }

        UnorderedPartitionedKVWriterForTest kvWriter = new UnorderedPartitionedKVWriterForTest(outputContext, conf, numOutputs, availableMemory);
        int sizePerBuffer = kvWriter.sizePerBuffer;
        // Payload bytes per record as asserted against OUTPUT_BYTES below
        // (presumably 4 for the int key plus 8 for the long value).
        int sizePerRecord = 12;
        // Bytes per record as asserted against OUTPUT_BYTES_WITH_OVERHEAD below.
        int sizePerRecordWithOverhead = sizePerRecord + 12;

        IntWritable intWritable = new IntWritable();
        LongWritable longWritable = new LongWritable();
        for (int r = 0; r < numRecords; ++r) {
            intWritable.set(r);
            longWritable.set((long)r);
            int partition = partitioner.getPartition(intWritable, longWritable, numOutputs);
            if (skippedPartitions != null && skippedPartitions.contains(partition)) {
                continue;
            }
            expectedValues.get(partition).put(intWritable.get(), longWritable.get());
            kvWriter.write(intWritable, longWritable);
            ++numRecordsWritten;
        }
        List<?> events = kvWriter.close();

        int recordsPerBuffer = sizePerBuffer / sizePerRecordWithOverhead;
        int numExpectedSpills = numRecordsWritten / recordsPerBuffer;
        Mockito.verify(outputContext, Mockito.never()).fatalError(Matchers.any(Throwable.class), Matchers.any(String.class));

        // Buffer bookkeeping after close.
        if (numExpectedSpills == 0) {
            Assert.assertEquals(1, kvWriter.numInitializedBuffers);
        } else {
            Assert.assertTrue(kvWriter.numInitializedBuffers > 1);
        }
        Assert.assertNull(kvWriter.currentBuffer);
        Assert.assertEquals(0, kvWriter.availableBuffers.size());

        // Counters.
        TezCounter outputRecordBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES);
        TezCounter outputRecordsCounter = counters.findCounter(TaskCounter.OUTPUT_RECORDS);
        TezCounter outputBytesWithOverheadCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_WITH_OVERHEAD);
        TezCounter fileOutputBytesCounter = counters.findCounter(TaskCounter.OUTPUT_BYTES_PHYSICAL);
        TezCounter spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS);
        TezCounter additionalSpillBytesWrittenCounter = counters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
        TezCounter additionalSpillBytesReadCounter = counters.findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
        TezCounter numAdditionalSpillsCounter = counters.findCounter(TaskCounter.ADDITIONAL_SPILL_COUNT);
        Assert.assertEquals(numRecordsWritten * sizePerRecord, outputRecordBytesCounter.getValue());
        Assert.assertEquals(numRecordsWritten, outputRecordsCounter.getValue());
        Assert.assertEquals(numRecordsWritten * sizePerRecordWithOverhead, outputBytesWithOverheadCounter.getValue());

        long fileOutputBytes = fileOutputBytesCounter.getValue();
        if (numRecordsWritten > 0) {
            Assert.assertTrue(fileOutputBytes > 0L);
            if (!shouldCompress) {
                // Without compression the physical output must exceed the raw record bytes.
                Assert.assertTrue(fileOutputBytes > outputRecordBytesCounter.getValue());
            }
        } else {
            Assert.assertEquals(0L, fileOutputBytes);
        }

        Assert.assertEquals(recordsPerBuffer * numExpectedSpills, spilledRecordsCounter.getValue());
        long additionalSpillBytesWritten = additionalSpillBytesWrittenCounter.getValue();
        long additionalSpillBytesRead = additionalSpillBytesReadCounter.getValue();
        if (numExpectedSpills == 0) {
            Assert.assertEquals(0L, additionalSpillBytesWritten);
            Assert.assertEquals(0L, additionalSpillBytesRead);
        } else {
            Assert.assertTrue(additionalSpillBytesWritten > 0L);
            Assert.assertTrue(additionalSpillBytesRead > 0L);
            if (!shouldCompress) {
                long minSpilledPayloadBytes = (long)(recordsPerBuffer * numExpectedSpills * sizePerRecord);
                Assert.assertTrue(additionalSpillBytesWritten > minSpilledPayloadBytes);
                Assert.assertTrue(additionalSpillBytesRead > minSpilledPayloadBytes);
            }
        }
        Assert.assertEquals(additionalSpillBytesWritten, additionalSpillBytesRead);
        Assert.assertEquals(numExpectedSpills, numAdditionalSpillsCounter.getValue());

        // Exactly one composite event covering all partitions is expected.
        Assert.assertEquals(1, events.size());
        Assert.assertTrue(events.get(0) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent cdme = (CompositeDataMovementEvent)events.get(0);
        Assert.assertEquals(0, cdme.getSourceIndexStart());
        Assert.assertEquals(numOutputs, cdme.getCount());
        ShuffleUserPayloads.DataMovementEventPayloadProto eventProto = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(cdme.getUserPayload()));
        Assert.assertFalse(eventProto.hasData());

        BitSet emptyPartitionBits;
        if (skippedPartitions == null && numRecordsWritten > 0) {
            Assert.assertFalse(eventProto.hasEmptyPartitions());
            emptyPartitionBits = new BitSet(numPartitions);
        } else {
            Assert.assertTrue(eventProto.hasEmptyPartitions());
            byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(eventProto.getEmptyPartitions());
            emptyPartitionBits = TezUtilsInternal.fromByteArray(emptyPartitions);
            if (numRecordsWritten == 0) {
                // Nothing written: every partition must be flagged empty.
                Assert.assertEquals(numPartitions, emptyPartitionBits.cardinality());
            } else {
                // Only (and exactly) the skipped partitions must be flagged empty.
                for (Integer skipped : skippedPartitions) {
                    Assert.assertTrue(emptyPartitionBits.get(skipped));
                }
                Assert.assertEquals(skippedPartitions.size(), emptyPartitionBits.cardinality());
            }
        }
        if (emptyPartitionBits.cardinality() != numPartitions) {
            Assert.assertEquals(HOST_STRING, eventProto.getHost());
            Assert.assertEquals(SHUFFLE_PORT, eventProto.getPort());
            Assert.assertEquals(uniqueId, eventProto.getPathComponent());
        } else {
            // With no data at all, the payload carries no fetch location.
            Assert.assertFalse(eventProto.hasHost());
            Assert.assertFalse(eventProto.hasPort());
            Assert.assertFalse(eventProto.hasPathComponent());
        }

        // Locate the final output and its index; a missing file is only acceptable when nothing was written.
        TezTaskOutputFiles taskOutput = new TezTaskOutputFiles(conf, uniqueId);
        Path outputFilePath;
        Path spillFilePath;
        try {
            outputFilePath = taskOutput.getOutputFile();
            spillFilePath = taskOutput.getOutputIndexFile();
        }
        catch (DiskChecker.DiskErrorException e) {
            if (numRecordsWritten > 0) {
                Assert.fail();
            }
            return;
        }

        TezSpillRecord spillRecord = new TezSpillRecord(spillFilePath, conf);
        DataInputBuffer keyBuffer = new DataInputBuffer();
        DataInputBuffer valBuffer = new DataInputBuffer();
        IntWritable keyDeser = new IntWritable();
        LongWritable valDeser = new LongWritable();
        for (int i = 0; i < numOutputs; ++i) {
            if (skippedPartitions != null && skippedPartitions.contains(i)) {
                continue;
            }
            TezIndexRecord indexRecord = spillRecord.getIndex(i);
            FSDataInputStream inStream = FileSystem.getLocal(conf).open(outputFilePath);
            try {
                inStream.seek(indexRecord.getStartOffset());
                IFile.Reader reader = new IFile.Reader(inStream, indexRecord.getPartLength(), codec, null, null, false, 0, -1);
                while (reader.nextRawKey(keyBuffer)) {
                    reader.nextRawValue(valBuffer);
                    keyDeser.readFields(keyBuffer);
                    valDeser.readFields(valBuffer);
                    int partition = partitioner.getPartition(keyDeser, valDeser, numOutputs);
                    // Every record read back must match one still-outstanding expected record.
                    Assert.assertTrue(expectedValues.get(partition).remove(keyDeser.get(), valDeser.get()));
                }
            }
            finally {
                // Close the stream even when an assertion or read fails, so the file handle is not leaked.
                inStream.close();
            }
        }

        // Nothing that was written may be left unread.
        for (int i = 0; i < numOutputs; ++i) {
            Assert.assertEquals(0, expectedValues.get(i).size());
            expectedValues.remove(i);
        }
        Assert.assertEquals(0, expectedValues.size());
    }

    /**
     * Builds a string of random upper-case ASCII letters ({@code 'A'}..{@code 'Z'}).
     *
     * @param size number of characters to generate; zero or a negative value yields an empty string
     * @return a string of exactly {@code size} random letters
     */
    private static String createRandomString(int size) {
        StringBuilder sb = new StringBuilder(Math.max(size, 0));
        Random random = new Random();
        for (int i = 0; i < size; ++i) {
            // nextInt(26) is always in [0, 26). Math.abs(nextInt()) % 26 is not:
            // Math.abs(Integer.MIN_VALUE) stays negative and would yield a non-letter.
            sb.append((char)('A' + random.nextInt(26)));
        }
        return sb.toString();
    }

    /**
     * Builds a Mockito mock of {@link OutputContext} with fixed stubbed identifiers, the given
     * counters/application id/unique id, a 4-byte shuffle-port buffer as service metadata and
     * a single work directory under {@code TEST_ROOT_DIR}.
     *
     * @param counters counters returned by {@code getCounters()}
     * @param appId application id returned by {@code getApplicationId()}
     * @param uniqueId value returned by {@code getUniqueIdentifier()} and used in the work directory name
     * @return the stubbed mock
     */
    private OutputContext createMockOutputContext(TezCounters counters, ApplicationId appId, String uniqueId) {
        OutputContext mockContext = Mockito.mock(OutputContext.class);
        Mockito.doReturn(counters).when(mockContext).getCounters();
        Mockito.doReturn(appId).when(mockContext).getApplicationId();
        Mockito.doReturn(1).when(mockContext).getDAGAttemptNumber();
        Mockito.doReturn("dagName").when(mockContext).getDAGName();
        Mockito.doReturn("destinationVertexName").when(mockContext).getDestinationVertexName();
        Mockito.doReturn(1).when(mockContext).getOutputIndex();
        Mockito.doReturn(1).when(mockContext).getTaskAttemptNumber();
        Mockito.doReturn(1).when(mockContext).getTaskIndex();
        Mockito.doReturn(1).when(mockContext).getTaskVertexIndex();
        Mockito.doReturn("vertexName").when(mockContext).getTaskVertexName();
        Mockito.doReturn(uniqueId).when(mockContext).getUniqueIdentifier();

        // Write the port, then rewind to the mark so readers start at position 0.
        ByteBuffer shufflePortBytes = ByteBuffer.allocate(4);
        shufflePortBytes.mark();
        shufflePortBytes.putInt(SHUFFLE_PORT);
        shufflePortBytes.reset();
        Mockito.doReturn(shufflePortBytes).when(mockContext).getServiceProviderMetaData(Matchers.eq(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID));

        // A single work directory, unique per context.
        Path workDir = new Path(TEST_ROOT_DIR, "outDir_" + uniqueId);
        String[] workDirs = new String[]{workDir.toString()};
        // The cast keeps the array a single return value rather than a varargs list.
        Mockito.doReturn((Object)workDirs).when(mockContext).getWorkDirs();
        return mockContext;
    }

    /**
     * Convenience overload that builds the configuration with {@code PartitionerForTest}
     * as the partitioner class.
     *
     * @param outputContext source of the work directories
     * @param keyClass key class to configure
     * @param valClass value class to configure
     * @param shouldCompress whether compression is enabled
     * @param maxSingleBufferSizeBytes per-buffer size cap, or a negative value to leave it unset
     * @return the configuration built by the six-argument overload
     */
    private Configuration createConfiguration(OutputContext outputContext, Class<? extends Writable> keyClass, Class<? extends Writable> valClass, boolean shouldCompress, int maxSingleBufferSizeBytes) {
        Class<? extends Partitioner> defaultPartitioner = PartitionerForTest.class;
        return createConfiguration(outputContext, keyClass, valClass, shouldCompress, maxSingleBufferSizeBytes, defaultPartitioner);
    }

    /**
     * Builds a configuration (without default resources) carrying the local dirs, key/value
     * classes, partitioner class, optional per-buffer size cap and compression settings.
     *
     * @param outputContext source of the work directories stored under the local-dirs key
     * @param keyClass key class to configure
     * @param valClass value class to configure
     * @param shouldCompress whether compression is enabled; when true, {@code DefaultCodec} is set as codec
     * @param maxSingleBufferSizeBytes per-buffer size cap; only set when non-negative
     * @param partitionerClass partitioner class to configure
     * @return the populated configuration
     */
    private Configuration createConfiguration(OutputContext outputContext, Class<? extends Writable> keyClass, Class<? extends Writable> valClass, boolean shouldCompress, int maxSingleBufferSizeBytes, Class<? extends Partitioner> partitionerClass) {
        Configuration result = new Configuration(false);
        String[] localDirs = outputContext.getWorkDirs();
        result.setStrings("tez.runtime.framework.local.dirs", localDirs);
        result.set("tez.runtime.key.class", keyClass.getName());
        result.set("tez.runtime.value.class", valClass.getName());
        result.set("tez.runtime.partitioner.class", partitionerClass.getName());
        // A negative cap means "leave the size limit unset".
        boolean capConfigured = maxSingleBufferSizeBytes >= 0;
        if (capConfigured) {
            result.setInt("tez.runtime.unordered.output.max-per-buffer.size-bytes", maxSingleBufferSizeBytes);
        }
        result.setBoolean("tez.runtime.compress", shouldCompress);
        if (shouldCompress) {
            String codecName = DefaultCodec.class.getName();
            result.set("tez.runtime.compress.codec", codecName);
        }
        return result;
    }

    /**
     * Test subclass of {@link UnorderedPartitionedKVWriter} whose {@code getHost()} returns the
     * fixed {@code HOST_STRING}, which the tests compare against the host in the event payload.
     */
    private static class UnorderedPartitionedKVWriterForTest
    extends UnorderedPartitionedKVWriter {
        /**
         * Passes all arguments straight through to the superclass constructor.
         *
         * @throws IOException if the superclass constructor fails
         */
        public UnorderedPartitionedKVWriterForTest(OutputContext outputContext, Configuration conf, int numOutputs, long availableMemoryBytes) throws IOException {
            super(outputContext, conf, numOutputs, availableMemoryBytes);
        }

        // NOTE(review): presumably overrides the superclass's host lookup — confirm against UnorderedPartitionedKVWriter.
        /** Returns the fixed test host name. */
        String getHost() {
            return TestUnorderedPartitionedKVWriter.HOST_STRING;
        }
    }

    /**
     * Partitioner used by the IntWritable-keyed tests: maps a key to
     * {@code key % numPartitions} and rejects any other key type.
     */
    public static class PartitionerForTest
    implements Partitioner {
        /**
         * @param key must be an {@link IntWritable}
         * @param value ignored
         * @param numPartitions divisor applied to the key's int value
         * @return the key's int value modulo {@code numPartitions}
         * @throws UnsupportedOperationException if {@code key} is not an {@link IntWritable}
         */
        public int getPartition(Object key, Object value, int numPartitions) {
            if (!(key instanceof IntWritable)) {
                throw new UnsupportedOperationException("Test partitioner expected to be called with IntWritable only");
            }
            int keyValue = ((IntWritable)key).get();
            return keyValue % numPartitions;
        }
    }
}

