package org.apache.drill.yarn.zk;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
import org.apache.drill.yarn.appMaster.AMWrapperException;
import org.apache.drill.yarn.appMaster.EventContext;
import org.apache.drill.yarn.appMaster.Pollable;
import org.apache.drill.yarn.appMaster.RegistryHandler;
import org.apache.drill.yarn.appMaster.Task;
import org.apache.drill.yarn.appMaster.TaskLifecycleListener;

/* loaded from: input_file:org/apache/drill/yarn/zk/ZKRegistry.class */
public class ZKRegistry implements TaskLifecycleListener, DrillbitStatusListener, Pollable {
    /** Property key with value {@code "zk"}; not referenced inside this class (presumably read by callers - confirm). */
    public static final String CONTROLLER_PROPERTY = "zk";

    /** Update period in milliseconds (20 seconds). */
    public static final int UPDATE_PERIOD_MS = 20000;

    /** Property name passed to the registry handler along with start and completion acknowledgements. */
    public static final String ENDPOINT_PROPERTY = "endpoint";

    /** Class logger; assigned in the static initializer. */
    private static final Log LOG;

    /** Known drillbits, indexed by the key produced by the {@code toKey} helpers. */
    private Map<String, DrillbitTracker> registry = new HashMap<String, DrillbitTracker>();

    /** ZooKeeper driver handed to the constructor. */
    private ZKClusterCoordinatorDriver zkDriver;

    /** Handler that receives registry notifications; assigned in {@code start}. */
    private RegistryHandler registryHandler;

    /** Time value recorded by {@code tick}; starts at zero. */
    private long lastUpdateTime;

    /** {@code true} when assertions are disabled for this class; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/drill/yarn/zk/ZKRegistry$AckEvent.class */
    /**
     * Simple holder that pairs a drillbit endpoint with the task it belongs to.
     * Within this file the task is {@code null} when the endpoint belongs to a
     * drillbit that has no task (an unmanaged drillbit).
     */
    public static class AckEvent {
        /** Task the event refers to; may be {@code null}. */
        Task task;

        /** Endpoint the event refers to. */
        CoordinationProtos.DrillbitEndpoint endpoint;

