/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.protobuf.ByteString;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleInputEventHandlerImpl;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestShuffleInputEventHandlerImpl {
    private static final String HOST = "localhost";
    private static final int PORT = 8080;
    private static final String PATH_COMPONENT = "attempttmp";
    private final Configuration conf = new Configuration();
    private TezExecutors sharedExecutor;

    @Before
    public void setup() {
        this.sharedExecutor = new TezSharedExecutor(this.conf);
    }

    @After
    public void cleanup() {
        this.sharedExecutor.shutdownNow();
    }

    @Test(timeout=5000L)
    public void testSimple() throws IOException {
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager)Mockito.mock(ShuffleManager.class);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        int taskIndex = 1;
        Event dme = this.createDataMovementEvent(0, taskIndex, null);
        LinkedList<Event> eventList = new LinkedList<Event>();
        eventList.add(dme);
        handler.handleEvents(eventList);
        CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager)).addKnownInput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier), Mockito.eq((int)0));
    }

    @Test(timeout=5000L)
    public void testCurrentPartitionEmpty() throws IOException {
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager)Mockito.mock(ShuffleManager.class);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        int taskIndex = 1;
        Event dme = this.createDataMovementEvent(0, taskIndex, this.createEmptyPartitionByteString(0));
        LinkedList<Event> eventList = new LinkedList<Event>();
        eventList.add(dme);
        handler.handleEvents(eventList);
        InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(taskIndex, 0);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager)).addCompletedInputWithNoData((InputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier));
    }

    @Test(timeout=5000L)
    public void testOtherPartitionEmpty() throws IOException {
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager)Mockito.mock(ShuffleManager.class);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        int taskIndex = 1;
        Event dme = this.createDataMovementEvent(0, taskIndex, this.createEmptyPartitionByteString(1));
        LinkedList<Event> eventList = new LinkedList<Event>();
        eventList.add(dme);
        handler.handleEvents(eventList);
        CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager)).addKnownInput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier), Mockito.eq((int)0));
    }

    @Test(timeout=5000L)
    public void testMultipleEvents1() throws IOException {
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ShuffleManager shuffleManager = (ShuffleManager)Mockito.mock(ShuffleManager.class);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        int taskIndex1 = 1;
        Event dme1 = this.createDataMovementEvent(0, taskIndex1, this.createEmptyPartitionByteString(0));
        int taskIndex2 = 2;
        Event dme2 = this.createDataMovementEvent(0, taskIndex2, null);
        LinkedList<Event> eventList = new LinkedList<Event>();
        eventList.add(dme1);
        eventList.add(dme2);
        handler.handleEvents(eventList);
        InputAttemptIdentifier expectedIdentifier1 = new InputAttemptIdentifier(taskIndex1, 0);
        CompositeInputAttemptIdentifier expectedIdentifier2 = new CompositeInputAttemptIdentifier(taskIndex2, 0, PATH_COMPONENT, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager)).addCompletedInputWithNoData((InputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier1));
        ((ShuffleManager)Mockito.verify((Object)shuffleManager)).addKnownInput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier2), Mockito.eq((int)0));
    }

    private InputContext createInputContext() throws IOException {
        DataOutputBuffer port_dob = new DataOutputBuffer();
        port_dob.writeInt(8080);
        ByteBuffer shuffleMetaData = ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
        port_dob.close();
        ExecutionContext executionContext = (ExecutionContext)Mockito.mock(ExecutionContext.class);
        ((ExecutionContext)Mockito.doReturn((Object)HOST).when((Object)executionContext)).getHostName();
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ((InputContext)Mockito.doReturn((Object)new TezCounters()).when((Object)inputContext)).getCounters();
        ((InputContext)Mockito.doReturn((Object)"sourceVertex").when((Object)inputContext)).getSourceVertexName();
        ((InputContext)Mockito.doReturn((Object)"taskVertex").when((Object)inputContext)).getTaskVertexName();
        ((InputContext)Mockito.doReturn((Object)shuffleMetaData).when((Object)inputContext)).getServiceProviderMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        ((InputContext)Mockito.doReturn((Object)executionContext).when((Object)inputContext)).getExecutionContext();
        Mockito.when((Object)inputContext.createTezFrameworkExecutorService(Mockito.anyInt(), Mockito.anyString())).thenAnswer((Answer)new Answer<ExecutorService>(){

            public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
                return TestShuffleInputEventHandlerImpl.this.sharedExecutor.createExecutorService(((Integer)invocation.getArgument(0, Integer.class)).intValue(), (String)invocation.getArgument(1, String.class));
            }
        });
        return inputContext;
    }

    private ShuffleManager createShuffleManager(InputContext inputContext) throws IOException {
        Path outDirBase = new Path(".", "outDir");
        String[] outDirs = new String[]{outDirBase.toString()};
        ((InputContext)Mockito.doReturn((Object)outDirs).when((Object)inputContext)).getWorkDirs();
        this.conf.setStrings("tez.runtime.framework.local.dirs", inputContext.getWorkDirs());
        DataOutputBuffer out = new DataOutputBuffer();
        Token token = new Token((TokenIdentifier)new JobTokenIdentifier(), (SecretManager)new JobTokenSecretManager(null));
        token.write((DataOutput)out);
        ((InputContext)Mockito.doReturn((Object)ByteBuffer.wrap(out.getData())).when((Object)inputContext)).getServiceConsumerMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleManager realShuffleManager = new ShuffleManager(inputContext, this.conf, 2, 1024, false, -1, null, inputAllocator);
        ShuffleManager shuffleManager = (ShuffleManager)Mockito.spy((Object)realShuffleManager);
        return shuffleManager;
    }

    @Test(timeout=5000L)
    public void testPipelinedShuffleEvents() throws IOException {
        InputContext inputContext = this.createInputContext();
        ShuffleManager shuffleManager = this.createShuffleManager(inputContext);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        Event dme = this.createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
        handler.handleEvents(Collections.singletonList(dme));
        CompositeInputAttemptIdentifier expectedId1 = new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager, (VerificationMode)Mockito.times((int)1))).addKnownInput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedId1), Mockito.eq((int)0));
        dme = this.createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
        handler.handleEvents(Collections.singletonList(dme));
        CompositeInputAttemptIdentifier expectedId2 = new CompositeInputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager, (VerificationMode)Mockito.times((int)2))).addKnownInput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedId2), Mockito.eq((int)0));
        ((ShuffleManager.ShuffleEventInfo)shuffleManager.shuffleInfoEventsMap.get((Object)Integer.valueOf((int)expectedId2.getInputIdentifier()))).scheduledForDownload = true;
        dme = this.createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 1);
        handler.handleEvents(Collections.singletonList(dme));
        ((InputContext)Mockito.verify((Object)inputContext)).killSelf((Throwable)Mockito.any(), Mockito.anyString());
    }

    @Test(timeout=5000L)
    public void testPipelinedShuffleEvents_WithOutOfOrderAttempts() throws IOException {
        InputContext inputContext = this.createInputContext();
        ShuffleManager shuffleManager = this.createShuffleManager(inputContext);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        Event dme = this.createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
        handler.handleEvents(Collections.singletonList(dme));
        CompositeInputAttemptIdentifier expected = new CompositeInputAttemptIdentifier(1, 1, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager, (VerificationMode)Mockito.times((int)1))).addKnownInput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expected), Mockito.eq((int)0));
        ((ShuffleManager.ShuffleEventInfo)shuffleManager.shuffleInfoEventsMap.get((Object)Integer.valueOf((int)expected.getInputIdentifier()))).scheduledForDownload = true;
        dme = this.createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 0);
        handler.handleEvents(Collections.singletonList(dme));
        ((InputContext)Mockito.verify((Object)inputContext)).killSelf((Throwable)Mockito.any(), Mockito.anyString());
    }

    @Test(timeout=5000L)
    public void testPipelinedShuffleEvents_WithEmptyPartitions() throws IOException {
        InputContext inputContext = this.createInputContext();
        ShuffleManager shuffleManager = this.createShuffleManager(inputContext);
        FetchedInputAllocator inputAllocator = (FetchedInputAllocator)Mockito.mock(FetchedInputAllocator.class);
        ShuffleInputEventHandlerImpl handler = new ShuffleInputEventHandlerImpl(inputContext, shuffleManager, inputAllocator, null, false, 0, false);
        BitSet bitSet = new BitSet(4);
        bitSet.flip(0, 4);
        Event dme = this.createDataMovementEvent(true, 0, 1, 0, false, bitSet, 4, 0);
        handler.handleEvents(Collections.singletonList(dme));
        InputAttemptIdentifier expected = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager, (VerificationMode)Mockito.times((int)1))).addCompletedInputWithNoData(expected);
        handler.handleEvents(Collections.singletonList(dme));
        dme = this.createDataMovementEvent(true, 0, 1, 1, false, new BitSet(), 4, 0);
        expected = new InputAttemptIdentifier(1, 0, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 1);
        ((ShuffleManager)Mockito.verify((Object)shuffleManager, (VerificationMode)Mockito.times((int)2))).addCompletedInputWithNoData(expected);
        ((ShuffleManager.ShuffleEventInfo)shuffleManager.shuffleInfoEventsMap.get((Object)Integer.valueOf((int)expected.getInputIdentifier()))).scheduledForDownload = true;
        dme = this.createDataMovementEvent(true, 0, 1, 0, false, new BitSet(), 4, 1);
        handler.handleEvents(Collections.singletonList(dme));
        ((InputContext)Mockito.verify((Object)inputContext)).killSelf((Throwable)Mockito.any(), Mockito.anyString());
    }

    private Event createDataMovementEvent(boolean addSpillDetails, int srcIdx, int targetIdx, int spillId, boolean isLastSpill, BitSet emptyPartitions, int numPartitions, int attemptNum) throws IOException {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder payloadBuilder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        if (emptyPartitions.cardinality() != 0) {
            ByteString emptyPartitionsByteString = TezCommonUtils.compressByteArrayToByteString((byte[])TezUtilsInternal.toByteArray((BitSet)emptyPartitions));
            payloadBuilder.setEmptyPartitions(emptyPartitionsByteString);
        }
        if (emptyPartitions.cardinality() != numPartitions) {
            payloadBuilder.setHost(HOST);
            payloadBuilder.setPort(8080);
            payloadBuilder.setPathComponent("attemptPath");
        }
        if (addSpillDetails) {
            payloadBuilder.setSpillId(spillId);
            payloadBuilder.setLastEvent(isLastSpill);
        }
        ByteBuffer payload = payloadBuilder.build().toByteString().asReadOnlyByteBuffer();
        return DataMovementEvent.create((int)srcIdx, (int)targetIdx, (int)attemptNum, (ByteBuffer)payload);
    }

    private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString) {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        builder.setHost(HOST);
        builder.setPort(8080);
        builder.setPathComponent(PATH_COMPONENT);
        if (emptyPartitionByteString != null) {
            builder.setEmptyPartitions(emptyPartitionByteString);
        }
        DataMovementEvent dme = DataMovementEvent.create((int)srcIndex, (int)targetIndex, (int)0, (ByteBuffer)builder.build().toByteString().asReadOnlyByteBuffer());
        return dme;
    }

    private ByteString createEmptyPartitionByteString(int ... emptyPartitions) throws IOException {
        BitSet bitSet = new BitSet();
        for (int i : emptyPartitions) {
            bitSet.set(i);
        }
        ByteString emptyPartitionsBytesString = TezCommonUtils.compressByteArrayToByteString((byte[])TezUtilsInternal.toByteArray((BitSet)bitSet));
        return emptyPartitionsBytesString;
    }
}

