package org.apache.spark.network.shuffle;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.MergedBlockMetaResponseCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.protocol.MergedBlockMetaRequest;
import org.apache.spark.network.server.OneForOneStreamManager;
import org.apache.spark.network.server.RpcHandler;
import org.apache.spark.network.server.StreamManager;
import org.apache.spark.network.shuffle.protocol.AbstractFetchShuffleBlocks;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.BlocksRemoved;
import org.apache.spark.network.shuffle.protocol.CorruptionCause;
import org.apache.spark.network.shuffle.protocol.DiagnoseCorruption;
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlockChunks;
import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.GetLocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.LocalDirsForExecutors;
import org.apache.spark.network.shuffle.protocol.OpenBlocks;
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.RemoveBlocks;
import org.apache.spark.network.shuffle.protocol.StreamHandle;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TimerWithCustomTimeUnit;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler.class */
public class ExternalBlockHandler extends RpcHandler implements RpcHandler.MergedBlockMetaReqHandler {
    private static final Logger logger = LoggerFactory.getLogger(ExternalBlockHandler.class);
    private static final String SHUFFLE_MERGER_IDENTIFIER = "shuffle-push-merger";
    private static final String SHUFFLE_BLOCK_ID = "shuffle";
    private static final String SHUFFLE_CHUNK_ID = "shuffleChunk";

    @VisibleForTesting
    final ExternalShuffleBlockResolver blockManager;
    private final OneForOneStreamManager streamManager;
    private final ShuffleMetrics metrics;
    private final MergedShuffleFileManager mergeManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ManagedBufferIterator.class */
    public class ManagedBufferIterator implements Iterator<ManagedBuffer> {
        private int index = 0;
        private final Function<Integer, ManagedBuffer> blockDataForIndexFn;
        private final int size;

        ManagedBufferIterator(OpenBlocks openBlocks) {
            String str = openBlocks.appId;
            String str2 = openBlocks.execId;
            String[] strArr = openBlocks.blockIds;
            String[] split = strArr[0].split("_");
            if (split.length == 4 && split[0].equals(ExternalBlockHandler.SHUFFLE_BLOCK_ID)) {
                int parseInt = Integer.parseInt(split[1]);
                int[] shuffleMapIdAndReduceIds = shuffleMapIdAndReduceIds(strArr, parseInt);
                this.size = shuffleMapIdAndReduceIds.length;
                this.blockDataForIndexFn = num -> {
                    return ExternalBlockHandler.this.blockManager.getBlockData(str, str2, parseInt, shuffleMapIdAndReduceIds[num.intValue()], shuffleMapIdAndReduceIds[num.intValue() + 1]);
                };
                return;
            }
            if (split.length == 5 && split[0].equals(ExternalBlockHandler.SHUFFLE_CHUNK_ID)) {
                int parseInt2 = Integer.parseInt(split[1]);
                int parseInt3 = Integer.parseInt(split[2]);
                int[] shuffleReduceIdAndChunkIds = shuffleReduceIdAndChunkIds(strArr, parseInt2, parseInt3);
                this.size = shuffleReduceIdAndChunkIds.length;
                this.blockDataForIndexFn = num2 -> {
                    return ExternalBlockHandler.this.mergeManager.getMergedBlockData(openBlocks.appId, parseInt2, parseInt3, shuffleReduceIdAndChunkIds[num2.intValue()], shuffleReduceIdAndChunkIds[num2.intValue() + 1]);
                };
                return;
            }
            if (split.length != 3 || !split[0].equals("rdd")) {
                throw new IllegalArgumentException("Unexpected block id format: " + strArr[0]);
            }
            int[] rddAndSplitIds = rddAndSplitIds(strArr);
            this.size = rddAndSplitIds.length;
            this.blockDataForIndexFn = num3 -> {
                return ExternalBlockHandler.this.blockManager.getRddBlockData(str, str2, rddAndSplitIds[num3.intValue()], rddAndSplitIds[num3.intValue() + 1]);
            };
        }

