package org.apache.hadoop.ha;

import java.io.IOException;
import java.lang.Thread;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:WEB-INF/lib/hadoop-common-3.4.1.0-eep-940.jar:org/apache/hadoop/ha/HealthMonitor.class */
public class HealthMonitor {
    private static final Logger LOG;
    private long connectRetryInterval;
    private long checkIntervalMillis;
    private long sleepAfterDisconnectMillis;
    private int rpcConnectRetries;
    private int rpcTimeout;
    private HAServiceProtocol proxy;
    private final HAServiceTarget targetToMonitor;
    private final Configuration conf;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile boolean shouldRun = true;
    private State state = State.INITIALIZING;
    private List<Callback> callbacks = Collections.synchronizedList(new LinkedList());
    private List<ServiceStateCallback> serviceStateCallbacks = Collections.synchronizedList(new LinkedList());
    private HAServiceStatus lastServiceState = new HAServiceStatus(HAServiceProtocol.HAServiceState.INITIALIZING);
    private Daemon daemon = new MonitorDaemon();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.4.1.0-eep-940.jar:org/apache/hadoop/ha/HealthMonitor$Callback.class */
    public interface Callback {
        void enteredState(State state);
    }

    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.4.1.0-eep-940.jar:org/apache/hadoop/ha/HealthMonitor$MonitorDaemon.class */
    private class MonitorDaemon extends Daemon {
        private MonitorDaemon() {
            setName("Health Monitor for " + HealthMonitor.this.targetToMonitor);
            setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: org.apache.hadoop.ha.HealthMonitor.MonitorDaemon.1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread, Throwable th) {
                    HealthMonitor.LOG.error("Health monitor failed", th);
                    HealthMonitor.this.enterState(State.HEALTH_MONITOR_FAILED);
                }
            });
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (HealthMonitor.this.shouldRun) {
                try {
                    HealthMonitor.this.loopUntilConnected();
                    HealthMonitor.this.doHealthChecks();
                } catch (InterruptedException e) {
                    Preconditions.checkState(!HealthMonitor.this.shouldRun, "Interrupted but still supposed to run");
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.4.1.0-eep-940.jar:org/apache/hadoop/ha/HealthMonitor$ServiceStateCallback.class */
    public interface ServiceStateCallback {
        void reportServiceStatus(HAServiceStatus hAServiceStatus);
    }

    @InterfaceAudience.Private
    /* loaded from: input_file:WEB-INF/lib/hadoop-common-3.4.1.0-eep-940.jar:org/apache/hadoop/ha/HealthMonitor$State.class */
    public enum State {
        INITIALIZING,
        SERVICE_NOT_RESPONDING,
        SERVICE_HEALTHY,
        SERVICE_UNHEALTHY,
        HEALTH_MONITOR_FAILED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HealthMonitor(Configuration configuration, HAServiceTarget hAServiceTarget) {
        this.targetToMonitor = hAServiceTarget;
        this.conf = configuration;
        this.sleepAfterDisconnectMillis = configuration.getLong(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 1000L);
        this.checkIntervalMillis = configuration.getLong(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 1000L);
        this.connectRetryInterval = configuration.getLong(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 1000L);
        this.rpcConnectRetries = configuration.getInt(CommonConfigurationKeys.HA_HM_RPC_CONNECT_MAX_RETRIES_KEY, 1);
        this.rpcTimeout = configuration.getInt(CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY, CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT);
    }

    public void addCallback(Callback callback) {
        this.callbacks.add(callback);
    }

    public synchronized void addServiceStateCallback(ServiceStateCallback serviceStateCallback) {
        this.serviceStateCallbacks.add(serviceStateCallback);
    }

    public void shutdown() {
        LOG.info("Stopping HealthMonitor thread");
        this.shouldRun = false;
        this.daemon.interrupt();
    }

    public synchronized HAServiceProtocol getProxy() {
        return this.proxy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void loopUntilConnected() throws InterruptedException {
        tryConnect();
        while (this.proxy == null) {
            Thread.sleep(this.connectRetryInterval);
            tryConnect();
        }
        if (!$assertionsDisabled && this.proxy == null) {
            throw new AssertionError();
        }
    }

    private void tryConnect() {
        Preconditions.checkState(this.proxy == null);
        try {
            synchronized (this) {
                this.proxy = createProxy();
            }
        } catch (IOException e) {
            LOG.warn("Could not connect to local service at " + this.targetToMonitor + ": " + e.getMessage());
            this.proxy = null;
            enterState(State.SERVICE_NOT_RESPONDING);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HAServiceProtocol createProxy() throws IOException {
        return this.targetToMonitor.getHealthMonitorProxy(this.conf, this.rpcTimeout, this.rpcConnectRetries);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doHealthChecks() throws InterruptedException {
        while (this.shouldRun) {
            HAServiceStatus hAServiceStatus = null;
            boolean z = false;
            try {
                hAServiceStatus = this.proxy.getServiceStatus();
                this.proxy.monitorHealth();
                z = true;
            } catch (Throwable th) {
                if (!isHealthCheckFailedException(th)) {
                    LOG.warn("Transport-level exception trying to monitor health of {}", this.targetToMonitor, th);
                    RPC.stopProxy(this.proxy);
                    this.proxy = null;
                    enterState(State.SERVICE_NOT_RESPONDING);
                    Thread.sleep(this.sleepAfterDisconnectMillis);
                    return;
                }
                LOG.warn("Service health check failed for {}", this.targetToMonitor, th);
                enterState(State.SERVICE_UNHEALTHY);
            }
            if (hAServiceStatus != null) {
                setLastServiceStatus(hAServiceStatus);
            }
            if (z) {
                enterState(State.SERVICE_HEALTHY);
            }
            Thread.sleep(this.checkIntervalMillis);
        }
    }

    private boolean isHealthCheckFailedException(Throwable th) {
        return (th instanceof HealthCheckFailedException) || ((th instanceof RemoteException) && (((RemoteException) th).unwrapRemoteException(HealthCheckFailedException.class) instanceof HealthCheckFailedException));
    }

    private synchronized void setLastServiceStatus(HAServiceStatus hAServiceStatus) {
        this.lastServiceState = hAServiceStatus;
        Iterator<ServiceStateCallback> it = this.serviceStateCallbacks.iterator();
        while (it.hasNext()) {
            it.next().reportServiceStatus(this.lastServiceState);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void enterState(State state) {
        if (state != this.state) {
            LOG.info("Entering state {}", state);
            this.state = state;
            synchronized (this.callbacks) {
                Iterator<Callback> it = this.callbacks.iterator();
                while (it.hasNext()) {
                    it.next().enteredState(state);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public synchronized State getHealthState() {
        return this.state;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isAlive() {
        return this.daemon.isAlive();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void join() throws InterruptedException {
        this.daemon.join();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void start() {
        this.daemon.start();
    }

    static {
        $assertionsDisabled = !HealthMonitor.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger((Class<?>) HealthMonitor.class);
    }
}
