package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.GuavaShim;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.TezRuntimeUtils;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator;
import org.apache.tez.runtime.library.exceptions.InputAlreadyClosedException;
import org.apache.tez.runtime.library.utils.CodecUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.class */
public class Shuffle implements ExceptionReporter {
    /** Logger shared by this class and its inner classes. */
    private static final Logger LOG = LoggerFactory.getLogger(Shuffle.class);
    // NOTE(review): not referenced anywhere in the visible part of this class.
    private static final int PROGRESS_FREQUENCY = 2000;
    /** Configuration passed to the constructor; also handed to the merger and the scheduler. */
    private final Configuration conf;
    /** Context used for progress notifications, failure reporting, counters and vertex names. */
    private final InputContext inputContext;
    /** Receives the events passed to {@link #handleEvents(List)} while this shuffle is not shut down. */
    private final ShuffleInputEventHandlerOrderedGrouped eventHandler;

    /** Started on the runner thread; closed at most once through {@code cleanupShuffleScheduler()}. */
    @VisibleForTesting
    final ShuffleScheduler scheduler;

    /** Started in {@link #run()}; {@code close(true)} yields the final iterator, {@code close(false)} is used for cleanup. */
    @VisibleForTesting
    final MergeManager merger;
    /** Codec obtained from {@code CodecUtils.getCodec}; may be {@code null} (logged as "None"). */
    private final CompressionCodec codec;
    /** Value of {@code TEZ_RUNTIME_IFILE_READAHEAD} (defaults to {@code true}). */
    private final boolean ifileReadAhead;
    /** Configured read-ahead byte count, or 0 when read-ahead is disabled. */
    private final int ifileReadAheadLength;
    /** The task submitted to {@link #executor} by {@link #run()}. */
    private final RunShuffleCallable runShuffleCallable;
    /** Future of the submitted runner task; stays {@code null} until {@link #run()} has been called. */
    private volatile ListenableFuture<TezRawKeyValueIterator> runShuffleFuture;
    /** Single-thread daemon executor; shut down right after the runner task is submitted. */
    private final ListeningExecutorService executor;
    /** "source -> destination" built from the cleaned vertex names; used as a log prefix. */
    private final String sourceDestNameTrimmed;
    /** Wall-clock millis captured in the constructor; base for both phase-time counters. */
    private final long startTime;
    /** {@code MERGE_PHASE_TIME} counter, set after the final merge completes. */
    private final TezCounter mergePhaseTime;
    /** {@code SHUFFLE_PHASE_TIME} counter, set once the scheduler has finished without a reported error. */
    private final TezCounter shufflePhaseTime;
    /** First error recorded by {@link #reportException(Throwable)}, or the failure of the final merge. */
    private AtomicReference<Throwable> throwable = new AtomicReference<>();
    /** Name of the thread that reported {@link #throwable}; written and read while holding this object's monitor. */
    private String throwingThreadName = null;
    /** Set by {@link #shutdown()}; once true, events are ignored and input queries throw. */
    private AtomicBoolean isShutDown = new AtomicBoolean(false);
    // NOTE(review): not referenced anywhere in the visible part of this class.
    private AtomicBoolean fetchersClosed = new AtomicBoolean(false);
    /** Guards {@code scheduler.close()} so that it is invoked at most once. */
    private AtomicBoolean schedulerClosed = new AtomicBoolean(false);
    /** Guards the cleanup-time {@code merger.close(false)} so that it is invoked at most once. */
    private AtomicBoolean mergerClosed = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle$RunShuffleCallable.class */
    private class RunShuffleCallable extends CallableWithNdc<TezRawKeyValueIterator> {
        private RunShuffleCallable() {
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
        public TezRawKeyValueIterator m241callInternal() throws IOException, InterruptedException {
            ShuffleError shuffleError;
            if (!Shuffle.this.isShutDown.get()) {
                try {
                    try {
                        Shuffle.this.scheduler.start();
                        Shuffle.this.cleanupShuffleScheduler();
                    } finally {
                    }
                } catch (Throwable th) {
                    Shuffle.this.cleanupShuffleScheduler();
                    throw th;
                }
            }
            synchronized (Shuffle.this) {
                if (Shuffle.this.throwable.get() != null) {
                    throw new ShuffleError("error in shuffle in " + Shuffle.this.throwingThreadName, Shuffle.this.throwable.get());
                }
            }
            Shuffle.this.shufflePhaseTime.setValue(System.currentTimeMillis() - Shuffle.this.startTime);
            Shuffle.this.cleanupShuffleScheduler();
            Shuffle.this.inputContext.notifyProgress();
            try {
                TezRawKeyValueIterator close = Shuffle.this.merger.close(true);
                Shuffle.this.mergePhaseTime.setValue(System.currentTimeMillis() - Shuffle.this.startTime);
                Shuffle.this.inputContext.notifyProgress();
                synchronized (Shuffle.this) {
                    if (Shuffle.this.throwable.get() != null) {
                        throw new ShuffleError("error in shuffle in " + Shuffle.this.throwingThreadName, Shuffle.this.throwable.get());
                    }
                }
                Shuffle.this.inputContext.inputIsReady();
                Shuffle.LOG.info("merge complete for input vertex : " + Shuffle.this.sourceDestNameTrimmed);
                return close;
            } catch (Throwable th2) {
                Shuffle.this.throwable.set(th2);
                throw new ShuffleError("Error while doing final merge ", th2);
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle$ShuffleError.class */
    /** {@link IOException} raised when the shuffle or the final merge fails. */
    public static class ShuffleError extends IOException {
        private static final long serialVersionUID = 5753909320586607881L;

        /**
         * Creates an error with a description and its underlying cause.
         *
         * @param message description of what failed
         * @param cause the throwable that triggered the failure
         */
        ShuffleError(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle$ShuffleRunnerFutureCallback.class */
    private class ShuffleRunnerFutureCallback implements FutureCallback<TezRawKeyValueIterator> {
        private ShuffleRunnerFutureCallback() {
        }

        public void onSuccess(TezRawKeyValueIterator tezRawKeyValueIterator) {
            Shuffle.LOG.info(Shuffle.this.sourceDestNameTrimmed + ": Shuffle Runner thread complete");
        }

        public void onFailure(Throwable th) {
            if (Shuffle.this.isShutDown.get()) {
                Shuffle.LOG.info(Shuffle.this.sourceDestNameTrimmed + ": Already shutdown. Ignoring error");
                return;
            }
            Shuffle.LOG.error(Shuffle.this.sourceDestNameTrimmed + ": ShuffleRunner failed with error", th);
            Shuffle.this.inputContext.reportFailure(TaskFailureType.NON_FATAL, th, "Shuffle Runner Failed");
            Shuffle.this.cleanupIgnoreErrors();
        }
    }

    /**
     * Wires up the merger, scheduler, event handler and the single-thread runner executor.
     * Nothing is started here; see {@link #run()}.
     *
     * @param inputContext context providing vertex names, counters and failure reporting
     * @param configuration runtime configuration
     * @param i number of inputs assigned to this shuffle (logged and passed to the scheduler)
     * @param j value forwarded to the {@code MergeManager} constructor
     *        (NOTE(review): presumably the memory available to the merger; confirm against MergeManager)
     * @throws IOException if a collaborator fails to initialise
     */
    public Shuffle(InputContext inputContext, Configuration configuration, int i, long j) throws IOException {
        this.inputContext = inputContext;
        this.conf = configuration;

        String sourceName = TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName());
        String destinationName = TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName());
        this.sourceDestNameTrimmed = sourceName + " -> " + destinationName;

        this.codec = CodecUtils.getCodec(configuration);
        this.ifileReadAhead = configuration.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD, true);
        // The read-ahead size is only looked up when read-ahead is enabled.
        this.ifileReadAheadLength = this.ifileReadAhead
                ? configuration.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES, TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT)
                : 0;

        Combiner combiner = TezRuntimeUtils.instantiateCombiner(configuration, inputContext);
        LocalFileSystem localFs = FileSystem.getLocal(this.conf);
        LocalDirAllocator dirAllocator = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
        TezCounter spilledRecords = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
        TezCounter combineInputRecords = inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
        TezCounter mergedMapOutputs = inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);

        String codecName = this.codec == null ? "None" : this.codec.getClass().getName();
        LOG.info(this.sourceDestNameTrimmed + ": Shuffle assigned with " + i + " inputs, codec: " + codecName + ", ifileReadAhead: " + this.ifileReadAhead);

        this.startTime = System.currentTimeMillis();
        // This instance is passed to both collaborators as their ExceptionReporter.
        this.merger = new MergeManager(this.conf, localFs, dirAllocator, inputContext, combiner, spilledRecords, combineInputRecords, mergedMapOutputs, this, j, this.codec, this.ifileReadAhead, this.ifileReadAheadLength);
        this.scheduler = new ShuffleScheduler(this.inputContext, this.conf, i, this, this.merger, this.merger, this.startTime, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, this.sourceDestNameTrimmed);
        this.mergePhaseTime = inputContext.getCounters().findCounter(TaskCounter.MERGE_PHASE_TIME);
        this.shufflePhaseTime = inputContext.getCounters().findCounter(TaskCounter.SHUFFLE_PHASE_TIME);
        this.eventHandler = new ShuffleInputEventHandlerOrderedGrouped(inputContext, this.scheduler, ShuffleUtils.isTezShuffleHandler(configuration));

        String runnerThreadName = "ShuffleAndMergeRunner {" + this.sourceDestNameTrimmed + "}";
        this.executor = MoreExecutors.listeningDecorator(
                Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(runnerThreadName).build()));
        this.runShuffleCallable = new RunShuffleCallable();
    }

