/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.highavailability;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.flink.kubernetes.configuration.KubernetesLeaderElectionConfiguration;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient;
import org.apache.flink.kubernetes.kubeclient.KubernetesConfigMapSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.KubernetesSharedWatcher;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMap;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesException;
import org.apache.flink.kubernetes.kubeclient.resources.KubernetesLeaderElector;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
import org.apache.flink.runtime.leaderelection.LeaderElectionDriver;
import org.apache.flink.runtime.leaderelection.LeaderElectionException;
import org.apache.flink.runtime.leaderelection.LeaderInformation;
import org.apache.flink.runtime.leaderelection.LeaderInformationRegister;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link LeaderElectionDriver} implementation that is backed by a single Kubernetes ConfigMap.
 *
 * <p>On construction the driver creates a {@link KubernetesLeaderElector} for the configured
 * ConfigMap, registers a watch on that ConfigMap and starts the elector. Leadership changes
 * reported by the elector and ConfigMap events reported by the watch are forwarded to the
 * {@link LeaderElectionDriver.Listener} passed to the constructor.
 *
 * <p>Leader information is stored as entries of the ConfigMap's data, one entry per component,
 * keyed by {@link KubernetesUtils#createSingleLeaderKey(String)}.
 */
public class KubernetesLeaderElectionDriver implements LeaderElectionDriver {
    private static final Logger LOG = LoggerFactory.getLogger(KubernetesLeaderElectionDriver.class);

    private final FlinkKubeClient kubeClient;
    private final String configMapName;
    private final String lockIdentity;
    private final LeaderElectionDriver.Listener leaderElectionListener;
    private final KubernetesLeaderElector leaderElector;
    private final KubernetesSharedWatcher.Watch kubernetesWatch;

    /** Flipped to {@code false} exactly once by {@link #close()}. */
    private final AtomicBoolean running = new AtomicBoolean(true);

    /**
     * Creates the driver, registers the ConfigMap watch and starts the leader elector.
     *
     * @param leaderElectionConfiguration supplies the ConfigMap name and the lock identity
     * @param kubeClient client used to create the elector and to read/update the ConfigMap
     * @param leaderElectionListener receiver of leadership, leader-information and error events
     * @param configMapSharedWatcher shared watcher on which the ConfigMap watch is registered
     * @param watchExecutor executor handed to the shared watcher together with the callback
     * @throws NullPointerException if any argument is {@code null}
     */
    public KubernetesLeaderElectionDriver(
            KubernetesLeaderElectionConfiguration leaderElectionConfiguration,
            FlinkKubeClient kubeClient,
            LeaderElectionDriver.Listener leaderElectionListener,
            KubernetesConfigMapSharedWatcher configMapSharedWatcher,
            Executor watchExecutor) {
        Preconditions.checkNotNull(leaderElectionConfiguration);
        this.kubeClient = Preconditions.checkNotNull(kubeClient);
        this.leaderElectionListener = Preconditions.checkNotNull(leaderElectionListener);
        Preconditions.checkNotNull(configMapSharedWatcher);
        Preconditions.checkNotNull(watchExecutor);

        this.configMapName = leaderElectionConfiguration.getConfigMapName();
        this.lockIdentity = leaderElectionConfiguration.getLockIdentity();

        this.leaderElector =
                kubeClient.createLeaderElector(
                        leaderElectionConfiguration, new LeaderCallbackHandlerImpl());
        this.kubernetesWatch =
                configMapSharedWatcher.watch(
                        this.configMapName, new ConfigMapCallbackHandlerImpl(), watchExecutor);

        this.leaderElector.run();

        LOG.debug(
                "Starting the {} for config map {}.",
                this.getClass().getSimpleName(),
                this.configMapName);
    }

    /**
     * Stops the leader elector and closes the ConfigMap watch.
     *
     * <p>Only the first call has an effect; subsequent calls are no-ops.
     */
    public void close() throws Exception {
        if (this.running.compareAndSet(true, false)) {
            LOG.info("Closing {}.", this);
            this.leaderElector.stop();
            this.kubernetesWatch.close();
        }
    }

    /**
     * Checks the current ConfigMap to determine whether this driver's lock identity holds the
     * leadership.
     *
     * <p>If the ConfigMap does not exist, a {@link KubernetesException} is reported to the
     * listener and {@code false} is returned.
     *
     * @return {@code true} if the ConfigMap exists and names this driver's lock identity as leader
     * @throws IllegalStateException if the driver has already been closed
     */
    public boolean hasLeadership() {
        Preconditions.checkState(this.running.get());
        Optional<KubernetesConfigMap> optionalConfigMap =
                this.kubeClient.getConfigMap(this.configMapName);
        if (optionalConfigMap.isPresent()) {
            return KubernetesLeaderElector.hasLeadership(
                    optionalConfigMap.get(), this.lockIdentity);
        }
        this.leaderElectionListener.onError(
                new KubernetesException(
                        String.format(
                                "ConfigMap %s does not exist. This indicates that somebody has interfered with Flink's operation.",
                                this.configMapName)));
        return false;
    }

    /**
     * Writes (or, for empty leader information, removes) the entry of the given component in the
     * ConfigMap and blocks until the update request has completed.
     *
     * <p>Failures are not thrown but reported to the listener. If the calling thread is
     * interrupted while waiting, its interrupt status is restored before the error is reported.
     *
     * @param componentId component whose leader information is published
     * @param leaderInformation information to store; an empty instance removes the entry
     * @throws IllegalStateException if the driver has already been closed
     */
    public void publishLeaderInformation(String componentId, LeaderInformation leaderInformation) {
        Preconditions.checkState(this.running.get());
        try {
            this.kubeClient
                    .checkAndUpdateConfigMap(
                            this.configMapName,
                            this.updateConfigMapWithLeaderInformation(
                                    componentId, leaderInformation))
                    .get();
        } catch (InterruptedException e) {
            // Restore the interrupt status so that callers further up the stack can react to it.
            Thread.currentThread().interrupt();
            this.leaderElectionListener.onError(e);
            return;
        } catch (ExecutionException e) {
            this.leaderElectionListener.onError(e);
            return;
        }
        // Only reached when the update request completed without an exception.
        LOG.debug(
                "Successfully wrote leader information {} for leader {} into the config map {}.",
                leaderInformation,
                componentId,
                this.configMapName);
    }

    /**
     * Removes the leader information of the given component by publishing an empty {@link
     * LeaderInformation} for it.
     *
     * @param componentId component whose leader information is removed
     */
    public void deleteLeaderInformation(String componentId) {
        this.publishLeaderInformation(componentId, LeaderInformation.empty());
    }

    /**
     * Builds the update function handed to {@code checkAndUpdateConfigMap}.
     *
     * <p>The returned function modifies the passed ConfigMap's data in place and returns it, but
     * only if this driver's lock identity currently holds the leadership according to that
     * ConfigMap; otherwise it returns {@link Optional#empty()} and leaves the data untouched.
     *
     * @param leaderName component name used to derive the data key
     * @param leaderInformation information to encode; an empty instance removes the key
     */
    private Function<KubernetesConfigMap, Optional<KubernetesConfigMap>>
            updateConfigMapWithLeaderInformation(
                    String leaderName, LeaderInformation leaderInformation) {
        String configMapDataKey = KubernetesUtils.createSingleLeaderKey(leaderName);
        return kubernetesConfigMap -> {
            if (!KubernetesLeaderElector.hasLeadership(kubernetesConfigMap, this.lockIdentity)) {
                return Optional.empty();
            }
            Map<String, String> data = kubernetesConfigMap.getData();
            if (leaderInformation.isEmpty()) {
                data.remove(configMapDataKey);
            } else {
                data.put(
                        configMapDataKey,
                        KubernetesUtils.encodeLeaderInformation(leaderInformation));
            }
            return Optional.of(kubernetesConfigMap);
        };
    }

    /**
     * Collects the leader information of all components stored in the given ConfigMap.
     *
     * <p>Only data entries whose key is recognised by {@link
     * KubernetesUtils#isSingleLeaderKey(String)} are considered. Values for which {@link
     * KubernetesUtils#parseLeaderInformationSafely(String)} yields nothing are mapped to {@link
     * LeaderInformation#empty()}.
     */
    private static LeaderInformationRegister extractLeaderInformation(
            KubernetesConfigMap configMap) {
        Map<String, String> data = configMap.getData();
        Map<String, LeaderInformation> extractedLeaderInformation = new HashMap<>();
        for (Map.Entry<String, String> keyValuePair : data.entrySet()) {
            String key = keyValuePair.getKey();
            if (!KubernetesUtils.isSingleLeaderKey(key)) {
                continue;
            }
            String leaderName = KubernetesUtils.extractLeaderName(key);
            LeaderInformation leaderInformation =
                    KubernetesUtils.parseLeaderInformationSafely(keyValuePair.getValue())
                            .orElse(LeaderInformation.empty());
            extractedLeaderInformation.put(leaderName, leaderInformation);
        }
        return new LeaderInformationRegister(extractedLeaderInformation);
    }

    @Override
    public String toString() {
        return String.format(
                "%s{configMapName='%s'}", this.getClass().getSimpleName(), this.configMapName);
    }

    /** Translates elector callbacks into listener notifications. */
    private class LeaderCallbackHandlerImpl extends KubernetesLeaderElector.LeaderCallbackHandler {

        /** Grants leadership to the listener under a freshly generated random session id. */
        @Override
        public void isLeader() {
            KubernetesLeaderElectionDriver.this.leaderElectionListener.onGrantLeadership(
                    UUID.randomUUID());
        }

        /** Revokes leadership at the listener and runs the elector again. */
        @Override
        public void notLeader() {
            KubernetesLeaderElectionDriver.this.leaderElectionListener.onRevokeLeadership();
            // NOTE(review): presumably re-run so this instance keeps contending for leadership
            // after losing it; confirm against KubernetesLeaderElector#run.
            KubernetesLeaderElectionDriver.this.leaderElector.run();
        }
    }

    /**
     * Translates ConfigMap watch events into listener notifications.
     *
     * <p>Except for {@link #handleError(Throwable)}, events are only forwarded while the
     * ConfigMap carried by the event names this driver's lock identity as leader.
     */
    private class ConfigMapCallbackHandlerImpl
            implements FlinkKubeClient.WatchCallbackHandler<KubernetesConfigMap> {

        /** Additions are ignored. */
        @Override
        public void onAdded(List<KubernetesConfigMap> resources) {
        }

        /** Forwards the leader information contained in the modified ConfigMap. */
        @Override
        public void onModified(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap =
                    KubernetesUtils.getOnlyConfigMap(
                            configMaps, KubernetesLeaderElectionDriver.this.configMapName);
            if (KubernetesLeaderElector.hasLeadership(
                    configMap, KubernetesLeaderElectionDriver.this.lockIdentity)) {
                KubernetesLeaderElectionDriver.this.leaderElectionListener
                        .onLeaderInformationChange(extractLeaderInformation(configMap));
            }
        }

        /** Reports the deletion of the ConfigMap as an error. */
        @Override
        public void onDeleted(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap =
                    KubernetesUtils.getOnlyConfigMap(
                            configMaps, KubernetesLeaderElectionDriver.this.configMapName);
            if (KubernetesLeaderElector.hasLeadership(
                    configMap, KubernetesLeaderElectionDriver.this.lockIdentity)) {
                KubernetesLeaderElectionDriver.this.leaderElectionListener.onError(
                        new LeaderElectionException(
                                String.format(
                                        "ConfigMap %s has been deleted externally.",
                                        KubernetesLeaderElectionDriver.this.configMapName)));
            }
        }

        /** Reports an error event delivered by the watch for the ConfigMap. */
        @Override
        public void onError(List<KubernetesConfigMap> configMaps) {
            KubernetesConfigMap configMap =
                    KubernetesUtils.getOnlyConfigMap(
                            configMaps, KubernetesLeaderElectionDriver.this.configMapName);
            if (KubernetesLeaderElector.hasLeadership(
                    configMap, KubernetesLeaderElectionDriver.this.lockIdentity)) {
                KubernetesLeaderElectionDriver.this.leaderElectionListener.onError(
                        new LeaderElectionException(
                                String.format(
                                        "Error while watching the ConfigMap %s.",
                                        KubernetesLeaderElectionDriver.this.configMapName)));
            }
        }

        /** Reports a failure of the watch itself, keeping the original throwable as the cause. */
        @Override
        public void handleError(Throwable throwable) {
            KubernetesLeaderElectionDriver.this.leaderElectionListener.onError(
                    new LeaderElectionException(
                            String.format(
                                    "Error while watching the ConfigMap %s.",
                                    KubernetesLeaderElectionDriver.this.configMapName),
                            throwable));
        }
    }
}

