/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.client.client;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.Timing;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Predicate;
import org.apache.hive.druid.com.google.common.base.Predicates;
import org.apache.hive.druid.com.google.common.base.Stopwatch;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.util.concurrent.Futures;
import org.apache.hive.druid.com.google.common.util.concurrent.ListenableFuture;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.io.druid.client.BatchServerInventoryView;
import org.apache.hive.druid.io.druid.client.DruidServer;
import org.apache.hive.druid.io.druid.client.ServerView;
import org.apache.hive.druid.io.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.hive.druid.io.druid.curator.announcement.Announcer;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.guava.Comparators;
import org.apache.hive.druid.io.druid.segment.TestHelper;
import org.apache.hive.druid.io.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.server.coordination.ServerType;
import org.apache.hive.druid.io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.hive.druid.io.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.LogicalOperator;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.ReadableInstant;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class BatchServerInventoryViewTest {
    private static final String testBasePath = "/test";
    public static final DateTime SEGMENT_INTERVAL_START = DateTimes.of((String)"2013-01-01");
    public static final int INITIAL_SEGMENTS = 100;
    private static final Timing timing = new Timing();
    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private ObjectMapper jsonMapper;
    private Announcer announcer;
    private BatchDataSegmentAnnouncer segmentAnnouncer;
    private DataSegmentServerAnnouncer serverAnnouncer;
    private Set<DataSegment> testSegments;
    private BatchServerInventoryView batchServerInventoryView;
    private BatchServerInventoryView filteredBatchServerInventoryView;
    private final AtomicInteger inventoryUpdateCounter = new AtomicInteger();
    @Rule
    public ExpectedException exception = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        this.testingCluster = new TestingCluster(1);
        this.testingCluster.start();
        this.cf = CuratorFrameworkFactory.builder().connectString(this.testingCluster.getConnectString()).retryPolicy((RetryPolicy)new ExponentialBackoffRetry(1, 10)).compressionProvider((CompressionProvider)new PotentiallyGzippedCompressionProvider(true)).build();
        this.cf.start();
        this.cf.blockUntilConnected();
        this.cf.create().creatingParentsIfNeeded().forPath(testBasePath);
        this.jsonMapper = TestHelper.makeJsonMapper();
        this.announcer = new Announcer(this.cf, (ExecutorService)MoreExecutors.sameThreadExecutor());
        this.announcer.start();
        DruidServerMetadata serverMetadata = new DruidServerMetadata("id", "host", null, Long.MAX_VALUE, ServerType.HISTORICAL, "tier", 0);
        ZkPathsConfig zkPathsConfig = new ZkPathsConfig(){

            public String getBase() {
                return BatchServerInventoryViewTest.testBasePath;
            }
        };
        this.serverAnnouncer = new CuratorDataSegmentServerAnnouncer(serverMetadata, zkPathsConfig, this.announcer, this.jsonMapper);
        this.serverAnnouncer.announce();
        this.segmentAnnouncer = new BatchDataSegmentAnnouncer(serverMetadata, new BatchDataSegmentAnnouncerConfig(){

            public int getSegmentsPerNode() {
                return 50;
            }
        }, zkPathsConfig, this.announcer, this.jsonMapper);
        this.testSegments = Sets.newConcurrentHashSet();
        for (int i = 0; i < 100; ++i) {
            this.testSegments.add(this.makeSegment(i));
        }
        this.batchServerInventoryView = new BatchServerInventoryView(new ZkPathsConfig(){

            public String getBase() {
                return BatchServerInventoryViewTest.testBasePath;
            }
        }, this.cf, this.jsonMapper, Predicates.alwaysTrue());
        this.batchServerInventoryView.start();
        this.inventoryUpdateCounter.set(0);
        this.filteredBatchServerInventoryView = new BatchServerInventoryView(new ZkPathsConfig(){

            public String getBase() {
                return BatchServerInventoryViewTest.testBasePath;
            }
        }, this.cf, this.jsonMapper, (Predicate)new Predicate<Pair<DruidServerMetadata, DataSegment>>(){

            public boolean apply(@Nullable Pair<DruidServerMetadata, DataSegment> input) {
                return ((DataSegment)input.rhs).getInterval().getStart().isBefore((ReadableInstant)SEGMENT_INTERVAL_START.plusDays(100));
            }
        }){

            protected DruidServer addInnerInventory(DruidServer container, String inventoryKey, Set<DataSegment> inventory) {
                DruidServer server = super.addInnerInventory(container, inventoryKey, inventory);
                BatchServerInventoryViewTest.this.inventoryUpdateCounter.incrementAndGet();
                return server;
            }
        };
        this.filteredBatchServerInventoryView.start();
    }

    @After
    public void tearDown() throws Exception {
        this.batchServerInventoryView.stop();
        this.filteredBatchServerInventoryView.stop();
        this.serverAnnouncer.unannounce();
        this.announcer.stop();
        this.cf.close();
        this.testingCluster.stop();
    }

    @Test
    public void testRun() throws Exception {
        this.segmentAnnouncer.announceSegments(this.testSegments);
        BatchServerInventoryViewTest.waitForSync(this.batchServerInventoryView, this.testSegments);
        DruidServer server = (DruidServer)Iterables.get((Iterable)this.batchServerInventoryView.getInventory(), (int)0);
        HashSet segments = Sets.newHashSet(server.getSegments().values());
        Assert.assertEquals(this.testSegments, (Object)segments);
        DataSegment segment1 = this.makeSegment(101);
        DataSegment segment2 = this.makeSegment(102);
        this.segmentAnnouncer.announceSegment(segment1);
        this.segmentAnnouncer.announceSegment(segment2);
        this.testSegments.add(segment1);
        this.testSegments.add(segment2);
        BatchServerInventoryViewTest.waitForSync(this.batchServerInventoryView, this.testSegments);
        Assert.assertEquals(this.testSegments, (Object)Sets.newHashSet(server.getSegments().values()));
        this.segmentAnnouncer.unannounceSegment(segment1);
        this.segmentAnnouncer.unannounceSegment(segment2);
        this.testSegments.remove(segment1);
        this.testSegments.remove(segment2);
        BatchServerInventoryViewTest.waitForSync(this.batchServerInventoryView, this.testSegments);
        Assert.assertEquals(this.testSegments, (Object)Sets.newHashSet(server.getSegments().values()));
    }

    @Test
    public void testRunWithFilter() throws Exception {
        this.segmentAnnouncer.announceSegments(this.testSegments);
        BatchServerInventoryViewTest.waitForSync(this.filteredBatchServerInventoryView, this.testSegments);
        DruidServer server = (DruidServer)Iterables.get((Iterable)this.filteredBatchServerInventoryView.getInventory(), (int)0);
        HashSet segments = Sets.newHashSet(server.getSegments().values());
        Assert.assertEquals(this.testSegments, (Object)segments);
        int prevUpdateCount = this.inventoryUpdateCounter.get();
        DataSegment segment1 = this.makeSegment(101);
        this.segmentAnnouncer.announceSegment(segment1);
        this.testSegments.add(segment1);
        this.waitForUpdateEvents(prevUpdateCount + 1);
        Assert.assertNull((Object)((DruidServer)Iterables.getOnlyElement((Iterable)this.filteredBatchServerInventoryView.getInventory())).getSegment(segment1.getIdentifier()));
    }

    @Test
    public void testRunWithFilterCallback() throws Exception {
        final CountDownLatch removeCallbackLatch = new CountDownLatch(1);
        this.segmentAnnouncer.announceSegments(this.testSegments);
        BatchServerInventoryViewTest.waitForSync(this.filteredBatchServerInventoryView, this.testSegments);
        DruidServer server = (DruidServer)Iterables.get((Iterable)this.filteredBatchServerInventoryView.getInventory(), (int)0);
        HashSet segments = Sets.newHashSet(server.getSegments().values());
        Assert.assertEquals(this.testSegments, (Object)segments);
        ServerView.SegmentCallback callback = (ServerView.SegmentCallback)EasyMock.createStrictMock(ServerView.SegmentCallback.class);
        Comparator<DataSegment> dataSegmentComparator = Comparator.comparing(DataSegment::getInterval, Comparators.intervalsByStartThenEnd());
        EasyMock.expect((Object)callback.segmentAdded((DruidServerMetadata)EasyMock.anyObject(), (DataSegment)EasyMock.cmp((Object)this.makeSegment(102), dataSegmentComparator, (LogicalOperator)LogicalOperator.EQUAL))).andReturn((Object)ServerView.CallbackAction.CONTINUE).times(1);
        EasyMock.expect((Object)callback.segmentRemoved((DruidServerMetadata)EasyMock.anyObject(), (DataSegment)EasyMock.cmp((Object)this.makeSegment(102), dataSegmentComparator, (LogicalOperator)LogicalOperator.EQUAL))).andAnswer((IAnswer)new IAnswer<ServerView.CallbackAction>(){

            public ServerView.CallbackAction answer() throws Throwable {
                removeCallbackLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }
        }).times(1);
        EasyMock.replay((Object[])new Object[]{callback});
        this.filteredBatchServerInventoryView.registerSegmentCallback((Executor)MoreExecutors.sameThreadExecutor(), callback, (Predicate)new Predicate<Pair<DruidServerMetadata, DataSegment>>(){

            public boolean apply(@Nullable Pair<DruidServerMetadata, DataSegment> input) {
                return ((DataSegment)input.rhs).getInterval().getStart().equals((Object)SEGMENT_INTERVAL_START.plusDays(102));
            }
        });
        DataSegment segment2 = this.makeSegment(102);
        this.segmentAnnouncer.announceSegment(segment2);
        this.testSegments.add(segment2);
        DataSegment oldSegment = this.makeSegment(-1);
        this.segmentAnnouncer.announceSegment(oldSegment);
        this.testSegments.add(oldSegment);
        this.segmentAnnouncer.unannounceSegment(oldSegment);
        this.testSegments.remove(oldSegment);
        BatchServerInventoryViewTest.waitForSync(this.filteredBatchServerInventoryView, this.testSegments);
        this.segmentAnnouncer.unannounceSegment(segment2);
        this.testSegments.remove(segment2);
        BatchServerInventoryViewTest.waitForSync(this.filteredBatchServerInventoryView, this.testSegments);
        timing.forWaiting().awaitLatch(removeCallbackLatch);
        EasyMock.verify((Object[])new Object[]{callback});
    }

    private DataSegment makeSegment(int offset) {
        return DataSegment.builder().dataSource("foo").interval(new Interval((ReadableInstant)SEGMENT_INTERVAL_START.plusDays(offset), (ReadableInstant)SEGMENT_INTERVAL_START.plusDays(offset + 1))).version(DateTimes.nowUtc().toString()).build();
    }

    private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> testSegments) throws Exception {
        Timing forWaitingTiming = timing.forWaiting();
        Stopwatch stopwatch = Stopwatch.createStarted();
        while (Iterables.isEmpty((Iterable)batchServerInventoryView.getInventory()) || ((DruidServer)Iterables.get((Iterable)batchServerInventoryView.getInventory(), (int)0)).getSegments().size() != testSegments.size()) {
            Thread.sleep(100L);
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) <= (long)forWaitingTiming.milliseconds()) continue;
            throw new ISE("BatchServerInventoryView is not updating", new Object[0]);
        }
    }

    private void waitForUpdateEvents(int count) throws Exception {
        Timing forWaitingTiming = timing.forWaiting();
        Stopwatch stopwatch = Stopwatch.createStarted();
        while (this.inventoryUpdateCounter.get() != count) {
            Thread.sleep(100L);
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) <= (long)forWaitingTiming.milliseconds()) continue;
            throw new ISE("BatchServerInventoryView is not updating counter expected[%d] value[%d]", new Object[]{count, this.inventoryUpdateCounter.get()});
        }
    }

    @Test
    public void testSameTimeZnode() throws Exception {
        int numThreads = 10;
        ListeningExecutorService executor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(10));
        this.segmentAnnouncer.announceSegments(this.testSegments);
        BatchServerInventoryViewTest.waitForSync(this.batchServerInventoryView, this.testSegments);
        DruidServer server = (DruidServer)Iterables.get((Iterable)this.batchServerInventoryView.getInventory(), (int)0);
        HashSet segments = Sets.newHashSet(server.getSegments().values());
        Assert.assertEquals(this.testSegments, (Object)segments);
        final CountDownLatch latch = new CountDownLatch(10);
        ArrayList<ListenableFuture> futures = new ArrayList<ListenableFuture>();
        int i = 0;
        while (i < 10) {
            final int ii = i++;
            futures.add(executor.submit((Callable)new Callable<BatchDataSegmentAnnouncer>(){

                @Override
                public BatchDataSegmentAnnouncer call() {
                    BatchDataSegmentAnnouncer segmentAnnouncer = new BatchDataSegmentAnnouncer(new DruidServerMetadata("id", "host", null, Long.MAX_VALUE, ServerType.HISTORICAL, "tier", 0), new BatchDataSegmentAnnouncerConfig(){

                        public int getSegmentsPerNode() {
                            return 50;
                        }
                    }, new ZkPathsConfig(){

                        public String getBase() {
                            return BatchServerInventoryViewTest.testBasePath;
                        }
                    }, BatchServerInventoryViewTest.this.announcer, BatchServerInventoryViewTest.this.jsonMapper);
                    ArrayList<DataSegment> segments = new ArrayList<DataSegment>();
                    try {
                        for (int j = 0; j < 10; ++j) {
                            segments.add(BatchServerInventoryViewTest.this.makeSegment(100 + ii + 10 * j));
                        }
                        latch.countDown();
                        latch.await();
                        segmentAnnouncer.announceSegments(segments);
                        BatchServerInventoryViewTest.this.testSegments.addAll(segments);
                    }
                    catch (Exception e) {
                        throw Throwables.propagate((Throwable)e);
                    }
                    return segmentAnnouncer;
                }
            }));
        }
        List announcers = (List)Futures.allAsList(futures).get();
        Assert.assertEquals((long)200L, (long)this.testSegments.size());
        BatchServerInventoryViewTest.waitForSync(this.batchServerInventoryView, this.testSegments);
        Assert.assertEquals(this.testSegments, (Object)Sets.newHashSet(server.getSegments().values()));
        for (int i2 = 0; i2 < 100; ++i2) {
            DataSegment segment = this.makeSegment(100 + i2);
            this.segmentAnnouncer.unannounceSegment(segment);
            this.testSegments.remove(segment);
        }
        BatchServerInventoryViewTest.waitForSync(this.batchServerInventoryView, this.testSegments);
        Assert.assertEquals(this.testSegments, (Object)Sets.newHashSet(server.getSegments().values()));
    }
}

