package org.apache.hadoop.hdfs.shortcircuit;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocketWatcher;

@InterfaceAudience.Private
/* loaded from: input_file:hadoop-client-2.7.0-mapr-1506/share/hadoop/client/lib/hadoop-hdfs-2.7.0-mapr-1506.jar:org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.class */
public class DfsClientShmManager implements Closeable {
    private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
    private boolean closed = false;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition finishedLoading = this.lock.newCondition();
    private final HashMap<DatanodeInfo, EndpointShmManager> datanodes = new HashMap<>(1);
    private final DomainSocketWatcher domainSocketWatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:hadoop-client-2.7.0-mapr-1506/share/hadoop/client/lib/hadoop-hdfs-2.7.0-mapr-1506.jar:org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager$EndpointShmManager.class */
    public class EndpointShmManager {
        private final DatanodeInfo datanode;
        private final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full = new TreeMap<>();
        private final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull = new TreeMap<>();
        private boolean disabled = false;
        private boolean loading = false;

        EndpointShmManager(DatanodeInfo datanodeInfo) {
            this.datanode = datanodeInfo;
        }

        private ShortCircuitShm.Slot allocSlotFromExistingShm(ExtendedBlockId extendedBlockId) {
            if (this.notFull.isEmpty()) {
                return null;
            }
            DfsClientShm value = this.notFull.firstEntry().getValue();
            ShortCircuitShm.ShmId shmId = value.getShmId();
            ShortCircuitShm.Slot allocAndRegisterSlot = value.allocAndRegisterSlot(extendedBlockId);
            if (value.isFull()) {
                if (DfsClientShmManager.LOG.isTraceEnabled()) {
                    DfsClientShmManager.LOG.trace(this + ": pulled the last slot " + allocAndRegisterSlot.getSlotIdx() + " out of " + value);
                }
                Preconditions.checkState(this.notFull.remove(shmId) == value);
                this.full.put(shmId, value);
            } else if (DfsClientShmManager.LOG.isTraceEnabled()) {
                DfsClientShmManager.LOG.trace(this + ": pulled slot " + allocAndRegisterSlot.getSlotIdx() + " out of " + value);
            }
            return allocAndRegisterSlot;
        }

        private DfsClientShm requestNewShm(String str, DomainPeer domainPeer) throws IOException {
            new Sender(new DataOutputStream(new BufferedOutputStream(domainPeer.getOutputStream()))).requestShortCircuitShm(str);
            DataTransferProtos.ShortCircuitShmResponseProto parseFrom = DataTransferProtos.ShortCircuitShmResponseProto.parseFrom(PBHelper.vintPrefixed(domainPeer.getInputStream()));
            String error = parseFrom.hasError() ? parseFrom.getError() : NetUtils.UNKNOWN_HOST;
            switch (parseFrom.getStatus()) {
                case SUCCESS:
                    byte[] bArr = new byte[1];
                    FileInputStream[] fileInputStreamArr = new FileInputStream[1];
                    if (domainPeer.getDomainSocket().recvFileInputStreams(fileInputStreamArr, bArr, 0, bArr.length) < 0) {
                        throw new EOFException("got EOF while trying to transfer the file descriptor for the shared memory segment.");
                    }
                    if (fileInputStreamArr[0] == null) {
                        throw new IOException("the datanode " + this.datanode + " failed to pass a file descriptor for the shared memory segment.");
                    }
                    try {
                        DfsClientShm dfsClientShm = new DfsClientShm(PBHelper.convert(parseFrom.getId()), fileInputStreamArr[0], this, domainPeer);
                        if (DfsClientShmManager.LOG.isTraceEnabled()) {
                            DfsClientShmManager.LOG.trace(this + ": createNewShm: created " + dfsClientShm);
                        }
                        IOUtils.cleanup(DfsClientShmManager.LOG, fileInputStreamArr[0]);
                        return dfsClientShm;
                    } catch (Throwable th) {
                        IOUtils.cleanup(DfsClientShmManager.LOG, fileInputStreamArr[0]);
                        throw th;
                    }
                case ERROR_UNSUPPORTED:
                    DfsClientShmManager.LOG.info(this + ": datanode does not support short-circuit shared memory access: " + error);
                    this.disabled = true;
                    return null;
                default:
                    DfsClientShmManager.LOG.warn(this + ": error requesting short-circuit shared memory access: " + error);
                    return null;
            }
        }

