package org.apache.hadoop.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HealthMonitor;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/hadoop-common-3.3.4.100-eep-910-tests.jar:org/apache/hadoop/ha/TestHealthMonitor.class */
public class TestHealthMonitor {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) TestHealthMonitor.class);
    private AtomicInteger createProxyCount = new AtomicInteger(0);
    private volatile boolean throwOOMEOnCreate = false;
    private HealthMonitor hm;
    private DummyHAService svc;

    @Before
    public void setupHM() throws InterruptedException, IOException {
        Configuration configuration = new Configuration();
        configuration.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
        configuration.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
        configuration.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
        configuration.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
        this.svc = createDummyHAService();
        this.hm = new HealthMonitor(configuration, this.svc) { // from class: org.apache.hadoop.ha.TestHealthMonitor.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.hadoop.ha.HealthMonitor
            public HAServiceProtocol createProxy() throws IOException {
                TestHealthMonitor.this.createProxyCount.incrementAndGet();
                if (TestHealthMonitor.this.throwOOMEOnCreate) {
                    throw new OutOfMemoryError("oome");
                }
                return super.createProxy();
            }
        };
        LOG.info("Starting health monitor");
        this.hm.start();
        LOG.info("Waiting for HEALTHY signal");
        waitForState(this.hm, HealthMonitor.State.SERVICE_HEALTHY);
    }

    protected DummyHAService createDummyHAService() {
        return new DummyHAService(HAServiceProtocol.HAServiceState.ACTIVE, new InetSocketAddress("0.0.0.0", 0), true);
    }

    @Test(timeout = 15000)
    public void testMonitor() throws Exception {
        LOG.info("Mocking bad health check, waiting for UNHEALTHY");
        this.svc.isHealthy = false;
        waitForState(this.hm, HealthMonitor.State.SERVICE_UNHEALTHY);
        LOG.info("Returning to healthy state, waiting for HEALTHY");
        this.svc.isHealthy = true;
        waitForState(this.hm, HealthMonitor.State.SERVICE_HEALTHY);
        LOG.info("Returning an IOException, as if node went down");
        int i = this.createProxyCount.get();
        this.svc.actUnreachable = true;
        waitForState(this.hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
        while (this.createProxyCount.get() < i + 3) {
            Thread.sleep(10L);
        }
        LOG.info("Returning to healthy state, waiting for HEALTHY");
        this.svc.actUnreachable = false;
        waitForState(this.hm, HealthMonitor.State.SERVICE_HEALTHY);
        this.hm.shutdown();
        this.hm.join();
        Assert.assertFalse(this.hm.isAlive());
    }

    @Test(timeout = 15000)
    public void testHealthMonitorDies() throws Exception {
        LOG.info("Mocking RTE in health monitor, waiting for FAILED");
        this.throwOOMEOnCreate = true;
        this.svc.actUnreachable = true;
        waitForState(this.hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
        this.hm.shutdown();
        this.hm.join();
        Assert.assertFalse(this.hm.isAlive());
    }

    @Test(timeout = 15000)
    public void testCallbackThrowsRTE() throws Exception {
        this.hm.addCallback(new HealthMonitor.Callback() { // from class: org.apache.hadoop.ha.TestHealthMonitor.2
            @Override // org.apache.hadoop.ha.HealthMonitor.Callback
            public void enteredState(HealthMonitor.State state) {
                throw new RuntimeException("Injected RTE");
            }
        });
        LOG.info("Mocking bad health check, waiting for UNHEALTHY");
        this.svc.isHealthy = false;
        waitForState(this.hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
    }

    private void waitForState(HealthMonitor healthMonitor, HealthMonitor.State state) throws InterruptedException {
        long now = Time.now();
        while (Time.now() - now < 2000) {
            if (healthMonitor.getHealthState() == state) {
                return;
            } else {
                Thread.sleep(50L);
            }
        }
        Assert.assertEquals(state, healthMonitor.getHealthState());
    }
}