    /**
     * Forwards events to the event handler, or drops them (with a log line) once this shuffle
     * has been shut down.
     *
     * @param list the events to process
     * @throws IOException if the event handler fails
     */
    public void handleEvents(List<Event> list) throws IOException {
        if (!this.isShutDown.get()) {
            this.eventHandler.handleEvents(list);
            return;
        }
        LOG.info(this.sourceDestNameTrimmed + ": Ignoring events since already shutdown. EventCount: " + list.size());
    }

    /**
     * Non-blocking readiness check.
     *
     * @return {@code true} once the runner future exists and is done; {@code false} if
     *         {@link #run()} has not been called yet or the runner is still working
     * @throws InputAlreadyClosedException if this shuffle has been shut down
     * @throws IOException if a recorded error is an {@code IOException}
     * @throws InterruptedException if a recorded error is an {@code InterruptedException}
     */
    public boolean isInputReady() throws IOException, InterruptedException, TezException {
        if (this.isShutDown.get()) {
            throw new InputAlreadyClosedException();
        }
        Throwable reported = this.throwable.get();
        if (reported != null) {
            // Always throws; a recorded error takes precedence over the future's state.
            handleThrowable(reported);
        }
        ListenableFuture<TezRawKeyValueIterator> future = this.runShuffleFuture;
        return future != null && future.isDone();
    }