        private int[] rddAndSplitIds(String[] strArr) {
            int[] iArr = new int[2 * strArr.length];
            for (int i = 0; i < strArr.length; i++) {
                String[] split = strArr[i].split("_");
                if (split.length != 3 || !split[0].equals("rdd")) {
                    throw new IllegalArgumentException("Unexpected RDD block id format: " + strArr[i]);
                }
                iArr[2 * i] = Integer.parseInt(split[1]);
                iArr[(2 * i) + 1] = Integer.parseInt(split[2]);
            }
            return iArr;
        }

        private int[] shuffleMapIdAndReduceIds(String[] strArr, int i) {
            int[] iArr = new int[2 * strArr.length];
            for (int i2 = 0; i2 < strArr.length; i2++) {
                String[] split = strArr[i2].split("_");
                if (split.length != 4 || !split[0].equals(ExternalBlockHandler.SHUFFLE_BLOCK_ID)) {
                    throw new IllegalArgumentException("Unexpected shuffle block id format: " + strArr[i2]);
                }
                if (Integer.parseInt(split[1]) != i) {
                    throw new IllegalArgumentException("Expected shuffleId=" + i + ", got:" + strArr[i2]);
                }
                iArr[2 * i2] = Integer.parseInt(split[2]);
                iArr[(2 * i2) + 1] = Integer.parseInt(split[3]);
            }
            return iArr;
        }

