package org.apache.hive.druid.io.druid.client.client;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingCluster;
import org.apache.curator.test.Timing;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Predicate;
import org.apache.hive.druid.com.google.common.base.Predicates;
import org.apache.hive.druid.com.google.common.base.Stopwatch;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.io.druid.client.BatchServerInventoryView;
import org.apache.hive.druid.io.druid.client.DruidServer;
import org.apache.hive.druid.io.druid.client.ServerView;
import org.apache.hive.druid.io.druid.curator.PotentiallyGzippedCompressionProvider;
import org.apache.hive.druid.io.druid.curator.announcement.Announcer;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Pair;
import org.apache.hive.druid.io.druid.java.util.common.guava.Comparators;
import org.apache.hive.druid.io.druid.segment.TestHelper;
import org.apache.hive.druid.io.druid.server.coordination.BatchDataSegmentAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.hive.druid.io.druid.server.coordination.DruidServerMetadata;
import org.apache.hive.druid.io.druid.server.coordination.ServerType;
import org.apache.hive.druid.io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.hive.druid.io.druid.server.initialization.ZkPathsConfig;
import org.apache.hive.druid.io.druid.timeline.DataSegment;
import org.apache.hive.druid.org.apache.calcite.sql.parser.parserextensiontesting.ExtensionSqlParserImplConstants;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.easymock.LogicalOperator;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/hive/druid/io/druid/client/client/BatchServerInventoryViewTest.class */
/**
 * Tests {@link BatchServerInventoryView} against an in-process Curator {@link TestingCluster}.
 *
 * <p>Each test announces segments through a {@link BatchDataSegmentAnnouncer} and then polls the
 * inventory view until the number of segments it reports matches the expected set. Two views are
 * exercised: an unfiltered one and one that only accepts segments whose interval starts before
 * {@code SEGMENT_INTERVAL_START + INITIAL_SEGMENTS} days.
 */
public class BatchServerInventoryViewTest {
    private static final String testBasePath = "/test";
    public static final int INITIAL_SEGMENTS = 100;

    /** Value returned by every {@link BatchDataSegmentAnnouncerConfig#getSegmentsPerNode()} used here. */
    private static final int SEGMENTS_PER_NODE = 50;

    // NOTE(review): these two day offsets are read from an unrelated Calcite parser-constants
    // interface, which looks like a decompiler artifact (a numeric literal replaced by a
    // same-valued constant). testRunWithFilter only passes when the first one is
    // >= INITIAL_SEGMENTS, so they are presumably INITIAL_SEGMENTS + 1 and INITIAL_SEGMENTS + 2
    // -- TODO confirm the values and replace these references with plain arithmetic.
    private static final int EXTRA_SEGMENT_OFFSET_1 = ExtensionSqlParserImplConstants.CONVERT;
    private static final int EXTRA_SEGMENT_OFFSET_2 = ExtensionSqlParserImplConstants.CORR;

    private TestingCluster testingCluster;
    private CuratorFramework cf;
    private ObjectMapper jsonMapper;
    private Announcer announcer;
    private BatchDataSegmentAnnouncer segmentAnnouncer;
    private DataSegmentServerAnnouncer serverAnnouncer;
    private Set<DataSegment> testSegments;
    private BatchServerInventoryView batchServerInventoryView;
    private BatchServerInventoryView filteredBatchServerInventoryView;

    /** Counts calls to {@code addInnerInventory} on the filtered view; reset in {@link #setUp()}. */
    private final AtomicInteger inventoryUpdateCounter = new AtomicInteger();

    @Rule
    public ExpectedException exception = ExpectedException.none();
    public static final DateTime SEGMENT_INTERVAL_START = DateTimes.of("2013-01-01");
    private static final Timing timing = new Timing();

    /** One cleanup action of {@link #tearDown()}; allowed to throw so each step can be isolated. */
    @FunctionalInterface
    private interface TearDownStep {
        void run() throws Exception;
    }

