/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordinator;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ChildrenDeletable;
import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.io.druid.curator.CuratorTestBase;
import org.apache.hive.druid.io.druid.java.util.common.Intervals;
import org.apache.hive.druid.io.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.io.druid.segment.TestHelper;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeCallback;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeHandler;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.hive.druid.io.druid.server.coordinator.CuratorLoadQueuePeon;
import org.apache.hive.druid.io.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.hive.druid.io.druid.server.coordinator.LoadPeonCallback;
import org.apache.hive.druid.io.druid.server.coordinator.LoadQueuePeon;
import org.apache.hive.druid.io.druid.server.coordinator.TestDruidCoordinatorConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.io.druid.timeline.partition.NoneShardSpec;
import org.apache.hive.druid.io.druid.timeline.partition.ShardSpec;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class LoadQueuePeonTest
extends CuratorTestBase {
    // ZooKeeper node used as the load queue in these tests: setUp() creates it, the
    // PathChildrenCache watches it, and each test constructs its peon with it.
    private static final String LOAD_QUEUE_PATH = "/druid/loadqueue/localhost:1234";
    // JSON mapper handed to the peon and used by the tests to decode the change requests read back from ZooKeeper.
    private final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
    // Peon under test. Each test method builds and starts its own instance; tearDown() stops it.
    private LoadQueuePeon loadQueuePeon;
    // Curator cache over LOAD_QUEUE_PATH. Created in setUp(), started by the individual tests, closed in tearDown().
    private PathChildrenCache loadQueueCache;

    /**
     * Prepares the per-test fixture: starts the test server and Curator client, creates the load
     * queue node, and builds (but does not start) the cache that watches it.
     *
     * @throws Exception if the server, the Curator client, or the node creation fails
     */
    @Before
    public void setUp() throws Exception {
        setupServerAndCurator();

        // Bring the client up and wait for the connection before touching any path.
        curator.start();
        curator.blockUntilConnected();

        // The queue node must exist before a cache is pointed at it.
        curator.create()
               .creatingParentsIfNeeded()
               .forPath(LOAD_QUEUE_PATH);

        // The individual tests register their listeners first and then start the cache themselves.
        loadQueueCache = new PathChildrenCache(
            curator,
            LOAD_QUEUE_PATH,
            true,
            true,
            Execs.singleThreaded("load_queue_cache-%d")
        );
    }

    /**
     * Queues five segment drops and five segment loads on a {@link CuratorLoadQueuePeon} and then
     * walks through them one at a time, with the test itself consuming the queue.
     *
     * <p>For every request the test waits until the request node has been seen under
     * {@code LOAD_QUEUE_PATH}, checks that the node exists and decodes to the expected segment,
     * deletes the node, and then waits for the peon to invoke the matching completion callback.
     * After each step the sizes reported by the peon are checked. Drops are verified first, then
     * loads in {@code expectedLoadOrder}.
     *
     * @throws Exception if a Curator call or the JSON decoding of a request fails
     */
    @Test
    public void testMultipleLoadDropSegments() throws Exception {
        // Number of segments queued in each direction; every latch array has one slot per segment.
        final int segmentCount = 5;
        // Slot of the latch released by the next request observed in ZooKeeper.
        final AtomicInteger requestSignalIdx = new AtomicInteger(0);
        // Slot of the latch released by the next completion callback from the peon.
        final AtomicInteger segmentSignalIdx = new AtomicInteger(0);

        this.loadQueuePeon = new CuratorLoadQueuePeon(
            this.curator,
            LOAD_QUEUE_PATH,
            this.jsonMapper,
            Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
            Execs.singleThreaded("test_load_queue_peon-%d"),
            new TestDruidCoordinatorConfig(null, null, null, null, null, null, 10, null, false, false, Duration.ZERO)
        );
        this.loadQueuePeon.start();

        final CountDownLatch[] loadRequestSignal = newLatches(segmentCount);
        final CountDownLatch[] dropRequestSignal = newLatches(segmentCount);
        final CountDownLatch[] segmentLoadedSignal = newLatches(segmentCount);
        final CountDownLatch[] segmentDroppedSignal = newLatches(segmentCount);

        // Receives each decoded request and releases the latch for the slot currently selected
        // by requestSignalIdx. The callback argument is ignored.
        final DataSegmentChangeHandler handler = new DataSegmentChangeHandler() {
            @Override
            public void addSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                loadRequestSignal[requestSignalIdx.get()].countDown();
            }

            @Override
            public void removeSegment(DataSegment segment, DataSegmentChangeCallback callback) {
                dropRequestSignal[requestSignalIdx.get()].countDown();
            }
        };

        final List<DataSegment> segmentToDrop = segmentsWithIntervals(
            "2014-10-26T00:00:00Z/P1D",
            "2014-10-25T00:00:00Z/P1D",
            "2014-10-24T00:00:00Z/P1D",
            "2014-10-23T00:00:00Z/P1D",
            "2014-10-22T00:00:00Z/P1D"
        );
        final List<DataSegment> segmentToLoad = segmentsWithIntervals(
            "2014-10-27T00:00:00Z/P1D",
            "2014-10-29T00:00:00Z/P1M",
            "2014-10-31T00:00:00Z/P1D",
            "2014-10-30T00:00:00Z/P1D",
            "2014-10-28T00:00:00Z/P1D"
        );
        // Same segments as segmentToLoad, in the order the load requests are expected to show up.
        // NOTE(review): the listed order is latest-ending interval first - presumably the peon's
        // load ordering; confirm against CuratorLoadQueuePeon.
        final List<DataSegment> expectedLoadOrder = segmentsWithIntervals(
            "2014-10-29T00:00:00Z/P1M",
            "2014-10-31T00:00:00Z/P1D",
            "2014-10-30T00:00:00Z/P1D",
            "2014-10-28T00:00:00Z/P1D",
            "2014-10-27T00:00:00Z/P1D"
        );

        // Decode every node added under the queue path and dispatch it to the handler above.
        this.loadQueueCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    DataSegmentChangeRequest request = LoadQueuePeonTest.this.jsonMapper.readValue(
                        event.getData().getData(),
                        DataSegmentChangeRequest.class
                    );
                    request.go(handler, null);
                }
            }
        });
        this.loadQueueCache.start();

        for (DataSegment segment : segmentToDrop) {
            this.loadQueuePeon.dropSegment(segment, new LoadPeonCallback() {
                @Override
                public void execute() {
                    segmentDroppedSignal[segmentSignalIdx.get()].countDown();
                }
            });
        }
        for (DataSegment segment : segmentToLoad) {
            this.loadQueuePeon.loadSegment(segment, new LoadPeonCallback() {
                @Override
                public void execute() {
                    segmentLoadedSignal[segmentSignalIdx.get()].countDown();
                }
            });
        }

        // Every test segment is built with size 1200, so the queued load size is 1200 per segment.
        Assert.assertEquals(1200L * segmentCount, this.loadQueuePeon.getLoadQueueSize());
        Assert.assertEquals(segmentCount, this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals(segmentCount, this.loadQueuePeon.getSegmentsToDrop().size());

        for (DataSegment segment : segmentToDrop) {
            String dropRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(dropRequestSignal[requestSignalIdx.get()]));
            Assert.assertNotNull(this.curator.checkExists().forPath(dropRequestPath));
            Assert.assertEquals(segment, ((SegmentChangeRequestDrop) readChangeRequest(dropRequestPath)).getSegment());

            // Select the next request slot before removing the node, so a request that appears
            // after the delete releases the next latch. After the last drop the slot wraps to 0
            // for the load phase.
            advanceWrapping(requestSignalIdx, segmentCount);

            // Deleting the request node is how this test signals that the request was handled.
            this.curator.delete().guaranteed().forPath(dropRequestPath);
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(segmentDroppedSignal[segmentSignalIdx.get()]));

            int expectedNumSegmentToDrop = segmentCount - segmentSignalIdx.get() - 1;
            Assert.assertEquals(expectedNumSegmentToDrop, this.loadQueuePeon.getSegmentsToDrop().size());
            advanceWrapping(segmentSignalIdx, segmentCount);
        }

        for (DataSegment segment : expectedLoadOrder) {
            String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(loadRequestSignal[requestSignalIdx.get()]));
            Assert.assertNotNull(this.curator.checkExists().forPath(loadRequestPath));
            Assert.assertEquals(segment, ((SegmentChangeRequestLoad) readChangeRequest(loadRequestPath)).getSegment());

            // As in the drop phase: move to the next request slot first, then delete the node.
            requestSignalIdx.incrementAndGet();
            this.curator.delete().guaranteed().forPath(loadRequestPath);
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(segmentLoadedSignal[segmentSignalIdx.get()]));

            int expectedNumSegmentToLoad = segmentCount - segmentSignalIdx.get() - 1;
            Assert.assertEquals(1200L * expectedNumSegmentToLoad, this.loadQueuePeon.getLoadQueueSize());
            Assert.assertEquals(expectedNumSegmentToLoad, this.loadQueuePeon.getSegmentsToLoad().size());
            segmentSignalIdx.incrementAndGet();
        }
    }

    /** Returns an array of {@code count} fresh latches, each with a count of one. */
    private static CountDownLatch[] newLatches(int count) {
        final CountDownLatch[] latches = new CountDownLatch[count];
        for (int i = 0; i < count; ++i) {
            latches[i] = new CountDownLatch(1);
        }
        return latches;
    }

    /** Moves {@code index} to the next slot, wrapping from {@code size - 1} back to 0. */
    private static void advanceWrapping(AtomicInteger index, int size) {
        if (index.get() == size - 1) {
            index.set(0);
        } else {
            index.incrementAndGet();
        }
    }

    /** Returns a transformed view mapping each interval string to a test segment built by {@code dataSegmentWithInterval}. */
    private List<DataSegment> segmentsWithIntervals(String... intervals) {
        return Lists.transform(ImmutableList.copyOf(intervals), new Function<String, DataSegment>() {
            @Override
            public DataSegment apply(String intervalStr) {
                return LoadQueuePeonTest.this.dataSegmentWithInterval(intervalStr);
            }
        });
    }

    /** Reads the decompressed payload of the node at {@code path} and decodes it as a change request. */
    private DataSegmentChangeRequest readChangeRequest(String path) throws Exception {
        return this.jsonMapper.readValue(
            this.curator.getData().decompressed().forPath(path),
            DataSegmentChangeRequest.class
        );
    }

    /**
     * Queues a single load request that nothing ever acknowledges and checks that the peon still
     * completes it.
     *
     * <p>The cache listener here only records that the request node appeared; the test never
     * deletes the node. The test then expects the load callback to fire anyway and the peon to
     * report an empty load queue afterwards.
     *
     * @throws Exception if a Curator call or the JSON decoding of the request fails
     */
    @Test
    public void testFailAssign() throws Exception {
        final DataSegment segment = this.dataSegmentWithInterval("2014-10-22T00:00:00Z/P1D");
        final CountDownLatch loadRequestSignal = new CountDownLatch(1);
        final CountDownLatch segmentLoadedSignal = new CountDownLatch(1);

        // NOTE(review): the 1 ms Duration (4th argument) is presumably the load timeout that makes
        // the unacknowledged request fail quickly - confirm against TestDruidCoordinatorConfig.
        this.loadQueuePeon = new CuratorLoadQueuePeon(
            this.curator,
            LOAD_QUEUE_PATH,
            this.jsonMapper,
            Execs.scheduledSingleThreaded("test_load_queue_peon_scheduled-%d"),
            Execs.singleThreaded("test_load_queue_peon-%d"),
            new TestDruidCoordinatorConfig(null, null, null, new Duration(1L), null, null, 10, null, false, false, new Duration("PT1s"))
        );
        this.loadQueuePeon.start();

        // Only observe the request node; deliberately do nothing that would acknowledge it.
        this.loadQueueCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
                    loadRequestSignal.countDown();
                }
            }
        });
        this.loadQueueCache.start();

        this.loadQueuePeon.loadSegment(segment, new LoadPeonCallback() {
            @Override
            public void execute() {
                segmentLoadedSignal.countDown();
            }
        });

        final String loadRequestPath = ZKPaths.makePath(LOAD_QUEUE_PATH, segment.getIdentifier());

        // The request must reach ZooKeeper and carry the segment that was queued.
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(loadRequestSignal));
        Assert.assertNotNull(this.curator.checkExists().forPath(loadRequestPath));
        final DataSegmentChangeRequest storedRequest = this.jsonMapper.readValue(
            this.curator.getData().decompressed().forPath(loadRequestPath),
            DataSegmentChangeRequest.class
        );
        Assert.assertEquals(segment, ((SegmentChangeRequestLoad) storedRequest).getSegment());

        // Nobody removed the node, yet the callback is expected to run and the queue to drain.
        Assert.assertTrue(this.timing.forWaiting().awaitLatch(segmentLoadedSignal));
        Assert.assertEquals(0, this.loadQueuePeon.getSegmentsToLoad().size());
        Assert.assertEquals(0L, this.loadQueuePeon.getLoadQueueSize());
    }

    /**
     * Builds the test segment used throughout this class for the given interval.
     *
     * <p>Everything except the interval is fixed: data source {@code test_load_queue_peon}, a
     * constant version string, empty load spec, dimensions and metrics, a {@link NoneShardSpec},
     * binary version 9 and size 1200.
     *
     * @param intervalStr interval string passed to {@code Intervals.of}
     * @return the built segment
     */
    private DataSegment dataSegmentWithInterval(String intervalStr) {
        return DataSegment.builder()
                          .dataSource("test_load_queue_peon")
                          .interval(Intervals.of(intervalStr))
                          .loadSpec(ImmutableMap.<String, Object>of())
                          .version("2015-05-27T03:38:35.683Z")
                          .dimensions(ImmutableList.<String>of())
                          .metrics(ImmutableList.<String>of())
                          .shardSpec(NoneShardSpec.instance())
                          .binaryVersion(9)
                          .size(1200L)
                          .build();
    }

    /**
     * Releases the per-test fixture: closes the cache, stops the peon, then tears down the Curator
     * client and test server.
     *
     * <p>JUnit runs {@code @After} methods even when {@code setUp()} or the test body failed, so
     * either field may still be {@code null} here. Each step is guarded and chained with
     * {@code finally} so that a failure in one step neither hides the original test failure behind
     * a {@link NullPointerException} nor skips the remaining cleanup.
     *
     * @throws Exception if closing the cache, stopping the peon, or the server teardown fails
     */
    @After
    public void tearDown() throws Exception {
        try {
            if (this.loadQueueCache != null) {
                this.loadQueueCache.close();
            }
        } finally {
            try {
                if (this.loadQueuePeon != null) {
                    this.loadQueuePeon.stop();
                }
            } finally {
                // Always runs, so the test server and client are released even if a step above threw.
                this.tearDownServerAndCurator();
            }
        }
    }
}

