/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetchedInputAllocatorOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleInputEventHandlerOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class TestShuffleInputEventHandlerOrderedGrouped {
    private static final String HOST = "localhost";
    private static final int PORT = 8080;
    private static final String PATH_COMPONENT = "attempt";
    private ShuffleInputEventHandlerOrderedGrouped handler;
    private ShuffleScheduler scheduler;
    private ShuffleScheduler realScheduler;
    private MergeManager mergeManager;
    private TezExecutors sharedExecutor;

    private InputContext createTezInputContext() throws IOException {
        ApplicationId applicationId = ApplicationId.newInstance((long)1L, (int)1);
        InputContext inputContext = (InputContext)Mockito.mock(InputContext.class);
        ((InputContext)Mockito.doReturn((Object)applicationId).when((Object)inputContext)).getApplicationId();
        ((InputContext)Mockito.doReturn((Object)"sourceVertex").when((Object)inputContext)).getSourceVertexName();
        Mockito.when((Object)inputContext.getCounters()).thenReturn((Object)new TezCounters());
        ExecutionContextImpl executionContext = new ExecutionContextImpl(HOST);
        ((InputContext)Mockito.doReturn((Object)executionContext).when((Object)inputContext)).getExecutionContext();
        ByteBuffer shuffleBuffer = ByteBuffer.allocate(4).putInt(0, 4);
        ((InputContext)Mockito.doReturn((Object)shuffleBuffer).when((Object)inputContext)).getServiceProviderMetaData(Mockito.anyString());
        Token sessionToken = new Token((TokenIdentifier)new JobTokenIdentifier(new Text("text")), (SecretManager)new JobTokenSecretManager());
        ByteBuffer tokenBuffer = TezCommonUtils.serializeServiceData((Token)sessionToken);
        ((InputContext)Mockito.doReturn((Object)tokenBuffer).when((Object)inputContext)).getServiceConsumerMetaData(Mockito.anyString());
        Mockito.when((Object)inputContext.createTezFrameworkExecutorService(Mockito.anyInt(), Mockito.anyString())).thenAnswer((Answer)new Answer<ExecutorService>(){

            public ExecutorService answer(InvocationOnMock invocation) throws Throwable {
                return TestShuffleInputEventHandlerOrderedGrouped.this.sharedExecutor.createExecutorService(((Integer)invocation.getArgument(0, Integer.class)).intValue(), (String)invocation.getArgument(1, String.class));
            }
        });
        return inputContext;
    }

    private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty) {
        return this.createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, allPartitionsEmpty, false, false, 0);
    }

    private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean finalMergeDisabled, boolean incrementalEvent, int spillId) {
        return this.createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, 8080);
    }

    private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean finalMergeDisabled, boolean incrementalEvent, int spillId, int attemptNum) {
        return this.createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, HOST, 8080, attemptNum);
    }

    private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port) {
        return this.createDataMovementEvent(srcIndex, targetIndex, emptyPartitionByteString, allPartitionsEmpty, finalMergeDisabled, incrementalEvent, spillId, host, port, 0);
    }

    private Event createDataMovementEvent(int srcIndex, int targetIndex, ByteString emptyPartitionByteString, boolean allPartitionsEmpty, boolean finalMergeDisabled, boolean incrementalEvent, int spillId, String host, int port, int attemptNum) {
        ShuffleUserPayloads.DataMovementEventPayloadProto.Builder builder = ShuffleUserPayloads.DataMovementEventPayloadProto.newBuilder();
        if (!allPartitionsEmpty) {
            builder.setHost(host);
            builder.setPort(port);
            builder.setPathComponent(PATH_COMPONENT);
        }
        if (finalMergeDisabled) {
            builder.setLastEvent(!incrementalEvent);
            builder.setSpillId(spillId);
        }
        builder.setRunDuration(10);
        if (emptyPartitionByteString != null) {
            builder.setEmptyPartitions(emptyPartitionByteString);
        }
        return DataMovementEvent.create((int)srcIndex, (int)targetIndex, (int)attemptNum, (ByteBuffer)builder.build().toByteString().asReadOnlyByteBuffer());
    }

    @Before
    public void setup() throws Exception {
        this.sharedExecutor = new TezSharedExecutor(new Configuration());
        this.setupScheduler(2);
    }

    @After
    public void cleanup() {
        this.sharedExecutor.shutdownNow();
    }

    private void setupScheduler(int numInputs) throws Exception {
        InputContext inputContext = this.createTezInputContext();
        Configuration config = new Configuration();
        this.realScheduler = new ShuffleScheduler(inputContext, config, numInputs, (ExceptionReporter)Mockito.mock(Shuffle.class), (MergeManager)Mockito.mock(MergeManager.class), (FetchedInputAllocatorOrderedGrouped)Mockito.mock(MergeManager.class), System.currentTimeMillis(), null, false, 0, "src vertex");
        this.scheduler = (ShuffleScheduler)Mockito.spy((Object)this.realScheduler);
        this.handler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, this.scheduler, ShuffleUtils.isTezShuffleHandler((Configuration)config));
        this.mergeManager = (MergeManager)Mockito.mock(MergeManager.class);
    }

    @Test(timeout=10000L)
    public void testPiplinedShuffleEvents() throws IOException, InterruptedException {
        int attemptNum = 0;
        int inputIdx = 0;
        Event dme1 = this.createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0);
        CompositeInputAttemptIdentifier id1 = new CompositeInputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
        this.handler.handleEvents(Collections.singletonList(dme1));
        int partitionId = attemptNum;
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).addKnownMapOutput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), Mockito.eq((int)partitionId), (CompositeInputAttemptIdentifier)Mockito.eq((Object)id1));
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).pipelinedShuffleInfoEventsMap.containsKey(id1.getInputIdentifier());
        Event dme2 = this.createDataMovementEvent(attemptNum, inputIdx, null, false, true, false, 1);
        CompositeInputAttemptIdentifier id2 = new CompositeInputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1, 1);
        this.handler.handleEvents(Collections.singletonList(dme2));
        partitionId = attemptNum;
        Assert.assertTrue((boolean)this.scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).addKnownMapOutput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), Mockito.eq((int)partitionId), (CompositeInputAttemptIdentifier)Mockito.eq((Object)id2));
        Assert.assertTrue((boolean)this.scheduler.pipelinedShuffleInfoEventsMap.containsKey(id2.getInputIdentifier()));
        MapHost host = this.scheduler.getHost();
        Assert.assertTrue((host != null ? 1 : 0) != 0);
        List list = this.scheduler.getMapsForHost(host);
        Assert.assertTrue((!list.isEmpty() ? 1 : 0) != 0);
        MapOutput output = MapOutput.createMemoryMapOutput((InputAttemptIdentifier)id2, (FetchedInputAllocatorOrderedGrouped)this.mergeManager, (int)1000, (boolean)true);
        this.scheduler.copySucceeded((InputAttemptIdentifier)id2, host, 1000L, 10000L, 10000L, output, false);
        Assert.assertTrue((!this.scheduler.isDone() ? 1 : 0) != 0);
        output = MapOutput.createMemoryMapOutput((InputAttemptIdentifier)id1, (FetchedInputAllocatorOrderedGrouped)this.mergeManager, (int)1000, (boolean)true);
        this.scheduler.copySucceeded((InputAttemptIdentifier)id1, host, 1000L, 10000L, 10000L, output, false);
        Assert.assertTrue((!this.scheduler.isDone() ? 1 : 0) != 0);
        attemptNum = 0;
        inputIdx = 1;
        Event dme3 = this.createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 1);
        InputAttemptIdentifier id3 = new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0);
        this.handler.handleEvents(Collections.singletonList(dme3));
        InputAttemptIdentifier id4 = new InputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE, 1);
        Assert.assertTrue((!this.scheduler.isInputFinished(id4.getInputIdentifier()) ? 1 : 0) != 0);
        this.scheduler.copySucceeded(id4, null, 0L, 0L, 0L, null, false);
        Assert.assertTrue((!this.scheduler.isDone() ? 1 : 0) != 0);
        output = MapOutput.createMemoryMapOutput((InputAttemptIdentifier)id3, (FetchedInputAllocatorOrderedGrouped)this.mergeManager, (int)1000, (boolean)true);
        this.scheduler.copySucceeded(id3, host, 1000L, 10000L, 10000L, output, false);
        Assert.assertTrue((boolean)this.scheduler.isDone());
    }

    @Test(timeout=5000L)
    public void testPiplinedShuffleEvents_WithOutofOrderAttempts() throws IOException, InterruptedException {
        int attemptNum = 1;
        int inputIdx = 1;
        Event dme1 = this.createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
        this.handler.handleEvents(Collections.singletonList(dme1));
        CompositeInputAttemptIdentifier id1 = new CompositeInputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler, (VerificationMode)Mockito.times((int)1))).addKnownMapOutput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), Mockito.eq((int)1), (CompositeInputAttemptIdentifier)Mockito.eq((Object)id1));
        Assert.assertTrue((String)"Shuffle info events should not be empty for pipelined shuffle", (!this.scheduler.pipelinedShuffleInfoEventsMap.isEmpty() ? 1 : 0) != 0);
        int valuesInMapLocations = this.scheduler.mapLocations.values().size();
        Assert.assertTrue((String)("Maplocations should have values. current size: " + valuesInMapLocations), (valuesInMapLocations > 0 ? 1 : 0) != 0);
        this.scheduler.getMapsForHost((MapHost)this.scheduler.mapLocations.values().iterator().next());
        attemptNum = 0;
        inputIdx = 1;
        Event dme2 = this.createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
        this.handler.handleEvents(Collections.singletonList(dme2));
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler, (VerificationMode)Mockito.times((int)1))).killSelf((Exception)Mockito.any(), (String)Mockito.any());
    }

    @Test(timeout=5000L)
    public void testPipelinedShuffle_WithObsoleteEvents() throws IOException, InterruptedException {
        int attemptNum = 1;
        int inputIdx = 1;
        Event dme1 = this.createDataMovementEvent(attemptNum, inputIdx, null, false, true, true, 0, attemptNum);
        this.handler.handleEvents(Collections.singletonList(dme1));
        CompositeInputAttemptIdentifier id1 = new CompositeInputAttemptIdentifier(inputIdx, attemptNum, PATH_COMPONENT, false, InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE, 0, 1);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler, (VerificationMode)Mockito.times((int)1))).addKnownMapOutput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), Mockito.eq((int)1), (CompositeInputAttemptIdentifier)Mockito.eq((Object)id1));
        Assert.assertTrue((String)"Shuffle info events should not be empty for pipelined shuffle", (!this.scheduler.pipelinedShuffleInfoEventsMap.isEmpty() ? 1 : 0) != 0);
        int valuesInMapLocations = this.scheduler.mapLocations.values().size();
        Assert.assertTrue((String)("Maplocations should have values. current size: " + valuesInMapLocations), (valuesInMapLocations > 0 ? 1 : 0) != 0);
        this.scheduler.getMapsForHost((MapHost)this.scheduler.mapLocations.values().iterator().next());
        LinkedList<InputFailedEvent> events = new LinkedList<InputFailedEvent>();
        int targetIdx = 1;
        InputFailedEvent failedEvent = InputFailedEvent.create((int)targetIdx, (int)0);
        events.add(failedEvent);
        this.handler.handleEvents(events);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler, (VerificationMode)Mockito.times((int)1))).killSelf((Exception)Mockito.any(), (String)Mockito.any());
    }

    @Test(timeout=5000L)
    public void basicTest() throws IOException {
        LinkedList<Event> events = new LinkedList<Event>();
        int srcIdx = 0;
        int targetIdx = 1;
        Event dme = this.createDataMovementEvent(srcIdx, targetIdx, null, false);
        events.add(dme);
        this.handler.handleEvents(events);
        CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(targetIdx, 0, PATH_COMPONENT, 1);
        int partitionId = srcIdx;
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).addKnownMapOutput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), Mockito.eq((int)partitionId), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier));
        Assert.assertTrue((String)"Shuffle info events should be empty for regular shuffle codepath", (boolean)this.scheduler.pipelinedShuffleInfoEventsMap.isEmpty());
    }

    @Test(timeout=5000L)
    public void testFailedEvent() throws IOException {
        LinkedList<InputFailedEvent> events = new LinkedList<InputFailedEvent>();
        int targetIdx = 1;
        InputFailedEvent failedEvent = InputFailedEvent.create((int)targetIdx, (int)0);
        events.add(failedEvent);
        this.handler.handleEvents(events);
        InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).obsoleteInput((InputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier));
    }

    @Test(timeout=5000L)
    public void testAllPartitionsEmpty() throws IOException {
        LinkedList<Event> events = new LinkedList<Event>();
        int srcIdx = 0;
        int targetIdx = 1;
        Event dme = this.createDataMovementEvent(srcIdx, targetIdx, this.createEmptyPartitionByteString(srcIdx), true);
        events.add(dme);
        this.handler.handleEvents(events);
        InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).copySucceeded((InputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier), (MapHost)Mockito.any(), Mockito.eq((long)0L), Mockito.eq((long)0L), Mockito.eq((long)0L), (MapOutput)Mockito.any(), Mockito.eq((boolean)true));
    }

    @Test(timeout=5000L)
    public void testCurrentPartitionEmpty() throws IOException {
        LinkedList<Event> events = new LinkedList<Event>();
        int srcIdx = 0;
        int targetIdx = 1;
        Event dme = this.createDataMovementEvent(srcIdx, targetIdx, this.createEmptyPartitionByteString(srcIdx), false);
        events.add(dme);
        this.handler.handleEvents(events);
        InputAttemptIdentifier expectedIdentifier = new InputAttemptIdentifier(targetIdx, 0);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).copySucceeded((InputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier), (MapHost)Mockito.any(), Mockito.eq((long)0L), Mockito.eq((long)0L), Mockito.eq((long)0L), (MapOutput)Mockito.any(), Mockito.eq((boolean)true));
    }

    @Test(timeout=5000L)
    public void testOtherPartitionEmpty() throws IOException {
        LinkedList<Event> events = new LinkedList<Event>();
        int srcIdx = 0;
        int taskIndex = 1;
        Event dme = this.createDataMovementEvent(srcIdx, taskIndex, this.createEmptyPartitionByteString(100), false);
        events.add(dme);
        this.handler.handleEvents(events);
        int partitionId = srcIdx;
        CompositeInputAttemptIdentifier expectedIdentifier = new CompositeInputAttemptIdentifier(taskIndex, 0, PATH_COMPONENT, 1);
        ((ShuffleScheduler)Mockito.verify((Object)this.scheduler)).addKnownMapOutput((String)Mockito.eq((Object)HOST), Mockito.eq((int)8080), Mockito.eq((int)partitionId), (CompositeInputAttemptIdentifier)Mockito.eq((Object)expectedIdentifier));
    }

    private ByteString createEmptyPartitionByteString(int ... emptyPartitions) throws IOException {
        BitSet bitSet = new BitSet();
        for (int i : emptyPartitions) {
            bitSet.set(i);
        }
        return TezCommonUtils.compressByteArrayToByteString((byte[])TezUtilsInternal.toByteArray((BitSet)bitSet));
    }
}