        ShortCircuitShm.Slot allocSlot(DomainPeer domainPeer, MutableBoolean mutableBoolean, String str, ExtendedBlockId extendedBlockId) throws IOException {
            while (!DfsClientShmManager.this.closed) {
                if (this.disabled) {
                    if (!DfsClientShmManager.LOG.isTraceEnabled()) {
                        return null;
                    }
                    DfsClientShmManager.LOG.trace(this + ": shared memory segment access is disabled.");
                    return null;
                }
                ShortCircuitShm.Slot allocSlotFromExistingShm = allocSlotFromExistingShm(extendedBlockId);
                if (allocSlotFromExistingShm != null) {
                    return allocSlotFromExistingShm;
                }
                if (this.loading) {
                    if (DfsClientShmManager.LOG.isTraceEnabled()) {
                        DfsClientShmManager.LOG.trace(this + ": waiting for loading to finish...");
                    }
                    DfsClientShmManager.this.finishedLoading.awaitUninterruptibly();
                } else {
                    this.loading = true;
                    DfsClientShmManager.this.lock.unlock();
                    try {
                        DfsClientShm requestNewShm = requestNewShm(str, domainPeer);
                        if (requestNewShm != null) {
                            DfsClientShmManager.this.domainSocketWatcher.add(domainPeer.getDomainSocket(), requestNewShm);
                            mutableBoolean.setValue(true);
                            DfsClientShmManager.this.lock.lock();
                            this.loading = false;
                            DfsClientShmManager.this.finishedLoading.signalAll();
                            if (!requestNewShm.isDisconnected()) {
                                this.notFull.put(requestNewShm.getShmId(), requestNewShm);
                            } else if (DfsClientShmManager.LOG.isDebugEnabled()) {
                                DfsClientShmManager.LOG.debug(this + ": the UNIX domain socket associated with this short-circuit memory closed before we could make use of the shm.");
                            }
                        }
                    } finally {
                        DfsClientShmManager.this.lock.lock();
                        this.loading = false;
                        DfsClientShmManager.this.finishedLoading.signalAll();
                    }
                }
            }
            if (!DfsClientShmManager.LOG.isTraceEnabled()) {
                return null;
            }
            DfsClientShmManager.LOG.trace(this + ": the DfsClientShmManager has been closed.");
            return null;
        }