    /**
     * Starts a single-node test cluster, connects Curator to it, announces the test server and
     * starts both the unfiltered and the filtered inventory view. Also fills {@code testSegments}
     * with {@link #INITIAL_SEGMENTS} segments (not yet announced).
     */
    @Before
    public void setUp() throws Exception {
        testingCluster = new TestingCluster(1);
        testingCluster.start();

        cf = CuratorFrameworkFactory.builder()
                .connectString(testingCluster.getConnectString())
                .retryPolicy(new ExponentialBackoffRetry(1, 10))
                .compressionProvider(new PotentiallyGzippedCompressionProvider(true))
                .build();
        cf.start();
        cf.blockUntilConnected();
        cf.create().creatingParentsIfNeeded().forPath(testBasePath);

        jsonMapper = TestHelper.makeJsonMapper();

        announcer = new Announcer(cf, MoreExecutors.sameThreadExecutor());
        announcer.start();

        final DruidServerMetadata serverMetadata = newServerMetadata();
        final ZkPathsConfig zkPathsConfig = newZkPathsConfig();

        serverAnnouncer = new CuratorDataSegmentServerAnnouncer(serverMetadata, zkPathsConfig, announcer, jsonMapper);
        serverAnnouncer.announce();

        segmentAnnouncer = new BatchDataSegmentAnnouncer(
                serverMetadata, newAnnouncerConfig(), zkPathsConfig, announcer, jsonMapper);

        // Concurrent set: testSameTimeZnode adds to it from several threads.
        testSegments = Sets.newConcurrentHashSet();
        for (int offset = 0; offset < INITIAL_SEGMENTS; offset++) {
            testSegments.add(makeSegment(offset));
        }

        batchServerInventoryView = new BatchServerInventoryView(
                newZkPathsConfig(),
                cf,
                jsonMapper,
                Predicates.<Pair<DruidServerMetadata, DataSegment>>alwaysTrue());
        batchServerInventoryView.start();

        inventoryUpdateCounter.set(0);
        filteredBatchServerInventoryView = new BatchServerInventoryView(
                newZkPathsConfig(),
                cf,
                jsonMapper,
                new Predicate<Pair<DruidServerMetadata, DataSegment>>() {
                    @Override
                    public boolean apply(@Nullable Pair<DruidServerMetadata, DataSegment> pair) {
                        // Accept only segments starting before the first "extra" day.
                        return pair.rhs.getInterval().getStart()
                                .isBefore(SEGMENT_INTERVAL_START.plusDays(INITIAL_SEGMENTS));
                    }
                }) {
            @Override
            protected DruidServer addInnerInventory(DruidServer druidServer, String str, Set<DataSegment> set) {
                final DruidServer updated = super.addInnerInventory(druidServer, str, set);
                // Lets waitForUpdateEvents observe that the view processed an inventory addition.
                inventoryUpdateCounter.incrementAndGet();
                return updated;
            }
        };
        filteredBatchServerInventoryView.start();
    }

