/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.server.coordination.coordination;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.hive.druid.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Joiner;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.io.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.hive.druid.io.druid.curator.announcement.Announcer;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.segment.TestHelper;
import org.apache.hive.druid.io.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestHistory;
import org.apache.hive.druid.io.druid.server.coordination.SegmentChangeRequestsSnapshot;
import org.apache.hive.druid.io.druid.server.coordination.ServerType;
import org.apache.hive.druid.io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.hive.druid.io.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BatchDataSegmentAnnouncerTest {
    private static final String testBasePath = "/test";
    private static final String testSegmentsPath = "/test/segments/id";
    private static final Joiner joiner = Joiner.on((String)"/");
    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private ObjectMapper jsonMapper;
    private Announcer announcer;
    private SegmentReader segmentReader;
    private BatchDataSegmentAnnouncer segmentAnnouncer;
    private Set<DataSegment> testSegments;
    private final AtomicInteger maxBytesPerNode = new AtomicInteger(524288);
    private Boolean skipDimensionsAndMetrics;
    private Boolean skipLoadSpec;

    @Before
    public void setUp() throws Exception {
        this.testingCluster = new TestingCluster(1);
        this.testingCluster.start();
        this.cf = CuratorFrameworkFactory.builder().connectString(this.testingCluster.getConnectString()).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(1, 10)).compressionProvider((CompressionProvider)new PotentiallyGzippedCompressionProvider(false)).build();
        this.cf.start();
        this.cf.blockUntilConnected();
        this.cf.create().creatingParentsIfNeeded().forPath(testBasePath);
        this.jsonMapper = TestHelper.makeJsonMapper();
        this.announcer = new Announcer(this.cf, (ExecutorService)MoreExecutors.sameThreadExecutor());
        this.announcer.start();
        this.segmentReader = new SegmentReader(this.cf, this.jsonMapper);
        this.skipDimensionsAndMetrics = false;
        this.skipLoadSpec = false;
        this.segmentAnnouncer = new BatchDataSegmentAnnouncer(new DruidServerMetadata("id", "host", null, Long.MAX_VALUE, ServerType.HISTORICAL, "tier", 0), new BatchDataSegmentAnnouncerConfig(){

            public int getSegmentsPerNode() {
                return 50;
            }

            public long getMaxBytesPerNode() {
                return BatchDataSegmentAnnouncerTest.this.maxBytesPerNode.get();
            }

            public boolean isSkipDimensionsAndMetrics() {
                return BatchDataSegmentAnnouncerTest.this.skipDimensionsAndMetrics;
            }

            public boolean isSkipLoadSpec() {
                return BatchDataSegmentAnnouncerTest.this.skipLoadSpec;
            }
        }, new ZkPathsConfig(){

            public String getBase() {
                return BatchDataSegmentAnnouncerTest.testBasePath;
            }
        }, this.announcer, this.jsonMapper);
        this.testSegments = Sets.newHashSet();
        for (int i = 0; i < 100; ++i) {
            this.testSegments.add(this.makeSegment(i));
        }
    }

    @After
    public void tearDown() throws Exception {
        this.announcer.stop();
        this.cf.close();
        this.testingCluster.stop();
    }

    @Test
    public void testSingleAnnounce() throws Exception {
        Set<DataSegment> segments;
        Iterator<DataSegment> segIter = this.testSegments.iterator();
        DataSegment firstSegment = segIter.next();
        DataSegment secondSegment = segIter.next();
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(testSegmentsPath);
        for (String zNode : zNodes) {
            segments = this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)segments.iterator().next(), (Object)firstSegment);
        }
        this.segmentAnnouncer.announceSegment(secondSegment);
        for (String zNode : zNodes) {
            segments = this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)Sets.newHashSet((Object[])new DataSegment[]{firstSegment, secondSegment}), segments);
        }
        SegmentChangeRequestsSnapshot snapshot = (SegmentChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new SegmentChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)2L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)2L, (long)snapshot.getCounter().getCounter());
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        for (String zNode : zNodes) {
            Set<DataSegment> segments2 = this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0]));
            Assert.assertEquals((Object)segments2.iterator().next(), (Object)secondSegment);
        }
        this.segmentAnnouncer.unannounceSegment(secondSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(testSegmentsPath)).isEmpty());
        snapshot = (SegmentChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(snapshot.getCounter()).get();
        Assert.assertEquals((long)2L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)4L, (long)snapshot.getCounter().getCounter());
        snapshot = (SegmentChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new SegmentChangeRequestHistory.Counter(-1L, -1L)).get();
        Assert.assertEquals((long)0L, (long)snapshot.getRequests().size());
        Assert.assertEquals((long)4L, (long)snapshot.getCounter().getCounter());
    }

    @Test
    public void testSkipDimensions() throws Exception {
        this.skipDimensionsAndMetrics = true;
        Iterator<DataSegment> segIter = this.testSegments.iterator();
        DataSegment firstSegment = segIter.next();
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(testSegmentsPath);
        for (String zNode : zNodes) {
            DataSegment announcedSegment = (DataSegment)Iterables.getOnlyElement(this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0])));
            Assert.assertEquals((Object)announcedSegment, (Object)firstSegment);
            Assert.assertTrue((boolean)announcedSegment.getDimensions().isEmpty());
            Assert.assertTrue((boolean)announcedSegment.getMetrics().isEmpty());
        }
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(testSegmentsPath)).isEmpty());
    }

    @Test
    public void testSkipLoadSpec() throws Exception {
        this.skipLoadSpec = true;
        Iterator<DataSegment> segIter = this.testSegments.iterator();
        DataSegment firstSegment = segIter.next();
        this.segmentAnnouncer.announceSegment(firstSegment);
        List zNodes = (List)this.cf.getChildren().forPath(testSegmentsPath);
        for (String zNode : zNodes) {
            DataSegment announcedSegment = (DataSegment)Iterables.getOnlyElement(this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0])));
            Assert.assertEquals((Object)announcedSegment, (Object)firstSegment);
            Assert.assertNull((Object)announcedSegment.getLoadSpec());
        }
        this.segmentAnnouncer.unannounceSegment(firstSegment);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(testSegmentsPath)).isEmpty());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSingleAnnounceManyTimes() throws Exception {
        int prevMax = this.maxBytesPerNode.get();
        this.maxBytesPerNode.set(2048);
        try {
            for (DataSegment segment : this.testSegments) {
                this.segmentAnnouncer.announceSegment(segment);
            }
        }
        finally {
            this.maxBytesPerNode.set(prevMax);
        }
        List zNodes = (List)this.cf.getChildren().forPath(testSegmentsPath);
        Assert.assertEquals((long)20L, (long)zNodes.size());
        HashSet segments = Sets.newHashSet(this.testSegments);
        for (String zNode : zNodes) {
            for (DataSegment segment : this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0]))) {
                Assert.assertTrue((String)("Invalid segment " + segment), (boolean)segments.remove(segment));
            }
        }
        Assert.assertTrue((String)("Failed to find segments " + segments), (boolean)segments.isEmpty());
    }

    @Test
    public void testBatchAnnounce() throws Exception {
        this.testBatchAnnounce(true);
    }

    private void testBatchAnnounce(boolean testHistory) throws Exception {
        this.segmentAnnouncer.announceSegments(this.testSegments);
        List zNodes = (List)this.cf.getChildren().forPath(testSegmentsPath);
        Assert.assertEquals((long)2L, (long)zNodes.size());
        HashSet allSegments = Sets.newHashSet();
        for (String zNode : zNodes) {
            allSegments.addAll(this.segmentReader.read(joiner.join((Object)testSegmentsPath, (Object)zNode, new Object[0])));
        }
        Assert.assertEquals((Object)allSegments, this.testSegments);
        SegmentChangeRequestsSnapshot snapshot = null;
        if (testHistory) {
            snapshot = (SegmentChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new SegmentChangeRequestHistory.Counter(-1L, -1L)).get();
            Assert.assertEquals((long)this.testSegments.size(), (long)snapshot.getRequests().size());
            Assert.assertEquals((long)this.testSegments.size(), (long)snapshot.getCounter().getCounter());
        }
        this.segmentAnnouncer.unannounceSegments(this.testSegments);
        Assert.assertTrue((boolean)((List)this.cf.getChildren().forPath(testSegmentsPath)).isEmpty());
        if (testHistory) {
            snapshot = (SegmentChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(snapshot.getCounter()).get();
            Assert.assertEquals((long)this.testSegments.size(), (long)snapshot.getRequests().size());
            Assert.assertEquals((long)(2 * this.testSegments.size()), (long)snapshot.getCounter().getCounter());
            snapshot = (SegmentChangeRequestsSnapshot)this.segmentAnnouncer.getSegmentChangesSince(new SegmentChangeRequestHistory.Counter(-1L, -1L)).get();
            Assert.assertEquals((long)0L, (long)snapshot.getRequests().size());
            Assert.assertEquals((long)(2 * this.testSegments.size()), (long)snapshot.getCounter().getCounter());
        }
    }

    @Test
    public void testMultipleBatchAnnounce() throws Exception {
        for (int i = 0; i < 10; ++i) {
            this.testBatchAnnounce(false);
        }
    }

    private DataSegment makeSegment(int offset) {
        return DataSegment.builder().dataSource("foo").interval(new Interval((ReadableInstant)DateTimes.of((String)"2013-01-01").plusDays(offset), (ReadableInstant)DateTimes.of((String)"2013-01-02").plusDays(offset))).version(DateTimes.nowUtc().toString()).dimensions((List)ImmutableList.of((Object)"dim1", (Object)"dim2")).metrics((List)ImmutableList.of((Object)"met1", (Object)"met2")).loadSpec((Map)ImmutableMap.of((Object)"type", (Object)"local")).build();
    }

    private static class SegmentReader {
        private final CuratorFramework cf;
        private final ObjectMapper jsonMapper;

        public SegmentReader(CuratorFramework cf, ObjectMapper jsonMapper) {
            this.cf = cf;
            this.jsonMapper = jsonMapper;
        }

        public Set<DataSegment> read(String path) {
            try {
                if (this.cf.checkExists().forPath(path) != null) {
                    return (Set)this.jsonMapper.readValue((byte[])this.cf.getData().forPath(path), (TypeReference)new TypeReference<Set<DataSegment>>(){});
                }
            }
            catch (Exception e) {
                throw Throwables.propagate((Throwable)e);
            }
            return Sets.newHashSet();
        }
    }
}

