/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.yarn.zk;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.drill.exec.coord.DrillServiceInstanceHelper;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.test.BaseTest;
import org.apache.drill.yarn.appMaster.EventContext;
import org.apache.drill.yarn.appMaster.RegistryHandler;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.TaskLifecycleListener;
import org.apache.drill.yarn.zk.ZKClusterCoordinatorDriver;
import org.apache.drill.yarn.zk.ZKRegistry;
import org.junit.Assert;
import org.junit.Test;

public class TestZkRegistry
extends BaseTest {
    // Host names used as the address of the fake Drillbit endpoints built by makeEndpoint().
    private static final String BARNEY_HOST = "barney";
    private static final String WILMA_HOST = "wilma";
    private static final String TEST_HOST = "host";
    private static final String FRED_HOST = "fred";
    // Ports of the fake endpoints. NOTE(review): the method bodies in this class use the
    // literals 123 / 456 / 789 directly; presumably those are these constants inlined by
    // the compiler -- keep the two in sync.
    public static final int TEST_USER_PORT = 123;
    public static final int TEST_CONTROL_PORT = 456;
    public static final int TEST_DATA_PORT = 789;
    // ZooKeeper root and cluster id passed to the driver's setConnect(); connectToZk()
    // uses the matching namespace "test-root/test-cluster".
    public static final String ZK_ROOT = "test-root";
    public static final String CLUSTER_ID = "test-cluster";

    /**
     * Checks the colon-separated key produced by
     * {@code ZKClusterCoordinatorDriver.asString} and {@code toKey}.
     */
    @Test
    public void testFormat() {
        String expectedKey = TestZkRegistry.makeKey(TEST_HOST);

        // Key derived from a complete endpoint.
        CoordinationProtos.DrillbitEndpoint endpoint = this.makeEndpoint(TEST_HOST);
        Assert.assertEquals(expectedKey, ZKClusterCoordinatorDriver.asString(endpoint));

        // Key derived from a bare host name once the driver was given explicit ports.
        ZKClusterCoordinatorDriver withPorts = new ZKClusterCoordinatorDriver().setPorts(123, 456, 789);
        Assert.assertEquals(expectedKey, withPorts.toKey(TEST_HOST));

        // A driver on which no ports were set is expected to yield 31010/31011/31012.
        ZKClusterCoordinatorDriver withoutPorts = new ZKClusterCoordinatorDriver();
        Assert.assertEquals("fred:31010:31011:31012", withoutPorts.toKey(FRED_HOST));
    }

    /**
     * Builds the key the tests expect for a fake Drillbit on the given host,
     * in the form {@code host:123:456:789}.
     *
     * @param host host name placed at the front of the key; {@code null} is rendered as "null"
     * @return the colon-separated key
     */
    public static String makeKey(String host) {
        StringBuilder key = new StringBuilder();
        key.append(host);
        // User, control and data port, in that order.
        for (int port : new int[] {123, 456, 789}) {
            key.append(':').append(port);
        }
        return key.toString();
    }

    /**
     * Connects a driver to a freshly started test ZooKeeper server and verifies
     * that it reports no initial Drillbit endpoints.
     */
    @Test
    public void testBasics() throws Exception {
        try (TestingServer server = new TestingServer()) {
            server.start();
            ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver()
                    .setConnect(server.getConnectString(), "drill", "drillbits")
                    .build();
            try {
                Assert.assertTrue(driver.getInitialEndpoints().isEmpty());
            } finally {
                // Close the driver even when the assertion fails so it is not left open.
                driver.close();
            }
            server.stop();
        }
    }

    /**
     * Exercises the driver's view of Drillbits appearing in and disappearing
     * from ZooKeeper: one endpoint registered before the driver is built, one
     * added afterwards and one removed, checking the listener after each change.
     */
    @Test
    public void testCycle() throws Exception {
        // try-with-resources / finally release the server, the probe client and the
        // driver even when an assertion in the middle of the test fails.
        try (TestingServer server = new TestingServer()) {
            server.start();
            String connStr = server.getConnectString();
            try (CuratorFramework probeZk = TestZkRegistry.connectToZk(connStr)) {
                // Fred is written to ZK before the driver is built, so he must be
                // part of the driver's initial endpoint list.
                this.addDrillbit(probeZk, FRED_HOST);
                ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver().setConnect(connStr, ZK_ROOT, CLUSTER_ID).build();
                try {
                    List<?> bits = driver.getInitialEndpoints();
                    Assert.assertEquals(1L, bits.size());
                    Assert.assertEquals(TestZkRegistry.makeKey(FRED_HOST), ZKClusterCoordinatorDriver.asString((CoordinationProtos.DrillbitEndpoint) bits.get(0)));

                    TestDrillbitStatusListener listener = new TestDrillbitStatusListener();
                    driver.addDrillbitListener(listener);

                    // Adding Wilma must produce exactly one "registered" callback.
                    this.addDrillbit(probeZk, WILMA_HOST);
                    // Wait briefly for the change to be delivered to the listener.
                    Thread.sleep(50L);
                    Assert.assertNull(listener.removed);
                    Assert.assertNotNull(listener.added);
                    Assert.assertEquals(1L, listener.added.size());
                    for (CoordinationProtos.DrillbitEndpoint dbe : listener.added) {
                        Assert.assertEquals(TestZkRegistry.makeKey(WILMA_HOST), ZKClusterCoordinatorDriver.asString(dbe));
                    }

                    // Removing Fred must produce exactly one "unregistered" callback.
                    listener.clear();
                    this.removeDrillbit(probeZk, FRED_HOST);
                    Thread.sleep(50L);
                    Assert.assertNull(listener.added);
                    Assert.assertNotNull(listener.removed);
                    Assert.assertEquals(1L, listener.removed.size());
                    for (CoordinationProtos.DrillbitEndpoint dbe : listener.removed) {
                        Assert.assertEquals(TestZkRegistry.makeKey(FRED_HOST), ZKClusterCoordinatorDriver.asString(dbe));
                    }
                } finally {
                    driver.close();
                }
            }
            server.stop();
        }
    }

    /**
     * Builds a fake Drillbit endpoint for the given host with user port 123,
     * control port 456 and data port 789.
     *
     * @param host address to store in the endpoint
     * @return the built endpoint
     */
    private CoordinationProtos.DrillbitEndpoint makeEndpoint(String host) {
        return CoordinationProtos.DrillbitEndpoint.newBuilder()
                .setAddress(host)
                .setUserPort(123)
                .setControlPort(456)
                .setDataPort(789)
                .build();
    }

    /**
     * Simulates a Drillbit registering itself: serializes a service instance
     * carrying a fake endpoint for {@code host} and creates the node
     * {@code "/" + host} with that payload through the given client.
     *
     * @param zk   started Curator client used to create the node
     * @param host host name of the fake Drillbit; also used as the node name
     * @throws Exception if building, serializing or creating the node fails
     */
    private void addDrillbit(CuratorFramework zk, String host) throws Exception {
        CoordinationProtos.DrillbitEndpoint endpoint = this.makeEndpoint(host);
        // Parameterize the builder explicitly instead of using the raw ServiceInstance type.
        ServiceInstance<CoordinationProtos.DrillbitEndpoint> instance = ServiceInstance
                .<CoordinationProtos.DrillbitEndpoint>builder()
                .name(CLUSTER_ID)
                .payload(endpoint)
                .build();
        byte[] serialized = DrillServiceInstanceHelper.SERIALIZER.serialize(instance);
        zk.create().forPath("/" + host, serialized);
    }

    /**
     * Simulates a Drillbit going away by deleting the node {@code "/" + host}
     * through the given client.
     *
     * @param zk   started Curator client used to delete the node
     * @param host host name of the fake Drillbit whose node is removed
     * @throws Exception if the delete fails
     */
    private void removeDrillbit(CuratorFramework zk, String host) throws Exception {
        String nodePath = "/" + host;
        zk.delete().forPath(nodePath);
    }

    /**
     * Creates and starts a Curator client for the given connect string, scoped
     * to the namespace {@code test-root/test-cluster} and using a
     * {@code RetryNTimes(3, 1000)} retry policy.
     *
     * @param connectString ZooKeeper connect string
     * @return the started client; the caller is responsible for closing it
     */
    public static CuratorFramework connectToZk(String connectString) {
        RetryPolicy retryPolicy = new RetryNTimes(3, 1000);
        CuratorFramework zkClient = CuratorFrameworkFactory.builder()
                .namespace("test-root/test-cluster")
                .connectString(connectString)
                .retryPolicy(retryPolicy)
                .build();
        zkClient.start();
        return zkClient;
    }

    /**
     * Walks a {@code ZKRegistry} through the life cycles of three fake Drillbits:
     * Fred (present in ZK before the registry starts and never backed by a task),
     * Wilma (task allocated first, ZK registration second) and Barney (ZK
     * registration first, task allocation second). After every step the test
     * checks the tracker state held by the registry and the callbacks recorded
     * by the {@code TestRegistryHandler}.
     */
    @Test
    public void testZKRegistry() throws Exception {
        // try-with-resources / finally release the server, the probe client and the
        // driver even when an assertion in the middle of the test fails.
        try (TestingServer server = new TestingServer()) {
            server.start();
            String connStr = server.getConnectString();
            try (CuratorFramework probeZk = TestZkRegistry.connectToZk(connStr)) {
                this.addDrillbit(probeZk, FRED_HOST);
                ZKClusterCoordinatorDriver driver = new ZKClusterCoordinatorDriver().setConnect(connStr, ZK_ROOT, CLUSTER_ID).setPorts(123, 456, 789).build();
                try {
                    ZKRegistry registry = new ZKRegistry(driver);
                    TestRegistryHandler handler = new TestRegistryHandler();
                    registry.start(handler);

                    // Fred was already in ZK at start-up: his host is reserved and he
                    // is listed as the single unmanaged Drillbit.
                    Assert.assertEquals(FRED_HOST, handler.reserved);
                    List<?> unmanaged = registry.listUnmanagedDrillits();
                    Assert.assertEquals(1L, unmanaged.size());
                    String fredsKey = TestZkRegistry.makeKey(FRED_HOST);
                    Assert.assertEquals(fredsKey, unmanaged.get(0));

                    Map<?, ?> trackers = registry.getRegistryForTesting();
                    Assert.assertEquals(1L, trackers.size());
                    Assert.assertTrue(trackers.containsKey(fredsKey));
                    ZKRegistry.DrillbitTracker fredsTracker = (ZKRegistry.DrillbitTracker) trackers.get(fredsKey);
                    Assert.assertEquals(fredsKey, fredsTracker.key);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.UNMANAGED, fredsTracker.state);
                    Assert.assertNull(fredsTracker.task);
                    Assert.assertEquals(fredsKey, ZKClusterCoordinatorDriver.asString(fredsTracker.endpoint));
                    Assert.assertEquals(FRED_HOST, handler.reserved);

                    // Wilma: CREATED adds nothing to the registry, ALLOCATED adds a
                    // tracker in state NEW that has a task but no endpoint yet.
                    TestTask wilmasTask = new TestTask(WILMA_HOST);
                    EventContext context = new EventContext(wilmasTask);
                    registry.stateChange(TaskLifecycleListener.Event.CREATED, context);
                    Assert.assertEquals(1L, registry.getRegistryForTesting().size());
                    registry.stateChange(TaskLifecycleListener.Event.ALLOCATED, context);
                    Assert.assertEquals(2L, registry.getRegistryForTesting().size());
                    String wilmasKey = TestZkRegistry.makeKey(WILMA_HOST);
                    ZKRegistry.DrillbitTracker wilmasTracker = (ZKRegistry.DrillbitTracker) registry.getRegistryForTesting().get(wilmasKey);
                    Assert.assertNotNull(wilmasTracker);
                    Assert.assertEquals(wilmasTask, wilmasTracker.task);
                    Assert.assertNull(wilmasTracker.endpoint);
                    Assert.assertEquals(wilmasKey, wilmasTracker.key);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.NEW, wilmasTracker.state);

                    // Wilma appears in ZK: start is acknowledged for her task and the
                    // tracker becomes REGISTERED.
                    handler.clear();
                    this.addDrillbit(probeZk, WILMA_HOST);
                    // Wait briefly for the ZK change to reach the registry.
                    Thread.sleep(100L);
                    Assert.assertEquals(wilmasTask, handler.start);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.REGISTERED, wilmasTracker.state);

                    // Barney appears in ZK before his task is allocated: he is first
                    // treated as unmanaged and his host is reserved.
                    TestTask barneysTask = new TestTask(BARNEY_HOST);
                    context = new EventContext(barneysTask);
                    registry.stateChange(TaskLifecycleListener.Event.CREATED, context);
                    handler.clear();
                    this.addDrillbit(probeZk, BARNEY_HOST);
                    Thread.sleep(100L);
                    Assert.assertEquals(BARNEY_HOST, handler.reserved);
                    String barneysKey = TestZkRegistry.makeKey(BARNEY_HOST);
                    ZKRegistry.DrillbitTracker barneysTracker = (ZKRegistry.DrillbitTracker) registry.getRegistryForTesting().get(barneysKey);
                    Assert.assertNotNull(barneysTracker);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.UNMANAGED, barneysTracker.state);
                    Assert.assertNull(barneysTracker.task);
                    Assert.assertEquals(2L, registry.listUnmanagedDrillits().size());

                    // Allocating Barney's task attaches it to the existing tracker,
                    // which becomes REGISTERED and leaves the unmanaged list.
                    handler.clear();
                    registry.stateChange(TaskLifecycleListener.Event.ALLOCATED, context);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.REGISTERED, barneysTracker.state);
                    Assert.assertEquals(barneysTask, handler.start);
                    Assert.assertEquals(barneysTask, barneysTracker.task);
                    Assert.assertEquals(1L, registry.listUnmanagedDrillits().size());

                    // Barney drops out of ZK: completion is acknowledged, tracker is
                    // DEREGISTERED.
                    handler.clear();
                    this.removeDrillbit(probeZk, BARNEY_HOST);
                    Thread.sleep(100L);
                    Assert.assertEquals(barneysTask, handler.end);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.DEREGISTERED, barneysTracker.state);

                    // Barney comes back: start is acknowledged again, tracker is
                    // REGISTERED again.
                    handler.clear();
                    this.addDrillbit(probeZk, BARNEY_HOST);
                    Thread.sleep(100L);
                    Assert.assertEquals(barneysTask, handler.start);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.REGISTERED, barneysTracker.state);

                    // Barney drops out a second time: the tracker is kept until the
                    // task itself ends.
                    handler.clear();
                    this.removeDrillbit(probeZk, BARNEY_HOST);
                    Thread.sleep(100L);
                    Assert.assertNotNull(registry.getRegistryForTesting().get(barneysKey));
                    Assert.assertEquals(barneysTask, handler.end);

                    // Ending Barney's task removes his tracker without another
                    // completion callback.
                    handler.clear();
                    registry.stateChange(TaskLifecycleListener.Event.ENDED, context);
                    Assert.assertNull(handler.end);
                    Assert.assertNull(registry.getRegistryForTesting().get(barneysKey));

                    // Unmanaged Fred leaves ZK: his tracker is dropped and his host
                    // released; no completion callback because he never had a task.
                    handler.clear();
                    this.removeDrillbit(probeZk, FRED_HOST);
                    Thread.sleep(100L);
                    Assert.assertNull(registry.getRegistryForTesting().get(fredsKey));
                    Assert.assertNull(handler.end);
                    Assert.assertEquals(FRED_HOST, handler.released);

                    // Wilma leaves ZK: completion is acknowledged, no host is released,
                    // and the DEREGISTERED tracker stays until her task ends.
                    handler.clear();
                    this.removeDrillbit(probeZk, WILMA_HOST);
                    Thread.sleep(100L);
                    Assert.assertEquals(wilmasTask, handler.end);
                    Assert.assertNull(handler.released);
                    Assert.assertEquals(ZKRegistry.DrillbitTracker.State.DEREGISTERED, wilmasTracker.state);
                    Assert.assertNotNull(registry.getRegistryForTesting().get(wilmasKey));

                    // Ending Wilma's task removes the last tracker; the registry is empty.
                    handler.clear();
                    context = new EventContext(wilmasTask);
                    registry.stateChange(TaskLifecycleListener.Event.ENDED, context);
                    Assert.assertNull(registry.getRegistryForTesting().get(wilmasKey));
                    Assert.assertNull(handler.released);
                    Assert.assertNull(handler.end);
                    Assert.assertTrue(registry.getRegistryForTesting().isEmpty());
                } finally {
                    driver.close();
                }
            }
            server.stop();
        }
    }

    /**
     * Minimal {@code Task} for the registry tests: constructed with {@code null}
     * for both superclass constructor arguments and reporting a fixed host name.
     * NOTE(review): {@code getHostName} presumably overrides a {@code Task}
     * method; consider annotating it with {@code @Override} once confirmed.
     */
    public static class TestTask
    extends Task {
        /** Host name handed back by {@link #getHostName()}. */
        private final String host;

        /**
         * @param host host name this task reports
         */
        public TestTask(String host) {
            super(null, null);
            this.host = host;
        }

        /** @return the host name given at construction */
        public String getHostName() {
            return host;
        }

        /** Puts the inherited tracking state back to {@code NEW}. */
        public void resetTrackingState() {
            trackingState = Task.TrackingState.NEW;
        }
    }

    /**
     * {@code RegistryHandler} that records the argument of the most recent call
     * of each kind so a test can inspect it, and asserts that a host is not
     * reserved or released twice between calls to {@link #clear()}.
     * NOTE(review): the methods below presumably implement {@code RegistryHandler};
     * consider annotating them with {@code @Override} once confirmed.
     */
    private static class TestRegistryHandler
    implements RegistryHandler {
        // NOTE(review): the tests sleep before reading these fields, which suggests the
        // callbacks are delivered on a thread other than the test thread; volatile makes
        // those writes visible to the test thread -- confirm the threading with ZKRegistry.
        volatile String reserved;
        volatile String released;
        volatile Task start;
        volatile Task end;

        private TestRegistryHandler() {
        }

        /** Forgets everything recorded so far. */
        public void clear() {
            this.reserved = null;
            this.released = null;
            this.start = null;
            this.end = null;
        }

        /** Records the reserved host; fails if one is already recorded. */
        public void reserveHost(String hostName) {
            Assert.assertNull(this.reserved);
            this.reserved = hostName;
        }

        /** Records the released host; fails if one is already recorded. */
        public void releaseHost(String hostName) {
            Assert.assertNull(this.released);
            this.released = hostName;
        }

        /** Records the task whose start was acknowledged; the other arguments are ignored. */
        public void startAck(Task task, String propertyKey, Object value) {
            this.start = task;
        }

        /** Records the task whose completion was acknowledged; the property name is ignored. */
        public void completionAck(Task task, String endpointProperty) {
            this.end = task;
        }

        /** Intentionally does nothing. */
        public void registryDown() {
        }
    }

    private class TestDrillbitStatusListener
    implements DrillbitStatusListener {
        protected Set<CoordinationProtos.DrillbitEndpoint> added;
        protected Set<CoordinationProtos.DrillbitEndpoint> removed;

        private TestDrillbitStatusListener() {
        }

        public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
            this.removed = unregisteredDrillbits;
        }

        public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
            this.added = registeredDrillbits;
        }

        public void clear() {
            this.added = null;
            this.removed = null;
        }
    }
}