        /**
         * Creates an event.
         *
         * @param task the associated task, or {@code null} if there is none
         * @param drillbitEndpoint the endpoint the event is about
         */
        public AckEvent(Task task, CoordinationProtos.DrillbitEndpoint drillbitEndpoint) {
            this.endpoint = drillbitEndpoint;
            this.task = task;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/drill/yarn/zk/ZKRegistry$DrillbitTracker.class */
    /**
     * Registry entry describing one drillbit: its key, its lifecycle state, the
     * task that owns it (if any) and the endpoint last associated with it.
     */
    public static class DrillbitTracker {

        /** Lifecycle states a tracked drillbit can be in. */
        public enum State {
            /** Tracker was created from an endpoint alone, without a task. */
            UNMANAGED,
            /** Tracker was created from a task. */
            NEW,
            /** Set by {@link #becomeRegistered()}. */
            REGISTERED,
            /** Set by {@link #becomeUnregistered()}. */
            DEREGISTERED
        }

        /** {@code true} when assertions are disabled for the enclosing class. */
        static final /* synthetic */ boolean $assertionsDisabled = !ZKRegistry.class.desiredAssertionStatus();

        /** Registry key of this drillbit. */
        protected final String key;

        /** Current lifecycle state. */
        protected State state;

        /** Owning task; left unset by the endpoint-only constructor. */
        protected Task task;

        /** Endpoint of the drillbit; left unset by the task constructor and cleared on unregistration. */
        protected CoordinationProtos.DrillbitEndpoint endpoint;

        /**
         * Creates a tracker in state {@link State#UNMANAGED} for a drillbit known
         * only by its endpoint.
         *
         * @param str registry key
         * @param drillbitEndpoint endpoint of the drillbit
         */
        public DrillbitTracker(String str, CoordinationProtos.DrillbitEndpoint drillbitEndpoint) {
            this.key = str;
            this.endpoint = drillbitEndpoint;
            this.state = State.UNMANAGED;
        }

        /**
         * Creates a tracker in state {@link State#NEW} for a task.
         *
         * @param str registry key
         * @param task the owning task
         */
        public DrillbitTracker(String str, Task task) {
            this.key = str;
            this.state = State.NEW;
            this.task = task;
        }

        /** Moves this tracker to {@link State#REGISTERED}, whatever its current state. */
        public void becomeRegistered() {
            this.state = State.REGISTERED;
        }

        /**
         * Moves this tracker to {@link State#DEREGISTERED} and forgets the endpoint.
         * When assertions are enabled, the tracker must currently be
         * {@link State#REGISTERED}.
         */
        public void becomeUnregistered() {
            if (this.state != State.REGISTERED && !$assertionsDisabled) {
                throw new AssertionError();
            }
            this.endpoint = null;
            this.state = State.DEREGISTERED;
        }
    }

    /**
     * Creates a registry that talks to ZooKeeper through the given driver.
     * The driver is only stored here; it is built later in {@code start}.
     *
     * @param zKClusterCoordinatorDriver driver used for all ZooKeeper access
     */
    public ZKRegistry(ZKClusterCoordinatorDriver zKClusterCoordinatorDriver) {
        zkDriver = zKClusterCoordinatorDriver;
    }

    /**
     * Starts ZooKeeper monitoring. Builds the driver, records every endpoint the
     * driver initially reports as an unmanaged drillbit (reserving its host with
     * the handler), and then subscribes this object as a drillbit listener.
     *
     * @param registryHandler handler to notify; also stored for later callbacks
     * @throws AMWrapperException if the driver raises a {@code ZKRuntimeException}
     */
    public void start(RegistryHandler registryHandler) {
        this.registryHandler = registryHandler;
        try {
            this.zkDriver.build();
            for (CoordinationProtos.DrillbitEndpoint existing : this.zkDriver.getInitialEndpoints()) {
                final String trackerKey = toKey(existing);
                final DrillbitTracker tracker = new DrillbitTracker(trackerKey, existing);
                this.registry.put(trackerKey, tracker);
                // Endpoints present before we started listening are tracked as unmanaged.
                registryHandler.reserveHost(existing.getAddress());
                LOG.warn("Host " + existing.getAddress() + " already running a Drillbit outside of YARN.");
            }
            this.zkDriver.addDrillbitListener(this);
        } catch (ZKRuntimeException zkFailure) {
            final String message = "Failed to start ZK monitoring";
            LOG.error(message, zkFailure);
            throw new AMWrapperException(message, zkFailure);
        }
    }

    /**
     * Computes the registry key for an endpoint by delegating to
     * {@code ZKClusterCoordinatorDriver.asString}.
     *
     * @param drillbitEndpoint endpoint to convert
     * @return the registry key
     */
    private String toKey(CoordinationProtos.DrillbitEndpoint drillbitEndpoint) {
        final String endpointKey = ZKClusterCoordinatorDriver.asString(drillbitEndpoint);
        return endpointKey;
    }

    /**
     * Computes the registry key for a task by asking the driver to convert the
     * task's host name.
     *
     * @param task task whose host name is used
     * @return the registry key
     */
    private String toKey(Task task) {
        final String host = task.getHostName();
        return this.zkDriver.toKey(host);
    }

    /**
     * Handles a batch of drillbit registrations. The registry is updated first
     * (under the lock taken by {@code registerDrillbits}); the handler is then
     * notified for each resulting event: a start acknowledgement when the event
     * carries a task, a host reservation otherwise.
     *
     * @param set endpoints reported as registered
     */
    public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> set) {
        List<AckEvent> acks = registerDrillbits(set);
        for (AckEvent ack : acks) {
            if (ack.task != null) {
                this.registryHandler.startAck(ack.task, ENDPOINT_PROPERTY, ack.endpoint);
            } else {
                this.registryHandler.reserveHost(ack.endpoint.getAddress());
            }
        }
    }

    /**
     * Applies each registration to the registry while holding this object's
     * monitor and collects the events that still need to be reported.
     *
     * @param set endpoints reported as registered
     * @return the non-null events produced, in iteration order of {@code set}
     */
    private synchronized List<AckEvent> registerDrillbits(Set<CoordinationProtos.DrillbitEndpoint> set) {
        List<AckEvent> pending = new ArrayList<AckEvent>();
        for (CoordinationProtos.DrillbitEndpoint endpoint : set) {
            AckEvent ack = drillbitRegistered(endpoint);
            if (ack == null) {
                continue;
            }
            pending.add(ack);
        }
        return pending;
    }

