package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestShuffleInputEventHandlerOrderedGrouped.class */
public class TestShuffleInputEventHandlerOrderedGrouped {
    private static final String HOST = "localhost";
    private static final int PORT = 8080;
    private static final String PATH_COMPONENT = "attempt";
    private ShuffleInputEventHandlerOrderedGrouped handler;
    private ShuffleScheduler scheduler;
    private ShuffleScheduler realScheduler;
    private MergeManager mergeManager;
    private TezExecutors sharedExecutor;

    private InputContext createTezInputContext() throws IOException {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 1);
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ((InputContext) Mockito.doReturn(newInstance).when(inputContext)).getApplicationId();
        ((InputContext) Mockito.doReturn("sourceVertex").when(inputContext)).getSourceVertexName();
        Mockito.when(inputContext.getCounters()).thenReturn(new TezCounters());
        ((InputContext) Mockito.doReturn(new ExecutionContextImpl("localhost")).when(inputContext)).getExecutionContext();
        ((InputContext) Mockito.doReturn(ByteBuffer.allocate(4).putInt(0, 4)).when(inputContext)).getServiceProviderMetaData(Matchers.anyString());
        ((InputContext) Mockito.doReturn(TezCommonUtils.serializeServiceData(new Token(new JobTokenIdentifier(new Text("text")), new JobTokenSecretManager()))).when(inputContext)).getServiceConsumerMetaData(Matchers.anyString());
        Mockito.when(inputContext.createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString())).thenAnswer(new Answer<ExecutorService>() { // from class: org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestShuffleInputEventHandlerOrderedGrouped.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public ExecutorService m32answer(InvocationOnMock invocationOnMock) throws Throwable {
                return TestShuffleInputEventHandlerOrderedGrouped.this.sharedExecutor.createExecutorService(((Integer) invocationOnMock.getArgumentAt(0, Integer.class)).intValue(), (String) invocationOnMock.getArgumentAt(1, String.class));
            }
        });
        return inputContext;
    }

    private Event createDataMovementEvent(int i, int i2, ByteString byteString, boolean z) {
        return createDataMovementEvent(i, i2, byteString, z, false, false, 0);
    }

    private Event createDataMovementEvent(int i, int i2, ByteString byteString, boolean z, boolean z2, boolean z3, int i3) {
        return createDataMovementEvent(i, i2, byteString, z, z2, z3, i3, "localhost", PORT);
    }

    private Event createDataMovementEvent(int i, int i2, ByteString byteString, boolean z, boolean z2, boolean z3, int i3, int i4) {
        return createDataMovementEvent(i, i2, byteString, z, z2, z3, i3, "localhost", PORT, i4);
    }

    private Event createDataMovementEvent(int i, int i2, ByteString byteString, boolean z, boolean z2, boolean z3, int i3, String str, int i4) {
        return createDataMovementEvent(i, i2, byteString, z, z2, z3, i3, str, i4, 0);
    }

    private Event createDataMovementEvent(int i, int i2, ByteString byteString, boolean z, boolean z2, boolean z3, int i3, String str, int i4, int i5) {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder newBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        if (!z) {
            newBuilder.setHost(str);
            newBuilder.setPort(i4);
            newBuilder.setPathComponent(PATH_COMPONENT);
        }
        if (z2) {
            newBuilder.setLastEvent(!z3);
            newBuilder.setSpillId(i3);
        }
        newBuilder.setRunDuration(10);
        if (byteString != null) {
            newBuilder.setEmptyPartitions(byteString);
        }
        return DataMovementEvent.create(i, i2, i5, newBuilder.build().toByteString().asReadOnlyByteBuffer());
    }

    @Before
    public void setup() throws Exception {
        this.sharedExecutor = new TezSharedExecutor(new Configuration());
        setupScheduler(2);
    }

    @After
    public void cleanup() {
        this.sharedExecutor.shutdownNow();
    }

    private void setupScheduler(int i) throws Exception {
        InputContext createTezInputContext = createTezInputContext();
        Configuration configuration = new Configuration();
        this.realScheduler = new ShuffleScheduler(createTezInputContext, configuration, i, (ExceptionReporter) Mockito.mock(Shuffle.class), (MergeManager) Mockito.mock(MergeManager.class), (FetchedInputAllocatorOrderedGrouped) Mockito.mock(MergeManager.class), System.currentTimeMillis(), (CompressionCodec) null, false, 0, "src vertex");
        this.scheduler = (ShuffleScheduler) Mockito.spy(this.realScheduler);
        this.handler = new ShuffleInputEventHandlerOrderedGrouped(createTezInputContext, this.scheduler, ShuffleUtils.isTezShuffleHandler(configuration));
        this.mergeManager = (MergeManager) Mockito.mock(MergeManager.class);
    }

    @Test(timeout = 10000)
    public void testPiplinedShuffleEvents() throws IOException, InterruptedException {
        Event createDataMovementEvent = createDataMovementEvent(0, 0, null, false, true, true, 0);
        CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = new CompositeInputAttemptIdentifier(0, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
        this.handler.handleEvents(Collections.singletonList(createDataMovementEvent));
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).addKnownMapOutput((String) Matchers.eq("localhost"), Matchers.eq(PORT), Matchers.eq(0), (CompositeInputAttemptIdentifier) Matchers.eq(compositeInputAttemptIdentifier));
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).pipelinedShuffleInfoEventsMap.containsKey(Integer.valueOf(compositeInputAttemptIdentifier.getInputIdentifier()));
        Event createDataMovementEvent2 = createDataMovementEvent(0, 0, null, false, true, false, 1);
        CompositeInputAttemptIdentifier compositeInputAttemptIdentifier2 = new CompositeInputAttemptIdentifier(0, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1, 1);
        this.handler.handleEvents(Collections.singletonList(createDataMovementEvent2));
        Assert.assertTrue(this.scheduler.pipelinedShuffleInfoEventsMap.containsKey(Integer.valueOf(compositeInputAttemptIdentifier2.getInputIdentifier())));
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).addKnownMapOutput((String) Matchers.eq("localhost"), Matchers.eq(PORT), Matchers.eq(0), (CompositeInputAttemptIdentifier) Matchers.eq(compositeInputAttemptIdentifier2));
        Assert.assertTrue(this.scheduler.pipelinedShuffleInfoEventsMap.containsKey(Integer.valueOf(compositeInputAttemptIdentifier2.getInputIdentifier())));
        MapHost host = this.scheduler.getHost();
        Assert.assertTrue(host != null);
        Assert.assertTrue(!this.scheduler.getMapsForHost(host).isEmpty());
        this.scheduler.copySucceeded(compositeInputAttemptIdentifier2, host, 1000L, 10000L, 10000L, MapOutput.createMemoryMapOutput(compositeInputAttemptIdentifier2, this.mergeManager, 1000, true), false);
        Assert.assertTrue(!this.scheduler.isDone());
        this.scheduler.copySucceeded(compositeInputAttemptIdentifier, host, 1000L, 10000L, 10000L, MapOutput.createMemoryMapOutput(compositeInputAttemptIdentifier, this.mergeManager, 1000, true), false);
        Assert.assertTrue(!this.scheduler.isDone());
        Event createDataMovementEvent3 = createDataMovementEvent(0, 1, null, false, true, true, 1);
        InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
        this.handler.handleEvents(Collections.singletonList(createDataMovementEvent3));
        InputAttemptIdentifier inputAttemptIdentifier2 = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
        Assert.assertTrue(!this.scheduler.isInputFinished(inputAttemptIdentifier2.getInputIdentifier()));
        this.scheduler.copySucceeded(inputAttemptIdentifier2, (MapHost) null, 0L, 0L, 0L, (MapOutput) null, false);
        Assert.assertTrue(!this.scheduler.isDone());
        this.scheduler.copySucceeded(inputAttemptIdentifier, host, 1000L, 10000L, 10000L, MapOutput.createMemoryMapOutput(inputAttemptIdentifier, this.mergeManager, 1000, true), false);
        Assert.assertTrue(this.scheduler.isDone());
    }

    @Test(timeout = 5000)
    public void testPiplinedShuffleEvents_WithOutofOrderAttempts() throws IOException, InterruptedException {
        this.handler.handleEvents(Collections.singletonList(createDataMovementEvent(1, 1, null, false, true, true, 0, 1)));
        ((ShuffleScheduler) Mockito.verify(this.scheduler, Mockito.times(1))).addKnownMapOutput((String) Matchers.eq("localhost"), Matchers.eq(PORT), Matchers.eq(1), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 1, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1)));
        Assert.assertTrue("Shuffle info events should not be empty for pipelined shuffle", !this.scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
        int size = this.scheduler.mapLocations.values().size();
        Assert.assertTrue("Maplocations should have values. current size: " + size, size > 0);
        this.scheduler.getMapsForHost((MapHost) this.scheduler.mapLocations.values().iterator().next());
        this.handler.handleEvents(Collections.singletonList(createDataMovementEvent(0, 1, null, false, true, true, 0, 0)));
        ((ShuffleScheduler) Mockito.verify(this.scheduler, Mockito.times(1))).killSelf((Exception) Matchers.any(IOException.class), (String) Matchers.any(String.class));
    }

    @Test(timeout = 5000)
    public void testPipelinedShuffle_WithObsoleteEvents() throws IOException, InterruptedException {
        this.handler.handleEvents(Collections.singletonList(createDataMovementEvent(1, 1, null, false, true, true, 0, 1)));
        ((ShuffleScheduler) Mockito.verify(this.scheduler, Mockito.times(1))).addKnownMapOutput((String) Matchers.eq("localhost"), Matchers.eq(PORT), Matchers.eq(1), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 1, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1)));
        Assert.assertTrue("Shuffle info events should not be empty for pipelined shuffle", !this.scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
        int size = this.scheduler.mapLocations.values().size();
        Assert.assertTrue("Maplocations should have values. current size: " + size, size > 0);
        this.scheduler.getMapsForHost((MapHost) this.scheduler.mapLocations.values().iterator().next());
        LinkedList linkedList = new LinkedList();
        linkedList.add(InputFailedEvent.create(1, 0));
        this.handler.handleEvents(linkedList);
        ((ShuffleScheduler) Mockito.verify(this.scheduler, Mockito.times(1))).killSelf((Exception) Matchers.any(IOException.class), (String) Matchers.any(String.class));
    }

    @Test(timeout = 5000)
    public void basicTest() throws IOException {
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent(0, 1, null, false));
        this.handler.handleEvents(linkedList);
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).addKnownMapOutput((String) Matchers.eq("localhost"), Matchers.eq(PORT), Matchers.eq(0), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, 1)));
        Assert.assertTrue("Shuffle info events should be empty for regular shuffle codepath", this.scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
    }

    @Test(timeout = 5000)
    public void testFailedEvent() throws IOException {
        LinkedList linkedList = new LinkedList();
        linkedList.add(InputFailedEvent.create(1, 0));
        this.handler.handleEvents(linkedList);
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).obsoleteInput((InputAttemptIdentifier) Matchers.eq(new InputAttemptIdentifier(1, 0)));
    }

    @Test(timeout = 5000)
    public void testAllPartitionsEmpty() throws IOException {
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent(0, 1, createEmptyPartitionByteString(0), true));
        this.handler.handleEvents(linkedList);
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).copySucceeded((InputAttemptIdentifier) Matchers.eq(new InputAttemptIdentifier(1, 0)), (MapHost) Matchers.any(MapHost.class), Matchers.eq(0L), Matchers.eq(0L), Matchers.eq(0L), (MapOutput) Matchers.any(MapOutput.class), Matchers.eq(true));
    }

    @Test(timeout = 5000)
    public void testCurrentPartitionEmpty() throws IOException {
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent(0, 1, createEmptyPartitionByteString(0), false));
        this.handler.handleEvents(linkedList);
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).copySucceeded((InputAttemptIdentifier) Matchers.eq(new InputAttemptIdentifier(1, 0)), (MapHost) Matchers.any(MapHost.class), Matchers.eq(0L), Matchers.eq(0L), Matchers.eq(0L), (MapOutput) Matchers.any(MapOutput.class), Matchers.eq(true));
    }

    @Test(timeout = 5000)
    public void testOtherPartitionEmpty() throws IOException {
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent(0, 1, createEmptyPartitionByteString(100), false));
        this.handler.handleEvents(linkedList);
        ((ShuffleScheduler) Mockito.verify(this.scheduler)).addKnownMapOutput((String) Matchers.eq("localhost"), Matchers.eq(PORT), Matchers.eq(0), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, 1)));
    }

    private ByteString createEmptyPartitionByteString(int... iArr) throws IOException {
        BitSet bitSet = new BitSet();
        for (int i : iArr) {
            bitSet.set(i);
        }
        return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
    }
}
