/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.output;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.tez.common.EnvironmentUpdateUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.library.api.KeyValuesWriter;
import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Parameterized test for {@link OrderedPartitionedKVOutput}.
 *
 * <p>Each parameter set is {@code {sendEmptyPartitionViaEvent, sorterThreads, emptyPartitionIdx}}.
 * Every test starts the output against a mocked {@link OutputContext}, optionally writes some
 * random {@link Text} records, closes the output and checks the shuffle payload carried by the
 * second event returned from {@code close()}.
 */
@RunWith(Parameterized.class)
public class TestOnFileSortedOutput {
    private static final Random rnd = new Random();
    private static final String UniqueID = "UUID";
    private static final String HOST = "localhost";
    private static final int PORT = 80;

    /** Number of events every test expects {@code close()} to return. */
    private static final int EXPECTED_EVENT_COUNT = 2;

    private final Configuration conf;
    private final FileSystem fs;
    private final Path workingDir;
    private final int sorterThreads;
    private final boolean sendEmptyPartitionViaEvent;
    private final int emptyPartitionIdx;

    // Re-drawn in setup() before every test; testWithSomeEmptyPartition may raise it to 2.
    private int partitions;
    private KeyValuesWriter writer;
    private OrderedPartitionedKVOutput sortedOutput;

    /**
     * @param sendEmptyPartitionViaEvent value stored under
     *        {@code tez.runtime.empty.partitions.info-via-events.enabled}
     * @param threads value stored under {@code tez.runtime.sort.threads}
     * @param emptyPartitionIdx record index (modulo the partition count) skipped by
     *        {@link #testWithSomeEmptyPartition()}; {@code -1} skips nothing
     * @throws IOException if the local file system cannot be obtained
     */
    public TestOnFileSortedOutput(boolean sendEmptyPartitionViaEvent, int threads, int emptyPartitionIdx) throws IOException {
        this.sendEmptyPartitionViaEvent = sendEmptyPartitionViaEvent;
        this.emptyPartitionIdx = emptyPartitionIdx;
        this.sorterThreads = threads;
        this.conf = new Configuration();
        this.workingDir = new Path(".", this.getClass().getName());
        this.conf.setStrings("tez.runtime.framework.local.dirs", this.workingDir.toString());
        this.fs = FileSystem.getLocal(this.conf);
    }

    /** Populates the runtime configuration, creates the working directory and picks a partition count in 1..9. */
    @Before
    public void setup() throws Exception {
        this.conf.setInt("tez.runtime.sort.threads", this.sorterThreads);
        this.conf.setInt("tez.runtime.io.sort.mb", 5);
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setBoolean("tez.runtime.empty.partitions.info-via-events.enabled", this.sendEmptyPartitionViaEvent);
        EnvironmentUpdateUtils.put(ApplicationConstants.Environment.NM_HOST.toString(), HOST);
        this.fs.mkdirs(this.workingDir);
        this.partitions = Math.max(1, rnd.nextInt(10));
    }

    /** Recursively removes the working directory created in {@link #setup()}. */
    @After
    public void cleanup() throws IOException {
        this.fs.delete(this.workingDir, true);
    }

    /**
     * @return every combination of {@code sendEmptyPartitionViaEvent} (false/true),
     *         sorter threads (1/2) and empty-partition index (-1/0)
     */
    @Parameterized.Parameters(name="test[{0}, {1}, {2}]")
    public static Collection<Object[]> getParameters() {
        List<Object[]> parameters = new ArrayList<Object[]>();
        parameters.add(new Object[]{false, 1, -1});
        parameters.add(new Object[]{false, 1, 0});
        parameters.add(new Object[]{true, 1, -1});
        parameters.add(new Object[]{true, 1, 0});
        parameters.add(new Object[]{false, 2, -1});
        parameters.add(new Object[]{false, 2, 0});
        parameters.add(new Object[]{true, 2, -1});
        parameters.add(new Object[]{true, 2, 0});
        return parameters;
    }

    /** Creates, initializes and starts the output under test and caches its writer. */
    private void startSortedOutput(int partitions) throws Exception {
        OutputContext context = this.createTezOutputContext();
        this.sortedOutput = new OrderedPartitionedKVOutput(context, partitions);
        this.sortedOutput.initialize();
        this.sortedOutput.start();
        this.writer = this.sortedOutput.getWriter();
    }

