package org.apache.hadoop.hdfs;

import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-client-3.3.4.107-eep-910.jar:org/apache/hadoop/hdfs/DeadNodeDetector.class */
public class DeadNodeDetector extends Daemon {
    private static final long ERROR_SLEEP_MS = 5000;
    private final long idleSleepMs;
    private String name;
    private Configuration conf;
    private long deadNodeDetectInterval;
    private long suspectNodeDetectInterval;
    private long probeConnectionTimeoutMs;
    private ExecutorService probeDeadNodesThreadPool;
    private ExecutorService probeSuspectNodesThreadPool;
    private Thread probeDeadNodesSchedulerThr;
    private Thread probeSuspectNodesSchedulerThr;
    private ExecutorService rpcThreadPool;
    private int socketTimeout;
    private State state;
    public static final Logger LOG = LoggerFactory.getLogger((Class<?>) DeadNodeDetector.class);
    private static volatile boolean disabledProbeThreadForTest = false;
    private Map<String, DatanodeInfo> probeInProg = new ConcurrentHashMap();
    private final Map<String, DatanodeInfo> deadNodes = new ConcurrentHashMap();
    private final Map<DFSInputStream, HashSet<DatanodeInfo>> suspectAndDeadNodes = new ConcurrentHashMap();
    private UniqueQueue<DatanodeInfo> deadNodesProbeQueue = new UniqueQueue<>();
    private UniqueQueue<DatanodeInfo> suspectNodesProbeQueue = new UniqueQueue<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-client-3.3.4.107-eep-910.jar:org/apache/hadoop/hdfs/DeadNodeDetector$Probe.class */
    public class Probe implements Runnable {
        private DeadNodeDetector deadNodeDetector;
        private DatanodeInfo datanodeInfo;
        private ProbeType type;

        Probe(DeadNodeDetector deadNodeDetector, DatanodeInfo datanodeInfo, ProbeType probeType) {
            this.deadNodeDetector = deadNodeDetector;
            this.datanodeInfo = datanodeInfo;
            this.type = probeType;
        }

        public DatanodeInfo getDatanodeInfo() {
            return this.datanodeInfo;
        }

        public ProbeType getType() {
            return this.type;
        }

