/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordinator;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.utils.ZKPaths;
import org.apache.hive.druid.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.metamx.common.ISE;
import org.apache.hive.druid.com.metamx.emitter.EmittingLogger;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestNoop;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinator;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.hive.druid.io.druid.server.coordinator.LoadPeonCallback;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Stat;

/**
 * Queues segment load and drop requests and hands them out one at a time by creating an ephemeral
 * ZooKeeper node under {@code basePath} for each request.
 *
 * <p>At most one request is in flight ({@code currentlyProcessing}). A request is considered done
 * when its node is deleted, or failed when the node still exists after the configured load timeout
 * or when submitting it throws. Pending drops are always picked before pending loads.
 *
 * <p>All transitions of {@code currentlyProcessing} and {@code stopped} happen while holding
 * {@code lock}.
 */
public class LoadQueuePeon {
    private static final EmittingLogger log = new EmittingLogger(LoadQueuePeon.class);
    /** Holder type for a request that carries a {@link SegmentChangeRequestDrop}. */
    private static final int DROP = 0;
    /** Holder type for a request that carries a {@link SegmentChangeRequestLoad}. */
    private static final int LOAD = 1;
    private final CuratorFramework curator;
    private final String basePath;
    private final ObjectMapper jsonMapper;
    private final ScheduledExecutorService processingExecutor;
    private final ExecutorService callBackExecutor;
    private final DruidCoordinatorConfig config;
    /** Sum of {@code getSize()} over the segments currently queued for loading. */
    private final AtomicLong queuedSize = new AtomicLong(0L);
    private final AtomicInteger failedAssignCount = new AtomicInteger(0);
    private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToLoad =
        new ConcurrentSkipListMap<DataSegment, SegmentHolder>(DruidCoordinator.SEGMENT_COMPARATOR);
    private final ConcurrentSkipListMap<DataSegment, SegmentHolder> segmentsToDrop =
        new ConcurrentSkipListMap<DataSegment, SegmentHolder>(DruidCoordinator.SEGMENT_COMPARATOR);
    private final Object lock = new Object();
    /** The request currently submitted to ZooKeeper, or {@code null}; it stays in its queue until completed. */
    private volatile SegmentHolder currentlyProcessing = null;
    /** Set by {@link #stop()}; only read and written while holding {@code lock}. */
    private boolean stopped = false;

    /** Runs every non-null callback in the list, in list order, on the calling thread. */
    private static void executeCallbacks(List<LoadPeonCallback> callbacks) {
        for (LoadPeonCallback callback : callbacks) {
            if (callback != null) {
                callback.execute();
            }
        }
    }

    LoadQueuePeon(CuratorFramework curator, String basePath, ObjectMapper jsonMapper, ScheduledExecutorService processingExecutor, ExecutorService callbackExecutor, DruidCoordinatorConfig config) {
        this.curator = curator;
        this.basePath = basePath;
        this.jsonMapper = jsonMapper;
        this.callBackExecutor = callbackExecutor;
        this.processingExecutor = processingExecutor;
        this.config = config;
    }

    /** @return a live view of the segments queued for loading (including one that is in flight) */
    @JsonProperty
    public Set<DataSegment> getSegmentsToLoad() {
        return this.segmentsToLoad.keySet();
    }

    /** @return a live view of the segments queued for dropping (including one that is in flight) */
    @JsonProperty
    public Set<DataSegment> getSegmentsToDrop() {
        return this.segmentsToDrop.keySet();
    }

    /** @return the total size of the segments currently queued for loading */
    public long getLoadQueueSize() {
        return this.queuedSize.get();
    }

    /** @return the number of failed assignments since the last call, resetting the counter to zero */
    public int getAndResetFailedAssignCount() {
        return this.failedAssignCount.getAndSet(0);
    }