        void freeSlot(ShortCircuitShm.Slot slot) {
            DfsClientShm dfsClientShm = (DfsClientShm) slot.getShm();
            dfsClientShm.unregisterSlot(slot.getSlotIdx());
            if (dfsClientShm.isDisconnected()) {
                Preconditions.checkState(!this.full.containsKey(dfsClientShm.getShmId()));
                Preconditions.checkState(!this.notFull.containsKey(dfsClientShm.getShmId()));
                if (dfsClientShm.isEmpty()) {
                    if (DfsClientShmManager.LOG.isTraceEnabled()) {
                        DfsClientShmManager.LOG.trace(this + ": freeing empty stale " + dfsClientShm);
                    }
                    dfsClientShm.free();
                    return;
                }
                return;
            }
            ShortCircuitShm.ShmId shmId = dfsClientShm.getShmId();
            this.full.remove(shmId);
            if (!dfsClientShm.isEmpty()) {
                this.notFull.put(shmId, dfsClientShm);
                return;
            }
            this.notFull.remove(shmId);
            if (DfsClientShmManager.LOG.isTraceEnabled()) {
                DfsClientShmManager.LOG.trace(this + ": shutting down UNIX domain socket for empty " + dfsClientShm);
            }
            shutdown(dfsClientShm);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void unregisterShm(ShortCircuitShm.ShmId shmId) {
            DfsClientShmManager.this.lock.lock();
            try {
                this.full.remove(shmId);
                this.notFull.remove(shmId);
                DfsClientShmManager.this.lock.unlock();
            } catch (Throwable th) {
                DfsClientShmManager.this.lock.unlock();
                throw th;
            }
        }

        public String toString() {
            return String.format("EndpointShmManager(%s, parent=%s)", this.datanode, DfsClientShmManager.this);
        }

        PerDatanodeVisitorInfo getVisitorInfo() {
            return new PerDatanodeVisitorInfo(this.full, this.notFull, this.disabled);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public final void shutdown(DfsClientShm dfsClientShm) {
            try {
                dfsClientShm.getPeer().getDomainSocket().shutdown();
            } catch (IOException e) {
                DfsClientShmManager.LOG.warn(this + ": error shutting down shm: got IOException calling shutdown(SHUT_RDWR)", e);
            }
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:hadoop-client-2.7.0-mapr-1506/share/hadoop/client/lib/hadoop-hdfs-2.7.0-mapr-1506.jar:org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager$PerDatanodeVisitorInfo.class */
    public static class PerDatanodeVisitorInfo {
        public final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> full;
        public final TreeMap<ShortCircuitShm.ShmId, DfsClientShm> notFull;
        public final boolean disabled;

        PerDatanodeVisitorInfo(TreeMap<ShortCircuitShm.ShmId, DfsClientShm> treeMap, TreeMap<ShortCircuitShm.ShmId, DfsClientShm> treeMap2, boolean z) {
            this.full = treeMap;
            this.notFull = treeMap2;
            this.disabled = z;
        }
    }

    @VisibleForTesting
    /* loaded from: input_file:hadoop-client-2.7.0-mapr-1506/share/hadoop/client/lib/hadoop-hdfs-2.7.0-mapr-1506.jar:org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager$Visitor.class */
    public interface Visitor {
        void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> hashMap) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DfsClientShmManager(int i) throws IOException {
        this.domainSocketWatcher = new DomainSocketWatcher(i, "client");
    }

    public ShortCircuitShm.Slot allocSlot(DatanodeInfo datanodeInfo, DomainPeer domainPeer, MutableBoolean mutableBoolean, ExtendedBlockId extendedBlockId, String str) throws IOException {
        this.lock.lock();
        try {
            if (this.closed) {
                LOG.trace(this + ": the DfsClientShmManager isclosed.");
                this.lock.unlock();
                return null;
            }
            EndpointShmManager endpointShmManager = this.datanodes.get(datanodeInfo);
            if (endpointShmManager == null) {
                endpointShmManager = new EndpointShmManager(datanodeInfo);
                this.datanodes.put(datanodeInfo, endpointShmManager);
            }
            ShortCircuitShm.Slot allocSlot = endpointShmManager.allocSlot(domainPeer, mutableBoolean, str, extendedBlockId);
            this.lock.unlock();
            return allocSlot;
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public void freeSlot(ShortCircuitShm.Slot slot) {
        this.lock.lock();
        try {
            ((DfsClientShm) slot.getShm()).getEndpointShmManager().freeSlot(slot);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @VisibleForTesting
    public void visit(Visitor visitor) throws IOException {
        this.lock.lock();
        try {
            HashMap<DatanodeInfo, PerDatanodeVisitorInfo> hashMap = new HashMap<>();
            for (Map.Entry<DatanodeInfo, EndpointShmManager> entry : this.datanodes.entrySet()) {
                hashMap.put(entry.getKey(), entry.getValue().getVisitorInfo());
            }
            visitor.visit(hashMap);
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.lock.lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.lock.unlock();
            IOUtils.cleanup(LOG, this.domainSocketWatcher);
        } finally {
            this.lock.unlock();
        }
    }

    public String toString() {
        return String.format("ShortCircuitShmManager(%08x)", Integer.valueOf(System.identityHashCode(this)));
    }

    @VisibleForTesting
    public DomainSocketWatcher getDomainSocketWatcher() {
        return this.domainSocketWatcher;
    }
}