    /**
     * Records the registration of a single drillbit.
     * <ul>
     * <li>Unknown key: a new unmanaged tracker is stored and an event with a
     * {@code null} task is returned.</li>
     * <li>Tracker already {@code REGISTERED}: nothing changes and {@code null}
     * is returned.</li>
     * <li>Tracker still {@code UNMANAGED}: only the endpoint is refreshed and
     * {@code null} is returned. Such a tracker has no task, so it must not fall
     * through to the managed path below.</li>
     * <li>Otherwise: the tracker takes the endpoint, becomes registered, and an
     * event carrying its task is returned.</li>
     * </ul>
     *
     * @param drillbitEndpoint endpoint reported as registered
     * @return the event to report to the handler, or {@code null} if none
     */
    private AckEvent drillbitRegistered(CoordinationProtos.DrillbitEndpoint drillbitEndpoint) {
        String key = toKey(drillbitEndpoint);
        DrillbitTracker drillbitTracker = this.registry.get(key);
        if (drillbitTracker == null) {
            LOG.info("Registration of unmanaged drillbit: " + key);
            this.registry.put(key, new DrillbitTracker(key, drillbitEndpoint));
            return new AckEvent(null, drillbitEndpoint);
        }
        if (drillbitTracker.state == DrillbitTracker.State.REGISTERED) {
            LOG.info("Re-registration of known drillbit: " + key);
            return null;
        }
        if (drillbitTracker.state == DrillbitTracker.State.UNMANAGED) {
            // An unmanaged tracker has no task: dereferencing it below would throw
            // NullPointerException, and marking it REGISTERED would leave a
            // "managed" entry without a task. Its host was reserved when the
            // tracker was first created, so no new event is needed.
            LOG.info("Re-registration of unmanaged drillbit: " + key);
            drillbitTracker.endpoint = drillbitEndpoint;
            return null;
        }
        LOG.info("Drillbit registered: " + key + ", task: " + drillbitTracker.task.toString());
        drillbitTracker.endpoint = drillbitEndpoint;
        drillbitTracker.becomeRegistered();
        return new AckEvent(drillbitTracker.task, drillbitEndpoint);
    }

    /**
     * Handles a batch of drillbit unregistrations. The registry is updated first
     * (under the lock taken by {@code unregisterDrillbits}); a completion
     * acknowledgement is then sent to the handler for each resulting event.
     *
     * @param set endpoints reported as unregistered
     */
    public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> set) {
        for (AckEvent ack : unregisterDrillbits(set)) {
            this.registryHandler.completionAck(ack.task, ENDPOINT_PROPERTY);
        }
    }

    /**
     * Applies each unregistration to the registry while holding this object's
     * monitor and collects the events that still need to be reported.
     *
     * @param set endpoints reported as unregistered
     * @return the non-null events produced, in iteration order of {@code set}
     */
    private synchronized List<AckEvent> unregisterDrillbits(Set<CoordinationProtos.DrillbitEndpoint> set) {
        List<AckEvent> pending = new ArrayList<AckEvent>();
        for (CoordinationProtos.DrillbitEndpoint endpoint : set) {
            AckEvent ack = drillbitUnregistered(endpoint);
            if (ack == null) {
                continue;
            }
            pending.add(ack);
        }
        return pending;
    }

    /**
     * Records the unregistration of a single drillbit.
     * <p>
     * An unknown key is an internal error: it trips an assertion when assertions
     * are enabled and is otherwise logged and ignored. An unmanaged drillbit is
     * dropped from the registry and its host released. Any other tracker is
     * marked unregistered and an event carrying its task is returned.
     *
     * @param drillbitEndpoint endpoint reported as unregistered
     * @return the event to report to the handler, or {@code null} if none
     */
    private AckEvent drillbitUnregistered(CoordinationProtos.DrillbitEndpoint drillbitEndpoint) {
        final String trackerKey = toKey(drillbitEndpoint);
        final DrillbitTracker tracker = this.registry.get(trackerKey);
        if (tracker == null) {
            if (!$assertionsDisabled) {
                throw new AssertionError();
            }
            LOG.error("Internal error - Unexpected drillbit unregistration: " + trackerKey);
            return null;
        }
        if (tracker.state == DrillbitTracker.State.UNMANAGED) {
            // Unmanaged drillbits are expected to have no task attached.
            if (tracker.task != null && !$assertionsDisabled) {
                throw new AssertionError();
            }
            LOG.info("Unmanaged drillbit unregistered: " + trackerKey);
            this.registry.remove(trackerKey);
            this.registryHandler.releaseHost(drillbitEndpoint.getAddress());
            return null;
        }
        LOG.info("Drillbit unregistered: " + trackerKey + ", task: " + tracker.task.toString());
        tracker.becomeUnregistered();
        return new AckEvent(tracker.task, drillbitEndpoint);
    }

    /**
     * Task lifecycle callback. Only {@code ALLOCATED} and {@code ENDED} events
     * affect the registry; every other event is ignored. Runs while holding this
     * object's monitor.
     *
     * @param event the lifecycle event
     * @param eventContext context whose {@code task} field identifies the task
     */
    @Override // org.apache.drill.yarn.appMaster.TaskLifecycleListener
    public synchronized void stateChange(TaskLifecycleListener.Event event, EventContext eventContext) {
        switch (event) {
            case ENDED:
                taskEnded(eventContext.task);
                break;
            case ALLOCATED:
                taskCreated(eventContext.task);
                break;
            default:
                break;
        }
    }

