/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URL;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapred.FadvisedChunkedFile;
import org.apache.hadoop.mapred.FadvisedFileRegion;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.ShuffleChannelHandlerContext;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.eclipse.jetty.http.HttpHeader;

public class ShuffleChannelHandler
extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final ShuffleChannelHandlerContext handlerCtx;

    ShuffleChannelHandler(ShuffleChannelHandlerContext ctx) {
        this.handlerCtx = ctx;
    }

    private List<String> splitMaps(List<String> mapq) {
        if (null == mapq) {
            return null;
        }
        ArrayList<String> ret = new ArrayList<String>();
        for (String s : mapq) {
            Collections.addAll(ret, s.split(","));
        }
        return ret;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ShuffleHandler.LOG.debug("Executing channelActive; channel='{}'", (Object)ctx.channel().id());
        int numConnections = this.handlerCtx.activeConnections.incrementAndGet();
        if (this.handlerCtx.maxShuffleConnections > 0 && numConnections > this.handlerCtx.maxShuffleConnections) {
            ShuffleHandler.LOG.info(String.format("Current number of shuffle connections (%d) is greater than the max allowed shuffle connections (%d)", this.handlerCtx.allChannels.size(), this.handlerCtx.maxShuffleConnections));
            HashMap<String, String> headers = new HashMap<String, String>(1);
            headers.put("Retry-After", String.valueOf(1000L));
            this.sendError(ctx, "", ShuffleHandler.TOO_MANY_REQ_STATUS, headers);
        } else {
            super.channelActive(ctx);
            this.handlerCtx.allChannels.add((Object)ctx.channel());
            ShuffleHandler.LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}", new Object[]{ctx.channel(), ctx.channel().id(), this.handlerCtx.activeConnections.get()});
        }
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ShuffleHandler.LOG.debug("Executing channelInactive; channel='{}'", (Object)ctx.channel().id());
        super.channelInactive(ctx);
        int noOfConnections = this.handlerCtx.activeConnections.decrementAndGet();
        ShuffleHandler.LOG.debug("New value of Accepted number of connections={}", (Object)noOfConnections);
    }

    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        String jobId;
        int reduceId;
        Channel channel = ctx.channel();
        ShuffleHandler.LOG.debug("Received HTTP request: {}, channel='{}'", (Object)request, (Object)channel.id());
        if (request.method() != HttpMethod.GET) {
            this.sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
            return;
        }
        String shuffleVersion = "1.0.0";
        String httpHeaderName = "mapreduce";
        if (request.headers() != null) {
            shuffleVersion = request.headers().get("version");
            httpHeaderName = request.headers().get("name");
            ShuffleHandler.LOG.debug("Received from request header: ShuffleVersion={} header name={}, channel id: {}", new Object[]{shuffleVersion, httpHeaderName, channel.id()});
        }
        if (request.headers() == null || !"mapreduce".equals(httpHeaderName) || !"1.0.0".equals(shuffleVersion)) {
            this.sendError(ctx, "Incompatible shuffle request version", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        Map q = new QueryStringDecoder(request.uri()).parameters();
        List keepAliveList = (List)q.get("keepAlive");
        boolean keepAliveParam = false;
        if (keepAliveList != null && keepAliveList.size() == 1) {
            keepAliveParam = Boolean.parseBoolean((String)keepAliveList.get(0));
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("KeepAliveParam: {} : {}, channel id: {}", new Object[]{keepAliveList, keepAliveParam, channel.id()});
            }
        }
        List<String> mapIds = this.splitMaps((List)q.get("map"));
        List reduceQ = (List)q.get("reduce");
        List jobQ = (List)q.get("job");
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            ShuffleHandler.LOG.debug("RECV: " + request.uri() + "\n  mapId: " + mapIds + "\n  reduceId: " + reduceQ + "\n  jobId: " + jobQ + "\n  keepAlive: " + keepAliveParam + "\n  channel id: " + channel.id());
        }
        if (mapIds == null || reduceQ == null || jobQ == null) {
            this.sendError(ctx, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        if (reduceQ.size() != 1 || jobQ.size() != 1) {
            this.sendError(ctx, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        try {
            reduceId = Integer.parseInt((String)reduceQ.get(0));
            jobId = (String)jobQ.get(0);
        }
        catch (NumberFormatException e) {
            this.sendError(ctx, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        catch (IllegalArgumentException e) {
            this.sendError(ctx, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        String reqUri = request.uri();
        if (null == reqUri) {
            this.sendError(ctx, HttpResponseStatus.FORBIDDEN);
            return;
        }
        DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        try {
            this.verifyRequest(jobId, ctx, (HttpRequest)request, (HttpResponse)response, new URL("http", "", this.handlerCtx.port, reqUri));
        }
        catch (IOException e) {
            ShuffleHandler.LOG.warn("Shuffle failure ", (Throwable)e);
            this.sendError(ctx, e.getMessage(), HttpResponseStatus.UNAUTHORIZED);
            return;
        }
        HashMap<String, MapOutputInfo> mapOutputInfoMap = new HashMap<String, MapOutputInfo>();
        ChannelPipeline pipeline = channel.pipeline();
        ShuffleHandler.TimeoutHandler timeoutHandler = (ShuffleHandler.TimeoutHandler)pipeline.get("timeout");
        timeoutHandler.setEnabledTimeout(false);
        String user = this.handlerCtx.userRsrc.get(jobId);
        try {
            this.populateHeaders(mapIds, jobId, user, reduceId, (HttpResponse)response, keepAliveParam, mapOutputInfoMap);
        }
        catch (IOException e) {
            ShuffleHandler.LOG.error("Shuffle error while populating headers. Channel id: " + channel.id(), (Throwable)e);
            this.sendError(ctx, this.getErrorMessage(e), HttpResponseStatus.INTERNAL_SERVER_ERROR);
            return;
        }
        channel.write((Object)response);
        boolean keepAlive = keepAliveParam || this.handlerCtx.connectionKeepAliveEnabled;
        ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx, user, mapOutputInfoMap, jobId, keepAlive);
        this.sendMap(reduceContext);
    }

    public void sendMap(ReduceContext reduceContext) {
        ShuffleHandler.LOG.trace("Executing sendMap; channel='{}'", (Object)reduceContext.ctx.channel().id());
        if (reduceContext.getMapsToSend().get() < reduceContext.getMapIds().size()) {
            int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
            String mapId = reduceContext.getMapIds().get(nextIndex);
            try {
                MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
                if (info == null) {
                    info = this.getMapOutputInfo(mapId, reduceContext.getReduceId(), reduceContext.getJobId(), reduceContext.getUser());
                }
                ShuffleHandler.LOG.trace("Calling sendMapOutput; channel='{}'", (Object)reduceContext.ctx.channel().id());
                ChannelFuture nextMap = this.sendMapOutput(reduceContext.getCtx().channel(), reduceContext.getUser(), mapId, reduceContext.getReduceId(), info);
                nextMap.addListener((GenericFutureListener)new ReduceMapFileCount(this, reduceContext));
            }
            catch (IOException e) {
                ShuffleHandler.LOG.error("Shuffle error: {}; channel={}", (Object)e, (Object)reduceContext.ctx.channel().id());
                reduceContext.ctx.channel().close();
            }
        }
    }

    private String getErrorMessage(Throwable t) {
        StringBuilder sb = new StringBuilder(t.getMessage());
        while (t.getCause() != null) {
            sb.append(t.getCause().getMessage());
            t = t.getCause();
        }
        return sb.toString();
    }

    protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user) throws IOException {
        ShuffleHandler.AttemptPathInfo pathInfo;
        try {
            ShuffleHandler.AttemptPathIdentifier identifier = new ShuffleHandler.AttemptPathIdentifier(jobId, user, mapId);
            pathInfo = (ShuffleHandler.AttemptPathInfo)this.handlerCtx.pathCache.get((Object)identifier);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("Retrieved pathInfo for " + identifier + " check for corresponding loaded messages to determine whether it was loaded or cached");
            }
        }
        catch (ExecutionException e) {
            if (e.getCause() instanceof IOException) {
                throw (IOException)e.getCause();
            }
            throw new RuntimeException(e.getCause());
        }
        IndexRecord info = this.handlerCtx.indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            ShuffleHandler.LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId + ",dataFile=" + pathInfo.dataPath + ", indexFile=" + pathInfo.indexPath);
            ShuffleHandler.LOG.debug("getMapOutputInfo: startOffset={}, partLength={} rawLength={}", new Object[]{info.startOffset, info.partLength, info.rawLength});
        }
        return new MapOutputInfo(pathInfo.dataPath, info);
    }

    protected void populateHeaders(List<String> mapIds, String jobId, String user, int reduce, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
        long contentLength = 0L;
        for (String mapId : mapIds) {
            MapOutputInfo outputInfo = this.getMapOutputInfo(mapId, reduce, jobId, user);
            if (mapOutputInfoMap.size() < this.handlerCtx.mapOutputMetaInfoCacheSize) {
                mapOutputInfoMap.put(mapId, outputInfo);
            }
            ShuffleHeader header = new ShuffleHeader(mapId, outputInfo.indexRecord.partLength, outputInfo.indexRecord.rawLength, reduce);
            DataOutputBuffer dob = new DataOutputBuffer();
            header.write((DataOutput)dob);
            contentLength += outputInfo.indexRecord.partLength;
            contentLength += (long)dob.getLength();
            File spillFile = new File(outputInfo.mapOutputFileName.toString());
            RandomAccessFile r = SecureIOUtils.openForRandomRead((File)spillFile, (String)"r", (String)user, null);
            r.close();
        }
        this.setResponseHeaders(response, keepAliveParam, contentLength);
        if (ShuffleHandler.AUDITLOG.isDebugEnabled()) {
            StringBuilder sb = new StringBuilder("shuffle for ");
            sb.append(jobId).append(" reducer ").append(reduce);
            sb.append(" length ").append(contentLength);
            if (ShuffleHandler.AUDITLOG.isTraceEnabled()) {
                sb.append(" mappers: ").append(mapIds);
                ShuffleHandler.AUDITLOG.trace(sb.toString());
            } else {
                ShuffleHandler.AUDITLOG.debug(sb.toString());
            }
        }
    }

    protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
        if (!this.handlerCtx.connectionKeepAliveEnabled && !keepAliveParam) {
            response.headers().set(HttpHeader.CONNECTION.asString(), (Object)"close");
        } else {
            response.headers().set(HttpHeader.CONNECTION.asString(), (Object)HttpHeader.KEEP_ALIVE.asString());
            response.headers().set(HttpHeader.KEEP_ALIVE.asString(), (Object)("timeout=" + this.handlerCtx.connectionKeepAliveTimeOut));
        }
        HttpUtil.setContentLength((HttpMessage)response, (long)contentLength);
    }

    protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
        SecretKey tokenSecret = this.handlerCtx.secretManager.retrieveTokenSecret(appid);
        if (null == tokenSecret) {
            ShuffleHandler.LOG.info("Request for unknown token {}, channel id: {}", (Object)appid, (Object)ctx.channel().id());
            throw new IOException("Could not find jobid");
        }
        String encryptedURL = SecureShuffleUtils.buildMsgFrom((URL)requestUri);
        String urlHashStr = request.headers().get("UrlHash");
        if (urlHashStr == null) {
            ShuffleHandler.LOG.info("Missing header hash for {}, channel id: {}", (Object)appid, (Object)ctx.channel().id());
            throw new IOException("fetcher cannot be authenticated");
        }
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            int len = urlHashStr.length();
            ShuffleHandler.LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: {}", new Object[]{encryptedURL, urlHashStr.substring(len - len / 2, len - 1), ctx.channel().id()});
        }
        SecureShuffleUtils.verifyReply((String)urlHashStr, (String)encryptedURL, (SecretKey)tokenSecret);
        String reply = SecureShuffleUtils.generateHash((byte[])urlHashStr.getBytes(Charsets.UTF_8), (SecretKey)tokenSecret);
        response.headers().set("ReplyHash", (Object)reply);
        response.headers().set("name", (Object)"mapreduce");
        response.headers().set("version", (Object)"1.0.0");
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            int len = reply.length();
            ShuffleHandler.LOG.debug("Fetcher request verified. encryptedURL: {}, reply: {}, channel id: {}", new Object[]{encryptedURL, reply.substring(len - len / 2, len - 1), ctx.channel().id()});
        }
    }

    public static ByteBuf shuffleHeaderToBytes(ShuffleHeader header) throws IOException {
        DataOutputBuffer dob = new DataOutputBuffer();
        header.write((DataOutput)dob);
        return Unpooled.wrappedBuffer((byte[])dob.getData(), (int)0, (int)dob.getLength());
    }

    protected ChannelFuture sendMapOutput(Channel ch, String user, String mapId, int reduce, MapOutputInfo mapOutputInfo) throws IOException {
        ChannelFuture writeFuture;
        IndexRecord info = mapOutputInfo.indexRecord;
        ch.write((Object)ShuffleChannelHandler.shuffleHeaderToBytes(new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce)));
        File spillFile = new File(mapOutputInfo.mapOutputFileName.toString());
        RandomAccessFile spill = SecureIOUtils.openForRandomRead((File)spillFile, (String)"r", (String)user, null);
        if (ch.pipeline().get(SslHandler.class) == null) {
            FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.startOffset, info.partLength, this.handlerCtx.manageOsCache, this.handlerCtx.readaheadLength, this.handlerCtx.readaheadPool, spillFile.getAbsolutePath(), this.handlerCtx.shuffleBufferSize, this.handlerCtx.shuffleTransferToAllowed);
            writeFuture = ch.writeAndFlush((Object)partition);
            writeFuture.addListener((GenericFutureListener)((ChannelFutureListener)future -> {
                if (future.isSuccess()) {
                    partition.transferSuccessful();
                }
                partition.deallocate();
            }));
        } else {
            FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, info.startOffset, info.partLength, this.handlerCtx.sslFileBufferSize, this.handlerCtx.manageOsCache, this.handlerCtx.readaheadLength, this.handlerCtx.readaheadPool, spillFile.getAbsolutePath());
            writeFuture = ch.writeAndFlush((Object)chunk);
        }
        this.handlerCtx.metrics.shuffleConnections.incr();
        this.handlerCtx.metrics.shuffleOutputBytes.incr(info.partLength);
        return writeFuture;
    }

    protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
        this.sendError(ctx, "", status);
    }

    protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
        this.sendError(ctx, message, status, Collections.emptyMap());
    }

    protected void sendError(ChannelHandlerContext ctx, String msg, HttpResponseStatus status, Map<String, String> headers) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer((CharSequence)msg, (Charset)CharsetUtil.UTF_8));
        response.headers().set((CharSequence)HttpHeaderNames.CONTENT_TYPE, (Object)"text/plain; charset=UTF-8");
        response.headers().set("name", (Object)"mapreduce");
        response.headers().set("version", (Object)"1.0.0");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            response.headers().set(header.getKey(), (Object)header.getValue());
        }
        HttpUtil.setContentLength((HttpMessage)response, (long)response.content().readableBytes());
        ctx.channel().writeAndFlush((Object)response).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel ch = ctx.channel();
        if (cause instanceof TooLongFrameException) {
            ShuffleHandler.LOG.trace("TooLongFrameException, channel id: {}", (Object)ch.id());
            this.sendError(ctx, HttpResponseStatus.BAD_REQUEST);
            return;
        }
        if (cause instanceof IOException) {
            if (cause instanceof ClosedChannelException) {
                ShuffleHandler.LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), cause);
                return;
            }
            String message = String.valueOf(cause.getMessage());
            if (ShuffleHandler.IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
                ShuffleHandler.LOG.debug("Ignoring client socket close, channel id: " + ch.id(), cause);
                return;
            }
        }
        ShuffleHandler.LOG.error("Shuffle error. Channel id: " + ch.id(), cause);
        if (ch.isActive()) {
            this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    static class ReduceMapFileCount
    implements ChannelFutureListener {
        private final ShuffleChannelHandler handler;
        private final ReduceContext reduceContext;

        ReduceMapFileCount(ShuffleChannelHandler handler, ReduceContext rc) {
            this.handler = handler;
            this.reduceContext = rc;
        }

        public void operationComplete(ChannelFuture future) throws Exception {
            ShuffleHandler.LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'", (Object)this.reduceContext.getMapsToWait().get(), (Object)future.channel().id());
            if (!future.isSuccess()) {
                ShuffleHandler.LOG.error("Future is unsuccessful. channel='{}' Cause: ", (Object)future.channel().id(), (Object)future.cause());
                future.channel().close();
                return;
            }
            int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
            if (waitCount == 0) {
                ChannelFuture lastContentFuture = future.channel().writeAndFlush((Object)LastHttpContent.EMPTY_LAST_CONTENT);
                ((ShuffleChannelHandler)this.handler).handlerCtx.metrics.operationComplete(future);
                if (this.reduceContext.getKeepAlive()) {
                    ShuffleHandler.LOG.trace("SendMap operation complete, keeping alive the connection; channel='{}'", (Object)future.channel().id());
                    ChannelPipeline pipeline = future.channel().pipeline();
                    ShuffleHandler.TimeoutHandler timeoutHandler = (ShuffleHandler.TimeoutHandler)pipeline.get("timeout");
                    timeoutHandler.setEnabledTimeout(true);
                } else {
                    ShuffleHandler.LOG.trace("SendMap operation complete, closing connection; channel='{}'", (Object)future.channel().id());
                    lastContentFuture.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                }
            } else {
                ShuffleHandler.LOG.trace("SendMap operation complete, waitCount > 0, invoking sendMap with reduceContext; channel='{}'", (Object)future.channel().id());
                this.handler.sendMap(this.reduceContext);
            }
        }
    }

    public static class ReduceContext {
        private final List<String> mapIds;
        private final AtomicInteger mapsToWait;
        private final AtomicInteger mapsToSend;
        private final int reduceId;
        private final ChannelHandlerContext ctx;
        private final String user;
        private final Map<String, MapOutputInfo> infoMap;
        private final String jobId;
        private final boolean keepAlive;

        ReduceContext(List<String> mapIds, int rId, ChannelHandlerContext context, String usr, Map<String, MapOutputInfo> mapOutputInfoMap, String jobId, boolean keepAlive) {
            this.mapIds = mapIds;
            this.reduceId = rId;
            this.mapsToWait = new AtomicInteger(mapIds.size());
            this.mapsToSend = new AtomicInteger(0);
            this.ctx = context;
            this.user = usr;
            this.infoMap = mapOutputInfoMap;
            this.jobId = jobId;
            this.keepAlive = keepAlive;
        }

        public int getReduceId() {
            return this.reduceId;
        }

        public ChannelHandlerContext getCtx() {
            return this.ctx;
        }

        public String getUser() {
            return this.user;
        }

        public Map<String, MapOutputInfo> getInfoMap() {
            return this.infoMap;
        }

        public String getJobId() {
            return this.jobId;
        }

        public List<String> getMapIds() {
            return this.mapIds;
        }

        public AtomicInteger getMapsToSend() {
            return this.mapsToSend;
        }

        public AtomicInteger getMapsToWait() {
            return this.mapsToWait;
        }

        public boolean getKeepAlive() {
            return this.keepAlive;
        }
    }

    static class MapOutputInfo {
        final Path mapOutputFileName;
        final IndexRecord indexRecord;

        MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
            this.mapOutputFileName = mapOutputFileName;
            this.indexRecord = indexRecord;
        }
    }
}

