package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/impl/TestShuffleInputEventHandlerImpl.class */
public class TestShuffleInputEventHandlerImpl {
    private static final String HOST = "localhost";
    private static final int PORT = 8080;
    private static final String PATH_COMPONENT = "attempttmp";
    private final Configuration conf = new Configuration();
    private TezExecutors sharedExecutor;

    @Before
    public void setup() {
        this.sharedExecutor = new TezSharedExecutor(this.conf);
    }

    @After
    public void cleanup() {
        this.sharedExecutor.shutdownNow();
    }

    @Test(timeout = 5000)
    public void testSimple() throws IOException {
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager) Mockito.mock(ShuffleManager.class);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        Event createDataMovementEvent = createDataMovementEvent(0, 1, null);
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent);
        shuffleInputEventHandlerImpl.handleEvents(linkedList);
        ((ShuffleManager) Mockito.verify(shuffleManager)).addKnownInput((String) Matchers.eq("localhost"), Matchers.eq(PORT), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, 1)), Matchers.eq(0));
    }

    @Test(timeout = 5000)
    public void testCurrentPartitionEmpty() throws IOException {
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager) Mockito.mock(ShuffleManager.class);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        Event createDataMovementEvent = createDataMovementEvent(0, 1, createEmptyPartitionByteString(0));
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent);
        shuffleInputEventHandlerImpl.handleEvents(linkedList);
        ((ShuffleManager) Mockito.verify(shuffleManager)).addCompletedInputWithNoData((InputAttemptIdentifier) Matchers.eq(new InputAttemptIdentifier(1, 0)));
    }

    @Test(timeout = 5000)
    public void testOtherPartitionEmpty() throws IOException {
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager) Mockito.mock(ShuffleManager.class);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        Event createDataMovementEvent = createDataMovementEvent(0, 1, createEmptyPartitionByteString(1));
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent);
        shuffleInputEventHandlerImpl.handleEvents(linkedList);
        ((ShuffleManager) Mockito.verify(shuffleManager)).addKnownInput((String) Matchers.eq("localhost"), Matchers.eq(PORT), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, 1)), Matchers.eq(0));
    }

    @Test(timeout = 5000)
    public void testMultipleEvents1() throws IOException {
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager) Mockito.mock(ShuffleManager.class);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        Event createDataMovementEvent = createDataMovementEvent(0, 1, createEmptyPartitionByteString(0));
        Event createDataMovementEvent2 = createDataMovementEvent(0, 2, null);
        LinkedList linkedList = new LinkedList();
        linkedList.add(createDataMovementEvent);
        linkedList.add(createDataMovementEvent2);
        shuffleInputEventHandlerImpl.handleEvents(linkedList);
        InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(1, 0);
        CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = new CompositeInputAttemptIdentifier(2, 0, PATH_COMPONENT, 1);
        ((ShuffleManager) Mockito.verify(shuffleManager)).addCompletedInputWithNoData((InputAttemptIdentifier) Matchers.eq(inputAttemptIdentifier));
        ((ShuffleManager) Mockito.verify(shuffleManager)).addKnownInput((String) Matchers.eq("localhost"), Matchers.eq(PORT), (CompositeInputAttemptIdentifier) Matchers.eq(compositeInputAttemptIdentifier), Matchers.eq(0));
    }

    private InputContext createInputContext() throws IOException {
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        dataOutputBuffer.writeInt(PORT);
        ByteBuffer wrap = ByteBuffer.wrap(dataOutputBuffer.getData(), 0, dataOutputBuffer.getLength());
        dataOutputBuffer.close();
        ExecutionContext executionContext = (ExecutionContext) Mockito.mock(ExecutionContext.class);
        ((ExecutionContext) Mockito.doReturn("localhost").when(executionContext)).getHostName();
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ((InputContext) Mockito.doReturn(new TezCounters()).when(inputContext)).getCounters();
        ((InputContext) Mockito.doReturn("sourceVertex").when(inputContext)).getSourceVertexName();
        ((InputContext) Mockito.doReturn(wrap).when(inputContext)).getServiceProviderMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        ((InputContext) Mockito.doReturn(executionContext).when(inputContext)).getExecutionContext();
        Mockito.when(inputContext.createTezFrameworkExecutorService(Matchers.anyInt(), Matchers.anyString())).thenAnswer(new Answer<ExecutorService>() { // from class: org.apache.tez.runtime.library.common.shuffle.impl.TestShuffleInputEventHandlerImpl.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public ExecutorService m14answer(InvocationOnMock invocationOnMock) throws Throwable {
                return TestShuffleInputEventHandlerImpl.this.sharedExecutor.createExecutorService(((Integer) invocationOnMock.getArgumentAt(0, Integer.class)).intValue(), (String) invocationOnMock.getArgumentAt(1, String.class));
            }
        });
        return inputContext;
    }

    private ShuffleManager createShuffleManager(InputContext inputContext) throws IOException {
        ((InputContext) Mockito.doReturn(new String[]{new Path(".", "outDir").toString()}).when(inputContext)).getWorkDirs();
        this.conf.setStrings("tez.runtime.framework.local.dirs", inputContext.getWorkDirs());
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        new Token(new JobTokenIdentifier(), new JobTokenSecretManager((SecretKey) null)).write(dataOutputBuffer);
        ((InputContext) Mockito.doReturn(ByteBuffer.wrap(dataOutputBuffer.getData())).when(inputContext)).getServiceConsumerMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        return (ShuffleManager) Mockito.spy(new ShuffleManager(inputContext, this.conf, 2, 1024, false, -1, (CompressionCodec) null, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class)));
    }

    @Test(timeout = 5000)
    public void testPipelinedShuffleEvents() throws IOException {
        InputContext createInputContext = createInputContext();
        ShuffleManager createShuffleManager = createShuffleManager(createInputContext);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(createInputContext, createShuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0)));
        ((ShuffleManager) Mockito.verify(createShuffleManager, Mockito.times(1))).addKnownInput((String) Matchers.eq("localhost"), Matchers.eq(PORT), (CompositeInputAttemptIdentifier) Matchers.eq(new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1)), Matchers.eq(0));
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0)));
        CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
        ((ShuffleManager) Mockito.verify(createShuffleManager, Mockito.times(2))).addKnownInput((String) Matchers.eq("localhost"), Matchers.eq(PORT), (CompositeInputAttemptIdentifier) Matchers.eq(compositeInputAttemptIdentifier), Matchers.eq(0));
        ((ShuffleManager.ShuffleEventInfo) createShuffleManager.shuffleInfoEventsMap.get(Integer.valueOf(compositeInputAttemptIdentifier.getInputIdentifier()))).scheduledForDownload = true;
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1)));
        ((InputContext) Mockito.verify(createInputContext)).killSelf((Throwable) Matchers.any(Throwable.class), Matchers.anyString());
    }

    @Test(timeout = 5000)
    public void testPipelinedShuffleEvents_WithOutOfOrderAttempts() throws IOException {
        InputContext createInputContext = createInputContext();
        ShuffleManager createShuffleManager = createShuffleManager(createInputContext);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(createInputContext, createShuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1)));
        CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = new CompositeInputAttemptIdentifier(1, 1, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
        ((ShuffleManager) Mockito.verify(createShuffleManager, Mockito.times(1))).addKnownInput((String) Matchers.eq("localhost"), Matchers.eq(PORT), (CompositeInputAttemptIdentifier) Matchers.eq(compositeInputAttemptIdentifier), Matchers.eq(0));
        ((ShuffleManager.ShuffleEventInfo) createShuffleManager.shuffleInfoEventsMap.get(Integer.valueOf(compositeInputAttemptIdentifier.getInputIdentifier()))).scheduledForDownload = true;
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0)));
        ((InputContext) Mockito.verify(createInputContext)).killSelf((Throwable) Matchers.any(Throwable.class), Matchers.anyString());
    }

    @Test(timeout = 5000)
    public void testPipelinedShuffleEvents_WithEmptyPartitions() throws IOException {
        InputContext createInputContext = createInputContext();
        ShuffleManager createShuffleManager = createShuffleManager(createInputContext);
        ShuffleInputEventHandlerImpl shuffleInputEventHandlerImpl = new ShuffleInputEventHandlerImpl(createInputContext, createShuffleManager, (FetchedInputAllocator) Mockito.mock(FetchedInputAllocator.class), (CompressionCodec) null, false, 0, false);
        BitSet bitSet = new BitSet(4);
        bitSet.flip(0, 4);
        Event createDataMovementEvent = createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0);
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent));
        ((ShuffleManager) Mockito.verify(createShuffleManager, Mockito.times(1))).addCompletedInputWithNoData(new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0));
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent));
        createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
        InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
        ((ShuffleManager) Mockito.verify(createShuffleManager, Mockito.times(2))).addCompletedInputWithNoData(inputAttemptIdentifier);
        ((ShuffleManager.ShuffleEventInfo) createShuffleManager.shuffleInfoEventsMap.get(Integer.valueOf(inputAttemptIdentifier.getInputIdentifier()))).scheduledForDownload = true;
        shuffleInputEventHandlerImpl.handleEvents(Collections.singletonList(createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1)));
        ((InputContext) Mockito.verify(createInputContext)).killSelf((Throwable) Matchers.any(Throwable.class), Matchers.anyString());
    }

    private Event createDataMovementEvent(boolean z, int i, int i2, int i3, boolean z2, BitSet bitSet, int i4, int i5) throws IOException {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder newBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        if (bitSet.cardinality() != 0) {
            newBuilder.setEmptyPartitions(TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet)));
        }
        if (bitSet.cardinality() != i4) {
            newBuilder.setHost("localhost");
            newBuilder.setPort(PORT);
            newBuilder.setPathComponent("attemptPath");
        }
        if (z) {
            newBuilder.setSpillId(i3);
            newBuilder.setLastEvent(z2);
        }
        return DataMovementEvent.create(i, i2, i5, newBuilder.build().toByteString().asReadOnlyByteBuffer());
    }

    private Event createDataMovementEvent(int i, int i2, ByteString byteString) {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder newBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        newBuilder.setHost("localhost");
        newBuilder.setPort(PORT);
        newBuilder.setPathComponent(PATH_COMPONENT);
        if (byteString != null) {
            newBuilder.setEmptyPartitions(byteString);
        }
        return DataMovementEvent.create(i, i2, 0, newBuilder.build().toByteString().asReadOnlyByteBuffer());
    }

    private ByteString createEmptyPartitionByteString(int... iArr) throws IOException {
        BitSet bitSet = new BitSet();
        for (int i : iArr) {
            bitSet.set(i);
        }
        return TezCommonUtils.compressByteArrayToByteString(TezUtilsInternal.toByteArray(bitSet));
    }
}