        @Override // java.lang.Runnable
        public void run() {
            DeadNodeDetector.LOG.debug("Check node: {}, type: {}.", this.datanodeInfo, this.type);
            try {
                final ClientDatanodeProtocol createClientDatanodeProtocolProxy = DFSUtilClient.createClientDatanodeProtocolProxy((DatanodeID) this.datanodeInfo, this.deadNodeDetector.conf, DeadNodeDetector.this.socketTimeout, true);
                Future submit = DeadNodeDetector.this.rpcThreadPool.submit(new Callable() { // from class: org.apache.hadoop.hdfs.DeadNodeDetector.Probe.1
                    @Override // java.util.concurrent.Callable
                    public DatanodeLocalInfo call() throws Exception {
                        return createClientDatanodeProtocolProxy.getDatanodeInfo();
                    }
                });
                try {
                    try {
                        submit.get(DeadNodeDetector.this.probeConnectionTimeoutMs, TimeUnit.MILLISECONDS);
                        submit.cancel(true);
                        this.deadNodeDetector.probeCallBack(this, true);
                    } catch (Throwable th) {
                        submit.cancel(true);
                        throw th;
                    }
                } catch (TimeoutException e) {
                    DeadNodeDetector.LOG.error("Probe failed, datanode: {}, type: {}.", this.datanodeInfo, this.type, e);
                    this.deadNodeDetector.probeCallBack(this, false);
                    submit.cancel(true);
                }
            } catch (Exception e2) {
                DeadNodeDetector.LOG.error("Probe failed, datanode: {}, type: {}.", this.datanodeInfo, this.type, e2);
                this.deadNodeDetector.probeCallBack(this, false);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-client-3.3.4.107-eep-910.jar:org/apache/hadoop/hdfs/DeadNodeDetector$ProbeScheduler.class */
    public static class ProbeScheduler implements Runnable {
        private DeadNodeDetector deadNodeDetector;
        private ProbeType type;

        ProbeScheduler(DeadNodeDetector deadNodeDetector, ProbeType probeType) {
            this.deadNodeDetector = deadNodeDetector;
            this.type = probeType;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                this.deadNodeDetector.scheduleProbe(this.type);
                if (this.type == ProbeType.CHECK_SUSPECT) {
                    DeadNodeDetector.probeSleep(this.deadNodeDetector.suspectNodeDetectInterval);
                } else {
                    DeadNodeDetector.probeSleep(this.deadNodeDetector.deadNodeDetectInterval);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-client-3.3.4.107-eep-910.jar:org/apache/hadoop/hdfs/DeadNodeDetector$ProbeType.class */
    public enum ProbeType {
        CHECK_DEAD,
        CHECK_SUSPECT
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-client-3.3.4.107-eep-910.jar:org/apache/hadoop/hdfs/DeadNodeDetector$State.class */
    public enum State {
        INIT,
        CHECK_DEAD,
        IDLE,
        ERROR
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-hdfs-client-3.3.4.107-eep-910.jar:org/apache/hadoop/hdfs/DeadNodeDetector$UniqueQueue.class */
    public static class UniqueQueue<T> {
        private Deque<T> queue = new LinkedList();
        private Set<T> set = new HashSet();

        UniqueQueue() {
        }

        synchronized boolean offer(T t) {
            if (!this.set.add(t)) {
                return false;
            }
            this.queue.addLast(t);
            return true;
        }

        synchronized T poll() {
            T pollFirst = this.queue.pollFirst();
            this.set.remove(pollFirst);
            return pollFirst;
        }

        synchronized int size() {
            return this.set.size();
        }
    }

    public DeadNodeDetector(String str, Configuration configuration) {
        this.deadNodeDetectInterval = 0L;
        this.suspectNodeDetectInterval = 0L;
        this.conf = new Configuration(configuration);
        this.name = str;
        this.deadNodeDetectInterval = configuration.getLong(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_INTERVAL_MS_KEY, 60000L);
        this.suspectNodeDetectInterval = configuration.getLong(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_INTERVAL_MS_KEY, 300L);
        this.socketTimeout = configuration.getInt("dfs.client.socket-timeout", 60000);
        this.probeConnectionTimeoutMs = configuration.getLong(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_KEY, HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_CONNECTION_TIMEOUT_MS_DEFAULT);
        this.idleSleepMs = configuration.getLong(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_IDLE_SLEEP_MS_KEY, 10000L);
        int i = configuration.getInt(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_DEAD_NODE_THREADS_KEY, 10);
        int i2 = configuration.getInt(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_PROBE_SUSPECT_NODE_THREADS_KEY, 10);
        int i3 = configuration.getInt(HdfsClientConfigKeys.DFS_CLIENT_DEAD_NODE_DETECTION_RPC_THREADS_KEY, 20);
        this.probeDeadNodesThreadPool = Executors.newFixedThreadPool(i, new Daemon.DaemonFactory());
        this.probeSuspectNodesThreadPool = Executors.newFixedThreadPool(i2, new Daemon.DaemonFactory());
        this.rpcThreadPool = Executors.newFixedThreadPool(i3, new Daemon.DaemonFactory());
        if (!disabledProbeThreadForTest) {
            startProbeScheduler();
        }
        LOG.info("Start dead node detector for DFSClient {}.", this.name);
        this.state = State.INIT;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            clearAndGetDetectedDeadNodes();
            LOG.debug("Current detector state {}, the detected nodes: {}.", this.state, this.deadNodes.values());
            switch (this.state) {
                case INIT:
                    init();
                    break;
                case CHECK_DEAD:
                    checkDeadNodes();
                    break;
                case IDLE:
                    idle();
                    break;
                case ERROR:
                    try {
                        Thread.sleep(5000L);
                        return;
                    } catch (InterruptedException e) {
                        LOG.debug("Got interrupted while DeadNodeDetector is error.", (Throwable) e);
                        Thread.currentThread().interrupt();
                        return;
                    }
            }
        }
    }

    public void shutdown() {
        threadShutDown(this);
        threadShutDown(this.probeDeadNodesSchedulerThr);
        threadShutDown(this.probeSuspectNodesSchedulerThr);
        this.probeDeadNodesThreadPool.shutdown();
        this.probeSuspectNodesThreadPool.shutdown();
        this.rpcThreadPool.shutdown();
    }

    private static void threadShutDown(Thread thread) {
        if (thread == null || !thread.isAlive()) {
            return;
        }
        thread.interrupt();
        try {
            thread.join();
        } catch (InterruptedException e) {
        }
    }

    @VisibleForTesting
    boolean isThreadsShutdown() {
        return !isAlive() && !this.probeDeadNodesSchedulerThr.isAlive() && !this.probeSuspectNodesSchedulerThr.isAlive() && this.probeDeadNodesThreadPool.isShutdown() && this.probeSuspectNodesThreadPool.isShutdown() && this.rpcThreadPool.isShutdown();
    }

    @VisibleForTesting
    static void setDisabledProbeThreadForTest(boolean z) {
        disabledProbeThreadForTest = z;
    }

    @VisibleForTesting
    void startProbeScheduler() {
        this.probeDeadNodesSchedulerThr = new Thread(new ProbeScheduler(this, ProbeType.CHECK_DEAD));
        this.probeDeadNodesSchedulerThr.setDaemon(true);
        this.probeDeadNodesSchedulerThr.start();
        this.probeSuspectNodesSchedulerThr = new Thread(new ProbeScheduler(this, ProbeType.CHECK_SUSPECT));
        this.probeSuspectNodesSchedulerThr.setDaemon(true);
        this.probeSuspectNodesSchedulerThr.start();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleProbe(ProbeType probeType) {
        LOG.debug("Schedule probe datanode for probe type: {}.", probeType);
        if (probeType == ProbeType.CHECK_DEAD) {
            while (true) {
                DatanodeInfo poll = this.deadNodesProbeQueue.poll();
                if (poll == null) {
                    return;
                }
                if (this.probeInProg.containsKey(poll.getDatanodeUuid())) {
                    LOG.debug("The datanode {} is already contained in probe queue, skip to add it.", poll);
                } else {
                    this.probeInProg.put(poll.getDatanodeUuid(), poll);
                    this.probeDeadNodesThreadPool.execute(new Probe(this, poll, ProbeType.CHECK_DEAD));
                }
            }
        } else {
            if (probeType != ProbeType.CHECK_SUSPECT) {
                return;
            }
            while (true) {
                DatanodeInfo poll2 = this.suspectNodesProbeQueue.poll();
                if (poll2 == null) {
                    return;
                }
                if (!this.probeInProg.containsKey(poll2.getDatanodeUuid())) {
                    this.probeInProg.put(poll2.getDatanodeUuid(), poll2);
                    this.probeSuspectNodesThreadPool.execute(new Probe(this, poll2, ProbeType.CHECK_SUSPECT));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void probeCallBack(Probe probe, boolean z) {
        LOG.debug("Probe datanode: {} result: {}, type: {}", probe.getDatanodeInfo(), Boolean.valueOf(z), probe.getType());
        this.probeInProg.remove(probe.getDatanodeInfo().getDatanodeUuid());
        if (!z) {
            if (probe.getType() == ProbeType.CHECK_SUSPECT) {
                LOG.warn("Probe failed, add suspect node to dead node list: {}.", probe.getDatanodeInfo());
                addToDead(probe.getDatanodeInfo());
                return;
            }
            return;
        }
        if (probe.getType() == ProbeType.CHECK_DEAD) {
            LOG.info("Remove the node out from dead node list: {}.", probe.getDatanodeInfo());
            removeDeadNode(probe.getDatanodeInfo());
        } else if (probe.getType() == ProbeType.CHECK_SUSPECT) {
            LOG.debug("Remove the node out from suspect node list: {}.", probe.getDatanodeInfo());
            removeNodeFromDeadNodeDetector(probe.getDatanodeInfo());
        }
    }

    private void checkDeadNodes() {
        for (DatanodeInfo datanodeInfo : clearAndGetDetectedDeadNodes()) {
            if (this.deadNodesProbeQueue.offer(datanodeInfo)) {
                LOG.debug("Add dead node to check: {}.", datanodeInfo);
            } else {
                LOG.debug("Skip to add dead node {} to check since the node is already in the probe queue.", datanodeInfo);
            }
        }
        this.state = State.IDLE;
    }

    private void idle() {
        try {
            Thread.sleep(this.idleSleepMs);
        } catch (InterruptedException e) {
            LOG.debug("Got interrupted while DeadNodeDetector is idle.", (Throwable) e);
            Thread.currentThread().interrupt();
        }
        this.state = State.CHECK_DEAD;
    }

    private void init() {
        this.state = State.CHECK_DEAD;
    }

    private void addToDead(DatanodeInfo datanodeInfo) {
        this.deadNodes.put(datanodeInfo.getDatanodeUuid(), datanodeInfo);
    }

    public boolean isDeadNode(DatanodeInfo datanodeInfo) {
        return this.deadNodes.containsKey(datanodeInfo.getDatanodeUuid());
    }

    private void removeFromDead(DatanodeInfo datanodeInfo) {
        this.deadNodes.remove(datanodeInfo.getDatanodeUuid());
    }

    public UniqueQueue<DatanodeInfo> getDeadNodesProbeQueue() {
        return this.deadNodesProbeQueue;
    }

    public UniqueQueue<DatanodeInfo> getSuspectNodesProbeQueue() {
        return this.suspectNodesProbeQueue;
    }

    @VisibleForTesting
    void setSuspectQueue(UniqueQueue<DatanodeInfo> uniqueQueue) {
        this.suspectNodesProbeQueue = uniqueQueue;
    }

    @VisibleForTesting
    void setDeadQueue(UniqueQueue<DatanodeInfo> uniqueQueue) {
        this.deadNodesProbeQueue = uniqueQueue;
    }

    public synchronized void addNodeToDetect(DFSInputStream dFSInputStream, DatanodeInfo datanodeInfo) {
        HashSet<DatanodeInfo> hashSet = this.suspectAndDeadNodes.get(dFSInputStream);
        if (hashSet == null) {
            HashSet<DatanodeInfo> hashSet2 = new HashSet<>();
            hashSet2.add(datanodeInfo);
            this.suspectAndDeadNodes.putIfAbsent(dFSInputStream, hashSet2);
        } else {
            hashSet.add(datanodeInfo);
        }
        LOG.debug("Add datanode {} to suspectAndDeadNodes.", datanodeInfo);
        addSuspectNodeToDetect(datanodeInfo);
    }

    private boolean addSuspectNodeToDetect(DatanodeInfo datanodeInfo) {
        return this.suspectNodesProbeQueue.offer(datanodeInfo);
    }

    public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() {
        HashSet hashSet = new HashSet();
        Iterator<HashSet<DatanodeInfo>> it = this.suspectAndDeadNodes.values().iterator();
        while (it.hasNext()) {
            hashSet.addAll(it.next());
        }
        for (DatanodeInfo datanodeInfo : this.deadNodes.values()) {
            if (!hashSet.contains(datanodeInfo)) {
                this.deadNodes.remove(datanodeInfo.getDatanodeUuid());
            }
        }
        return new HashSet(this.deadNodes.values());
    }

    public synchronized void removeNodeFromDeadNodeDetector(DFSInputStream dFSInputStream, DatanodeInfo datanodeInfo) {
        HashSet<DatanodeInfo> hashSet = this.suspectAndDeadNodes.get(dFSInputStream);
        if (hashSet != null) {
            hashSet.remove(datanodeInfo);
            dFSInputStream.removeFromLocalDeadNodes(datanodeInfo);
            if (hashSet.isEmpty()) {
                this.suspectAndDeadNodes.remove(dFSInputStream);
            }
        }
    }

    private synchronized void removeNodeFromDeadNodeDetector(DatanodeInfo datanodeInfo) {
        for (Map.Entry<DFSInputStream, HashSet<DatanodeInfo>> entry : this.suspectAndDeadNodes.entrySet()) {
            HashSet<DatanodeInfo> value = entry.getValue();
            if (value.remove(datanodeInfo)) {
                DFSInputStream key = entry.getKey();
                key.removeFromLocalDeadNodes(datanodeInfo);
                if (value.isEmpty()) {
                    this.suspectAndDeadNodes.remove(key);
                }
            }
        }
    }

    private void removeDeadNode(DatanodeInfo datanodeInfo) {
        removeNodeFromDeadNodeDetector(datanodeInfo);
        removeFromDead(datanodeInfo);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void probeSleep(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            LOG.debug("Got interrupted while probe is scheduling.", (Throwable) e);
            Thread.currentThread().interrupt();
        }
    }
}