    /**
     * Rethrows the given throwable; this method never returns normally.
     * {@code IOException} and {@code InterruptedException} are rethrown as they are; anything
     * else is wrapped in an {@link UndeclaredThrowableException}.
     *
     * @param th the throwable to propagate
     */
    private void handleThrowable(Throwable th) throws IOException, InterruptedException {
        if (th instanceof IOException) {
            throw (IOException) th;
        }
        if (th instanceof InterruptedException) {
            throw (InterruptedException) th;
        }
        throw new UndeclaredThrowableException(th);
    }

    /**
     * Blocks until the runner task finishes and returns the merged iterator.
     *
     * @return the iterator produced by the runner task
     * @throws IllegalStateException if {@link #run()} has not been called yet
     * @throws InputAlreadyClosedException if this shuffle was shut down in the meantime
     * @throws IOException if the runner task or a reporting thread failed with an {@code IOException}
     * @throws InterruptedException if the wait is interrupted, or the failure was an interruption
     */
    public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException, TezException {
        Preconditions.checkState(this.runShuffleFuture != null, "waitForInput can only be called after run");
        TezRawKeyValueIterator mergedInput = null;
        try {
            // NOTE(review): if the future was cancelled by shutdown(), get() throws the unchecked
            // CancellationException, which is not caught here.
            mergedInput = this.runShuffleFuture.get();
        } catch (ExecutionException executionFailure) {
            // Surface the runner's own failure rather than the ExecutionException wrapper.
            handleThrowable(executionFailure.getCause());
        }
        if (this.isShutDown.get()) {
            throw new InputAlreadyClosedException();
        }
        Throwable reported = this.throwable.get();
        if (reported != null) {
            handleThrowable(reported);
        }
        return mergedInput;
    }

    /**
     * Starts the merger and submits the shuffle runner task to the executor.
     *
     * @throws IOException if the merger fails to start
     */
    public void run() throws IOException {
        this.merger.configureAndStart();
        ListenableFuture<TezRawKeyValueIterator> submitted = this.executor.submit(this.runShuffleCallable);
        this.runShuffleFuture = submitted;
        Futures.addCallback(submitted, new ShuffleRunnerFutureCallback(), GuavaShim.directExecutor());
        // Only this one task is ever submitted, so stop accepting work straight away.
        this.executor.shutdown();
    }

    /**
     * Shuts this shuffle down: cancels the runner task (interrupting it if it is running) and
     * closes the scheduler and merger on a best-effort basis. Only the first call has any
     * effect; later calls return immediately.
     */
    public void shutdown() {
        if (this.isShutDown.getAndSet(true)) {
            return;
        }
        LOG.info("Shutting down Shuffle for source: " + this.sourceDestNameTrimmed);
        // The future is null until run() has been called. Without this guard a shutdown (or
        // killSelf) before run() would throw a NullPointerException after the shutdown flag
        // was already set, so the cleanup below would never happen.
        ListenableFuture<TezRawKeyValueIterator> future = this.runShuffleFuture;
        if (future != null) {
            future.cancel(true);
        }
        cleanupIgnoreErrors();
    }