        private int[] shuffleReduceIdAndChunkIds(String[] strArr, int i, int i2) {
            int[] iArr = new int[2 * strArr.length];
            for (int i3 = 0; i3 < strArr.length; i3++) {
                String[] split = strArr[i3].split("_");
                if (split.length != 5 || !split[0].equals(ExternalBlockHandler.SHUFFLE_CHUNK_ID)) {
                    throw new IllegalArgumentException("Unexpected shuffle chunk id format: " + strArr[i3]);
                }
                if (Integer.parseInt(split[1]) != i || Integer.parseInt(split[2]) != i2) {
                    throw new IllegalArgumentException(String.format("Expected shuffleId = %s and shuffleMergeId = %s but got %s", Integer.valueOf(i), Integer.valueOf(i2), strArr[i3]));
                }
                iArr[2 * i3] = Integer.parseInt(split[3]);
                iArr[(2 * i3) + 1] = Integer.parseInt(split[4]);
            }
            return iArr;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.index < this.size;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ManagedBuffer next() {
            ManagedBuffer apply = this.blockDataForIndexFn.apply(Integer.valueOf(this.index));
            this.index += 2;
            ExternalBlockHandler.this.metrics.blockTransferRate.mark();
            ExternalBlockHandler.this.metrics.blockTransferMessageRate.mark();
            ExternalBlockHandler.this.metrics.blockTransferRateBytes.mark(apply != null ? apply.size() : 0L);
            return apply;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ShuffleChunkManagedBufferIterator.class */
    public class ShuffleChunkManagedBufferIterator implements Iterator<ManagedBuffer> {
        private int reduceIdx = 0;
        private int chunkIdx = 0;
        private final String appId;
        private final int shuffleId;
        private final int shuffleMergeId;
        private final int[] reduceIds;
        private final int[][] chunkIds;
        static final /* synthetic */ boolean $assertionsDisabled;

        ShuffleChunkManagedBufferIterator(FetchShuffleBlockChunks fetchShuffleBlockChunks) {
            this.appId = fetchShuffleBlockChunks.appId;
            this.shuffleId = fetchShuffleBlockChunks.shuffleId;
            this.shuffleMergeId = fetchShuffleBlockChunks.shuffleMergeId;
            this.reduceIds = fetchShuffleBlockChunks.reduceIds;
            this.chunkIds = fetchShuffleBlockChunks.chunkIds;
            if ($assertionsDisabled) {
                return;
            }
            if (this.reduceIds.length == 0 || this.reduceIds.length != this.chunkIds.length) {
                throw new AssertionError();
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.reduceIdx < this.reduceIds.length && this.chunkIdx < this.chunkIds[this.reduceIdx].length;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ManagedBuffer next() {
            ManagedBuffer managedBuffer = (ManagedBuffer) Preconditions.checkNotNull(ExternalBlockHandler.this.mergeManager.getMergedBlockData(this.appId, this.shuffleId, this.shuffleMergeId, this.reduceIds[this.reduceIdx], this.chunkIds[this.reduceIdx][this.chunkIdx]));
            if (this.chunkIdx < this.chunkIds[this.reduceIdx].length - 1) {
                this.chunkIdx++;
            } else {
                this.chunkIdx = 0;
                this.reduceIdx++;
            }
            ExternalBlockHandler.this.metrics.blockTransferRateBytes.mark(managedBuffer.size());
            return managedBuffer;
        }

        static {
            $assertionsDisabled = !ExternalBlockHandler.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ShuffleManagedBufferIterator.class */
    public class ShuffleManagedBufferIterator implements Iterator<ManagedBuffer> {
        private int mapIdx = 0;
        private int reduceIdx = 0;
        private final String appId;
        private final String execId;
        private final int shuffleId;
        private final long[] mapIds;
        private final int[][] reduceIds;
        private final boolean batchFetchEnabled;
        static final /* synthetic */ boolean $assertionsDisabled;

        ShuffleManagedBufferIterator(FetchShuffleBlocks fetchShuffleBlocks) {
            this.appId = fetchShuffleBlocks.appId;
            this.execId = fetchShuffleBlocks.execId;
            this.shuffleId = fetchShuffleBlocks.shuffleId;
            this.mapIds = fetchShuffleBlocks.mapIds;
            this.reduceIds = fetchShuffleBlocks.reduceIds;
            this.batchFetchEnabled = fetchShuffleBlocks.batchFetchEnabled;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if ($assertionsDisabled || (this.mapIds.length != 0 && this.mapIds.length == this.reduceIds.length)) {
                return this.mapIdx < this.mapIds.length && this.reduceIdx < this.reduceIds[this.mapIdx].length;
            }
            throw new AssertionError();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.Iterator
        public ManagedBuffer next() {
            ManagedBuffer continuousBlocksData;
            if (!this.batchFetchEnabled) {
                continuousBlocksData = ExternalBlockHandler.this.blockManager.getBlockData(this.appId, this.execId, this.shuffleId, this.mapIds[this.mapIdx], this.reduceIds[this.mapIdx][this.reduceIdx]);
                if (this.reduceIdx < this.reduceIds[this.mapIdx].length - 1) {
                    this.reduceIdx++;
                } else {
                    this.reduceIdx = 0;
                    this.mapIdx++;
                }
                ExternalBlockHandler.this.metrics.blockTransferRate.mark();
            } else {
                if (!$assertionsDisabled && this.reduceIds[this.mapIdx].length != 2) {
                    throw new AssertionError();
                }
                continuousBlocksData = ExternalBlockHandler.this.blockManager.getContinuousBlocksData(this.appId, this.execId, this.shuffleId, this.mapIds[this.mapIdx], this.reduceIds[this.mapIdx][0], this.reduceIds[this.mapIdx][1]);
                this.mapIdx++;
                ExternalBlockHandler.this.metrics.blockTransferRate.mark(r0 - r0);
            }
            ExternalBlockHandler.this.metrics.blockTransferMessageRate.mark();
            ExternalBlockHandler.this.metrics.blockTransferRateBytes.mark(continuousBlocksData != null ? continuousBlocksData.size() : 0L);
            return continuousBlocksData;
        }

        static {
            $assertionsDisabled = !ExternalBlockHandler.class.desiredAssertionStatus();
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockHandler$ShuffleMetrics.class */
    public class ShuffleMetrics implements MetricSet {
        private final Timer openBlockRequestLatencyMillis = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
        private final Timer registerExecutorRequestLatencyMillis = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
        private final Timer fetchMergedBlocksMetaLatencyMillis = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
        private final Timer finalizeShuffleMergeLatencyMillis = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
        private final Meter blockTransferRate = new Meter();
        private final Meter blockTransferMessageRate = new Meter();
        private final Meter blockTransferRateBytes = new Meter();
        private Counter activeConnections = new Counter();
        private Counter caughtExceptions = new Counter();
        private final Map<String, Metric> allMetrics = new HashMap();

        public ShuffleMetrics() {
            this.allMetrics.put("openBlockRequestLatencyMillis", this.openBlockRequestLatencyMillis);
            this.allMetrics.put("registerExecutorRequestLatencyMillis", this.registerExecutorRequestLatencyMillis);
            this.allMetrics.put("fetchMergedBlocksMetaLatencyMillis", this.fetchMergedBlocksMetaLatencyMillis);
            this.allMetrics.put("finalizeShuffleMergeLatencyMillis", this.finalizeShuffleMergeLatencyMillis);
            this.allMetrics.put("blockTransferRate", this.blockTransferRate);
            this.allMetrics.put("blockTransferMessageRate", this.blockTransferMessageRate);
            this.allMetrics.put("blockTransferRateBytes", this.blockTransferRateBytes);
            this.allMetrics.put("blockTransferAvgSize_1min", new RatioGauge() { // from class: org.apache.spark.network.shuffle.ExternalBlockHandler.ShuffleMetrics.1
                protected RatioGauge.Ratio getRatio() {
                    return RatioGauge.Ratio.of(ShuffleMetrics.this.blockTransferRateBytes.getOneMinuteRate(), ShuffleMetrics.this.blockTransferMessageRate.getOneMinuteRate());
                }
            });
            this.allMetrics.put("registeredExecutorsSize", () -> {
                return Integer.valueOf(ExternalBlockHandler.this.blockManager.getRegisteredExecutorsSize());
            });
            this.allMetrics.put("numActiveConnections", this.activeConnections);
            this.allMetrics.put("numCaughtExceptions", this.caughtExceptions);
        }

        public Map<String, Metric> getMetrics() {
            return this.allMetrics;
        }
    }

    public ExternalBlockHandler(TransportConf transportConf, File file) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(transportConf, file), new NoOpMergedShuffleFileManager(transportConf));
    }

    public ExternalBlockHandler(TransportConf transportConf, File file, MergedShuffleFileManager mergedShuffleFileManager) throws IOException {
        this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(transportConf, file), mergedShuffleFileManager);
    }

    @VisibleForTesting
    public ExternalShuffleBlockResolver getBlockResolver() {
        return this.blockManager;
    }

    @VisibleForTesting
    public ExternalBlockHandler(OneForOneStreamManager oneForOneStreamManager, ExternalShuffleBlockResolver externalShuffleBlockResolver) {
        this(oneForOneStreamManager, externalShuffleBlockResolver, new NoOpMergedShuffleFileManager(null));
    }

    @VisibleForTesting
    public ExternalBlockHandler(OneForOneStreamManager oneForOneStreamManager, ExternalShuffleBlockResolver externalShuffleBlockResolver, MergedShuffleFileManager mergedShuffleFileManager) {
        this.metrics = new ShuffleMetrics();
        this.streamManager = oneForOneStreamManager;
        this.blockManager = externalShuffleBlockResolver;
        this.mergeManager = mergedShuffleFileManager;
    }

    public void receive(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
        handleMessage(BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer), transportClient, rpcResponseCallback);
    }

    public StreamCallbackWithID receiveStream(TransportClient transportClient, ByteBuffer byteBuffer, RpcResponseCallback rpcResponseCallback) {
        BlockTransferMessage fromByteBuffer = BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer);
        if (!(fromByteBuffer instanceof PushBlockStream)) {
            throw new UnsupportedOperationException("Unexpected message with #receiveStream: " + fromByteBuffer);
        }
        PushBlockStream pushBlockStream = (PushBlockStream) fromByteBuffer;
        checkAuth(transportClient, pushBlockStream.appId);
        return this.mergeManager.receiveBlockDataAsStream(pushBlockStream);
    }

    protected void handleMessage(BlockTransferMessage blockTransferMessage, TransportClient transportClient, RpcResponseCallback rpcResponseCallback) {
        int length;
        long registerStream;
        Timer.Context time;
        if ((blockTransferMessage instanceof AbstractFetchShuffleBlocks) || (blockTransferMessage instanceof OpenBlocks)) {
            Timer.Context time2 = this.metrics.openBlockRequestLatencyMillis.time();
            try {
                if (blockTransferMessage instanceof AbstractFetchShuffleBlocks) {
                    checkAuth(transportClient, ((AbstractFetchShuffleBlocks) blockTransferMessage).appId);
                    length = ((AbstractFetchShuffleBlocks) blockTransferMessage).getNumBlocks();
                    registerStream = this.streamManager.registerStream(transportClient.getClientId(), blockTransferMessage instanceof FetchShuffleBlocks ? new ShuffleManagedBufferIterator((FetchShuffleBlocks) blockTransferMessage) : new ShuffleChunkManagedBufferIterator((FetchShuffleBlockChunks) blockTransferMessage), transportClient.getChannel());
                } else {
                    OpenBlocks openBlocks = (OpenBlocks) blockTransferMessage;
                    length = openBlocks.blockIds.length;
                    checkAuth(transportClient, openBlocks.appId);
                    registerStream = this.streamManager.registerStream(transportClient.getClientId(), new ManagedBufferIterator(openBlocks), transportClient.getChannel());
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Registered streamId {} with {} buffers for client {} from host {}", new Object[]{Long.valueOf(registerStream), Integer.valueOf(length), transportClient.getClientId(), NettyUtils.getRemoteAddress(transportClient.getChannel())});
                }
                rpcResponseCallback.onSuccess(new StreamHandle(registerStream, length).toByteBuffer());
                time2.stop();
                return;
            } finally {
                time2.stop();
            }
        }
        if (blockTransferMessage instanceof RegisterExecutor) {
            time = this.metrics.registerExecutorRequestLatencyMillis.time();
            try {
                RegisterExecutor registerExecutor = (RegisterExecutor) blockTransferMessage;
                checkAuth(transportClient, registerExecutor.appId);
                this.blockManager.registerExecutor(registerExecutor.appId, registerExecutor.execId, registerExecutor.executorInfo);
                this.mergeManager.registerExecutor(registerExecutor.appId, registerExecutor.executorInfo);
                rpcResponseCallback.onSuccess(ByteBuffer.wrap(new byte[0]));
                time.stop();
                return;
            } finally {
            }
        }
        if (blockTransferMessage instanceof RemoveBlocks) {
            RemoveBlocks removeBlocks = (RemoveBlocks) blockTransferMessage;
            checkAuth(transportClient, removeBlocks.appId);
            rpcResponseCallback.onSuccess(new BlocksRemoved(this.blockManager.removeBlocks(removeBlocks.appId, removeBlocks.execId, removeBlocks.blockIds)).toByteBuffer());
            return;
        }
        if (blockTransferMessage instanceof GetLocalDirsForExecutors) {
            GetLocalDirsForExecutors getLocalDirsForExecutors = (GetLocalDirsForExecutors) blockTransferMessage;
            checkAuth(transportClient, getLocalDirsForExecutors.appId);
            HashSet newHashSet = Sets.newHashSet(getLocalDirsForExecutors.execIds);
            boolean remove = newHashSet.remove(SHUFFLE_MERGER_IDENTIFIER);
            Map<String, String[]> localDirs = this.blockManager.getLocalDirs(getLocalDirsForExecutors.appId, newHashSet);
            if (remove) {
                localDirs.put(SHUFFLE_MERGER_IDENTIFIER, this.mergeManager.getMergedBlockDirs(getLocalDirsForExecutors.appId));
            }
            rpcResponseCallback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
            return;
        }
        if (!(blockTransferMessage instanceof FinalizeShuffleMerge)) {
            if (!(blockTransferMessage instanceof DiagnoseCorruption)) {
                throw new UnsupportedOperationException("Unexpected message: " + blockTransferMessage);
            }
            DiagnoseCorruption diagnoseCorruption = (DiagnoseCorruption) blockTransferMessage;
            checkAuth(transportClient, diagnoseCorruption.appId);
            rpcResponseCallback.onSuccess(new CorruptionCause(this.blockManager.diagnoseShuffleBlockCorruption(diagnoseCorruption.appId, diagnoseCorruption.execId, diagnoseCorruption.shuffleId, diagnoseCorruption.mapId, diagnoseCorruption.reduceId, diagnoseCorruption.checksum, diagnoseCorruption.algorithm)).toByteBuffer());
            return;
        }
        time = this.metrics.finalizeShuffleMergeLatencyMillis.time();
        FinalizeShuffleMerge finalizeShuffleMerge = (FinalizeShuffleMerge) blockTransferMessage;
        try {
            try {
                checkAuth(transportClient, finalizeShuffleMerge.appId);
                rpcResponseCallback.onSuccess(this.mergeManager.finalizeShuffleMerge(finalizeShuffleMerge).toByteBuffer());
                time.stop();
            } catch (IOException e) {
                throw new RuntimeException(String.format("Error while finalizing shuffle merge for application %s shuffle %d with shuffleMergeId %d", finalizeShuffleMerge.appId, Integer.valueOf(finalizeShuffleMerge.shuffleId), Integer.valueOf(finalizeShuffleMerge.shuffleMergeId)), e);
            }
        } finally {
        }
    }

    public void receiveMergeBlockMetaReq(TransportClient transportClient, MergedBlockMetaRequest mergedBlockMetaRequest, MergedBlockMetaResponseCallback mergedBlockMetaResponseCallback) {
        Timer.Context time = this.metrics.fetchMergedBlocksMetaLatencyMillis.time();
        try {
            checkAuth(transportClient, mergedBlockMetaRequest.appId);
            MergedBlockMeta mergedBlockMeta = this.mergeManager.getMergedBlockMeta(mergedBlockMetaRequest.appId, mergedBlockMetaRequest.shuffleId, mergedBlockMetaRequest.shuffleMergeId, mergedBlockMetaRequest.reduceId);
            logger.debug("Merged block chunks appId {} shuffleId {} reduceId {} num-chunks : {} ", new Object[]{mergedBlockMetaRequest.appId, Integer.valueOf(mergedBlockMetaRequest.shuffleId), Integer.valueOf(mergedBlockMetaRequest.reduceId), Integer.valueOf(mergedBlockMeta.getNumChunks())});
            mergedBlockMetaResponseCallback.onSuccess(mergedBlockMeta.getNumChunks(), mergedBlockMeta.getChunksBitmapBuffer());
            time.stop();
        } catch (Throwable th) {
            time.stop();
            throw th;
        }
    }

    public RpcHandler.MergedBlockMetaReqHandler getMergedBlockMetaReqHandler() {
        return this;
    }

    public void exceptionCaught(Throwable th, TransportClient transportClient) {
        this.metrics.caughtExceptions.inc();
    }

    public MetricSet getAllMetrics() {
        return this.metrics;
    }

    public StreamManager getStreamManager() {
        return this.streamManager;
    }

    public void applicationRemoved(String str, boolean z) {
        this.blockManager.applicationRemoved(str, z);
        this.mergeManager.applicationRemoved(str, z);
    }

    public void executorRemoved(String str, String str2) {
        this.blockManager.executorRemoved(str, str2);
    }

    public void close() {
        this.blockManager.close();
    }

    private void checkAuth(TransportClient transportClient, String str) {
        if (transportClient.getClientId() != null && !transportClient.getClientId().equals(str)) {
            throw new SecurityException(String.format("Client for %s not authorized for application %s.", transportClient.getClientId(), str));
        }
    }

    public void channelActive(TransportClient transportClient) {
        this.metrics.activeConnections.inc();
        super.channelActive(transportClient);
    }

    public void channelInactive(TransportClient transportClient) {
        this.metrics.activeConnections.dec();
        super.channelInactive(transportClient);
    }
}
