/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleClientMetrics;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link FetcherOrderedGrouped} that drive a Mockito spy of the fetcher against mocked
 * {@link ShuffleScheduler}, {@link MergeManager}, {@link ShuffleClientMetrics} and {@link Shuffle}
 * collaborators and then verify the interactions the fetcher performed.
 */
public class TestFetcher {
    public static final String SHUFFLE_INPUT_FILE_PREFIX = "shuffle_input_file_";
    public static final String HOST = "localhost";
    public static final int PORT = 0;
    static final Log LOG = LogFactory.getLog(TestFetcher.class);

    /**
     * Creates a mocked {@link InputContext} that returns fresh {@link TezCounters} and an empty
     * source vertex name; both tests construct the fetcher with such a context.
     */
    private static InputContext mockInputContext() {
        InputContext inputContext = Mockito.mock(InputContext.class);
        Mockito.when(inputContext.getCounters()).thenReturn(new TezCounters());
        Mockito.when(inputContext.getSourceVertexName()).thenReturn("");
        return inputContext;
    }

    /**
     * Builds the {@link MapHost} shared by both tests from {@link #HOST} and {@link #PORT}
     * (identifier {@code "localhost:0"}).
     */
    private static MapHost newMapHost() {
        String hostAndPort = HOST + ":" + PORT;
        return new MapHost(1, hostAndPort, "http://" + hostAndPort + "/mapOutput?job=job_123&&reduce=1&map=");
    }

    /**
     * Calls {@code setupLocalDiskFetch} on a spied fetcher for five source attempts, with
     * {@code getIndexRecord} stubbed to throw an {@link IOException} for the attempts whose path
     * component ends in 2 or 4. Verifies that the three remaining attempts are reported through
     * {@code copySucceeded}, that the two failing ones are reported through {@code copyFailed} and
     * {@code putBackKnownMapOutput}, and that the metrics record three successes and two failures.
     */
    @Test(timeout = 5000)
    public void testSetupLocalDiskFetch() throws Exception {
        Configuration conf = new TezConfiguration();
        ShuffleScheduler scheduler = Mockito.mock(ShuffleScheduler.class);
        MergeManager merger = Mockito.mock(MergeManager.class);
        ShuffleClientMetrics metrics = Mockito.mock(ShuffleClientMetrics.class);
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        InputContext inputContext = mockInputContext();

        FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0, null, inputContext, conf, true);
        FetcherOrderedGrouped spyFetcher = Mockito.spy(fetcher);

