/*
 * Decompiled with CFR 0.152.
 */
package org.apache.twill.internal;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hive.com.google.common.collect.Lists;
import org.apache.hive.com.google.common.util.concurrent.FutureCallback;
import org.apache.hive.com.google.common.util.concurrent.Futures;
import org.apache.hive.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.com.google.common.util.concurrent.Service;
import org.apache.hive.org.apache.zookeeper.KeeperException;
import org.apache.hive.org.apache.zookeeper.WatchedEvent;
import org.apache.hive.org.apache.zookeeper.Watcher;
import org.apache.hive.org.apache.zookeeper.data.Stat;
import org.apache.hive.org.slf4j.Logger;
import org.apache.hive.org.slf4j.LoggerFactory;
import org.apache.twill.api.Command;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.AbstractExecutionServiceController;
import org.apache.twill.internal.ZKMessages;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.state.Messages;
import org.apache.twill.internal.state.SystemMessages;
import org.apache.twill.zookeeper.NodeData;
import org.apache.twill.zookeeper.ZKClient;

/**
 * Base class for service controllers that interact with an application through ZooKeeper.
 *
 * <p>Messages are sent through {@link ZKMessages} using the prefix {@code /<runId>/messages/msg},
 * and the node {@code /instances/<runId>} is watched so that data updates and failures can be
 * forwarded to the subclass hooks {@link #instanceNodeUpdated(NodeData)} and
 * {@link #instanceNodeFailed(Throwable)}.
 */