    /**
     * Queues a load request for {@code segment} and kicks the queue.
     *
     * <p>If the in-flight request has the same segment identifier (whatever its type), or the
     * segment is already queued for loading, no new request is created and {@code callback} is
     * attached to the existing one instead.
     *
     * @param segment  segment to load
     * @param callback invoked when the request finishes or is abandoned; may be {@code null}
     */
    public void loadSegment(DataSegment segment, LoadPeonCallback callback) {
        // The duplicate check and the insert share one critical section so that two concurrent
        // requests for the same segment cannot both enqueue (which would replace the first holder,
        // dropping its callbacks, and count the segment size twice).
        synchronized (this.lock) {
            if (this.attachToExisting(this.segmentsToLoad, segment, callback)) {
                return;
            }
            log.info("Asking server peon[%s] to load segment[%s]", this.basePath, segment.getIdentifier());
            this.queuedSize.addAndGet(segment.getSize());
            this.segmentsToLoad.put(segment, new SegmentHolder(segment, LOAD, Arrays.asList(callback)));
        }
        this.doNext();
    }

    /**
     * Queues a drop request for {@code segment} and kicks the queue.
     *
     * <p>If the in-flight request has the same segment identifier (whatever its type), or the
     * segment is already queued for dropping, no new request is created and {@code callback} is
     * attached to the existing one instead.
     *
     * @param segment  segment to drop
     * @param callback invoked when the request finishes or is abandoned; may be {@code null}
     */
    public void dropSegment(DataSegment segment, LoadPeonCallback callback) {
        // Check-and-insert is atomic for the same reason as in loadSegment.
        synchronized (this.lock) {
            if (this.attachToExisting(this.segmentsToDrop, segment, callback)) {
                return;
            }
            log.info("Asking server peon[%s] to drop segment[%s]", this.basePath, segment.getIdentifier());
            this.segmentsToDrop.put(segment, new SegmentHolder(segment, DROP, Arrays.asList(callback)));
        }
        this.doNext();
    }

    /**
     * Attaches {@code callback} to an already-known request for {@code segment}, if there is one.
     * Must be called while holding {@code lock}.
     *
     * @param queue the pending queue to consult when the in-flight request does not match
     * @return {@code true} if a matching request exists (so the caller must not enqueue a new one)
     */
    private boolean attachToExisting(ConcurrentSkipListMap<DataSegment, SegmentHolder> queue, DataSegment segment, LoadPeonCallback callback) {
        final SegmentHolder inFlight = this.currentlyProcessing;
        final SegmentHolder existing;
        if (inFlight != null && inFlight.getSegmentIdentifier().equals(segment.getIdentifier())) {
            existing = inFlight;
        } else {
            existing = queue.get(segment);
        }
        if (existing == null) {
            return false;
        }
        if (callback != null) {
            existing.addCallback(callback);
        }
        return true;
    }