    /**
     * Closes the scheduler; an interruption during the close is logged and the thread's
     * interrupt flag is restored instead of propagating the exception.
     */
    private void cleanupShuffleSchedulerIgnoreErrors() {
        try {
            cleanupShuffleScheduler();
        } catch (InterruptedException interrupted) {
            // Keep the interrupt visible to whoever runs next on this thread.
            Thread.currentThread().interrupt();
            final String message = this.sourceDestNameTrimmed + ": Interrupted while attempting to close the scheduler during cleanup. Ignoring";
            LOG.info(message);
        }
    }

    /**
     * Closes the scheduler the first time this is called; every later call is a no-op.
     *
     * @throws InterruptedException if closing the scheduler is interrupted
     */
    private void cleanupShuffleScheduler() throws InterruptedException {
        final boolean alreadyClosed = this.schedulerClosed.getAndSet(true);
        if (!alreadyClosed) {
            this.scheduler.close();
        }
    }

    /**
     * Closes the merger with {@code close(false)} the first time this is called; every later
     * call is a no-op.
     *
     * @param z when {@code true}, failures are logged and swallowed (an interruption also
     *        restores the interrupt flag); when {@code false}, they are rethrown
     * @throws Throwable whatever {@code merger.close(false)} threw, if {@code z} is {@code false}
     */
    private void cleanupMerger(boolean z) throws Throwable {
        if (!this.mergerClosed.compareAndSet(false, true)) {
            return;
        }
        try {
            this.merger.close(false);
        } catch (InterruptedException interrupted) {
            if (z) {
                Thread.currentThread().interrupt();
                LOG.info(this.sourceDestNameTrimmed + ": Interrupted while attempting to close the merger during cleanup. Ignoring");
            } else {
                throw interrupted;
            }
        } catch (Throwable failure) {
            if (z) {
                LOG.info(this.sourceDestNameTrimmed + ": Exception while trying to shutdown merger, Ignoring", failure);
            } else {
                throw failure;
            }
        }
    }

    /**
     * Best-effort cleanup: logs final progress, closes the scheduler and then the merger.
     * No failure escapes this method; each one is only logged.
     */
    private void cleanupIgnoreErrors() {
        try {
            final ShuffleInputEventHandlerOrderedGrouped handler = this.eventHandler;
            if (handler != null) {
                handler.logProgress(true);
            }
            try {
                cleanupShuffleSchedulerIgnoreErrors();
            } catch (Exception schedulerFailure) {
                // A failing scheduler close must not prevent the merger from being closed.
                LOG.warn("Error cleaning up shuffle scheduler. Ignoring and continuing with shutdown. Message={}", schedulerFailure.getMessage());
            }
            cleanupMerger(true);
        } catch (Throwable failure) {
            LOG.info(this.sourceDestNameTrimmed + ": Error in cleaning up.., ", failure);
        }
    }

    /**
     * Records the first error reported by a worker thread and closes the scheduler. Reports
     * that arrive after an error has already been recorded are ignored.
     *
     * @param th the error being reported
     */
    @Override // org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter
    @InterfaceAudience.Private
    public synchronized void reportException(Throwable th) {
        if (this.throwable.get() != null) {
            return;
        }
        String reportingThread = Thread.currentThread().getName();
        // The closing bracket after the thread name was missing from this message.
        LOG.info(this.sourceDestNameTrimmed + ": Setting throwable in reportException with message [" + th.getMessage() + "] from thread [" + reportingThread + "]");
        this.throwable.set(th);
        this.throwingThreadName = reportingThread;
        cleanupShuffleSchedulerIgnoreErrors();
    }

    /**
     * Shuts this shuffle down and asks the input context to kill the task, unless the shuffle
     * is already shut down or an error has already been recorded.
     *
     * @param exc the exception passed on to the input context
     * @param str the message passed on to the input context
     */
    @Override // org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ExceptionReporter
    @InterfaceAudience.Private
    public synchronized void killSelf(Exception exc, String str) {
        if (!this.isShutDown.get() && this.throwable.get() == null) {
            shutdown();
            this.inputContext.killSelf(exc, str);
        }
    }

    /**
     * Returns the initial memory requirement, as computed by
     * {@code MergeManager.getInitialMemoryRequirement}.
     *
     * @param configuration runtime configuration passed through to the merge manager
     * @param j value passed through to the merge manager
     *        (NOTE(review): presumably the maximum available memory; confirm against MergeManager)
     * @return the value computed by the merge manager
     */
    @InterfaceAudience.Private
    public static long getInitialMemoryRequirement(Configuration configuration, long j) {
        final long requirement = MergeManager.getInitialMemoryRequirement(configuration, j);
        return requirement;
    }
}