public abstract class AbstractZKServiceController extends AbstractExecutionServiceController {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractZKServiceController.class);

    protected final ZKClient zkClient;
    private final InstanceNodeDataCallback instanceNodeDataCallback;

    /** Futures of messages that were sent and not yet removed; guarded by {@code this}. */
    private final List<ListenableFuture<?>> messageFutures;

    /**
     * Future of the stop message. It is assigned while holding this object's monitor
     * ({@link #shutDown()} and {@link #forceShutDown()}) but read by
     * {@link #getStopMessageFuture()} without it, so it is volatile to make the latest
     * assignment visible to readers on other threads.
     */
    private volatile ListenableFuture<Service.State> stopMessageFuture;

    /**
     * Creates a controller for the given run.
     *
     * @param runId identifier of the run; used to build the ZooKeeper paths
     * @param zkClient client used for all ZooKeeper operations
     */
    protected AbstractZKServiceController(RunId runId, ZKClient zkClient) {
        super(runId);
        this.zkClient = zkClient;
        this.instanceNodeDataCallback = new InstanceNodeDataCallback();
        this.messageFutures = Lists.newLinkedList();
    }

    /**
     * Sends a command using a message created by {@link Messages#createForAll(Command)}.
     *
     * @param command the command to send
     * @return a future that carries {@code command} as its result
     */
    public final ListenableFuture<Command> sendCommand(Command command) {
        return this.sendMessage(Messages.createForAll(command), command);
    }

    /**
     * Sends a command using a message created by
     * {@link Messages#createForRunnable(String, Command)}.
     *
     * @param runnableName name of the runnable the message is created for
     * @param command the command to send
     * @return a future that carries {@code command} as its result
     */
    public final ListenableFuture<Command> sendCommand(String runnableName, Command command) {
        return this.sendMessage(Messages.createForRunnable(runnableName, command), command);
    }

    /**
     * Runs the subclass start-up hook, then starts watching the instance node as soon as it
     * is known to exist.
     */
    @Override
    protected final void startUp() {
        this.doStartUp();
        this.actOnExists(this.getInstancePath(), new Runnable() {
            @Override
            public void run() {
                AbstractZKServiceController.this.watchInstanceNode();
            }
        });
    }

    /**
     * Sends the stop-application message unless a stop future has already been recorded,
     * cancels every tracked message future, and then runs the subclass shut-down hook.
     */
    @Override
    protected final synchronized void shutDown() {
        if (this.stopMessageFuture == null) {
            this.stopMessageFuture = ZKMessages.sendMessage(
                this.zkClient, this.getMessagePrefix(), SystemMessages.stopApplication(), Service.State.TERMINATED);
        }
        // Cancel all message futures that are still being tracked.
        for (ListenableFuture<?> future : this.messageFutures) {
            future.cancel(true);
        }
        this.doShutDown();
    }

    /**
     * Sends a message through ZooKeeper and tracks its future until it completes.
     *
     * @param message the message to send
     * @param result the value passed to {@link ZKMessages} as the result of the returned future
     * @param <V> type of the result
     * @return the future of the send operation, or an immediately failed future carrying an
     *     {@link IllegalStateException} when this controller is not running
     */
    protected final synchronized <V> ListenableFuture<V> sendMessage(Message message, V result) {
        if (!this.isRunning()) {
            return Futures.immediateFailedFuture(new IllegalStateException("Cannot send message to non-running application"));
        }
        final ListenableFuture<V> messageFuture =
            ZKMessages.sendMessage(this.zkClient, this.getMessagePrefix(), message, result);
        this.messageFutures.add(messageFuture);
        messageFuture.addListener(new Runnable() {
            @Override
            public void run() {
                // When completion happens while stopping, leave the list untouched; presumably
                // this keeps cancellations issued by shutDown() from modifying the list it iterates.
                if (AbstractZKServiceController.this.state() == Service.State.STOPPING) {
                    return;
                }
                synchronized (AbstractZKServiceController.this) {
                    AbstractZKServiceController.this.messageFutures.remove(messageFuture);
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        return messageFuture;
    }

    /**
     * Returns the future recorded for the stop message.
     *
     * @return the stop message future, or {@code null} if neither {@link #shutDown()} nor
     *     {@link #forceShutDown()} has assigned one yet
     */
    protected final ListenableFuture<Service.State> getStopMessageFuture() {
        return this.stopMessageFuture;
    }

    /** Subclass hook invoked at the beginning of {@link #startUp()}. */
    protected abstract void doStartUp();

    /** Subclass hook invoked at the end of {@link #shutDown()}, while holding this object's monitor. */
    protected abstract void doShutDown();

    /**
     * Subclass hook invoked with the data fetched from the instance node.
     *
     * @param var1 the data read from the instance node
     */
    protected abstract void instanceNodeUpdated(NodeData var1);

    /**
     * Subclass hook invoked when fetching the instance node data fails or the node is deleted.
     *
     * @param var1 the failure cause
     */
    protected abstract void instanceNodeFailed(Throwable var1);

    /**
     * Stops this controller without sending a stop message: if no stop future has been
     * recorded yet, an already-completed one is recorded so that {@link #shutDown()} skips
     * the send, and then {@code stop()} is called.
     */
    protected synchronized void forceShutDown() {
        if (this.stopMessageFuture == null) {
            this.stopMessageFuture = Futures.immediateFuture(Service.State.TERMINATED);
        }
        this.stop();
    }

    /**
     * Runs {@code action} at most once when the node at {@code path} exists, either because
     * the exists call reports it or because a later node-created event arrives. A failure of
     * the exists call is logged and triggers {@link #forceShutDown()}.
     *
     * @param path ZooKeeper path to check
     * @param action the action to run once the node exists
     */
    private void actOnExists(final String path, final Runnable action) {
        // Flipped by whichever of the watcher or the callback sees the node first.
        final AtomicBoolean nodeExists = new AtomicBoolean(false);
        Futures.addCallback(this.zkClient.exists(path, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (!AbstractZKServiceController.this.shouldProcessZKEvent()) {
                    return;
                }
                if (event.getType() == Watcher.Event.EventType.NodeCreated && nodeExists.compareAndSet(false, true)) {
                    action.run();
                }
            }
        }), new FutureCallback<Stat>() {
            @Override
            public void onSuccess(Stat result) {
                if (result != null && nodeExists.compareAndSet(false, true)) {
                    action.run();
                }
            }

            @Override
            public void onFailure(Throwable t) {
                LOG.error("Failed in exists call to {}. Shutting down service.", path, t);
                AbstractZKServiceController.this.forceShutDown();
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }

    /**
     * Fetches the instance node data with a watcher attached and hands the outcome to the
     * instance node callback. Does nothing unless the service state is NEW, STARTING or RUNNING.
     */
    protected final void watchInstanceNode() {
        if (!this.shouldProcessZKEvent()) {
            return;
        }
        Futures.addCallback(this.zkClient.getData(this.getInstancePath(), new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (!AbstractZKServiceController.this.shouldProcessZKEvent()) {
                    return;
                }
                switch (event.getType()) {
                    case NodeDataChanged: {
                        // Fetch the new data and attach a watcher again.
                        AbstractZKServiceController.this.watchInstanceNode();
                        break;
                    }
                    case NodeDeleted: {
                        // Report the deletion to the subclass as a NONODE failure.
                        AbstractZKServiceController.this.instanceNodeFailed(
                            KeeperException.create(KeeperException.Code.NONODE, AbstractZKServiceController.this.getInstancePath()));
                        break;
                    }
                    default: {
                        LOG.info("Ignore ZK event for instance node: {}", event);
                    }
                }
            }
        }), this.instanceNodeDataCallback, Threads.SAME_THREAD_EXECUTOR);
    }

    /**
     * Tells whether ZooKeeper events and results should still be acted upon.
     *
     * @return {@code true} when the service state is NEW, STARTING or RUNNING
     */
    private boolean shouldProcessZKEvent() {
        Service.State state = this.state();
        return state == Service.State.NEW || state == Service.State.STARTING || state == Service.State.RUNNING;
    }

    /** Returns the path prefix passed to {@link ZKMessages} when sending messages. */
    private String getMessagePrefix() {
        return this.getZKPath("messages/msg");
    }

    /** Returns the path of the instance node, {@code /instances/<runId>}. */
    protected final String getInstancePath() {
        return String.format("/instances/%s", this.getRunId().getId());
    }

    /**
     * Builds a path of the form {@code /<runId>/<path>}.
     *
     * @param path the part appended after the run id
     */
    private String getZKPath(String path) {
        return String.format("/%s/%s", this.getRunId().getId(), path);
    }

    /**
     * Callback for instance node data fetches. Outcomes are forwarded to the subclass hooks
     * only while {@link #shouldProcessZKEvent()} holds; failures are always logged.
     */
    private final class InstanceNodeDataCallback implements FutureCallback<NodeData> {
        private InstanceNodeDataCallback() {
        }

        @Override
        public void onSuccess(NodeData result) {
            if (AbstractZKServiceController.this.shouldProcessZKEvent()) {
                AbstractZKServiceController.this.instanceNodeUpdated(result);
            }
        }

        @Override
        public void onFailure(Throwable t) {
            LOG.error("Failed in fetching instance node data.", t);
            if (AbstractZKServiceController.this.shouldProcessZKEvent()) {
                AbstractZKServiceController.this.instanceNodeFailed(t);
            }
        }
    }
}

