/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.shuffle.common.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.ShuffleEventHandler;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.common.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

/**
 * {@link ShuffleEventHandler} that translates incoming shuffle events into
 * calls on a {@link ShuffleManager}.
 *
 * <p>Two event types are understood:
 * <ul>
 *   <li>{@link DataMovementEvent} - the payload is parsed and the source is
 *       reported to the shuffle manager as completed-without-data, as
 *       completed-with-data (payload carried inline in the event), or as a
 *       known input identified by host and port.</li>
 *   <li>{@link InputFailedEvent} - the corresponding input attempt is marked
 *       obsolete on the shuffle manager.</li>
 * </ul>
 * Any other event type results in a {@link TezUncheckedException}.
 */
public class ShuffleInputEventHandlerImpl
implements ShuffleEventHandler {
    private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandlerImpl.class);
    private final ShuffleManager shuffleManager;
    private final FetchedInputAllocator inputAllocator;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;

    /**
     * Creates a handler bound to the given shuffle manager and allocator.
     *
     * @param inputContext accepted for interface compatibility; this class does not retain or use it
     * @param shuffleManager receiver of every completed / known / obsolete input notification
     * @param inputAllocator used to allocate a {@link FetchedInput} for data carried inline in an event
     * @param codec passed through to {@link ShuffleUtils#shuffleToMemory} for inline data
     * @param ifileReadAhead passed through to {@link ShuffleUtils#shuffleToMemory}
     * @param ifileReadAheadLength passed through to {@link ShuffleUtils#shuffleToMemory}
     */
    public ShuffleInputEventHandlerImpl(TezInputContext inputContext, ShuffleManager shuffleManager, FetchedInputAllocator inputAllocator, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength) {
        this.shuffleManager = shuffleManager;
        this.inputAllocator = inputAllocator;
        this.codec = codec;
        this.ifileReadAhead = ifileReadAhead;
        this.ifileReadAheadLength = ifileReadAheadLength;
    }

    /**
     * Handles each event in list order.
     *
     * @param events events to process
     * @throws IOException propagated from processing an individual event
     * @throws TezUncheckedException if an event has an unsupported type or an unparseable payload
     */
    @Override
    public void handleEvents(List<Event> events) throws IOException {
        for (Event event : events) {
            this.handleEvent(event);
        }
    }

    /**
     * Dispatches a single event to the processing method matching its runtime type.
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            this.processDataMovementEvent((DataMovementEvent)event);
        } else if (event instanceof InputFailedEvent) {
            this.processInputFailedEvent((InputFailedEvent)event);
        } else {
            throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
        }
    }

    /**
     * Parses the event payload and reports the source to the shuffle manager.
     *
     * <p>If the payload carries an empty-partitions bitset and the bit at the
     * event's source index is set, the input is reported as completed with no
     * data. In every other case the input is either ingested from the inline
     * data in the payload or registered as a known input (host / port) on the
     * shuffle manager.
     *
     * @throws IOException propagated from decompressing the bitset, allocating the
     *         fetched input, or moving inline data
     * @throws TezUncheckedException if the payload cannot be parsed
     */
    private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload;
        try {
            shufflePayload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
        }
        catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }
        int srcIndex = dme.getSourceIndex();
        LOG.info("Processing DataMovementEvent with srcIndex: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() + ", attemptNum: " + dme.getVersion() + ", payload: " + this.stringify(shufflePayload));

        if (shufflePayload.hasEmptyPartitions()) {
            byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions());
            BitSet emptyPartitionsBitSet = TezUtils.fromByteArray(emptyPartitions);
            if (emptyPartitionsBitSet.get(srcIndex)) {
                InputAttemptIdentifier emptyAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion());
                LOG.info("Source partition: " + srcIndex + " did not generate any data. Not fetching.");
                this.shuffleManager.addCompletedInputWithNoData(emptyAttemptIdentifier);
                return;
            }
            // The bitset is present but does not mark this partition as empty:
            // fall through so the input is still ingested/registered instead of
            // being silently dropped.
        }

        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
        if (shufflePayload.hasData()) {
            // Data travelled inside the event itself; copy it into a freshly
            // allocated FetchedInput rather than scheduling a fetch.
            ShuffleUserPayloads.DataProto dataProto = shufflePayload.getData();
            FetchedInput fetchedInput = this.inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
            this.moveDataToFetchedInput(dataProto, fetchedInput);
            this.shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
        } else {
            this.shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
        }
    }

    /**
     * Copies the inline data of {@code dataProto} into {@code fetchedInput},
     * choosing the disk or memory path according to the fetched input's type.
     *
     * @throws IOException propagated from the underlying {@link ShuffleUtils} copy
     * @throws TezUncheckedException if the fetched input type is neither DISK nor MEMORY
     */
    private void moveDataToFetchedInput(ShuffleUserPayloads.DataProto dataProto, FetchedInput fetchedInput) throws IOException {
        switch (fetchedInput.getType()) {
            case DISK: {
                ShuffleUtils.shuffleToDisk((DiskFetchedInput)fetchedInput, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG);
                break;
            }
            case MEMORY: {
                ShuffleUtils.shuffleToMemory((MemoryFetchedInput)fetchedInput, dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(), this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG);
                break;
            }
            default: {
                throw new TezUncheckedException("Unexpected type: " + fetchedInput.getType());
            }
        }
    }

    /**
     * Marks the input attempt named by the event (target index + version) as
     * obsolete on the shuffle manager.
     */
    private void processInputFailedEvent(InputFailedEvent ife) {
        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
        this.shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
    }

    /**
     * Builds a bracketed, single-line summary of the payload for logging.
     * Only presence flags and small scalar fields are included; the inline
     * data and the empty-partitions bytes themselves are not printed.
     */
    private String stringify(ShuffleUserPayloads.DataMovementEventPayloadProto dmProto) {
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        sb.append("hasEmptyPartitions: ").append(dmProto.hasEmptyPartitions()).append(", ");
        sb.append("host: ").append(dmProto.getHost()).append(", ");
        sb.append("port: ").append(dmProto.getPort()).append(", ");
        sb.append("pathComponent: ").append(dmProto.getPathComponent()).append(", ");
        sb.append("runDuration: ").append(dmProto.getRunDuration()).append(", ");
        sb.append("hasDataInEvent: ").append(dmProto.hasData());
        // Close the bracket opened above so the log entry is well-formed.
        sb.append("]");
        return sb.toString();
    }
}

