/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.test.KillSession;
import org.apache.curator.test.TestingCluster;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.CuratorBasedElectorService;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.event.Level;

public class TestLeaderElectorService {
    private static final String RM1_ADDRESS = "1.1.1.1:1";
    private static final String RM1_NODE_ID = "rm1";
    private static final String RM2_ADDRESS = "0.0.0.0:0";
    private static final String RM2_NODE_ID = "rm2";
    Configuration conf;
    MockRM rm1;
    MockRM rm2;
    TestingCluster zkCluster;

    @Before
    public void setUp() throws Exception {
        GenericTestUtils.setRootLogLevel((Level)Level.INFO);
        this.conf = new Configuration();
        this.conf.setBoolean("yarn.resourcemanager.ha.enabled", true);
        this.conf.setBoolean("yarn.resourcemanager.ha.curator-leader-elector.enabled", true);
        this.conf.set("yarn.resourcemanager.cluster-id", "cluster1");
        this.conf.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
        for (String confKey : YarnConfiguration.getServiceAddressConfKeys((Configuration)this.conf)) {
            this.conf.set(HAUtil.addSuffix((String)confKey, (String)RM1_NODE_ID), RM1_ADDRESS);
            this.conf.set(HAUtil.addSuffix((String)confKey, (String)RM2_NODE_ID), RM2_ADDRESS);
        }
        this.zkCluster = new TestingCluster(3);
        this.conf.set("yarn.resourcemanager.zk-address", this.zkCluster.getConnectString());
        this.zkCluster.start();
    }

    @After
    public void tearDown() throws Exception {
        if (this.rm1 != null) {
            this.rm1.stop();
        }
        if (this.rm2 != null) {
            this.rm2.stop();
        }
    }

    @Test(timeout=20000L)
    public void testRMShutDownCauseFailover() throws Exception {
        this.rm1 = this.startRM(RM1_NODE_ID, HAServiceProtocol.HAServiceState.ACTIVE);
        this.rm2 = this.startRM(RM2_NODE_ID, HAServiceProtocol.HAServiceState.STANDBY);
        Thread.sleep(5000L);
        this.waitFor(this.rm2, HAServiceProtocol.HAServiceState.STANDBY);
        this.rm1.stop();
        this.waitFor(this.rm2, HAServiceProtocol.HAServiceState.ACTIVE);
    }

    @Test
    public void testStateStoreFailureCauseFailover() throws Exception {
        this.conf.set("yarn.resourcemanager.ha.id", RM1_NODE_ID);
        MemoryRMStateStore memStore = new MemoryRMStateStore(){

            public synchronized void storeApplicationStateInternal(ApplicationId appId, ApplicationStateData appState) throws Exception {
                throw new Exception("store app failure.");
            }
        };
        memStore.init(this.conf);
        this.rm1 = new MockRM(this.conf, (RMStateStore)memStore, true);
        this.rm1.init(this.conf);
        this.rm1.start();
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.ACTIVE);
        this.rm2 = this.startRM(RM2_NODE_ID, HAServiceProtocol.HAServiceState.STANDBY);
        MockRMAppSubmitter.submit(this.rm1, MockRMAppSubmissionData.Builder.createWithMemory(200L, this.rm1).withAppName("app1").withUser("user1").withAcls(null).withQueue("default").withWaitForAppAcceptedState(false).build());
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.STANDBY);
        this.waitFor(this.rm2, HAServiceProtocol.HAServiceState.ACTIVE);
        this.rm2.stop();
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.ACTIVE);
    }

    @Test
    public void testZKClusterDown() throws Exception {
        this.rm1 = this.startRM(RM1_NODE_ID, HAServiceProtocol.HAServiceState.ACTIVE);
        this.zkCluster.stop();
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.STANDBY);
        Collection instanceSpecs = this.zkCluster.getInstances();
        this.zkCluster = new TestingCluster(instanceSpecs);
        this.zkCluster.start();
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.ACTIVE);
    }

    @Test
    public void testExpireCurrentZKSession() throws Exception {
        this.rm1 = this.startRM(RM1_NODE_ID, HAServiceProtocol.HAServiceState.ACTIVE);
        CuratorBasedElectorService service = (CuratorBasedElectorService)this.rm1.getRMContext().getLeaderElectorService();
        CuratorZookeeperClient client = service.getCuratorClient().getZookeeperClient();
        KillSession.kill((ZooKeeper)client.getZooKeeper());
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.ACTIVE);
    }

    @Test
    public void testRMFailToTransitionToActive() throws Exception {
        this.conf.set("yarn.resourcemanager.ha.id", RM1_NODE_ID);
        final AtomicBoolean throwException = new AtomicBoolean(true);
        Thread launchRM = new Thread(){

            @Override
            public void run() {
                TestLeaderElectorService.this.rm1 = new MockRM(TestLeaderElectorService.this.conf, true){

                    synchronized void transitionToActive() throws Exception {
                        if (throwException.get()) {
                            throw new Exception("Fail to transition to active");
                        }
                        super.transitionToActive();
                    }
                };
                TestLeaderElectorService.this.rm1.init(TestLeaderElectorService.this.conf);
                TestLeaderElectorService.this.rm1.start();
            }
        };
        launchRM.start();
        Thread.sleep(5000L);
        throwException.set(false);
        this.waitFor(this.rm1, HAServiceProtocol.HAServiceState.ACTIVE);
    }

    @Test
    public void testKillZKInstance() throws Exception {
        this.rm1 = this.startRM(RM1_NODE_ID, HAServiceProtocol.HAServiceState.ACTIVE);
        this.rm2 = this.startRM(RM2_NODE_ID, HAServiceProtocol.HAServiceState.STANDBY);
        CuratorBasedElectorService service = (CuratorBasedElectorService)this.rm1.getRMContext().getLeaderElectorService();
        ZooKeeper zkClient = service.getCuratorClient().getZookeeperClient().getZooKeeper();
        InstanceSpec connectionInstance = this.zkCluster.findConnectionInstance(zkClient);
        this.zkCluster.killServer(connectionInstance);
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                try {
                    HAServiceProtocol.HAServiceState rm1State = TestLeaderElectorService.this.rm1.getAdminService().getServiceStatus().getState();
                    HAServiceProtocol.HAServiceState rm2State = TestLeaderElectorService.this.rm2.getAdminService().getServiceStatus().getState();
                    return rm1State.equals((Object)HAServiceProtocol.HAServiceState.ACTIVE) && rm2State.equals((Object)HAServiceProtocol.HAServiceState.STANDBY) || rm1State.equals((Object)HAServiceProtocol.HAServiceState.STANDBY) && rm2State.equals((Object)HAServiceProtocol.HAServiceState.ACTIVE);
                }
                catch (IOException iOException) {
                    return false;
                }
            }
        }, (long)2000L, (long)15000L);
    }

    private MockRM startRM(String rmId, HAServiceProtocol.HAServiceState state) throws Exception {
        YarnConfiguration yarnConf = new YarnConfiguration(this.conf);
        yarnConf.set("yarn.resourcemanager.ha.id", rmId);
        MockRM rm = new MockRM((Configuration)yarnConf, true);
        rm.init((Configuration)yarnConf);
        rm.start();
        this.waitFor(rm, state);
        return rm;
    }

    private void waitFor(final MockRM rm, final HAServiceProtocol.HAServiceState state) throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

            @Override
            public Boolean get() {
                try {
                    return rm.getAdminService().getServiceStatus().getState().equals((Object)state);
                }
                catch (IOException iOException) {
                    return false;
                }
            }
        }, (long)2000L, (long)15000L);
    }
}