    /**
     * Picks the next request (drops first, then loads) and schedules it on the processing
     * executor. Does nothing if a request is already in flight or both queues are empty.
     */
    private void doNext() {
        synchronized (this.lock) {
            if (this.currentlyProcessing != null) {
                log.info("Server[%s] skipping doNext() because something is currently loading[%s].", this.basePath, this.currentlyProcessing.getSegmentIdentifier());
                return;
            }
            // The chosen holder is intentionally left in its queue; actionCompleted() removes it.
            if (!this.segmentsToDrop.isEmpty()) {
                this.currentlyProcessing = this.segmentsToDrop.firstEntry().getValue();
                log.info("Server[%s] dropping [%s]", this.basePath, this.currentlyProcessing.getSegmentIdentifier());
            } else if (!this.segmentsToLoad.isEmpty()) {
                this.currentlyProcessing = this.segmentsToLoad.firstEntry().getValue();
                log.info("Server[%s] loading [%s]", this.basePath, this.currentlyProcessing.getSegmentIdentifier());
            } else {
                return;
            }
            this.processingExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    LoadQueuePeon.this.processCurrent();
                }
            });
        }
    }

    /**
     * Submits the in-flight request: writes its change request to an ephemeral node, schedules the
     * timeout check, and registers a watcher that completes the request when the node is deleted.
     * Any exception fails the in-flight request via {@link #failAssign(Exception)}.
     */
    private void processCurrent() {
        synchronized (this.lock) {
            try {
                final SegmentHolder holder = this.currentlyProcessing;
                if (holder == null) {
                    // Nothing to submit; unless stop() cleared the request this is unexpected.
                    if (!this.stopped) {
                        log.makeAlert("Crazy race condition! server[%s]", this.basePath).emit();
                    }
                    this.doNext();
                    return;
                }
                log.info("Server[%s] processing segment[%s]", this.basePath, holder.getSegmentIdentifier());
                final String path = ZKPaths.makePath(this.basePath, holder.getSegmentIdentifier());
                final byte[] payload = this.jsonMapper.writeValueAsBytes(holder.getChangeRequest());
                this.curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);

                this.processingExecutor.schedule(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            if (LoadQueuePeon.this.curator.checkExists().forPath(path) != null) {
                                LoadQueuePeon.this.failAssignIfCurrent(holder, new ISE("%s was never removed! Failing this operation!", path));
                            }
                        } catch (Exception e) {
                            LoadQueuePeon.this.failAssignIfCurrent(holder, e);
                        }
                    }
                }, this.config.getLoadTimeoutDelay().getMillis(), TimeUnit.MILLISECONDS);

                final Stat stat = this.curator.checkExists().usingWatcher(new CuratorWatcher() {
                    @Override
                    public void process(WatchedEvent watchedEvent) throws Exception {
                        switch (watchedEvent.getType()) {
                            case NodeDeleted:
                                LoadQueuePeon.this.entryRemoved(watchedEvent.getPath());
                                break;
                            default:
                                break;
                        }
                    }
                }).forPath(path);

                if (stat == null) {
                    // The node was already gone when the watch was registered, so the request is
                    // treated as done. A no-op request is written to the same path first.
                    // NOTE(review): presumably this makes the just-registered watcher fire instead
                    // of lingering forever - confirm against upstream Druid.
                    final byte[] noopPayload = this.jsonMapper.writeValueAsBytes(new SegmentChangeRequestNoop());
                    this.curator.create().withMode(CreateMode.EPHEMERAL).forPath(path, noopPayload);
                    this.entryRemoved(path);
                }
            } catch (Exception e) {
                this.failAssign(e);
            }
        }
    }

    /**
     * Finishes the in-flight request, if any: removes it from its queue, adjusts the load queue
     * size, clears {@code currentlyProcessing} and runs its callbacks on the callback executor.
     * Every caller in this class invokes it while holding {@code lock}.
     *
     * @throws UnsupportedOperationException if the holder type is neither {@code LOAD} nor {@code DROP}
     */
    private void actionCompleted() {
        final SegmentHolder finished = this.currentlyProcessing;
        if (finished == null) {
            return;
        }
        switch (finished.getType()) {
            case LOAD:
                this.segmentsToLoad.remove(finished.getSegment());
                this.queuedSize.addAndGet(-finished.getSegmentSize());
                break;
            case DROP:
                this.segmentsToDrop.remove(finished.getSegment());
                break;
            default:
                throw new UnsupportedOperationException();
        }
        final List<LoadPeonCallback> callbacks = finished.getCallbacks();
        this.currentlyProcessing = null;
        this.callBackExecutor.execute(new Runnable() {
            @Override
            public void run() {
                LoadQueuePeon.executeCallbacks(callbacks);
            }
        });
    }

    /**
     * Abandons all work: runs the callbacks of the in-flight and every queued request on the
     * calling thread, empties both queues, resets the counters and marks this peon as stopped.
     */
    public void stop() {
        synchronized (this.lock) {
            final SegmentHolder inFlight = this.currentlyProcessing;
            if (inFlight != null) {
                LoadQueuePeon.executeCallbacks(inFlight.getCallbacks());
                this.currentlyProcessing = null;
            }
            // The in-flight holder is still present in its queue (doNext does not remove it), so
            // it is skipped below to avoid running its callbacks a second time.
            for (SegmentHolder holder : this.segmentsToDrop.values()) {
                if (holder != inFlight) {
                    LoadQueuePeon.executeCallbacks(holder.getCallbacks());
                }
            }
            this.segmentsToDrop.clear();
            for (SegmentHolder holder : this.segmentsToLoad.values()) {
                if (holder != inFlight) {
                    LoadQueuePeon.executeCallbacks(holder.getCallbacks());
                }
            }
            this.segmentsToLoad.clear();
            this.queuedSize.set(0L);
            this.failedAssignCount.set(0);
            this.stopped = true;
        }
    }

    /**
     * Handles deletion of a request node. If {@code path} names the in-flight request it is
     * completed and the queue advances; otherwise a warning is logged and nothing changes.
     *
     * @param path full ZooKeeper path of the removed node
     */
    private void entryRemoved(String path) {
        synchronized (this.lock) {
            if (this.currentlyProcessing == null) {
                log.warn("Server[%s] an entry[%s] was removed even though it wasn't loading!?", this.basePath, path);
                return;
            }
            if (!ZKPaths.getNodeFromPath(path).equals(this.currentlyProcessing.getSegmentIdentifier())) {
                log.warn("Server[%s] entry [%s] was removed even though it's not what is currently loading[%s]", this.basePath, path, this.currentlyProcessing);
                return;
            }
            this.actionCompleted();
            log.info("Server[%s] done processing [%s]", this.basePath, path);
        }
        this.doNext();
    }

    /**
     * Records a failed assignment for the in-flight request, then treats it as completed (its
     * callbacks run and it leaves the queue) and advances to the next request.
     *
     * @param e the failure cause, logged together with the in-flight request
     */
    private void failAssign(Exception e) {
        synchronized (this.lock) {
            log.error(e, "Server[%s], throwable caught when submitting [%s].", this.basePath, this.currentlyProcessing);
            this.failedAssignCount.getAndIncrement();
            this.actionCompleted();
            this.doNext();
        }
    }

    /**
     * Fails the in-flight request only if it is still {@code expected}.
     *
     * <p>Timeout checks fire long after they were scheduled and only look at the node path, which
     * is derived from the segment identifier. Without this guard a leftover timer could fail a
     * newer request for the same segment, or an unrelated request that is in flight by then.
     *
     * @param expected the holder that was in flight when the timer was scheduled
     * @param e        the failure cause
     */
    private void failAssignIfCurrent(SegmentHolder expected, Exception e) {
        synchronized (this.lock) {
            if (this.currentlyProcessing != expected) {
                log.info("Server[%s] ignoring stale timeout for [%s] because it is no longer being processed.", this.basePath, expected);
                return;
            }
            this.failAssign(e);
        }
    }

    /** A queued request: the segment, the change request sent for it, and the callbacks to run when it ends. */
    private static class SegmentHolder {
        private final DataSegment segment;
        private final DataSegmentChangeRequest changeRequest;
        private final int type;
        /** Guarded by synchronizing on the list itself. */
        private final List<LoadPeonCallback> callbacks = Lists.newArrayList();

        /**
         * @param type      {@code LOAD} builds a load request; any other value builds a drop request
         * @param callbacks initial callbacks; {@code null} elements are tolerated and skipped on execution
         */
        private SegmentHolder(DataSegment segment, int type, Collection<LoadPeonCallback> callbacks) {
            this.segment = segment;
            this.type = type;
            if (type == LOAD) {
                this.changeRequest = new SegmentChangeRequestLoad(segment);
            } else {
                this.changeRequest = new SegmentChangeRequestDrop(segment);
            }
            this.callbacks.addAll(callbacks);
        }

        public DataSegment getSegment() {
            return this.segment;
        }

        public int getType() {
            return this.type;
        }

        public String getSegmentIdentifier() {
            return this.segment.getIdentifier();
        }

        public long getSegmentSize() {
            return this.segment.getSize();
        }

        /** Appends all of {@code newCallbacks}. */
        public void addCallbacks(Collection<LoadPeonCallback> newCallbacks) {
            synchronized (this.callbacks) {
                this.callbacks.addAll(newCallbacks);
            }
        }

        /** Appends a single callback. */
        public void addCallback(LoadPeonCallback newCallback) {
            synchronized (this.callbacks) {
                this.callbacks.add(newCallback);
            }
        }

        /**
         * @return a snapshot of the callbacks registered so far; returning the live list would let
         *         it be iterated on another thread without the synchronization used by the adders
         */
        public List<LoadPeonCallback> getCallbacks() {
            synchronized (this.callbacks) {
                return Lists.newArrayList(this.callbacks);
            }
        }

        public DataSegmentChangeRequest getChangeRequest() {
            return this.changeRequest;
        }

        @Override
        public String toString() {
            return this.changeRequest.toString();
        }
    }
}

