package org.apache.drill.yarn.zk;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.yarn.appMaster.EventContext;
import org.apache.drill.yarn.appMaster.RegistryHandler;
import org.apache.drill.yarn.appMaster.Scheduler;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.TaskLifecycleListener;
import org.apache.drill.yarn.appMaster.TaskSpec;
import org.apache.drill.yarn.zk.ZKRegistry;
import org.junit.Assert;
import org.junit.Test;

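/*
 * Tests for ZKClusterCoordinatorDriver and ZKRegistry, run against an in-process
 * Curator TestingServer.
 * Decompiled from: input_file:org/apache/drill/yarn/zk/TestZkRegistry.class
 */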
public class TestZkRegistry {
    // Host names used for the fake drillbit endpoints built by the tests below.
    private static final String BARNEY_HOST = "barney";
    private static final String WILMA_HOST = "wilma";
    private static final String TEST_HOST = "host";
    private static final String FRED_HOST = "fred";
    // Ports given to every test endpoint; makeKey() renders them in user:control:data order.
    public static final int TEST_USER_PORT = 123;
    public static final int TEST_CONTROL_PORT = 456;
    public static final int TEST_DATA_PORT = 789;
    // ZooKeeper root and cluster id passed to ZKClusterCoordinatorDriver.setConnect();
    // CLUSTER_ID is also used as the service-instance name in addDrillbit().
    public static final String ZK_ROOT = "test-root";
    public static final String CLUSTER_ID = "test-cluster";

    /* loaded from: input_file:org/apache/drill/yarn/zk/TestZkRegistry$TestDrillbitStatusListener.class */
    private class TestDrillbitStatusListener implements DrillbitStatusListener {
        protected Set<CoordinationProtos.DrillbitEndpoint> added;
        protected Set<CoordinationProtos.DrillbitEndpoint> removed;

        private TestDrillbitStatusListener() {
        }

        public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> set) {
            this.removed = set;
        }

        public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> set) {
            this.added = set;
        }

        public void clear() {
            this.added = null;
            this.removed = null;
        }
    }

    /* loaded from: input_file:org/apache/drill/yarn/zk/TestZkRegistry$TestRegistryHandler.class */
    private static class TestRegistryHandler implements RegistryHandler {
        String reserved;
        String released;
        Task start;
        Task end;

        private TestRegistryHandler() {
        }

        public void clear() {
            this.reserved = null;
            this.released = null;
            this.start = null;
            this.end = null;
        }

        public void reserveHost(String str) {
            Assert.assertNull(this.reserved);
            this.reserved = str;
        }

        public void releaseHost(String str) {
            Assert.assertNull(this.released);
            this.released = str;
        }

        public void startAck(Task task, String str, Object obj) {
            this.start = task;
        }

        public void completionAck(Task task, String str) {
            this.end = task;
        }

        public void registryDown() {
        }
    }

    /* loaded from: input_file:org/apache/drill/yarn/zk/TestZkRegistry$TestTask.class */
    /**
     * Bare-bones {@link Task} for the registry tests: constructed with a null
     * scheduler and a null task spec, and reporting the host name it was given.
     */
    public static class TestTask extends Task {
        private String host;

        /**
         * @param str host name that {@link #getHostName()} will return
         */
        public TestTask(String str) {
            super((Scheduler) null, (TaskSpec) null);
            host = str;
        }

        /** Returns the host name supplied to the constructor. */
        public String getHostName() {
            return host;
        }

        /** Sets the inherited {@code trackingState} field back to {@code NEW}. */
        public void resetTrackingState() {
            trackingState = Task.TrackingState.NEW;
        }
    }

    /**
     * Checks the string key produced for an endpoint, both from a populated
     * endpoint object and from a bare host name.
     */
    @Test
    public void testFormat() {
        String expectedKey = makeKey(TEST_HOST);

        // Key rendered from a fully populated endpoint.
        Assert.assertEquals(expectedKey, ZKClusterCoordinatorDriver.asString(makeEndpoint(TEST_HOST)));

        // Key rendered from a host name after the ports were set explicitly.
        Assert.assertEquals(
            expectedKey,
            new ZKClusterCoordinatorDriver()
                .setPorts(TEST_USER_PORT, TEST_CONTROL_PORT, TEST_DATA_PORT)
                .toKey(TEST_HOST));

        // Key rendered from a host name when no ports were set on the driver.
        Assert.assertEquals("fred:31010:31011:31012", new ZKClusterCoordinatorDriver().toKey(FRED_HOST));
    }

    /**
     * Builds the expected {@code host:user:control:data} key for a test endpoint.
     *
     * @param str host name placed in the first segment
     * @return the host followed by the three test ports, colon-separated
     */
    public static String makeKey(String str) {
        return String.join(
            ":",
            str,
            Integer.toString(TEST_USER_PORT),
            Integer.toString(TEST_CONTROL_PORT),
            Integer.toString(TEST_DATA_PORT));
    }

    /**
     * Connects a driver to an empty test ZooKeeper server and checks that it
     * reports no initial endpoints.
     *
     * @throws Exception if the server or driver fails to start or close
     */
    @Test
    public void testBasics() throws Exception {
        // try-with-resources replaces the hand-expanded close logic, which
        // contained dead branches dereferencing an always-null Throwable.
        try (TestingServer server = new TestingServer()) {
            server.start();
            ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
                .setConnect(server.getConnectString(), "drill", "drillbits")
                .build();
            try {
                Assert.assertTrue(driver.getInitialEndpoints().isEmpty());
            } finally {
                // Close the driver even when the assertion fails.
                driver.close();
            }
            server.stop();
        }
    }

    /**
     * Registers one drillbit before the driver starts, then adds and removes
     * drillbits while a listener is attached, checking the initial endpoint list
     * and the sets delivered to the listener.
     *
     * @throws Exception if ZooKeeper setup, node manipulation or cleanup fails
     */
    @Test
    public void testCycle() throws Exception {
        try (TestingServer server = new TestingServer()) {
            server.start();
            String connectString = server.getConnectString();
            try (CuratorFramework zkClient = connectToZk(connectString)) {
                // Fred exists before the driver connects, so it must show up
                // in the initial endpoint list.
                addDrillbit(zkClient, FRED_HOST);
                ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
                    .setConnect(connectString, ZK_ROOT, CLUSTER_ID)
                    .build();
                try {
                    List<CoordinationProtos.DrillbitEndpoint> initial = driver.getInitialEndpoints();
                    Assert.assertEquals(1, initial.size());
                    Assert.assertEquals(makeKey(FRED_HOST), ZKClusterCoordinatorDriver.asString(initial.get(0)));

                    TestDrillbitStatusListener listener = new TestDrillbitStatusListener();
                    driver.addDrillbitListener(listener);

                    // Adding Wilma must produce a "registered" notification only.
                    addDrillbit(zkClient, WILMA_HOST);
                    // Fixed pause to let the notification arrive before asserting.
                    Thread.sleep(50L);
                    Assert.assertNull(listener.removed);
                    Set<CoordinationProtos.DrillbitEndpoint> added = listener.added;
                    Assert.assertNotNull(added);
                    Assert.assertEquals(1, added.size());
                    for (CoordinationProtos.DrillbitEndpoint endpoint : added) {
                        Assert.assertEquals(makeKey(WILMA_HOST), ZKClusterCoordinatorDriver.asString(endpoint));
                    }

                    // Removing Fred must produce an "unregistered" notification only.
                    listener.clear();
                    removeDrillbit(zkClient, FRED_HOST);
                    Thread.sleep(50L);
                    Assert.assertNull(listener.added);
                    Set<CoordinationProtos.DrillbitEndpoint> removed = listener.removed;
                    Assert.assertNotNull(removed);
                    Assert.assertEquals(1, removed.size());
                    for (CoordinationProtos.DrillbitEndpoint endpoint : removed) {
                        Assert.assertEquals(makeKey(FRED_HOST), ZKClusterCoordinatorDriver.asString(endpoint));
                    }
                } finally {
                    // Release the driver even when an assertion above fails.
                    driver.close();
                }
            }
            server.stop();
        }
    }

    /**
     * Builds a drillbit endpoint for the given host using the three test ports.
     *
     * @param str host name stored as the endpoint address
     * @return the built endpoint
     */
    private CoordinationProtos.DrillbitEndpoint makeEndpoint(String str) {
        return CoordinationProtos.DrillbitEndpoint.newBuilder()
            .setAddress(str)
            .setControlPort(TEST_CONTROL_PORT)
            .setDataPort(TEST_DATA_PORT)
            .setUserPort(TEST_USER_PORT)
            .build();
    }

    /**
     * Simulates a drillbit registering itself: serializes a service instance
     * for the host and creates the node {@code "/" + str} holding those bytes.
     *
     * @param curatorFramework client used to create the node
     * @param str host name; used for both the node path and the endpoint address
     * @throws Exception if building, serializing or creating the node fails
     */
    private void addDrillbit(CuratorFramework curatorFramework, String str) throws Exception {
        // The explicit type argument keeps the builder from inferring Object
        // for the payload type, so the instance matches the serializer.
        ServiceInstance<CoordinationProtos.DrillbitEndpoint> instance = ServiceInstance
            .<CoordinationProtos.DrillbitEndpoint>builder()
            .name(CLUSTER_ID)
            .payload(makeEndpoint(str))
            .build();
        byte[] data = DrillServiceInstanceHelper.SERIALIZER.serialize(instance);
        curatorFramework.create().forPath("/" + str, data);
    }

    /**
     * Simulates a drillbit going away by deleting the node {@code "/" + str}.
     *
     * @param curatorFramework client used to delete the node
     * @param str host name whose node is removed
     * @throws Exception if the delete fails
     */
    private void removeDrillbit(CuratorFramework curatorFramework, String str) throws Exception {
        String nodePath = "/" + str;
        curatorFramework.delete().forPath(nodePath);
    }

    /**
     * Creates and starts a Curator client whose namespace is
     * {@code ZK_ROOT + "/" + CLUSTER_ID}, so node paths used by the tests are
     * relative to the cluster directory.
     *
     * @param str ZooKeeper connect string
     * @return a started client; the caller is responsible for closing it
     */
    public static CuratorFramework connectToZk(String str) {
        CuratorFramework client = CuratorFrameworkFactory.builder()
            // Built from the shared constants so the namespace cannot drift
            // from the root and cluster id passed to the driver.
            .namespace(ZK_ROOT + "/" + CLUSTER_ID)
            .connectString(str)
            .retryPolicy(new RetryNTimes(3, 1000))
            .build();
        client.start();
        return client;
    }

    /**
     * End-to-end check of {@code ZKRegistry} tracking: an unmanaged drillbit
     * present at start-up, a task allocated before its drillbit registers, a
     * drillbit that registers before its task is allocated, and the
     * corresponding deregistration and task-end transitions.
     *
     * @throws Exception if ZooKeeper setup, node manipulation or cleanup fails
     */
    @Test
    public void testZKRegistry() throws Exception {
        try (TestingServer server = new TestingServer()) {
            server.start();
            String connectString = server.getConnectString();
            try (CuratorFramework zkClient = connectToZk(connectString)) {
                // Fred is registered before the driver connects.
                addDrillbit(zkClient, FRED_HOST);
                ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
                    .setConnect(connectString, ZK_ROOT, CLUSTER_ID)
                    .setPorts(TEST_USER_PORT, TEST_CONTROL_PORT, TEST_DATA_PORT)
                    .build();
                try {
                    verifyRegistryLifecycle(zkClient, driver);
                } finally {
                    // Release the driver even when an assertion fails.
                    driver.close();
                }
            }
            server.stop();
        }
    }

    /** Looks up the tracker stored under {@code key} in the registry's test-only map view. */
    private static ZKRegistry.DrillbitTracker trackerFor(ZKRegistry registry, String key) {
        Map<String, ZKRegistry.DrillbitTracker> trackers = registry.getRegistryForTesting();
        return trackers.get(key);
    }

    /**
     * Runs the registry scenario against an already-connected client and driver.
     * Expects Fred's node to exist before the driver was built.
     */
    private void verifyRegistryLifecycle(CuratorFramework zkClient, ZKClusterCoordinatorDriver driver)
            throws Exception {
        ZKRegistry registry = new ZKRegistry(driver);
        TestRegistryHandler handler = new TestRegistryHandler();
        registry.start(handler);

        // --- Fred: present at start-up with no task => reserved and UNMANAGED.
        Assert.assertEquals(FRED_HOST, handler.reserved);
        List<String> unmanaged = registry.listUnmanagedDrillits();
        Assert.assertEquals(1, unmanaged.size());
        String fredKey = makeKey(FRED_HOST);
        Assert.assertEquals(fredKey, unmanaged.get(0));
        Map<String, ZKRegistry.DrillbitTracker> trackers = registry.getRegistryForTesting();
        Assert.assertEquals(1, trackers.size());
        Assert.assertTrue(trackers.containsKey(fredKey));
        ZKRegistry.DrillbitTracker fredTracker = trackers.get(fredKey);
        Assert.assertEquals(fredKey, fredTracker.key);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.UNMANAGED, fredTracker.state);
        Assert.assertNull(fredTracker.task);
        Assert.assertEquals(fredKey, ZKClusterCoordinatorDriver.asString(fredTracker.endpoint));
        Assert.assertEquals(FRED_HOST, handler.reserved);

        // --- Wilma: task is allocated first, drillbit registers afterwards.
        TestTask wilmaTask = new TestTask(WILMA_HOST);
        EventContext wilmaContext = new EventContext(wilmaTask);
        registry.stateChange(TaskLifecycleListener.Event.CREATED, wilmaContext);
        // CREATED alone does not add a tracker.
        Assert.assertEquals(1, registry.getRegistryForTesting().size());
        registry.stateChange(TaskLifecycleListener.Event.ALLOCATED, wilmaContext);
        Assert.assertEquals(2, registry.getRegistryForTesting().size());
        String wilmaKey = makeKey(WILMA_HOST);
        ZKRegistry.DrillbitTracker wilmaTracker = trackerFor(registry, wilmaKey);
        Assert.assertNotNull(wilmaTracker);
        Assert.assertEquals(wilmaTask, wilmaTracker.task);
        Assert.assertNull(wilmaTracker.endpoint);
        Assert.assertEquals(wilmaKey, wilmaTracker.key);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.NEW, wilmaTracker.state);

        handler.clear();
        addDrillbit(zkClient, WILMA_HOST);
        // Fixed pause to let the ZooKeeper notification reach the registry.
        Thread.sleep(100L);
        Assert.assertEquals(wilmaTask, handler.start);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.REGISTERED, wilmaTracker.state);

        // --- Barney: drillbit registers before its task is allocated.
        TestTask barneyTask = new TestTask(BARNEY_HOST);
        EventContext barneyContext = new EventContext(barneyTask);
        registry.stateChange(TaskLifecycleListener.Event.CREATED, barneyContext);
        handler.clear();
        addDrillbit(zkClient, BARNEY_HOST);
        Thread.sleep(100L);
        Assert.assertEquals(BARNEY_HOST, handler.reserved);
        String barneyKey = makeKey(BARNEY_HOST);
        ZKRegistry.DrillbitTracker barneyTracker = trackerFor(registry, barneyKey);
        Assert.assertNotNull(barneyTracker);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.UNMANAGED, barneyTracker.state);
        Assert.assertNull(barneyTracker.task);
        Assert.assertEquals(2, registry.listUnmanagedDrillits().size());

        // Allocation attaches the task to the already-registered drillbit.
        handler.clear();
        registry.stateChange(TaskLifecycleListener.Event.ALLOCATED, barneyContext);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.REGISTERED, barneyTracker.state);
        Assert.assertEquals(barneyTask, handler.start);
        Assert.assertEquals(barneyTask, barneyTracker.task);
        Assert.assertEquals(1, registry.listUnmanagedDrillits().size());

        // Barney drops out of ZooKeeper => completion ack, DEREGISTERED.
        handler.clear();
        removeDrillbit(zkClient, BARNEY_HOST);
        Thread.sleep(100L);
        Assert.assertEquals(barneyTask, handler.end);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.DEREGISTERED, barneyTracker.state);

        // Barney comes back => start ack again, REGISTERED.
        handler.clear();
        addDrillbit(zkClient, BARNEY_HOST);
        Thread.sleep(100L);
        Assert.assertEquals(barneyTask, handler.start);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.REGISTERED, barneyTracker.state);

        // Barney leaves again; the tracker stays until the task itself ends.
        handler.clear();
        removeDrillbit(zkClient, BARNEY_HOST);
        Thread.sleep(100L);
        Assert.assertNotNull(trackerFor(registry, barneyKey));
        Assert.assertEquals(barneyTask, handler.end);

        handler.clear();
        registry.stateChange(TaskLifecycleListener.Event.ENDED, barneyContext);
        Assert.assertNull(handler.end);
        Assert.assertNull(trackerFor(registry, barneyKey));

        // --- Fred (unmanaged) leaves => host released, no completion ack.
        handler.clear();
        removeDrillbit(zkClient, FRED_HOST);
        Thread.sleep(100L);
        Assert.assertNull(trackerFor(registry, fredKey));
        Assert.assertNull(handler.end);
        Assert.assertEquals(FRED_HOST, handler.released);

        // --- Wilma (managed) leaves => completion ack, no host release.
        handler.clear();
        removeDrillbit(zkClient, WILMA_HOST);
        Thread.sleep(100L);
        Assert.assertEquals(wilmaTask, handler.end);
        Assert.assertNull(handler.released);
        Assert.assertEquals(ZKRegistry.DrillbitTracker.State.DEREGISTERED, wilmaTracker.state);
        Assert.assertNotNull(trackerFor(registry, wilmaKey));

        // Ending Wilma's task removes the last tracker.
        handler.clear();
        registry.stateChange(TaskLifecycleListener.Event.ENDED, new EventContext(wilmaTask));
        Assert.assertNull(trackerFor(registry, wilmaKey));
        Assert.assertNull(handler.released);
        Assert.assertNull(handler.end);
        Assert.assertTrue(registry.getRegistryForTesting().isEmpty());
    }
}