    /** Writes a random number of keys, each with several values, and checks the emitted payload. */
    @Test
    public void baseTest() throws Exception {
        this.startSortedOutput(this.partitions);
        // Draw each count once: evaluating rnd.nextInt(..) inside the loop condition re-rolls
        // the bound on every iteration and ends the loop far earlier than the range suggests.
        int keyCount = Math.max(1, rnd.nextInt(50));
        for (int i = 0; i < keyCount; ++i) {
            Text key = new Text(new BigInteger(256, rnd).toString());
            int valueCount = Math.max(2, rnd.nextInt(10));
            // Element type is Object so the list is usable where an Iterable<Object> is expected.
            List<Object> values = new ArrayList<Object>(valueCount);
            for (int j = 0; j < valueCount; ++j) {
                values.add(new Text(new BigInteger(256, rnd).toString()));
            }
            this.writer.write(key, values);
        }
        List<?> eventList = this.sortedOutput.close();
        assertShufflePayload(eventList, HOST, PORT, UniqueID);
    }

    /** Writes two rounds of records, skipping the indexes that map onto {@code emptyPartitionIdx}. */
    @Test
    public void testWithSomeEmptyPartition() throws Exception {
        this.partitions = Math.max(2, this.partitions);
        this.startSortedOutput(this.partitions);
        for (int i = 0; i < 2 * this.partitions; ++i) {
            Text key = new Text(new BigInteger(256, rnd).toString());
            Text value = new Text(new BigInteger(256, rnd).toString());
            // NOTE(review): the skip is based on the loop index, while the configured partitioner
            // is HashPartitioner; presumably placement depends on the key, so this may not leave
            // that particular partition empty - confirm intent.
            if (i % this.partitions == this.emptyPartitionIdx) {
                continue;
            }
            this.writer.write(key, value);
        }
        List<?> eventList = this.sortedOutput.close();
        assertShufflePayload(eventList, HOST, PORT, UniqueID);
    }

    /**
     * Closes the output without writing anything. When empty-partition info is sent via events
     * the payload is expected to carry empty host/path and port 0; otherwise the regular values.
     */
    @Test
    public void testAllEmptyPartition() throws Exception {
        this.startSortedOutput(this.partitions);
        List<?> eventList = this.sortedOutput.close();
        if (this.sendEmptyPartitionViaEvent) {
            assertShufflePayload(eventList, "", 0, "");
        } else {
            assertShufflePayload(eventList, HOST, PORT, UniqueID);
        }
    }

    /**
     * Asserts that {@code events} holds exactly two events and that the payload of the second
     * one (a {@link CompositeDataMovementEvent}) carries the expected host, port and path component.
     */
    private static void assertShufflePayload(List<?> events, String expectedHost, int expectedPort,
            String expectedPathComponent) throws Exception {
        Assert.assertNotNull(events);
        Assert.assertEquals(EXPECTED_EVENT_COUNT, events.size());
        CompositeDataMovementEvent event = (CompositeDataMovementEvent) events.get(1);
        ShuffleUserPayloads.DataMovementEventPayloadProto payload =
                ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(event.getUserPayload()));
        Assert.assertEquals(expectedHost, payload.getHost());
        Assert.assertEquals(expectedPort, payload.getPort());
        Assert.assertEquals(expectedPathComponent, payload.getPathComponent());
    }

    /**
     * Builds a Mockito-backed {@link OutputContext} that serves the test configuration as user
     * payload, reports 5 MB of task memory, advertises {@link #PORT} as shuffle-handler metadata
     * and grants every initial-memory request in full.
     */
    private OutputContext createTezOutputContext() throws IOException {
        String[] workingDirs = new String[]{this.workingDir.toString()};
        UserPayload payLoad = TezUtils.createUserPayloadFromConf(this.conf);
        DataOutputBuffer serviceProviderMetaData = new DataOutputBuffer();
        serviceProviderMetaData.writeInt(PORT);
        // Only the first getLength() bytes of the backing array hold written data.
        ByteBuffer shuffleMetaData =
                ByteBuffer.wrap(serviceProviderMetaData.getData(), 0, serviceProviderMetaData.getLength());
        TezCounters counters = new TezCounters();

        OutputContext context = Mockito.mock(OutputContext.class);
        Mockito.doReturn(counters).when(context).getCounters();
        // Explicit Object cast keeps the array a single return value rather than varargs.
        Mockito.doReturn((Object) workingDirs).when(context).getWorkDirs();
        Mockito.doReturn(payLoad).when(context).getUserPayload();
        Mockito.doReturn(5L * 1024 * 1024).when(context).getTotalMemoryAvailableToTask();
        Mockito.doReturn(UniqueID).when(context).getUniqueIdentifier();
        Mockito.doReturn("v1").when(context).getDestinationVertexName();
        Mockito.doReturn(shuffleMetaData).when(context).getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                Object[] args = invocation.getArguments();
                long requestedSize = (Long) args[0];
                // Cast to the stubbed parameter type, not one concrete implementation.
                MemoryUpdateCallback callback = (MemoryUpdateCallback) args[1];
                callback.memoryAssigned(requestedSize);
                return null;
            }
        }).when(context).requestInitialMemory(Matchers.anyLong(), Matchers.any(MemoryUpdateCallback.class));
        return context;
    }
}

