/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.auxservices;

import io.netty.buffer.Unpooled;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.Checksum;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.PureJavaCrc32;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.tez.auxservices.ShuffleHandler;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestShuffleHandler {
    static final long MiB = 0x100000L;
    private static final Logger LOG = LoggerFactory.getLogger(TestShuffleHandler.class);
    private static final File TEST_DIR = new File(System.getProperty("test.build.data"), TestShuffleHandler.class.getName()).getAbsoluteFile();
    private static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";

    @Test(timeout=10000L)
    public void testSerializeMeta() throws Exception {
        Assert.assertEquals((long)1L, (long)ShuffleHandler.deserializeMetaData((ByteBuffer)ShuffleHandler.serializeMetaData((int)1)));
        Assert.assertEquals((long)-1L, (long)ShuffleHandler.deserializeMetaData((ByteBuffer)ShuffleHandler.serializeMetaData((int)-1)));
        Assert.assertEquals((long)8080L, (long)ShuffleHandler.deserializeMetaData((ByteBuffer)ShuffleHandler.serializeMetaData((int)8080)));
    }

    @Test(timeout=10000L)
    public void testShuffleMetrics() throws Exception {
        MetricsSystemImpl ms = new MetricsSystemImpl();
        ShuffleHandler sh = new ShuffleHandler((MetricsSystem)ms);
        ChannelFuture cf = (ChannelFuture)Mockito.mock(ChannelFuture.class);
        Mockito.when((Object)cf.isSuccess()).thenReturn((Object)true, (Object[])new Boolean[]{false});
        sh.metrics.shuffleConnections.incr();
        sh.metrics.shuffleOutputBytes.incr(0x100000L);
        sh.metrics.shuffleConnections.incr();
        sh.metrics.shuffleOutputBytes.incr(0x200000L);
        TestShuffleHandler.checkShuffleMetrics((MetricsSystem)ms, 0x300000L, 0, 0, 2);
        sh.metrics.operationComplete(cf);
        sh.metrics.operationComplete(cf);
        TestShuffleHandler.checkShuffleMetrics((MetricsSystem)ms, 0x300000L, 1, 1, 0);
        sh.close();
    }

    static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed, int succeeded, int connections) {
    }

    @Test(timeout=10000L)
    public void testClientClosesConnection() throws Exception {
        final AtomicBoolean failureEncountered = new AtomicBoolean(false);
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String dagId, String mapId, ShuffleHandler.Range reduceRange, String jobId, String user) throws IOException {
                        return null;
                    }

                    protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, ShuffleHandler.Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> infoMap) throws IOException {
                        super.setResponseHeaders(response, keepAliveParam, 100L);
                    }

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, ShuffleHandler.Range reduceRange, ShuffleHandler.Shuffle.MapOutputInfo info) throws IOException {
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                        dob = new DataOutputBuffer();
                        for (int i = 0; i < 100000; ++i) {
                            header.write((DataOutput)dob);
                        }
                        return ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                    }

                    protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
                        if (failureEncountered.compareAndSet(false, true)) {
                            ctx.channel().close();
                        }
                    }

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failureEncountered.compareAndSet(false, true)) {
                            ctx.channel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        shuffleHandler.start();
        URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        DataInputStream input = new DataInputStream(conn.getInputStream());
        Assert.assertEquals((long)200L, (long)conn.getResponseCode());
        Assert.assertEquals((Object)"close", (Object)conn.getHeaderField("Connection"));
        ShuffleHeader header = new ShuffleHeader();
        header.readFields((DataInput)input);
        input.close();
        shuffleHandler.close();
        Assert.assertTrue((String)"sendError called when client closed connection", (!failureEncountered.get() ? 1 : 0) != 0);
    }

    @Test(timeout=10000L)
    public void testKeepAlive() throws Exception {
        final AtomicBoolean failureEncountered = new AtomicBoolean(false);
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setBoolean("tez.shuffle.connection-keep-alive.enable", true);
        conf.setInt("tez.shuffle.connection-keep-alive.timeout", -100);
        final LastSocketAddress lastSocketAddress = new LastSocketAddress();
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String dagId, String mapId, ShuffleHandler.Range reduceRange, String jobId, String user) throws IOException {
                        return null;
                    }

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }

                    protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, ShuffleHandler.Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> infoMap) throws IOException {
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        dob = new DataOutputBuffer();
                        for (int i = 0; i < 100000; ++i) {
                            header.write((DataOutput)dob);
                        }
                        long contentLength = dob.getLength();
                        if (keepAliveParam) {
                            connectionKeepAliveEnabled = false;
                        }
                        super.setResponseHeaders(response, keepAliveParam, contentLength);
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, ShuffleHandler.Range reduceRange, ShuffleHandler.Shuffle.MapOutputInfo info) throws IOException {
                        lastSocketAddress.setAddress(ch.remoteAddress());
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                        dob = new DataOutputBuffer();
                        for (int i = 0; i < 100000; ++i) {
                            header.write((DataOutput)dob);
                        }
                        return ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                    }

                    protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
                        if (failureEncountered.compareAndSet(false, true)) {
                            ctx.channel().close();
                        }
                    }

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failureEncountered.compareAndSet(false, true)) {
                            ctx.channel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        shuffleHandler.start();
        String shuffleBaseURL = "http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port");
        URL url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        DataInputStream input = new DataInputStream(conn.getInputStream());
        Assert.assertEquals((Object)"keep-alive", (Object)conn.getHeaderField("Connection"));
        Assert.assertEquals((Object)"timeout=1", (Object)conn.getHeaderField("keep-alive"));
        Assert.assertEquals((long)200L, (long)conn.getResponseCode());
        ShuffleHeader header = new ShuffleHeader();
        header.readFields((DataInput)input);
        byte[] buffer = new byte[1024];
        while (input.read(buffer) != -1) {
        }
        SocketAddress firstAddress = lastSocketAddress.getSocketAddress();
        input.close();
        url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0&keepAlive=true");
        conn = (HttpURLConnection)url.openConnection();
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        input = new DataInputStream(conn.getInputStream());
        Assert.assertEquals((Object)"keep-alive", (Object)conn.getHeaderField("Connection"));
        Assert.assertEquals((Object)"timeout=1", (Object)conn.getHeaderField("keep-alive"));
        Assert.assertEquals((long)200L, (long)conn.getResponseCode());
        header = new ShuffleHeader();
        header.readFields((DataInput)input);
        input.close();
        SocketAddress secondAddress = lastSocketAddress.getSocketAddress();
        Assert.assertNotNull((String)"Initial shuffle address should not be null", (Object)firstAddress);
        Assert.assertNotNull((String)"Keep-Alive shuffle address should not be null", (Object)secondAddress);
        Assert.assertEquals((String)"Initial shuffle address and keep-alive shuffle address should be the same", (Object)firstAddress, (Object)secondAddress);
        shuffleHandler.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSocketKeepAlive() throws Exception {
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setBoolean("tez.shuffle.connection-keep-alive.enable", true);
        conf.setInt("tez.shuffle.connection-keep-alive.timeout", -100);
        HttpURLConnection conn = null;
        MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
        try {
            shuffleHandler.init(conf);
            shuffleHandler.start();
            String shuffleBaseURL = "http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port");
            URL url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
            conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            conn.connect();
            conn.getInputStream();
            Assert.assertTrue((String)"socket should be set KEEP_ALIVE", (boolean)shuffleHandler.isSocketKeepAlive());
        }
        finally {
            if (conn != null) {
                conn.disconnect();
            }
            shuffleHandler.close();
        }
    }

    @Test(timeout=10000L)
    public void testIncompatibleShuffleVersion() throws Exception {
        int failureNum = 3;
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        ShuffleHandler shuffleHandler = new ShuffleHandler();
        shuffleHandler.init(conf);
        shuffleHandler.start();
        URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_1&&dag=1reduce=1&map=attempt_12345_1_m_1_0");
        for (int i = 0; i < 3; ++i) {
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", i == 0 ? "mapreduce" : "other");
            conn.setRequestProperty("version", i == 1 ? "1.0.0" : "1.0.1");
            conn.connect();
            Assert.assertEquals((long)400L, (long)conn.getResponseCode());
        }
        shuffleHandler.close();
    }

    @Test(timeout=10000L)
    public void testMaxConnections() throws Exception {
        int i;
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String dagId, String mapId, ShuffleHandler.Range reduceRange, String jobId, String user) throws IOException {
                        return null;
                    }

                    protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, ShuffleHandler.Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> infoMap) throws IOException {
                    }

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, ShuffleHandler.Range reduceRange, ShuffleHandler.Shuffle.MapOutputInfo info) throws IOException {
                        ShuffleHeader header = new ShuffleHeader("dummy_header", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                        dob = new DataOutputBuffer();
                        for (int i = 0; i < 100000; ++i) {
                            header.write((DataOutput)dob);
                        }
                        return ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        shuffleHandler.start();
        int connAttempts = 3;
        HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
        for (i = 0; i < connAttempts; ++i) {
            String URLstring = "http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_" + i + "_0";
            URL url = new URL(URLstring);
            conns[i] = (HttpURLConnection)url.openConnection();
            conns[i].setRequestProperty("name", "mapreduce");
            conns[i].setRequestProperty("version", "1.0.0");
        }
        for (i = 0; i < connAttempts; ++i) {
            Thread.sleep(200L);
            conns[i].connect();
        }
        conns[0].getInputStream();
        int rc = conns[0].getResponseCode();
        Assert.assertEquals((long)200L, (long)rc);
        conns[1].getInputStream();
        rc = conns[1].getResponseCode();
        Assert.assertEquals((long)200L, (long)rc);
        try {
            conns[2].getInputStream();
            rc = conns[2].getResponseCode();
            Assert.fail((String)"Expected a SocketException");
        }
        catch (SocketException se) {
            LOG.info("Expected - connection should not be open");
        }
        catch (Exception e) {
            Assert.fail((String)"Expected a SocketException");
        }
        shuffleHandler.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=10000L)
    public void testRangedFetch() throws IOException {
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration((Configuration)conf);
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        LOG.info(appId.toString());
        String appAttemptId = "attempt_12345_1_m_1_0";
        String user = "randomUser";
        String reducerIdStart = "0";
        String reducerIdEnd = "1";
        ArrayList<File> fileMap = new ArrayList<File>();
        TestShuffleHandler.createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd + "&map=attempt_12345_1_m_1_0");
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            conn.connect();
            boolean succeeded = false;
            try {
                DataInputStream is = new DataInputStream(conn.getInputStream());
                int partitionCount = WritableUtils.readVInt((DataInput)is);
                ArrayList<ShuffleHeader> headers = new ArrayList<ShuffleHeader>(2);
                for (int i = 0; i < partitionCount; ++i) {
                    ShuffleHeader header = new ShuffleHeader();
                    header.readFields((DataInput)is);
                    Assert.assertEquals((String)"Incorrect map id", (Object)"attempt_12345_1_m_1_0", (Object)header.getMapId());
                    Assert.assertEquals((String)"Incorrect reduce id", (long)i, (long)header.getPartition());
                    headers.add(header);
                }
                for (ShuffleHeader header : headers) {
                    byte[] bytes = new byte[(int)header.getCompressedLength()];
                    is.read(bytes);
                }
                succeeded = true;
                is.readByte();
                Assert.fail((String)"More fetch bytes that expected in stream");
            }
            catch (EOFException e) {
                Assert.assertTrue((String)"Failed to copy ranged fetch", (boolean)succeeded);
            }
        }
        finally {
            shuffleHandler.close();
            FileUtil.fullyDelete((File)absLogDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=100000L)
    public void testMapFileAccess() throws IOException {
        Assume.assumeTrue((boolean)NativeIO.isAvailable());
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration((Configuration)conf);
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        LOG.info(appId.toString());
        String appAttemptId = "attempt_12345_1_m_1_0";
        String user = "randomUser";
        String reducerId = "0";
        ArrayList<File> fileMap = new ArrayList<File>();
        TestShuffleHandler.createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            InputStream is2;
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId + "&map=attempt_12345_1_m_1_0");
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            conn.connect();
            byte[] byteArr = new byte[10000];
            try {
                is2 = new DataInputStream(conn.getInputStream());
                ((DataInputStream)is2).readFully(byteArr);
            }
            catch (EOFException is2) {
                // empty catch block
            }
            is2 = new FileInputStream((File)fileMap.get(0));
            String owner = NativeIO.POSIX.getFstat((FileDescriptor)((FileInputStream)is2).getFD()).getOwner();
            ((FileInputStream)is2).close();
            String message = "Owner '" + owner + "' for path " + ((File)fileMap.get(0)).getAbsolutePath() + " did not match expected owner '" + user + "'";
            Assert.assertTrue((boolean)new String(byteArr).contains(message));
        }
        finally {
            shuffleHandler.close();
            FileUtil.fullyDelete((File)absLogDir);
        }
    }

    private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId, Configuration conf, List<File> fileMap) throws IOException {
        String attemptDir = StringUtils.join((CharSequence)"/", (String[])new String[]{logDir.getAbsolutePath(), "usercache", user, "appcache", appId, "dag_1/output", appAttemptId});
        File appAttemptDir = new File(attemptDir);
        appAttemptDir.mkdirs();
        System.out.println(appAttemptDir.getAbsolutePath());
        File indexFile = new File(appAttemptDir, "file.out.index");
        fileMap.add(indexFile);
        TestShuffleHandler.createIndexFile(indexFile, conf);
        File mapOutputFile = new File(appAttemptDir, "file.out");
        fileMap.add(mapOutputFile);
        TestShuffleHandler.createMapOutputFile(mapOutputFile, conf);
    }

    private static void createMapOutputFile(File mapOutputFile, Configuration conf) throws IOException {
        FileOutputStream out = new FileOutputStream(mapOutputFile);
        out.write("Creating new dummy map output file. Used only for testing".getBytes());
        out.flush();
        out.close();
    }

    private static void createIndexFile(File indexFile, Configuration conf) throws IOException {
        if (indexFile.exists()) {
            System.out.println("Deleting existing file");
            indexFile.delete();
        }
        PureJavaCrc32 crc = new PureJavaCrc32();
        TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
        tezSpillRecord.putIndex(new TezIndexRecord(0L, 10L, 10L), 0);
        tezSpillRecord.putIndex(new TezIndexRecord(10L, 10L, 10L), 1);
        tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, FileSystem.getLocal((Configuration)conf).getRaw(), (Checksum)crc);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecovery() throws IOException {
        String user = "someuser";
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName());
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        ShuffleHandler shuffle = new ShuffleHandler();
        shuffle.setRecoveryPath(new Path(tmpDir.toString()));
        tmpDir.mkdirs();
        try {
            shuffle.init(conf);
            shuffle.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text("someuser"), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffle.initializeApplication(new ApplicationInitializationContext("someuser", appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            int rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)200L, (long)rc);
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(tmpDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)200L, (long)rc);
            shuffle.stopApplication(new ApplicationTerminationContext(appId));
            rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)401L, (long)rc);
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(tmpDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)401L, (long)rc);
        }
        finally {
            if (shuffle != null) {
                shuffle.close();
            }
            FileUtil.fullyDelete((File)tmpDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoveryFromOtherVersions() throws IOException {
        String user = "someuser";
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        File tmpDir = new File(System.getProperty("test.build.data", System.getProperty("java.io.tmpdir")), TestShuffleHandler.class.getName());
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        ShuffleHandler shuffle = new ShuffleHandler();
        shuffle.setRecoveryPath(new Path(tmpDir.toString()));
        tmpDir.mkdirs();
        try {
            shuffle.init(conf);
            shuffle.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text("someuser"), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffle.initializeApplication(new ApplicationInitializationContext("someuser", appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            int rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)200L, (long)rc);
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(tmpDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)200L, (long)rc);
            Version version = Version.newInstance((int)1, (int)0);
            Assert.assertEquals((Object)version, (Object)shuffle.getCurrentVersion());
            Version version11 = Version.newInstance((int)1, (int)1);
            shuffle.storeVersion(version11);
            Assert.assertEquals((Object)version11, (Object)shuffle.loadVersion());
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(tmpDir.toString()));
            shuffle.init(conf);
            shuffle.start();
            Assert.assertEquals((Object)version, (Object)shuffle.loadVersion());
            rc = TestShuffleHandler.getShuffleResponseCode(shuffle, (Token<JobTokenIdentifier>)jt);
            Assert.assertEquals((long)200L, (long)rc);
            Version version21 = Version.newInstance((int)2, (int)1);
            shuffle.storeVersion(version21);
            Assert.assertEquals((Object)version21, (Object)shuffle.loadVersion());
            shuffle.close();
            shuffle = new ShuffleHandler();
            shuffle.setRecoveryPath(new Path(tmpDir.toString()));
            shuffle.init(conf);
            try {
                shuffle.start();
                Assert.fail((String)"Incompatible version, should expect fail here.");
            }
            catch (ServiceStateException e) {
                Assert.assertTrue((String)"Exception message mismatch", (boolean)e.getMessage().contains("Incompatible version for state DB schema:"));
            }
        }
        finally {
            if (shuffle != null) {
                shuffle.close();
            }
            FileUtil.fullyDelete((File)tmpDir);
        }
    }

    private static int getShuffleResponseCode(ShuffleHandler shuffle, Token<JobTokenIdentifier> jt) throws IOException {
        URL url = new URL("http://127.0.0.1:" + shuffle.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=0&map=attempt_12345_1_m_1_0");
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        String encHash = SecureShuffleUtils.hashFromString((String)SecureShuffleUtils.buildMsgFrom((URL)url), (JobTokenSecretManager)new JobTokenSecretManager(JobTokenSecretManager.createSecretKey((byte[])jt.getPassword())));
        conn.addRequestProperty("UrlHash", encHash);
        conn.setRequestProperty("name", "mapreduce");
        conn.setRequestProperty("version", "1.0.0");
        conn.connect();
        int rc = conn.getResponseCode();
        conn.disconnect();
        return rc;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=100000L)
    public void testGetMapOutputInfo() throws Exception {
        final AtomicBoolean failureEncountered = new AtomicBoolean(false);
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration((Configuration)conf);
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        String appAttemptId = "attempt_12345_1_m_1_0";
        String user = "randomUser";
        String reducerId = "0";
        ArrayList<File> fileMap = new ArrayList<File>();
        TestShuffleHandler.createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected void populateHeaders(List<String> mapIds, String outputBaseStr, String dagId, String user, ShuffleHandler.Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> infoMap) throws IOException {
                        super.setResponseHeaders(response, keepAliveParam, 100L);
                    }

                    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    }

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failureEncountered.compareAndSet(false, true)) {
                            ctx.channel().close();
                        }
                    }

                    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, ShuffleHandler.Range reduceRange, ShuffleHandler.Shuffle.MapOutputInfo info) throws IOException {
                        ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                        DataOutputBuffer dob = new DataOutputBuffer();
                        header.write((DataOutput)dob);
                        return ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerId + "&map=attempt_12345_1_m_1_0");
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            conn.connect();
            try {
                DataInputStream is = new DataInputStream(conn.getInputStream());
                ShuffleHeader header = new ShuffleHeader();
                header.readFields((DataInput)is);
                is.close();
            }
            catch (EOFException eOFException) {
                // empty catch block
            }
            Assert.assertEquals((String)"sendError called due to shuffle error", (Object)false, (Object)failureEncountered.get());
        }
        finally {
            shuffleHandler.close();
            FileUtil.fullyDelete((File)absLogDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testDagDelete() throws Exception {
        final AtomicBoolean failureEncountered = new AtomicBoolean(false);
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.setInt("tez.shuffle.port", 0);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration((Configuration)conf);
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        String appAttemptId = "attempt_12345_1_m_1_0";
        String user = "randomUser";
        ArrayList<File> fileMap = new ArrayList<File>();
        TestShuffleHandler.createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failureEncountered.compareAndSet(false, true)) {
                            ctx.channel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?dagAction=delete&job=job_12345_0001&dag=1");
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            String dagDirStr = StringUtils.join((CharSequence)"/", (String[])new String[]{absLogDir.getAbsolutePath(), "usercache", user, "appcache", appId.toString(), "dag_1/"});
            File dagDir = new File(dagDirStr);
            Assert.assertTrue((String)"Dag Directory does not exist!", (boolean)dagDir.exists());
            conn.connect();
            try {
                DataInputStream is = new DataInputStream(conn.getInputStream());
                is.close();
                Assert.assertFalse((String)"Dag Directory was not deleted!", (boolean)dagDir.exists());
            }
            catch (EOFException eOFException) {
                // empty catch block
            }
            Assert.assertEquals((String)"sendError called due to shuffle error", (Object)false, (Object)failureEncountered.get());
        }
        finally {
            shuffleHandler.close();
            FileUtil.fullyDelete((File)absLogDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testVertexShuffleDelete() throws Exception {
        final ArrayList failures = new ArrayList(1);
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.setInt("tez.shuffle.port", 0);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration((Configuration)conf);
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        String appAttemptId = "attempt_12345_0001_1_00_000000_0_10003_0";
        String user = "randomUser";
        ArrayList<File> fileMap = new ArrayList<File>();
        String vertexDirStr = StringUtils.join((CharSequence)"/", (String[])new String[]{absLogDir.getAbsolutePath(), "usercache", user, "appcache", appId.toString(), "dag_1/output/" + appAttemptId});
        File vertexDir = new File(vertexDirStr);
        Assert.assertFalse((String)"vertex directory should not be present", (boolean)vertexDir.exists());
        TestShuffleHandler.createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failures.size() == 0) {
                            failures.add(new Error(message));
                            ctx.channel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?vertexAction=delete&job=job_12345_0001&dag=1&vertex=00");
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            Assert.assertTrue((String)"Attempt Directory does not exist!", (boolean)vertexDir.exists());
            conn.connect();
            try {
                DataInputStream is = new DataInputStream(conn.getInputStream());
                is.close();
                Assert.assertFalse((String)"Vertex Directory was not deleted", (boolean)vertexDir.exists());
            }
            catch (EOFException e) {
                Assert.fail((String)("Encountered Exception!" + e.getMessage()));
            }
        }
        finally {
            shuffleHandler.stop();
            FileUtil.fullyDelete((File)absLogDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testFailedTaskAttemptDelete() throws Exception {
        final ArrayList failures = new ArrayList(1);
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.max.connections", 3);
        conf.setInt("tez.shuffle.port", 0);
        conf.set("hadoop.security.authentication", "simple");
        UserGroupInformation.setConfiguration((Configuration)conf);
        File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
        conf.set("yarn.nodemanager.local-dirs", absLogDir.getAbsolutePath());
        ApplicationId appId = ApplicationId.newInstance((long)12345L, (int)1);
        String appAttemptId = "attempt_12345_1_m_1_0";
        String user = "randomUser";
        ArrayList<File> fileMap = new ArrayList<File>();
        String taskAttemptDirStr = StringUtils.join((CharSequence)"/", (String[])new String[]{absLogDir.getAbsolutePath(), "usercache", user, "appcache", appId.toString(), "dag_1/output/", appAttemptId});
        File taskAttemptDir = new File(taskAttemptDirStr);
        Assert.assertFalse((String)"Task Attempt Directory should not exist", (boolean)taskAttemptDir.exists());
        TestShuffleHandler.createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, conf, fileMap);
        ShuffleHandler shuffleHandler = new ShuffleHandler(){

            protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
                return new ShuffleHandler.Shuffle(conf){

                    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
                        if (failures.size() == 0) {
                            failures.add(new Error(message));
                            ctx.channel().close();
                        }
                    }
                };
            }
        };
        shuffleHandler.init(conf);
        try {
            shuffleHandler.start();
            DataOutputBuffer outputBuffer = new DataOutputBuffer();
            outputBuffer.reset();
            Token jt = new Token("identifier".getBytes(), "password".getBytes(), new Text(user), new Text("shuffleService"));
            jt.write((DataOutput)outputBuffer);
            shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId, ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
            URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port") + "/mapOutput?taskAttemptAction=delete&job=job_12345_0001&dag=1&map=" + appAttemptId);
            HttpURLConnection conn = (HttpURLConnection)url.openConnection();
            conn.setRequestProperty("name", "mapreduce");
            conn.setRequestProperty("version", "1.0.0");
            Assert.assertTrue((String)"Task Attempt Directory does not exist!", (boolean)taskAttemptDir.exists());
            conn.connect();
            try {
                DataInputStream is = new DataInputStream(conn.getInputStream());
                is.close();
                Assert.assertFalse((String)"Task Attempt file was not deleted!", (boolean)taskAttemptDir.exists());
            }
            catch (EOFException eOFException) {
                // empty catch block
            }
            Assert.assertEquals((String)"sendError called due to shuffle error", (long)0L, (long)failures.size());
        }
        finally {
            shuffleHandler.stop();
            FileUtil.fullyDelete((File)absLogDir);
        }
    }

    @Test(timeout=4000L)
    public void testSendMapCount() throws Exception {
        ArrayList<ShuffleHandler.ReduceMapFileCount> listenerList = new ArrayList<ShuffleHandler.ReduceMapFileCount>();
        ChannelHandlerContext mockCtx = (ChannelHandlerContext)Mockito.mock(ChannelHandlerContext.class);
        Channel mockCh = (Channel)Mockito.mock(AbstractChannel.class);
        ChannelPipeline mockPipeline = (ChannelPipeline)Mockito.mock(ChannelPipeline.class);
        FullHttpRequest httpRequest = this.createHttpRequest();
        ChannelFuture mockFuture = this.createMockChannelFuture(mockCh, listenerList);
        ShuffleHandler.TimeoutHandler timerHandler = new ShuffleHandler.TimeoutHandler();
        ((ChannelHandlerContext)Mockito.doReturn((Object)mockCh).when((Object)mockCtx)).channel();
        Mockito.when((Object)mockCh.pipeline()).thenReturn((Object)mockPipeline);
        Mockito.when((Object)mockPipeline.get((String)Mockito.any(String.class))).thenReturn((Object)timerHandler);
        Mockito.when((Object)mockCtx.channel()).thenReturn((Object)mockCh);
        ((Channel)Mockito.doReturn((Object)mockFuture).when((Object)mockCh)).writeAndFlush(Mockito.any());
        Mockito.when((Object)mockCh.writeAndFlush(Object.class)).thenReturn((Object)mockFuture);
        MockShuffleHandler sh = new MockShuffleHandler();
        Configuration conf = new Configuration();
        conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
        conf.setInt("tez.shuffle.port", 0);
        sh.init(conf);
        sh.start();
        int maxOpenFiles = conf.getInt("tez.shuffle.max.session-open-files", 3);
        sh.getShuffle(conf).channelRead(mockCtx, (Object)httpRequest);
        Assert.assertTrue((String)"Number of Open files should not exceed the configured value!-Not Expected", (listenerList.size() <= maxOpenFiles ? 1 : 0) != 0);
        while (!listenerList.isEmpty()) {
            ((ShuffleHandler.ReduceMapFileCount)listenerList.remove(0)).operationComplete(mockFuture);
            Assert.assertTrue((String)"Number of Open files should not exceed the configured value!-Not Expected", (listenerList.size() <= maxOpenFiles ? 1 : 0) != 0);
        }
        sh.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testShuffleHandlerSendsDiskError() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", 0);
        FilterInputStream input = null;
        MockShuffleHandlerWithFatalDiskError shuffleHandler = new MockShuffleHandlerWithFatalDiskError();
        try {
            shuffleHandler.init(conf);
            shuffleHandler.start();
            String shuffleBaseURL = "http://127.0.0.1:" + shuffleHandler.getConfig().get("tez.shuffle.port");
            URL url = new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&dag=1&reduce=1&map=attempt_12345_1_m_1_0");
            shuffleHandler.secretManager.addTokenForJob("job_12345_1", new Token("id".getBytes(), shuffleHandler.getSecret().getBytes(), null, null));
            HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams((Configuration)conf);
            BaseHttpConnection httpConnection = ShuffleUtils.getHttpConnection((boolean)true, (URL)url, (HttpConnectionParams)httpConnectionParams, (String)"testFetcher", (JobTokenSecretManager)shuffleHandler.secretManager);
            boolean connectSucceeded = httpConnection.connect();
            Assert.assertTrue((boolean)connectSucceeded);
            input = httpConnection.getInputStream();
            httpConnection.validate();
            ShuffleHeader header = new ShuffleHeader();
            header.readFields((DataInput)((Object)input));
            Assert.assertEquals((Object)(ShuffleHandlerError.DISK_ERROR_EXCEPTION + ": Could not find application_1234/240/output/attempt_1234_0/file.out.index"), (Object)header.getMapId());
            Assert.assertEquals((long)-1L, (long)header.getCompressedLength());
            Assert.assertEquals((long)-1L, (long)header.getUncompressedLength());
            Assert.assertEquals((long)-1L, (long)header.getPartition());
        }
        finally {
            if (input != null) {
                input.close();
            }
            shuffleHandler.close();
        }
    }

    public ChannelFuture createMockChannelFuture(Channel mockCh, final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
        ChannelFuture mockFuture = (ChannelFuture)Mockito.mock(ChannelFuture.class);
        Mockito.when((Object)mockFuture.channel()).thenReturn((Object)mockCh);
        ((ChannelFuture)Mockito.doReturn((Object)true).when((Object)mockFuture)).isSuccess();
        ((ChannelFuture)Mockito.doAnswer((Answer)new Answer<Object>(){

            public Object answer(InvocationOnMock invocation) throws Throwable {
                if (invocation.getArguments()[0].getClass() == ShuffleHandler.ReduceMapFileCount.class) {
                    listenerList.add((ShuffleHandler.ReduceMapFileCount)invocation.getArguments()[0]);
                }
                return null;
            }
        }).when((Object)mockFuture)).addListener((GenericFutureListener)Mockito.any(ShuffleHandler.ReduceMapFileCount.class));
        return mockFuture;
    }

    public FullHttpRequest createHttpRequest() {
        String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
        for (int i = 0; i < 100; ++i) {
            uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
        }
        return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConfigPortStatic() throws Exception {
        Random rand = new Random();
        int port = rand.nextInt(10) + 50000;
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", port);
        MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
        shuffleHandler.serviceInit(conf);
        try {
            shuffleHandler.serviceStart();
            Assert.assertEquals((long)port, (long)shuffleHandler.getPort());
        }
        finally {
            shuffleHandler.stop();
        }
    }

    @Test
    public void testConfigPortDynamic() throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("tez.shuffle.port", 0);
        MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
        shuffleHandler.serviceInit(conf);
        try {
            shuffleHandler.serviceStart();
            Assert.assertTrue((String)"ShuffleHandler should use a random chosen port", (shuffleHandler.getPort() > 0 ? 1 : 0) != 0);
        }
        finally {
            shuffleHandler.stop();
        }
    }

    static class LastSocketAddress {
        SocketAddress lastAddress;

        LastSocketAddress() {
        }

        void setAddress(SocketAddress lastAddress) {
            this.lastAddress = lastAddress;
        }

        SocketAddress getSocketAddress() {
            return this.lastAddress;
        }
    }

    class MockShuffleHandlerWithFatalDiskError
    extends ShuffleHandler {
        public static final String MESSAGE = "Could not find application_1234/240/output/attempt_1234_0/file.out.index";
        private JobTokenSecretManager secretManager = new JobTokenSecretManager(JobTokenSecretManager.createSecretKey((byte[])this.getSecret().getBytes()));

        MockShuffleHandlerWithFatalDiskError() {
        }

        protected JobTokenSecretManager getSecretManager() {
            return this.secretManager;
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
            return new ShuffleHandler.Shuffle(conf){

                protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    super.verifyRequest(appid, ctx, request, response, requestUri);
                }

                protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String dagId, String mapId, ShuffleHandler.Range reduceRange, String jobId, String user) {
                    return null;
                }

                protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, ShuffleHandler.Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> infoMap) throws IOException {
                    throw new DiskChecker.DiskErrorException(MockShuffleHandlerWithFatalDiskError.MESSAGE);
                }

                protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, ShuffleHandler.Range reduceRange, ShuffleHandler.Shuffle.MapOutputInfo info) throws IOException {
                    return null;
                }
            };
        }

        public String getSecret() {
            return "secret";
        }
    }

    private static class MockShuffleHandler2
    extends ShuffleHandler {
        boolean socketKeepAlive = false;

        private MockShuffleHandler2() {
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
            return new ShuffleHandler.Shuffle(conf){

                protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                    SocketChannel channel = (SocketChannel)ctx.channel();
                    socketKeepAlive = channel.config().isKeepAlive();
                }
            };
        }

        protected boolean isSocketKeepAlive() {
            return this.socketKeepAlive;
        }
    }

    class MockShuffleHandler
    extends ShuffleHandler {
        MockShuffleHandler() {
        }

        protected ShuffleHandler.Shuffle getShuffle(Configuration conf) {
            return new ShuffleHandler.Shuffle(conf){

                protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
                }

                protected ShuffleHandler.Shuffle.MapOutputInfo getMapOutputInfo(String dagId, String mapId, ShuffleHandler.Range reduceRange, String jobId, String user) throws IOException {
                    return null;
                }

                protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, ShuffleHandler.Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, ShuffleHandler.Shuffle.MapOutputInfo> infoMap) throws IOException {
                }

                protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, ShuffleHandler.Range reduceRange, ShuffleHandler.Shuffle.MapOutputInfo info) throws IOException {
                    ShuffleHeader header = new ShuffleHeader("attempt_12345_1_m_1_0", 5678L, 5678L, 1);
                    DataOutputBuffer dob = new DataOutputBuffer();
                    header.write((DataOutput)dob);
                    ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                    dob = new DataOutputBuffer();
                    for (int i = 0; i < 100; ++i) {
                        header.write((DataOutput)dob);
                    }
                    return ch.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength()));
                }
            };
        }
    }
}