    /**
     * Registers a newly allocated task. If no tracker exists for the task's key,
     * a new one is created. If the key belongs to an unmanaged drillbit, that
     * tracker adopts the task, becomes registered, and a start acknowledgement
     * is sent. Any other existing state is logged as an error and left untouched.
     *
     * @param task the allocated task
     */
    private void taskCreated(Task task) {
        final String trackerKey = toKey(task);
        final DrillbitTracker existing = this.registry.get(trackerKey);
        if (existing == null) {
            this.registry.put(trackerKey, new DrillbitTracker(trackerKey, task));
            return;
        }
        if (existing.state == DrillbitTracker.State.UNMANAGED) {
            LOG.info("Unmanaged drillbit became managed: " + trackerKey);
            existing.task = task;
            existing.becomeRegistered();
            this.registryHandler.startAck(task, ENDPOINT_PROPERTY, existing.endpoint);
        } else {
            LOG.error(task.getLabel() + " - Drillbit registry in wrong state " + existing.state + " for new task: " + trackerKey);
        }
    }

    /**
     * Reports whether the drillbit for the given task is currently registered.
     *
     * @param task task to look up
     * @return {@code true} only if a tracker exists for the task's key and its
     *         state is {@code REGISTERED}
     */
    public synchronized boolean isRegistered(Task task) {
        final DrillbitTracker tracker = this.registry.get(toKey(task));
        if (tracker == null) {
            return false;
        }
        return tracker.state == DrillbitTracker.State.REGISTERED;
    }

    /**
     * Removes the registry entry of a task that has ended. Tasks without a host
     * name are ignored. When assertions are enabled, the entry must exist and be
     * in state {@code DEREGISTERED}.
     *
     * @param task the task that ended
     */
    private void taskEnded(Task task) {
        if (task.getHostName() == null) {
            return;
        }
        final String trackerKey = toKey(task);
        final DrillbitTracker tracker = this.registry.get(trackerKey);
        if (!$assertionsDisabled) {
            if (tracker == null) {
                throw new AssertionError();
            }
            if (tracker.state != DrillbitTracker.State.DEREGISTERED) {
                throw new AssertionError();
            }
        }
        this.registry.remove(trackerKey);
    }

    /**
     * Periodic poll. At most once per {@link #UPDATE_PERIOD_MS} it asks the
     * ZooKeeper driver whether the connection has failed and, if so, logs the
     * outage duration and tells the registry handler that the registry is down.
     * <p>
     * The previous guard was inverted: it returned when the period HAD elapsed,
     * so with {@code lastUpdateTime} starting at zero and wall-clock timestamps
     * the failure check was never reached.
     *
     * @param j current time in milliseconds, on the same clock across calls
     */
    @Override // org.apache.drill.yarn.appMaster.Pollable
    public void tick(long j) {
        // Skip until a full update period has passed since the last check.
        // Subtraction form avoids overflow for large time values.
        if (j - this.lastUpdateTime < UPDATE_PERIOD_MS) {
            return;
        }
        this.lastUpdateTime = j;
        if (this.zkDriver.hasFailed()) {
            // Round the outage duration to the nearest whole second for the log.
            int seconds = (int) ((this.zkDriver.getLostConnectionDurationMs() + 500) / 1000);
            LOG.error("ZooKeeper connection lost, failing after " + seconds + " seconds.");
            this.registryHandler.registryDown();
        }
    }

    /**
     * Stops ZooKeeper monitoring: unsubscribes this object from the driver and
     * then closes the driver.
     *
     * @param registryHandler not used by this method
     */
    public void finish(RegistryHandler registryHandler) {
        final ZKClusterCoordinatorDriver driver = this.zkDriver;
        driver.removeDrillbitListener(this);
        driver.close();
    }

    /**
     * Lists the keys of all drillbits currently tracked as unmanaged.
     *
     * @return a new list of keys; empty if there are no unmanaged drillbits
     */
    public synchronized List<String> listUnmanagedDrillits() {
        List<String> unmanagedKeys = new ArrayList<String>();
        for (DrillbitTracker tracker : this.registry.values()) {
            if (tracker.state != DrillbitTracker.State.UNMANAGED) {
                continue;
            }
            unmanagedKeys.add(tracker.key);
        }
        return unmanagedKeys;
    }

    /**
     * Exposes the live registry map (not a copy).
     *
     * @return the internal map from key to tracker
     */
    protected Map<String, DrillbitTracker> getRegistryForTesting() {
        return registry;
    }

    static {
        $assertionsDisabled = !ZKRegistry.class.desiredAssertionStatus();
        LOG = LogFactory.getLog(ZKRegistry.class);
    }
}
