/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hive.druid.io.druid.segment.realtime;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hive.druid.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hive.druid.com.google.common.base.Function;
import org.apache.hive.druid.com.google.common.base.Stopwatch;
import org.apache.hive.druid.com.google.common.base.Supplier;
import org.apache.hive.druid.com.google.common.base.Throwables;
import org.apache.hive.druid.com.google.common.collect.ImmutableList;
import org.apache.hive.druid.com.google.common.collect.ImmutableMap;
import org.apache.hive.druid.com.google.common.collect.Iterables;
import org.apache.hive.druid.com.google.common.collect.Lists;
import org.apache.hive.druid.com.google.common.util.concurrent.MoreExecutors;
import org.apache.hive.druid.io.druid.data.input.Committer;
import org.apache.hive.druid.io.druid.data.input.Firehose;
import org.apache.hive.druid.io.druid.data.input.FirehoseFactory;
import org.apache.hive.druid.io.druid.data.input.FirehoseFactoryV2;
import org.apache.hive.druid.io.druid.data.input.FirehoseV2;
import org.apache.hive.druid.io.druid.data.input.InputRow;
import org.apache.hive.druid.io.druid.data.input.Row;
import org.apache.hive.druid.io.druid.data.input.impl.InputRowParser;
import org.apache.hive.druid.io.druid.jackson.DefaultObjectMapper;
import org.apache.hive.druid.io.druid.java.util.common.DateTimes;
import org.apache.hive.druid.io.druid.java.util.common.ISE;
import org.apache.hive.druid.io.druid.java.util.common.Intervals;
import org.apache.hive.druid.io.druid.java.util.common.granularity.Granularities;
import org.apache.hive.druid.io.druid.java.util.common.parsers.ParseException;
import org.apache.hive.druid.io.druid.query.BaseQuery;
import org.apache.hive.druid.io.druid.query.Query;
import org.apache.hive.druid.io.druid.query.QueryRunner;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactory;
import org.apache.hive.druid.io.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.hive.druid.io.druid.query.QueryRunnerTestHelper;
import org.apache.hive.druid.io.druid.query.QuerySegmentWalker;
import org.apache.hive.druid.io.druid.query.SegmentDescriptor;
import org.apache.hive.druid.io.druid.query.aggregation.AggregatorFactory;
import org.apache.hive.druid.io.druid.query.aggregation.CountAggregatorFactory;
import org.apache.hive.druid.io.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.hive.druid.io.druid.query.dimension.DefaultDimensionSpec;
import org.apache.hive.druid.io.druid.query.dimension.DimensionSpec;
import org.apache.hive.druid.io.druid.query.groupby.GroupByQuery;
import org.apache.hive.druid.io.druid.query.groupby.GroupByQueryConfig;
import org.apache.hive.druid.io.druid.query.groupby.GroupByQueryRunnerFactory;
import org.apache.hive.druid.io.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.hive.druid.io.druid.query.groupby.GroupByQueryRunnerTestHelper;
import org.apache.hive.druid.io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.hive.druid.io.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.hive.druid.io.druid.query.spec.QuerySegmentSpec;
import org.apache.hive.druid.io.druid.query.spec.SpecificSegmentQueryRunner;
import org.apache.hive.druid.io.druid.query.spec.SpecificSegmentSpec;
import org.apache.hive.druid.io.druid.segment.TestHelper;
import org.apache.hive.druid.io.druid.segment.incremental.IndexSizeExceededException;
import org.apache.hive.druid.io.druid.segment.indexing.DataSchema;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeIOConfig;
import org.apache.hive.druid.io.druid.segment.indexing.RealtimeTuningConfig;
import org.apache.hive.druid.io.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.hive.druid.io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartment;
import org.apache.hive.druid.io.druid.segment.realtime.FireDepartmentMetrics;
import org.apache.hive.druid.io.druid.segment.realtime.RealtimeManager;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Plumber;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.PlumberSchool;
import org.apache.hive.druid.io.druid.segment.realtime.plumber.Sink;
import org.apache.hive.druid.io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.hive.druid.io.druid.timeline.partition.LinearShardSpec;
import org.apache.hive.druid.io.druid.timeline.partition.ShardSpec;
import org.apache.hive.druid.io.druid.utils.Runnables;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.Period;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class RealtimeManagerTest {
    private static QueryRunnerFactory factory;
    private static QueryRunnerFactoryConglomerate conglomerate;
    private static final List<TestInputRowHolder> rows;
    private RealtimeManager realtimeManager;
    private RealtimeManager realtimeManager2;
    private RealtimeManager realtimeManager3;
    private DataSchema schema;
    private DataSchema schema2;
    private TestPlumber plumber;
    private TestPlumber plumber2;
    private RealtimeTuningConfig tuningConfig_0;
    private RealtimeTuningConfig tuningConfig_1;
    private DataSchema schema3;

    @BeforeClass
    public static void setupStatic() {
        factory = RealtimeManagerTest.initFactory();
        conglomerate = new QueryRunnerFactoryConglomerate(){

            public <T, QueryType extends Query<T>> QueryRunnerFactory<T, QueryType> findFactory(QueryType query) {
                return factory;
            }
        };
    }

    @Before
    public void setUp() throws Exception {
        DefaultObjectMapper jsonMapper = new DefaultObjectMapper();
        this.schema = new DataSchema("test", null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, (GranularitySpec)new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), null, (ObjectMapper)jsonMapper);
        this.schema2 = new DataSchema("testV2", null, new AggregatorFactory[]{new CountAggregatorFactory("rows")}, (GranularitySpec)new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), null, (ObjectMapper)jsonMapper);
        RealtimeIOConfig ioConfig = new RealtimeIOConfig(new FirehoseFactory(){

            public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException {
                return new TestFirehose(rows.iterator());
            }
        }, new PlumberSchool(){

            public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) {
                return RealtimeManagerTest.this.plumber;
            }
        }, null);
        RealtimeIOConfig ioConfig2 = new RealtimeIOConfig(null, new PlumberSchool(){

            public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics) {
                return RealtimeManagerTest.this.plumber2;
            }
        }, new FirehoseFactoryV2(){

            public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException, ParseException {
                return new TestFirehoseV2(rows.iterator());
            }
        });
        RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(Integer.valueOf(1), new Period((Object)"P1Y"), null, null, null, null, null, null, null, null, 0, 0, null, null, null, null);
        this.plumber = new TestPlumber(new Sink(Intervals.of((String)"0/P5000Y"), this.schema, tuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), tuningConfig.getMaxRowsInMemory(), tuningConfig.isReportParseExceptions()));
        this.realtimeManager = new RealtimeManager(Arrays.asList(new FireDepartment(this.schema, ioConfig, tuningConfig)), null, (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
        this.plumber2 = new TestPlumber(new Sink(Intervals.of((String)"0/P5000Y"), this.schema2, tuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), tuningConfig.getMaxRowsInMemory(), tuningConfig.isReportParseExceptions()));
        this.realtimeManager2 = new RealtimeManager(Arrays.asList(new FireDepartment(this.schema2, ioConfig2, tuningConfig)), null, (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class));
        this.tuningConfig_0 = new RealtimeTuningConfig(Integer.valueOf(1), new Period((Object)"P1Y"), null, null, null, null, null, (ShardSpec)new LinearShardSpec(Integer.valueOf(0)), null, null, 0, 0, null, null, null, null);
        this.tuningConfig_1 = new RealtimeTuningConfig(Integer.valueOf(1), new Period((Object)"P1Y"), null, null, null, null, null, (ShardSpec)new LinearShardSpec(Integer.valueOf(1)), null, null, 0, 0, null, null, null, null);
        this.schema3 = new DataSchema("testing", null, new AggregatorFactory[]{new CountAggregatorFactory("ignore")}, (GranularitySpec)new UniformGranularitySpec(Granularities.HOUR, Granularities.NONE, null), null, (ObjectMapper)jsonMapper);
        FireDepartment department_0 = new FireDepartment(this.schema3, ioConfig, this.tuningConfig_0);
        FireDepartment department_1 = new FireDepartment(this.schema3, ioConfig2, this.tuningConfig_1);
        this.realtimeManager3 = new RealtimeManager(Arrays.asList(department_0, department_1), conglomerate, (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), null);
    }

    @After
    public void tearDown() throws Exception {
        this.realtimeManager.stop();
        this.realtimeManager2.stop();
        this.realtimeManager3.stop();
    }

    @Test
    public void testRun() throws Exception {
        this.realtimeManager.start();
        Stopwatch stopwatch = Stopwatch.createStarted();
        while (this.realtimeManager.getMetrics("test").processed() != 1L) {
            Thread.sleep(100L);
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) <= 1000L) continue;
            throw new ISE("Realtime manager should have completed processing 2 events!", new Object[0]);
        }
        Assert.assertEquals((long)1L, (long)this.realtimeManager.getMetrics("test").processed());
        Assert.assertEquals((long)2L, (long)this.realtimeManager.getMetrics("test").thrownAway());
        Assert.assertEquals((long)1L, (long)this.realtimeManager.getMetrics("test").unparseable());
        Assert.assertTrue((boolean)this.plumber.isStartedJob());
        Assert.assertTrue((boolean)this.plumber.isFinishedJob());
        Assert.assertEquals((long)0L, (long)this.plumber.getPersistCount());
    }

    @Test
    public void testRunV2() throws Exception {
        this.realtimeManager2.start();
        Stopwatch stopwatch = Stopwatch.createStarted();
        while (this.realtimeManager2.getMetrics("testV2").processed() != 1L) {
            Thread.sleep(100L);
            if (stopwatch.elapsed(TimeUnit.MILLISECONDS) <= 1000L) continue;
            throw new ISE("Realtime manager should have completed processing 2 events!", new Object[0]);
        }
        Assert.assertEquals((long)1L, (long)this.realtimeManager2.getMetrics("testV2").processed());
        Assert.assertEquals((long)1L, (long)this.realtimeManager2.getMetrics("testV2").thrownAway());
        Assert.assertEquals((long)2L, (long)this.realtimeManager2.getMetrics("testV2").unparseable());
        Assert.assertTrue((boolean)this.plumber2.isStartedJob());
        Assert.assertTrue((boolean)this.plumber2.isFinishedJob());
        Assert.assertEquals((long)0L, (long)this.plumber2.getPersistCount());
    }

    @Test(timeout=5000L)
    public void testNormalStop() throws IOException, InterruptedException {
        final TestFirehose firehose = new TestFirehose(rows.iterator());
        TestFirehoseV2 firehoseV2 = new TestFirehoseV2(rows.iterator());
        RealtimeIOConfig ioConfig = new RealtimeIOConfig(new FirehoseFactory(){

            public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException {
                return firehose;
            }
        }, (schema, config, metrics) -> this.plumber, null);
        RealtimeIOConfig ioConfig2 = new RealtimeIOConfig(null, (schema, config, metrics) -> this.plumber2, (parser, arg) -> firehoseV2);
        FireDepartment department_0 = new FireDepartment(this.schema3, ioConfig, this.tuningConfig_0);
        FireDepartment department_1 = new FireDepartment(this.schema3, ioConfig2, this.tuningConfig_1);
        RealtimeManager realtimeManager = new RealtimeManager(Arrays.asList(department_0, department_1), conglomerate, (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), null);
        realtimeManager.start();
        while (realtimeManager.getMetrics("testing").processed() < 2L) {
            Thread.sleep(100L);
        }
        realtimeManager.stop();
        Assert.assertTrue((boolean)firehose.isClosed());
        Assert.assertTrue((boolean)firehoseV2.isClosed());
        Assert.assertTrue((boolean)this.plumber.isFinishedJob());
        Assert.assertTrue((boolean)this.plumber2.isFinishedJob());
    }

    @Test(timeout=5000L)
    public void testStopByInterruption() throws IOException {
        final SleepingFirehose firehose = new SleepingFirehose();
        RealtimeIOConfig ioConfig = new RealtimeIOConfig(new FirehoseFactory(){

            public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException {
                return firehose;
            }
        }, (schema, config, metrics) -> this.plumber, null);
        FireDepartment department_0 = new FireDepartment(this.schema, ioConfig, this.tuningConfig_0);
        RealtimeManager realtimeManager = new RealtimeManager(Collections.singletonList(department_0), conglomerate, (DataSegmentServerAnnouncer)EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), null);
        realtimeManager.start();
        realtimeManager.stop();
        Assert.assertTrue((boolean)firehose.isClosed());
        Assert.assertFalse((boolean)this.plumber.isFinishedJob());
    }

    @Test(timeout=10000L)
    public void testQueryWithInterval() throws IOException, InterruptedException {
        boolean notAllStarted;
        List<Row> expectedResults = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 2L, "idx", 270L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 2L, "idx", 236L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 2L, "idx", 316L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 2L, "idx", 240L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 6L, "idx", 5740L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 2L, "idx", 242L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 6L, "idx", 5800L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 2L, "idx", 156L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 2L, "idx", 238L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 2L, "idx", 294L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 2L, "idx", 224L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 2L, "idx", 332L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 2L, "idx", 226L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 6L, "idx", 4894L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 2L, "idx", 228L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 6L, "idx", 5010L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 2L, "idx", 194L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 2L, "idx", 252L));
        this.realtimeManager3.start();
        while (notAllStarted = this.realtimeManager3.getFireChiefs("testing").values().stream().anyMatch(fireChief -> {
            Plumber plumber = fireChief.getPlumber();
            return plumber == null || !((TestPlumber)plumber).isStartedJob();
        })) {
            Thread.sleep(10L);
        }
        for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory)factory)) {
            GroupByQuery query = GroupByQuery.builder().setDataSource("testing").setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird).setDimensions((List)Lists.newArrayList((Object[])new DimensionSpec[]{new DefaultDimensionSpec("quality", "alias")})).setAggregatorSpecs(Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))).setGranularity(QueryRunnerTestHelper.dayGran).build();
            this.plumber.setRunners((Map<Interval, QueryRunner>)ImmutableMap.of(query.getIntervals().get(0), runner));
            this.plumber2.setRunners((Map<Interval, QueryRunner>)ImmutableMap.of(query.getIntervals().get(0), runner));
            Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForIntervals((Query)query, (Iterable)QueryRunnerTestHelper.firstToThird.getIntervals()), query);
            TestHelper.assertExpectedObjects(expectedResults, results, "");
        }
    }

    @Test(timeout=10000L)
    public void testQueryWithSegmentSpec() throws IOException, InterruptedException {
        boolean notAllStarted;
        List<Row> expectedResults = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "business", "rows", 1L, "idx", 118L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "entertainment", "rows", 1L, "idx", 158L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "health", "rows", 1L, "idx", 120L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "news", "rows", 1L, "idx", 121L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "business", "rows", 1L, "idx", 112L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "entertainment", "rows", 1L, "idx", 166L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "health", "rows", 1L, "idx", 113L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "news", "rows", 1L, "idx", 114L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L));
        this.realtimeManager3.start();
        while (notAllStarted = this.realtimeManager3.getFireChiefs("testing").values().stream().anyMatch(fireChief -> {
            Plumber plumber = fireChief.getPlumber();
            return plumber == null || !((TestPlumber)plumber).isStartedJob();
        })) {
            Thread.sleep(10L);
        }
        for (QueryRunner runner : QueryRunnerTestHelper.makeQueryRunners((GroupByQueryRunnerFactory)factory)) {
            GroupByQuery query = GroupByQuery.builder().setDataSource("testing").setQuerySegmentSpec(QueryRunnerTestHelper.firstToThird).setDimensions((List)Lists.newArrayList((Object[])new DimensionSpec[]{new DefaultDimensionSpec("quality", "alias")})).setAggregatorSpecs(Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))).setGranularity(QueryRunnerTestHelper.dayGran).build();
            this.plumber.setRunners((Map<Interval, QueryRunner>)ImmutableMap.of(query.getIntervals().get(0), runner));
            this.plumber2.setRunners((Map<Interval, QueryRunner>)ImmutableMap.of(query.getIntervals().get(0), runner));
            Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments((Query)query, (Iterable)ImmutableList.of((Object)new SegmentDescriptor(Intervals.of((String)"2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), "ver", 0))), query);
            TestHelper.assertExpectedObjects(expectedResults, results, "");
            results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments((Query)query, (Iterable)ImmutableList.of((Object)new SegmentDescriptor(Intervals.of((String)"2011-04-01T00:00:00.000Z/2011-04-03T00:00:00.000Z"), "ver", 1))), query);
            TestHelper.assertExpectedObjects(expectedResults, results, "");
        }
    }

    @Test(timeout=10000L)
    public void testQueryWithMultipleSegmentSpec() throws IOException, InterruptedException {
        boolean notAllStarted;
        List<Row> expectedResults_both_partitions = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 2L, "idx", 260L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 2L, "idx", 236L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 4L, "idx", 4556L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 2L, "idx", 284L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 2L, "idx", 202L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 2L, "idx", 288L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 2L, "idx", 326L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 2L, "idx", 312L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 2L, "idx", 248L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 2L, "idx", 326L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 2L, "idx", 262L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 6L, "idx", 5126L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 2L, "idx", 254L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 6L, "idx", 5276L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 2L, "idx", 206L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 2L, "idx", 260L));
        List<Row> expectedResults_single_partition_26_28 = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "business", "rows", 1L, "idx", 130L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "health", "rows", 1L, "idx", 118L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "mezzanine", "rows", 2L, "idx", 2278L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "news", "rows", 1L, "idx", 142L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-26", "alias", "technology", "rows", 1L, "idx", 101L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "automotive", "rows", 1L, "idx", 144L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-27", "alias", "entertainment", "rows", 1L, "idx", 163L));
        List<Row> expectedResults_single_partition_28_29 = Arrays.asList(GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "automotive", "rows", 1L, "idx", 156L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "business", "rows", 1L, "idx", 124L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "entertainment", "rows", 1L, "idx", 163L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "health", "rows", 1L, "idx", 131L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "mezzanine", "rows", 3L, "idx", 2563L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "news", "rows", 1L, "idx", 127L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "premium", "rows", 3L, "idx", 2638L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "technology", "rows", 1L, "idx", 103L), GroupByQueryRunnerTestHelper.createExpectedRow("2011-03-28", "alias", "travel", "rows", 1L, "idx", 130L));
        this.realtimeManager3.start();
        while (notAllStarted = this.realtimeManager3.getFireChiefs("testing").values().stream().anyMatch(fireChief -> {
            Plumber plumber = fireChief.getPlumber();
            return plumber == null || !((TestPlumber)plumber).isStartedJob();
        })) {
            Thread.sleep(10L);
        }
        Interval interval_26_28 = Intervals.of((String)"2011-03-26T00:00:00.000Z/2011-03-28T00:00:00.000Z");
        Interval interval_28_29 = Intervals.of((String)"2011-03-28T00:00:00.000Z/2011-03-29T00:00:00.000Z");
        SegmentDescriptor descriptor_26_28_0 = new SegmentDescriptor(interval_26_28, "ver0", 0);
        SegmentDescriptor descriptor_28_29_0 = new SegmentDescriptor(interval_28_29, "ver1", 0);
        SegmentDescriptor descriptor_26_28_1 = new SegmentDescriptor(interval_26_28, "ver0", 1);
        SegmentDescriptor descriptor_28_29_1 = new SegmentDescriptor(interval_28_29, "ver1", 1);
        GroupByQuery query = GroupByQuery.builder().setDataSource("testing").setQuerySegmentSpec((QuerySegmentSpec)new MultipleSpecificSegmentSpec((List)ImmutableList.of((Object)descriptor_26_28_0, (Object)descriptor_28_29_0, (Object)descriptor_26_28_1, (Object)descriptor_28_29_1))).setDimensions((List)Lists.newArrayList((Object[])new DimensionSpec[]{new DefaultDimensionSpec("quality", "alias")})).setAggregatorSpecs(Arrays.asList(QueryRunnerTestHelper.rowsCount, new LongSumAggregatorFactory("idx", "index"))).setGranularity(QueryRunnerTestHelper.dayGran).build();
        ImmutableMap runnerMap = ImmutableMap.of((Object)interval_26_28, QueryRunnerTestHelper.makeQueryRunner(factory, "druid.sample.numeric.tsv.top", null), (Object)interval_28_29, QueryRunnerTestHelper.makeQueryRunner(factory, "druid.sample.numeric.tsv.bottom", null));
        this.plumber.setRunners((Map<Interval, QueryRunner>)runnerMap);
        this.plumber2.setRunners((Map<Interval, QueryRunner>)runnerMap);
        Iterable results = GroupByQueryRunnerTestHelper.runQuery(factory, query.getQuerySegmentSpec().lookup((Query)query, (QuerySegmentWalker)this.realtimeManager3), query);
        TestHelper.assertExpectedObjects(expectedResults_both_partitions, results, "");
        results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments((Query)query, (Iterable)ImmutableList.of((Object)descriptor_26_28_0)), query);
        TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, "");
        results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments((Query)query, (Iterable)ImmutableList.of((Object)descriptor_28_29_0)), query);
        TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, "");
        results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments((Query)query, (Iterable)ImmutableList.of((Object)descriptor_26_28_1)), query);
        TestHelper.assertExpectedObjects(expectedResults_single_partition_26_28, results, "");
        results = GroupByQueryRunnerTestHelper.runQuery(factory, this.realtimeManager3.getQueryRunnerForSegments((Query)query, (Iterable)ImmutableList.of((Object)descriptor_28_29_1)), query);
        TestHelper.assertExpectedObjects(expectedResults_single_partition_28_29, results, "");
    }

    private static GroupByQueryRunnerFactory initFactory() {
        GroupByQueryConfig config = new GroupByQueryConfig();
        config.setMaxIntermediateRows(10000);
        return GroupByQueryRunnerTest.makeQueryRunnerFactory(config);
    }

    private static TestInputRowHolder makeRow(long timestamp) {
        return new TestInputRowHolder(timestamp, null);
    }

    private static TestInputRowHolder makeRow(RuntimeException e) {
        return new TestInputRowHolder(0L, e);
    }

    static {
        rows = Arrays.asList(RealtimeManagerTest.makeRow(DateTimes.of((String)"9000-01-01").getMillis()), RealtimeManagerTest.makeRow((RuntimeException)new ParseException("parse error", new Object[0])), null, RealtimeManagerTest.makeRow(System.currentTimeMillis()));
    }

    private static class TestPlumber
    implements Plumber {
        private final Sink sink;
        private volatile boolean startedJob = false;
        private volatile boolean finishedJob = false;
        private volatile int persistCount = 0;
        private Map<Interval, QueryRunner> runners;

        private TestPlumber(Sink sink) {
            this.sink = sink;
        }

        private boolean isStartedJob() {
            return this.startedJob;
        }

        private boolean isFinishedJob() {
            return this.finishedJob;
        }

        private int getPersistCount() {
            return this.persistCount;
        }

        public Object startJob() {
            this.startedJob = true;
            return null;
        }

        public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException {
            if (row == null) {
                return -1;
            }
            Sink sink = this.getSink(row.getTimestampFromEpoch());
            if (sink == null) {
                return -1;
            }
            return sink.add(row, false);
        }

        public Sink getSink(long timestamp) {
            if (this.sink.getInterval().contains(timestamp)) {
                return this.sink;
            }
            return null;
        }

        public <T> QueryRunner<T> getQueryRunner(Query<T> query) {
            if (this.runners == null) {
                throw new UnsupportedOperationException();
            }
            BaseQuery baseQuery = (BaseQuery)query;
            if (baseQuery.getQuerySegmentSpec() instanceof MultipleIntervalSegmentSpec) {
                return factory.getToolchest().mergeResults(factory.mergeRunners((ExecutorService)MoreExecutors.sameThreadExecutor(), Iterables.transform((Iterable)baseQuery.getIntervals(), (Function)new Function<Interval, QueryRunner<T>>(){

                    public QueryRunner<T> apply(Interval input) {
                        return (QueryRunner)runners.get(input);
                    }
                })));
            }
            Assert.assertEquals((long)1L, (long)query.getIntervals().size());
            SegmentDescriptor descriptor = ((SpecificSegmentSpec)((BaseQuery)query).getQuerySegmentSpec()).getDescriptor();
            return new SpecificSegmentQueryRunner(this.runners.get(descriptor.getInterval()), new SpecificSegmentSpec(descriptor));
        }

        public void persist(Committer committer) {
            ++this.persistCount;
        }

        public void finishJob() {
            this.finishedJob = true;
        }

        public void setRunners(Map<Interval, QueryRunner> runners) {
            this.runners = runners;
        }
    }

    private static class SleepingFirehose
    implements Firehose {
        private boolean closed;

        private SleepingFirehose() {
        }

        public boolean hasMore() {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                throw Throwables.propagate((Throwable)e);
            }
            return true;
        }

        @Nullable
        public InputRow nextRow() {
            return null;
        }

        public Runnable commit() {
            return null;
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    private static class TestFirehoseV2
    implements FirehoseV2 {
        private final Iterator<TestInputRowHolder> rows;
        private InputRow currRow;
        private boolean stop;
        private boolean closed;

        private TestFirehoseV2(Iterator<TestInputRowHolder> rows) {
            this.rows = rows;
        }

        private void nextMessage() {
            this.currRow = null;
            while (this.currRow == null) {
                TestInputRowHolder holder = this.rows.next();
                this.currRow = holder == null ? null : holder.getRow();
            }
        }

        public void close() throws IOException {
            this.closed = true;
        }

        public boolean isClosed() {
            return this.closed;
        }

        public boolean advance() {
            boolean bl = this.stop = !this.rows.hasNext();
            if (this.stop) {
                return false;
            }
            this.nextMessage();
            return true;
        }

        public InputRow currRow() {
            return this.currRow;
        }

        public Committer makeCommitter() {
            return new Committer(){

                public Object getMetadata() {
                    return null;
                }

                public void run() {
                }
            };
        }

        public void start() throws Exception {
            this.nextMessage();
        }
    }

    private static class TestFirehose
    implements Firehose {
        private final Iterator<TestInputRowHolder> rows;
        private boolean closed;

        private TestFirehose(Iterator<TestInputRowHolder> rows) {
            this.rows = rows;
        }

        public boolean hasMore() {
            return this.rows.hasNext();
        }

        @Nullable
        public InputRow nextRow() {
            TestInputRowHolder holder = this.rows.next();
            if (holder == null) {
                return null;
            }
            return holder.getRow();
        }

        public Runnable commit() {
            return Runnables.getNoopRunnable();
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void close() throws IOException {
            this.closed = true;
        }
    }

    private static class TestInputRowHolder {
        private long timestamp;
        private RuntimeException exception;

        public TestInputRowHolder(long timestamp, RuntimeException exception) {
            this.timestamp = timestamp;
            this.exception = exception;
        }

        public InputRow getRow() {
            if (this.exception != null) {
                throw this.exception;
            }
            return new InputRow(){

                public List<String> getDimensions() {
                    return Arrays.asList("testDim");
                }

                public long getTimestampFromEpoch() {
                    return timestamp;
                }

                public DateTime getTimestamp() {
                    return DateTimes.utc((long)timestamp);
                }

                public List<String> getDimension(String dimension) {
                    return Lists.newArrayList();
                }

                public Number getMetric(String metric) {
                    return 0;
                }

                public Object getRaw(String dimension) {
                    return null;
                }

                public int compareTo(Row o) {
                    return 0;
                }
            };
        }
    }
}

