package org.apache.tez.runtime.library.output;

import com.google.common.collect.Multimap;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.apache.tez.runtime.common.resources.MemoryDistributor;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestFetcher;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.runtime.library.testutils.KVDataGen;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.class */
/**
 * Tests for {@link UnorderedKVOutput} writing to the local file system.
 *
 * <p>Each test builds an {@link OutputContext} through {@link #createOutputContext(Configuration)},
 * writes key/value pairs through the output's writer and then inspects the
 * {@link CompositeDataMovementEvent} that the output produced.
 */
public class TestOnFileUnorderedKVOutput {
    private static final Logger LOG = LoggerFactory.getLogger(TestOnFileUnorderedKVOutput.class);
    private static final Configuration defaultConf = new Configuration();
    private static FileSystem localFs;
    private static Path workDir;
    /** Port published as shuffle service data; the event payload is expected to echo it back. */
    private static final int shufflePort = 2112;

    static {
        defaultConf.set("fs.defaultFS", "file:///");
        try {
            localFs = FileSystem.getLocal(defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestOnFileUnorderedKVOutput.class.getName())
                    .makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
            LOG.info("Using workDir: {}", workDir);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Runtime task created by {@link #createOutputContext(Configuration)}; tests read its statistics. */
    LogicalIOProcessorRuntimeTask task;

    /** Creates the working directory before each test. */
    @Before
    public void setup() throws Exception {
        localFs.mkdirs(workDir);
    }

    /** Recursively removes the working directory after each test. */
    @After
    public void cleanup() throws Exception {
        localFs.delete(workDir, true);
    }

    /**
     * Writes the generated test data and checks the task IO statistics and the single
     * {@link CompositeDataMovementEvent} returned by {@code close()}.
     */
    @Test(timeout = 5000)
    public void testGeneratedDataMovementEvent() throws Exception {
        Configuration conf = createKeyValueConf();
        OutputContext outputContext = createOutputContext(conf);
        UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);

        List<?> initEvents = kvOutput.initialize();
        kvOutput.start();
        Assert.assertNotNull("initialize() returned null", initEvents);
        Assert.assertEquals("initialize() should not generate events", 0, initEvents.size());

        KeyValuesWriter writer = kvOutput.getWriter();
        for (KVDataGen.KVPair pair : KVDataGen.generateTestData(true, 0)) {
            writer.write(pair.getKey(), pair.getvalue());
        }
        List<?> closeEvents = kvOutput.close();

        // Figures this test expects for the data produced by KVDataGen.generateTestData(true, 0).
        IOStatistics ioStats = this.task.getTaskStatistics().getIOStatistics().values().iterator().next();
        Assert.assertEquals(45L, ioStats.getDataSize());
        Assert.assertEquals(5L, ioStats.getItemsProcessed());

        Assert.assertNotNull("close() returned null", closeEvents);
        Assert.assertEquals("close() should generate exactly one event", 1, closeEvents.size());
        CompositeDataMovementEvent event = (CompositeDataMovementEvent) closeEvents.get(0);
        Assert.assertEquals("Invalid source index", 0L, event.getSourceIndexStart());

        ShuffleUserPayloads.DataMovementEventPayloadProto payload =
                ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
        Assert.assertFalse(payload.hasEmptyPartitions());
        Assert.assertEquals(outputContext.getUniqueIdentifier(), payload.getPathComponent());
        Assert.assertEquals(shufflePort, payload.getPort());
        Assert.assertEquals(TestFetcher.HOST, payload.getHost());
    }

    /**
     * Enables pipelined shuffle with a 1 MB output buffer, writes 500 records with 10000-character
     * random keys, and checks that events are delivered through
     * {@code OutputContext.sendEvents} rather than returned from {@code close()}.
     */
    // ArgumentCaptor<List> is raw in its element type because List.class carries no type argument.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Test(timeout = 30000)
    public void testWithPipelinedShuffle() throws Exception {
        Configuration conf = createKeyValueConf();
        conf.setBoolean("tez.runtime.pipelined-shuffle.enabled", true);
        conf.setInt("tez.runtime.unordered.output.buffer.size-mb", 1);
        OutputContext outputContext = createOutputContext(conf);
        UnorderedKVOutput kvOutput = new UnorderedKVOutput(outputContext, 1);

        List<?> initEvents = kvOutput.initialize();
        kvOutput.start();
        Assert.assertNotNull("initialize() returned null", initEvents);
        Assert.assertEquals("initialize() should not generate events", 0, initEvents.size());

        KeyValuesWriter writer = kvOutput.getWriter();
        for (int i = 0; i < 500; i++) {
            writer.write(new Text(RandomStringUtils.randomAscii(10000)), new IntWritable(i));
        }
        List<?> closeEvents = kvOutput.close();
        Assert.assertNotNull("close() returned null", closeEvents);
        Assert.assertEquals("close() should not return events when pipelining", 0, closeEvents.size());

        // getValue() yields the argument of the most recent sendEvents call.
        ArgumentCaptor<List> eventsCaptor = ArgumentCaptor.forClass(List.class);
        Mockito.verify(outputContext, Mockito.atLeast(1)).sendEvents(eventsCaptor.capture());
        CompositeDataMovementEvent event = (CompositeDataMovementEvent) eventsCaptor.getValue().get(0);
        Assert.assertEquals("Invalid source index", 0L, event.getSourceIndexStart());

        ShuffleUserPayloads.DataMovementEventPayloadProto payload =
                ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
        Assert.assertTrue(payload.hasLastEvent());
        Assert.assertFalse(payload.hasEmptyPartitions());
        Assert.assertEquals(shufflePort, payload.getPort());
        Assert.assertEquals(TestFetcher.HOST, payload.getHost());
    }

    /**
     * Returns a fresh configuration with {@link Text} keys and {@link IntWritable} values,
     * the setup shared by every test in this class.
     */
    private static Configuration createKeyValueConf() {
        Configuration conf = new Configuration();
        conf.set("tez.runtime.key.class", Text.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        return conf;
    }

    /**
     * Builds a spied {@link TezOutputContextImpl} backed by a new {@link LogicalIOProcessorRuntimeTask}
     * (stored in {@link #task}).
     *
     * <p>The shuffle port is published as {@code mapreduce_shuffle} service data, and
     * {@code requestInitialMemory} is stubbed to hand the requested amount straight back to the
     * callback.
     *
     * @param configuration configuration serialized into the user payload and passed to the context
     * @return a Mockito spy of the output context
     * @throws IOException declared by the method; the user-payload creation and the context/task
     *         construction are the calls made here that could raise it
     */
    private OutputContext createOutputContext(Configuration configuration) throws IOException {
        TezUmbilical umbilical = Mockito.mock(TezUmbilical.class);
        TezDAGID dagId = TezDAGID.getInstance("2000", 1, 1);
        TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
        TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
        TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance(taskId, 1);
        UserPayload userPayload = TezUtils.createUserPayloadFromConf(configuration);

        TaskSpec taskSpec = Mockito.mock(TaskSpec.class);
        Mockito.when(taskSpec.getInputs()).thenReturn(Collections.singletonList(Mockito.mock(InputSpec.class)));
        Mockito.when(taskSpec.getOutputs()).thenReturn(Collections.singletonList(Mockito.mock(OutputSpec.class)));

        // Null arguments are placeholders for collaborators these tests never touch.
        this.task = new LogicalIOProcessorRuntimeTask(taskSpec, 1, new Configuration(), new String[]{"/"}, umbilical, (Map) null, (Map) null, (Multimap) null, (ObjectRegistry) null, "", (ExecutionContext) null, 1024L, false, new DefaultHadoopShim());
        LogicalIOProcessorRuntimeTask runtimeTask = Mockito.spy(this.task);

        // Encode the shuffle port as 4 bytes and reset the position so the helper reads from the start.
        Map<String, String> auxServiceEnv = new HashMap<String, String>();
        ByteBuffer portBytes = ByteBuffer.allocate(4);
        portBytes.putInt(shufflePort);
        portBytes.position(0);
        AuxiliaryServiceHelper.setServiceDataIntoEnv("mapreduce_shuffle", portBytes, auxServiceEnv);

        OutputDescriptor outputDescriptor = Mockito.mock(OutputDescriptor.class);
        Mockito.when(outputDescriptor.getClassName()).thenReturn("OutputDescriptor");

        TezOutputContextImpl realContext = new TezOutputContextImpl(configuration, new String[]{workDir.toString()}, 1, umbilical, "currentDAG", "currentVertex", "destinationVertex", -1, attemptId, 0, userPayload, runtimeTask, (Map) null, auxServiceEnv, new MemoryDistributor(1, 1, configuration), outputDescriptor, (ObjectRegistry) null, new ExecutionContextImpl(TestFetcher.HOST), 2048L);

        // Constructing the context must have registered counters and statistics for the destination vertex.
        Mockito.verify(runtimeTask, Mockito.times(1)).addAndGetTezCounter("destinationVertex");
        Mockito.verify(runtimeTask, Mockito.times(1)).getTaskStatistics();
        Assert.assertTrue(this.task.getTaskStatistics().getIOStatistics().containsKey("destinationVertex"));

        OutputContext outputContext = Mockito.spy(realContext);
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                // Grant exactly what was requested by invoking the callback immediately.
                long requestedSize = ((Long) invocation.getArguments()[0]).longValue();
                MemoryUpdateCallbackHandler callback = (MemoryUpdateCallbackHandler) invocation.getArguments()[1];
                callback.memoryAssigned(requestedSize);
                return null;
            }
        }).when(outputContext).requestInitialMemory(Matchers.anyLong(), Matchers.any(MemoryUpdateCallback.class));
        return outputContext;
    }
}