        MapHost host = newMapHost();
        // The trailing digit of every path component equals the attempt's index in this list;
        // the getIndexRecord stub below and verifyCopySucceeded both rely on that.
        List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
                new InputAttemptIdentifier(0, 1, "attemptpathComponent_0"),
                new InputAttemptIdentifier(1, 2, "attemptpathComponent_1"),
                new InputAttemptIdentifier(2, 3, "attemptpathComponent_2"),
                new InputAttemptIdentifier(3, 4, "attemptpathComponent_3"),
                new InputAttemptIdentifier(4, 4, "attemptpathComponent_4"));
        final int firstFailedAttemptIdx = 2;
        final int secondFailedAttemptIdx = 4;
        int[] successfulAttemptIndexes = new int[]{0, 1, 3};

        Mockito.doReturn(srcAttempts).when(scheduler).getMapsForHost(host);

        // Hand back a DISK_DIRECT MapOutput mock that remembers the attempt it was created for.
        Mockito.doAnswer(new Answer<MapOutput>() {
            @Override
            public MapOutput answer(InvocationOnMock invocation) throws Throwable {
                Object attempt = invocation.getArguments()[0];
                MapOutput mapOutput = Mockito.mock(MapOutput.class);
                Mockito.doReturn(MapOutput.Type.DISK_DIRECT).when(mapOutput).getType();
                Mockito.doReturn(attempt).when(mapOutput).getAttemptIdentifier();
                return mapOutput;
            }
        }).when(spyFetcher).getMapOutputForDirectDiskFetch(Matchers.any(InputAttemptIdentifier.class), Matchers.any(Path.class), Matchers.any(TezIndexRecord.class));

        // The file name is the fixed prefix followed by the first argument of the call.
        Mockito.doAnswer(new Answer<Path>() {
            @Override
            public Path answer(InvocationOnMock invocation) throws Throwable {
                return new Path(TestFetcher.SHUFFLE_INPUT_FILE_PREFIX + invocation.getArguments()[0]);
            }
        }).when(spyFetcher).getShuffleInputFileName(Matchers.anyString(), Matchers.anyString());

        // Derive the index record from the trailing digit of the path component and fail for the
        // two attempts designated as failures.
        Mockito.doAnswer(new Answer<TezIndexRecord>() {
            @Override
            public TezIndexRecord answer(InvocationOnMock invocation) throws Throwable {
                String pathComponent = (String) invocation.getArguments()[0];
                int len = pathComponent.length();
                long p = Long.parseLong(pathComponent.substring(len - 1, len));
                if (p == firstFailedAttemptIdx || p == secondFailedAttemptIdx) {
                    throw new IOException("failing to simulate failure case");
                }
                return new TezIndexRecord(p * 10L, p * 1000L, p * 100L);
            }
        }).when(spyFetcher).getIndexRecord(Matchers.anyString(), Matchers.eq(host.getPartitionId()));

        Mockito.doNothing().when(scheduler).copySucceeded(Matchers.any(InputAttemptIdentifier.class), Matchers.any(MapHost.class), Matchers.anyLong(), Matchers.anyLong(), Matchers.anyLong(), Matchers.any(MapOutput.class));
        Mockito.doNothing().when(scheduler).putBackKnownMapOutput(host, srcAttempts.get(firstFailedAttemptIdx));
        Mockito.doNothing().when(scheduler).putBackKnownMapOutput(host, srcAttempts.get(secondFailedAttemptIdx));

        spyFetcher.setupLocalDiskFetch(host);

        for (int idx : successfulAttemptIndexes) {
            verifyCopySucceeded(scheduler, host, srcAttempts, idx);
        }
        Mockito.verify(scheduler).copyFailed(srcAttempts.get(firstFailedAttemptIdx), host, true, false);
        Mockito.verify(scheduler).copyFailed(srcAttempts.get(secondFailedAttemptIdx), host, true, false);
        Mockito.verify(metrics, Mockito.times(3)).successFetch();
        Mockito.verify(metrics, Mockito.times(2)).failedFetch();
        Mockito.verify(spyFetcher).putBackRemainingMapOutputs(host);
        Mockito.verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(firstFailedAttemptIdx));
        Mockito.verify(scheduler).putBackKnownMapOutput(host, srcAttempts.get(secondFailedAttemptIdx));
    }

    /**
     * Verifies that {@code copySucceeded} was invoked on the scheduler for the attempt at index
     * {@code p} with the second and third numeric arguments equal to {@code p * 100} and
     * {@code p * 1000}, and that the {@link MapOutput} passed along is of type
     * {@code DISK_DIRECT} and carries that same attempt.
     *
     * @param scheduler   mocked scheduler whose interactions are verified
     * @param host        host expected in the {@code copySucceeded} call
     * @param srcAttempts attempts handed to the fetcher
     * @param p           index into {@code srcAttempts}; also the multiplier for the expected sizes
     * @throws IOException declared because the verified {@code copySucceeded} call declares it
     */
    private void verifyCopySucceeded(ShuffleScheduler scheduler, MapHost host, List<InputAttemptIdentifier> srcAttempts, long p) throws IOException {
        InputAttemptIdentifier srcAttemptToMatch = srcAttempts.get((int) p);
        ArgumentCaptor<MapOutput> captureMapOutput = ArgumentCaptor.forClass(MapOutput.class);
        Mockito.verify(scheduler).copySucceeded(Matchers.eq(srcAttemptToMatch), Matchers.eq(host), Matchers.eq(p * 100L), Matchers.eq(p * 1000L), Matchers.anyLong(), captureMapOutput.capture());
        MapOutput m = captureMapOutput.getAllValues().get(0);
        // Separate assertions so that a failure names the property that did not match.
        Assert.assertEquals(MapOutput.Type.DISK_DIRECT, m.getType());
        Assert.assertEquals(srcAttemptToMatch, m.getAttemptIdentifier());
    }

    /**
     * Runs {@code copyFromHost} with {@code copyMapOutput} stubbed to sleep for 4000 ms (longer
     * than the 3000 ms read/connect timeouts configured here), re-stub {@code setupConnection} to
     * return {@code false} and then throw a {@link FetcherReadTimeoutException}. Verifies that the
     * connection was set up twice, that one failure was reported and that all three attempts were
     * put back. A second {@code copyFromHost} call with {@code stopped} reset to {@code false}
     * must put the remaining outputs back once more.
     */
    @Test
    public void testWithRetry() throws Exception {
        Configuration conf = new TezConfiguration();
        conf.setInt("tez.runtime.shuffle.read.timeout", 3000);
        conf.setInt("tez.runtime.shuffle.connect.timeout", 3000);
        ShuffleScheduler scheduler = Mockito.mock(ShuffleScheduler.class);
        MergeManager merger = Mockito.mock(MergeManager.class);
        ShuffleClientMetrics metrics = Mockito.mock(ShuffleClientMetrics.class);
        Shuffle shuffle = Mockito.mock(Shuffle.class);
        InputContext inputContext = mockInputContext();

        HttpConnection.HttpConnectionParams httpConnectionParams = ShuffleUtils.constructHttpShuffleConnectionParams(conf);
        FetcherOrderedGrouped realFetcher = new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, null, false, 0, null, inputContext, conf, false);
        final FetcherOrderedGrouped fetcher = Mockito.spy(realFetcher);

        final MapHost host = newMapHost();
        final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
                new InputAttemptIdentifier(0, 1, "attemptpathComponent_0"),
                new InputAttemptIdentifier(1, 2, "attemptpathComponent_1"),
                new InputAttemptIdentifier(3, 4, "attemptpathComponent_3"));
        Mockito.doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
        Mockito.doReturn(true).when(fetcher).setupConnection(host, srcAttempts);

        URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), srcAttempts, false);
        fetcher.httpConnection = new FakeHttpConnection(url, null, "", null);

        // anyLong() for the two size arguments: the reserve parameters at those positions are long.
        Mockito.doAnswer(new Answer<MapOutput>() {
            @Override
            public MapOutput answer(InvocationOnMock invocation) throws Throwable {
                Object attempt = invocation.getArguments()[0];
                MapOutput mapOutput = Mockito.mock(MapOutput.class);
                Mockito.doReturn(MapOutput.Type.MEMORY).when(mapOutput).getType();
                Mockito.doReturn(attempt).when(mapOutput).getAttemptIdentifier();
                return mapOutput;
            }
        }).when(merger).reserve(Matchers.any(InputAttemptIdentifier.class), Matchers.anyLong(), Matchers.anyLong(), Matchers.anyInt());

        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock invocation) throws Throwable {
                // Outlast the configured 3000 ms timeouts before signalling the read timeout.
                Thread.sleep(4000L);
                // From now on connection setup fails, so the fetcher's reconnect attempt does too.
                Mockito.doReturn(false).when(fetcher).setupConnection(host, srcAttempts);
                throw new FetcherReadTimeoutException("creating fetcher socket read timeout exception");
            }
        }).when(fetcher).copyMapOutput(Matchers.any(MapHost.class), Matchers.any(DataInputStream.class));

        try {
            fetcher.copyFromHost(host);
        } catch (IOException e) {
            // Tolerated on purpose: the test asserts on the recorded interactions below.
            LOG.info("Ignoring IOException from first copyFromHost call", e);
        }
        Mockito.verify(fetcher, Mockito.times(2)).setupConnection(Matchers.any(MapHost.class), Matchers.anyList());
        Mockito.verify(scheduler, Mockito.times(1)).copyFailed(Matchers.any(InputAttemptIdentifier.class), Matchers.any(MapHost.class), Matchers.anyBoolean(), Matchers.anyBoolean());
        Mockito.verify(fetcher, Mockito.times(1)).putBackRemainingMapOutputs(Matchers.any(MapHost.class));
        Mockito.verify(scheduler, Mockito.times(3)).putBackKnownMapOutput(Matchers.any(MapHost.class), Matchers.any(InputAttemptIdentifier.class));

        fetcher.stopped = false;
        try {
            fetcher.copyFromHost(host);
        } catch (IOException e) {
            // Tolerated on purpose, see above.
            LOG.info("Ignoring IOException from second copyFromHost call", e);
        }
        // Kept outside the try block so that an IOException cannot silently skip the check.
        Mockito.verify(fetcher, Mockito.times(2)).putBackRemainingMapOutputs(Matchers.any(MapHost.class));
    }

    /**
     * {@link HttpConnection} stand-in backed by a mocked {@link HttpURLConnection} that answers
     * with response code 200 and fixed {@code name}, {@code version} and {@code ReplyHash} header
     * values, and that serves 1024 zero bytes as its input stream.
     *
     * NOTE(review): getInputStream, validate and cleanup presumably override the HttpConnection
     * methods of the same signature; add {@code @Override} once confirmed against that class.
     */
    static class FakeHttpConnection
    extends HttpConnection {
        public FakeHttpConnection(URL url, HttpConnection.HttpConnectionParams connParams, String logIdentifier, JobTokenSecretManager jobTokenSecretMgr) throws IOException {
            super(url, connParams, logIdentifier, jobTokenSecretMgr);
            this.connection = Mockito.mock(HttpURLConnection.class);
            Mockito.when(this.connection.getResponseCode()).thenReturn(200);
            Mockito.when(this.connection.getHeaderField("name")).thenReturn("mapreduce");
            Mockito.when(this.connection.getHeaderField("version")).thenReturn("1.0.0");
            Mockito.when(this.connection.getHeaderField("ReplyHash")).thenReturn("");
        }

        /** Returns a stream over a fresh 1024-byte zero-filled buffer. */
        public DataInputStream getInputStream() throws IOException {
            return new DataInputStream(new ByteArrayInputStream(new byte[1024]));
        }

        /** Does nothing, so no validation is performed on the fake connection. */
        public void validate() throws IOException {
        }

        /** Only logs the call; there is no real connection to release. */
        public void cleanup(boolean disconnect) throws IOException {
            LOG.info("HttpConnection cleanup called with disconnect=" + disconnect);
        }
    }
}