    /**
     * Stops everything {@link #setUp()} started, in the same order as before.
     *
     * <p>Every step runs even when an earlier one throws, and fields that were never assigned
     * (because {@code setUp} failed part-way) are skipped, so a single failure cannot leak the
     * test cluster. The first failure is rethrown with later ones attached as suppressed.
     *
     * @throws Exception the first exception raised by any cleanup step
     */
    @After
    public void tearDown() throws Exception {
        final TearDownStep[] steps = {
            () -> {
                if (batchServerInventoryView != null) {
                    batchServerInventoryView.stop();
                }
            },
            () -> {
                if (filteredBatchServerInventoryView != null) {
                    filteredBatchServerInventoryView.stop();
                }
            },
            () -> {
                if (serverAnnouncer != null) {
                    serverAnnouncer.unannounce();
                }
            },
            () -> {
                if (announcer != null) {
                    announcer.stop();
                }
            },
            () -> {
                if (cf != null) {
                    cf.close();
                }
            },
            () -> {
                if (testingCluster != null) {
                    testingCluster.stop();
                }
            },
        };

        Exception firstFailure = null;
        for (TearDownStep step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Announces, adds and removes segments and checks the unfiltered view follows each change. */
    @Test
    public void testRun() throws Exception {
        segmentAnnouncer.announceSegments(testSegments);
        waitForSync(batchServerInventoryView, testSegments);

        final DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
        Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));

        final DataSegment extra1 = makeSegment(EXTRA_SEGMENT_OFFSET_1);
        final DataSegment extra2 = makeSegment(EXTRA_SEGMENT_OFFSET_2);
        segmentAnnouncer.announceSegment(extra1);
        segmentAnnouncer.announceSegment(extra2);
        testSegments.add(extra1);
        testSegments.add(extra2);
        waitForSync(batchServerInventoryView, testSegments);
        Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));

        segmentAnnouncer.unannounceSegment(extra1);
        segmentAnnouncer.unannounceSegment(extra2);
        testSegments.remove(extra1);
        testSegments.remove(extra2);
        waitForSync(batchServerInventoryView, testSegments);
        Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
    }

    /** Checks that a segment rejected by the view's filter never shows up in the filtered view. */
    @Test
    public void testRunWithFilter() throws Exception {
        segmentAnnouncer.announceSegments(testSegments);
        waitForSync(filteredBatchServerInventoryView, testSegments);

        final DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
        Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));

        final int updatesBefore = inventoryUpdateCounter.get();
        final DataSegment filteredOut = makeSegment(EXTRA_SEGMENT_OFFSET_1);
        segmentAnnouncer.announceSegment(filteredOut);
        testSegments.add(filteredOut);

        // Wait until the view has processed the announcement before asserting the segment is absent.
        waitForUpdateEvents(updatesBefore + 1);
        Assert.assertNull(
                Iterables.getOnlyElement(filteredBatchServerInventoryView.getInventory())
                        .getSegment(filteredOut.getIdentifier()));
    }

    /**
     * Registers a segment callback with its own predicate on the filtered view and verifies, via a
     * strict EasyMock mock, that it receives exactly one {@code segmentAdded} followed by one
     * {@code segmentRemoved} for the segment that predicate selects.
     */
    @Test
    public void testRunWithFilterCallback() throws Exception {
        final CountDownLatch removeCallbackLatch = new CountDownLatch(1);

        segmentAnnouncer.announceSegments(testSegments);
        waitForSync(filteredBatchServerInventoryView, testSegments);

        final DruidServer server = Iterables.get(filteredBatchServerInventoryView.getInventory(), 0);
        Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));

        final ServerView.SegmentCallback callback = EasyMock.createStrictMock(ServerView.SegmentCallback.class);
        // Match expected segments by interval only: makeSegment stamps every segment with a
        // fresh version, so two segments built for the same offset are generally not equal.
        final Comparator<DataSegment> byInterval =
                Comparator.comparing(DataSegment::getInterval, Comparators.intervalsByStartThenEnd());

        EasyMock.expect(
                callback.segmentAdded(
                        EasyMock.<DruidServerMetadata>anyObject(),
                        EasyMock.cmp(makeSegment(EXTRA_SEGMENT_OFFSET_2), byInterval, LogicalOperator.EQUAL)))
                .andReturn(ServerView.CallbackAction.CONTINUE)
                .times(1);

        EasyMock.expect(
                callback.segmentRemoved(
                        EasyMock.<DruidServerMetadata>anyObject(),
                        EasyMock.cmp(makeSegment(EXTRA_SEGMENT_OFFSET_2), byInterval, LogicalOperator.EQUAL)))
                .andAnswer(new IAnswer<ServerView.CallbackAction>() {
                    @Override
                    public ServerView.CallbackAction answer() throws Throwable {
                        removeCallbackLatch.countDown();
                        return ServerView.CallbackAction.CONTINUE;
                    }
                })
                .times(1);

        EasyMock.replay(callback);

        filteredBatchServerInventoryView.registerSegmentCallback(
                MoreExecutors.sameThreadExecutor(),
                callback,
                new Predicate<Pair<DruidServerMetadata, DataSegment>>() {
                    @Override
                    public boolean apply(@Nullable Pair<DruidServerMetadata, DataSegment> pair) {
                        return pair.rhs.getInterval().getStart()
                                .equals(SEGMENT_INTERVAL_START.plusDays(EXTRA_SEGMENT_OFFSET_2));
                    }
                });

        final DataSegment watched = makeSegment(EXTRA_SEGMENT_OFFSET_2);
        segmentAnnouncer.announceSegment(watched);
        testSegments.add(watched);

        // A segment the callback predicate does not select; it must not reach the strict mock.
        final DataSegment unwatched = makeSegment(-1);
        segmentAnnouncer.announceSegment(unwatched);
        testSegments.add(unwatched);

        segmentAnnouncer.unannounceSegment(unwatched);
        testSegments.remove(unwatched);
        waitForSync(filteredBatchServerInventoryView, testSegments);

        segmentAnnouncer.unannounceSegment(watched);
        testSegments.remove(watched);
        waitForSync(filteredBatchServerInventoryView, testSegments);

        Assert.assertTrue(
                "segmentRemoved callback was not invoked in time",
                timing.forWaiting().awaitLatch(removeCallbackLatch));
        EasyMock.verify(callback);
    }

    /**
     * Builds a one-day segment of data source {@code "foo"}.
     *
     * @param i day offset from {@link #SEGMENT_INTERVAL_START} at which the interval starts
     * @return a segment covering {@code [start + i days, start + i + 1 days)} whose version is the
     *     current UTC time, so repeated calls generally produce different versions
     */
    public DataSegment makeSegment(int i) {
        final Interval interval =
                new Interval(SEGMENT_INTERVAL_START.plusDays(i), SEGMENT_INTERVAL_START.plusDays(i + 1));
        return DataSegment.builder()
                .dataSource("foo")
                .interval(interval)
                .version(DateTimes.nowUtc().toString())
                .build();
    }

    /** Creates the server metadata shared by every announcer in this test. */
    private static DruidServerMetadata newServerMetadata() {
        return new DruidServerMetadata("id", "host", (String) null, Long.MAX_VALUE, ServerType.HISTORICAL, "tier", 0);
    }

    /** Creates a ZK paths config whose base path is {@code testBasePath}. */
    private static ZkPathsConfig newZkPathsConfig() {
        return new ZkPathsConfig() {
            @Override
            public String getBase() {
                return testBasePath;
            }
        };
    }

    /** Creates an announcer config that reports {@link #SEGMENTS_PER_NODE} segments per node. */
    private static BatchDataSegmentAnnouncerConfig newAnnouncerConfig() {
        return new BatchDataSegmentAnnouncerConfig() {
            @Override
            public int getSegmentsPerNode() {
                return SEGMENTS_PER_NODE;
            }
        };
    }

    /**
     * Polls every 100 ms until the first server in the view reports as many segments as
     * {@code set} contains. Only the sizes are compared, not the contents.
     *
     * @throws ISE if the sizes still differ once the {@code timing.forWaiting()} budget is spent
     */
    private static void waitForSync(BatchServerInventoryView batchServerInventoryView, Set<DataSegment> set) throws Exception {
        final Timing waitBudget = timing.forWaiting();
        final Stopwatch stopwatch = Stopwatch.createStarted();
        do {
            final boolean hasServer = !Iterables.isEmpty(batchServerInventoryView.getInventory());
            if (hasServer
                    && Iterables.get(batchServerInventoryView.getInventory(), 0).getSegments().size() == set.size()) {
                return;
            }
            Thread.sleep(100L);
        } while (stopwatch.elapsed(TimeUnit.MILLISECONDS) <= waitBudget.milliseconds());
        throw new ISE("BatchServerInventoryView is not updating");
    }

    /**
     * Polls every 100 ms until {@code inventoryUpdateCounter} equals {@code i} exactly.
     *
     * @throws ISE if the counter has not reached {@code i} within the {@code timing.forWaiting()} budget
     */
    private void waitForUpdateEvents(int i) throws Exception {
        final Timing waitBudget = timing.forWaiting();
        final Stopwatch stopwatch = Stopwatch.createStarted();
        while (inventoryUpdateCounter.get() != i) {
            Thread.sleep(100L);
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) > waitBudget.milliseconds()) {
                throw new ISE(
                        "BatchServerInventoryView is not updating counter expected[%d] value[%d]",
                        i,
                        inventoryUpdateCounter.get());
            }
        }
    }

    /**
     * Has several announcers publish segments at the same moment and checks the view ends up with
     * all of them.
     *
     * <p>Ten threads each build their own {@link BatchDataSegmentAnnouncer}, rendezvous on a latch
     * and then announce ten segments apiece. The test waits for every worker to finish before
     * checking {@code testSegments}, and always shuts the thread pool down.
     */
    @Test
    public void testSameTimeZnode() throws Exception {
        final int numThreads = 10;
        final int segmentsPerThread = INITIAL_SEGMENTS / numThreads;
        final ListeningExecutorService executor =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(numThreads));
        try {
            segmentAnnouncer.announceSegments(testSegments);
            waitForSync(batchServerInventoryView, testSegments);

            final DruidServer server = Iterables.get(batchServerInventoryView.getInventory(), 0);
            Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));

            final CountDownLatch startLatch = new CountDownLatch(numThreads);
            final List<Future<BatchDataSegmentAnnouncer>> futures = new ArrayList<>(numThreads);
            for (int i = 0; i < numThreads; i++) {
                final int threadIndex = i;
                futures.add(executor.submit(new Callable<BatchDataSegmentAnnouncer>() {
                    @Override
                    public BatchDataSegmentAnnouncer call() {
                        final BatchDataSegmentAnnouncer threadAnnouncer = new BatchDataSegmentAnnouncer(
                                newServerMetadata(), newAnnouncerConfig(), newZkPathsConfig(), announcer, jsonMapper);
                        final List<DataSegment> segments = new ArrayList<>(segmentsPerThread);
                        try {
                            for (int j = 0; j < segmentsPerThread; j++) {
                                segments.add(makeSegment(INITIAL_SEGMENTS + threadIndex + (numThreads * j)));
                            }
                            // Rendezvous so that all threads announce at (nearly) the same time.
                            startLatch.countDown();
                            startLatch.await();
                            threadAnnouncer.announceSegments(segments);
                            testSegments.addAll(segments);
                        } catch (Exception e) {
                            throw Throwables.propagate(e);
                        }
                        return threadAnnouncer;
                    }
                }));
            }

            // Surface worker failures and make sure every addAll has happened before asserting.
            for (Future<BatchDataSegmentAnnouncer> future : futures) {
                future.get(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
            }

            Assert.assertEquals(INITIAL_SEGMENTS * 2, testSegments.size());
            waitForSync(batchServerInventoryView, testSegments);
            Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));

            for (int i = 0; i < INITIAL_SEGMENTS; i++) {
                final DataSegment segment = makeSegment(INITIAL_SEGMENTS + i);
                segmentAnnouncer.unannounceSegment(segment);
                testSegments.remove(segment);
            }
            waitForSync(batchServerInventoryView, testSegments);
            Assert.assertEquals(testSegments, Sets.newHashSet(server.getSegments().values()));
        } finally {
            executor.shutdownNow();
        }
    }
}
